/** * Task Queue API Routes * * Endpoints for managing worker tasks, viewing capacity metrics, * and generating batch tasks. */ import { Router, Request, Response } from 'express'; import { taskService, TaskRole, TaskStatus, TaskFilter, } from '../tasks/task-service'; import { pool } from '../db/pool'; import { isTaskPoolPaused, pauseTaskPool, resumeTaskPool, getTaskPoolStatus, } from '../tasks/task-pool-state'; const router = Router(); /** * GET /api/tasks * List tasks with optional filters * * Query params: * - role: Filter by role * - status: Filter by status (comma-separated for multiple) * - dispensary_id: Filter by dispensary * - worker_id: Filter by worker * - limit: Max results (default 100) * - offset: Pagination offset */ router.get('/', async (req: Request, res: Response) => { try { const filter: TaskFilter = {}; if (req.query.role) { filter.role = req.query.role as TaskRole; } if (req.query.status) { const statuses = (req.query.status as string).split(',') as TaskStatus[]; filter.status = statuses.length === 1 ? statuses[0] : statuses; } if (req.query.dispensary_id) { filter.dispensary_id = parseInt(req.query.dispensary_id as string, 10); } if (req.query.worker_id) { filter.worker_id = req.query.worker_id as string; } if (req.query.limit) { filter.limit = parseInt(req.query.limit as string, 10); } if (req.query.offset) { filter.offset = parseInt(req.query.offset as string, 10); } const tasks = await taskService.listTasks(filter); res.json({ tasks, count: tasks.length }); } catch (error: unknown) { console.error('Error listing tasks:', error); res.status(500).json({ error: 'Failed to list tasks' }); } }); /** * GET /api/tasks/counts * Get task counts by status */ router.get('/counts', async (_req: Request, res: Response) => { try { const counts = await taskService.getTaskCounts(); res.json(counts); } catch (error: unknown) { console.error('Error getting task counts:', error); res.status(500).json({ error: 'Failed to get task counts' }); } }); /** * GET /api/tasks/capacity * Get capacity metrics for all roles */ router.get('/capacity', async (_req: Request, res: Response) => { try { const metrics = await taskService.getCapacityMetrics(); res.json({ metrics }); } catch (error: unknown) { console.error('Error getting capacity metrics:', error); res.status(500).json({ error: 'Failed to get capacity metrics' }); } }); /** * GET /api/tasks/capacity/:role * Get capacity metrics for a specific role */ router.get('/capacity/:role', async (req: Request, res: Response) => { try { const role = req.params.role as TaskRole; const capacity = await taskService.getRoleCapacity(role); if (!capacity) { return res.status(404).json({ error: 'Role not found or no data' }); } // Calculate workers needed for different SLAs const workersFor1Hour = await taskService.calculateWorkersNeeded(role, 1); const workersFor4Hours = await taskService.calculateWorkersNeeded(role, 4); const workersFor8Hours = await taskService.calculateWorkersNeeded(role, 8); res.json({ ...capacity, workers_needed: { for_1_hour: workersFor1Hour, for_4_hours: workersFor4Hours, for_8_hours: workersFor8Hours, }, }); } catch (error: unknown) { console.error('Error getting role capacity:', error); res.status(500).json({ error: 'Failed to get role capacity' }); } }); /** * GET /api/tasks/:id * Get a specific task by ID */ router.get('/:id', async (req: Request, res: Response) => { try { const taskId = parseInt(req.params.id, 10); const task = await taskService.getTask(taskId); if (!task) { return res.status(404).json({ error: 'Task not found' }); } res.json(task); } catch (error: unknown) { console.error('Error getting task:', error); res.status(500).json({ error: 'Failed to get task' }); } }); /** * DELETE /api/tasks/:id * Delete a specific task by ID * Only allows deletion of failed, completed, or pending tasks (not running) */ router.delete('/:id', async (req: Request, res: Response) => { try { const taskId = parseInt(req.params.id, 10); // First check if task exists and its status const task = await taskService.getTask(taskId); if (!task) { return res.status(404).json({ error: 'Task not found' }); } // Don't allow deleting running tasks if (task.status === 'running' || task.status === 'claimed') { return res.status(400).json({ error: 'Cannot delete a running or claimed task' }); } // Delete the task await pool.query('DELETE FROM worker_tasks WHERE id = $1', [taskId]); res.json({ success: true, message: `Task ${taskId} deleted` }); } catch (error: unknown) { console.error('Error deleting task:', error); res.status(500).json({ error: 'Failed to delete task' }); } }); /** * POST /api/tasks * Create a new task * * Body: * - role: TaskRole (required) * - dispensary_id: number (optional) * - platform: string (optional) * - priority: number (optional, default 0) * - scheduled_for: ISO date string (optional) */ router.post('/', async (req: Request, res: Response) => { try { const { role, dispensary_id, platform, priority, scheduled_for } = req.body; if (!role) { return res.status(400).json({ error: 'Role is required' }); } // Check if store already has an active task if (dispensary_id) { const hasActive = await taskService.hasActiveTask(dispensary_id); if (hasActive) { return res.status(409).json({ error: 'Store already has an active task', dispensary_id, }); } } const task = await taskService.createTask({ role, dispensary_id, platform, priority, scheduled_for: scheduled_for ? new Date(scheduled_for) : undefined, }); res.status(201).json(task); } catch (error: unknown) { console.error('Error creating task:', error); res.status(500).json({ error: 'Failed to create task' }); } }); /** * POST /api/tasks/generate/resync * Generate daily resync tasks for all active stores * * Body: * - batches_per_day: number (optional, default 6 = every 4 hours) * - date: ISO date string (optional, default today) */ router.post('/generate/resync', async (req: Request, res: Response) => { try { const { batches_per_day, date } = req.body; const batchesPerDay = batches_per_day ?? 6; const targetDate = date ? new Date(date) : new Date(); const createdCount = await taskService.generateDailyResyncTasks( batchesPerDay, targetDate ); res.json({ success: true, tasks_created: createdCount, batches_per_day: batchesPerDay, date: targetDate.toISOString().split('T')[0], }); } catch (error: unknown) { console.error('Error generating resync tasks:', error); res.status(500).json({ error: 'Failed to generate resync tasks' }); } }); /** * POST /api/tasks/generate/discovery * Generate store discovery tasks for a platform * * Body: * - platform: string (required, e.g., 'dutchie') * - state_code: string (optional, e.g., 'AZ') * - priority: number (optional) */ router.post('/generate/discovery', async (req: Request, res: Response) => { try { const { platform, state_code, priority } = req.body; if (!platform) { return res.status(400).json({ error: 'Platform is required' }); } const task = await taskService.createStoreDiscoveryTask( platform, state_code, priority ?? 0 ); res.status(201).json(task); } catch (error: unknown) { console.error('Error creating discovery task:', error); res.status(500).json({ error: 'Failed to create discovery task' }); } }); /** * POST /api/tasks/recover-stale * Recover stale tasks from dead workers * * Body: * - threshold_minutes: number (optional, default 10) */ router.post('/recover-stale', async (req: Request, res: Response) => { try { const { threshold_minutes } = req.body; const recovered = await taskService.recoverStaleTasks(threshold_minutes ?? 10); res.json({ success: true, tasks_recovered: recovered, }); } catch (error: unknown) { console.error('Error recovering stale tasks:', error); res.status(500).json({ error: 'Failed to recover stale tasks' }); } }); /** * GET /api/tasks/role/:role/last-completion * Get the last completion time for a role */ router.get('/role/:role/last-completion', async (req: Request, res: Response) => { try { const role = req.params.role as TaskRole; const lastCompletion = await taskService.getLastCompletion(role); res.json({ role, last_completion: lastCompletion?.toISOString() ?? null, time_since: lastCompletion ? Math.floor((Date.now() - lastCompletion.getTime()) / 1000) : null, }); } catch (error: unknown) { console.error('Error getting last completion:', error); res.status(500).json({ error: 'Failed to get last completion' }); } }); /** * GET /api/tasks/role/:role/recent * Get recent completions for a role */ router.get('/role/:role/recent', async (req: Request, res: Response) => { try { const role = req.params.role as TaskRole; const limit = parseInt(req.query.limit as string, 10) || 10; const tasks = await taskService.getRecentCompletions(role, limit); res.json({ tasks }); } catch (error: unknown) { console.error('Error getting recent completions:', error); res.status(500).json({ error: 'Failed to get recent completions' }); } }); /** * GET /api/tasks/store/:dispensaryId/active * Check if a store has an active task */ router.get('/store/:dispensaryId/active', async (req: Request, res: Response) => { try { const dispensaryId = parseInt(req.params.dispensaryId, 10); const hasActive = await taskService.hasActiveTask(dispensaryId); res.json({ dispensary_id: dispensaryId, has_active_task: hasActive, }); } catch (error: unknown) { console.error('Error checking active task:', error); res.status(500).json({ error: 'Failed to check active task' }); } }); // ============================================================ // MIGRATION ROUTES - Disable old job systems // ============================================================ /** * GET /api/tasks/migration/status * Get status of old job systems vs new task queue */ router.get('/migration/status', async (_req: Request, res: Response) => { try { // Get old job system counts const [schedules, crawlJobs, rawPayloads, taskCounts] = await Promise.all([ pool.query(` SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE enabled = true) as enabled FROM job_schedules `), pool.query(` SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE status = 'pending') as pending, COUNT(*) FILTER (WHERE status = 'running') as running FROM dispensary_crawl_jobs `), pool.query(` SELECT COUNT(*) as total, COUNT(*) FILTER (WHERE processed = false) as unprocessed FROM raw_payloads `), taskService.getTaskCounts(), ]); res.json({ old_systems: { job_schedules: { total: parseInt(schedules.rows[0].total) || 0, enabled: parseInt(schedules.rows[0].enabled) || 0, }, dispensary_crawl_jobs: { total: parseInt(crawlJobs.rows[0].total) || 0, pending: parseInt(crawlJobs.rows[0].pending) || 0, running: parseInt(crawlJobs.rows[0].running) || 0, }, raw_payloads: { total: parseInt(rawPayloads.rows[0].total) || 0, unprocessed: parseInt(rawPayloads.rows[0].unprocessed) || 0, }, }, new_task_queue: taskCounts, recommendation: schedules.rows[0].enabled > 0 ? 'Disable old job schedules before switching to new task queue' : 'Ready to use new task queue', }); } catch (error: unknown) { console.error('Error getting migration status:', error); res.status(500).json({ error: 'Failed to get migration status' }); } }); /** * POST /api/tasks/migration/disable-old-schedules * Disable all old job schedules to prepare for new task queue */ router.post('/migration/disable-old-schedules', async (_req: Request, res: Response) => { try { const result = await pool.query(` UPDATE job_schedules SET enabled = false, updated_at = NOW() WHERE enabled = true RETURNING id, job_name `); res.json({ success: true, disabled_count: result.rowCount, disabled_schedules: result.rows.map(r => ({ id: r.id, job_name: r.job_name })), }); } catch (error: unknown) { console.error('Error disabling old schedules:', error); res.status(500).json({ error: 'Failed to disable old schedules' }); } }); /** * POST /api/tasks/migration/cancel-pending-crawl-jobs * Cancel all pending crawl jobs from the old system */ router.post('/migration/cancel-pending-crawl-jobs', async (_req: Request, res: Response) => { try { const result = await pool.query(` UPDATE dispensary_crawl_jobs SET status = 'cancelled', completed_at = NOW(), updated_at = NOW() WHERE status = 'pending' RETURNING id `); res.json({ success: true, cancelled_count: result.rowCount, }); } catch (error: unknown) { console.error('Error cancelling pending crawl jobs:', error); res.status(500).json({ error: 'Failed to cancel pending crawl jobs' }); } }); /** * POST /api/tasks/migration/create-resync-tasks * Create product_refresh tasks for all crawl-enabled dispensaries */ router.post('/migration/create-resync-tasks', async (req: Request, res: Response) => { try { const { priority = 0, state_code } = req.body; let query = ` SELECT id, name FROM dispensaries WHERE crawl_enabled = true AND platform_dispensary_id IS NOT NULL `; const params: any[] = []; if (state_code) { query += ` AND state_id = (SELECT id FROM states WHERE code = $1) `; params.push(state_code.toUpperCase()); } query += ` ORDER BY id`; const dispensaries = await pool.query(query, params); let created = 0; for (const disp of dispensaries.rows) { // Check if already has pending/running task const hasActive = await taskService.hasActiveTask(disp.id); if (!hasActive) { await taskService.createTask({ role: 'product_refresh', dispensary_id: disp.id, platform: 'dutchie', priority, }); created++; } } res.json({ success: true, tasks_created: created, dispensaries_checked: dispensaries.rows.length, state_filter: state_code || 'all', }); } catch (error: unknown) { console.error('Error creating resync tasks:', error); res.status(500).json({ error: 'Failed to create resync tasks' }); } }); /** * POST /api/tasks/migration/full-migrate * One-click migration: disable old systems, create new tasks */ router.post('/migration/full-migrate', async (req: Request, res: Response) => { try { const results: any = { success: true, steps: [], }; // Step 1: Disable old job schedules const disableResult = await pool.query(` UPDATE job_schedules SET enabled = false, updated_at = NOW() WHERE enabled = true RETURNING id `); results.steps.push({ step: 'disable_job_schedules', count: disableResult.rowCount, }); // Step 2: Cancel pending crawl jobs const cancelResult = await pool.query(` UPDATE dispensary_crawl_jobs SET status = 'cancelled', completed_at = NOW(), updated_at = NOW() WHERE status = 'pending' RETURNING id `); results.steps.push({ step: 'cancel_pending_crawl_jobs', count: cancelResult.rowCount, }); // Step 3: Generate initial resync tasks const resyncCount = await taskService.generateDailyResyncTasks(6); results.steps.push({ step: 'generate_resync_tasks', count: resyncCount, }); // Step 4: Create store discovery task const discoveryTask = await taskService.createStoreDiscoveryTask('dutchie', undefined, 0); results.steps.push({ step: 'create_discovery_task', task_id: discoveryTask.id, }); // Step 5: Create analytics refresh task const analyticsTask = await taskService.createTask({ role: 'analytics_refresh', priority: 0, }); results.steps.push({ step: 'create_analytics_task', task_id: analyticsTask.id, }); results.message = 'Migration complete. New task workers will pick up tasks.'; res.json(results); } catch (error: unknown) { console.error('Error during full migration:', error); res.status(500).json({ error: 'Failed to complete migration' }); } }); /** * GET /api/tasks/pool/status * Check if task pool is paused */ router.get('/pool/status', async (_req: Request, res: Response) => { const status = getTaskPoolStatus(); res.json({ success: true, ...status, }); }); /** * POST /api/tasks/pool/pause * Pause the task pool - workers won't pick up new tasks */ router.post('/pool/pause', async (_req: Request, res: Response) => { pauseTaskPool(); res.json({ success: true, paused: true, message: 'Task pool paused - workers will not pick up new tasks', }); }); /** * POST /api/tasks/pool/resume * Resume the task pool - workers will pick up tasks again */ router.post('/pool/resume', async (_req: Request, res: Response) => { resumeTaskPool(); res.json({ success: true, paused: false, message: 'Task pool resumed - workers will pick up new tasks', }); }); export default router;