feat: Add stage checkpoints to task handlers and fix worker name display

Stage checkpoints (observational, non-blocking):
- product_refresh: success → 'production', failure tracking → 'failing' after 3
- product_discovery: success → 'hydrating', failure tracking
- entry_point_discovery: success → 'promoted', failure tracking

Worker name fix:
- Join worker_registry in tasks query to get friendly_name directly
- Update TasksDashboard to use worker_name from joined query
- Fallback to registry lookup then pod ID suffix

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 00:43:00 -07:00
parent 54f59c6082
commit b69d03c02f
5 changed files with 141 additions and 12 deletions

View File

@@ -410,7 +410,8 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
console.log(`[EntryPointDiscovery] ${isHardFailure ? 'HARD FAILURE' : 'Soft failure'}: ${failureReason}`);
await pool.query(`
// Stage checkpoint: track failures and potentially transition to 'failing'
const failureResult = await pool.query(`
UPDATE dispensaries
SET
menu_type = CASE
@@ -420,13 +421,25 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
END,
id_resolution_status = $3,
id_resolution_error = $4,
consecutive_failures = COALESCE(consecutive_failures, 0) + 1,
consecutive_successes = 0,
stage = CASE
WHEN COALESCE(consecutive_failures, 0) + 1 >= 3 THEN 'failing'
ELSE stage
END,
updated_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $5,
last_modified_task_id = $6
WHERE id = $1
RETURNING consecutive_failures, stage
`, [dispensaryId, lastHttpStatus, failureStatus, failureReason, task.role, task.id]);
if (failureResult.rows[0]) {
const { consecutive_failures, stage } = failureResult.rows[0];
console.log(`[EntryPointDiscovery] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`);
}
return {
success: false,
error: `Hard failure: ${failureReason}`,
@@ -441,17 +454,30 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[EntryPointDiscovery] Error for dispensary ${dispensaryId}:`, errorMessage);
// Mark as needs_investigation on unexpected errors
await pool.query(`
// Mark as needs_investigation on unexpected errors + track failures
// Stage checkpoint: after 3+ consecutive failures → 'failing'
const failureResult = await pool.query(`
UPDATE dispensaries
SET id_resolution_status = 'needs_investigation',
id_resolution_error = $2,
consecutive_failures = COALESCE(consecutive_failures, 0) + 1,
consecutive_successes = 0,
stage = CASE
WHEN COALESCE(consecutive_failures, 0) + 1 >= 3 THEN 'failing'
ELSE stage
END,
last_modified_at = NOW(),
last_modified_by_task = $3,
last_modified_task_id = $4
WHERE id = $1
RETURNING consecutive_failures, stage
`, [dispensaryId, errorMessage, task.role, task.id]);
if (failureResult.rows[0]) {
const { consecutive_failures, stage } = failureResult.rows[0];
console.log(`[EntryPointDiscovery] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`);
}
return {
success: false,
error: errorMessage,
@@ -475,6 +501,8 @@ async function updateDispensaryWithPlatformId(
source: string,
slug: string
): Promise<void> {
// Update dispensary with platform ID and stage checkpoint
// Stage transitions: discovered/validated → promoted (ready for crawling)
await pool.query(`
UPDATE dispensaries
SET
@@ -483,6 +511,13 @@ async function updateDispensaryWithPlatformId(
crawl_enabled = true,
id_resolution_status = 'resolved',
id_resolution_error = NULL,
stage = CASE
WHEN stage IN ('discovered', 'validated') THEN 'promoted'
WHEN stage = 'failing' THEN 'promoted'
ELSE stage
END,
consecutive_successes = COALESCE(consecutive_successes, 0) + 1,
consecutive_failures = 0,
updated_at = NOW(),
last_modified_at = NOW(),
last_modified_by_task = $3,
@@ -490,7 +525,7 @@ async function updateDispensaryWithPlatformId(
WHERE id = $1
`, [dispensaryId, platformId, task.role, task.id]);
console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID (source: ${source})`);
console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID (source: ${source}) - stage checkpoint: promoted`);
// Queue product_discovery task
await pool.query(`

View File

@@ -349,6 +349,25 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
console.log(`[ProductDiscoveryHTTP] Queued product_refresh task for payload #${payloadResult.id}`);
// ============================================================
// STEP 8: Stage checkpoint - observational update
// Discovery success → hydrating (awaiting product_refresh completion)
// ============================================================
await pool.query(`
UPDATE dispensaries
SET
stage = CASE
WHEN stage IN ('promoted', 'sandbox') THEN 'hydrating'
WHEN stage = 'failing' THEN 'hydrating'
ELSE stage
END,
consecutive_successes = COALESCE(consecutive_successes, 0) + 1,
consecutive_failures = 0
WHERE id = $1
`, [dispensaryId]);
console.log(`[ProductDiscoveryHTTP] Stage checkpoint: hydrating`);
return {
success: true,
payloadId: payloadResult.id,
@@ -359,6 +378,32 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductDiscoveryHTTP] Error for dispensary ${dispensaryId}:`, errorMessage);
// Stage checkpoint - track failures
// After 3+ consecutive failures, stage transitions to 'failing'
try {
const failureResult = await pool.query(`
UPDATE dispensaries
SET
consecutive_failures = COALESCE(consecutive_failures, 0) + 1,
consecutive_successes = 0,
stage = CASE
WHEN COALESCE(consecutive_failures, 0) + 1 >= 3 THEN 'failing'
ELSE stage
END
WHERE id = $1
RETURNING consecutive_failures, stage
`, [dispensaryId]);
if (failureResult.rows[0]) {
const { consecutive_failures, stage } = failureResult.rows[0];
console.log(`[ProductDiscoveryHTTP] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`);
}
} catch (trackError) {
// Don't let tracking errors mask the original error
console.error(`[ProductDiscoveryHTTP] Failed to track failure:`, trackError);
}
return {
success: false,
error: errorMessage,

View File

@@ -326,7 +326,24 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
WHERE id = $1
`, [payloadId]);
console.log(`[ProductRefresh] Completed ${dispensary.name}`);
// ============================================================
// STEP 9: Stage checkpoint - observational update
// Reflects reality: successful refresh → production stage
// ============================================================
await pool.query(`
UPDATE dispensaries
SET
stage = CASE
WHEN stage IN ('promoted', 'sandbox', 'hydrating', 'failing') THEN 'production'
ELSE stage
END,
consecutive_successes = COALESCE(consecutive_successes, 0) + 1,
consecutive_failures = 0,
last_successful_crawl_at = NOW()
WHERE id = $1
`, [dispensaryId]);
console.log(`[ProductRefresh] Completed ${dispensary.name} - stage checkpoint: production`);
return {
success: true,
@@ -341,6 +358,32 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductRefresh] Error for dispensary ${dispensaryId}:`, errorMessage);
// Stage checkpoint - track failures
// After 3+ consecutive failures, stage transitions to 'failing'
try {
const failureResult = await pool.query(`
UPDATE dispensaries
SET
consecutive_failures = COALESCE(consecutive_failures, 0) + 1,
consecutive_successes = 0,
stage = CASE
WHEN COALESCE(consecutive_failures, 0) + 1 >= 3 THEN 'failing'
ELSE stage
END
WHERE id = $1
RETURNING consecutive_failures, stage
`, [dispensaryId]);
if (failureResult.rows[0]) {
const { consecutive_failures, stage } = failureResult.rows[0];
console.log(`[ProductRefresh] Failure tracked: ${consecutive_failures} consecutive failures, stage: ${stage}`);
}
} catch (trackError) {
// Don't let tracking errors mask the original error
console.error(`[ProductRefresh] Failed to track failure:`, trackError);
}
return {
success: false,
error: errorMessage,

View File

@@ -395,9 +395,11 @@ class TaskService {
`SELECT
t.*,
d.name as dispensary_name,
d.slug as dispensary_slug
d.slug as dispensary_slug,
w.friendly_name as worker_name
FROM worker_tasks t
LEFT JOIN dispensaries d ON d.id = t.dispensary_id
LEFT JOIN worker_registry w ON w.worker_id = t.worker_id
${whereClause}
ORDER BY t.created_at DESC
LIMIT ${limit} OFFSET ${offset}`,

View File

@@ -39,6 +39,7 @@ interface Task {
priority: number;
scheduled_for: string | null;
worker_id: string | null;
worker_name?: string | null;
claimed_at: string | null;
started_at: string | null;
completed_at: string | null;
@@ -894,12 +895,15 @@ export default function TasksDashboard() {
return () => clearInterval(interval);
}, [roleFilter, statusFilter]);
// Create worker name lookup map
// Create worker name lookup map (fallback for tasks without joined worker_name)
const workerNameMap = new Map(workers.map(w => [w.worker_id, w.friendly_name]));
const getWorkerName = (workerId: string | null): string => {
if (!workerId) return '-';
return workerNameMap.get(workerId) || workerId.split('-').pop() || workerId;
const getWorkerName = (task: Task): string => {
// Prefer worker_name from joined query
if (task.worker_name) return task.worker_name;
// Fallback to registry lookup
if (!task.worker_id) return '-';
return workerNameMap.get(task.worker_id) || task.worker_id.split('-').pop() || task.worker_id;
};
const getTaskDuration = (task: Task): string => {
@@ -927,7 +931,7 @@ export default function TasksDashboard() {
const filteredTasks = tasks.filter((task) => {
if (searchQuery) {
const query = searchQuery.toLowerCase();
const workerName = getWorkerName(task.worker_id);
const workerName = getWorkerName(task);
return (
task.role.toLowerCase().includes(query) ||
task.dispensary_name?.toLowerCase().includes(query) ||
@@ -1488,7 +1492,7 @@ export default function TasksDashboard() {
</span>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{getWorkerName(task.worker_id)}
{getWorkerName(task)}
</td>
<td className="px-4 py-3 text-sm font-mono text-gray-600">
{getTaskDuration(task)}