refactor(scheduler): separate detection from product crawl jobs

Product crawl job now only targets ready dispensaries (menu_type=dutchie
AND platform_dispensary_id IS NOT NULL AND failed_at IS NULL). Detection
is handled by the separate menu_detection schedule.

This ensures:
- Single path for menu discovery (menu_detection job)
- Product crawl only processes ready dutchie stores
- All 5 workers claim jobs via queue/locking

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-03 18:16:06 -07:00
parent e234dc2947
commit 54f40d26bb

View File

@@ -13,6 +13,9 @@
import { query, getClient } from '../db/connection';
import { crawlDispensaryProducts, CrawlResult } from './product-crawler';
import { mapDbRowToDispensary } from './discovery';
import { executeMenuDetectionJob } from './menu-detection';
import { bulkEnqueueJobs, enqueueJob, getQueueStats } from './job-queue';
import { JobSchedule, JobStatus, Dispensary } from '../types';
// Scheduler poll interval (how often we check for due jobs)
@@ -429,6 +432,8 @@ async function executeJob(schedule: JobSchedule): Promise<{
return executeProductCrawl(config);
case 'dutchie_az_discovery':
return executeDiscovery(config);
case 'dutchie_az_menu_detection':
return executeMenuDetectionJob(config);
default:
throw new Error(`Unknown job type: ${schedule.jobName}`);
}
@@ -436,6 +441,16 @@ async function executeJob(schedule: JobSchedule): Promise<{
/**
* Execute the AZ Dutchie product crawl job
*
* NEW BEHAVIOR: Instead of running crawls directly, this now ENQUEUES jobs
* into the crawl_jobs queue. Workers (running as separate replicas) will
* pick up and process these jobs.
*
* This allows:
* - Multiple workers to process jobs in parallel
* - No double-crawls (DB-level locking per dispensary)
* - Better scalability (add more worker replicas)
* - Live monitoring of individual job progress
*/
async function executeProductCrawl(config: Record<string, any>): Promise<{
status: JobStatus;
@@ -448,69 +463,59 @@ async function executeProductCrawl(config: Record<string, any>): Promise<{
const pricingType = config.pricingType || 'rec';
const useBothModes = config.useBothModes !== false;
// Get all dispensaries with platform IDs
const { rows: dispensaries } = await query<Dispensary>(
// Get all "ready" dispensaries (menu_type='dutchie' AND platform_dispensary_id IS NOT NULL AND not failed)
// Note: Menu detection is handled separately by the dutchie_az_menu_detection schedule
const { rows: rawRows } = await query(
`
SELECT * FROM dispensaries
WHERE state = 'AZ' AND menu_type = 'dutchie' AND platform_dispensary_id IS NOT NULL
ORDER BY last_crawled_at ASC NULLS FIRST
SELECT id FROM dispensaries
WHERE state = 'AZ'
AND menu_type = 'dutchie'
AND platform_dispensary_id IS NOT NULL
AND failed_at IS NULL
ORDER BY last_crawl_at ASC NULLS FIRST
`
);
const dispensaryIds = rawRows.map((r: any) => r.id);
if (dispensaries.length === 0) {
if (dispensaryIds.length === 0) {
return {
status: 'success',
itemsProcessed: 0,
itemsSucceeded: 0,
itemsFailed: 0,
metadata: { message: 'No dispensaries to crawl' },
metadata: { message: 'No ready dispensaries to crawl. Run menu detection to discover more.' },
};
}
console.log(`[Scheduler] Crawling ${dispensaries.length} dispensaries...`);
console.log(`[Scheduler] Enqueueing crawl jobs for ${dispensaryIds.length} dispensaries...`);
let succeeded = 0;
let failed = 0;
let totalProducts = 0;
let totalSnapshots = 0;
const errors: string[] = [];
for (const dispensary of dispensaries) {
try {
const result = await crawlDispensaryProducts(dispensary, pricingType, { useBothModes });
if (result.success) {
succeeded++;
totalProducts += result.productsUpserted;
totalSnapshots += result.snapshotsCreated;
} else {
failed++;
if (result.errorMessage) {
errors.push(`${dispensary.name}: ${result.errorMessage}`);
}
}
// Delay between dispensaries
await new Promise(r => setTimeout(r, 5000));
} catch (error: any) {
failed++;
errors.push(`${dispensary.name}: ${error.message}`);
// Bulk enqueue jobs (skips dispensaries that already have pending/running jobs)
const { enqueued, skipped } = await bulkEnqueueJobs(
'dutchie_product_crawl',
dispensaryIds,
{
priority: 0,
metadata: { pricingType, useBothModes },
}
}
);
const status: JobStatus = failed === 0 ? 'success' : succeeded === 0 ? 'error' : 'partial';
console.log(`[Scheduler] Enqueued ${enqueued} jobs, skipped ${skipped} (already queued)`);
// Get current queue stats
const queueStats = await getQueueStats();
return {
status,
itemsProcessed: dispensaries.length,
itemsSucceeded: succeeded,
itemsFailed: failed,
errorMessage: errors.length > 0 ? errors.slice(0, 5).join('; ') : undefined,
status: 'success',
itemsProcessed: dispensaryIds.length,
itemsSucceeded: enqueued,
itemsFailed: 0, // Enqueue itself doesn't fail
metadata: {
totalProducts,
totalSnapshots,
enqueued,
skipped,
queueStats,
pricingType,
useBothModes,
message: `Enqueued ${enqueued} jobs. Workers will process them. Check /scraper-monitor for progress.`,
},
};
}
@@ -748,6 +753,21 @@ export async function initializeDefaultSchedules(): Promise<void> {
});
console.log('[Scheduler] Created default product crawl schedule');
}
// Check if menu detection schedule exists
const menuDetectionExists = schedules.some(s => s.jobName === 'dutchie_az_menu_detection');
if (!menuDetectionExists) {
await createSchedule({
jobName: 'dutchie_az_menu_detection',
description: 'Detect menu providers and resolve platform IDs for AZ dispensaries',
enabled: true,
baseIntervalMinutes: 1440, // 24 hours
jitterMinutes: 60, // ±1 hour
jobConfig: { state: 'AZ', onlyUnknown: true },
startImmediately: false,
});
console.log('[Scheduler] Created default menu detection schedule');
}
}
// Re-export for backward compatibility