/**
 * Task Service
 *
 * Central service for managing worker tasks with:
 * - Atomic task claiming (per-store locking)
 * - Task lifecycle management
 * - Auto-chaining of related tasks
 * - Capacity planning metrics
 */

import { pool } from '../db/pool';
import { isTaskPoolPaused } from './task-pool-state';

/** PostgreSQL SQLSTATE reported when a referenced relation (table or view) does not exist. */
const PG_UNDEFINED_TABLE = '42P01';

/** Retry budget applied when a task row carries no `max_retries` value. */
const DEFAULT_MAX_RETRIES = 3;

/** Number of columns written per task by the batch insert in `createTasks`. */
const BATCH_INSERT_COLUMNS = 6;

/**
 * Check whether a table with the given name is listed in information_schema.
 *
 * The lookup is by name only (no schema filter), so a same-named table in any
 * visible schema counts as existing.
 *
 * @param tableName - Unqualified table name to look up.
 * @returns true when at least one matching table is listed.
 */
async function tableExists(tableName: string): Promise<boolean> {
  const result = await pool.query(
    `
    SELECT EXISTS (
      SELECT FROM information_schema.tables
      WHERE table_name = $1
    ) as exists
  `,
    [tableName]
  );
  return result.rows[0]?.exists === true;
}

/**
 * Tell whether an error thrown by a query carries the "undefined table"
 * SQLSTATE, i.e. the queried table or view is missing.
 */
function isUndefinedTableError(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { code?: unknown }).code === PG_UNDEFINED_TABLE
  );
}

// Per TASK_WORKFLOW_2024-12-10.md: Task roles
// payload_fetch: Hits Dutchie API, saves raw payload to filesystem
// product_refresh: Reads local payload, normalizes, upserts to DB
export type TaskRole =
  | 'store_discovery'
  | 'entry_point_discovery'
  | 'product_discovery'
  | 'payload_fetch' // NEW: Fetches from API, saves to disk
  | 'product_refresh' // CHANGED: Now reads from local payload
  | 'analytics_refresh';

export type TaskStatus =
  | 'pending'
  | 'claimed'
  | 'running'
  | 'completed'
  | 'failed'
  | 'stale';

/** A row of `worker_tasks`, optionally enriched with joined dispensary columns. */
export interface WorkerTask {
  id: number;
  role: TaskRole;
  dispensary_id: number | null;
  dispensary_name?: string; // JOINed from dispensaries
  dispensary_slug?: string; // JOINed from dispensaries
  platform: string | null;
  status: TaskStatus;
  priority: number;
  scheduled_for: Date | null;
  worker_id: string | null;
  claimed_at: Date | null;
  started_at: Date | null;
  completed_at: Date | null;
  last_heartbeat_at: Date | null;
  result: Record<string, unknown> | null;
  error_message: string | null;
  retry_count: number;
  max_retries: number;
  payload: Record<string, unknown> | null; // Per TASK_WORKFLOW_2024-12-10.md: Task chaining data
  created_at: Date;
  updated_at: Date;
}

/** Input accepted by `createTask` / `createTasks`. Omitted fields are stored as NULL (priority as 0). */
export interface CreateTaskParams {
  role: TaskRole;
  dispensary_id?: number;
  platform?: string;
  priority?: number;
  scheduled_for?: Date;
  payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
}

/** A row of the `v_worker_capacity` view. */
export interface CapacityMetrics {
  role: string;
  pending_tasks: number;
  ready_tasks: number;
  claimed_tasks: number;
  running_tasks: number;
  completed_last_hour: number;
  failed_last_hour: number;
  active_workers: number;
  avg_duration_sec: number | null;
  tasks_per_worker_hour: number | null;
  estimated_hours_to_drain: number | null;
}

/** Filters and paging options for `listTasks`. All fields are optional. */
export interface TaskFilter {
  role?: TaskRole;
  status?: TaskStatus | TaskStatus[];
  dispensary_id?: number;
  worker_id?: string;
  limit?: number;
  offset?: number;
}

class TaskService {
  /**
   * Create a new task.
   *
   * @param params - Task fields; the payload, when present, is stored as JSON text.
   * @returns The inserted row as returned by `RETURNING *`.
   */
  async createTask(params: CreateTaskParams): Promise<WorkerTask> {
    const result = await pool.query(
      `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for, payload)
       VALUES ($1, $2, $3, $4, $5, $6)
       RETURNING *`,
      [
        params.role,
        params.dispensary_id ?? null,
        params.platform ?? null,
        params.priority ?? 0,
        params.scheduled_for ?? null,
        params.payload ? JSON.stringify(params.payload) : null,
      ]
    );
    return result.rows[0] as WorkerTask;
  }

  /**
   * Create multiple tasks with a single multi-row INSERT.
   *
   * Rows that hit a conflict are skipped (`ON CONFLICT DO NOTHING`), so the
   * returned count can be lower than `tasks.length`. The payload is written
   * the same way `createTask` writes it, so chaining data is not lost when
   * tasks are created in bulk.
   *
   * @param tasks - Tasks to insert; an empty list issues no query.
   * @returns Number of rows actually inserted.
   */
  async createTasks(tasks: CreateTaskParams[]): Promise<number> {
    if (tasks.length === 0) return 0;

    // One "($1, $2, ...)" tuple per task, numbered consecutively across tasks.
    const tuples = tasks.map((_task, taskIndex) => {
      const base = taskIndex * BATCH_INSERT_COLUMNS;
      const slots = Array.from(
        { length: BATCH_INSERT_COLUMNS },
        (_unused, column) => `$${base + column + 1}`
      );
      return `(${slots.join(', ')})`;
    });

    const params = tasks.flatMap((task) => [
      task.role,
      task.dispensary_id ?? null,
      task.platform ?? null,
      task.priority ?? 0,
      task.scheduled_for ?? null,
      task.payload ? JSON.stringify(task.payload) : null,
    ]);

    const result = await pool.query(
      `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for, payload)
       VALUES ${tuples.join(', ')}
       ON CONFLICT DO NOTHING`,
      params
    );

    return result.rowCount ?? 0;
  }

  /**
   * Claim a task atomically for a worker.
   *
   * If role is null, claims ANY available task (role-agnostic worker).
   * Returns null if the task pool is paused or nothing is claimable.
   *
   * @param role - Role to claim for, or null for any role.
   * @param workerId - Identifier recorded as the task's `worker_id`.
   * @returns The claimed task, or null.
   */
  async claimTask(role: TaskRole | null, workerId: string): Promise<WorkerTask | null> {
    // Check if task pool is paused - don't claim any tasks
    if (isTaskPoolPaused()) {
      return null;
    }

    if (role) {
      // Role-specific claiming - use the SQL function
      const result = await pool.query(`SELECT * FROM claim_task($1, $2)`, [role, workerId]);
      return this.toClaimedTask(result.rows[0]);
    }

    // Role-agnostic claiming - claim ANY pending task
    const result = await pool.query(
      `
      UPDATE worker_tasks
      SET status = 'claimed', worker_id = $1, claimed_at = NOW()
      WHERE id = (
        SELECT id FROM worker_tasks
        WHERE status = 'pending'
          AND (scheduled_for IS NULL OR scheduled_for <= NOW())
          -- Exclude stores that already have an active task
          AND (dispensary_id IS NULL OR dispensary_id NOT IN (
            SELECT dispensary_id FROM worker_tasks
            WHERE status IN ('claimed', 'running')
              AND dispensary_id IS NOT NULL
          ))
        ORDER BY priority DESC, created_at ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED
      )
      RETURNING *
    `,
      [workerId]
    );

    return this.toClaimedTask(result.rows[0]);
  }

  /**
   * Normalize the first row of a claim query into a task or null.
   *
   * NOTE(review): if `claim_task` is declared to return a single composite
   * value rather than a set, `SELECT * FROM claim_task(...)` yields one row
   * of NULLs when nothing was claimed; the id check covers that case.
   */
  private toClaimedTask(row: unknown): WorkerTask | null {
    const candidate = row as Partial<WorkerTask> | undefined;
    return candidate && candidate.id != null ? (candidate as WorkerTask) : null;
  }

  /**
   * Mark a task as running (worker started processing).
   * Also stamps `started_at` and the first heartbeat.
   */
  async startTask(taskId: number): Promise<void> {
    await pool.query(
      `UPDATE worker_tasks
       SET status = 'running', started_at = NOW(), last_heartbeat_at = NOW()
       WHERE id = $1`,
      [taskId]
    );
  }

  /**
   * Update heartbeat to prevent stale detection.
   * Only affects tasks whose status is 'running'.
   */
  async heartbeat(taskId: number): Promise<void> {
    await pool.query(
      `UPDATE worker_tasks
       SET last_heartbeat_at = NOW()
       WHERE id = $1 AND status = 'running'`,
      [taskId]
    );
  }

  /**
   * Mark a task as completed.
   *
   * @param taskId - Task to complete.
   * @param result - Optional result object, stored as JSON text (NULL when omitted).
   */
  async completeTask(taskId: number, result?: Record<string, unknown>): Promise<void> {
    await pool.query(
      `UPDATE worker_tasks
       SET status = 'completed', completed_at = NOW(), result = $2
       WHERE id = $1`,
      [taskId, result ? JSON.stringify(result) : null]
    );
  }

  /**
   * Mark a task as failed, with auto-retry if under max_retries.
   *
   * A stored `max_retries` of 0 is honored (the task fails permanently on the
   * first failure); the default budget applies only when the column is NULL.
   *
   * @param taskId - Task that failed.
   * @param errorMessage - Failure description, stored with an attempt prefix.
   * @returns true if the task was re-queued for retry, false if it was
   *          permanently failed or does not exist.
   */
  async failTask(taskId: number, errorMessage: string): Promise<boolean> {
    // Get current retry state
    const lookup = await pool.query(
      `SELECT retry_count, max_retries FROM worker_tasks WHERE id = $1`,
      [taskId]
    );

    const row = lookup.rows[0] as
      | { retry_count: number | null; max_retries: number | null }
      | undefined;
    if (!row) {
      return false;
    }

    const attempts = (row.retry_count ?? 0) + 1;
    const maxRetries = row.max_retries ?? DEFAULT_MAX_RETRIES;

    if (attempts < maxRetries) {
      // Re-queue for retry - reset to pending with incremented retry_count
      await pool.query(
        `UPDATE worker_tasks
         SET status = 'pending',
             worker_id = NULL,
             claimed_at = NULL,
             started_at = NULL,
             retry_count = $2,
             error_message = $3,
             updated_at = NOW()
         WHERE id = $1`,
        [taskId, attempts, `Retry ${attempts}: ${errorMessage}`]
      );
      console.log(`[TaskService] Task ${taskId} queued for retry ${attempts}/${maxRetries}`);
      return true;
    }

    // Max retries exceeded - mark as permanently failed
    await pool.query(
      `UPDATE worker_tasks
       SET status = 'failed',
           completed_at = NOW(),
           retry_count = $2,
           error_message = $3
       WHERE id = $1`,
      [taskId, attempts, `Failed after ${attempts} attempts: ${errorMessage}`]
    );
    console.log(`[TaskService] Task ${taskId} permanently failed after ${attempts} attempts`);
    return false;
  }

  /**
   * Get a task by ID.
   *
   * @returns The task row, or null when no row has that id.
   */
  async getTask(taskId: number): Promise<WorkerTask | null> {
    const result = await pool.query(`SELECT * FROM worker_tasks WHERE id = $1`, [taskId]);
    return (result.rows[0] as WorkerTask) || null;
  }

  /**
   * List tasks with filters, newest first, joined with dispensary name/slug.
   *
   * Every filter value, including limit and offset, is sent as a bound
   * parameter rather than spliced into the SQL text.
   *
   * @param filter - Optional filters; limit defaults to 100 and offset to 0.
   * @returns Matching tasks, or an empty list if `worker_tasks` does not exist.
   */
  async listTasks(filter: TaskFilter = {}): Promise<WorkerTask[]> {
    // Return empty list if table doesn't exist
    if (!(await tableExists('worker_tasks'))) {
      return [];
    }

    const conditions: string[] = [];
    const params: (string | number | string[])[] = [];

    // Register a value and return the placeholder that refers to it.
    const bind = (value: string | number | string[]): string => {
      params.push(value);
      return `$${params.length}`;
    };

    if (filter.role) {
      conditions.push(`t.role = ${bind(filter.role)}`);
    }

    if (filter.status) {
      if (Array.isArray(filter.status)) {
        conditions.push(`t.status = ANY(${bind(filter.status)})`);
      } else {
        conditions.push(`t.status = ${bind(filter.status)}`);
      }
    }

    if (filter.dispensary_id) {
      conditions.push(`t.dispensary_id = ${bind(filter.dispensary_id)}`);
    }

    if (filter.worker_id) {
      conditions.push(`t.worker_id = ${bind(filter.worker_id)}`);
    }

    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
    const limitSlot = bind(filter.limit ?? 100);
    const offsetSlot = bind(filter.offset ?? 0);

    const result = await pool.query(
      `SELECT
         t.*,
         d.name as dispensary_name,
         d.slug as dispensary_slug
       FROM worker_tasks t
       LEFT JOIN dispensaries d ON d.id = t.dispensary_id
       ${whereClause}
       ORDER BY t.created_at DESC
       LIMIT ${limitSlot} OFFSET ${offsetSlot}`,
      params
    );

    return result.rows as WorkerTask[];
  }

  /**
   * Get capacity metrics for all roles.
   *
   * @returns Rows of `v_worker_capacity`, or an empty list when the table or
   *          view is missing or the view query fails (unexpected failures are logged).
   */
  async getCapacityMetrics(): Promise<CapacityMetrics[]> {
    // Return empty metrics if worker_tasks table doesn't exist
    if (!(await tableExists('worker_tasks'))) {
      return [];
    }

    try {
      const result = await pool.query(`SELECT * FROM v_worker_capacity`);
      return result.rows as CapacityMetrics[];
    } catch (error: unknown) {
      // View may not exist; anything else is unexpected, so surface it in logs.
      if (!isUndefinedTableError(error)) {
        console.error('[TaskService] Failed to read v_worker_capacity:', error);
      }
      return [];
    }
  }

  /**
   * Get capacity metrics for a specific role.
   *
   * @returns The role's row of `v_worker_capacity`, or null when the table or
   *          view is missing, the role has no row, or the view query fails
   *          (unexpected failures are logged).
   */
  async getRoleCapacity(role: TaskRole): Promise<CapacityMetrics | null> {
    // Return null if worker_tasks table doesn't exist
    if (!(await tableExists('worker_tasks'))) {
      return null;
    }

    try {
      const result = await pool.query(`SELECT * FROM v_worker_capacity WHERE role = $1`, [role]);
      return (result.rows[0] as CapacityMetrics) || null;
    } catch (error: unknown) {
      // View may not exist; anything else is unexpected, so surface it in logs.
      if (!isUndefinedTableError(error)) {
        console.error(`[TaskService] Failed to read v_worker_capacity for role ${role}:`, error);
      }
      return null;
    }
  }

  /**
   * Recover stale tasks from dead workers by calling the `recover_stale_tasks`
   * SQL function.
   *
   * @param staleThresholdMinutes - Threshold passed to the SQL function.
   * @returns The function's return value (presumably the number of recovered
   *          tasks - confirm against the function definition), or 0 if no row came back.
   */
  async recoverStaleTasks(staleThresholdMinutes = 10): Promise<number> {
    const result = await pool.query(`SELECT recover_stale_tasks($1)`, [staleThresholdMinutes]);
    return (result.rows[0] as { recover_stale_tasks: number })?.recover_stale_tasks ?? 0;
  }

  /**
   * Generate daily resync tasks for all active stores by calling the
   * `generate_resync_tasks` SQL function.
   *
   * @param batchesPerDay - Passed through to the SQL function.
   * @param date - Passed through to the SQL function; defaults to now.
   * @returns The function's return value, or 0 if no row came back.
   */
  async generateDailyResyncTasks(batchesPerDay = 6, date?: Date): Promise<number> {
    const result = await pool.query(`SELECT generate_resync_tasks($1, $2)`, [
      batchesPerDay,
      date ?? new Date(),
    ]);
    return (result.rows[0] as { generate_resync_tasks: number })?.generate_resync_tasks ?? 0;
  }

  /**
   * Chain next task after completion.
   * Called automatically when a task completes successfully.
   *
   * Per TASK_WORKFLOW_2024-12-10.md: Task chaining flow:
   *
   * Discovery flow (new stores):
   *   store_discovery → product_discovery → payload_fetch → product_refresh
   *
   * Scheduled flow (existing stores):
   *   payload_fetch → product_refresh
   *
   * Note: entry_point_discovery is deprecated since platform_dispensary_id
   * is now resolved during store promotion.
   *
   * @param completedTask - The task that just finished; ignored unless its
   *                        status is 'completed'.
   * @returns The chained task for the deprecated entry_point_discovery path;
   *          null in every other case (including store_discovery, which may
   *          create several tasks but returns none of them).
   */
  async chainNextTask(completedTask: WorkerTask): Promise<WorkerTask | null> {
    if (completedTask.status !== 'completed') {
      return null;
    }

    switch (completedTask.role) {
      case 'store_discovery': {
        // Per TASK_WORKFLOW_2024-12-10.md: New stores discovered -> create product_discovery tasks
        // Skip entry_point_discovery since platform_dispensary_id is set during promotion
        const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds;
        if (newStoreIds && newStoreIds.length > 0) {
          console.log(
            `[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores`
          );
          for (const storeId of newStoreIds) {
            await this.createTask({
              role: 'product_discovery',
              dispensary_id: storeId,
              platform: completedTask.platform ?? undefined,
              priority: 10, // High priority for new stores
            });
          }
        }
        break;
      }

      case 'entry_point_discovery': {
        // DEPRECATED: Entry point resolution now happens during store promotion
        // Kept for backward compatibility with any in-flight tasks
        const success = (completedTask.result as { success?: boolean })?.success;
        if (success && completedTask.dispensary_id) {
          return this.createTask({
            role: 'product_discovery',
            dispensary_id: completedTask.dispensary_id,
            platform: completedTask.platform ?? undefined,
            priority: 10,
          });
        }
        break;
      }

      case 'product_discovery': {
        // Per TASK_WORKFLOW_2024-12-10.md: Product discovery chains internally to payload_fetch
        // No external chaining needed - handleProductDiscovery calls handlePayloadFetch directly
        break;
      }

      case 'payload_fetch': {
        // Per TASK_WORKFLOW_2024-12-10.md: payload_fetch chains to product_refresh
        // This is handled internally by the payload_fetch handler via taskService.createTask
        // No external chaining needed here
        break;
      }
    }

    return null;
  }

  /**
   * Create store discovery task for a platform/state.
   *
   * NOTE(review): `stateCode` is accepted but not stored on the task; the
   * created task carries only role, platform and priority. Confirm whether
   * the store_discovery handler expects the state in the payload.
   */
  async createStoreDiscoveryTask(
    platform: string,
    stateCode?: string,
    priority = 0
  ): Promise<WorkerTask> {
    return this.createTask({
      role: 'store_discovery',
      platform,
      priority,
    });
  }

  /**
   * Create entry point discovery task for a specific store.
   */
  async createEntryPointTask(
    dispensaryId: number,
    platform: string,
    priority = 10
  ): Promise<WorkerTask> {
    return this.createTask({
      role: 'entry_point_discovery',
      dispensary_id: dispensaryId,
      platform,
      priority,
    });
  }

  /**
   * Create product discovery task for a specific store.
   */
  async createProductDiscoveryTask(
    dispensaryId: number,
    platform: string,
    priority = 10
  ): Promise<WorkerTask> {
    return this.createTask({
      role: 'product_discovery',
      dispensary_id: dispensaryId,
      platform,
      priority,
    });
  }

  /**
   * Get task counts by status for dashboard.
   *
   * @returns A count for every known status; statuses with no rows (or a
   *          missing `worker_tasks` table) report 0.
   */
  async getTaskCounts(): Promise<Record<TaskStatus, number>> {
    const counts: Record<TaskStatus, number> = {
      pending: 0,
      claimed: 0,
      running: 0,
      completed: 0,
      failed: 0,
      stale: 0,
    };

    // Return empty counts if table doesn't exist
    if (!(await tableExists('worker_tasks'))) {
      return counts;
    }

    const result = await pool.query(
      `SELECT status, COUNT(*) as count
       FROM worker_tasks
       GROUP BY status`
    );

    for (const row of result.rows) {
      // The count column is read as a string and parsed to a number.
      const { status, count } = row as { status: TaskStatus; count: string };
      counts[status] = parseInt(count, 10);
    }

    return counts;
  }

  /**
   * Get recent task completions for a role, most recently completed first.
   *
   * @param role - Role to filter on.
   * @param limit - Maximum number of rows to return.
   */
  async getRecentCompletions(role: TaskRole, limit = 10): Promise<WorkerTask[]> {
    const result = await pool.query(
      `SELECT * FROM worker_tasks
       WHERE role = $1 AND status = 'completed'
       ORDER BY completed_at DESC
       LIMIT $2`,
      [role, limit]
    );
    return result.rows as WorkerTask[];
  }

  /**
   * Check if a store has any active tasks (status 'claimed' or 'running').
   */
  async hasActiveTask(dispensaryId: number): Promise<boolean> {
    const result = await pool.query(
      `SELECT EXISTS(
         SELECT 1 FROM worker_tasks
         WHERE dispensary_id = $1
           AND status IN ('claimed', 'running')
       ) as exists`,
      [dispensaryId]
    );
    return (result.rows[0] as { exists: boolean })?.exists ?? false;
  }

  /**
   * Get the last completion time for a role.
   *
   * @returns The latest `completed_at` among completed tasks of that role, or
   *          null when there is none.
   */
  async getLastCompletion(role: TaskRole): Promise<Date | null> {
    const result = await pool.query(
      `SELECT MAX(completed_at) as completed_at
       FROM worker_tasks
       WHERE role = $1 AND status = 'completed'`,
      [role]
    );
    return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
  }

  /**
   * Calculate workers needed to complete pending tasks within an SLA.
   *
   * Computes ceil((pending_tasks / slaHours) / tasks_per_worker_hour) from the
   * role's capacity row. Metric values are coerced with Number() so that
   * numeric strings from the driver are handled, and a zero or missing
   * throughput falls back to 1 worker instead of dividing by zero.
   *
   * @param role - Role whose backlog is being sized.
   * @param slaHours - Hours available to drain the backlog; must be a finite
   *                   number greater than 0.
   * @returns Worker count (0 when nothing is pending, 1 when no throughput
   *          data is available).
   * @throws RangeError when slaHours is not a positive finite number.
   */
  async calculateWorkersNeeded(role: TaskRole, slaHours: number): Promise<number> {
    if (!Number.isFinite(slaHours) || slaHours <= 0) {
      throw new RangeError(`slaHours must be a positive number, got ${slaHours}`);
    }

    const capacity = await this.getRoleCapacity(role);
    const tasksPerWorkerHour = Number(capacity?.tasks_per_worker_hour ?? 0);

    if (!capacity || !(tasksPerWorkerHour > 0)) {
      return 1; // Default to 1 worker if no data
    }

    const pendingTasks = Number(capacity.pending_tasks);
    const requiredTasksPerHour = pendingTasks / slaHours;

    return Math.ceil(requiredTasksPerHour / tasksPerWorkerHour);
  }
}

export const taskService = new TaskService();