/**
 * Database-Driven Task Scheduler
 *
 * Per TASK_WORKFLOW_2024-12-10.md:
 * - Schedules stored in DB (survives restarts)
 * - Uses SELECT FOR UPDATE to prevent duplicate execution across replicas
 * - Polls every 60s to check if schedules are due
 * - Generates tasks into worker_tasks table for task-worker.ts to process
 *
 * 2024-12-10: Created to replace legacy node-cron scheduler
 */

import { pool } from '../db/pool';
import { taskService, TaskRole } from '../tasks/task-service';

// Per TASK_WORKFLOW_2024-12-10.md: Poll interval for checking schedules
const POLL_INTERVAL_MS = 60_000; // 60 seconds

/** Row shape of the task_schedules table as read by this scheduler. */
interface TaskSchedule {
  id: number;
  name: string;
  role: TaskRole;
  enabled: boolean;
  interval_hours: number;
  last_run_at: Date | null;
  next_run_at: Date | null;
  state_code: string | null;
  priority: number;
  method: 'curl' | 'http' | null;
  is_immutable: boolean;
  description: string | null;
  platform: string | null;
  last_task_count: number | null;
  last_error: string | null;
}

/**
 * Extract a printable message from a caught value.
 * `catch` bindings are `unknown`; a non-Error throwable has no `.message`.
 */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

class TaskScheduler {
  private pollTimer: NodeJS.Timeout | null = null;
  private isRunning = false;

  /**
   * Start the scheduler.
   * Per TASK_WORKFLOW_2024-12-10.md: Called on API server startup.
   *
   * Recovers stale tasks, ensures the core schedules exist, runs one check
   * immediately, then polls every POLL_INTERVAL_MS. Calling it while already
   * running is a no-op.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      console.log('[TaskScheduler] Already running');
      return;
    }

    console.log('[TaskScheduler] Starting database-driven scheduler...');
    this.isRunning = true;

    // Per TASK_WORKFLOW_2024-12-10.md: On startup, recover stale tasks
    try {
      const recovered = await taskService.recoverStaleTasks(10);
      if (recovered > 0) {
        console.log(`[TaskScheduler] Recovered ${recovered} stale tasks from dead workers`);
      }
    } catch (err: unknown) {
      console.error('[TaskScheduler] Failed to recover stale tasks:', errorMessage(err));
    }

    // Per TASK_WORKFLOW_2024-12-10.md: Ensure default schedules exist
    await this.ensureDefaultSchedules();

    // Per TASK_WORKFLOW_2024-12-10.md: Check immediately on startup
    await this.checkAndRunDueSchedules();

    // stop() may have been called while the awaits above were pending;
    // don't install a timer that nothing will ever clear.
    if (!this.isRunning) {
      console.log('[TaskScheduler] Stopped during startup - poll timer not started');
      return;
    }

    // Per TASK_WORKFLOW_2024-12-10.md: Then poll every 60 seconds.
    // checkAndRunDueSchedules never rejects, so the floating promise is safe.
    this.pollTimer = setInterval(() => {
      void this.checkAndRunDueSchedules();
    }, POLL_INTERVAL_MS);

    console.log('[TaskScheduler] Started - polling every 60s');
  }

  /**
   * Stop the scheduler: clears the poll timer and marks it as not running.
   * Does not wait for an in-flight check to finish.
   */
  stop(): void {
    if (this.pollTimer) {
      clearInterval(this.pollTimer);
      this.pollTimer = null;
    }
    this.isRunning = false;
    console.log('[TaskScheduler] Stopped');
  }

  /**
   * Ensure default schedules exist in the database.
   * Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist.
   *
   * Existing rows keep their interval/priority/enabled settings; only
   * `method` and `is_immutable` are overwritten on conflict.
   *
   * NOTE: Per-state product_discovery schedules are created by migration 089.
   * This only creates core immutable schedules that should exist regardless.
   */
  private async ensureDefaultSchedules(): Promise<void> {
    // Core schedules - all use HTTP transport for browser-based scraping
    const defaults = [
      {
        name: 'store_discovery_dutchie',
        role: 'store_discovery' as TaskRole,
        interval_hours: 168, // Weekly
        priority: 5,
        description: 'Discover new Dutchie stores weekly (HTTP transport)',
        method: 'http',
        is_immutable: true,
        platform: 'dutchie',
      },
      {
        name: 'analytics_refresh',
        role: 'analytics_refresh' as TaskRole,
        interval_hours: 6,
        priority: 0,
        description: 'Refresh analytics materialized views every 6 hours',
        method: 'http',
        is_immutable: true,
        platform: null,
      },
    ];

    for (const sched of defaults) {
      try {
        await pool.query(`
          INSERT INTO task_schedules (name, role, interval_hours, priority, description, method, is_immutable, platform, enabled, next_run_at)
          VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW())
          ON CONFLICT (name) DO UPDATE SET
            method = EXCLUDED.method,
            is_immutable = EXCLUDED.is_immutable
        `, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description, sched.method, sched.is_immutable, sched.platform]);
      } catch (err: unknown) {
        const message = errorMessage(err);
        // Table may not exist yet - will be created by migration
        if (!message.includes('does not exist')) {
          console.error(`[TaskScheduler] Failed to create default schedule ${sched.name}:`, message);
        }
      }
    }
  }

  /**
   * Check for and run any due schedules.
   * Per TASK_WORKFLOW_2024-12-10.md: Uses SELECT FOR UPDATE SKIP LOCKED to prevent duplicates.
   *
   * Never rejects: every failure is logged. This matters because it runs from
   * a setInterval callback, where a rejection would go unhandled.
   */
  private async checkAndRunDueSchedules(): Promise<void> {
    // Acquiring a connection can fail (DB down); handle it instead of rejecting.
    const client = await pool.connect().catch((err: unknown) => {
      console.error('[TaskScheduler] Failed to check schedules:', errorMessage(err));
      return null;
    });
    if (!client) return;

    try {
      await client.query('BEGIN');

      // Per TASK_WORKFLOW_2024-12-10.md: Atomic claim of due schedules
      const result = await client.query(`
        SELECT * FROM task_schedules
        WHERE enabled = true
          AND (next_run_at IS NULL OR next_run_at <= NOW())
        FOR UPDATE SKIP LOCKED
      `);

      for (const schedule of result.rows as TaskSchedule[]) {
        console.log(`[TaskScheduler] Running schedule: ${schedule.name} (${schedule.role})`);

        // A failed statement aborts the whole transaction. The savepoint keeps
        // one schedule's failure from undoing the next_run_at updates of the
        // others. Their tasks are created via `pool`/taskService, not this
        // client, and would not be rolled back with it.
        await client.query('SAVEPOINT schedule_run');
        try {
          const tasksCreated = await this.executeSchedule(schedule);
          console.log(`[TaskScheduler] Schedule ${schedule.name} created ${tasksCreated} tasks`);

          // Per TASK_WORKFLOW_2024-12-10.md: Update last_run_at and calculate next_run_at.
          // A successful run clears any error left by an earlier failure.
          await client.query(`
            UPDATE task_schedules
            SET
              last_run_at = NOW(),
              next_run_at = NOW() + ($1 || ' hours')::interval,
              last_task_count = $2,
              last_error = NULL,
              updated_at = NOW()
            WHERE id = $3
          `, [schedule.interval_hours, tasksCreated, schedule.id]);
        } catch (err: unknown) {
          const message = errorMessage(err);
          console.error(`[TaskScheduler] Schedule ${schedule.name} failed:`, message);

          // Return the transaction to a usable state before recording the failure
          await client.query('ROLLBACK TO SAVEPOINT schedule_run');
          try {
            // Still update next_run_at to prevent infinite retry loop
            await client.query(`
              UPDATE task_schedules
              SET
                next_run_at = NOW() + ($1 || ' hours')::interval,
                last_error = $2,
                updated_at = NOW()
              WHERE id = $3
            `, [schedule.interval_hours, message, schedule.id]);
          } catch (updateErr: unknown) {
            await client.query('ROLLBACK TO SAVEPOINT schedule_run');
            console.error(
              `[TaskScheduler] Failed to record failure for schedule ${schedule.name}:`,
              errorMessage(updateErr),
            );
          }
        }
        // ROLLBACK TO SAVEPOINT keeps the savepoint, so RELEASE is valid on every path
        await client.query('RELEASE SAVEPOINT schedule_run');
      }

      await client.query('COMMIT');
    } catch (err: unknown) {
      // ROLLBACK itself can fail on a broken connection; don't let that escape.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr: unknown) {
        console.error('[TaskScheduler] Rollback failed:', errorMessage(rollbackErr));
      }
      console.error('[TaskScheduler] Failed to check schedules:', errorMessage(err));
    } finally {
      client.release();
    }
  }

  /**
   * Execute a schedule and create tasks.
   * Per TASK_WORKFLOW_2024-12-10.md: Different logic per role.
   *
   * TRANSPORT MODES:
   * - All schedules now use HTTP transport (Puppeteer/browser)
   * - Per-state product_discovery schedules process one state at a time
   * - Workers must pass HTTP preflight to claim HTTP tasks
   *
   * @returns number of tasks created (0 for unknown roles)
   */
  private async executeSchedule(schedule: TaskSchedule): Promise<number> {
    switch (schedule.role) {
      case 'product_discovery':
        // Per-state product discovery using HTTP transport
        return this.generateProductDiscoveryTasks(schedule);

      case 'payload_fetch':
      case 'product_refresh':
        // DEPRECATED: Legacy roles redirect to product_discovery
        console.log(`[TaskScheduler] ${schedule.role} is deprecated, using product_discovery instead`);
        return this.generateProductDiscoveryTasks(schedule);

      case 'store_discovery':
        return this.generateStoreDiscoveryTasks(schedule);

      case 'analytics_refresh':
        return this.generateAnalyticsRefreshTasks(schedule);

      default:
        console.warn(`[TaskScheduler] Unknown role: ${schedule.role}`);
        return 0;
    }
  }

  /**
   * Generate product_discovery tasks for stores in a specific state.
   * Uses HTTP transport (Puppeteer/browser) for all tasks.
   *
   * Selects crawl-enabled dispensaries in `schedule.state_code` that have no
   * pending/claimed/running product_discovery task and were never fetched or
   * last fetched more than `interval_hours` ago (least recently fetched first).
   *
   * Per-state scheduling allows:
   * - Different crawl frequencies per state (e.g., AZ=4h, MI=6h)
   * - Better rate limit management (one state at a time)
   * - Easier debugging and monitoring per state
   *
   * @returns number of tasks created; 0 when state_code is missing or no store is due
   */
  private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
    // state_code is required for per-state schedules
    if (!schedule.state_code) {
      console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`);
      return 0;
    }

    // Find stores in this state needing refresh
    const result = await pool.query(`
      SELECT d.id
      FROM dispensaries d
      JOIN states s ON d.state_id = s.id
      WHERE d.crawl_enabled = true
        AND d.platform_dispensary_id IS NOT NULL
        AND s.code = $1
        -- No pending/running product_discovery task already
        AND NOT EXISTS (
          SELECT 1 FROM worker_tasks t
          WHERE t.dispensary_id = d.id
            AND t.role = 'product_discovery'
            AND t.status IN ('pending', 'claimed', 'running')
        )
        -- Never fetched OR last fetch > interval ago
        AND (
          d.last_fetch_at IS NULL
          OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
        )
      ORDER BY d.last_fetch_at NULLS FIRST, d.id
    `, [schedule.state_code, schedule.interval_hours]);

    const dispensaryIds = result.rows.map((r: { id: number }) => r.id);

    if (dispensaryIds.length === 0) {
      console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
      return 0;
    }

    console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);

    // Create product_discovery tasks with HTTP transport
    // Stagger by 15 seconds to prevent overwhelming proxies
    const { created } = await taskService.createStaggeredTasks(
      dispensaryIds,
      'product_discovery',
      15, // 15 seconds apart
      schedule.platform || 'dutchie',
      'http' // Force HTTP transport
    );

    return created;
  }

  /**
   * Generate store_discovery tasks.
   * Uses HTTP transport (Puppeteer/browser) for browser-based discovery.
   *
   * @returns 1 if a task was created, 0 if one is already pending/claimed/running
   */
  private async generateStoreDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
    // Check if discovery task already pending
    const existing = await taskService.listTasks({
      role: 'store_discovery',
      status: ['pending', 'claimed', 'running'],
      limit: 1,
    });

    if (existing.length > 0) {
      console.log('[TaskScheduler] Store discovery task already pending, skipping');
      return 0;
    }

    await taskService.createTask({
      role: 'store_discovery',
      platform: schedule.platform || 'dutchie',
      priority: schedule.priority,
      method: 'http', // Force HTTP transport for browser-based discovery
    });

    return 1;
  }

  /**
   * Generate analytics_refresh tasks.
   * Per TASK_WORKFLOW_2024-12-10.md: Single task to refresh all MVs.
   *
   * @returns 1 if a task was created, 0 if one is already pending/claimed/running
   */
  private async generateAnalyticsRefreshTasks(schedule: TaskSchedule): Promise<number> {
    // Check if analytics task already pending
    const existing = await taskService.listTasks({
      role: 'analytics_refresh',
      status: ['pending', 'claimed', 'running'],
      limit: 1,
    });

    if (existing.length > 0) {
      console.log('[TaskScheduler] Analytics refresh task already pending, skipping');
      return 0;
    }

    await taskService.createTask({
      role: 'analytics_refresh',
      priority: schedule.priority,
    });

    return 1;
  }

  /**
   * Get all schedules for dashboard display.
   * Returns schedules with full metadata including the immutability flag,
   * ordered by role group, then state (NULL first), then name.
   * Returns [] if the query fails (e.g. the table does not exist yet).
   */
  async getSchedules(): Promise<TaskSchedule[]> {
    try {
      const result = await pool.query(`
        SELECT
          id, name, role, enabled, interval_hours,
          last_run_at, next_run_at, state_code, priority,
          method, COALESCE(is_immutable, false) as is_immutable,
          description, platform, last_task_count, last_error,
          created_at, updated_at
        FROM task_schedules
        ORDER BY
          CASE role
            WHEN 'store_discovery' THEN 1
            WHEN 'product_discovery' THEN 2
            WHEN 'analytics_refresh' THEN 3
            ELSE 4
          END,
          state_code NULLS FIRST,
          name
      `);
      return result.rows as TaskSchedule[];
    } catch {
      return [];
    }
  }

  /**
   * Get a single schedule by ID.
   * Returns null if it does not exist or the query fails.
   */
  async getSchedule(id: number): Promise<TaskSchedule | null> {
    try {
      const result = await pool.query(`
        SELECT * FROM task_schedules WHERE id = $1
      `, [id]);
      return (result.rows[0] as TaskSchedule | undefined) ?? null;
    } catch {
      return null;
    }
  }

  /**
   * Update a schedule.
   * Allows updating: enabled, interval_hours, priority.
   * Does NOT allow updating: name, role, state_code, is_immutable.
   * No-op when none of the allowed fields are present.
   */
  async updateSchedule(id: number, updates: Partial<TaskSchedule>): Promise<void> {
    const setClauses: string[] = [];
    const values: unknown[] = [];
    let paramIndex = 1;

    if (updates.enabled !== undefined) {
      setClauses.push(`enabled = $${paramIndex++}`);
      values.push(updates.enabled);
    }
    if (updates.interval_hours !== undefined) {
      setClauses.push(`interval_hours = $${paramIndex++}`);
      values.push(updates.interval_hours);
    }
    if (updates.priority !== undefined) {
      setClauses.push(`priority = $${paramIndex++}`);
      values.push(updates.priority);
    }

    if (setClauses.length === 0) return;

    setClauses.push('updated_at = NOW()');
    values.push(id);

    await pool.query(`
      UPDATE task_schedules
      SET ${setClauses.join(', ')}
      WHERE id = $${paramIndex}
    `, values);
  }

  /**
   * Delete a schedule (only if not immutable).
   * Returns `{ deleted: true }` on success, otherwise `deleted: false`
   * with a human-readable reason (not found / immutable).
   */
  async deleteSchedule(id: number): Promise<{ deleted: boolean; reason?: string }> {
    // Check if schedule is immutable
    const result = await pool.query(`
      SELECT name, is_immutable FROM task_schedules WHERE id = $1
    `, [id]);

    if (result.rows.length === 0) {
      return { deleted: false, reason: 'Schedule not found' };
    }

    const schedule = result.rows[0];
    if (schedule.is_immutable) {
      return {
        deleted: false,
        reason: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`
      };
    }

    await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [id]);
    return { deleted: true };
  }

  /**
   * Trigger a schedule to run immediately.
   * Creates tasks only; does not update last_run_at / next_run_at.
   * @throws Error if no schedule has the given id
   * @returns number of tasks created
   */
  async triggerSchedule(id: number): Promise<number> {
    const result = await pool.query(`
      SELECT * FROM task_schedules WHERE id = $1
    `, [id]);

    if (result.rows.length === 0) {
      throw new Error(`Schedule ${id} not found`);
    }

    return this.executeSchedule(result.rows[0] as TaskSchedule);
  }

  /**
   * Get schedule statistics for dashboard.
   * Returns counts overall, enabled, per role, and per state (rows with a
   * state_code only). Returns all zeros if the query fails.
   */
  async getScheduleStats(): Promise<{
    total: number;
    enabled: number;
    byRole: Record<string, number>;
    byState: Record<string, number>;
  }> {
    try {
      const result = await pool.query(`
        SELECT
          COUNT(*)::int as total,
          SUM(CASE WHEN enabled THEN 1 ELSE 0 END)::int as enabled_count,
          role,
          state_code
        FROM task_schedules
        GROUP BY role, state_code
      `);

      let total = 0;
      let enabled = 0;
      const byRole: Record<string, number> = {};
      const byState: Record<string, number> = {};

      for (const row of result.rows) {
        total += row.total;
        enabled += row.enabled_count;
        byRole[row.role] = (byRole[row.role] || 0) + row.total;
        if (row.state_code) {
          byState[row.state_code] = (byState[row.state_code] || 0) + row.total;
        }
      }

      return { total, enabled, byRole, byState };
    } catch {
      return { total: 0, enabled: 0, byRole: {}, byState: {} };
    }
  }
}

// Per TASK_WORKFLOW_2024-12-10.md: Singleton instance
export const taskScheduler = new TaskScheduler();