feat: Parallelized store discovery, modification tracking, and task deduplication

Store Discovery Parallelization:
- Add store_discovery_state handler for per-state parallel discovery
- Add POST /api/tasks/batch/store-discovery endpoint
- 8 workers can now process states in parallel (~30-45 min vs 3+ hours)

Modification Tracking (Migration 090):
- Add last_modified_at, last_modified_by_task, last_modified_task_id to dispensaries
- Add same columns to store_products
- Update all handlers to set tracking info on modifications

Stale Task Recovery:
- Add periodic stale cleanup every 10 minutes (worker-0 only)
- Prevents orphaned tasks from blocking queue after worker crashes

Task Deduplication:
- createStaggeredTasks now skips if pending/active task exists for same role
- Skips if same role completed within last 4 hours
- API responses include skipped count

🤖 Generated with [Claude Code](https://claude.com/claude-code)
This commit is contained in:
Kelly
2025-12-12 22:15:04 -07:00
parent e4e8438d8b
commit c62f8cbf06
11 changed files with 815 additions and 51 deletions

View File

@@ -0,0 +1,66 @@
-- Migration 090: Add modification tracking columns
--
-- Tracks when records were last modified and by which task.
-- Enables debugging, auditing, and understanding data freshness.
--
-- Columns added (to both dispensaries and store_products):
-- last_modified_at - When the record was last modified by a task
-- last_modified_by_task - Which task role modified it (e.g., 'product_refresh')
-- last_modified_task_id - The specific task ID that modified it
--
-- Notes:
-- * Every statement uses IF NOT EXISTS, so the migration can be re-run safely.
-- * All three columns are nullable with no default; rows that existed before
--   this migration keep NULL until a task modifies them.
-- * last_modified_task_id is a plain INTEGER: no foreign key to worker_tasks
--   is declared here.
-- ============================================================
-- dispensaries table
-- ============================================================
ALTER TABLE dispensaries
ADD COLUMN IF NOT EXISTS last_modified_at TIMESTAMPTZ;
ALTER TABLE dispensaries
ADD COLUMN IF NOT EXISTS last_modified_by_task VARCHAR(50);
ALTER TABLE dispensaries
ADD COLUMN IF NOT EXISTS last_modified_task_id INTEGER;
-- Index for querying recently modified records.
-- Partial index: rows never modified by a task (NULL) are left out.
CREATE INDEX IF NOT EXISTS idx_dispensaries_last_modified
ON dispensaries(last_modified_at DESC)
WHERE last_modified_at IS NOT NULL;
-- Index for querying by task type (partial: only rows touched by a task)
CREATE INDEX IF NOT EXISTS idx_dispensaries_modified_by_task
ON dispensaries(last_modified_by_task)
WHERE last_modified_by_task IS NOT NULL;
COMMENT ON COLUMN dispensaries.last_modified_at IS 'Timestamp when this record was last modified by a task';
COMMENT ON COLUMN dispensaries.last_modified_by_task IS 'Task role that last modified this record (e.g., store_discovery_state, entry_point_discovery)';
COMMENT ON COLUMN dispensaries.last_modified_task_id IS 'ID of the worker_tasks record that last modified this';
-- ============================================================
-- store_products table
-- ============================================================
ALTER TABLE store_products
ADD COLUMN IF NOT EXISTS last_modified_at TIMESTAMPTZ;
ALTER TABLE store_products
ADD COLUMN IF NOT EXISTS last_modified_by_task VARCHAR(50);
ALTER TABLE store_products
ADD COLUMN IF NOT EXISTS last_modified_task_id INTEGER;
-- Index for querying recently modified products.
-- Partial index: rows never modified by a task (NULL) are left out.
CREATE INDEX IF NOT EXISTS idx_store_products_last_modified
ON store_products(last_modified_at DESC)
WHERE last_modified_at IS NOT NULL;
-- Index for querying by task type (partial: only rows touched by a task)
CREATE INDEX IF NOT EXISTS idx_store_products_modified_by_task
ON store_products(last_modified_by_task)
WHERE last_modified_by_task IS NOT NULL;
-- Composite index for finding the most recently modified products of a
-- given dispensary. Despite the index name, it contains no task column:
-- it is keyed on (dispensary_id, last_modified_at DESC) only.
CREATE INDEX IF NOT EXISTS idx_store_products_task_modified
ON store_products(dispensary_id, last_modified_at DESC)
WHERE last_modified_at IS NOT NULL;
COMMENT ON COLUMN store_products.last_modified_at IS 'Timestamp when this record was last modified by a task';
COMMENT ON COLUMN store_products.last_modified_by_task IS 'Task role that last modified this record (e.g., product_refresh, product_discovery)';
COMMENT ON COLUMN store_products.last_modified_task_id IS 'ID of the worker_tasks record that last modified this';

View File

@@ -131,6 +131,14 @@ export interface PromotionSummary {
newDispensaryIds: number[]; newDispensaryIds: number[];
} }
/**
* Task tracking info for modification audit trail.
*
* Passed (optionally) through promoteDiscoveredLocations to promoteLocation,
* which writes these values into the dispensaries.last_modified_by_task and
* dispensaries.last_modified_task_id columns; last_modified_at is set to the
* current time whenever tracking info is supplied.
*/
export interface TaskTrackingInfo {
// ID of the task performing the modification (stored as last_modified_task_id)
taskId: number;
// Role of the task performing the modification (stored as last_modified_by_task)
taskRole: string;
}
/** /**
* Generate a URL-safe slug from name and city * Generate a URL-safe slug from name and city
*/ */
@@ -283,7 +291,8 @@ async function ensureCrawlerProfile(
* Idempotent: uses ON CONFLICT on platform_dispensary_id * Idempotent: uses ON CONFLICT on platform_dispensary_id
*/ */
async function promoteLocation( async function promoteLocation(
loc: DiscoveryLocationRow loc: DiscoveryLocationRow,
taskTracking?: TaskTrackingInfo
): Promise<PromotionResult> { ): Promise<PromotionResult> {
const slug = loc.platform_slug || generateSlug(loc.name, loc.city || '', loc.state_code || ''); const slug = loc.platform_slug || generateSlug(loc.name, loc.city || '', loc.state_code || '');
@@ -325,13 +334,16 @@ async function promoteLocation(
dutchie_verified, dutchie_verified,
dutchie_verified_at, dutchie_verified_at,
dutchie_discovery_id, dutchie_discovery_id,
last_modified_at,
last_modified_by_task,
last_modified_task_id,
created_at, created_at,
updated_at updated_at
) VALUES ( ) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
$21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30,
$31, $32, $33, $34, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP $31, $32, $33, $34, $35, $36, $37, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
) )
ON CONFLICT (platform_dispensary_id) WHERE platform_dispensary_id IS NOT NULL ON CONFLICT (platform_dispensary_id) WHERE platform_dispensary_id IS NOT NULL
DO UPDATE SET DO UPDATE SET
@@ -362,6 +374,9 @@ async function promoteLocation(
country = EXCLUDED.country, country = EXCLUDED.country,
status = EXCLUDED.status, status = EXCLUDED.status,
dutchie_discovery_id = EXCLUDED.dutchie_discovery_id, dutchie_discovery_id = EXCLUDED.dutchie_discovery_id,
last_modified_at = EXCLUDED.last_modified_at,
last_modified_by_task = EXCLUDED.last_modified_by_task,
last_modified_task_id = EXCLUDED.last_modified_task_id,
updated_at = CURRENT_TIMESTAMP updated_at = CURRENT_TIMESTAMP
RETURNING id, (xmax = 0) AS inserted RETURNING id, (xmax = 0) AS inserted
`, [ `, [
@@ -399,6 +414,9 @@ async function promoteLocation(
true, // $32 dutchie_verified true, // $32 dutchie_verified
new Date(), // $33 dutchie_verified_at new Date(), // $33 dutchie_verified_at
loc.id, // $34 dutchie_discovery_id loc.id, // $34 dutchie_discovery_id
taskTracking ? new Date() : null, // $35 last_modified_at
taskTracking?.taskRole || null, // $36 last_modified_by_task
taskTracking?.taskId || null, // $37 last_modified_task_id
]); ]);
const dispensaryId = upsertResult.rows[0].id; const dispensaryId = upsertResult.rows[0].id;
@@ -446,10 +464,12 @@ async function promoteLocation(
* *
* @param stateCode Optional filter by state (e.g., 'CA', 'AZ') * @param stateCode Optional filter by state (e.g., 'CA', 'AZ')
* @param dryRun If true, only validate without making changes * @param dryRun If true, only validate without making changes
* @param taskTracking Optional task info for modification audit trail
*/ */
export async function promoteDiscoveredLocations( export async function promoteDiscoveredLocations(
stateCode?: string, stateCode?: string,
dryRun = false dryRun = false,
taskTracking?: TaskTrackingInfo
): Promise<PromotionSummary> { ): Promise<PromotionSummary> {
const startTime = Date.now(); const startTime = Date.now();
@@ -524,7 +544,7 @@ export async function promoteDiscoveredLocations(
} }
try { try {
const promotionResult = await promoteLocation(loc); const promotionResult = await promoteLocation(loc, taskTracking);
results.push(promotionResult); results.push(promotionResult);
if (promotionResult.action === 'created') { if (promotionResult.action === 'created') {

View File

@@ -1182,17 +1182,20 @@ router.post('/batch/staggered', async (req: Request, res: Response) => {
method method
); );
const totalDuration = (dispensary_ids.length - 1) * stagger_seconds; const totalDuration = (result.created - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({ res.status(201).json({
success: true, success: true,
created: result.created, created: result.created,
skipped: result.skipped,
task_ids: result.taskIds, task_ids: result.taskIds,
stagger_seconds, stagger_seconds,
total_duration_seconds: totalDuration, total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(), estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, message: result.skipped > 0
? `Created ${result.created} staggered ${role} tasks, skipped ${result.skipped} (duplicate/recently completed)`
: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
}); });
} catch (error: unknown) { } catch (error: unknown) {
console.error('Error creating staggered tasks:', error); console.error('Error creating staggered tasks:', error);
@@ -1326,6 +1329,107 @@ router.post('/batch/entry-point-discovery', async (req: Request, res: Response)
} }
}); });
/**
 * POST /api/tasks/batch/store-discovery
 * Create parallelized store_discovery_state tasks for all active states
 *
 * Instead of one monolithic store_discovery task that takes hours,
 * this creates individual tasks for each state that can run in parallel.
 * States that already have a pending/claimed/running store_discovery_state
 * task are skipped and reported in `skipped`.
 *
 * Body (optional):
 * - stagger_seconds: number (default: 10) - Seconds between each state task (must be >= 0)
 * - priority: number (default: 5) - Task priority
 * - states: string[] (optional) - Specific state codes to discover (default: all active)
 *
 * Responses:
 * - 201: tasks were created (task_ids, states, skipped, timing estimate)
 * - 200: nothing to create (no matching active states, or all already queued)
 * - 400: invalid stagger_seconds / priority / states
 * - 500: unexpected failure
 */
router.post('/batch/store-discovery', async (req: Request, res: Response) => {
  try {
    // req.body can be undefined when the request carries no parsed body.
    const {
      stagger_seconds = 10,
      priority = 5,
      states: specificStates,
    } = req.body ?? {};

    // Reject values that would otherwise yield an Invalid Date (NaN) or
    // schedule tasks in the past (negative stagger).
    const staggerSeconds = Number(stagger_seconds);
    if (!Number.isFinite(staggerSeconds) || staggerSeconds < 0) {
      return res.status(400).json({ error: 'stagger_seconds must be a non-negative number' });
    }

    const taskPriority = Number(priority);
    if (!Number.isFinite(taskPriority)) {
      return res.status(400).json({ error: 'priority must be a number' });
    }

    // Get active states
    let statesQuery = `
      SELECT code, name FROM states WHERE is_active = true
    `;
    const params: unknown[] = [];

    if (Array.isArray(specificStates) && specificStates.length > 0) {
      // A non-string entry would make toUpperCase() throw and surface as a 500.
      if (!specificStates.every((s: unknown) => typeof s === 'string')) {
        return res.status(400).json({ error: 'states must be an array of state code strings' });
      }
      statesQuery += ` AND code = ANY($1)`;
      params.push(specificStates.map((s: string) => s.toUpperCase()));
    }
    statesQuery += ` ORDER BY code`;

    const statesResult = await pool.query(statesQuery, params);
    if (statesResult.rows.length === 0) {
      return res.json({
        success: true,
        message: 'No active states to discover',
        tasks_created: 0,
      });
    }

    // Check for existing pending/running store_discovery_state tasks
    const existingResult = await pool.query(`
      SELECT payload->>'state_code' as state_code
      FROM worker_tasks
      WHERE role = 'store_discovery_state'
        AND status IN ('pending', 'claimed', 'running')
    `);
    const existingStates = new Set<string | null>(
      existingResult.rows.map((r: { state_code: string | null }) => r.state_code)
    );

    // Filter out states that already have pending tasks
    const statesToCreate: { code: string; name: string }[] = statesResult.rows.filter(
      (s: { code: string }) => !existingStates.has(s.code)
    );
    if (statesToCreate.length === 0) {
      return res.json({
        success: true,
        message: 'All states already have pending store_discovery_state tasks',
        tasks_created: 0,
        skipped: statesResult.rows.length,
      });
    }

    // Create staggered tasks for each state. A single base timestamp keeps
    // the per-task schedule and the reported estimate consistent.
    const baseTime = Date.now();
    const taskIds: number[] = [];
    for (const [i, state] of statesToCreate.entries()) {
      const scheduledFor = new Date(baseTime + i * staggerSeconds * 1000);
      const result = await pool.query(`
        INSERT INTO worker_tasks (role, priority, scheduled_for, method, payload)
        VALUES ('store_discovery_state', $1, $2, 'http', $3)
        RETURNING id
      `, [taskPriority, scheduledFor, JSON.stringify({ state_code: state.code })]);
      taskIds.push(result.rows[0].id);
    }

    // The last task is scheduled (n - 1) intervals after the first, matching
    // how the staggered batch endpoint computes its duration.
    const totalDuration = (statesToCreate.length - 1) * staggerSeconds;
    const estimatedEndTime = new Date(baseTime + totalDuration * 1000);

    res.status(201).json({
      success: true,
      tasks_created: taskIds.length,
      task_ids: taskIds,
      states: statesToCreate.map((s) => s.code),
      skipped: statesResult.rows.length - statesToCreate.length,
      stagger_seconds: staggerSeconds,
      total_duration_seconds: totalDuration,
      // Time at which the last created task becomes eligible to start.
      estimated_start_completion: estimatedEndTime.toISOString(),
      message: `Created ${taskIds.length} store_discovery_state tasks for parallel execution`,
    });
  } catch (error: unknown) {
    console.error('Error creating store discovery tasks:', error);
    res.status(500).json({ error: 'Failed to create store discovery tasks' });
  }
});
// ============================================================ // ============================================================
// STATE-BASED CRAWL ENDPOINTS // STATE-BASED CRAWL ENDPOINTS
// ============================================================ // ============================================================
@@ -1414,11 +1518,13 @@ router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
state_name: state.name, state_name: state.name,
tasks_created: result.created, tasks_created: result.created,
stores_in_state: dispensariesResult.rows.length, stores_in_state: dispensariesResult.rows.length,
skipped: dispensariesResult.rows.length - result.created, skipped: result.skipped,
stagger_seconds, stagger_seconds,
total_duration_seconds: totalDuration, total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(), estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, message: result.skipped > 0
? `Created ${result.created} product_discovery tasks for ${state.name}, skipped ${result.skipped} (duplicate/recently completed)`
: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
}); });
} catch (error: unknown) { } catch (error: unknown) {
console.error('Error creating state crawl tasks:', error); console.error('Error creating state crawl tasks:', error);

View File

@@ -48,9 +48,12 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
await pool.query(` await pool.query(`
UPDATE dispensaries UPDATE dispensaries
SET last_id_resolution_at = NOW(), SET last_id_resolution_at = NOW(),
id_resolution_status = 'resolved' id_resolution_status = 'resolved',
last_modified_at = NOW(),
last_modified_by_task = $2,
last_modified_task_id = $3
WHERE id = $1 WHERE id = $1
`, [dispensaryId]); `, [dispensaryId, task.role, task.id]);
return { return {
success: true, success: true,
alreadyResolved: true, alreadyResolved: true,
@@ -93,9 +96,13 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
// Mark as non-dutchie menu type // Mark as non-dutchie menu type
await pool.query(` await pool.query(`
UPDATE dispensaries UPDATE dispensaries
SET menu_type = 'unknown', updated_at = NOW() SET menu_type = 'unknown',
updated_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $2,
last_modified_task_id = $3
WHERE id = $1 WHERE id = $1
`, [dispensaryId]); `, [dispensaryId, task.role, task.id]);
return { return {
success: false, success: false,
@@ -141,9 +148,12 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
END, END,
id_resolution_status = 'failed', id_resolution_status = 'failed',
id_resolution_error = $3, id_resolution_error = $3,
updated_at = NOW() updated_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $4,
last_modified_task_id = $5
WHERE id = $1 WHERE id = $1
`, [dispensaryId, result.httpStatus || 0, reason]); `, [dispensaryId, result.httpStatus || 0, reason, task.role, task.id]);
return { return {
success: false, success: false,
@@ -159,7 +169,7 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
await ctx.heartbeat(); await ctx.heartbeat();
// ============================================================ // ============================================================
// STEP 5: Update dispensary with resolved ID // STEP 5: Update dispensary with resolved ID and tracking
// ============================================================ // ============================================================
await pool.query(` await pool.query(`
UPDATE dispensaries UPDATE dispensaries
@@ -169,9 +179,12 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
crawl_enabled = true, crawl_enabled = true,
id_resolution_status = 'resolved', id_resolution_status = 'resolved',
id_resolution_error = NULL, id_resolution_error = NULL,
updated_at = NOW() updated_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $3,
last_modified_task_id = $4
WHERE id = $1 WHERE id = $1
`, [dispensaryId, platformId]); `, [dispensaryId, platformId, task.role, task.id]);
console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID`); console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID`);

View File

@@ -14,6 +14,7 @@ export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-cu
export { handleProductRefresh } from './product-refresh'; export { handleProductRefresh } from './product-refresh';
export { handleStoreDiscovery } from './store-discovery'; export { handleStoreDiscovery } from './store-discovery';
export { handleStoreDiscoveryHttp } from './store-discovery-http'; export { handleStoreDiscoveryHttp } from './store-discovery-http';
export { handleStoreDiscoveryState } from './store-discovery-state';
export { handleEntryPointDiscovery } from './entry-point-discovery'; export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh'; export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleWhoami } from './whoami'; export { handleWhoami } from './whoami';

View File

@@ -326,13 +326,16 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`); console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
// ============================================================ // ============================================================
// STEP 6: Update dispensary last_fetch_at // STEP 6: Update dispensary last_fetch_at and tracking
// ============================================================ // ============================================================
await pool.query(` await pool.query(`
UPDATE dispensaries UPDATE dispensaries
SET last_fetch_at = NOW() SET last_fetch_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $2,
last_modified_task_id = $3
WHERE id = $1 WHERE id = $1
`, [dispensaryId]); `, [dispensaryId, task.role, task.id]);
// ============================================================ // ============================================================
// STEP 7: Queue product_refresh task to process the payload // STEP 7: Queue product_refresh task to process the payload

View File

@@ -296,13 +296,26 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
} }
// ============================================================ // ============================================================
// STEP 7: Update dispensary last_crawl_at // STEP 7: Update dispensary last_crawl_at and tracking
// ============================================================ // ============================================================
await pool.query(` await pool.query(`
UPDATE dispensaries UPDATE dispensaries
SET last_crawl_at = NOW() SET last_crawl_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $2,
last_modified_task_id = $3
WHERE id = $1 WHERE id = $1
`, [dispensaryId]); `, [dispensaryId, task.role, task.id]);
// Bulk update store_products tracking for this dispensary
await pool.query(`
UPDATE store_products
SET last_modified_at = NOW(),
last_modified_by_task = $2,
last_modified_task_id = $3
WHERE dispensary_id = $1
AND last_seen_at >= NOW() - INTERVAL '5 minutes'
`, [dispensaryId, task.role, task.id]);
// ============================================================ // ============================================================
// STEP 8: Mark payload as processed // STEP 8: Mark payload as processed

View File

@@ -20,7 +20,7 @@
import { TaskContext, TaskResult } from '../task-worker'; import { TaskContext, TaskResult } from '../task-worker';
import { upsertLocation } from '../../discovery/location-discovery'; import { upsertLocation } from '../../discovery/location-discovery';
import { promoteDiscoveredLocations } from '../../discovery/promotion'; import { promoteDiscoveredLocations, TaskTrackingInfo } from '../../discovery/promotion';
import { saveDiscoveryPayload } from '../../utils/payload-storage'; import { saveDiscoveryPayload } from '../../utils/payload-storage';
// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts // GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts
@@ -405,7 +405,12 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// Auto-promote valid locations for this state // Auto-promote valid locations for this state
try { try {
const promotionResult = await promoteDiscoveredLocations(stateCode); // Pass task tracking info for modification audit trail
const taskTracking: TaskTrackingInfo = {
taskId: task.id,
taskRole: task.role,
};
const promotionResult = await promoteDiscoveredLocations(stateCode, false, taskTracking);
const promoted = promotionResult.created + promotionResult.updated; const promoted = promotionResult.created + promotionResult.updated;
if (promoted > 0) { if (promoted > 0) {
console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`); console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`);

View File

@@ -0,0 +1,468 @@
/**
* Store Discovery State Handler (Parallelized)
*
* Discovers stores for a SINGLE state using Puppeteer + StealthPlugin.
* This enables parallel discovery across multiple workers.
*
* Task payload: { state_code: 'AZ' }
*
* Flow:
* 1. Launch browser with proxy
* 2. Establish a session by loading the dutchie.com dispensaries page
* 3. Fetch cities for the target state
* 4. Fetch stores for each city and upsert to dutchie_discovery_locations
* 5. Save raw payload for historical analysis
* 6. Auto-promote valid locations to dispensaries table
*/
import { TaskContext, TaskResult } from '../task-worker';
import { upsertLocation } from '../../discovery/location-discovery';
import { promoteDiscoveredLocations, TaskTrackingInfo } from '../../discovery/promotion';
import { saveDiscoveryPayload } from '../../utils/payload-storage';
// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts
const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6';
const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b';
/**
* Shape of a single discovered store record handled by this module.
*
* Only id, name and slug are required; every other field is optional.
* NOTE(review): presumably mirrors the dispensary objects returned by the
* ConsumerDispensaries GraphQL query and/or the output of normalizeLocation —
* confirm against that helper, which is not visible here.
*/
interface DiscoveredLocation {
// Platform identifier; records without it are skipped before upsert
id: string;
name: string;
slug: string;
cName?: string;
address?: string;
city?: string;
state?: string;
zip?: string;
latitude?: number;
longitude?: number;
offerPickup?: boolean;
offerDelivery?: boolean;
isRecreational?: boolean;
isMedical?: boolean;
phone?: string;
email?: string;
website?: string;
description?: string;
logoImage?: string;
bannerImage?: string;
chainSlug?: string;
enterpriseId?: string;
retailType?: string;
status?: string;
timezone?: string;
// Nested address block as it appears on the raw record
location?: {
ln1?: string;
ln2?: string;
city?: string;
state?: string;
zipcode?: string;
country?: string;
// NOTE(review): looks like a GeoJSON-style pair ([longitude, latitude]) — confirm order
geometry?: { coordinates?: [number, number] };
};
}
export async function handleStoreDiscoveryState(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator, updateStep } = ctx;
const platform = task.platform || 'dutchie';
// Get state_code from task payload
const taskPayload = task.payload as { state_code?: string } | null;
const stateCode = taskPayload?.state_code;
if (!stateCode) {
return { success: false, error: 'No state_code specified in task payload' };
}
let browser: any = null;
try {
updateStep('starting', `Discovering stores in ${stateCode}`);
console.log(`[StoreDiscoveryState] Starting discovery for ${stateCode}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', 'Launching browser');
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[StoreDiscoveryState] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
updateStep('navigating', 'Loading session page');
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryState] Establishing session at ${sessionUrl}...`);
await page.goto(sessionUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// Handle potential age gate
try {
await page.waitForTimeout(1500);
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch {
// Age gate might not be present
}
console.log(`[StoreDiscoveryState] Session established`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Fetch cities for this state via GraphQL
// ============================================================
updateStep('fetching', `Fetching cities for ${stateCode}`);
const citiesResult = await page.evaluate(async (hash: string, targetState: string) => {
const logs: string[] = [];
try {
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'getAllCitiesByState',
variables: JSON.stringify({}),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
logs.push(`getAllCitiesByState: HTTP ${response.status}`);
if (!response.ok) {
return { cities: [], logs };
}
const json = await response.json();
const statesData = json?.data?.statesWithDispensaries || [];
// Find our target state
const stateData = statesData.find((s: any) =>
s.name?.toUpperCase() === targetState.toUpperCase()
);
if (!stateData) {
logs.push(`State ${targetState} not found in response`);
return { cities: [], logs };
}
const cities = Array.isArray(stateData.cities)
? stateData.cities.filter((c: string | null) => c !== null)
: [];
logs.push(`Found ${cities.length} cities for ${targetState}`);
return { cities, country: stateData.country || 'US', logs };
} catch (err: any) {
logs.push(`Error: ${err.message}`);
return { cities: [], logs };
}
}, GET_ALL_CITIES_HASH, stateCode);
citiesResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
if (citiesResult.cities.length === 0) {
await browser.close();
return {
success: true,
stateCode,
storesDiscovered: 0,
message: `No cities found for ${stateCode}`
};
}
console.log(`[StoreDiscoveryState] Discovering ${citiesResult.cities.length} cities in ${stateCode}...`);
await ctx.heartbeat();
// ============================================================
// STEP 4: Fetch stores for each city
// ============================================================
let totalDiscovered = 0;
let totalUpserted = 0;
const allNewStoreIds: number[] = [];
const stateRawStores: any[] = [];
const stateCityData: { city: string; stores: any[] }[] = [];
for (const city of citiesResult.cities) {
try {
const cityResult = await page.evaluate(async (
cityName: string,
stateCodeParam: string,
hash: string
) => {
const logs: string[] = [];
const allDispensaries: any[] = [];
let page = 0;
const perPage = 200;
try {
while (page < 5) { // Max 5 pages per city
const variables = {
dispensaryFilter: {
activeOnly: true,
city: cityName,
state: stateCodeParam,
},
page,
perPage,
};
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'ConsumerDispensaries',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
if (!response.ok) {
logs.push(`${cityName}: HTTP ${response.status}`);
break;
}
const json = await response.json();
const dispensaries = json?.data?.filteredDispensaries || [];
if (dispensaries.length === 0) {
break;
}
// Filter to ensure correct state
const stateFiltered = dispensaries.filter((d: any) =>
d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase()
);
allDispensaries.push(...stateFiltered);
if (dispensaries.length < perPage) {
break;
}
page++;
// Small delay between pages
await new Promise(r => setTimeout(r, 100));
}
logs.push(`${cityName}: ${allDispensaries.length} stores`);
} catch (err: any) {
logs.push(`${cityName}: Error - ${err.message}`);
}
return { dispensaries: allDispensaries, logs };
}, city, stateCode, CONSUMER_DISPENSARIES_HASH);
cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
// Accumulate raw store data
stateRawStores.push(...cityResult.dispensaries);
stateCityData.push({ city, stores: cityResult.dispensaries });
// Upsert each discovered location
for (const disp of cityResult.dispensaries) {
try {
const location = normalizeLocation(disp);
if (!location.id) {
continue; // Skip locations without platform ID
}
const result = await upsertLocation(pool, location as any, null);
if (result) {
totalUpserted++;
if (result.isNew) {
totalDiscovered++;
}
}
} catch (err: any) {
console.error(`[StoreDiscoveryState] Upsert error for ${disp.name}:`, err.message);
}
}
// Small delay between cities to avoid rate limiting
await new Promise(r => setTimeout(r, 300));
} catch (err: any) {
console.error(`[StoreDiscoveryState] Error fetching ${city}, ${stateCode}:`, err.message);
}
// Heartbeat every few cities
if (stateCityData.length % 10 === 0) {
await ctx.heartbeat();
}
}
await ctx.heartbeat();
// ============================================================
// STEP 5: Save raw payload for this state
// ============================================================
if (stateRawStores.length > 0) {
try {
const rawPayload = {
stateCode,
platform,
fetchedAt: new Date().toISOString(),
storeCount: stateRawStores.length,
citiesProcessed: stateCityData.length,
cities: stateCityData,
stores: stateRawStores,
};
const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length);
console.log(`[StoreDiscoveryState] Saved payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
} catch (err: any) {
console.error(`[StoreDiscoveryState] Failed to save payload for ${stateCode}:`, err.message);
}
}
// ============================================================
// STEP 6: Auto-promote valid locations
// ============================================================
try {
// Pass task tracking info for modification audit trail
const taskTracking: TaskTrackingInfo = {
taskId: task.id,
taskRole: task.role,
};
const promotionResult = await promoteDiscoveredLocations(stateCode, false, taskTracking);
const promoted = promotionResult.created + promotionResult.updated;
if (promoted > 0) {
console.log(`[StoreDiscoveryState] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`);
const newIds = (promotionResult as any).newDispensaryIds || [];
allNewStoreIds.push(...newIds);
}
} catch (err: any) {
console.error(`[StoreDiscoveryState] Promotion error for ${stateCode}:`, err.message);
}
await browser.close();
browser = null;
console.log(`[StoreDiscoveryState] Complete for ${stateCode}: ${totalDiscovered} new, ${totalUpserted} upserted`);
return {
success: true,
stateCode,
storesDiscovered: totalDiscovered,
storesUpserted: totalUpserted,
citiesProcessed: stateCityData.length,
newStoreIds: allNewStoreIds,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[StoreDiscoveryState] Error for ${stateCode}:`, errorMessage);
return {
success: false,
stateCode,
error: errorMessage,
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}
/**
 * Convert one raw dispensary record into the DiscoveredLocation shape.
 *
 * Top-level fields on `raw` take precedence; values nested under
 * `raw.location` (and a few alternate key names) act as fallbacks.
 * Text fields fall back to an empty string. Coordinates are taken from
 * `location.geometry.coordinates` (index 0 used as longitude, index 1 as
 * latitude) and otherwise from `raw.longitude` / `raw.latitude`.
 *
 * @param raw - Untyped dispensary object to normalize.
 * @returns The normalized location; `id` is '' when neither `id` nor `_id` is set.
 */
function normalizeLocation(raw: any): DiscoveredLocation {
  const nested = raw.location || {};
  const point = nested.geometry?.coordinates || [];

  // slug and cName back each other up, so resolve both up front.
  const slug = raw.slug || raw.cName || '';
  const cName = raw.cName || raw.slug || '';

  // A falsy coordinate entry (including 0) defers to the top-level field.
  const latitude = point[1] || raw.latitude;
  const longitude = point[0] || raw.longitude;

  // Flags: explicit value first, then the alternate key, then the default.
  const offerPickup = raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true;
  const offerDelivery = raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false;
  const isRecreational = raw.isRecreational ?? raw.recDispensary ?? true;
  const isMedical = raw.isMedical ?? raw.medicalDispensary ?? true;

  return {
    id: raw.id || raw._id || '',
    name: raw.name || '',
    slug,
    cName,
    address: raw.address || nested.ln1 || '',
    city: raw.city || nested.city || '',
    state: raw.state || nested.state || '',
    zip: raw.zip || nested.zipcode || nested.zip || '',
    latitude,
    longitude,
    timezone: raw.timezone || '',
    offerPickup,
    offerDelivery,
    isRecreational,
    isMedical,
    phone: raw.phone || '',
    email: raw.email || '',
    website: raw.embedBackUrl || '',
    description: raw.description || '',
    logoImage: raw.logoImage || '',
    bannerImage: raw.bannerImage || '',
    chainSlug: raw.chain || '',
    enterpriseId: raw.retailer?.enterpriseId || '',
    retailType: raw.retailType || '',
    status: raw.status || '',
    location: nested,
  };
}

View File

@@ -28,6 +28,7 @@ async function tableExists(tableName: string): Promise<boolean> {
// product_refresh: Legacy role (deprecated but kept for compatibility) // product_refresh: Legacy role (deprecated but kept for compatibility)
export type TaskRole = export type TaskRole =
| 'store_discovery' | 'store_discovery'
| 'store_discovery_state' // Per-state parallelized store discovery
| 'entry_point_discovery' | 'entry_point_discovery'
| 'product_discovery' | 'product_discovery'
| 'payload_fetch' // Fetches from API, saves to disk | 'payload_fetch' // Fetches from API, saves to disk
@@ -706,37 +707,67 @@ class TaskService {
role: TaskRole, role: TaskRole,
staggerSeconds: number = 15, staggerSeconds: number = 15,
platform: string = 'dutchie', platform: string = 'dutchie',
method: 'curl' | 'http' | null = null method: 'curl' | 'http' | null = null,
): Promise<{ created: number; taskIds: number[] }> { options: { skipRecentHours?: number } = {}
): Promise<{ created: number; skipped: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) { if (dispensaryIds.length === 0) {
return { created: 0, taskIds: [] }; return { created: 0, skipped: 0, taskIds: [] };
} }
// Use a single INSERT with generate_series for efficiency const { skipRecentHours = 4 } = options; // Skip if completed within last 4 hours
// Filter out dispensaries that:
// 1. Already have a pending/claimed/running task for this role
// 2. Had this role completed recently (within skipRecentHours)
const result = await pool.query(` const result = await pool.query(`
WITH task_data AS ( WITH input_ids AS (
SELECT SELECT unnest($1::int[]) as dispensary_id
unnest($1::int[]) as dispensary_id, ),
generate_series(0, array_length($1::int[], 1) - 1) as idx eligible_ids AS (
SELECT i.dispensary_id
FROM input_ids i
WHERE NOT EXISTS (
-- No pending/active task for same role
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = i.dispensary_id
AND t.role = $2
AND t.status IN ('pending', 'claimed', 'running')
)
AND NOT EXISTS (
-- No recent completion for same role
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = i.dispensary_id
AND t.role = $2
AND t.status = 'completed'
AND t.completed_at > NOW() - ($6::int * INTERVAL '1 hour')
)
),
numbered AS (
SELECT dispensary_id, ROW_NUMBER() OVER (ORDER BY dispensary_id) - 1 as idx
FROM eligible_ids
) )
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status) INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
SELECT SELECT
$2::varchar as role, $2::varchar as role,
td.dispensary_id, n.dispensary_id,
$3::varchar as platform, $3::varchar as platform,
$4::varchar as method, $4::varchar as method,
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for, NOW() + (n.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status 'pending' as status
FROM task_data td FROM numbered n
ON CONFLICT DO NOTHING
RETURNING id RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds]); `, [dispensaryIds, role, platform, method, staggerSeconds, skipRecentHours]);
const taskIds = result.rows.map((r: { id: number }) => r.id); const taskIds = result.rows.map((r: { id: number }) => r.id);
const skipped = dispensaryIds.length - taskIds.length;
if (skipped > 0) {
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks, skipped ${skipped} (duplicate/recent)`);
} else {
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`); console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
}
return { created: taskIds.length, taskIds }; return { created: taskIds.length, skipped, taskIds };
} }
/** /**

View File

@@ -76,6 +76,7 @@ import { handleProductDiscovery } from './handlers/product-discovery-curl';
import { handleProductDiscoveryHttp } from './handlers/product-discovery-http'; import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
import { handleStoreDiscovery } from './handlers/store-discovery'; import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http'; import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http';
import { handleStoreDiscoveryState } from './handlers/store-discovery-state';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery'; import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh'; import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleWhoami } from './handlers/whoami'; import { handleWhoami } from './handlers/whoami';
@@ -159,6 +160,7 @@ const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
product_refresh: handleProductRefresh, // disk -> DB product_refresh: handleProductRefresh, // disk -> DB
product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override) product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
store_discovery: handleStoreDiscovery, store_discovery: handleStoreDiscovery,
store_discovery_state: handleStoreDiscoveryState, // Per-state parallelized discovery
entry_point_discovery: handleEntryPointDiscovery, entry_point_discovery: handleEntryPointDiscovery,
analytics_refresh: handleAnalyticsRefresh, analytics_refresh: handleAnalyticsRefresh,
whoami: handleWhoami, // Tests proxy + anti-detect whoami: handleWhoami, // Tests proxy + anti-detect
@@ -221,6 +223,7 @@ export class TaskWorker {
private isRunning: boolean = false; private isRunning: boolean = false;
private heartbeatInterval: NodeJS.Timeout | null = null; private heartbeatInterval: NodeJS.Timeout | null = null;
private registryHeartbeatInterval: NodeJS.Timeout | null = null; private registryHeartbeatInterval: NodeJS.Timeout | null = null;
private staleCleanupInterval: NodeJS.Timeout | null = null;
private crawlRotator: CrawlRotator; private crawlRotator: CrawlRotator;
// ========================================================================== // ==========================================================================
@@ -798,6 +801,44 @@ export class TaskWorker {
} }
} }
/**
 * Run stale task cleanup once.
 *
 * Asks the task service to clean up stale tasks using a 30-minute threshold
 * (recovering tasks left in claimed/running status after worker crashes) and
 * logs how many were cleaned when the count is non-zero.
 *
 * Failures are logged and never rethrown, so the returned promise does not
 * reject because of a cleanup error.
 */
private async runStaleTaskCleanup(): Promise<void> {
  try {
    console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
    const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
    if (cleanupResult.cleaned > 0) {
      console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
    }
  } catch (err: unknown) {
    // A thrown value is not guaranteed to be an Error; narrow before reading
    // .message so non-Error throws are logged instead of printing `undefined`.
    const message = err instanceof Error ? err.message : String(err);
    console.error(`[TaskWorker] Stale task cleanup error:`, message);
  }
}
/**
 * Start periodic stale task cleanup (every 10 minutes).
 * Intended to be called only by worker-0 to avoid races; this method itself
 * does not check the worker id.
 *
 * If a cleanup timer is already active it is cleared first, so repeated calls
 * never stack intervals and stopPeriodicStaleCleanup() can always clear the
 * single live timer.
 */
private startPeriodicStaleCleanup(): void {
  const STALE_CLEANUP_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes

  // Without this, a second call would overwrite the stored handle and the
  // earlier interval could never be cleared.
  if (this.staleCleanupInterval) {
    clearInterval(this.staleCleanupInterval);
  }

  this.staleCleanupInterval = setInterval(() => {
    // runStaleTaskCleanup catches and logs its own errors, so the promise
    // can be intentionally left un-awaited here.
    void this.runStaleTaskCleanup();
  }, STALE_CLEANUP_INTERVAL_MS);
  console.log(`[TaskWorker] ${this.friendlyName} started periodic stale cleanup (every 10 min)`);
}
/**
 * Cancel the periodic stale task cleanup timer, if one is active.
 * Does nothing when no timer handle is stored.
 */
private stopPeriodicStaleCleanup(): void {
  const activeTimer = this.staleCleanupInterval;
  if (!activeTimer) {
    return;
  }
  clearInterval(activeTimer);
  this.staleCleanupInterval = null;
}
/** /**
* Start the worker loop * Start the worker loop
* *
@@ -814,18 +855,14 @@ export class TaskWorker {
// Start registry heartbeat immediately // Start registry heartbeat immediately
this.startRegistryHeartbeat(); this.startRegistryHeartbeat();
// Cleanup stale tasks on startup (only worker-0 does this to avoid races) // Cleanup stale tasks on startup and periodically (only worker-0 does this to avoid races)
// This handles tasks left in 'claimed'/'running' status when workers restart // This handles tasks left in 'claimed'/'running' status when workers restart or crash
if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') { if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') {
try { // Run immediately on startup
console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`); await this.runStaleTaskCleanup();
const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
if (cleanupResult.cleaned > 0) { // Start periodic cleanup every 10 minutes
console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`); this.startPeriodicStaleCleanup();
}
} catch (err: any) {
console.error(`[TaskWorker] Stale task cleanup error:`, err.message);
}
} }
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)'; const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
@@ -980,6 +1017,7 @@ export class TaskWorker {
this.isRunning = false; this.isRunning = false;
this.stopHeartbeat(); this.stopHeartbeat();
this.stopRegistryHeartbeat(); this.stopRegistryHeartbeat();
this.stopPeriodicStaleCleanup();
await this.deregister(); await this.deregister();
console.log(`[TaskWorker] ${this.friendlyName} stopped`); console.log(`[TaskWorker] ${this.friendlyName} stopped`);
} }