From 54f40d26bb25eb1b818f5e25cf79b7346486cfb8 Mon Sep 17 00:00:00 2001 From: Kelly Date: Wed, 3 Dec 2025 18:16:06 -0700 Subject: [PATCH] refactor(scheduler): separate detection from product crawl jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Product crawl job now only targets ready dispensaries (menu_type=dutchie AND platform_dispensary_id IS NOT NULL). Detection is handled by the separate menu_detection schedule. This ensures: - Single path for menu discovery (menu_detection job) - Product crawl only processes ready dutchie stores - All 5 workers claim jobs via queue/locking 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- backend/src/dutchie-az/services/scheduler.ts | 106 +++++++++++-------- 1 file changed, 63 insertions(+), 43 deletions(-) diff --git a/backend/src/dutchie-az/services/scheduler.ts b/backend/src/dutchie-az/services/scheduler.ts index 125cf877..84a93e58 100644 --- a/backend/src/dutchie-az/services/scheduler.ts +++ b/backend/src/dutchie-az/services/scheduler.ts @@ -13,6 +13,9 @@ import { query, getClient } from '../db/connection'; import { crawlDispensaryProducts, CrawlResult } from './product-crawler'; +import { mapDbRowToDispensary } from './discovery'; +import { executeMenuDetectionJob } from './menu-detection'; +import { bulkEnqueueJobs, enqueueJob, getQueueStats } from './job-queue'; import { JobSchedule, JobStatus, Dispensary } from '../types'; // Scheduler poll interval (how often we check for due jobs) @@ -429,6 +432,8 @@ async function executeJob(schedule: JobSchedule): Promise<{ return executeProductCrawl(config); case 'dutchie_az_discovery': return executeDiscovery(config); + case 'dutchie_az_menu_detection': + return executeMenuDetectionJob(config); default: throw new Error(`Unknown job type: ${schedule.jobName}`); } @@ -436,6 +441,16 @@ async function executeJob(schedule: JobSchedule): Promise<{ /** * Execute the AZ Dutchie product crawl job + * + * NEW BEHAVIOR: Instead of running crawls directly, this now ENQUEUES jobs + * into the crawl_jobs queue. Workers (running as separate replicas) will + * pick up and process these jobs. + * + * This allows: + * - Multiple workers to process jobs in parallel + * - No double-crawls (DB-level locking per dispensary) + * - Better scalability (add more worker replicas) + * - Live monitoring of individual job progress */ async function executeProductCrawl(config: Record): Promise<{ status: JobStatus; @@ -448,69 +463,59 @@ async function executeProductCrawl(config: Record): Promise<{ const pricingType = config.pricingType || 'rec'; const useBothModes = config.useBothModes !== false; - // Get all dispensaries with platform IDs - const { rows: dispensaries } = await query( + // Get all "ready" dispensaries (menu_type='dutchie' AND platform_dispensary_id IS NOT NULL AND not failed) + // Note: Menu detection is handled separately by the dutchie_az_menu_detection schedule + const { rows: rawRows } = await query( ` - SELECT * FROM dispensaries - WHERE state = 'AZ' AND menu_type = 'dutchie' AND platform_dispensary_id IS NOT NULL - ORDER BY last_crawled_at ASC NULLS FIRST + SELECT id FROM dispensaries + WHERE state = 'AZ' + AND menu_type = 'dutchie' + AND platform_dispensary_id IS NOT NULL + AND failed_at IS NULL + ORDER BY last_crawl_at ASC NULLS FIRST ` ); + const dispensaryIds = rawRows.map((r: any) => r.id); - if (dispensaries.length === 0) { + if (dispensaryIds.length === 0) { return { status: 'success', itemsProcessed: 0, itemsSucceeded: 0, itemsFailed: 0, - metadata: { message: 'No dispensaries to crawl' }, + metadata: { message: 'No ready dispensaries to crawl. Run menu detection to discover more.' }, }; } - console.log(`[Scheduler] Crawling ${dispensaries.length} dispensaries...`); + console.log(`[Scheduler] Enqueueing crawl jobs for ${dispensaryIds.length} dispensaries...`); - let succeeded = 0; - let failed = 0; - let totalProducts = 0; - let totalSnapshots = 0; - const errors: string[] = []; - - for (const dispensary of dispensaries) { - try { - const result = await crawlDispensaryProducts(dispensary, pricingType, { useBothModes }); - - if (result.success) { - succeeded++; - totalProducts += result.productsUpserted; - totalSnapshots += result.snapshotsCreated; - } else { - failed++; - if (result.errorMessage) { - errors.push(`${dispensary.name}: ${result.errorMessage}`); - } - } - - // Delay between dispensaries - await new Promise(r => setTimeout(r, 5000)); - } catch (error: any) { - failed++; - errors.push(`${dispensary.name}: ${error.message}`); + // Bulk enqueue jobs (skips dispensaries that already have pending/running jobs) + const { enqueued, skipped } = await bulkEnqueueJobs( + 'dutchie_product_crawl', + dispensaryIds, + { + priority: 0, + metadata: { pricingType, useBothModes }, } - } + ); - const status: JobStatus = failed === 0 ? 'success' : succeeded === 0 ? 'error' : 'partial'; + console.log(`[Scheduler] Enqueued ${enqueued} jobs, skipped ${skipped} (already queued)`); + + // Get current queue stats + const queueStats = await getQueueStats(); return { - status, - itemsProcessed: dispensaries.length, - itemsSucceeded: succeeded, - itemsFailed: failed, - errorMessage: errors.length > 0 ? errors.slice(0, 5).join('; ') : undefined, + status: 'success', + itemsProcessed: dispensaryIds.length, + itemsSucceeded: enqueued, + itemsFailed: 0, // Enqueue itself doesn't fail metadata: { - totalProducts, - totalSnapshots, + enqueued, + skipped, + queueStats, pricingType, useBothModes, + message: `Enqueued ${enqueued} jobs. Workers will process them. Check /scraper-monitor for progress.`, }, }; } @@ -748,6 +753,21 @@ export async function initializeDefaultSchedules(): Promise { }); console.log('[Scheduler] Created default product crawl schedule'); } + + // Check if menu detection schedule exists + const menuDetectionExists = schedules.some(s => s.jobName === 'dutchie_az_menu_detection'); + if (!menuDetectionExists) { + await createSchedule({ + jobName: 'dutchie_az_menu_detection', + description: 'Detect menu providers and resolve platform IDs for AZ dispensaries', + enabled: true, + baseIntervalMinutes: 1440, // 24 hours + jitterMinutes: 60, // ±1 hour + jobConfig: { state: 'AZ', onlyUnknown: true }, + startImmediately: false, + }); + console.log('[Scheduler] Created default menu detection schedule'); + } } // Re-export for backward compatibility