diff --git a/backend/migrations/115_task_proxy_ip.sql b/backend/migrations/115_task_proxy_ip.sql
new file mode 100644
index 00000000..6ed7179c
--- /dev/null
+++ b/backend/migrations/115_task_proxy_ip.sql
@@ -0,0 +1,17 @@
+-- Migration: Add proxy_ip tracking to worker_tasks
+-- Purpose: Prevent same IP from hitting multiple stores on same platform simultaneously
+--
+-- Anti-detection measure: Dutchie/Jane may flag if same IP makes requests
+-- for multiple different stores. This column lets us track and prevent that.
+
+-- Add proxy_ip column to track which proxy IP is being used for each task
+ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS proxy_ip VARCHAR(45);
+
+-- Index for quick lookup of active tasks by proxy IP
+-- Used to check: "Is this IP already hitting another store?"
+CREATE INDEX IF NOT EXISTS idx_worker_tasks_proxy_ip_active
+ON worker_tasks (proxy_ip, platform)
+WHERE status IN ('claimed', 'running') AND proxy_ip IS NOT NULL;
+
+-- Comment
+COMMENT ON COLUMN worker_tasks.proxy_ip IS 'Proxy IP assigned to this task. Used to prevent same IP hitting multiple stores on same platform.';
diff --git a/backend/migrations/116_task_source_tracking.sql b/backend/migrations/116_task_source_tracking.sql
new file mode 100644
index 00000000..1c9691df
--- /dev/null
+++ b/backend/migrations/116_task_source_tracking.sql
@@ -0,0 +1,16 @@
+-- Migration: Add source tracking columns to worker_tasks
+-- Purpose: Track where tasks originated from (schedule, API, manual)
+
+-- Add source tracking columns
+ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS source VARCHAR(50);
+ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS source_schedule_id INTEGER REFERENCES task_schedules(id);
+ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS source_metadata JSONB;
+
+-- Index for tracking tasks by schedule
+CREATE INDEX IF NOT EXISTS idx_worker_tasks_source_schedule
+ON worker_tasks (source_schedule_id) WHERE source_schedule_id IS NOT NULL;
+
+-- Comments
+COMMENT ON COLUMN worker_tasks.source IS 'Origin of task: schedule, api, manual, chain';
+COMMENT ON COLUMN worker_tasks.source_schedule_id IS 'ID of schedule that created this task';
+COMMENT ON COLUMN worker_tasks.source_metadata IS 'Additional metadata about task origin';
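The migrations above only add the columns; the write path is not part of this diff. A minimal sketch of how a schedule runner might stamp the new source columns from migration 116 when it enqueues a task — the `pool` import path, the helper name, and the chosen role/platform values are assumptions, not taken from this change:

```ts
import { pool } from '../db'; // import path assumed

// Hypothetical helper, not in this diff: enqueue a task and record where it came from.
export async function enqueueScheduledTask(scheduleId: number, dispensaryId: number): Promise<number> {
  const result = await pool.query(
    `INSERT INTO worker_tasks (role, dispensary_id, platform, status, source, source_schedule_id, source_metadata)
     VALUES ($1, $2, $3, 'pending', 'schedule', $4, $5)
     RETURNING id`,
    ['product_refresh', dispensaryId, 'dutchie', scheduleId, JSON.stringify({ trigger: 'interval' })]
  );
  return result.rows[0].id;
}
```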
diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts
index e91142bb..c93e7eed 100644
--- a/backend/src/routes/tasks.ts
+++ b/backend/src/routes/tasks.ts
@@ -225,23 +225,27 @@ router.get('/schedules', async (req: Request, res: Response) => {
       query = `
         SELECT ts.id, ts.name, ts.role, ts.description, ts.enabled,
                ts.interval_hours, ts.priority, ts.state_code,
                ts.pool_id, tp.display_name as pool_name,
+               ts.dispensary_id, d.name as dispensary_name,
                ts.platform, ts.method,
                COALESCE(ts.is_immutable, false) as is_immutable,
                ts.last_run_at, ts.next_run_at, ts.last_task_count, ts.last_error,
                ts.created_at, ts.updated_at
         FROM task_schedules ts
         LEFT JOIN task_pools tp ON tp.id = ts.pool_id
+        LEFT JOIN dispensaries d ON d.id = ts.dispensary_id
       `;
     } else {
       // Fallback query without pool_id (migration 114 not yet run)
       query = `
         SELECT ts.id, ts.name, ts.role, ts.description, ts.enabled,
                ts.interval_hours, ts.priority, ts.state_code,
                NULL::integer as pool_id, NULL::text as pool_name,
+               ts.dispensary_id, d.name as dispensary_name,
                ts.platform, ts.method,
                COALESCE(ts.is_immutable, false) as is_immutable,
                ts.last_run_at, ts.next_run_at, ts.last_task_count, ts.last_error,
                ts.created_at, ts.updated_at
         FROM task_schedules ts
+        LEFT JOIN dispensaries d ON d.id = ts.dispensary_id
       `;
     }
@@ -534,13 +538,23 @@ router.put('/schedules/:id', async (req: Request, res: Response) => {
       SET ${updates.join(', ')}
       WHERE id = $${paramIndex}
       RETURNING id, name, role, description, enabled, interval_hours,
-                priority, state_code, platform, method,
+                priority, state_code, platform, method, dispensary_id, pool_id,
                 COALESCE(is_immutable, false) as is_immutable,
                 last_run_at, next_run_at, last_task_count, last_error, created_at, updated_at
     `, values);

-    res.json(result.rows[0]);
+    // Add dispensary_name if dispensary_id is set
+    const updatedSchedule = result.rows[0];
+    if (updatedSchedule.dispensary_id) {
+      const dispResult = await pool.query(
+        'SELECT name FROM dispensaries WHERE id = $1',
+        [updatedSchedule.dispensary_id]
+      );
+      updatedSchedule.dispensary_name = dispResult.rows[0]?.name || null;
+    }
+
+    res.json(updatedSchedule);
   } catch (error: any) {
     if (error.code === '23505') {
       return res.status(409).json({ error: 'A schedule with this name already exists' });
diff --git a/backend/src/tasks/handlers/product-discovery-dutchie.ts b/backend/src/tasks/handlers/product-discovery-dutchie.ts
index 56bdc49e..71dc1436 100644
--- a/backend/src/tasks/handlers/product-discovery-dutchie.ts
+++ b/backend/src/tasks/handlers/product-discovery-dutchie.ts
@@ -382,7 +382,9 @@ export async function handleProductDiscoveryDutchie(ctx: TaskContext): Promise
diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts
--- a/backend/src/tasks/task-service.ts
+++ b/backend/src/tasks/task-service.ts
-  async completeTask(taskId: number, result?: Record<string, any>): Promise<void> {
+  async completeTask(taskId: number, result?: Record<string, any>): Promise<boolean> {
     await pool.query(
       `UPDATE worker_tasks SET status = 'completed', completed_at = NOW(), result = $2 WHERE id = $1`,
       [taskId, result ? JSON.stringify(result) : null]
     );
+
+    // Verify completion was recorded
+    const verify = await pool.query(
+      `SELECT status FROM worker_tasks WHERE id = $1`,
+      [taskId]
+    );
+
+    if (verify.rows[0]?.status !== 'completed') {
+      console.error(`[TaskService] Task ${taskId} completion NOT VERIFIED - DB shows status: ${verify.rows[0]?.status}`);
+      return false;
+    }
+
+    return true;
   }

   /**
@@ -288,6 +302,19 @@ class TaskService {
     console.log(`[TaskService] Task ${taskId} released back to pending`);
   }

+  /**
+   * Verify a task claim by checking DB state matches expected
+   * Returns true if task is claimed by the specified worker
+   */
+  async verifyTaskClaim(taskId: number, workerId: string): Promise<boolean> {
+    const result = await pool.query(
+      `SELECT id FROM worker_tasks
+       WHERE id = $1 AND worker_id = $2 AND status = 'claimed'`,
+      [taskId, workerId]
+    );
+    return result.rows.length > 0;
+  }
+
   /**
    * Determine if an error is a "soft failure" (transient) that should be requeued
    * Soft failures: timeouts, connection issues, browser launch issues
@@ -871,7 +898,7 @@ class TaskService {
    * @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset
    * @returns Object with cleanup stats
    */
-  async cleanupStaleTasks(staleMinutes: number = 30): Promise<{
+  async cleanupStaleTasks(staleMinutes: number = 3): Promise<{
     cleaned: number;
     byStatus: { claimed: number; running: number };
     byRole: Record<string, number>;
@@ -941,6 +968,214 @@ class TaskService {
     return Math.ceil(totalTaskCapacityNeeded / tasksPerWorkerHour);
   }
+
+  /**
+   * Get all active tasks (claimed/running) for a specific worker
+   * Used for reconciliation between worker memory and DB state
+   */
+  async getWorkerActiveTasks(workerId: string): Promise<{ id: number; role: TaskRole; dispensary_id: number | null; claimed_at: Date }[]> {
+    const result = await pool.query(
+      `SELECT id, role, dispensary_id, claimed_at
+       FROM worker_tasks
+       WHERE worker_id = $1 AND status IN ('claimed', 'running')`,
+      [workerId]
+    );
+    return result.rows;
+  }
+
+  /**
+   * Verify task output actually exists for critical task types
+   * Checks that the task produced expected artifacts (MinIO payload, DB updates, etc.)
+   *
+   * @param task - The completed task to verify
+   * @returns true if output verified, false if output verification failed
+   */
+  async verifyTaskOutput(task: WorkerTask): Promise<{ verified: boolean; reason?: string }> {
+    if (!task.dispensary_id) {
+      // Non-dispensary tasks don't need output verification
+      return { verified: true };
+    }
+
+    const claimedAt = task.claimed_at || task.created_at;
+
+    switch (task.role) {
+      case 'product_refresh':
+      case 'product_discovery': {
+        // Verify payload was saved to raw_crawl_payloads after task was claimed
+        const payloadResult = await pool.query(
+          `SELECT id, product_count, fetched_at
+           FROM raw_crawl_payloads
+           WHERE dispensary_id = $1
+             AND fetched_at > $2
+           ORDER BY fetched_at DESC
+           LIMIT 1`,
+          [task.dispensary_id, claimedAt]
+        );
+
+        if (payloadResult.rows.length === 0) {
+          return {
+            verified: false,
+            reason: `No payload found in raw_crawl_payloads for dispensary ${task.dispensary_id} after ${claimedAt}`
+          };
+        }
+
+        const payload = payloadResult.rows[0];
+
+        // Verify products were actually updated
+        const productResult = await pool.query(
+          `SELECT COUNT(*)::int as count
+           FROM store_products
+           WHERE dispensary_id = $1
+             AND updated_at > $2`,
+          [task.dispensary_id, claimedAt]
+        );
+
+        const productsUpdated = productResult.rows[0]?.count || 0;
+
+        // If payload has products but none were updated, something went wrong
+        if (payload.product_count > 0 && productsUpdated === 0) {
+          return {
+            verified: false,
+            reason: `Payload has ${payload.product_count} products but 0 store_products were updated`
+          };
+        }
+
+        return { verified: true };
+      }
+
+      case 'payload_fetch': {
+        // Verify payload was saved
+        const result = await pool.query(
+          `SELECT id FROM raw_crawl_payloads
+           WHERE dispensary_id = $1 AND fetched_at > $2
+           LIMIT 1`,
+          [task.dispensary_id, claimedAt]
+        );
+
+        if (result.rows.length === 0) {
+          return {
+            verified: false,
+            reason: `No payload found for dispensary ${task.dispensary_id} after ${claimedAt}`
+          };
+        }
+
+        return { verified: true };
+      }
+
+      default:
+        // Other task types - trust the handler
+        return { verified: true };
+    }
+  }
+
+  /**
+   * Release a task with a reason (for orphan reconciliation)
+   */
+  async releaseTaskWithReason(taskId: number, reason: string): Promise<void> {
+    await pool.query(
+      `UPDATE worker_tasks
+       SET status = 'pending',
+           worker_id = NULL,
+           claimed_at = NULL,
+           started_at = NULL,
+           proxy_ip = NULL,
+           error_message = CONCAT(COALESCE(error_message, ''), ' [', $2, ']'),
+           updated_at = NOW()
+       WHERE id = $1 AND status IN ('claimed', 'running')`,
+      [taskId, reason]
+    );
+    console.log(`[TaskService] Task ${taskId} released: ${reason}`);
+  }
+
+  // ==========================================================================
+  // IP-PER-STORE TRACKING
+  // ==========================================================================
+  // Prevents same proxy IP from hitting multiple stores on same platform.
+  // Anti-detection measure: platforms may flag same IP accessing multiple stores.
+  // ==========================================================================
+
+  /**
+   * Check if a proxy IP is already being used for another store on the same platform
+   *
+   * @param proxyIp - The proxy IP to check
+   * @param platform - The platform (dutchie, jane, treez)
+   * @param dispensaryId - The store we want to use this IP for
+   * @returns Object with inUse flag and details if conflicting
+   */
+  async checkProxyIpConflict(
+    proxyIp: string,
+    platform: string,
+    dispensaryId: number
+  ): Promise<{ inUse: boolean; conflictingTask?: { id: number; dispensary_id: number; dispensary_name?: string } }> {
+    const result = await pool.query(
+      `SELECT t.id, t.dispensary_id, d.name as dispensary_name
+       FROM worker_tasks t
+       LEFT JOIN dispensaries d ON d.id = t.dispensary_id
+       WHERE t.proxy_ip = $1
+         AND t.platform = $2
+         AND t.dispensary_id != $3
+         AND t.status IN ('claimed', 'running')
+       LIMIT 1`,
+      [proxyIp, platform, dispensaryId]
+    );
+
+    if (result.rows.length > 0) {
+      return {
+        inUse: true,
+        conflictingTask: result.rows[0]
+      };
+    }
+
+    return { inUse: false };
+  }
+
+  /**
+   * Assign a proxy IP to a task
+   * Should be called after claiming task and selecting proxy, before executing
+   *
+   * @param taskId - The task to assign IP to
+   * @param proxyIp - The proxy IP being used
+   * @returns true if assigned successfully
+   */
+  async assignProxyIp(taskId: number, proxyIp: string): Promise<boolean> {
+    const result = await pool.query(
+      `UPDATE worker_tasks
+       SET proxy_ip = $2, updated_at = NOW()
+       WHERE id = $1 AND status IN ('claimed', 'running')
+       RETURNING id`,
+      [taskId, proxyIp]
+    );
+    return result.rowCount !== null && result.rowCount > 0;
+  }
+
+  /**
+   * Clear proxy IP from a task (on completion or release)
+   */
+  async clearProxyIp(taskId: number): Promise<void> {
+    await pool.query(
+      `UPDATE worker_tasks SET proxy_ip = NULL WHERE id = $1`,
+      [taskId]
+    );
+  }
+
+  /**
+   * Get list of proxy IPs currently in use for a platform
+   * Useful for proxy selection to avoid conflicts
+   *
+   * @param platform - The platform to check
+   * @returns Array of proxy IPs currently in use
+   */
+  async getActiveProxyIps(platform: string): Promise<string[]> {
+    const result = await pool.query(
+      `SELECT DISTINCT proxy_ip
+       FROM worker_tasks
+       WHERE platform = $1
+         AND status IN ('claimed', 'running')
+         AND proxy_ip IS NOT NULL`,
+      [platform]
+    );
+    return result.rows.map(r => r.proxy_ip);
+  }
 }

 export const taskService = new TaskService();
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index 1b1482ba..a59b298d 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -292,6 +292,7 @@ export class TaskWorker {
   private heartbeatInterval: NodeJS.Timeout | null = null;
   private registryHeartbeatInterval: NodeJS.Timeout | null = null;
   private staleCleanupInterval: NodeJS.Timeout | null = null;
+  private reconciliationInterval: NodeJS.Timeout | null = null;
   private crawlRotator: CrawlRotator;

   // ==========================================================================
@@ -1379,7 +1380,7 @@ export class TaskWorker {
   private async runStaleTaskCleanup(): Promise<void> {
     try {
       console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
-      const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
+      const cleanupResult = await taskService.cleanupStaleTasks(3); // 3 minute threshold - fast release of orphaned tasks
       if (cleanupResult.cleaned > 0) {
         console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
       }
@@ -1410,6 +1411,78 @@ export class TaskWorker {
     }
   }

+  // ==========================================================================
+  // TASK RECONCILIATION
+  // ==========================================================================
+  // Periodically reconcile worker's in-memory activeTasks with DB state.
+  // This catches orphaned tasks where DB says worker has task but worker
+  // doesn't know about it (memory lost, crash recovery, etc.)
+  // ==========================================================================
+
+  /**
+   * Start periodic task reconciliation
+   */
+  private startPeriodicReconciliation(): void {
+    const RECONCILIATION_INTERVAL_MS = 30 * 1000; // 30 seconds
+    this.reconciliationInterval = setInterval(async () => {
+      await this.reconcileActiveTasks();
+    }, RECONCILIATION_INTERVAL_MS);
+    console.log(`[TaskWorker] ${this.friendlyName} started periodic task reconciliation (every 30s)`);
+  }
+
+  /**
+   * Stop periodic task reconciliation
+   */
+  private stopPeriodicReconciliation(): void {
+    if (this.reconciliationInterval) {
+      clearInterval(this.reconciliationInterval);
+      this.reconciliationInterval = null;
+    }
+  }
+
+  /**
+   * Reconcile in-memory activeTasks with DB state
+   *
+   * This ensures worker's view matches DB truth:
+   * - Tasks in memory but not in DB → remove from memory (someone else took it)
+   * - Tasks in DB but not in memory → ORPHAN - release back to pool
+   */
+  private async reconcileActiveTasks(): Promise<void> {
+    try {
+      // Get all tasks DB says this worker has claimed/running
+      const dbTasks = await taskService.getWorkerActiveTasks(this.workerId);
+      const dbTaskIds = new Set(dbTasks.map(t => t.id));
+      const memoryTaskIds = new Set(this.activeTasks.keys());
+
+      // Tasks in memory but not in DB → remove from tracking
+      for (const taskId of memoryTaskIds) {
+        if (!dbTaskIds.has(taskId)) {
+          console.warn(`[TaskWorker] ${this.friendlyName} RECONCILIATION: Task ${taskId} in memory but not in DB - removing from tracking`);
+          this.activeTasks.delete(taskId);
+          this.taskPromises.delete(taskId);
+          this.clearTaskStep(taskId);
+        }
+      }
+
+      // Tasks in DB but not in memory → ORPHAN - release back to pool
+      for (const dbTask of dbTasks) {
+        if (!memoryTaskIds.has(dbTask.id)) {
+          console.error(`[TaskWorker] ${this.friendlyName} RECONCILIATION: Task ${dbTask.id} claimed in DB but not tracked - releasing as orphan`);
+          await taskService.releaseTaskWithReason(dbTask.id, `Orphan: worker ${this.workerId} has no memory of task`);
+        }
+      }
+
+      // Log reconciliation result if any issues found
+      const orphansFound = dbTasks.filter(t => !memoryTaskIds.has(t.id)).length;
+      const staleMemory = Array.from(memoryTaskIds).filter(id => !dbTaskIds.has(id)).length;
+      if (orphansFound > 0 || staleMemory > 0) {
+        console.log(`[TaskWorker] ${this.friendlyName} RECONCILIATION complete: ${orphansFound} orphans released, ${staleMemory} stale memory entries removed`);
+      }
+    } catch (error: any) {
+      console.error(`[TaskWorker] ${this.friendlyName} reconciliation error:`, error.message);
+    }
+  }
+
   /**
    * Start the worker loop
    *
@@ -1435,6 +1508,9 @@ export class TaskWorker {
     // Start periodic cleanup every 10 minutes
     this.startPeriodicStaleCleanup();

+    // Start periodic reconciliation every 30 seconds
+    this.startPeriodicReconciliation();
+
     const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
     console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`);
@@ -1617,6 +1693,16 @@ export class TaskWorker {
       if (task) {
         console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);

+        // =================================================================
+        // CLAIM VERIFICATION - Ensure DB state matches before tracking
+        // Prevents orphaned tasks when claim succeeds in DB but worker loses track
+        // =================================================================
+        const claimVerified = await taskService.verifyTaskClaim(task.id, this.workerId);
+        if (!claimVerified) {
+          console.error(`[TaskWorker] ${this.friendlyName} CLAIM VERIFICATION FAILED for task ${task.id} - DB state mismatch, skipping`);
+          return; // Don't track this task - something went wrong
+        }
+
         // =================================================================
         // PREFLIGHT CHECK - Use stored preflight results based on task method
         // We already ran dual-transport preflights at startup, so just verify
@@ -1655,16 +1741,41 @@ export class TaskWorker {
         console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);

+        // =================================================================
+        // IP-PER-STORE CHECK - Prevent same IP hitting multiple stores
+        // =================================================================
+        const currentProxyIp = this.preflightHttpResult?.proxyIp || this.preflightCurlResult?.proxyIp;
+        if (currentProxyIp && task.dispensary_id && task.platform) {
+          const ipConflict = await taskService.checkProxyIpConflict(
+            currentProxyIp,
+            task.platform,
+            task.dispensary_id
+          );
+
+          if (ipConflict.inUse) {
+            console.log(`[TaskWorker] ${this.friendlyName} IP CONFLICT: ${currentProxyIp} already hitting ${ipConflict.conflictingTask?.dispensary_name || ipConflict.conflictingTask?.dispensary_id} on ${task.platform}`);
+            console.log(`[TaskWorker] Releasing task ${task.id} back to pending - will retry with different timing`);
+            await taskService.releaseTask(task.id);
+            await this.sleep(5000); // Short wait before trying another task
+            return;
+          }
+
+          // No conflict - assign IP to this task
+          await taskService.assignProxyIp(task.id, currentProxyIp);
+        }
+
         this.activeTasks.set(task.id, task);

         // Start task in background (don't await)
         const taskPromise = this.executeTask(task);
         this.taskPromises.set(task.id, taskPromise);

-        // Clean up when done
-        taskPromise.finally(() => {
+        // Clean up when done (including proxy IP)
+        taskPromise.finally(async () => {
           this.activeTasks.delete(task.id);
           this.taskPromises.delete(task.id);
+          // Clear proxy IP so other tasks can use it
+          await taskService.clearProxyIp(task.id).catch(() => {});
         });

         // Immediately try to claim more tasks (don't wait for poll interval)
@@ -1826,6 +1937,28 @@ export class TaskWorker {
     // Mark task as running
     await WorkerSession.startTask(nextTask.task_id, this.workerId);

+    // =================================================================
+    // IP-PER-STORE CHECK - Prevent same IP hitting multiple stores
+    // =================================================================
+    const currentProxyIp = this.preflightHttpResult?.proxyIp || this.currentSession?.ip_address;
+    if (currentProxyIp && workerTask.dispensary_id && workerTask.platform) {
+      const ipConflict = await taskService.checkProxyIpConflict(
+        currentProxyIp,
+        workerTask.platform,
+        workerTask.dispensary_id
+      );
+
+      if (ipConflict.inUse) {
+        console.log(`[TaskWorker] ${this.friendlyName} IP CONFLICT: ${currentProxyIp} already hitting ${ipConflict.conflictingTask?.dispensary_name} on ${workerTask.platform}`);
+        console.log(`[TaskWorker] Skipping task ${nextTask.task_id} - will retry later`);
+        await this.sleep(5000);
+        return;
+      }
+
+      // No conflict - assign IP to this task
+      await taskService.assignProxyIp(nextTask.task_id, currentProxyIp);
+    }
+
     console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);

     this.activeTasks.set(nextTask.task_id, workerTask);
@@ -1834,10 +1967,11 @@ export class TaskWorker {
     const taskPromise = this.executeSessionTask(workerTask);
     this.taskPromises.set(nextTask.task_id, taskPromise);

-    // Cleanup when done
-    taskPromise.finally(() => {
+    // Cleanup when done (including proxy IP)
+    taskPromise.finally(async () => {
       this.activeTasks.delete(nextTask.task_id);
       this.taskPromises.delete(nextTask.task_id);
+      await taskService.clearProxyIp(nextTask.task_id).catch(() => {});
     });
   }
@@ -2020,6 +2154,28 @@ export class TaskWorker {
       updated_at: new Date(),
     };

+    // =================================================================
+    // IP-PER-STORE CHECK - Prevent same IP hitting multiple stores
+    // =================================================================
+    const currentProxyIp = this.preflightHttpResult?.proxyIp;
+    if (currentProxyIp && workerTask.dispensary_id && workerTask.platform) {
+      const ipConflict = await taskService.checkProxyIpConflict(
+        currentProxyIp,
+        workerTask.platform,
+        workerTask.dispensary_id
+      );
+
+      if (ipConflict.inUse) {
+        console.log(`[TaskWorker] ${this.friendlyName} IP CONFLICT: ${currentProxyIp} already hitting ${ipConflict.conflictingTask?.dispensary_name} on ${workerTask.platform}`);
+        console.log(`[TaskWorker] Skipping task ${nextTask.task_id} - will retry later`);
+        await this.sleep(5000);
+        return;
+      }
+
+      // No conflict - assign IP to this task
+      await taskService.assignProxyIp(nextTask.task_id, currentProxyIp);
+    }
+
     console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);

     this.activeTasks.set(nextTask.task_id, workerTask);
@@ -2028,12 +2184,13 @@ export class TaskWorker {
     const taskPromise = this.executePoolTask(workerTask);
     this.taskPromises.set(nextTask.task_id, taskPromise);

-    // Cleanup when done
-    taskPromise.finally(() => {
+    // Cleanup when done (including proxy IP)
+    taskPromise.finally(async () => {
       this.activeTasks.delete(nextTask.task_id);
       this.taskPromises.delete(nextTask.task_id);
       // Remove from poolTasks
       this.poolTasks = this.poolTasks.filter(t => t.task_id !== nextTask.task_id);
+      await taskService.clearProxyIp(nextTask.task_id).catch(() => {});
     });
   }
@@ -2091,8 +2248,23 @@ export class TaskWorker {
       this.clearTaskStep(task.id);

       if (result.success) {
-        await taskService.completeTask(task.id, { result: result.data });
-        console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
+        // Layer 1: Verify DB completion was recorded
+        const dbVerified = await taskService.completeTask(task.id, { result: result.data });
+        if (!dbVerified) {
+          console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} DB COMPLETION VERIFICATION FAILED`);
+          await taskService.failTask(task.id, 'DB completion verification failed');
+          return;
+        }
+
+        // Layer 2: Verify output exists for payload-creating tasks
+        const outputVerification = await taskService.verifyTaskOutput(task);
+        if (!outputVerification.verified) {
+          console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} OUTPUT VERIFICATION FAILED: ${outputVerification.reason}`);
+          await taskService.failTask(task.id, `Output verification failed: ${outputVerification.reason}`);
+          return;
+        }
+
+        console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED (verified)`);
       } else {
         await taskService.failTask(task.id, result.error || 'Unknown error');
         console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
@@ -2140,8 +2312,18 @@ export class TaskWorker {
       this.clearTaskStep(task.id);

       if (result.success) {
+        // Use WorkerSession completion but also verify with taskService
         await WorkerSession.completeTask(task.id, this.workerId, { result: result.data });
-        console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
+
+        // Verify output exists for payload-creating tasks
+        const outputVerification = await taskService.verifyTaskOutput(task);
+        if (!outputVerification.verified) {
+          console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} OUTPUT VERIFICATION FAILED: ${outputVerification.reason}`);
+          await WorkerSession.failTask(task.id, this.workerId, `Output verification failed: ${outputVerification.reason}`);
+          return;
+        }
+
+        console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED (verified)`);
       } else {
         await WorkerSession.failTask(task.id, this.workerId, result.error);
         console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
@@ -2161,6 +2343,7 @@ export class TaskWorker {
     this.stopHeartbeat();
     this.stopRegistryHeartbeat();
     this.stopPeriodicStaleCleanup();
+    this.stopPeriodicReconciliation();

     // Clean up session pool state if using new system
     if (USE_SESSION_POOL && this.currentSession) {
@@ -2217,10 +2400,30 @@ export class TaskWorker {
       // Clear step tracking
       this.clearTaskStep(task.id);

-      // Mark as completed
-      await taskService.completeTask(task.id, result);
+      // =================================================================
+      // COMPLETION VERIFICATION - Ensure task actually completed
+      // =================================================================
+
+      // Layer 1: Verify DB completion was recorded
+      const dbVerified = await taskService.completeTask(task.id, result);
+      if (!dbVerified) {
+        console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} DB COMPLETION VERIFICATION FAILED`);
+        await taskService.failTask(task.id, 'DB completion verification failed');
+        await this.reportTaskCompletion(false);
+        return;
+      }
+
+      // Layer 2: Verify output exists for payload-creating tasks
+      const outputVerification = await taskService.verifyTaskOutput(task);
+      if (!outputVerification.verified) {
+        console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} OUTPUT VERIFICATION FAILED: ${outputVerification.reason}`);
+        await taskService.failTask(task.id, `Output verification failed: ${outputVerification.reason}`);
+        await this.reportTaskCompletion(false);
+        return;
+      }
+
       await this.reportTaskCompletion(true);
-      console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);
+      console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} (verified) [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);

       // Track identity task count (for identity pool rotation)
       if (USE_IDENTITY_POOL) {
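Taken together, the task-worker.ts changes above put three guards around every task: claim verification, an IP-per-store check, and two-layer completion verification. A condensed sketch of that lifecycle under stated assumptions — `claimNextTask`, `runHandler`, the `WorkerTask` import location, and the `taskService` import path are hypothetical; only the `taskService` methods themselves come from this diff:

```ts
import { taskService } from './task-service'; // import path assumed
import type { WorkerTask } from './types';    // type location assumed

declare function claimNextTask(workerId: string): Promise<WorkerTask | null>; // assumed claim API
declare function runHandler(task: WorkerTask): Promise<{ success: boolean; data?: unknown; error?: string }>; // assumed dispatch

async function processOneTask(workerId: string, proxyIp: string): Promise<void> {
  const task = await claimNextTask(workerId);
  if (!task) return;

  // Guard 1: the claim must actually be recorded in the DB before tracking it in memory.
  if (!(await taskService.verifyTaskClaim(task.id, workerId))) return;

  // Guard 2: never let one proxy IP hit two stores on the same platform at once.
  if (task.platform && task.dispensary_id) {
    const conflict = await taskService.checkProxyIpConflict(proxyIp, task.platform, task.dispensary_id);
    if (conflict.inUse) {
      await taskService.releaseTask(task.id);
      return;
    }
    await taskService.assignProxyIp(task.id, proxyIp);
  }

  try {
    const result = await runHandler(task);
    if (!result.success) {
      await taskService.failTask(task.id, result.error ?? 'Unknown error');
      return;
    }
    // Guard 3: layer 1 checks the completed row, layer 2 checks the produced artifacts.
    const dbVerified = await taskService.completeTask(task.id, { result: result.data });
    const output = await taskService.verifyTaskOutput(task);
    if (!dbVerified || !output.verified) {
      await taskService.failTask(task.id, output.reason ?? 'completion verification failed');
    }
  } finally {
    await taskService.clearProxyIp(task.id);
  }
}
```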
diff --git a/backend/src/utils/payload-storage.ts b/backend/src/utils/payload-storage.ts
index 7372ded7..307cff36 100644
--- a/backend/src/utils/payload-storage.ts
+++ b/backend/src/utils/payload-storage.ts
@@ -102,28 +102,40 @@ export interface LoadPayloadResult {
   };
 }

+/**
+ * Generate a short hash from task ID for filename inclusion
+ * Uses last 5 chars of base36 representation for brevity
+ */
+function taskIdHash(taskId: number | null): string {
+  if (!taskId) return '';
+  // Convert to base36 and take last 5 chars for short but unique identifier
+  return taskId.toString(36).slice(-5).padStart(5, '0');
+}
+
 /**
  * Generate storage path/key for a payload
  *
- * MinIO format: payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
- * Local format: ./storage/payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
+ * MinIO format: payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_t{task_hash}_{timestamp}.json.gz
+ * Local format: ./storage/payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_t{task_hash}_{timestamp}.json.gz
  *
  * Platform defaults to 'dutchie' for backward compatibility
+ * Task ID is encoded as a 5-char base36 hash for traceability
  */
-function generateStoragePath(dispensaryId: number, timestamp: Date, platform: string = 'dutchie'): string {
+function generateStoragePath(dispensaryId: number, timestamp: Date, platform: string = 'dutchie', taskId: number | null = null): string {
   const year = timestamp.getFullYear();
   const month = String(timestamp.getMonth() + 1).padStart(2, '0');
   const day = String(timestamp.getDate()).padStart(2, '0');
   const ts = timestamp.getTime();
+  const taskHash = taskId ? `_t${taskIdHash(taskId)}` : '';

-  const relativePath = `payloads/${platform}/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`;
+  const relativePath = `payloads/${platform}/${year}/${month}/${day}/store_${dispensaryId}${taskHash}_${ts}.json.gz`;

   if (useMinIO) {
     // MinIO uses forward slashes, no leading slash
     return relativePath;
   } else {
     // Local filesystem uses OS-specific path
-    return path.join(PAYLOAD_BASE_PATH, platform, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`);
+    return path.join(PAYLOAD_BASE_PATH, platform, String(year), month, day, `store_${dispensaryId}${taskHash}_${ts}.json.gz`);
   }
 }

@@ -151,6 +163,7 @@ function calculateChecksum(data: Buffer): string {
  * @param crawlRunId - Optional crawl_run ID for linking
  * @param productCount - Number of products in payload
  * @param platform - Platform identifier ('dutchie' | 'jane'), defaults to 'dutchie'
+ * @param taskId - Optional task ID for traceability in filename
  * @returns SavePayloadResult with file info and DB record ID
  */
 export async function saveRawPayload(
@@ -159,10 +172,11 @@ export async function saveRawPayload(
   payload: any,
   crawlRunId: number | null = null,
   productCount: number = 0,
-  platform: string = 'dutchie'
+  platform: string = 'dutchie',
+  taskId: number | null = null
 ): Promise<SavePayloadResult> {
   const timestamp = new Date();
-  const storagePath = generateStoragePath(dispensaryId, timestamp, platform);
+  const storagePath = generateStoragePath(dispensaryId, timestamp, platform, taskId);

   // Serialize and compress
   const jsonStr = JSON.stringify(payload);
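A worked example of the resulting object keys may help reviewers; the values below are illustrative (not taken from this diff) and assume the process runs in UTC, since generateStoragePath uses the Date's local-time getters:

```ts
// Illustrative only:
// taskIdHash(123456)  ->  (123456).toString(36) === "2n9c"  ->  slice(-5) === "2n9c"  ->  padStart(5, "0") === "02n9c"
// generateStoragePath(42, new Date("2025-01-15T12:00:00Z"), "dutchie", 123456)
//   ->  "payloads/dutchie/2025/01/15/store_42_t02n9c_1736942400000.json.gz"
// With taskId omitted (or null) the path keeps the old form:
//   ->  "payloads/dutchie/2025/01/15/store_42_1736942400000.json.gz"
```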
diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts
index d0e86e28..5dd8a06c 100755
--- a/cannaiq/src/lib/api.ts
+++ b/cannaiq/src/lib/api.ts
@@ -3140,6 +3140,8 @@ export interface TaskSchedule {
   interval_hours: number;
   priority: number;
   state_code: string | null;
+  dispensary_id: number | null;
+  dispensary_name: string | null;
   pool_id: number | null;
   pool_name: string | null;
   platform: string | null;
diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx
index 94392667..fa200fa5 100644
--- a/cannaiq/src/pages/TasksDashboard.tsx
+++ b/cannaiq/src/pages/TasksDashboard.tsx
@@ -708,6 +708,15 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
         )}
+        {/* Show dispensary if this is a per-store schedule */}
+        {schedule?.dispensary_id && (
+
+
+          Store: {schedule.dispensary_name || `ID ${schedule.dispensary_id}`}
+
+        )}
+
-
+          {task.dispensary?.name?.split(' ').slice(0, 2).join(' ') || task.role.replace(/_/g, ' ')}
+
+          #{task.task_id}
+          {formatSecondsToTime(task.running_seconds)}
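On the backend side, the new getActiveProxyIps() is the natural hook for proxy selection. A hypothetical helper — not part of this diff; the candidate shape and import path are assumptions — could use it to avoid handing a worker an IP that is already busy on the platform:

```ts
import { taskService } from './task-service'; // import path assumed

// Hypothetical helper: pick a proxy whose IP is not already hitting another
// store on the given platform, using getActiveProxyIps() from this diff.
async function pickNonConflictingProxy(
  platform: string,
  candidates: Array<{ ip: string }>
): Promise<{ ip: string } | null> {
  const busy = new Set(await taskService.getActiveProxyIps(platform));
  return candidates.find((p) => !busy.has(p.ip)) ?? null;
}
```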