#!/usr/bin/env npx tsx
/**
 * Queue Intelligence Script
 *
 * Orchestrates the multi-category intelligence crawler system:
 * 1. Queue dispensaries that need provider detection (all 4 categories)
 * 2. Queue per-category production crawls (Dutchie products only for now)
 * 3. Queue per-category sandbox crawls (all providers)
 *
 * Each category (product, specials, brand, metadata) is handled independently.
 * A failure in one category does NOT affect other categories.
 *
 * Usage:
 *   npx tsx src/scripts/queue-intelligence.ts [--detection] [--production] [--sandbox] [--all]
 *   npx tsx src/scripts/queue-intelligence.ts --category=product --sandbox
 *   npx tsx src/scripts/queue-intelligence.ts --process --category=product
 *   npx tsx src/scripts/queue-intelligence.ts --dry-run
 */

import { pool } from '../db/pool';
import { logger } from '../services/logger';
import {
  detectMultiCategoryProviders,
  updateAllCategoryProviders,
  IntelligenceCategory,
} from '../services/intelligence-detector';
import {
  runCrawlProductsJob,
  runCrawlSpecialsJob,
  runCrawlBrandIntelligenceJob,
  runCrawlMetadataJob,
  runSandboxProductsJob,
  runSandboxSpecialsJob,
  runSandboxBrandJob,
  runSandboxMetadataJob,
  runAllCategoryProductionCrawls,
  runAllCategorySandboxCrawls,
  processCategorySandboxJobs,
} from '../services/category-crawler-jobs';

/** Every intelligence category this script knows how to queue/process. */
const CATEGORIES: IntelligenceCategory[] = ['product', 'specials', 'brand', 'metadata'];

/** A provider detection below this confidence (percent) is re-queued for detection. */
const MIN_CONFIDENCE = 70;

/** Default for --limit when the flag is absent or unusable. */
const DEFAULT_LIMIT = 10;

/** Horizontal rule used around the console banners. */
const BANNER = '═'.repeat(55);

// Parse command line args
const args = process.argv.slice(2);

/** Returns the text after `--name=` for the first matching argument, if any. */
function flagValue(name: string): string | undefined {
  const prefix = `--${name}=`;
  const match = args.find(a => a.startsWith(prefix));
  return match === undefined ? undefined : match.slice(prefix.length);
}

/**
 * Parses `--name=N` as a non-negative base-10 integer.
 * Falls back to `fallback` when the flag is missing, empty, or not a usable
 * number, so NaN / negative values never reach a SQL `LIMIT` or id parameter.
 */
function intFlag(name: string, fallback: number): number {
  const raw = flagValue(name);
  if (raw === undefined || raw === '') return fallback;
  const parsed = Number.parseInt(raw, 10);
  if (Number.isInteger(parsed) && parsed >= 0) return parsed;
  console.warn(`Ignoring invalid --${name} value "${raw}"; using ${fallback}`);
  return fallback;
}

/** Type guard: true when `value` is one of the known CATEGORIES. */
function isIntelligenceCategory(value: string): value is IntelligenceCategory {
  return (CATEGORIES as readonly string[]).includes(value);
}

/** Extracts a printable message from whatever a `catch` clause received. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Raw --category text; an empty value is treated the same as "not provided".
const rawCategory = flagValue('category') || undefined;

const flags = {
  detection: args.includes('--detection') || args.includes('--all'),
  production: args.includes('--production') || args.includes('--all'),
  sandbox: args.includes('--sandbox') || args.includes('--all'),
  dryRun: args.includes('--dry-run'),
  process: args.includes('--process'),
  help: args.includes('--help') || args.includes('-h'),
  limit: intFlag('limit', DEFAULT_LIMIT),
  // Only a validated category is kept: the value is interpolated into SQL
  // column names below, so arbitrary CLI text must never get this far.
  category: rawCategory !== undefined && isIntelligenceCategory(rawCategory) ? rawCategory : undefined,
  dispensary: intFlag('dispensary', 0),
};

// If no specific flags, default to all
if (!flags.detection && !flags.production && !flags.sandbox && !flags.process) {
  flags.detection = true;
  flags.production = true;
  flags.sandbox = true;
}

/** Prints the CLI usage text. */
async function showHelp() {
  console.log(`
Queue Intelligence - Multi-Category Crawler Orchestration

USAGE:
  npx tsx src/scripts/queue-intelligence.ts [OPTIONS]

OPTIONS:
  --detection           Queue dispensaries that need multi-category detection
  --production          Queue per-category production crawls
  --sandbox             Queue per-category sandbox crawls
  --all                 Queue all job types (default if no specific flag)
  --process             Process queued jobs instead of just queuing
  --category=CATEGORY   Filter to specific category (product|specials|brand|metadata)
  --dispensary=ID       Process only a specific dispensary
  --dry-run             Show what would be queued without making changes
  --limit=N             Maximum dispensaries to queue per type (default: 10)
  --help, -h            Show this help message

CATEGORIES:
  product   - Product/menu data (Dutchie=production, others=sandbox)
  specials  - Deals and specials (all sandbox for now)
  brand     - Brand intelligence (all sandbox for now)
  metadata  - Categories/taxonomy (all sandbox for now)

EXAMPLES:
  # Queue all dispensaries for appropriate jobs
  npx tsx src/scripts/queue-intelligence.ts

  # Only queue product detection jobs
  npx tsx src/scripts/queue-intelligence.ts --detection --category=product

  # Process sandbox jobs for specials category
  npx tsx src/scripts/queue-intelligence.ts --process --category=specials --limit=5

  # Run full detection for a specific dispensary
  npx tsx src/scripts/queue-intelligence.ts --process --detection --dispensary=123

  # Dry run to see what would be queued
  npx tsx src/scripts/queue-intelligence.ts --dry-run
`);
}

/**
 * Lists the categories (out of `categories`) for which `row` has no provider
 * or a confidence below MIN_CONFIDENCE.
 */
function categoriesNeedingDetection(
  row: Record<string, unknown>,
  categories: IntelligenceCategory[],
): IntelligenceCategory[] {
  return categories.filter(cat => {
    const provider = row[`${cat}_provider`];
    const confidence = row[`${cat}_confidence`];
    // Number(null) === 0, so a missing confidence counts as "needs detection".
    return !provider || Number(confidence) < MIN_CONFIDENCE;
  });
}

/**
 * Queues 'detection' jobs in sandbox_crawl_jobs for dispensaries that have a
 * website/menu URL and are missing a provider (or have low confidence).
 * Honors --category, --limit and --dry-run.
 *
 * @returns number of dispensaries queued (or that would be queued in dry-run)
 */
async function queueMultiCategoryDetection(): Promise<number> {
  console.log('\n📡 Queueing Multi-Category Detection Jobs...');

  // With --category only that category is considered; otherwise all four.
  const categories = flags.category ? [flags.category] : CATEGORIES;
  const sortCategory = flags.category ?? 'product';

  // Find dispensaries that need provider detection for any selected category:
  // - *_provider is null OR
  // - *_confidence < MIN_CONFIDENCE
  // - has a website URL
  // Column names come from the validated CATEGORIES list, never from raw input.
  const needsDetectionSql = categories
    .map(cat => `${cat}_provider IS NULL OR ${cat}_confidence < ${MIN_CONFIDENCE}`)
    .join('\n        OR ');

  const query = `
    SELECT id, name, website, menu_url,
           product_provider, product_confidence, product_crawler_mode,
           specials_provider, specials_confidence, specials_crawler_mode,
           brand_provider, brand_confidence, brand_crawler_mode,
           metadata_provider, metadata_confidence, metadata_crawler_mode
    FROM dispensaries
    WHERE (website IS NOT NULL OR menu_url IS NOT NULL)
      AND (
        ${needsDetectionSql}
      )
    ORDER BY
      CASE WHEN ${sortCategory}_provider IS NULL THEN 0 ELSE 1 END,
      ${sortCategory}_confidence ASC
    LIMIT $1
  `;

  const result = await pool.query(query, [flags.limit]);

  if (flags.dryRun) {
    console.log(`  Would queue ${result.rows.length} dispensaries for multi-category detection:`);
    for (const row of result.rows) {
      const needsDetection = categoriesNeedingDetection(row, categories);
      console.log(`    - [${row.id}] ${row.name} (needs: ${needsDetection.join(', ')})`);
    }
    return result.rows.length;
  }

  let queued = 0;
  for (const dispensary of result.rows) {
    try {
      // Create detection jobs for each category that needs it
      for (const category of categoriesNeedingDetection(dispensary, categories)) {
        await pool.query(
          `INSERT INTO sandbox_crawl_jobs (dispensary_id, category, job_type, status, priority)
           VALUES ($1, $2, 'detection', 'pending', 10)
           ON CONFLICT DO NOTHING`,
          [dispensary.id, category]
        );
      }
      console.log(`  ✓ Queued detection: [${dispensary.id}] ${dispensary.name}`);
      queued++;
    } catch (error: unknown) {
      console.error(`  ✗ Failed to queue [${dispensary.id}]: ${errorMessage(error)}`);
    }
  }

  return queued;
}

/**
 * Queues production crawls into crawl_jobs. Only the 'product' category has a
 * production crawler (Dutchie); other categories are skipped with a notice.
 *
 * @param category restrict to one category; all categories when omitted
 * @returns number of crawl jobs actually inserted (or candidates in dry-run)
 */
async function queueCategoryProductionCrawls(category?: IntelligenceCategory): Promise<number> {
  const categories = category ? [category] : CATEGORIES;
  let totalQueued = 0;

  for (const cat of categories) {
    console.log(`\n🏭 Queueing Production ${cat.toUpperCase()} Crawls...`);

    // For now, only products have production-ready crawlers (Dutchie only)
    if (cat !== 'product') {
      console.log(`  ⏭️  No production crawler for ${cat} yet - skipping`);
      continue;
    }

    // Find dispensaries ready for production crawl (never scanned first,
    // then the ones scanned longest ago; anything fresher than 4h is skipped)
    const query = `
      SELECT id, name, ${cat}_provider as provider, last_${cat}_scan_at as last_scan
      FROM dispensaries
      WHERE ${cat}_provider = 'dutchie'
        AND ${cat}_crawler_mode = 'production'
        AND ${cat}_confidence >= ${MIN_CONFIDENCE}
        AND (last_${cat}_scan_at IS NULL OR last_${cat}_scan_at < NOW() - INTERVAL '4 hours')
      ORDER BY
        CASE WHEN last_${cat}_scan_at IS NULL THEN 0 ELSE 1 END,
        last_${cat}_scan_at ASC
      LIMIT $1
    `;

    const result = await pool.query(query, [flags.limit]);

    if (flags.dryRun) {
      console.log(`  Would queue ${result.rows.length} dispensaries for ${cat} production crawl:`);
      for (const row of result.rows) {
        const lastScan = row.last_scan ? new Date(row.last_scan).toISOString() : 'never';
        console.log(`    - [${row.id}] ${row.name} (provider: ${row.provider}, last: ${lastScan})`);
      }
      totalQueued += result.rows.length;
      continue;
    }

    for (const dispensary of result.rows) {
      try {
        // For products, use the existing crawl_jobs table for production.
        // The explicit casts are required: jsonb_build_object takes
        // VARIADIC "any", so PostgreSQL cannot infer untyped parameter types.
        // NOTE(review): $1::int assumes integer dispensary ids — confirm.
        const inserted = await pool.query(
          `INSERT INTO crawl_jobs (store_id, job_type, trigger_type, status, priority, metadata)
           SELECT s.id, 'full_crawl', 'scheduled', 'pending', 50,
                  jsonb_build_object('dispensary_id', $1::int, 'category', $2::text, 'source', 'queue-intelligence')
           FROM stores s
           JOIN dispensaries d ON (d.menu_url = s.dutchie_url OR d.name ILIKE '%' || s.name || '%')
           WHERE d.id = $1::int
           LIMIT 1`,
          [dispensary.id, cat]
        );

        // INSERT ... SELECT inserts nothing when no store matches the
        // dispensary; do not report (or count) that as a queued job.
        if ((inserted.rowCount ?? 0) === 0) {
          console.log(`  ✗ No matching store for [${dispensary.id}] ${dispensary.name} - not queued`);
          continue;
        }

        console.log(`  ✓ Queued ${cat} production: [${dispensary.id}] ${dispensary.name}`);
        totalQueued++;
      } catch (error: unknown) {
        console.error(`  ✗ Failed to queue [${dispensary.id}]: ${errorMessage(error)}`);
      }
    }
  }

  return totalQueued;
}

/**
 * Queues sandbox crawls: for each dispensary in sandbox mode for a category
 * (and without a pending/running sandbox job for it), upserts a
 * crawler_sandboxes row and inserts a 'crawl' job into sandbox_crawl_jobs.
 *
 * @param category restrict to one category; all categories when omitted
 * @returns number of sandbox jobs queued (or candidates in dry-run)
 */
async function queueCategorySandboxCrawls(category?: IntelligenceCategory): Promise<number> {
  const categories = category ? [category] : CATEGORIES;
  let totalQueued = 0;

  for (const cat of categories) {
    console.log(`\n🧪 Queueing Sandbox ${cat.toUpperCase()} Crawls...`);

    // Find dispensaries in sandbox mode for this category
    const query = `
      SELECT d.id, d.name, d.${cat}_provider as provider, d.${cat}_confidence as confidence,
             d.website, d.menu_url
      FROM dispensaries d
      WHERE d.${cat}_crawler_mode = 'sandbox'
        AND d.${cat}_provider IS NOT NULL
        AND (d.website IS NOT NULL OR d.menu_url IS NOT NULL)
        AND NOT EXISTS (
          SELECT 1 FROM sandbox_crawl_jobs sj
          WHERE sj.dispensary_id = d.id
            AND sj.category = $1
            AND sj.status IN ('pending', 'running')
        )
      ORDER BY d.${cat}_confidence DESC, d.updated_at ASC
      LIMIT $2
    `;

    const result = await pool.query(query, [cat, flags.limit]);

    if (flags.dryRun) {
      console.log(`  Would queue ${result.rows.length} dispensaries for ${cat} sandbox crawl:`);
      for (const row of result.rows) {
        console.log(`    - [${row.id}] ${row.name} (provider: ${row.provider}, confidence: ${row.confidence}%)`);
      }
      totalQueued += result.rows.length;
      continue;
    }

    for (const dispensary of result.rows) {
      try {
        // Create sandbox entry if needed (an existing active one is just touched)
        const sandboxResult = await pool.query(
          `INSERT INTO crawler_sandboxes (dispensary_id, category, suspected_menu_provider, mode, status)
           VALUES ($1, $2, $3, 'template_learning', 'pending')
           ON CONFLICT (dispensary_id, category) WHERE status NOT IN ('moved_to_production', 'failed')
           DO UPDATE SET updated_at = NOW()
           RETURNING id`,
          [dispensary.id, cat, dispensary.provider]
        );

        const sandboxId = sandboxResult.rows[0]?.id;

        // Create sandbox job
        await pool.query(
          `INSERT INTO sandbox_crawl_jobs (dispensary_id, sandbox_id, category, job_type, status, priority)
           VALUES ($1, $2, $3, 'crawl', 'pending', 5)`,
          [dispensary.id, sandboxId, cat]
        );

        console.log(`  ✓ Queued ${cat} sandbox: [${dispensary.id}] ${dispensary.name} (${dispensary.provider})`);
        totalQueued++;
      } catch (error: unknown) {
        console.error(`  ✗ Failed to queue [${dispensary.id}]: ${errorMessage(error)}`);
      }
    }
  }

  return totalQueued;
}

/**
 * Marks a dispensary's detection jobs that are currently in `fromStatus` as
 * failed, recording `reason` in error_message.
 */
async function failDetectionJobs(
  dispensaryId: number,
  reason: string,
  fromStatus: 'pending' | 'running',
): Promise<void> {
  await pool.query(
    `UPDATE sandbox_crawl_jobs
     SET status = 'failed', error_message = $1
     WHERE dispensary_id = $2 AND job_type = 'detection' AND status = $3`,
    [reason, dispensaryId, fromStatus]
  );
}

/**
 * Runs pending 'detection' jobs: one multi-category detection per dispensary,
 * after which all of that dispensary's detection jobs are marked
 * completed (or failed). Honors --category, --dispensary and --limit when
 * selecting which dispensaries to process.
 */
async function processDetectionJobs(): Promise<void> {
  console.log('\n🔍 Processing Detection Jobs...');

  // Build the optional filters with positional parameters.
  const params: Array<string | number> = [flags.limit];
  let filterSql = '';
  if (flags.category) {
    params.push(flags.category);
    filterSql += ` AND category = $${params.length}`;
  }
  if (flags.dispensary) {
    params.push(flags.dispensary);
    filterSql += ` AND dispensary_id = $${params.length}`;
  }

  // Get pending detection jobs
  const jobs = await pool.query(
    `SELECT DISTINCT dispensary_id
     FROM sandbox_crawl_jobs
     WHERE job_type = 'detection' AND status = 'pending'${filterSql}
     LIMIT $1`,
    params
  );

  for (const job of jobs.rows) {
    console.log(`\nProcessing detection for dispensary ${job.dispensary_id}...`);

    try {
      // Get dispensary info
      const dispResult = await pool.query(
        'SELECT id, name, website, menu_url FROM dispensaries WHERE id = $1',
        [job.dispensary_id]
      );
      const dispensary = dispResult.rows[0];

      // Jobs that can never run are failed explicitly; left 'pending' they
      // would be re-selected on every run and eat into the --limit budget.
      if (!dispensary) {
        console.log(`  ✗ Dispensary not found`);
        await failDetectionJobs(job.dispensary_id, 'Dispensary not found', 'pending');
        continue;
      }

      const websiteUrl = dispensary.website || dispensary.menu_url;
      if (!websiteUrl) {
        console.log(`  ✗ No website URL`);
        await failDetectionJobs(job.dispensary_id, 'No website URL', 'pending');
        continue;
      }

      // Mark jobs as running
      await pool.query(
        `UPDATE sandbox_crawl_jobs
         SET status = 'running', started_at = NOW()
         WHERE dispensary_id = $1 AND job_type = 'detection' AND status = 'pending'`,
        [job.dispensary_id]
      );

      // Run multi-category detection
      console.log(`  Detecting providers for ${dispensary.name}...`);
      const detection = await detectMultiCategoryProviders(websiteUrl, { timeout: 45000 });

      // Update all categories
      await updateAllCategoryProviders(job.dispensary_id, detection);

      // Mark jobs as completed
      const summary = JSON.stringify({
        product: { provider: detection.product.provider, confidence: detection.product.confidence },
        specials: { provider: detection.specials.provider, confidence: detection.specials.confidence },
        brand: { provider: detection.brand.provider, confidence: detection.brand.confidence },
        metadata: { provider: detection.metadata.provider, confidence: detection.metadata.confidence },
      });
      await pool.query(
        `UPDATE sandbox_crawl_jobs
         SET status = 'completed', completed_at = NOW(), result_summary = $1
         WHERE dispensary_id = $2 AND job_type = 'detection' AND status = 'running'`,
        [summary, job.dispensary_id]
      );

      console.log(`  ✓ Detection complete:`);
      console.log(`    Product: ${detection.product.provider} (${detection.product.confidence}%) -> ${detection.product.mode}`);
      console.log(`    Specials: ${detection.specials.provider} (${detection.specials.confidence}%) -> ${detection.specials.mode}`);
      console.log(`    Brand: ${detection.brand.provider} (${detection.brand.confidence}%) -> ${detection.brand.mode}`);
      console.log(`    Metadata: ${detection.metadata.provider} (${detection.metadata.confidence}%) -> ${detection.metadata.mode}`);
    } catch (error: unknown) {
      const message = errorMessage(error);
      console.log(`  ✗ Error: ${message}`);
      try {
        await failDetectionJobs(job.dispensary_id, message, 'running');
      } catch (updateError: unknown) {
        // Do not let a bookkeeping failure stop the remaining dispensaries.
        console.error(`  ✗ Could not record failure for ${job.dispensary_id}: ${errorMessage(updateError)}`);
      }
    }
  }
}

/**
 * Processes crawl jobs per category: sandbox jobs via
 * processCategorySandboxJobs, and (for 'product' with --production) production
 * crawls via runCrawlProductsJob. Errors are contained per category / per
 * dispensary so one failure does not stop the rest.
 */
async function processCrawlJobs(): Promise<void> {
  const categories = flags.category ? [flags.category] : CATEGORIES;

  for (const cat of categories) {
    console.log(`\n⚙️ Processing ${cat.toUpperCase()} Crawl Jobs...\n`);

    // Process sandbox jobs for this category
    if (flags.sandbox || !flags.production) {
      try {
        await processCategorySandboxJobs(cat, flags.limit);
      } catch (error: unknown) {
        console.error(`  ✗ Sandbox processing failed for ${cat}: ${errorMessage(error)}`);
      }
    }

    // Process production jobs for this category
    if (flags.production && cat === 'product') {
      const params: number[] = [flags.limit];
      let dispensaryFilter = '';
      if (flags.dispensary) {
        params.push(flags.dispensary);
        dispensaryFilter = ` AND d.id = $${params.length}`;
      }

      try {
        // Get pending production crawls
        const prodJobs = await pool.query(
          `SELECT d.id
           FROM dispensaries d
           WHERE d.product_provider = 'dutchie'
             AND d.product_crawler_mode = 'production'
             AND d.product_confidence >= ${MIN_CONFIDENCE}${dispensaryFilter}
           LIMIT $1`,
          params
        );

        for (const job of prodJobs.rows) {
          console.log(`Processing production ${cat} crawl for dispensary ${job.id}...`);
          try {
            const result = await runCrawlProductsJob(job.id);
            console.log(`  ${result.success ? '✓' : '✗'} ${result.message}`);
          } catch (error: unknown) {
            console.error(`  ✗ ${errorMessage(error)}`);
          }
        }
      } catch (error: unknown) {
        console.error(`  ✗ Production processing failed for ${cat}: ${errorMessage(error)}`);
      }
    }
  }
}

/**
 * Processes the single dispensary given by --dispensary: optional detection,
 * then production and/or sandbox crawls, depending on the flags set.
 */
async function processSpecificDispensary(): Promise<void> {
  if (!flags.dispensary) return;

  console.log(`\n🎯 Processing Dispensary ${flags.dispensary}...\n`);

  const dispResult = await pool.query(
    'SELECT * FROM dispensaries WHERE id = $1',
    [flags.dispensary]
  );

  if (dispResult.rows.length === 0) {
    console.log('Dispensary not found');
    return;
  }

  const dispensary = dispResult.rows[0];
  console.log(`Name: ${dispensary.name}`);
  console.log(`Website: ${dispensary.website || dispensary.menu_url || 'none'}`);
  console.log('');

  if (flags.detection) {
    console.log('Running multi-category detection...');
    const websiteUrl = dispensary.website || dispensary.menu_url;
    if (websiteUrl) {
      const detection = await detectMultiCategoryProviders(websiteUrl);
      await updateAllCategoryProviders(flags.dispensary, detection);
      console.log('Detection results:');
      console.log(`  Product: ${detection.product.provider} (${detection.product.confidence}%) -> ${detection.product.mode}`);
      console.log(`  Specials: ${detection.specials.provider} (${detection.specials.confidence}%) -> ${detection.specials.mode}`);
      console.log(`  Brand: ${detection.brand.provider} (${detection.brand.confidence}%) -> ${detection.brand.mode}`);
      console.log(`  Metadata: ${detection.metadata.provider} (${detection.metadata.confidence}%) -> ${detection.metadata.mode}`);
    } else {
      // Previously this case was silent, which looked like a successful run.
      console.log('  ✗ No website URL - detection skipped');
    }
  }

  if (flags.production) {
    console.log('\nRunning production crawls...');
    const results = await runAllCategoryProductionCrawls(flags.dispensary);
    console.log(`  ${results.summary}`);
  }

  if (flags.sandbox) {
    console.log('\nRunning sandbox crawls...');
    const results = await runAllCategorySandboxCrawls(flags.dispensary);
    console.log(`  ${results.summary}`);
  }
}

/**
 * Prints per-category provider/mode/confidence counts from dispensaries and
 * per-category job status counts from sandbox_crawl_jobs.
 */
async function showStats(): Promise<void> {
  console.log('\n📊 Multi-Category Intelligence Stats:');

  // Per-category stats
  for (const cat of CATEGORIES) {
    const stats = await pool.query(`
      SELECT
        COUNT(*) as total,
        COUNT(*) FILTER (WHERE ${cat}_provider IS NULL) as no_provider,
        COUNT(*) FILTER (WHERE ${cat}_provider = 'dutchie') as dutchie,
        COUNT(*) FILTER (WHERE ${cat}_provider = 'treez') as treez,
        COUNT(*) FILTER (WHERE ${cat}_provider NOT IN ('dutchie', 'treez', 'unknown') AND ${cat}_provider IS NOT NULL) as other,
        COUNT(*) FILTER (WHERE ${cat}_provider = 'unknown') as unknown,
        COUNT(*) FILTER (WHERE ${cat}_crawler_mode = 'production') as production,
        COUNT(*) FILTER (WHERE ${cat}_crawler_mode = 'sandbox') as sandbox,
        AVG(${cat}_confidence) as avg_confidence
      FROM dispensaries
    `);

    const s = stats.rows[0];
    console.log(`
  ${cat.toUpperCase()}:
    Providers: Dutchie=${s.dutchie}, Treez=${s.treez}, Other=${s.other}, Unknown=${s.unknown}, None=${s.no_provider}
    Modes: Production=${s.production}, Sandbox=${s.sandbox}
    Avg Confidence: ${Math.round(s.avg_confidence || 0)}%`);
  }

  // Job stats per category
  console.log('\n  Sandbox Jobs by Category:');
  const jobStats = await pool.query(`
    SELECT
      category,
      COUNT(*) FILTER (WHERE status = 'pending') as pending,
      COUNT(*) FILTER (WHERE status = 'running') as running,
      COUNT(*) FILTER (WHERE status = 'completed') as completed,
      COUNT(*) FILTER (WHERE status = 'failed') as failed
    FROM sandbox_crawl_jobs
    GROUP BY category
    ORDER BY category
  `);

  for (const row of jobStats.rows) {
    console.log(`    ${row.category}: pending=${row.pending}, running=${row.running}, completed=${row.completed}, failed=${row.failed}`);
  }
}

/**
 * Entry point: validates arguments, prints stats, then either processes jobs
 * (--process) or queues them, and finally closes the DB pool.
 */
async function main() {
  if (flags.help) {
    await showHelp();
    process.exit(0);
  }

  // Reject unknown categories before any database work is started.
  if (rawCategory !== undefined && flags.category === undefined) {
    console.error(`Invalid --category value "${rawCategory}". Expected one of: ${CATEGORIES.join(', ')}`);
    process.exit(1);
  }

  console.log(BANNER);
  console.log('  Multi-Category Intelligence Queue Manager');
  console.log(BANNER);

  if (flags.dryRun) {
    console.log('\n🔍 DRY RUN MODE - No changes will be made\n');
  }

  if (flags.category) {
    console.log(`\n📌 Filtering to category: ${flags.category}\n`);
  }

  try {
    // Show current stats first
    await showStats();

    if (flags.process) {
      if (flags.dryRun) {
        // Processing runs real detections/crawls and writes results, which
        // would contradict the dry-run promise printed above.
        console.log('\n--dry-run is not supported with --process; skipping job processing.');
      } else if (flags.dispensary) {
        // If specific dispensary specified, process it directly
        await processSpecificDispensary();
      } else {
        // Process mode - run jobs
        if (flags.detection) {
          await processDetectionJobs();
        }
        await processCrawlJobs();
      }
    } else {
      // Queuing mode
      let totalQueued = 0;

      if (flags.detection) {
        totalQueued += await queueMultiCategoryDetection();
      }

      if (flags.production) {
        totalQueued += await queueCategoryProductionCrawls(flags.category);
      }

      if (flags.sandbox) {
        totalQueued += await queueCategorySandboxCrawls(flags.category);
      }

      console.log(`\n${BANNER}`);
      console.log(`  Total queued: ${totalQueued}`);
      console.log(`${BANNER}\n`);
    }

    // Show updated stats
    if (!flags.dryRun) {
      await showStats();
    }
  } catch (error: unknown) {
    console.error('Fatal error:', error);
    // Set the exit code instead of calling process.exit(1): exit() would
    // terminate immediately and skip the pool shutdown in `finally`.
    process.exitCode = 1;
  } finally {
    await pool.end();
  }
}

main().catch((error: unknown) => {
  // Reached only if something outside main's own try/catch throws
  // (for example pool.end()).
  console.error('Fatal error:', error);
  process.exitCode = 1;
});