feat: Idempotent entry_point_discovery with bulk endpoint
- Track id_resolution_status, attempts, and errors in handler - Add POST /api/tasks/batch/entry-point-discovery endpoint - Skip already-resolved stores, retry failed with force flag
This commit is contained in:
@@ -1243,6 +1243,89 @@ router.post('/batch/az-stores', async (req: Request, res: Response) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POST /api/tasks/batch/entry-point-discovery
|
||||||
|
* Create entry_point_discovery tasks for stores missing platform_dispensary_id
|
||||||
|
*
|
||||||
|
* This is idempotent - stores that already have platform_dispensary_id are skipped.
|
||||||
|
* Only creates tasks for stores with menu_url set and crawl_enabled = true.
|
||||||
|
*
|
||||||
|
* Body (optional):
|
||||||
|
* - state_code: string (optional) - Filter by state code
|
||||||
|
* - stagger_seconds: number (default: 5) - Seconds between tasks
|
||||||
|
* - force: boolean (default: false) - Re-run even for previously failed stores
|
||||||
|
*/
|
||||||
|
router.post('/batch/entry-point-discovery', async (req: Request, res: Response) => {
|
||||||
|
try {
|
||||||
|
const {
|
||||||
|
state_code,
|
||||||
|
stagger_seconds = 5,
|
||||||
|
force = false,
|
||||||
|
} = req.body;
|
||||||
|
|
||||||
|
// Find stores that need entry point discovery
|
||||||
|
const storeResult = await pool.query(`
|
||||||
|
SELECT d.id, d.name, d.menu_url
|
||||||
|
FROM dispensaries d
|
||||||
|
JOIN states s ON d.state_id = s.id
|
||||||
|
WHERE d.crawl_enabled = true
|
||||||
|
AND d.menu_url IS NOT NULL
|
||||||
|
AND d.platform_dispensary_id IS NULL
|
||||||
|
${state_code ? 'AND s.code = $1' : ''}
|
||||||
|
${!force ? "AND (d.id_resolution_status IS NULL OR d.id_resolution_status = 'pending')" : ''}
|
||||||
|
-- No pending/running entry_point_discovery task already
|
||||||
|
AND NOT EXISTS (
|
||||||
|
SELECT 1 FROM worker_tasks t
|
||||||
|
WHERE t.dispensary_id = d.id
|
||||||
|
AND t.role = 'entry_point_discovery'
|
||||||
|
AND t.status IN ('pending', 'claimed', 'running')
|
||||||
|
)
|
||||||
|
ORDER BY d.id
|
||||||
|
`, state_code ? [state_code.toUpperCase()] : []);
|
||||||
|
|
||||||
|
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
|
||||||
|
|
||||||
|
if (dispensaryIds.length === 0) {
|
||||||
|
return res.json({
|
||||||
|
success: true,
|
||||||
|
message: state_code
|
||||||
|
? `No ${state_code.toUpperCase()} stores need entry point discovery`
|
||||||
|
: 'No stores need entry point discovery',
|
||||||
|
tasks_created: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create staggered tasks
|
||||||
|
const taskIds: number[] = [];
|
||||||
|
for (let i = 0; i < dispensaryIds.length; i++) {
|
||||||
|
const scheduledFor = new Date(Date.now() + i * stagger_seconds * 1000);
|
||||||
|
const result = await pool.query(`
|
||||||
|
INSERT INTO worker_tasks (role, dispensary_id, priority, scheduled_for, method)
|
||||||
|
VALUES ('entry_point_discovery', $1, 10, $2, 'http')
|
||||||
|
RETURNING id
|
||||||
|
`, [dispensaryIds[i], scheduledFor]);
|
||||||
|
taskIds.push(result.rows[0].id);
|
||||||
|
}
|
||||||
|
|
||||||
|
const totalDuration = dispensaryIds.length * stagger_seconds;
|
||||||
|
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
tasks_created: taskIds.length,
|
||||||
|
task_ids: taskIds,
|
||||||
|
stores: storeResult.rows.map((r: { id: number; name: string }) => ({ id: r.id, name: r.name })),
|
||||||
|
stagger_seconds,
|
||||||
|
total_duration_seconds: totalDuration,
|
||||||
|
estimated_completion: estimatedEndTime.toISOString(),
|
||||||
|
message: `Created ${taskIds.length} entry_point_discovery tasks${state_code ? ` for ${state_code.toUpperCase()}` : ''}`,
|
||||||
|
});
|
||||||
|
} catch (error: unknown) {
|
||||||
|
console.error('Error creating entry point discovery tasks:', error);
|
||||||
|
res.status(500).json({ error: 'Failed to create entry point discovery tasks' });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// ============================================================
|
// ============================================================
|
||||||
// STATE-BASED CRAWL ENDPOINTS
|
// STATE-BASED CRAWL ENDPOINTS
|
||||||
// ============================================================
|
// ============================================================
|
||||||
|
|||||||
@@ -41,9 +41,16 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
|
|||||||
|
|
||||||
const dispensary = dispResult.rows[0];
|
const dispensary = dispResult.rows[0];
|
||||||
|
|
||||||
// If already has platform_dispensary_id, we're done
|
// If already has platform_dispensary_id, we're done (idempotent)
|
||||||
if (dispensary.platform_dispensary_id) {
|
if (dispensary.platform_dispensary_id) {
|
||||||
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID: ${dispensary.platform_dispensary_id}`);
|
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID: ${dispensary.platform_dispensary_id}`);
|
||||||
|
// Update last_id_resolution_at to show we checked it
|
||||||
|
await pool.query(`
|
||||||
|
UPDATE dispensaries
|
||||||
|
SET last_id_resolution_at = NOW(),
|
||||||
|
id_resolution_status = 'resolved'
|
||||||
|
WHERE id = $1
|
||||||
|
`, [dispensaryId]);
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
alreadyResolved: true,
|
alreadyResolved: true,
|
||||||
@@ -51,6 +58,15 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Increment attempt counter
|
||||||
|
await pool.query(`
|
||||||
|
UPDATE dispensaries
|
||||||
|
SET id_resolution_attempts = COALESCE(id_resolution_attempts, 0) + 1,
|
||||||
|
last_id_resolution_at = NOW(),
|
||||||
|
id_resolution_status = 'pending'
|
||||||
|
WHERE id = $1
|
||||||
|
`, [dispensaryId]);
|
||||||
|
|
||||||
const menuUrl = dispensary.menu_url;
|
const menuUrl = dispensary.menu_url;
|
||||||
if (!menuUrl) {
|
if (!menuUrl) {
|
||||||
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
|
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
|
||||||
@@ -114,7 +130,7 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
|
|||||||
|
|
||||||
console.log(`[EntryPointDiscovery] Failed to resolve ${slug}: ${reason}`);
|
console.log(`[EntryPointDiscovery] Failed to resolve ${slug}: ${reason}`);
|
||||||
|
|
||||||
// Mark as failed resolution but keep menu_type as dutchie
|
// Mark as failed resolution
|
||||||
await pool.query(`
|
await pool.query(`
|
||||||
UPDATE dispensaries
|
UPDATE dispensaries
|
||||||
SET
|
SET
|
||||||
@@ -123,9 +139,11 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
|
|||||||
WHEN $2 = 403 THEN 'blocked'
|
WHEN $2 = 403 THEN 'blocked'
|
||||||
ELSE 'dutchie'
|
ELSE 'dutchie'
|
||||||
END,
|
END,
|
||||||
|
id_resolution_status = 'failed',
|
||||||
|
id_resolution_error = $3,
|
||||||
updated_at = NOW()
|
updated_at = NOW()
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
`, [dispensaryId, result.httpStatus || 0]);
|
`, [dispensaryId, result.httpStatus || 0, reason]);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
success: false,
|
success: false,
|
||||||
@@ -149,6 +167,8 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
|
|||||||
platform_dispensary_id = $2,
|
platform_dispensary_id = $2,
|
||||||
menu_type = 'dutchie',
|
menu_type = 'dutchie',
|
||||||
crawl_enabled = true,
|
crawl_enabled = true,
|
||||||
|
id_resolution_status = 'resolved',
|
||||||
|
id_resolution_error = NULL,
|
||||||
updated_at = NOW()
|
updated_at = NOW()
|
||||||
WHERE id = $1
|
WHERE id = $1
|
||||||
`, [dispensaryId, platformId]);
|
`, [dispensaryId, platformId]);
|
||||||
|
|||||||
Reference in New Issue
Block a user