diff --git a/backend/migrations/021_dispensary_crawl_schedule.sql b/backend/migrations/021_dispensary_crawl_schedule.sql new file mode 100644 index 00000000..1d39e204 --- /dev/null +++ b/backend/migrations/021_dispensary_crawl_schedule.sql @@ -0,0 +1,126 @@ +-- Migration 021: Dispensary Crawl Schedule +-- Creates a schedule table linked directly to dispensaries (not stores) + +-- Dispensary crawl schedule table +CREATE TABLE IF NOT EXISTS dispensary_crawl_schedule ( + id SERIAL PRIMARY KEY, + dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, + + -- Schedule settings + is_active BOOLEAN NOT NULL DEFAULT TRUE, + interval_minutes INTEGER NOT NULL DEFAULT 240, -- 4 hours default + priority INTEGER NOT NULL DEFAULT 0, -- Higher = scheduled first + + -- Timing + last_run_at TIMESTAMPTZ, + next_run_at TIMESTAMPTZ, + + -- Status tracking + last_status VARCHAR(50), -- 'success', 'error', 'sandbox_only', 'detection_only', 'running' + last_summary TEXT, + last_error TEXT, + last_duration_ms INTEGER, + + -- Run tracking + consecutive_failures INTEGER DEFAULT 0, + total_runs INTEGER DEFAULT 0, + successful_runs INTEGER DEFAULT 0, + + -- Metadata + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + + UNIQUE(dispensary_id) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_schedule_active ON dispensary_crawl_schedule(is_active); +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_schedule_next_run ON dispensary_crawl_schedule(next_run_at) WHERE is_active = TRUE; +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_schedule_status ON dispensary_crawl_schedule(last_status); +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_schedule_priority ON dispensary_crawl_schedule(priority DESC, next_run_at ASC); + +-- Dispensary crawl jobs table (separate from store crawl_jobs) +CREATE TABLE IF NOT EXISTS dispensary_crawl_jobs ( + id SERIAL PRIMARY KEY, + dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, + schedule_id INTEGER REFERENCES dispensary_crawl_schedule(id) ON DELETE SET NULL, + + -- Job info + job_type VARCHAR(50) NOT NULL DEFAULT 'orchestrator', -- 'orchestrator', 'detection', 'products', 'sandbox' + trigger_type VARCHAR(50) NOT NULL DEFAULT 'scheduled', -- 'scheduled', 'manual', 'bootstrap' + status VARCHAR(20) NOT NULL DEFAULT 'pending', -- 'pending', 'running', 'completed', 'failed', 'cancelled' + priority INTEGER DEFAULT 0, + + -- Timing + scheduled_at TIMESTAMPTZ DEFAULT NOW(), + started_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + duration_ms INTEGER, + + -- Results + detection_ran BOOLEAN DEFAULT FALSE, + crawl_ran BOOLEAN DEFAULT FALSE, + crawl_type VARCHAR(20), -- 'production', 'sandbox', 'none' + products_found INTEGER, + products_new INTEGER, + products_updated INTEGER, + + -- Detection results + detected_provider VARCHAR(50), + detected_confidence SMALLINT, + detected_mode VARCHAR(20), + + -- Error tracking + error_message TEXT, + + -- Worker tracking + worker_id VARCHAR(100), + run_id UUID, + + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_status ON dispensary_crawl_jobs(status); +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_dispensary ON dispensary_crawl_jobs(dispensary_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_pending ON dispensary_crawl_jobs(priority DESC, scheduled_at ASC) WHERE status = 'pending'; +CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_recent ON dispensary_crawl_jobs(created_at DESC); + +-- View for dispensary schedule status with latest job info +CREATE OR REPLACE VIEW dispensary_crawl_status AS +SELECT + d.id AS dispensary_id, + d.name AS dispensary_name, + d.city, + d.website, + d.menu_url, + d.product_provider, + d.product_confidence, + d.product_crawler_mode, + d.last_product_scan_at, + COALESCE(dcs.is_active, TRUE) AS schedule_active, + COALESCE(dcs.interval_minutes, 240) AS interval_minutes, + COALESCE(dcs.priority, 0) AS priority, + dcs.last_run_at, + dcs.next_run_at, + dcs.last_status, + dcs.last_summary, + dcs.last_error, + dcs.consecutive_failures, + dcs.total_runs, + dcs.successful_runs, + dcj.id AS latest_job_id, + dcj.job_type AS latest_job_type, + dcj.status AS latest_job_status, + dcj.started_at AS latest_job_started, + dcj.products_found AS latest_products_found +FROM dispensaries d +LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id +LEFT JOIN LATERAL ( + SELECT * FROM dispensary_crawl_jobs + WHERE dispensary_id = d.id + ORDER BY created_at DESC + LIMIT 1 +) dcj ON true +ORDER BY COALESCE(dcs.priority, 0) DESC, d.name; diff --git a/backend/migrations/022_scheduler_dispensary_link_fields.sql b/backend/migrations/022_scheduler_dispensary_link_fields.sql new file mode 100644 index 00000000..1ea8c16f --- /dev/null +++ b/backend/migrations/022_scheduler_dispensary_link_fields.sql @@ -0,0 +1,77 @@ +-- ===================================================== +-- Add dispensary slug/state to crawl_schedule_status view +-- ===================================================== +-- Enables proper linking from scheduler to dispensary detail page +-- which uses route /dispensaries/:state/:city/:slug + +DROP VIEW IF EXISTS crawl_schedule_status; +CREATE OR REPLACE VIEW crawl_schedule_status AS +SELECT + s.id AS store_id, + s.name AS store_name, + s.slug AS store_slug, + s.timezone, + s.active, + s.scrape_enabled, + s.last_scraped_at, + + -- Dispensary info (master record) + s.dispensary_id, + d.name AS dispensary_name, + d.company_name AS dispensary_company, + d.city AS dispensary_city, + d.state AS dispensary_state, + d.slug AS dispensary_slug, + d.address AS dispensary_address, + d.menu_url AS dispensary_menu_url, + + -- Provider intelligence from dispensary (if linked) + d.product_provider, + d.product_confidence, + d.product_crawler_mode, + + -- Schedule settings (use store override or global) + COALESCE(scs.enabled, TRUE) AS schedule_enabled, + COALESCE(scs.interval_hours, cs_global.interval_hours, 4) AS interval_hours, + COALESCE(scs.daily_special_enabled, TRUE) AS daily_special_enabled, + COALESCE(scs.daily_special_time, '00:01'::TIME) AS daily_special_time, + COALESCE(scs.priority, 0) AS priority, + + -- Orchestrator status + scs.last_status, + scs.last_summary, + scs.last_run_at AS schedule_last_run, + scs.last_error, + + -- Next scheduled run calculation + CASE + WHEN s.last_scraped_at IS NULL THEN NOW() + ELSE s.last_scraped_at + (COALESCE(scs.interval_hours, cs_global.interval_hours, 4) || ' hours')::INTERVAL + END AS next_scheduled_run, + + -- Latest job info + cj.id AS latest_job_id, + cj.status AS latest_job_status, + cj.job_type AS latest_job_type, + cj.trigger_type AS latest_job_trigger, + cj.started_at AS latest_job_started, + cj.completed_at AS latest_job_completed, + cj.products_found AS latest_products_found, + cj.products_new AS latest_products_new, + cj.products_updated AS latest_products_updated, + cj.error_message AS latest_job_error + +FROM stores s +LEFT JOIN dispensaries d ON d.id = s.dispensary_id +LEFT JOIN store_crawl_schedule scs ON scs.store_id = s.id +LEFT JOIN crawler_schedule cs_global ON cs_global.schedule_type = 'global_interval' +LEFT JOIN LATERAL ( + SELECT * FROM crawl_jobs cj2 + WHERE cj2.store_id = s.id + ORDER BY cj2.created_at DESC + LIMIT 1 +) cj ON TRUE +WHERE s.active = TRUE; + +-- Grant permissions +GRANT SELECT ON crawl_schedule_status TO dutchie; diff --git a/backend/migrations/023_dispensary_crawl_status_add_link_fields.sql b/backend/migrations/023_dispensary_crawl_status_add_link_fields.sql new file mode 100644 index 00000000..f2a763e5 --- /dev/null +++ b/backend/migrations/023_dispensary_crawl_status_add_link_fields.sql @@ -0,0 +1,46 @@ +-- Migration 023: Add state and slug to dispensary_crawl_status view +-- Enables proper linking from scheduler to dispensary detail page + +DROP VIEW IF EXISTS dispensary_crawl_status; + +CREATE OR REPLACE VIEW dispensary_crawl_status AS +SELECT + d.id AS dispensary_id, + d.name AS dispensary_name, + d.city, + d.state, + d.slug, + d.website, + d.menu_url, + d.product_provider, + d.product_confidence, + d.product_crawler_mode, + d.last_product_scan_at, + COALESCE(dcs.is_active, TRUE) AS schedule_active, + COALESCE(dcs.interval_minutes, 240) AS interval_minutes, + COALESCE(dcs.priority, 0) AS priority, + dcs.last_run_at, + dcs.next_run_at, + dcs.last_status, + dcs.last_summary, + dcs.last_error, + dcs.consecutive_failures, + dcs.total_runs, + dcs.successful_runs, + dcj.id AS latest_job_id, + dcj.job_type AS latest_job_type, + dcj.status AS latest_job_status, + dcj.started_at AS latest_job_started, + dcj.products_found AS latest_products_found +FROM dispensaries d +LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id +LEFT JOIN LATERAL ( + SELECT * FROM dispensary_crawl_jobs + WHERE dispensary_id = d.id + ORDER BY created_at DESC + LIMIT 1 +) dcj ON true +ORDER BY COALESCE(dcs.priority, 0) DESC, d.name; + +-- Grant permissions +GRANT SELECT ON dispensary_crawl_status TO dutchie; diff --git a/backend/src/routes/schedule.ts b/backend/src/routes/schedule.ts index 3742f000..44ef84b3 100644 --- a/backend/src/routes/schedule.ts +++ b/backend/src/routes/schedule.ts @@ -20,6 +20,13 @@ import { runBatchOrchestrator, getStoresDueForOrchestration, } from '../services/store-crawl-orchestrator'; +import { + runDispensaryOrchestrator, + runBatchDispensaryOrchestrator, + getDispensariesDueForOrchestration, + ensureAllDispensariesHaveSchedules, +} from '../services/dispensary-orchestrator'; +import { pool } from '../db/migrate'; const router = Router(); router.use(authMiddleware); @@ -341,4 +348,245 @@ router.get('/due', async (req: Request, res: Response) => { } }); +// ============================================ +// Dispensary Schedule Endpoints (NEW - dispensary-centric) +// ============================================ + +/** + * GET /api/schedule/dispensaries + * Get all dispensary schedule statuses (uses the view) + */ +router.get('/dispensaries', async (req: Request, res: Response) => { + try { + const result = await pool.query(` + SELECT * FROM dispensary_crawl_status + ORDER BY priority DESC, dispensary_name + `); + res.json({ dispensaries: result.rows }); + } catch (error: any) { + console.error('Error fetching dispensary schedules:', error); + res.status(500).json({ error: 'Failed to fetch dispensary schedules' }); + } +}); + +/** + * GET /api/schedule/dispensaries/:id + * Get schedule for a specific dispensary + */ +router.get('/dispensaries/:id', async (req: Request, res: Response) => { + try { + const dispensaryId = parseInt(req.params.id); + if (isNaN(dispensaryId)) { + return res.status(400).json({ error: 'Invalid dispensary ID' }); + } + + const result = await pool.query(` + SELECT * FROM dispensary_crawl_status + WHERE dispensary_id = $1 + `, [dispensaryId]); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Dispensary not found' }); + } + + res.json({ schedule: result.rows[0] }); + } catch (error: any) { + console.error('Error fetching dispensary schedule:', error); + res.status(500).json({ error: 'Failed to fetch dispensary schedule' }); + } +}); + +/** + * PUT /api/schedule/dispensaries/:id + * Update schedule for a specific dispensary + */ +router.put('/dispensaries/:id', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => { + try { + const dispensaryId = parseInt(req.params.id); + if (isNaN(dispensaryId)) { + return res.status(400).json({ error: 'Invalid dispensary ID' }); + } + + const { + is_active, + interval_minutes, + priority + } = req.body; + + // Upsert schedule + const result = await pool.query(` + INSERT INTO dispensary_crawl_schedule (dispensary_id, is_active, interval_minutes, priority) + VALUES ($1, COALESCE($2, TRUE), COALESCE($3, 240), COALESCE($4, 0)) + ON CONFLICT (dispensary_id) DO UPDATE SET + is_active = COALESCE($2, dispensary_crawl_schedule.is_active), + interval_minutes = COALESCE($3, dispensary_crawl_schedule.interval_minutes), + priority = COALESCE($4, dispensary_crawl_schedule.priority), + updated_at = NOW() + RETURNING * + `, [dispensaryId, is_active, interval_minutes, priority]); + + res.json({ schedule: result.rows[0] }); + } catch (error: any) { + console.error('Error updating dispensary schedule:', error); + res.status(500).json({ error: 'Failed to update dispensary schedule' }); + } +}); + +/** + * GET /api/schedule/dispensary-jobs + * Get recent dispensary crawl jobs + */ +router.get('/dispensary-jobs', async (req: Request, res: Response) => { + try { + const limit = parseInt(req.query.limit as string) || 50; + const result = await pool.query(` + SELECT dcj.*, d.name as dispensary_name + FROM dispensary_crawl_jobs dcj + JOIN dispensaries d ON d.id = dcj.dispensary_id + ORDER BY dcj.created_at DESC + LIMIT $1 + `, [Math.min(limit, 200)]); + res.json({ jobs: result.rows }); + } catch (error: any) { + console.error('Error fetching dispensary jobs:', error); + res.status(500).json({ error: 'Failed to fetch dispensary jobs' }); + } +}); + +/** + * GET /api/schedule/dispensary-jobs/:dispensaryId + * Get recent jobs for a specific dispensary + */ +router.get('/dispensary-jobs/:dispensaryId', async (req: Request, res: Response) => { + try { + const dispensaryId = parseInt(req.params.dispensaryId); + if (isNaN(dispensaryId)) { + return res.status(400).json({ error: 'Invalid dispensary ID' }); + } + + const limit = parseInt(req.query.limit as string) || 10; + const result = await pool.query(` + SELECT dcj.*, d.name as dispensary_name + FROM dispensary_crawl_jobs dcj + JOIN dispensaries d ON d.id = dcj.dispensary_id + WHERE dcj.dispensary_id = $1 + ORDER BY dcj.created_at DESC + LIMIT $2 + `, [dispensaryId, Math.min(limit, 100)]); + + res.json({ jobs: result.rows }); + } catch (error: any) { + console.error('Error fetching dispensary jobs:', error); + res.status(500).json({ error: 'Failed to fetch dispensary jobs' }); + } +}); + +/** + * POST /api/schedule/trigger/dispensary/:id + * Trigger orchestrator for a specific dispensary (Run Now button) + */ +router.post('/trigger/dispensary/:id', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => { + try { + const dispensaryId = parseInt(req.params.id); + if (isNaN(dispensaryId)) { + return res.status(400).json({ error: 'Invalid dispensary ID' }); + } + + // Run the dispensary orchestrator + const result = await runDispensaryOrchestrator(dispensaryId); + + res.json({ + result, + message: result.summary, + success: result.status === 'success' || result.status === 'sandbox_only' || result.status === 'detection_only', + }); + } catch (error: any) { + console.error('Error triggering dispensary orchestrator:', error); + res.status(500).json({ error: 'Failed to trigger orchestrator' }); + } +}); + +/** + * POST /api/schedule/trigger/dispensaries/batch + * Trigger orchestrator for multiple dispensaries + */ +router.post('/trigger/dispensaries/batch', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => { + try { + const { dispensary_ids, concurrency } = req.body; + + if (!Array.isArray(dispensary_ids) || dispensary_ids.length === 0) { + return res.status(400).json({ error: 'dispensary_ids must be a non-empty array' }); + } + + const results = await runBatchDispensaryOrchestrator( + dispensary_ids, + concurrency || 3 + ); + + const summary = { + total: results.length, + success: results.filter(r => r.status === 'success').length, + sandbox_only: results.filter(r => r.status === 'sandbox_only').length, + detection_only: results.filter(r => r.status === 'detection_only').length, + error: results.filter(r => r.status === 'error').length, + }; + + res.json({ results, summary }); + } catch (error: any) { + console.error('Error triggering batch orchestrator:', error); + res.status(500).json({ error: 'Failed to trigger batch orchestrator' }); + } +}); + +/** + * GET /api/schedule/dispensary-due + * Get dispensaries that are due for orchestration + */ +router.get('/dispensary-due', async (req: Request, res: Response) => { + try { + const limit = parseInt(req.query.limit as string) || 10; + const dispensaryIds = await getDispensariesDueForOrchestration(Math.min(limit, 50)); + + // Get details for the due dispensaries + if (dispensaryIds.length > 0) { + const details = await pool.query(` + SELECT d.id, d.name, d.product_provider, d.product_crawler_mode, + dcs.next_run_at, dcs.last_status, dcs.priority + FROM dispensaries d + LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id + WHERE d.id = ANY($1) + ORDER BY COALESCE(dcs.priority, 0) DESC, dcs.last_run_at ASC NULLS FIRST + `, [dispensaryIds]); + + res.json({ dispensaries_due: details.rows, count: dispensaryIds.length }); + } else { + res.json({ dispensaries_due: [], count: 0 }); + } + } catch (error: any) { + console.error('Error getting dispensaries due for orchestration:', error); + res.status(500).json({ error: 'Failed to get dispensaries due' }); + } +}); + +/** + * POST /api/schedule/dispensaries/bootstrap + * Ensure all dispensaries have schedule entries + */ +router.post('/dispensaries/bootstrap', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => { + try { + const { interval_minutes } = req.body; + + const result = await ensureAllDispensariesHaveSchedules(interval_minutes || 240); + + res.json({ + message: `Created ${result.created} new schedules, ${result.existing} already existed`, + created: result.created, + existing: result.existing, + }); + } catch (error: any) { + console.error('Error bootstrapping dispensary schedules:', error); + res.status(500).json({ error: 'Failed to bootstrap schedules' }); + } +}); + export default router; diff --git a/backend/src/scripts/bootstrap-discovery.ts b/backend/src/scripts/bootstrap-discovery.ts new file mode 100644 index 00000000..2aa2a00c --- /dev/null +++ b/backend/src/scripts/bootstrap-discovery.ts @@ -0,0 +1,388 @@ +#!/usr/bin/env npx tsx +/** + * Bootstrap Discovery Script + * + * One-time (but reusable) bootstrap command that: + * 1. Ensures every Dispensary has a dispensary_crawl_schedule entry (4h default) + * 2. Optionally runs RunDispensaryOrchestrator for each dispensary + * + * Usage: + * npx tsx src/scripts/bootstrap-discovery.ts # Create schedules only + * npx tsx src/scripts/bootstrap-discovery.ts --run # Create schedules + run orchestrator + * npx tsx src/scripts/bootstrap-discovery.ts --run --limit=10 # Run for first 10 dispensaries + * npx tsx src/scripts/bootstrap-discovery.ts --dry-run # Preview what would happen + * npx tsx src/scripts/bootstrap-discovery.ts --status # Show current status only + */ + +import { pool } from '../db/migrate'; +import { + ensureAllDispensariesHaveSchedules, + runDispensaryOrchestrator, + runBatchDispensaryOrchestrator, + getDispensariesDueForOrchestration, +} from '../services/dispensary-orchestrator'; + +// Parse command line args +const args = process.argv.slice(2); +const flags = { + run: args.includes('--run'), + dryRun: args.includes('--dry-run'), + status: args.includes('--status'), + help: args.includes('--help') || args.includes('-h'), + limit: parseInt(args.find(a => a.startsWith('--limit='))?.split('=')[1] || '0'), + concurrency: parseInt(args.find(a => a.startsWith('--concurrency='))?.split('=')[1] || '3'), + interval: parseInt(args.find(a => a.startsWith('--interval='))?.split('=')[1] || '240'), + detectionOnly: args.includes('--detection-only'), + productionOnly: args.includes('--production-only'), + sandboxOnly: args.includes('--sandbox-only'), +}; + +async function showHelp() { + console.log(` +Bootstrap Discovery - Initialize Dispensary Crawl System + +USAGE: + npx tsx src/scripts/bootstrap-discovery.ts [OPTIONS] + +OPTIONS: + --run After creating schedules, run the orchestrator for each dispensary + --dry-run Show what would happen without making changes + --status Show current status and exit + --limit=N Limit how many dispensaries to process (0 = all, default: 0) + --concurrency=N How many dispensaries to process in parallel (default: 3) + --interval=M Default interval in minutes for new schedules (default: 240 = 4 hours) + --detection-only Only run detection, don't crawl + --production-only Only run dispensaries in production mode + --sandbox-only Only run dispensaries in sandbox mode + --help, -h Show this help message + +EXAMPLES: + # Create schedule entries for all dispensaries (no crawling) + npx tsx src/scripts/bootstrap-discovery.ts + + # Create schedules and run orchestrator for all dispensaries + npx tsx src/scripts/bootstrap-discovery.ts --run + + # Run orchestrator for first 10 dispensaries + npx tsx src/scripts/bootstrap-discovery.ts --run --limit=10 + + # Run with higher concurrency + npx tsx src/scripts/bootstrap-discovery.ts --run --concurrency=5 + + # Show current status + npx tsx src/scripts/bootstrap-discovery.ts --status + +WHAT IT DOES: + 1. Creates dispensary_crawl_schedule entries for all dispensaries that don't have one + 2. If --run: For each dispensary, runs the orchestrator which: + a. Checks if provider detection is needed (null/unknown/stale/low confidence) + b. Runs detection if needed + c. If Dutchie + production mode: runs production crawl + d. Otherwise: runs sandbox crawl + 3. Updates schedule status and job records +`); +} + +async function showStatus() { + console.log('\nšŸ“Š Current Dispensary Crawl Status\n'); + console.log('═'.repeat(70)); + + // Get dispensary counts by provider + const providerStats = await pool.query(` + SELECT + COALESCE(product_provider, 'undetected') as provider, + COUNT(*) as count, + COUNT(*) FILTER (WHERE product_crawler_mode = 'production') as production, + COUNT(*) FILTER (WHERE product_crawler_mode = 'sandbox') as sandbox, + COUNT(*) FILTER (WHERE product_crawler_mode IS NULL) as no_mode + FROM dispensaries + GROUP BY COALESCE(product_provider, 'undetected') + ORDER BY count DESC + `); + + console.log('\nProvider Distribution:'); + console.log('-'.repeat(60)); + console.log( + 'Provider'.padEnd(20) + + 'Total'.padStart(8) + + 'Production'.padStart(12) + + 'Sandbox'.padStart(10) + + 'No Mode'.padStart(10) + ); + console.log('-'.repeat(60)); + + for (const row of providerStats.rows) { + console.log( + row.provider.padEnd(20) + + row.count.toString().padStart(8) + + row.production.toString().padStart(12) + + row.sandbox.toString().padStart(10) + + row.no_mode.toString().padStart(10) + ); + } + + // Get schedule stats + const scheduleStats = await pool.query(` + SELECT + COUNT(DISTINCT d.id) as total_dispensaries, + COUNT(DISTINCT dcs.id) as with_schedule, + COUNT(DISTINCT d.id) - COUNT(DISTINCT dcs.id) as without_schedule, + COUNT(*) FILTER (WHERE dcs.is_active = TRUE) as active_schedules, + COUNT(*) FILTER (WHERE dcs.last_status = 'success') as last_success, + COUNT(*) FILTER (WHERE dcs.last_status = 'error') as last_error, + COUNT(*) FILTER (WHERE dcs.last_status = 'sandbox_only') as last_sandbox, + COUNT(*) FILTER (WHERE dcs.last_status = 'detection_only') as last_detection, + COUNT(*) FILTER (WHERE dcs.next_run_at <= NOW()) as due_now, + AVG(dcs.interval_minutes)::INTEGER as avg_interval + FROM dispensaries d + LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id + `); + + const s = scheduleStats.rows[0]; + console.log('\n\nSchedule Status:'); + console.log('-'.repeat(60)); + console.log(` Total Dispensaries: ${s.total_dispensaries}`); + console.log(` With Schedule: ${s.with_schedule}`); + console.log(` Without Schedule: ${s.without_schedule}`); + console.log(` Active Schedules: ${s.active_schedules || 0}`); + console.log(` Average Interval: ${s.avg_interval || 240} minutes`); + + console.log('\n Last Run Status:'); + console.log(` - Success: ${s.last_success || 0}`); + console.log(` - Error: ${s.last_error || 0}`); + console.log(` - Sandbox Only: ${s.last_sandbox || 0}`); + console.log(` - Detection Only: ${s.last_detection || 0}`); + console.log(` - Due Now: ${s.due_now || 0}`); + + // Get recent job stats + const jobStats = await pool.query(` + SELECT + COUNT(*) as total, + COUNT(*) FILTER (WHERE status = 'completed') as completed, + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) FILTER (WHERE status = 'running') as running, + COUNT(*) FILTER (WHERE status = 'pending') as pending, + COUNT(*) FILTER (WHERE detection_ran = TRUE) as with_detection, + COUNT(*) FILTER (WHERE crawl_ran = TRUE) as with_crawl, + COUNT(*) FILTER (WHERE crawl_type = 'production') as production_crawls, + COUNT(*) FILTER (WHERE crawl_type = 'sandbox') as sandbox_crawls, + SUM(products_found) as total_products_found + FROM dispensary_crawl_jobs + WHERE created_at > NOW() - INTERVAL '24 hours' + `); + + const j = jobStats.rows[0]; + console.log('\n\nJobs (Last 24 Hours):'); + console.log('-'.repeat(60)); + console.log(` Total Jobs: ${j.total || 0}`); + console.log(` Completed: ${j.completed || 0}`); + console.log(` Failed: ${j.failed || 0}`); + console.log(` Running: ${j.running || 0}`); + console.log(` Pending: ${j.pending || 0}`); + console.log(` With Detection: ${j.with_detection || 0}`); + console.log(` With Crawl: ${j.with_crawl || 0}`); + console.log(` - Production: ${j.production_crawls || 0}`); + console.log(` - Sandbox: ${j.sandbox_crawls || 0}`); + console.log(` Products Found: ${j.total_products_found || 0}`); + + console.log('\n' + '═'.repeat(70) + '\n'); +} + +async function createSchedules(): Promise<{ created: number; existing: number }> { + console.log('\nšŸ“… Creating Dispensary Schedules...\n'); + + if (flags.dryRun) { + // Count how many would be created + const result = await pool.query(` + SELECT COUNT(*) as count + FROM dispensaries d + WHERE NOT EXISTS ( + SELECT 1 FROM dispensary_crawl_schedule dcs WHERE dcs.dispensary_id = d.id + ) + `); + + const wouldCreate = parseInt(result.rows[0].count); + console.log(` Would create ${wouldCreate} new schedule entries (${flags.interval} minute interval)`); + + return { created: wouldCreate, existing: 0 }; + } + + const result = await ensureAllDispensariesHaveSchedules(flags.interval); + + console.log(` āœ“ Created ${result.created} new schedule entries`); + console.log(` āœ“ ${result.existing} dispensaries already had schedules`); + + return result; +} + +async function getDispensariesToProcess(): Promise { + // Build query based on filters + let whereClause = 'TRUE'; + + if (flags.productionOnly) { + whereClause += ` AND d.product_crawler_mode = 'production'`; + } else if (flags.sandboxOnly) { + whereClause += ` AND d.product_crawler_mode = 'sandbox'`; + } + + if (flags.detectionOnly) { + whereClause += ` AND (d.product_provider IS NULL OR d.product_provider = 'unknown' OR d.product_confidence < 50)`; + } + + const limitClause = flags.limit > 0 ? `LIMIT ${flags.limit}` : ''; + + const query = ` + SELECT d.id, d.name, d.product_provider, d.product_crawler_mode + FROM dispensaries d + LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id + WHERE ${whereClause} + ORDER BY + COALESCE(dcs.priority, 0) DESC, + dcs.last_run_at ASC NULLS FIRST, + d.id ASC + ${limitClause} + `; + + const result = await pool.query(query); + return result.rows.map(row => row.id); +} + +async function runOrchestrator() { + console.log('\nšŸš€ Running Dispensary Orchestrator...\n'); + + const dispensaryIds = await getDispensariesToProcess(); + + if (dispensaryIds.length === 0) { + console.log(' No dispensaries to process.'); + return; + } + + console.log(` Found ${dispensaryIds.length} dispensaries to process`); + console.log(` Concurrency: ${flags.concurrency}`); + + if (flags.dryRun) { + console.log('\n Would process these dispensaries:'); + + const details = await pool.query( + `SELECT id, name, product_provider, product_crawler_mode + FROM dispensaries WHERE id = ANY($1) ORDER BY id`, + [dispensaryIds] + ); + + for (const row of details.rows.slice(0, 20)) { + console.log(` - [${row.id}] ${row.name} (${row.product_provider || 'undetected'}, ${row.product_crawler_mode || 'no mode'})`); + } + + if (details.rows.length > 20) { + console.log(` ... and ${details.rows.length - 20} more`); + } + + return; + } + + console.log('\n Starting batch processing...\n'); + + const results = await runBatchDispensaryOrchestrator(dispensaryIds, flags.concurrency); + + // Summarize results + const summary = { + total: results.length, + success: results.filter(r => r.status === 'success').length, + sandboxOnly: results.filter(r => r.status === 'sandbox_only').length, + detectionOnly: results.filter(r => r.status === 'detection_only').length, + error: results.filter(r => r.status === 'error').length, + detectionsRan: results.filter(r => r.detectionRan).length, + crawlsRan: results.filter(r => r.crawlRan).length, + productionCrawls: results.filter(r => r.crawlType === 'production').length, + sandboxCrawls: results.filter(r => r.crawlType === 'sandbox').length, + totalProducts: results.reduce((sum, r) => sum + (r.productsFound || 0), 0), + totalDuration: results.reduce((sum, r) => sum + r.durationMs, 0), + }; + + console.log('\n' + '═'.repeat(70)); + console.log(' Orchestrator Results'); + console.log('═'.repeat(70)); + console.log(` + Total Processed: ${summary.total} + + Status: + - Success: ${summary.success} + - Sandbox Only: ${summary.sandboxOnly} + - Detection Only: ${summary.detectionOnly} + - Error: ${summary.error} + + Operations: + - Detections Ran: ${summary.detectionsRan} + - Crawls Ran: ${summary.crawlsRan} + - Production: ${summary.productionCrawls} + - Sandbox: ${summary.sandboxCrawls} + + Results: + - Products Found: ${summary.totalProducts} + - Total Duration: ${(summary.totalDuration / 1000).toFixed(1)}s + - Avg per Dispensary: ${(summary.totalDuration / summary.total / 1000).toFixed(1)}s +`); + console.log('═'.repeat(70) + '\n'); + + // Show errors if any + const errors = results.filter(r => r.status === 'error'); + if (errors.length > 0) { + console.log('\nāš ļø Errors encountered:'); + for (const err of errors.slice(0, 10)) { + console.log(` - [${err.dispensaryId}] ${err.dispensaryName}: ${err.error}`); + } + if (errors.length > 10) { + console.log(` ... and ${errors.length - 10} more errors`); + } + } +} + +async function main() { + if (flags.help) { + await showHelp(); + process.exit(0); + } + + console.log('\n' + '═'.repeat(70)); + console.log(' Dispensary Crawl Bootstrap Discovery'); + console.log('═'.repeat(70)); + + if (flags.dryRun) { + console.log('\nšŸ” DRY RUN MODE - No changes will be made'); + } + + try { + // Always show status first + await showStatus(); + + if (flags.status) { + // Status-only mode, we're done + await pool.end(); + process.exit(0); + } + + // Step 1: Create schedule entries + await createSchedules(); + + // Step 2: Optionally run orchestrator + if (flags.run) { + await runOrchestrator(); + } else { + console.log('\nšŸ’” Tip: Use --run to also run the orchestrator for each dispensary'); + } + + // Show final status + if (!flags.dryRun) { + await showStatus(); + } + + } catch (error: any) { + console.error('\nāŒ Fatal error:', error.message); + console.error(error.stack); + process.exit(1); + } finally { + await pool.end(); + } +} + +main(); diff --git a/backend/src/services/dispensary-orchestrator.ts b/backend/src/services/dispensary-orchestrator.ts new file mode 100644 index 00000000..1acc124a --- /dev/null +++ b/backend/src/services/dispensary-orchestrator.ts @@ -0,0 +1,506 @@ +/** + * Dispensary Crawl Orchestrator + * + * Orchestrates the complete crawl workflow for a dispensary: + * 1. Load dispensary data + * 2. Check if provider detection is needed + * 3. Run provider detection if needed + * 4. Queue appropriate crawl jobs based on provider/mode + * 5. Update dispensary_crawl_schedule with meaningful status + * + * This works DIRECTLY with dispensaries (not through stores table). + */ + +import { v4 as uuidv4 } from 'uuid'; +import { pool } from '../db/migrate'; +import { crawlerLogger } from './crawler-logger'; +import { + detectMultiCategoryProviders, + updateAllCategoryProviders, + MultiCategoryDetectionResult, +} from './intelligence-detector'; +import { runCrawlProductsJob, runSandboxProductsJob } from './category-crawler-jobs'; + +// ======================================== +// Types +// ======================================== + +export type OrchestratorStatus = 'success' | 'error' | 'sandbox_only' | 'detection_only' | 'pending' | 'running'; + +export interface DispensaryOrchestratorResult { + status: OrchestratorStatus; + summary: string; + runId: string; + dispensaryId: number; + dispensaryName: string; + detectionRan: boolean; + detectionResult?: MultiCategoryDetectionResult; + crawlRan: boolean; + crawlType?: 'production' | 'sandbox' | 'none'; + productsFound?: number; + productsNew?: number; + productsUpdated?: number; + error?: string; + durationMs: number; +} + +interface DispensaryInfo { + id: number; + name: string; + city: string; + website: string | null; + menu_url: string | null; + product_provider: string | null; + product_confidence: number | null; + product_crawler_mode: string | null; + last_product_scan_at: Date | null; +} + +// ======================================== +// Main Orchestrator Function +// ======================================== + +/** + * Run the complete crawl orchestration for a dispensary + * + * Behavior: + * 1. Load the dispensary info + * 2. If product_provider is missing or stale (>7 days), run detection + * 3. After detection: + * - If product_provider = 'dutchie' and product_crawler_mode = 'production': Run production crawl + * - Otherwise: Run sandbox crawl + * 4. Update dispensary_crawl_schedule with status/summary + */ +export async function runDispensaryOrchestrator( + dispensaryId: number, + scheduleId?: number +): Promise { + const startTime = Date.now(); + const runId = uuidv4(); + + let result: DispensaryOrchestratorResult = { + status: 'pending', + summary: '', + runId, + dispensaryId, + dispensaryName: '', + detectionRan: false, + crawlRan: false, + durationMs: 0, + }; + + try { + // Mark schedule as running + await updateScheduleStatus(dispensaryId, 'running', 'Starting orchestrator...', null, runId); + + // 1. Load dispensary info + const dispensary = await getDispensaryInfo(dispensaryId); + if (!dispensary) { + throw new Error(`Dispensary ${dispensaryId} not found`); + } + + result.dispensaryName = dispensary.name; + + // 2. Check if provider detection is needed + const needsDetection = await checkNeedsDetection(dispensary); + + if (needsDetection) { + // Run provider detection + const websiteUrl = dispensary.menu_url || dispensary.website; + if (!websiteUrl) { + result.status = 'error'; + result.summary = 'No website URL available for detection'; + result.error = 'Dispensary has no menu_url or website configured'; + await updateScheduleStatus(dispensaryId, 'error', result.summary, result.error, runId); + result.durationMs = Date.now() - startTime; + await createJobRecord(dispensaryId, scheduleId, result); + return result; + } + + await updateScheduleStatus(dispensaryId, 'running', 'Running provider detection...', null, runId); + + const detectionResult = await detectMultiCategoryProviders(websiteUrl); + result.detectionRan = true; + result.detectionResult = detectionResult; + + // Save detection results to dispensary + await updateAllCategoryProviders(dispensaryId, detectionResult); + + crawlerLogger.providerDetected({ + dispensary_id: dispensaryId, + dispensary_name: dispensary.name, + detected_provider: detectionResult.product.provider, + confidence: detectionResult.product.confidence, + detection_method: 'dispensary_orchestrator', + menu_url: websiteUrl, + category: 'product', + }); + + // Refresh dispensary info after detection + const updatedDispensary = await getDispensaryInfo(dispensaryId); + if (updatedDispensary) { + Object.assign(dispensary, updatedDispensary); + } + } + + // 3. Determine crawl type and run + const provider = dispensary.product_provider; + const mode = dispensary.product_crawler_mode; + + if (provider === 'dutchie' && mode === 'production') { + // Production Dutchie crawl + await updateScheduleStatus(dispensaryId, 'running', 'Running Dutchie production crawl...', null, runId); + + try { + // Run the category-specific crawl job + const crawlResult = await runCrawlProductsJob(dispensaryId); + + result.crawlRan = true; + result.crawlType = 'production'; + + if (crawlResult.success) { + result.productsFound = crawlResult.data?.productsFound || 0; + + const detectionPart = result.detectionRan ? 'Detection + ' : ''; + result.summary = `${detectionPart}Dutchie products crawl completed`; + result.status = 'success'; + + crawlerLogger.jobCompleted({ + job_id: 0, + store_id: 0, + store_name: dispensary.name, + duration_ms: Date.now() - startTime, + products_found: result.productsFound || 0, + products_new: 0, + products_updated: 0, + provider: 'dutchie', + }); + } else { + result.status = 'error'; + result.error = crawlResult.message; + result.summary = `Dutchie crawl failed: ${crawlResult.message.slice(0, 100)}`; + } + + } catch (crawlError: any) { + result.status = 'error'; + result.error = crawlError.message; + result.summary = `Dutchie crawl failed: ${crawlError.message.slice(0, 100)}`; + result.crawlRan = true; + result.crawlType = 'production'; + + crawlerLogger.jobFailed({ + job_id: 0, + store_id: 0, + store_name: dispensary.name, + duration_ms: Date.now() - startTime, + error_message: crawlError.message, + provider: 'dutchie', + }); + } + + } else if (provider && provider !== 'unknown') { + // Sandbox crawl for non-Dutchie or sandbox mode + await updateScheduleStatus(dispensaryId, 'running', `Running ${provider} sandbox crawl...`, null, runId); + + try { + const sandboxResult = await runSandboxProductsJob(dispensaryId); + + result.crawlRan = true; + result.crawlType = 'sandbox'; + result.productsFound = sandboxResult.data?.productsExtracted || 0; + + const detectionPart = result.detectionRan ? 'Detection + ' : ''; + if (sandboxResult.success) { + result.summary = `${detectionPart}${provider} sandbox crawl (${result.productsFound} items, quality ${sandboxResult.data?.qualityScore || 0}%)`; + result.status = 'sandbox_only'; + } else { + result.summary = `${detectionPart}${provider} sandbox failed: ${sandboxResult.message}`; + result.status = 'error'; + result.error = sandboxResult.message; + } + + } catch (sandboxError: any) { + result.status = 'error'; + result.error = sandboxError.message; + result.summary = `Sandbox crawl failed: ${sandboxError.message.slice(0, 100)}`; + result.crawlRan = true; + result.crawlType = 'sandbox'; + } + + } else { + // No provider detected - detection only + if (result.detectionRan) { + result.summary = `Detection complete: provider=${dispensary.product_provider || 'unknown'}, confidence=${dispensary.product_confidence || 0}%`; + result.status = 'detection_only'; + } else { + result.summary = 'No provider detected and no crawl possible'; + result.status = 'error'; + result.error = 'Could not determine menu provider'; + } + } + + } catch (error: any) { + result.status = 'error'; + result.error = error.message; + result.summary = `Orchestrator error: ${error.message.slice(0, 100)}`; + + crawlerLogger.queueFailure({ + queue_type: 'dispensary_orchestrator', + error_message: error.message, + }); + } + + result.durationMs = Date.now() - startTime; + + // Update final schedule status + await updateScheduleStatus(dispensaryId, result.status, result.summary, result.error || null, runId); + + // Create job record + await createJobRecord(dispensaryId, scheduleId, result); + + return result; +} + +// ======================================== +// Helper Functions +// ======================================== + +async function getDispensaryInfo(dispensaryId: number): Promise { + const result = await pool.query( + `SELECT id, name, city, website, menu_url, + product_provider, product_confidence, product_crawler_mode, last_product_scan_at + FROM dispensaries + WHERE id = $1`, + [dispensaryId] + ); + return result.rows[0] || null; +} + +async function checkNeedsDetection(dispensary: DispensaryInfo): Promise { + // No provider = definitely needs detection + if (!dispensary.product_provider) return true; + + // Unknown provider = needs detection + if (dispensary.product_provider === 'unknown') return true; + + // Low confidence = needs re-detection + if (dispensary.product_confidence !== null && dispensary.product_confidence < 50) return true; + + // Stale detection (> 7 days) = needs refresh + if (dispensary.last_product_scan_at) { + const daysSince = (Date.now() - new Date(dispensary.last_product_scan_at).getTime()) / (1000 * 60 * 60 * 24); + if (daysSince > 7) return true; + } + + return false; +} + +async function updateScheduleStatus( + dispensaryId: number, + status: OrchestratorStatus, + summary: string, + error: string | null, + runId: string +): Promise { + await pool.query( + `INSERT INTO dispensary_crawl_schedule (dispensary_id, last_status, last_summary, last_error, last_run_at, updated_at) + VALUES ($1, $2, $3, $4, NOW(), NOW()) + ON CONFLICT (dispensary_id) DO UPDATE SET + last_status = $2, + last_summary = $3, + last_error = $4, + last_run_at = NOW(), + updated_at = NOW()`, + [dispensaryId, status, summary, error] + ); +} + +async function createJobRecord( + dispensaryId: number, + scheduleId: number | undefined, + result: DispensaryOrchestratorResult +): Promise { + await pool.query( + `INSERT INTO dispensary_crawl_jobs ( + dispensary_id, schedule_id, job_type, trigger_type, status, priority, + scheduled_at, started_at, completed_at, duration_ms, + detection_ran, crawl_ran, crawl_type, + products_found, products_new, products_updated, + detected_provider, detected_confidence, detected_mode, + error_message, run_id + ) VALUES ( + $1, $2, 'orchestrator', 'manual', $3, 100, + NOW(), NOW(), NOW(), $4, + $5, $6, $7, + $8, $9, $10, + $11, $12, $13, + $14, $15 + )`, + [ + dispensaryId, + scheduleId || null, + result.status === 'success' ? 'completed' : result.status === 'error' ? 'failed' : 'completed', + result.durationMs, + result.detectionRan, + result.crawlRan, + result.crawlType || null, + result.productsFound || null, + result.productsNew || null, + result.productsUpdated || null, + result.detectionResult?.product.provider || null, + result.detectionResult?.product.confidence || null, + result.detectionResult?.product.mode || null, + result.error || null, + result.runId, + ] + ); + + // Update schedule stats + if (result.status === 'success' || result.status === 'sandbox_only' || result.status === 'detection_only') { + await pool.query( + `UPDATE dispensary_crawl_schedule SET + total_runs = COALESCE(total_runs, 0) + 1, + successful_runs = COALESCE(successful_runs, 0) + 1, + consecutive_failures = 0, + next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL, + last_duration_ms = $2 + WHERE dispensary_id = $1`, + [dispensaryId, result.durationMs] + ); + } else if (result.status === 'error') { + await pool.query( + `UPDATE dispensary_crawl_schedule SET + total_runs = COALESCE(total_runs, 0) + 1, + consecutive_failures = COALESCE(consecutive_failures, 0) + 1, + next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL, + last_duration_ms = $2 + WHERE dispensary_id = $1`, + [dispensaryId, result.durationMs] + ); + } +} + +// ======================================== +// Batch Processing +// ======================================== + +/** + * Run orchestrator for multiple dispensaries + */ +export async function runBatchDispensaryOrchestrator( + dispensaryIds: number[], + concurrency: number = 3 +): Promise { + const results: DispensaryOrchestratorResult[] = []; + + // Process in batches + for (let i = 0; i < dispensaryIds.length; i += concurrency) { + const batch = dispensaryIds.slice(i, i + concurrency); + console.log(`Processing batch ${Math.floor(i / concurrency) + 1}: dispensaries ${batch.join(', ')}`); + + const batchResults = await Promise.all( + batch.map(id => runDispensaryOrchestrator(id)) + ); + results.push(...batchResults); + + // Small delay between batches to avoid overwhelming the system + if (i + concurrency < dispensaryIds.length) { + await new Promise(r => setTimeout(r, 1000)); + } + } + + return results; +} + +/** + * Get dispensaries that are due for orchestration + */ +export async function getDispensariesDueForOrchestration(limit: number = 10): Promise { + const result = await pool.query( + `SELECT d.id + FROM dispensaries d + LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id + WHERE COALESCE(dcs.is_active, TRUE) = TRUE + AND ( + dcs.next_run_at IS NULL + OR dcs.next_run_at <= NOW() + ) + AND (dcs.last_status IS NULL OR dcs.last_status NOT IN ('running', 'pending')) + ORDER BY COALESCE(dcs.priority, 0) DESC, dcs.last_run_at ASC NULLS FIRST + LIMIT $1`, + [limit] + ); + + return result.rows.map(row => row.id); +} + +/** + * Ensure all dispensaries have schedule entries + */ +export async function ensureAllDispensariesHaveSchedules( + intervalMinutes: number = 240 +): Promise<{ created: number; existing: number }> { + // Get all dispensary IDs that don't have a schedule + const result = await pool.query( + `INSERT INTO dispensary_crawl_schedule (dispensary_id, is_active, interval_minutes, priority) + SELECT d.id, TRUE, $1, 0 + FROM dispensaries d + WHERE NOT EXISTS ( + SELECT 1 FROM dispensary_crawl_schedule dcs WHERE dcs.dispensary_id = d.id + ) + RETURNING id`, + [intervalMinutes] + ); + + const existingCount = await pool.query('SELECT COUNT(*) FROM dispensary_crawl_schedule'); + + return { + created: result.rowCount || 0, + existing: parseInt(existingCount.rows[0].count) - (result.rowCount || 0), + }; +} + +// ======================================== +// Scheduler Integration +// ======================================== + +let dispensarySchedulerRunning = false; + +/** + * Process dispensaries using the intelligent orchestrator + * Called periodically by the scheduler + */ +export async function processDispensaryScheduler(): Promise { + if (dispensarySchedulerRunning) { + console.log('Dispensary scheduler already running, skipping...'); + return; + } + + dispensarySchedulerRunning = true; + + try { + // Get dispensaries due for orchestration + const dispensaryIds = await getDispensariesDueForOrchestration(3); + + if (dispensaryIds.length === 0) { + return; + } + + console.log(`Dispensary Scheduler: Processing ${dispensaryIds.length} dispensaries due for crawl`); + + // Process each dispensary through the orchestrator + for (const dispensaryId of dispensaryIds) { + try { + console.log(`Dispensary Scheduler: Starting crawl for dispensary ${dispensaryId}`); + const result = await runDispensaryOrchestrator(dispensaryId); + console.log(`Dispensary Scheduler: Dispensary ${dispensaryId} completed - ${result.summary}`); + } catch (error: any) { + console.error(`Dispensary Scheduler: Dispensary ${dispensaryId} failed - ${error.message}`); + } + } + + console.log(`Dispensary Scheduler: Finished processing ${dispensaryIds.length} dispensaries`); + } finally { + dispensarySchedulerRunning = false; + } +} diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index b43d5e08..729fdf1f 100755 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -82,8 +82,8 @@ export function Layout({ children }: LayoutProps) { useEffect(() => { const fetchVersion = async () => { try { - const response = await api.get('/version'); - setVersionInfo(response.data); + const data = await api.getVersion(); + setVersionInfo(data); } catch (error) { console.error('Failed to fetch version info:', error); } diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 4dfb2a41..275e0ff7 100755 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -451,6 +451,33 @@ class ApiClient { }); } + // Dispensary Schedule (new dispensary-centric API) + async getDispensarySchedules() { + return this.request<{ dispensaries: any[] }>('/api/schedule/dispensaries'); + } + + async getDispensarySchedule(dispensaryId: number) { + return this.request<{ schedule: any }>(`/api/schedule/dispensaries/${dispensaryId}`); + } + + async updateDispensarySchedule(dispensaryId: number, data: any) { + return this.request<{ schedule: any }>(`/api/schedule/dispensaries/${dispensaryId}`, { + method: 'PUT', + body: JSON.stringify(data), + }); + } + + async getDispensaryCrawlJobs(limit?: number) { + const params = limit ? `?limit=${limit}` : ''; + return this.request<{ jobs: any[] }>(`/api/schedule/dispensary-jobs${params}`); + } + + async triggerDispensaryCrawl(dispensaryId: number) { + return this.request<{ result: any; message: string; success: boolean }>(`/api/schedule/trigger/dispensary/${dispensaryId}`, { + method: 'POST', + }); + } + async getCrawlJobs(limit?: number) { const params = limit ? `?limit=${limit}` : ''; return this.request<{ jobs: any[] }>(`/api/schedule/jobs${params}`); @@ -484,6 +511,16 @@ class ApiClient { method: 'POST', }); } + + // Version + async getVersion() { + return this.request<{ + build_version: string; + git_sha: string; + build_time: string; + image_tag: string; + }>('/api/version'); + } } export const api = new ApiClient(API_URL); diff --git a/frontend/src/pages/Dispensaries.tsx b/frontend/src/pages/Dispensaries.tsx index c389c3d7..47be67ff 100644 --- a/frontend/src/pages/Dispensaries.tsx +++ b/frontend/src/pages/Dispensaries.tsx @@ -60,9 +60,11 @@ export function Dispensaries() { }; const filteredDispensaries = dispensaries.filter(disp => { + const searchLower = searchTerm.toLowerCase(); const matchesSearch = !searchTerm || - disp.name.toLowerCase().includes(searchTerm.toLowerCase()) || - (disp.company_name && disp.company_name.toLowerCase().includes(searchTerm.toLowerCase())); + disp.name.toLowerCase().includes(searchLower) || + (disp.company_name && disp.company_name.toLowerCase().includes(searchLower)) || + (disp.dba_name && disp.dba_name.toLowerCase().includes(searchLower)); const matchesCity = !filterCity || disp.city === filterCity; return matchesSearch && matchesCity; }); diff --git a/frontend/src/pages/ScraperSchedule.tsx b/frontend/src/pages/ScraperSchedule.tsx index 16f23a57..71fe88b6 100644 --- a/frontend/src/pages/ScraperSchedule.tsx +++ b/frontend/src/pages/ScraperSchedule.tsx @@ -12,50 +12,41 @@ interface GlobalSchedule { description?: string; } -interface StoreSchedule { - store_id: number; - store_name: string; - store_slug: string; - timezone: string; - active: boolean; - scrape_enabled: boolean; - last_scraped_at: string | null; - schedule_enabled: boolean; - interval_hours: number; - daily_special_enabled: boolean; - daily_special_time: string; - priority: number; - next_scheduled_run: string; - latest_job_id: number | null; - latest_job_status: string | null; - latest_job_type: string | null; - latest_job_trigger: string | null; - latest_job_started: string | null; - latest_job_completed: string | null; - latest_products_found: number | null; - latest_products_new: number | null; - latest_products_updated: number | null; - latest_job_error: string | null; - // Dispensary info (from master AZDHS directory) - dispensary_id: number | null; - dispensary_name: string | null; - dispensary_company: string | null; - dispensary_city: string | null; - // Provider intelligence (from dispensary) +// Dispensary-centric schedule data (from dispensary_crawl_status view) +interface DispensarySchedule { + dispensary_id: number; + dispensary_name: string; + city: string | null; + state: string | null; + slug: string | null; + website: string | null; + menu_url: string | null; product_provider: string | null; product_confidence: number | null; product_crawler_mode: string | null; - // Orchestrator status + last_product_scan_at: string | null; + schedule_active: boolean; + interval_minutes: number; + priority: number; + last_run_at: string | null; + next_run_at: string | null; last_status: string | null; last_summary: string | null; - schedule_last_run: string | null; last_error: string | null; + consecutive_failures: number | null; + total_runs: number | null; + successful_runs: number | null; + latest_job_id: number | null; + latest_job_type: string | null; + latest_job_status: string | null; + latest_job_started: string | null; + latest_products_found: number | null; } interface CrawlJob { id: number; - store_id: number; - store_name: string; + dispensary_id: number; + dispensary_name: string; job_type: string; trigger_type: string; status: string; @@ -71,12 +62,12 @@ interface CrawlJob { export function ScraperSchedule() { const [globalSchedules, setGlobalSchedules] = useState([]); - const [storeSchedules, setStoreSchedules] = useState([]); + const [dispensarySchedules, setDispensarySchedules] = useState([]); const [jobs, setJobs] = useState([]); const [loading, setLoading] = useState(true); const [autoRefresh, setAutoRefresh] = useState(true); - const [activeTab, setActiveTab] = useState<'stores' | 'jobs' | 'global'>('stores'); - const [triggeringStore, setTriggeringStore] = useState(null); + const [activeTab, setActiveTab] = useState<'dispensaries' | 'jobs' | 'global'>('dispensaries'); + const [triggeringDispensary, setTriggeringDispensary] = useState(null); useEffect(() => { loadData(); @@ -89,14 +80,14 @@ export function ScraperSchedule() { const loadData = async () => { try { - const [globalData, storesData, jobsData] = await Promise.all([ + const [globalData, dispensaryData, jobsData] = await Promise.all([ api.getGlobalSchedule(), - api.getStoreSchedules(), - api.getCrawlJobs(100) + api.getDispensarySchedules(), + api.getDispensaryCrawlJobs(100) ]); setGlobalSchedules(globalData.schedules || []); - setStoreSchedules(storesData.stores || []); + setDispensarySchedules(dispensaryData.dispensaries || []); setJobs(jobsData.jobs || []); } catch (error) { console.error('Failed to load schedule data:', error); @@ -105,15 +96,15 @@ export function ScraperSchedule() { } }; - const handleTriggerCrawl = async (storeId: number) => { - setTriggeringStore(storeId); + const handleTriggerCrawl = async (dispensaryId: number) => { + setTriggeringDispensary(dispensaryId); try { - await api.triggerStoreCrawl(storeId); + await api.triggerDispensaryCrawl(dispensaryId); await loadData(); } catch (error) { console.error('Failed to trigger crawl:', error); } finally { - setTriggeringStore(null); + setTriggeringDispensary(null); } }; @@ -239,20 +230,20 @@ export function ScraperSchedule() { {/* Tabs */}
)} - {activeTab === 'stores' && ( + {activeTab === 'dispensaries' && (
- Dispensary / Store + Dispensary Provider Schedule Last Run @@ -400,55 +391,43 @@ export function ScraperSchedule() { - {storeSchedules.map((store) => ( - + {dispensarySchedules.map((disp) => ( +
- {store.dispensary_id ? ( + {disp.state && disp.city && disp.slug ? ( - {store.dispensary_name || store.store_name} + {disp.dispensary_name} ) : ( - {store.store_name} - )} - {!store.dispensary_id && ( - - Unmapped - + {disp.dispensary_name} )}
- {store.dispensary_city ? `${store.dispensary_city} | ${store.timezone}` : store.timezone} + {disp.city ? `${disp.city}, ${disp.state}` : disp.state}
- {store.product_provider ? ( + {disp.product_provider ? (
- {store.product_provider} + {disp.product_provider} - {store.product_crawler_mode !== 'production' && ( + {disp.product_crawler_mode !== 'production' && (
sandbox
)}
@@ -472,31 +451,31 @@ export function ScraperSchedule() { borderRadius: '12px', fontSize: '12px', fontWeight: '600', - background: store.schedule_enabled && store.scrape_enabled ? '#d1fae5' : '#fee2e2', - color: store.schedule_enabled && store.scrape_enabled ? '#065f46' : '#991b1b' + background: disp.schedule_active ? '#d1fae5' : '#fee2e2', + color: disp.schedule_active ? '#065f46' : '#991b1b' }}> - {store.schedule_enabled && store.scrape_enabled ? 'Active' : 'Disabled'} + {disp.schedule_active ? 'Active' : 'Disabled'} - Every {store.interval_hours}h + Every {Math.round(disp.interval_minutes / 60)}h
-
{formatTimeAgo(store.last_scraped_at)}
- {store.last_scraped_at && ( +
{formatTimeAgo(disp.last_run_at)}
+ {disp.last_run_at && (
- {new Date(store.last_scraped_at).toLocaleString()} + {new Date(disp.last_run_at).toLocaleString()}
)}
- {formatTimeUntil(store.next_scheduled_run)} + {disp.next_run_at ? formatTimeUntil(disp.next_run_at) : 'Not scheduled'}
- {store.last_status || store.latest_job_status ? ( + {disp.last_status || disp.latest_job_status ? (
- {store.last_status || store.latest_job_status} + {disp.last_status || disp.latest_job_status} - {store.last_error && ( + {disp.last_error && ( )}
- {store.last_summary ? ( + {disp.last_summary ? (
- {store.last_summary} + {disp.last_summary}
- ) : store.latest_products_found !== null ? ( + ) : disp.latest_products_found !== null ? (
- {store.latest_products_found} products - {store.latest_products_new !== null && ` (+${store.latest_products_new} new)`} + {disp.latest_products_found} products
) : null}
@@ -542,19 +520,19 @@ export function ScraperSchedule() { @@ -606,7 +584,7 @@ export function ScraperSchedule() { - + @@ -627,7 +605,7 @@ export function ScraperSchedule() { jobs.map((job) => (
StoreDispensary Type Trigger Status
-
{job.store_name}
+
{job.dispensary_name}
Job #{job.id}