Files
cannaiq/backend/dist/scripts/queue-intelligence.js
Kelly 66e07b2009 fix(monitor): remove non-existent worker columns from job_run_logs query
The job_run_logs table tracks scheduled job orchestration, not individual
worker jobs. Worker info (worker_id, worker_hostname) belongs on
dispensary_crawl_jobs, not job_run_logs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 18:45:05 -07:00

474 lines
23 KiB
JavaScript

#!/usr/bin/env npx tsx
"use strict";
/**
* Queue Intelligence Script
*
* Orchestrates the multi-category intelligence crawler system:
* 1. Queue dispensaries that need provider detection (all 4 categories)
* 2. Queue per-category production crawls (Dutchie products only for now)
* 3. Queue per-category sandbox crawls (all providers)
*
* Each category (product, specials, brand, metadata) is handled independently.
* A failure in one category does NOT affect other categories.
*
* Usage:
* npx tsx src/scripts/queue-intelligence.ts [--detection] [--production] [--sandbox] [--all]
* npx tsx src/scripts/queue-intelligence.ts --category=product --sandbox
* npx tsx src/scripts/queue-intelligence.ts --process --category=product
* npx tsx src/scripts/queue-intelligence.ts --dry-run
*/
Object.defineProperty(exports, "__esModule", { value: true });
const migrate_1 = require("../db/migrate");
const intelligence_detector_1 = require("../services/intelligence-detector");
const category_crawler_jobs_1 = require("../services/category-crawler-jobs");
// Categories handled by the intelligence system. Declared ahead of flag
// parsing so that --category can be validated against this list.
const CATEGORIES = ['product', 'specials', 'brand', 'metadata'];

/**
 * Returns the value of the first `--<name>=<value>` argument.
 *
 * @param {string[]} argv - Command line arguments (without node/script path).
 * @param {string} name - Option name without the leading dashes.
 * @returns {string|undefined} Text after the first `=`, or undefined when the
 *   option is absent.
 */
function readOptionValue(argv, name) {
    const prefix = `--${name}=`;
    return argv.find((arg) => arg.startsWith(prefix))?.split('=')[1];
}

/**
 * Reads a non-negative integer option.
 *
 * @param {string[]} argv - Command line arguments.
 * @param {string} name - Option name without the leading dashes.
 * @param {number} fallback - Value used when the option is absent or empty.
 * @returns {number} The parsed value or `fallback`.
 * @throws {Error} When a value is supplied but is not a non-negative integer.
 */
function parseCountOption(argv, name, fallback) {
    const raw = readOptionValue(argv, name);
    if (raw === undefined || raw === '') {
        return fallback;
    }
    // Explicit radix: without it a value such as "0x10" would parse as hex.
    const value = Number.parseInt(raw, 10);
    if (Number.isNaN(value) || value < 0) {
        // Previously NaN leaked through: a bad --limit reached SQL LIMIT, and a
        // bad --dispensary was falsy and silently meant "all dispensaries".
        throw new Error(`Invalid --${name} value "${raw}": expected a non-negative integer`);
    }
    return value;
}

/**
 * Converts command line arguments into the flags object used by this script.
 *
 * When none of --detection, --production, --sandbox or --process is given,
 * detection, production and sandbox are all enabled.
 *
 * @param {string[]} argv - Command line arguments (without node/script path).
 * @returns {{detection: boolean, production: boolean, sandbox: boolean,
 *   dryRun: boolean, process: boolean, help: boolean, limit: number,
 *   category: (string|undefined), dispensary: number}}
 * @throws {Error} On an invalid --limit, --dispensary or --category value.
 */
function parseFlags(argv) {
    const has = (flag) => argv.includes(flag);
    const all = has('--all');
    const parsed = {
        detection: has('--detection') || all,
        production: has('--production') || all,
        sandbox: has('--sandbox') || all,
        dryRun: has('--dry-run'),
        process: has('--process'),
        help: has('--help') || has('-h'),
        limit: parseCountOption(argv, 'limit', 10),
        // An empty "--category=" is treated the same as no category at all.
        category: readOptionValue(argv, 'category') || undefined,
        dispensary: parseCountOption(argv, 'dispensary', 0),
    };
    // The category name is later spliced into SQL column names, so only the
    // known categories may pass.
    if (parsed.category !== undefined && !CATEGORIES.includes(parsed.category)) {
        throw new Error(`Invalid --category value "${parsed.category}": expected one of ${CATEGORIES.join('|')}`);
    }
    // If no specific flags, default to all
    if (!parsed.detection && !parsed.production && !parsed.sandbox && !parsed.process) {
        parsed.detection = true;
        parsed.production = true;
        parsed.sandbox = true;
    }
    return parsed;
}

/**
 * Parses the flags, printing a short message and exiting with status 1 when
 * an argument is invalid.
 *
 * @param {string[]} argv - Command line arguments.
 * @returns {ReturnType<typeof parseFlags>} The parsed flags.
 */
function parseFlagsOrExit(argv) {
    try {
        return parseFlags(argv);
    }
    catch (error) {
        console.error(`Error: ${error.message}`);
        console.error('Run with --help for usage.');
        process.exit(1);
    }
}

// Parse command line args
const args = process.argv.slice(2);
const flags = parseFlagsOrExit(args);
/**
 * Prints the command line usage text (options, categories and examples) to
 * stdout with a single console.log call.
 *
 * @returns {Promise<void>}
 */
async function showHelp() {
    // The empty first and last entries reproduce the blank line before and
    // after the usage text.
    const usageLines = [
        '',
        'Queue Intelligence - Multi-Category Crawler Orchestration',
        'USAGE:',
        'npx tsx src/scripts/queue-intelligence.ts [OPTIONS]',
        'OPTIONS:',
        '--detection Queue dispensaries that need multi-category detection',
        '--production Queue per-category production crawls',
        '--sandbox Queue per-category sandbox crawls',
        '--all Queue all job types (default if no specific flag)',
        '--process Process queued jobs instead of just queuing',
        '--category=CATEGORY Filter to specific category (product|specials|brand|metadata)',
        '--dispensary=ID Process only a specific dispensary',
        '--dry-run Show what would be queued without making changes',
        '--limit=N Maximum dispensaries to queue per type (default: 10)',
        '--help, -h Show this help message',
        'CATEGORIES:',
        'product - Product/menu data (Dutchie=production, others=sandbox)',
        'specials - Deals and specials (all sandbox for now)',
        'brand - Brand intelligence (all sandbox for now)',
        'metadata - Categories/taxonomy (all sandbox for now)',
        'EXAMPLES:',
        '# Queue all dispensaries for appropriate jobs',
        'npx tsx src/scripts/queue-intelligence.ts',
        '# Only queue product detection jobs',
        'npx tsx src/scripts/queue-intelligence.ts --detection --category=product',
        '# Process sandbox jobs for specials category',
        'npx tsx src/scripts/queue-intelligence.ts --process --category=specials --limit=5',
        '# Run full detection for a specific dispensary',
        'npx tsx src/scripts/queue-intelligence.ts --process --detection --dispensary=123',
        '# Dry run to see what would be queued',
        'npx tsx src/scripts/queue-intelligence.ts --dry-run',
        '',
    ];
    console.log(usageLines.join('\n'));
}
/**
 * Queues provider-detection jobs for dispensaries whose category providers
 * are missing or low-confidence.
 *
 * Selects up to `flags.limit` dispensaries that have a website or menu URL and
 * at least one category with no provider or a confidence below 70. In dry-run
 * mode the candidates are only printed; otherwise one `detection` row is
 * inserted into `sandbox_crawl_jobs` for each category that needs it.
 *
 * @returns {Promise<number>} Number of dispensaries listed (dry run) or
 *   queued without an error.
 */
async function queueMultiCategoryDetection() {
    console.log('\n📡 Queueing Multi-Category Detection Jobs...');
    // Candidates: any *_provider is null OR any *_confidence < 70, and the
    // dispensary has a website or menu URL.
    const candidateSql = `
SELECT id, name, website, menu_url,
product_provider, product_confidence, product_crawler_mode,
specials_provider, specials_confidence, specials_crawler_mode,
brand_provider, brand_confidence, brand_crawler_mode,
metadata_provider, metadata_confidence, metadata_crawler_mode
FROM dispensaries
WHERE (website IS NOT NULL OR menu_url IS NOT NULL)
AND (
product_provider IS NULL OR product_confidence < 70 OR
specials_provider IS NULL OR specials_confidence < 70 OR
brand_provider IS NULL OR brand_confidence < 70 OR
metadata_provider IS NULL OR metadata_confidence < 70
)
ORDER BY
CASE WHEN product_provider IS NULL THEN 0 ELSE 1 END,
product_confidence ASC
LIMIT $1
`;
    const { rows: candidates } = await migrate_1.pool.query(candidateSql, [flags.limit]);
    // True when the given category of a row has no provider or confidence < 70.
    const lacksDetection = (row, name) => !row[`${name}_provider`] || row[`${name}_confidence`] < 70;
    if (flags.dryRun) {
        console.log(` Would queue ${candidates.length} dispensaries for multi-category detection:`);
        candidates.forEach((row) => {
            const missing = ['product', 'specials', 'brand', 'metadata'].filter((name) => lacksDetection(row, name));
            console.log(` - [${row.id}] ${row.name} (needs: ${missing.join(', ')})`);
        });
        return candidates.length;
    }
    const insertSql = `INSERT INTO sandbox_crawl_jobs (dispensary_id, category, job_type, status, priority)
VALUES ($1, $2, 'detection', 'pending', 10)
ON CONFLICT DO NOTHING`;
    let queuedCount = 0;
    for (const dispensary of candidates) {
        try {
            // One detection job per category that still needs it.
            for (const category of CATEGORIES) {
                if (!lacksDetection(dispensary, category)) {
                    continue;
                }
                await migrate_1.pool.query(insertSql, [dispensary.id, category]);
            }
            console.log(` ✓ Queued detection: [${dispensary.id}] ${dispensary.name}`);
            queuedCount += 1;
        }
        catch (error) {
            console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
        }
    }
    return queuedCount;
}
/**
 * Queues production crawls for each requested category.
 *
 * Only the `product` category has a production crawler; every other category
 * is logged and skipped. For `product`, up to `flags.limit` Dutchie
 * dispensaries in production mode with confidence >= 70 that were never
 * scanned, or last scanned more than 4 hours ago, are selected. Each one gets
 * a `full_crawl` row in `crawl_jobs` for its matching store.
 *
 * @param {string} [category] - Restrict to one category; all categories when
 *   omitted.
 * @returns {Promise<number>} Number of dispensaries listed (dry run) or the
 *   number of crawl jobs actually inserted.
 */
async function queueCategoryProductionCrawls(category) {
    const categories = category ? [category] : CATEGORIES;
    let totalQueued = 0;
    for (const cat of categories) {
        console.log(`\n🏭 Queueing Production ${cat.toUpperCase()} Crawls...`);
        // For now, only products have production-ready crawlers (Dutchie only).
        // This guard also means `cat` is always the literal 'product' by the
        // time it is interpolated into the SQL below.
        if (cat !== 'product') {
            console.log(` ⏭️ No production crawler for ${cat} yet - skipping`);
            continue;
        }
        // Find dispensaries ready for production crawl
        const query = `
            SELECT id, name, ${cat}_provider as provider, last_${cat}_scan_at as last_scan
            FROM dispensaries
            WHERE ${cat}_provider = 'dutchie'
            AND ${cat}_crawler_mode = 'production'
            AND ${cat}_confidence >= 70
            AND (last_${cat}_scan_at IS NULL OR last_${cat}_scan_at < NOW() - INTERVAL '4 hours')
            ORDER BY
            CASE WHEN last_${cat}_scan_at IS NULL THEN 0 ELSE 1 END,
            last_${cat}_scan_at ASC
            LIMIT $1
        `;
        const result = await migrate_1.pool.query(query, [flags.limit]);
        if (flags.dryRun) {
            console.log(` Would queue ${result.rows.length} dispensaries for ${cat} production crawl:`);
            for (const row of result.rows) {
                const lastScan = row.last_scan ? new Date(row.last_scan).toISOString() : 'never';
                console.log(` - [${row.id}] ${row.name} (provider: ${row.provider}, last: ${lastScan})`);
            }
            totalQueued += result.rows.length;
            continue;
        }
        for (const dispensary of result.rows) {
            try {
                // For products, use the existing crawl_jobs table for production.
                // The ORDER BY prefers an exact menu URL match over the fuzzy name
                // match so LIMIT 1 does not pick an arbitrary store.
                const insertResult = await migrate_1.pool.query(`INSERT INTO crawl_jobs (store_id, job_type, trigger_type, status, priority, metadata)
                    SELECT s.id, 'full_crawl', 'scheduled', 'pending', 50,
                    jsonb_build_object('dispensary_id', $1, 'category', $2, 'source', 'queue-intelligence')
                    FROM stores s
                    JOIN dispensaries d ON (d.menu_url = s.dutchie_url OR d.name ILIKE '%' || s.name || '%')
                    WHERE d.id = $1
                    ORDER BY CASE WHEN d.menu_url = s.dutchie_url THEN 0 ELSE 1 END, s.id
                    LIMIT 1`, [dispensary.id, cat]);
                // INSERT ... SELECT inserts nothing when no store matches; do not
                // report or count that as a queued job.
                if (!insertResult.rowCount) {
                    console.log(` ⏭️ No matching store for [${dispensary.id}] ${dispensary.name} - nothing queued`);
                    continue;
                }
                console.log(` ✓ Queued ${cat} production: [${dispensary.id}] ${dispensary.name}`);
                totalQueued++;
            }
            catch (error) {
                console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
            }
        }
    }
    return totalQueued;
}
/**
 * Queues sandbox crawls for each requested category.
 *
 * For every category, up to `flags.limit` dispensaries are selected that are
 * in sandbox mode for that category, have a provider and a website or menu
 * URL, and have no pending or running sandbox job for it. Each one gets an
 * upserted `crawler_sandboxes` row and a new `crawl` row in
 * `sandbox_crawl_jobs`.
 *
 * @param {string} [category] - Restrict to one category; all categories when
 *   omitted.
 * @returns {Promise<number>} Number of dispensaries listed (dry run) or
 *   queued without an error.
 * @throws {Error} When `category` is not one of CATEGORIES.
 */
async function queueCategorySandboxCrawls(category) {
    const categories = category ? [category] : CATEGORIES;
    // Category names become part of SQL column names below and identifiers
    // cannot be bound as query parameters, so reject anything that is not a
    // known category before touching the database.
    const unknownCategory = categories.find((cat) => !CATEGORIES.includes(cat));
    if (unknownCategory !== undefined) {
        throw new Error(`Unknown category "${unknownCategory}" (expected one of: ${CATEGORIES.join(', ')})`);
    }
    let totalQueued = 0;
    for (const cat of categories) {
        console.log(`\n🧪 Queueing Sandbox ${cat.toUpperCase()} Crawls...`);
        // Find dispensaries in sandbox mode for this category
        const query = `
            SELECT d.id, d.name, d.${cat}_provider as provider, d.${cat}_confidence as confidence,
            d.website, d.menu_url
            FROM dispensaries d
            WHERE d.${cat}_crawler_mode = 'sandbox'
            AND d.${cat}_provider IS NOT NULL
            AND (d.website IS NOT NULL OR d.menu_url IS NOT NULL)
            AND NOT EXISTS (
            SELECT 1 FROM sandbox_crawl_jobs sj
            WHERE sj.dispensary_id = d.id
            AND sj.category = $1
            AND sj.status IN ('pending', 'running')
            )
            ORDER BY d.${cat}_confidence DESC, d.updated_at ASC
            LIMIT $2
        `;
        const result = await migrate_1.pool.query(query, [cat, flags.limit]);
        if (flags.dryRun) {
            console.log(` Would queue ${result.rows.length} dispensaries for ${cat} sandbox crawl:`);
            for (const row of result.rows) {
                console.log(` - [${row.id}] ${row.name} (provider: ${row.provider}, confidence: ${row.confidence}%)`);
            }
            totalQueued += result.rows.length;
            continue;
        }
        for (const dispensary of result.rows) {
            try {
                // Create sandbox entry if needed; on conflict the existing row is
                // touched so that RETURNING still yields its id.
                const sandboxResult = await migrate_1.pool.query(`INSERT INTO crawler_sandboxes (dispensary_id, category, suspected_menu_provider, mode, status)
                    VALUES ($1, $2, $3, 'template_learning', 'pending')
                    ON CONFLICT (dispensary_id, category) WHERE status NOT IN ('moved_to_production', 'failed')
                    DO UPDATE SET updated_at = NOW()
                    RETURNING id`, [dispensary.id, cat, dispensary.provider]);
                const sandboxId = sandboxResult.rows[0]?.id;
                // Create sandbox job
                await migrate_1.pool.query(`INSERT INTO sandbox_crawl_jobs (dispensary_id, sandbox_id, category, job_type, status, priority)
                    VALUES ($1, $2, $3, 'crawl', 'pending', 5)`, [dispensary.id, sandboxId, cat]);
                console.log(` ✓ Queued ${cat} sandbox: [${dispensary.id}] ${dispensary.name} (${dispensary.provider})`);
                totalQueued++;
            }
            catch (error) {
                console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
            }
        }
    }
    return totalQueued;
}
/**
 * Marks the detection jobs of one dispensary as failed.
 *
 * @param {number} dispensaryId - Dispensary whose detection jobs are updated.
 * @param {string} fromStatus - Only jobs currently in this status are updated.
 * @param {string} message - Stored in `error_message`.
 * @returns {Promise<void>}
 */
async function failDetectionJobs(dispensaryId, fromStatus, message) {
    await migrate_1.pool.query(`UPDATE sandbox_crawl_jobs SET status = 'failed', error_message = $1
        WHERE dispensary_id = $2 AND job_type = 'detection' AND status = $3`, [message, dispensaryId, fromStatus]);
}

/**
 * Runs pending provider-detection jobs.
 *
 * Picks up to `flags.limit` distinct dispensaries with pending detection jobs
 * (optionally filtered by `flags.category` and `flags.dispensary`). For each
 * one it marks the jobs running, runs multi-category detection against the
 * website (or menu URL), stores the detected providers, and marks the jobs
 * completed with a per-category summary. Jobs that cannot run because the
 * dispensary is missing or has no URL, or whose detection throws, are marked
 * failed.
 *
 * @returns {Promise<void>}
 */
async function processDetectionJobs() {
    console.log('\n🔍 Processing Detection Jobs...');
    // $1 is always the LIMIT; optional filters take the next placeholder
    // numbers in the order they are added.
    const params = [flags.limit];
    const filters = [];
    if (flags.category) {
        params.push(flags.category);
        filters.push(`AND category = $${params.length}`);
    }
    if (flags.dispensary) {
        params.push(flags.dispensary);
        filters.push(`AND dispensary_id = $${params.length}`);
    }
    // Get pending detection jobs
    const jobs = await migrate_1.pool.query(`SELECT DISTINCT dispensary_id
        FROM sandbox_crawl_jobs
        WHERE job_type = 'detection' AND status = 'pending'
        ${filters.join('\n        ')}
        LIMIT $1`, params);
    for (const job of jobs.rows) {
        console.log(`\nProcessing detection for dispensary ${job.dispensary_id}...`);
        try {
            // Get dispensary info
            const dispResult = await migrate_1.pool.query('SELECT id, name, website, menu_url FROM dispensaries WHERE id = $1', [job.dispensary_id]);
            const dispensary = dispResult.rows[0];
            if (!dispensary) {
                console.log(` ✗ Dispensary not found`);
                // Leaving these jobs pending would re-select them on every run
                // and use up the LIMIT, so close them out as failed.
                await failDetectionJobs(job.dispensary_id, 'pending', 'Dispensary not found');
                continue;
            }
            const websiteUrl = dispensary.website || dispensary.menu_url;
            if (!websiteUrl) {
                console.log(` ✗ No website URL`);
                // Same reasoning: an empty URL can never be detected.
                await failDetectionJobs(job.dispensary_id, 'pending', 'No website URL');
                continue;
            }
            // Mark jobs as running
            await migrate_1.pool.query(`UPDATE sandbox_crawl_jobs SET status = 'running', started_at = NOW()
                WHERE dispensary_id = $1 AND job_type = 'detection' AND status = 'pending'`, [job.dispensary_id]);
            // Run multi-category detection
            console.log(` Detecting providers for ${dispensary.name}...`);
            const detection = await (0, intelligence_detector_1.detectMultiCategoryProviders)(websiteUrl, { timeout: 45000 });
            // Update all categories
            await (0, intelligence_detector_1.updateAllCategoryProviders)(job.dispensary_id, detection);
            // Mark jobs as completed
            const summarize = ({ provider, confidence }) => ({ provider, confidence });
            await migrate_1.pool.query(`UPDATE sandbox_crawl_jobs SET status = 'completed', completed_at = NOW(),
                result_summary = $1
                WHERE dispensary_id = $2 AND job_type = 'detection' AND status = 'running'`, [JSON.stringify({
                    product: summarize(detection.product),
                    specials: summarize(detection.specials),
                    brand: summarize(detection.brand),
                    metadata: summarize(detection.metadata),
                }), job.dispensary_id]);
            console.log(` ✓ Detection complete:`);
            console.log(` Product: ${detection.product.provider} (${detection.product.confidence}%) -> ${detection.product.mode}`);
            console.log(` Specials: ${detection.specials.provider} (${detection.specials.confidence}%) -> ${detection.specials.mode}`);
            console.log(` Brand: ${detection.brand.provider} (${detection.brand.confidence}%) -> ${detection.brand.mode}`);
            console.log(` Metadata: ${detection.metadata.provider} (${detection.metadata.confidence}%) -> ${detection.metadata.mode}`);
        }
        catch (error) {
            console.log(` ✗ Error: ${error.message}`);
            // Only jobs already moved to 'running' are failed here; if the error
            // happened earlier the jobs are still pending and will be retried.
            await failDetectionJobs(job.dispensary_id, 'running', error.message);
        }
    }
}
/**
 * Runs crawl jobs for the selected category (`flags.category`) or for every
 * category.
 *
 * Sandbox jobs are processed when `flags.sandbox` is set or `flags.production`
 * is not. Production crawls run only for the `product` category when
 * `flags.production` is set: up to `flags.limit` Dutchie dispensaries in
 * production mode with confidence >= 70 (optionally only `flags.dispensary`)
 * are crawled one after another.
 *
 * @returns {Promise<void>}
 */
async function processCrawlJobs() {
    const selectedCategories = flags.category ? [flags.category] : CATEGORIES;
    for (const cat of selectedCategories) {
        console.log(`\n⚙️ Processing ${cat.toUpperCase()} Crawl Jobs...\n`);
        // Sandbox jobs for this category
        const runSandbox = flags.sandbox || !flags.production;
        if (runSandbox) {
            const { processCategorySandboxJobs } = category_crawler_jobs_1;
            await processCategorySandboxJobs(cat, flags.limit);
        }
        // Production jobs exist only for the product category
        const runProduction = flags.production && cat === 'product';
        if (!runProduction) {
            continue;
        }
        // $1 is the LIMIT; $2 is the optional dispensary filter
        const queryParams = [flags.limit];
        let dispensaryFilter = '';
        if (flags.dispensary) {
            dispensaryFilter = 'AND d.id = $2';
            queryParams.push(flags.dispensary);
        }
        const { rows: readyDispensaries } = await migrate_1.pool.query(`SELECT d.id
FROM dispensaries d
WHERE d.product_provider = 'dutchie'
AND d.product_crawler_mode = 'production'
AND d.product_confidence >= 70
${dispensaryFilter}
LIMIT $1`, queryParams);
        for (const { id: dispensaryId } of readyDispensaries) {
            console.log(`Processing production ${cat} crawl for dispensary ${dispensaryId}...`);
            const { runCrawlProductsJob } = category_crawler_jobs_1;
            const outcome = await runCrawlProductsJob(dispensaryId);
            const mark = outcome.success ? '✓' : '✗';
            console.log(` ${mark} ${outcome.message}`);
        }
    }
}
/**
 * Runs the requested work directly for the dispensary given by
 * `flags.dispensary`; does nothing when that flag is not set.
 *
 * After printing the dispensary's name and website it runs, in this order and
 * depending on the flags: multi-category detection (only when the dispensary
 * has a website or menu URL), all production crawls, and all sandbox crawls.
 *
 * @returns {Promise<void>}
 */
async function processSpecificDispensary() {
    if (!flags.dispensary) {
        return;
    }
    console.log(`\n🎯 Processing Dispensary ${flags.dispensary}...\n`);
    const { rows: matches } = await migrate_1.pool.query('SELECT * FROM dispensaries WHERE id = $1', [flags.dispensary]);
    if (matches.length === 0) {
        console.log('Dispensary not found');
        return;
    }
    const [dispensary] = matches;
    console.log(`Name: ${dispensary.name}`);
    const siteUrl = dispensary.website || dispensary.menu_url;
    console.log(`Website: ${siteUrl || 'none'}`);
    console.log('');
    if (flags.detection) {
        console.log('Running multi-category detection...');
        // Detection is silently skipped when there is no URL to inspect.
        if (siteUrl) {
            const { detectMultiCategoryProviders, updateAllCategoryProviders } = intelligence_detector_1;
            const detection = await detectMultiCategoryProviders(siteUrl);
            await updateAllCategoryProviders(flags.dispensary, detection);
            console.log('Detection results:');
            const report = [
                ['Product', detection.product],
                ['Specials', detection.specials],
                ['Brand', detection.brand],
                ['Metadata', detection.metadata],
            ];
            for (const [label, found] of report) {
                console.log(` ${label}: ${found.provider} (${found.confidence}%) -> ${found.mode}`);
            }
        }
    }
    if (flags.production) {
        console.log('\nRunning production crawls...');
        const { runAllCategoryProductionCrawls } = category_crawler_jobs_1;
        const productionOutcome = await runAllCategoryProductionCrawls(flags.dispensary);
        console.log(` ${productionOutcome.summary}`);
    }
    if (flags.sandbox) {
        console.log('\nRunning sandbox crawls...');
        const { runAllCategorySandboxCrawls } = category_crawler_jobs_1;
        const sandboxOutcome = await runAllCategorySandboxCrawls(flags.dispensary);
        console.log(` ${sandboxOutcome.summary}`);
    }
}
/**
 * Prints provider, crawler-mode and confidence statistics for every category
 * in CATEGORIES, followed by sandbox job counts per category and status.
 *
 * Runs one aggregate query against `dispensaries` per category and one
 * grouped query against `sandbox_crawl_jobs`.
 *
 * @returns {Promise<void>}
 */
async function showStats() {
    console.log('\n📊 Multi-Category Intelligence Stats:');
    // Per-category stats
    for (const cat of CATEGORIES) {
        const categorySql = `
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE ${cat}_provider IS NULL) as no_provider,
COUNT(*) FILTER (WHERE ${cat}_provider = 'dutchie') as dutchie,
COUNT(*) FILTER (WHERE ${cat}_provider = 'treez') as treez,
COUNT(*) FILTER (WHERE ${cat}_provider NOT IN ('dutchie', 'treez', 'unknown') AND ${cat}_provider IS NOT NULL) as other,
COUNT(*) FILTER (WHERE ${cat}_provider = 'unknown') as unknown,
COUNT(*) FILTER (WHERE ${cat}_crawler_mode = 'production') as production,
COUNT(*) FILTER (WHERE ${cat}_crawler_mode = 'sandbox') as sandbox,
AVG(${cat}_confidence) as avg_confidence
FROM dispensaries
`;
        const { rows: [summary] } = await migrate_1.pool.query(categorySql);
        // A missing average is reported as 0%.
        const averageConfidence = Math.round(summary.avg_confidence || 0);
        const reportLines = [
            '',
            `${cat.toUpperCase()}:`,
            `Providers: Dutchie=${summary.dutchie}, Treez=${summary.treez}, Other=${summary.other}, Unknown=${summary.unknown}, None=${summary.no_provider}`,
            `Modes: Production=${summary.production}, Sandbox=${summary.sandbox}`,
            `Avg Confidence: ${averageConfidence}%`,
        ];
        console.log(reportLines.join('\n'));
    }
    // Job stats per category
    console.log('\n Sandbox Jobs by Category:');
    const jobSql = `
SELECT
category,
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'running') as running,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed
FROM sandbox_crawl_jobs
GROUP BY category
ORDER BY category
`;
    const { rows: jobRows } = await migrate_1.pool.query(jobSql);
    jobRows.forEach((row) => {
        console.log(` ${row.category}: pending=${row.pending}, running=${row.running}, completed=${row.completed}, failed=${row.failed}`);
    });
}
/**
 * Script entry point.
 *
 * Prints help and exits when --help is given. Otherwise prints the banner and
 * current stats, then either processes a single dispensary
 * (--process --dispensary), processes queued jobs (--process), or queues new
 * jobs, and finally prints the stats again unless this is a dry run.
 *
 * Errors are logged and reported through a non-zero exit code; the database
 * pool is closed on both the success and the error path.
 *
 * @returns {Promise<void>}
 */
async function main() {
    if (flags.help) {
        await showHelp();
        process.exit(0);
    }
    console.log('═══════════════════════════════════════════════════════');
    console.log(' Multi-Category Intelligence Queue Manager');
    console.log('═══════════════════════════════════════════════════════');
    if (flags.dryRun) {
        console.log('\n🔍 DRY RUN MODE - No changes will be made\n');
    }
    if (flags.category) {
        console.log(`\n📌 Filtering to category: ${flags.category}\n`);
    }
    try {
        // Show current stats first
        await showStats();
        // If specific dispensary specified, process it directly
        if (flags.dispensary && flags.process) {
            await processSpecificDispensary();
        }
        else if (flags.process) {
            // Process mode - run jobs
            if (flags.detection) {
                await processDetectionJobs();
            }
            await processCrawlJobs();
        }
        else {
            // Queuing mode
            let totalQueued = 0;
            if (flags.detection) {
                totalQueued += await queueMultiCategoryDetection();
            }
            if (flags.production) {
                totalQueued += await queueCategoryProductionCrawls(flags.category);
            }
            if (flags.sandbox) {
                totalQueued += await queueCategorySandboxCrawls(flags.category);
            }
            console.log('\n═══════════════════════════════════════════════════════');
            console.log(` Total queued: ${totalQueued}`);
            console.log('═══════════════════════════════════════════════════════\n');
        }
        // Show updated stats
        if (!flags.dryRun) {
            await showStats();
        }
    }
    catch (error) {
        console.error('Fatal error:', error);
        // Calling process.exit() here would end the process before the
        // `finally` block runs, so the pool would never be closed. Record the
        // failure and let the process exit once cleanup is done.
        process.exitCode = 1;
    }
    finally {
        try {
            await migrate_1.pool.end();
        }
        catch (closeError) {
            console.error('Failed to close database pool:', closeError);
            process.exitCode = 1;
        }
    }
}
// Start the script. main() reports most failures itself; this handler covers
// any rejection that still escapes it, so the promise is never left floating
// and the process ends with a non-zero exit code.
main().catch((error) => {
    console.error('Fatal error:', error);
    process.exitCode = 1;
});