From c62f8cbf060d8fb523f078993f1ee408ba848285 Mon Sep 17 00:00:00 2001 From: Kelly Date: Fri, 12 Dec 2025 22:15:04 -0700 Subject: [PATCH] feat: Parallelized store discovery, modification tracking, and task deduplication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Store Discovery Parallelization: - Add store_discovery_state handler for per-state parallel discovery - Add POST /api/tasks/batch/store-discovery endpoint - 8 workers can now process states in parallel (~30-45 min vs 3+ hours) Modification Tracking (Migration 090): - Add last_modified_at, last_modified_by_task, last_modified_task_id to dispensaries - Add same columns to store_products - Update all handlers to set tracking info on modifications Stale Task Recovery: - Add periodic stale cleanup every 10 minutes (worker-0 only) - Prevents orphaned tasks from blocking queue after worker crashes Task Deduplication: - createStaggeredTasks now skips if pending/active task exists for same role - Skips if same role completed within last 4 hours - API responses include skipped count 🤖 Generated with [Claude Code](https://claude.com/claude-code) --- .../migrations/090_modification_tracking.sql | 66 +++ backend/src/discovery/promotion.ts | 28 +- backend/src/routes/tasks.ts | 114 ++++- .../tasks/handlers/entry-point-discovery.ts | 31 +- backend/src/tasks/handlers/index.ts | 1 + .../tasks/handlers/product-discovery-http.ts | 9 +- backend/src/tasks/handlers/product-refresh.ts | 19 +- .../tasks/handlers/store-discovery-http.ts | 9 +- .../tasks/handlers/store-discovery-state.ts | 468 ++++++++++++++++++ backend/src/tasks/task-service.ts | 61 ++- backend/src/tasks/task-worker.ts | 60 ++- 11 files changed, 815 insertions(+), 51 deletions(-) create mode 100644 backend/migrations/090_modification_tracking.sql create mode 100644 backend/src/tasks/handlers/store-discovery-state.ts diff --git a/backend/migrations/090_modification_tracking.sql 
b/backend/migrations/090_modification_tracking.sql new file mode 100644 index 00000000..3ea12ef1 --- /dev/null +++ b/backend/migrations/090_modification_tracking.sql @@ -0,0 +1,66 @@ +-- Migration 090: Add modification tracking columns +-- +-- Tracks when records were last modified and by which task. +-- Enables debugging, auditing, and understanding data freshness. +-- +-- Columns added: +-- last_modified_at - When the record was last modified by a task +-- last_modified_by_task - Which task role modified it (e.g., 'product_refresh') +-- last_modified_task_id - The specific task ID that modified it + +-- ============================================================ +-- dispensaries table +-- ============================================================ +ALTER TABLE dispensaries +ADD COLUMN IF NOT EXISTS last_modified_at TIMESTAMPTZ; + +ALTER TABLE dispensaries +ADD COLUMN IF NOT EXISTS last_modified_by_task VARCHAR(50); + +ALTER TABLE dispensaries +ADD COLUMN IF NOT EXISTS last_modified_task_id INTEGER; + +-- Index for querying recently modified records +CREATE INDEX IF NOT EXISTS idx_dispensaries_last_modified + ON dispensaries(last_modified_at DESC) + WHERE last_modified_at IS NOT NULL; + +-- Index for querying by task type +CREATE INDEX IF NOT EXISTS idx_dispensaries_modified_by_task + ON dispensaries(last_modified_by_task) + WHERE last_modified_by_task IS NOT NULL; + +COMMENT ON COLUMN dispensaries.last_modified_at IS 'Timestamp when this record was last modified by a task'; +COMMENT ON COLUMN dispensaries.last_modified_by_task IS 'Task role that last modified this record (e.g., store_discovery_state, entry_point_discovery)'; +COMMENT ON COLUMN dispensaries.last_modified_task_id IS 'ID of the worker_tasks record that last modified this'; + +-- ============================================================ +-- store_products table +-- ============================================================ +ALTER TABLE store_products +ADD COLUMN IF NOT EXISTS last_modified_at 
TIMESTAMPTZ; + +ALTER TABLE store_products +ADD COLUMN IF NOT EXISTS last_modified_by_task VARCHAR(50); + +ALTER TABLE store_products +ADD COLUMN IF NOT EXISTS last_modified_task_id INTEGER; + +-- Index for querying recently modified products +CREATE INDEX IF NOT EXISTS idx_store_products_last_modified + ON store_products(last_modified_at DESC) + WHERE last_modified_at IS NOT NULL; + +-- Index for querying by task type +CREATE INDEX IF NOT EXISTS idx_store_products_modified_by_task + ON store_products(last_modified_by_task) + WHERE last_modified_by_task IS NOT NULL; + +-- Composite index for listing a dispensary's most recently modified products (keyed on dispensary_id, not on task) +CREATE INDEX IF NOT EXISTS idx_store_products_task_modified + ON store_products(dispensary_id, last_modified_at DESC) + WHERE last_modified_at IS NOT NULL; + +COMMENT ON COLUMN store_products.last_modified_at IS 'Timestamp when this record was last modified by a task'; +COMMENT ON COLUMN store_products.last_modified_by_task IS 'Task role that last modified this record (e.g., product_refresh, product_discovery)'; +COMMENT ON COLUMN store_products.last_modified_task_id IS 'ID of the worker_tasks record that last modified this'; diff --git a/backend/src/discovery/promotion.ts b/backend/src/discovery/promotion.ts index 8fb0fc04..7999d2f5 100644 --- a/backend/src/discovery/promotion.ts +++ b/backend/src/discovery/promotion.ts @@ -131,6 +131,14 @@ export interface PromotionSummary { newDispensaryIds: number[]; } +/** + * Task tracking info for modification audit trail + */ +export interface TaskTrackingInfo { + taskId: number; + taskRole: string; +} + /** * Generate a URL-safe slug from name and city */ @@ -283,7 +291,8 @@ async function ensureCrawlerProfile( * Idempotent: uses ON CONFLICT on platform_dispensary_id */ async function promoteLocation( - loc: DiscoveryLocationRow + loc: DiscoveryLocationRow, + taskTracking?: TaskTrackingInfo ): Promise { const slug = loc.platform_slug || generateSlug(loc.name, loc.city || '', loc.state_code 
|| ''); @@ -325,13 +334,16 @@ async function promoteLocation( dutchie_verified, dutchie_verified_at, dutchie_discovery_id, + last_modified_at, + last_modified_by_task, + last_modified_task_id, created_at, updated_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, - $31, $32, $33, $34, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + $31, $32, $33, $34, $35, $36, $37, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP ) ON CONFLICT (platform_dispensary_id) WHERE platform_dispensary_id IS NOT NULL DO UPDATE SET @@ -362,6 +374,9 @@ async function promoteLocation( country = EXCLUDED.country, status = EXCLUDED.status, dutchie_discovery_id = EXCLUDED.dutchie_discovery_id, + last_modified_at = EXCLUDED.last_modified_at, + last_modified_by_task = EXCLUDED.last_modified_by_task, + last_modified_task_id = EXCLUDED.last_modified_task_id, updated_at = CURRENT_TIMESTAMP RETURNING id, (xmax = 0) AS inserted `, [ @@ -399,6 +414,9 @@ async function promoteLocation( true, // $32 dutchie_verified new Date(), // $33 dutchie_verified_at loc.id, // $34 dutchie_discovery_id + taskTracking ? 
new Date() : null, // $35 last_modified_at + taskTracking?.taskRole || null, // $36 last_modified_by_task + taskTracking?.taskId || null, // $37 last_modified_task_id ]); const dispensaryId = upsertResult.rows[0].id; @@ -446,10 +464,12 @@ async function promoteLocation( * * @param stateCode Optional filter by state (e.g., 'CA', 'AZ') * @param dryRun If true, only validate without making changes + * @param taskTracking Optional task info for modification audit trail */ export async function promoteDiscoveredLocations( stateCode?: string, - dryRun = false + dryRun = false, + taskTracking?: TaskTrackingInfo ): Promise { const startTime = Date.now(); @@ -524,7 +544,7 @@ export async function promoteDiscoveredLocations( } try { - const promotionResult = await promoteLocation(loc); + const promotionResult = await promoteLocation(loc, taskTracking); results.push(promotionResult); if (promotionResult.action === 'created') { diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index d5bf729c..ea420f73 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -1182,17 +1182,20 @@ router.post('/batch/staggered', async (req: Request, res: Response) => { method ); - const totalDuration = (dispensary_ids.length - 1) * stagger_seconds; + const totalDuration = Math.max(0, result.created - 1) * stagger_seconds; // clamp at 0: created is 0 when every task is skipped, and the duration must not go negative const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); res.status(201).json({ success: true, created: result.created, + skipped: result.skipped, task_ids: result.taskIds, stagger_seconds, total_duration_seconds: totalDuration, estimated_completion: estimatedEndTime.toISOString(), - message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, + message: result.skipped > 0 + ? 
`Created ${result.created} staggered ${role} tasks, skipped ${result.skipped} (duplicate/recently completed)` + : `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, }); } catch (error: unknown) { console.error('Error creating staggered tasks:', error); @@ -1326,6 +1329,107 @@ router.post('/batch/entry-point-discovery', async (req: Request, res: Response) } }); +/** + * POST /api/tasks/batch/store-discovery + * Create parallelized store_discovery_state tasks for all active states + * + * Instead of one monolithic store_discovery task that takes hours, + * this creates individual tasks for each state that can run in parallel. + * + * Body (optional): + * - stagger_seconds: number (default: 10) - Seconds between each state task + * - priority: number (default: 5) - Task priority + * - states: string[] (optional) - Specific state codes to discover (default: all active) + */ +router.post('/batch/store-discovery', async (req: Request, res: Response) => { + try { + const { + stagger_seconds = 10, + priority = 5, + states: specificStates, + } = req.body; + + // Get active states + let statesQuery = ` + SELECT code, name FROM states WHERE is_active = true + `; + const params: any[] = []; + + if (specificStates && Array.isArray(specificStates) && specificStates.length > 0) { + statesQuery += ` AND code = ANY($1)`; + params.push(specificStates.map((s: string) => s.toUpperCase())); + } + + statesQuery += ` ORDER BY code`; + + const statesResult = await pool.query(statesQuery, params); + + if (statesResult.rows.length === 0) { + return res.json({ + success: true, + message: 'No active states to discover', + tasks_created: 0, + }); + } + + // Check for existing pending/running store_discovery_state tasks + const existingResult = await pool.query(` + SELECT payload->>'state_code' as state_code + FROM worker_tasks + WHERE role = 'store_discovery_state' + AND status IN ('pending', 'claimed', 'running') + `); 
+ const existingStates = new Set(existingResult.rows.map((r: any) => r.state_code)); + + // Filter out states that already have pending tasks + const statesToCreate = statesResult.rows.filter( + (s: { code: string }) => !existingStates.has(s.code) + ); + + if (statesToCreate.length === 0) { + return res.json({ + success: true, + message: 'All states already have pending store_discovery_state tasks', + tasks_created: 0, + skipped: statesResult.rows.length, + }); + } + + // Create staggered tasks for each state + const taskIds: number[] = []; + for (let i = 0; i < statesToCreate.length; i++) { + const state = statesToCreate[i]; + const scheduledFor = new Date(Date.now() + i * stagger_seconds * 1000); + + const result = await pool.query(` + INSERT INTO worker_tasks (role, priority, scheduled_for, method, payload) + VALUES ('store_discovery_state', $1, $2, 'http', $3) + RETURNING id + `, [priority, scheduledFor, JSON.stringify({ state_code: state.code })]); + + taskIds.push(result.rows[0].id); + } + + const totalDuration = statesToCreate.length * stagger_seconds; + const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); + + res.status(201).json({ + success: true, + tasks_created: taskIds.length, + task_ids: taskIds, + states: statesToCreate.map((s: { code: string; name: string }) => s.code), + skipped: statesResult.rows.length - statesToCreate.length, + stagger_seconds, + total_duration_seconds: totalDuration, + estimated_start_completion: estimatedEndTime.toISOString(), + message: `Created ${taskIds.length} store_discovery_state tasks for parallel execution`, + }); + } catch (error: unknown) { + console.error('Error creating store discovery tasks:', error); + res.status(500).json({ error: 'Failed to create store discovery tasks' }); + } +}); + // ============================================================ // STATE-BASED CRAWL ENDPOINTS // ============================================================ @@ -1414,11 +1518,13 @@ 
router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => { state_name: state.name, tasks_created: result.created, stores_in_state: dispensariesResult.rows.length, - skipped: dispensariesResult.rows.length - result.created, + skipped: result.skipped, stagger_seconds, total_duration_seconds: totalDuration, estimated_completion: estimatedEndTime.toISOString(), - message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, + message: result.skipped > 0 + ? `Created ${result.created} product_discovery tasks for ${state.name}, skipped ${result.skipped} (duplicate/recently completed)` + : `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, }); } catch (error: unknown) { console.error('Error creating state crawl tasks:', error); diff --git a/backend/src/tasks/handlers/entry-point-discovery.ts b/backend/src/tasks/handlers/entry-point-discovery.ts index 008b280a..eae9e9a5 100644 --- a/backend/src/tasks/handlers/entry-point-discovery.ts +++ b/backend/src/tasks/handlers/entry-point-discovery.ts @@ -48,9 +48,12 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise= NOW() - INTERVAL '5 minutes' + `, [dispensaryId, task.role, task.id]); // ============================================================ // STEP 8: Mark payload as processed diff --git a/backend/src/tasks/handlers/store-discovery-http.ts b/backend/src/tasks/handlers/store-discovery-http.ts index e856cc45..e434a7d7 100644 --- a/backend/src/tasks/handlers/store-discovery-http.ts +++ b/backend/src/tasks/handlers/store-discovery-http.ts @@ -20,7 +20,7 @@ import { TaskContext, TaskResult } from '../task-worker'; import { upsertLocation } from '../../discovery/location-discovery'; -import { promoteDiscoveredLocations } from '../../discovery/promotion'; +import { promoteDiscoveredLocations, 
TaskTrackingInfo } from '../../discovery/promotion'; import { saveDiscoveryPayload } from '../../utils/payload-storage'; // GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts @@ -405,7 +405,12 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise 0) { console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`); diff --git a/backend/src/tasks/handlers/store-discovery-state.ts b/backend/src/tasks/handlers/store-discovery-state.ts new file mode 100644 index 00000000..294c2c9f --- /dev/null +++ b/backend/src/tasks/handlers/store-discovery-state.ts @@ -0,0 +1,468 @@ +/** + * Store Discovery State Handler (Parallelized) + * + * Discovers stores for a SINGLE state using Puppeteer + StealthPlugin. + * This enables parallel discovery across multiple workers. + * + * Task payload: { state_code: 'AZ' } + * + * Flow: + * 1. Launch browser with proxy + * 2. Fetch cities for the target state + * 3. Fetch stores for each city + * 4. Upsert to dutchie_discovery_locations + * 5. Auto-promote valid locations to dispensaries table + * 6. 
Save raw payload for historical analysis + */ + +import { TaskContext, TaskResult } from '../task-worker'; +import { upsertLocation } from '../../discovery/location-discovery'; +import { promoteDiscoveredLocations, TaskTrackingInfo } from '../../discovery/promotion'; +import { saveDiscoveryPayload } from '../../utils/payload-storage'; + +// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts +const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6'; +const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b'; + +interface DiscoveredLocation { + id: string; + name: string; + slug: string; + cName?: string; + address?: string; + city?: string; + state?: string; + zip?: string; + latitude?: number; + longitude?: number; + offerPickup?: boolean; + offerDelivery?: boolean; + isRecreational?: boolean; + isMedical?: boolean; + phone?: string; + email?: string; + website?: string; + description?: string; + logoImage?: string; + bannerImage?: string; + chainSlug?: string; + enterpriseId?: string; + retailType?: string; + status?: string; + timezone?: string; + location?: { + ln1?: string; + ln2?: string; + city?: string; + state?: string; + zipcode?: string; + country?: string; + geometry?: { coordinates?: [number, number] }; + }; +} + +export async function handleStoreDiscoveryState(ctx: TaskContext): Promise { + const { pool, task, crawlRotator, updateStep } = ctx; + const platform = task.platform || 'dutchie'; + + // Get state_code from task payload + const taskPayload = task.payload as { state_code?: string } | null; + const stateCode = taskPayload?.state_code; + + if (!stateCode) { + return { success: false, error: 'No state_code specified in task payload' }; + } + + let browser: any = null; + + try { + updateStep('starting', `Discovering stores in ${stateCode}`); + console.log(`[StoreDiscoveryState] Starting discovery for ${stateCode}`); + + // 
============================================================ + // STEP 1: Setup Puppeteer with proxy + // ============================================================ + updateStep('preflight', 'Launching browser'); + const puppeteer = require('puppeteer-extra'); + const StealthPlugin = require('puppeteer-extra-plugin-stealth'); + puppeteer.use(StealthPlugin()); + + // Get proxy from CrawlRotator if available + let proxyUrl: string | null = null; + if (crawlRotator) { + const currentProxy = crawlRotator.proxy.getCurrent(); + if (currentProxy) { + proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy); + console.log(`[StoreDiscoveryState] Using proxy: ${currentProxy.host}:${currentProxy.port}`); + } + } + + // Build browser args + const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox']; + if (proxyUrl) { + const proxyUrlParsed = new URL(proxyUrl); + browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`); + } + + browser = await puppeteer.launch({ + headless: 'new', + args: browserArgs, + }); + + const page = await browser.newPage(); + + // Setup proxy auth if needed + if (proxyUrl) { + const proxyUrlParsed = new URL(proxyUrl); + if (proxyUrlParsed.username && proxyUrlParsed.password) { + await page.authenticate({ + username: decodeURIComponent(proxyUrlParsed.username), + password: decodeURIComponent(proxyUrlParsed.password), + }); + } + } + + await ctx.heartbeat(); + + // ============================================================ + // STEP 2: Establish session by visiting dispensaries page + // ============================================================ + updateStep('navigating', 'Loading session page'); + const sessionUrl = 'https://dutchie.com/dispensaries'; + console.log(`[StoreDiscoveryState] Establishing session at ${sessionUrl}...`); + + await page.goto(sessionUrl, { + waitUntil: 'networkidle2', + timeout: 60000, + }); + + // Handle potential age gate + try { + await new Promise(r => setTimeout(r, 1500)); // plain timer: page.waitForTimeout is deprecated/removed in newer Puppeteer, and this catch would silently hide the resulting TypeError + await page.evaluate(() => { + const buttons = 
Array.from(document.querySelectorAll('button')); + for (const btn of buttons) { + const text = btn.textContent?.toLowerCase() || ''; + if (text.includes('yes') || text.includes('enter') || text.includes('21')) { + (btn as HTMLButtonElement).click(); + return true; + } + } + return false; + }); + } catch { + // Age gate might not be present + } + + console.log(`[StoreDiscoveryState] Session established`); + + await ctx.heartbeat(); + + // ============================================================ + // STEP 3: Fetch cities for this state via GraphQL + // ============================================================ + updateStep('fetching', `Fetching cities for ${stateCode}`); + const citiesResult = await page.evaluate(async (hash: string, targetState: string) => { + const logs: string[] = []; + try { + const extensions = { + persistedQuery: { version: 1, sha256Hash: hash }, + }; + const qs = new URLSearchParams({ + operationName: 'getAllCitiesByState', + variables: JSON.stringify({}), + extensions: JSON.stringify(extensions), + }); + const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Accept': 'application/json', + 'content-type': 'application/json', + }, + credentials: 'include', + }); + + logs.push(`getAllCitiesByState: HTTP ${response.status}`); + + if (!response.ok) { + return { cities: [], logs }; + } + + const json = await response.json(); + const statesData = json?.data?.statesWithDispensaries || []; + + // Find our target state + const stateData = statesData.find((s: any) => + s.name?.toUpperCase() === targetState.toUpperCase() + ); + + if (!stateData) { + logs.push(`State ${targetState} not found in response`); + return { cities: [], logs }; + } + + const cities = Array.isArray(stateData.cities) + ? 
stateData.cities.filter((c: string | null) => c !== null) + : []; + + logs.push(`Found ${cities.length} cities for ${targetState}`); + return { cities, country: stateData.country || 'US', logs }; + } catch (err: any) { + logs.push(`Error: ${err.message}`); + return { cities: [], logs }; + } + }, GET_ALL_CITIES_HASH, stateCode); + + citiesResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`)); + + if (citiesResult.cities.length === 0) { + await browser.close(); + return { + success: true, + stateCode, + storesDiscovered: 0, + message: `No cities found for ${stateCode}` + }; + } + + console.log(`[StoreDiscoveryState] Discovering ${citiesResult.cities.length} cities in ${stateCode}...`); + + await ctx.heartbeat(); + + // ============================================================ + // STEP 4: Fetch stores for each city + // ============================================================ + let totalDiscovered = 0; + let totalUpserted = 0; + const allNewStoreIds: number[] = []; + const stateRawStores: any[] = []; + const stateCityData: { city: string; stores: any[] }[] = []; + + for (const city of citiesResult.cities) { + try { + const cityResult = await page.evaluate(async ( + cityName: string, + stateCodeParam: string, + hash: string + ) => { + const logs: string[] = []; + const allDispensaries: any[] = []; + let page = 0; + const perPage = 200; + + try { + while (page < 5) { // Max 5 pages per city + const variables = { + dispensaryFilter: { + activeOnly: true, + city: cityName, + state: stateCodeParam, + }, + page, + perPage, + }; + + const extensions = { + persistedQuery: { version: 1, sha256Hash: hash }, + }; + + const qs = new URLSearchParams({ + operationName: 'ConsumerDispensaries', + variables: JSON.stringify(variables), + extensions: JSON.stringify(extensions), + }); + const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Accept': 'application/json', + 
'content-type': 'application/json', + }, + credentials: 'include', + }); + + if (!response.ok) { + logs.push(`${cityName}: HTTP ${response.status}`); + break; + } + + const json = await response.json(); + const dispensaries = json?.data?.filteredDispensaries || []; + + if (dispensaries.length === 0) { + break; + } + + // Filter to ensure correct state + const stateFiltered = dispensaries.filter((d: any) => + d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase() + ); + allDispensaries.push(...stateFiltered); + + if (dispensaries.length < perPage) { + break; + } + page++; + + // Small delay between pages + await new Promise(r => setTimeout(r, 100)); + } + + logs.push(`${cityName}: ${allDispensaries.length} stores`); + } catch (err: any) { + logs.push(`${cityName}: Error - ${err.message}`); + } + + return { dispensaries: allDispensaries, logs }; + }, city, stateCode, CONSUMER_DISPENSARIES_HASH); + + cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`)); + + // Accumulate raw store data + stateRawStores.push(...cityResult.dispensaries); + stateCityData.push({ city, stores: cityResult.dispensaries }); + + // Upsert each discovered location + for (const disp of cityResult.dispensaries) { + try { + const location = normalizeLocation(disp); + if (!location.id) { + continue; // Skip locations without platform ID + } + + const result = await upsertLocation(pool, location as any, null); + if (result) { + totalUpserted++; + if (result.isNew) { + totalDiscovered++; + } + } + } catch (err: any) { + console.error(`[StoreDiscoveryState] Upsert error for ${disp.name}:`, err.message); + } + } + + // Small delay between cities to avoid rate limiting + await new Promise(r => setTimeout(r, 300)); + } catch (err: any) { + console.error(`[StoreDiscoveryState] Error fetching ${city}, ${stateCode}:`, err.message); + } + + // Heartbeat every few cities + if (stateCityData.length % 10 === 0) { + await ctx.heartbeat(); + } + } + + await ctx.heartbeat(); + + // 
============================================================ + // STEP 5: Save raw payload for this state + // ============================================================ + if (stateRawStores.length > 0) { + try { + const rawPayload = { + stateCode, + platform, + fetchedAt: new Date().toISOString(), + storeCount: stateRawStores.length, + citiesProcessed: stateCityData.length, + cities: stateCityData, + stores: stateRawStores, + }; + + const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length); + console.log(`[StoreDiscoveryState] Saved payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`); + } catch (err: any) { + console.error(`[StoreDiscoveryState] Failed to save payload for ${stateCode}:`, err.message); + } + } + + // ============================================================ + // STEP 6: Auto-promote valid locations + // ============================================================ + try { + // Pass task tracking info for modification audit trail + const taskTracking: TaskTrackingInfo = { + taskId: task.id, + taskRole: task.role, + }; + const promotionResult = await promoteDiscoveredLocations(stateCode, false, taskTracking); + const promoted = promotionResult.created + promotionResult.updated; + if (promoted > 0) { + console.log(`[StoreDiscoveryState] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`); + const newIds = (promotionResult as any).newDispensaryIds || []; + allNewStoreIds.push(...newIds); + } + } catch (err: any) { + console.error(`[StoreDiscoveryState] Promotion error for ${stateCode}:`, err.message); + } + + await browser.close(); + browser = null; + + console.log(`[StoreDiscoveryState] Complete for ${stateCode}: ${totalDiscovered} new, ${totalUpserted} upserted`); + + return { + success: true, + stateCode, + storesDiscovered: totalDiscovered, + storesUpserted: totalUpserted, + 
citiesProcessed: stateCityData.length, + newStoreIds: allNewStoreIds, + }; + + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : 'Unknown error'; + console.error(`[StoreDiscoveryState] Error for ${stateCode}:`, errorMessage); + return { + success: false, + stateCode, + error: errorMessage, + }; + } finally { + if (browser) { + await browser.close().catch(() => {}); + } + } +} + +/** + * Normalize a raw dispensary response to our DiscoveredLocation format + */ +function normalizeLocation(raw: any): DiscoveredLocation { + const loc = raw.location || {}; + const coords = loc.geometry?.coordinates || []; + + return { + id: raw.id || raw._id || '', + name: raw.name || '', + slug: raw.slug || raw.cName || '', + cName: raw.cName || raw.slug || '', + address: raw.address || loc.ln1 || '', + city: raw.city || loc.city || '', + state: raw.state || loc.state || '', + zip: raw.zip || loc.zipcode || loc.zip || '', + latitude: coords[1] || raw.latitude, + longitude: coords[0] || raw.longitude, + timezone: raw.timezone || '', + offerPickup: raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true, + offerDelivery: raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false, + isRecreational: raw.isRecreational ?? raw.recDispensary ?? true, + isMedical: raw.isMedical ?? raw.medicalDispensary ?? 
true, + phone: raw.phone || '', + email: raw.email || '', + website: raw.embedBackUrl || '', + description: raw.description || '', + logoImage: raw.logoImage || '', + bannerImage: raw.bannerImage || '', + chainSlug: raw.chain || '', + enterpriseId: raw.retailer?.enterpriseId || '', + retailType: raw.retailType || '', + status: raw.status || '', + location: loc, + }; +} diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts index a455b3ba..55b2148c 100644 --- a/backend/src/tasks/task-service.ts +++ b/backend/src/tasks/task-service.ts @@ -28,6 +28,7 @@ async function tableExists(tableName: string): Promise { // product_refresh: Legacy role (deprecated but kept for compatibility) export type TaskRole = | 'store_discovery' + | 'store_discovery_state' // Per-state parallelized store discovery | 'entry_point_discovery' | 'product_discovery' | 'payload_fetch' // Fetches from API, saves to disk @@ -706,37 +707,67 @@ class TaskService { role: TaskRole, staggerSeconds: number = 15, platform: string = 'dutchie', - method: 'curl' | 'http' | null = null - ): Promise<{ created: number; taskIds: number[] }> { + method: 'curl' | 'http' | null = null, + options: { skipRecentHours?: number } = {} + ): Promise<{ created: number; skipped: number; taskIds: number[] }> { if (dispensaryIds.length === 0) { - return { created: 0, taskIds: [] }; + return { created: 0, skipped: 0, taskIds: [] }; } - // Use a single INSERT with generate_series for efficiency + const { skipRecentHours = 4 } = options; // Skip if completed within last 4 hours + + // Filter out dispensaries that: + // 1. Already have a pending/claimed/running task for this role + // 2. 
Had this role completed recently (within skipRecentHours) const result = await pool.query(` - WITH task_data AS ( - SELECT - unnest($1::int[]) as dispensary_id, - generate_series(0, array_length($1::int[], 1) - 1) as idx + WITH input_ids AS ( + SELECT DISTINCT unnest($1::int[]) as dispensary_id -- DISTINCT: a repeated input id would otherwise pass both NOT EXISTS checks and be inserted twice + ), + eligible_ids AS ( + SELECT i.dispensary_id + FROM input_ids i + WHERE NOT EXISTS ( + -- No pending/active task for same role + SELECT 1 FROM worker_tasks t + WHERE t.dispensary_id = i.dispensary_id + AND t.role = $2 + AND t.status IN ('pending', 'claimed', 'running') + ) + AND NOT EXISTS ( + -- No recent completion for same role + SELECT 1 FROM worker_tasks t + WHERE t.dispensary_id = i.dispensary_id + AND t.role = $2 + AND t.status = 'completed' + AND t.completed_at > NOW() - ($6::int * INTERVAL '1 hour') + ) + ), + numbered AS ( + SELECT dispensary_id, ROW_NUMBER() OVER (ORDER BY dispensary_id) - 1 as idx + FROM eligible_ids ) INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status) SELECT $2::varchar as role, - td.dispensary_id, + n.dispensary_id, $3::varchar as platform, $4::varchar as method, - NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for, + NOW() + (n.idx * $5::int * INTERVAL '1 second') as scheduled_for, 'pending' as status - FROM task_data td - ON CONFLICT DO NOTHING + FROM numbered n RETURNING id - `, [dispensaryIds, role, platform, method, staggerSeconds]); + `, [dispensaryIds, role, platform, method, staggerSeconds, skipRecentHours]); const taskIds = result.rows.map((r: { id: number }) => r.id); + const skipped = dispensaryIds.length - taskIds.length; - console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`); + if (skipped > 0) { + console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks, skipped ${skipped} (duplicate/recent)`); + } else { + console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`); 
+    }
 
-    return { created: taskIds.length, taskIds };
+    return { created: taskIds.length, skipped, taskIds };
   }
 
   /**
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index 8e4d9f05..5095e8be 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -76,6 +76,7 @@ import { handleProductDiscovery } from './handlers/product-discovery-curl';
 import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
 import { handleStoreDiscovery } from './handlers/store-discovery';
 import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http';
+import { handleStoreDiscoveryState } from './handlers/store-discovery-state';
 import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
 import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
 import { handleWhoami } from './handlers/whoami';
@@ -159,6 +160,7 @@ const TASK_HANDLERS: Record = {
   product_refresh: handleProductRefresh, // disk -> DB
   product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
   store_discovery: handleStoreDiscovery,
+  store_discovery_state: handleStoreDiscoveryState, // Per-state parallelized discovery
   entry_point_discovery: handleEntryPointDiscovery,
   analytics_refresh: handleAnalyticsRefresh,
   whoami: handleWhoami, // Tests proxy + anti-detect
@@ -221,6 +223,7 @@ export class TaskWorker {
   private isRunning: boolean = false;
   private heartbeatInterval: NodeJS.Timeout | null = null;
   private registryHeartbeatInterval: NodeJS.Timeout | null = null;
+  private staleCleanupInterval: NodeJS.Timeout | null = null;
   private crawlRotator: CrawlRotator;
 
   // ==========================================================================
@@ -798,6 +801,56 @@ export class TaskWorker {
     }
   }
 
+  /**
+   * Run stale task cleanup once
+   * Recovers tasks left in claimed/running status after worker crashes
+   *
+   * Calls taskService.cleanupStaleTasks with a 30 minute threshold and logs
+   * the count when anything was cleaned. Best-effort: errors are logged here
+   * and never propagate to the caller.
+   */
+  private async runStaleTaskCleanup(): Promise<void> {
+    try {
+      console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
+      const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
+      if (cleanupResult.cleaned > 0) {
+        console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
+      }
+    } catch (err: unknown) {
+      // A thrown value need not be an Error; never dereference it blindly
+      const message = err instanceof Error ? err.message : String(err);
+      console.error(`[TaskWorker] Stale task cleanup error:`, message);
+    }
+  }
+
+  /**
+   * Start periodic stale task cleanup (every 10 minutes)
+   * Only run by worker-0 to avoid races
+   *
+   * Safe to call repeatedly: a previously started timer is cleared first
+   */
+  private startPeriodicStaleCleanup(): void {
+    const STALE_CLEANUP_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes
+    // Drop any existing timer so a second start() cannot leak an interval
+    this.stopPeriodicStaleCleanup();
+    this.staleCleanupInterval = setInterval(() => {
+      // Fire-and-forget: runStaleTaskCleanup catches and logs its own errors
+      void this.runStaleTaskCleanup();
+    }, STALE_CLEANUP_INTERVAL_MS);
+    console.log(`[TaskWorker] ${this.friendlyName} started periodic stale cleanup (every 10 min)`);
+  }
+
+  /**
+   * Stop periodic stale task cleanup
+   * No-op when no cleanup timer is active
+   */
+  private stopPeriodicStaleCleanup(): void {
+    if (this.staleCleanupInterval) {
+      clearInterval(this.staleCleanupInterval);
+      this.staleCleanupInterval = null;
+    }
+  }
+
   /**
    * Start the worker loop
    *
@@ -814,18 +867,14 @@ export class TaskWorker {
     // Start registry heartbeat immediately
     this.startRegistryHeartbeat();
 
-    // Cleanup stale tasks on startup (only worker-0 does this to avoid races)
-    // This handles tasks left in 'claimed'/'running' status when workers restart
+    // Cleanup stale tasks on startup and periodically (only worker-0 does this to avoid races)
+    // This handles tasks left in 'claimed'/'running' status when workers restart or crash
     if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') {
-      try {
-        console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
-        const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
-        if (cleanupResult.cleaned > 0) {
-          console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
-        }
-      } catch (err: any) {
-        console.error(`[TaskWorker] Stale task cleanup error:`, err.message);
-      }
+      // Run immediately on startup
+      await this.runStaleTaskCleanup();
+
+      // Start periodic cleanup every 10 minutes
+      this.startPeriodicStaleCleanup();
     }
 
     const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
@@ -980,6 +1029,7 @@ export class TaskWorker {
     this.isRunning = false;
     this.stopHeartbeat();
     this.stopRegistryHeartbeat();
+    this.stopPeriodicStaleCleanup();
     await this.deregister();
     console.log(`[TaskWorker] ${this.friendlyName} stopped`);
   }