From b69d03c02f180408fa9d73346d33d0cf72308e78 Mon Sep 17 00:00:00 2001 From: Kelly Date: Sat, 13 Dec 2025 00:43:00 -0700 Subject: [PATCH] feat: Add stage checkpoints to task handlers and fix worker name display MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage checkpoints (observational, non-blocking): - product_refresh: success → 'production', failure tracking → 'failing' after 3 - product_discovery: success → 'hydrating', failure tracking - entry_point_discovery: success → 'promoted', failure tracking Worker name fix: - Join worker_registry in tasks query to get friendly_name directly - Update TasksDashboard to use worker_name from joined query - Fallback to registry lookup then pod ID suffix 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../tasks/handlers/entry-point-discovery.ts | 43 ++++++++++++++++-- .../tasks/handlers/product-discovery-http.ts | 45 +++++++++++++++++++ backend/src/tasks/handlers/product-refresh.ts | 45 ++++++++++++++++++- backend/src/tasks/task-service.ts | 4 +- cannaiq/src/pages/TasksDashboard.tsx | 16 ++++--- 5 files changed, 141 insertions(+), 12 deletions(-) diff --git a/backend/src/tasks/handlers/entry-point-discovery.ts b/backend/src/tasks/handlers/entry-point-discovery.ts index ede002ea..ea2d3c43 100644 --- a/backend/src/tasks/handlers/entry-point-discovery.ts +++ b/backend/src/tasks/handlers/entry-point-discovery.ts @@ -410,7 +410,8 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise= 3 THEN 'failing' + ELSE stage + END, updated_at = NOW(), last_modified_at = NOW(), last_modified_by_task = $5, last_modified_task_id = $6 WHERE id = $1 + RETURNING consecutive_failures, stage `, [dispensaryId, lastHttpStatus, failureStatus, failureReason, task.role, task.id]); + if (failureResult.rows[0]) { + const { consecutive_failures, stage } = failureResult.rows[0]; + console.log(`[EntryPointDiscovery] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`); + } + return { success: false, error: `Hard failure: ${failureReason}`, @@ -441,17 +454,30 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise= 3 THEN 'failing' + ELSE stage + END, last_modified_at = NOW(), last_modified_by_task = $3, last_modified_task_id = $4 WHERE id = $1 + RETURNING consecutive_failures, stage `, [dispensaryId, errorMessage, task.role, task.id]); + if (failureResult.rows[0]) { + const { consecutive_failures, stage } = failureResult.rows[0]; + console.log(`[EntryPointDiscovery] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`); + } + return { success: false, error: errorMessage, @@ -475,6 +501,8 @@ async function updateDispensaryWithPlatformId( source: string, slug: string ): Promise { + // Update dispensary with platform ID and stage checkpoint + // Stage transitions: discovered/validated → promoted (ready for crawling) await pool.query(` UPDATE dispensaries SET @@ -483,6 +511,13 @@ async function updateDispensaryWithPlatformId( crawl_enabled = true, id_resolution_status = 'resolved', id_resolution_error = NULL, + stage = CASE + WHEN stage IN ('discovered', 'validated') THEN 'promoted' + WHEN stage = 'failing' THEN 'promoted' + ELSE stage + END, + consecutive_successes = COALESCE(consecutive_successes, 0) + 1, + consecutive_failures = 0, updated_at = NOW(), last_modified_at = NOW(), last_modified_by_task = $3, @@ -490,7 +525,7 @@ async function updateDispensaryWithPlatformId( WHERE id = $1 `, [dispensaryId, platformId, task.role, task.id]); - console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID (source: ${source})`); + console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID (source: ${source}) - stage checkpoint: promoted`); // Queue product_discovery task await pool.query(` diff --git a/backend/src/tasks/handlers/product-discovery-http.ts b/backend/src/tasks/handlers/product-discovery-http.ts index 5c3f2abc..a0a12bd0 100644 --- a/backend/src/tasks/handlers/product-discovery-http.ts +++ b/backend/src/tasks/handlers/product-discovery-http.ts @@ -349,6 +349,25 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise= 3 THEN 'failing' + ELSE stage + END + WHERE id = $1 + RETURNING consecutive_failures, stage + `, [dispensaryId]); + + if (failureResult.rows[0]) { + const { consecutive_failures, stage } = failureResult.rows[0]; + console.log(`[ProductDiscoveryHTTP] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`); + } + } catch (trackError) { + // Don't let tracking errors mask the original error + console.error(`[ProductDiscoveryHTTP] Failed to track failure:`, trackError); + } + return { success: false, error: errorMessage, diff --git a/backend/src/tasks/handlers/product-refresh.ts b/backend/src/tasks/handlers/product-refresh.ts index 568b56db..53cfae81 100644 --- a/backend/src/tasks/handlers/product-refresh.ts +++ b/backend/src/tasks/handlers/product-refresh.ts @@ -326,7 +326,24 @@ export async function handleProductRefresh(ctx: TaskContext): Promise= 3 THEN 'failing' + ELSE stage + END + WHERE id = $1 + RETURNING consecutive_failures, stage + `, [dispensaryId]); + + if (failureResult.rows[0]) { + const { consecutive_failures, stage } = failureResult.rows[0]; + console.log(`[ProductRefresh] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`); + } + } catch (trackError) { + // Don't let tracking errors mask the original error + console.error(`[ProductRefresh] Failed to track failure:`, trackError); + } + return { success: false, error: errorMessage, diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts index f1f23b4e..94a1775b 100644 --- a/backend/src/tasks/task-service.ts +++ b/backend/src/tasks/task-service.ts @@ -395,9 +395,11 @@ class TaskService { `SELECT t.*, d.name as dispensary_name, - d.slug as dispensary_slug + d.slug as dispensary_slug, + w.friendly_name as worker_name FROM worker_tasks t LEFT JOIN dispensaries d ON d.id = t.dispensary_id + LEFT JOIN worker_registry w ON w.worker_id = t.worker_id ${whereClause} ORDER BY t.created_at DESC LIMIT ${limit} OFFSET ${offset}`, diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx index adaf83ec..169a5f02 100644 --- a/cannaiq/src/pages/TasksDashboard.tsx +++ b/cannaiq/src/pages/TasksDashboard.tsx @@ -39,6 +39,7 @@ interface Task { priority: number; scheduled_for: string | null; worker_id: string | null; + worker_name?: string | null; claimed_at: string | null; started_at: string | null; completed_at: string | null; @@ -894,12 +895,15 @@ export default function TasksDashboard() { return () => clearInterval(interval); }, [roleFilter, statusFilter]); - // Create worker name lookup map + // Create worker name lookup map (fallback for tasks without joined worker_name) const workerNameMap = new Map(workers.map(w => [w.worker_id, w.friendly_name])); - const getWorkerName = (workerId: string | null): string => { - if (!workerId) return '-'; - return workerNameMap.get(workerId) || workerId.split('-').pop() || workerId; + const getWorkerName = (task: Task): string => { + // Prefer worker_name from joined query + if (task.worker_name) return task.worker_name; + // Fallback to registry lookup + if (!task.worker_id) return '-'; + return workerNameMap.get(task.worker_id) || task.worker_id.split('-').pop() || task.worker_id; }; const getTaskDuration = (task: Task): string => { @@ -927,7 +931,7 @@ export default function TasksDashboard() { const filteredTasks = tasks.filter((task) => { if (searchQuery) { const query = searchQuery.toLowerCase(); - const workerName = getWorkerName(task.worker_id); + const workerName = getWorkerName(task); return ( task.role.toLowerCase().includes(query) || task.dispensary_name?.toLowerCase().includes(query) || @@ -1488,7 +1492,7 @@ export default function TasksDashboard() { - {getWorkerName(task.worker_id)} + {getWorkerName(task)} {getTaskDuration(task)}