feat: Worker improvements and Run Now duplicate prevention
- Fix Run Now to prevent duplicate task creation
- Add loading state to Run Now button in UI
- Return early when no stores need refresh
- Worker dashboard improvements
- Browser pooling architecture updates
- K8s worker config updates (8 replicas, 3 concurrent tasks)
This commit is contained in:
@@ -526,14 +526,17 @@ router.delete('/schedules/:id', async (req: Request, res: Response) => {
|
||||
/**
|
||||
* POST /api/tasks/schedules/:id/run-now
|
||||
* Manually trigger a scheduled task to run immediately
|
||||
*
|
||||
* For product_discovery schedules with state_code, this creates individual
|
||||
* tasks for each store in that state (fans out properly).
|
||||
*/
|
||||
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const scheduleId = parseInt(req.params.id, 10);
|
||||
|
||||
// Get the schedule
|
||||
// Get the full schedule
|
||||
const scheduleResult = await pool.query(`
|
||||
SELECT id, name, role, state_code, platform, priority
|
||||
SELECT id, name, role, state_code, platform, priority, interval_hours, method
|
||||
FROM task_schedules WHERE id = $1
|
||||
`, [scheduleId]);
|
||||
|
||||
@@ -542,27 +545,80 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
|
||||
}
|
||||
|
||||
const schedule = scheduleResult.rows[0];
|
||||
let tasksCreated = 0;
|
||||
|
||||
// Create a task based on the schedule
|
||||
const task = await taskService.createTask({
|
||||
role: schedule.role,
|
||||
platform: schedule.platform,
|
||||
priority: schedule.priority + 10, // Boost priority for manual runs
|
||||
});
|
||||
// For product_discovery with state_code, fan out to individual stores
|
||||
if (schedule.role === 'product_discovery' && schedule.state_code) {
|
||||
// Find stores in this state needing refresh
|
||||
const storeResult = await pool.query(`
|
||||
SELECT d.id
|
||||
FROM dispensaries d
|
||||
JOIN states s ON d.state_id = s.id
|
||||
WHERE d.crawl_enabled = true
|
||||
AND d.platform_dispensary_id IS NOT NULL
|
||||
AND s.code = $1
|
||||
-- No pending/running product_discovery task already
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM worker_tasks t
|
||||
WHERE t.dispensary_id = d.id
|
||||
AND t.role = 'product_discovery'
|
||||
AND t.status IN ('pending', 'claimed', 'running')
|
||||
)
|
||||
ORDER BY d.last_fetch_at NULLS FIRST, d.id
|
||||
`, [schedule.state_code]);
|
||||
|
||||
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
|
||||
|
||||
if (dispensaryIds.length > 0) {
|
||||
// Create staggered tasks for all stores
|
||||
const result = await taskService.createStaggeredTasks(
|
||||
dispensaryIds,
|
||||
'product_discovery',
|
||||
15, // 15 seconds stagger
|
||||
schedule.platform || 'dutchie',
|
||||
schedule.method || 'http'
|
||||
);
|
||||
tasksCreated = result.created;
|
||||
} else {
|
||||
// No stores need refresh - return early with message
|
||||
return res.json({
|
||||
success: true,
|
||||
message: `No ${schedule.state_code} stores need refresh at this time`,
|
||||
tasksCreated: 0,
|
||||
stateCode: schedule.state_code,
|
||||
});
|
||||
}
|
||||
} else if (schedule.role !== 'product_discovery') {
|
||||
// For other schedules (store_discovery, analytics_refresh), create a single task
|
||||
await taskService.createTask({
|
||||
role: schedule.role,
|
||||
platform: schedule.platform,
|
||||
priority: schedule.priority + 10,
|
||||
method: schedule.method,
|
||||
});
|
||||
tasksCreated = 1;
|
||||
} else {
|
||||
// product_discovery without state_code - shouldn't happen, reject
|
||||
return res.status(400).json({
|
||||
error: 'product_discovery schedules require a state_code',
|
||||
});
|
||||
}
|
||||
|
||||
// Update last_run_at on the schedule
|
||||
await pool.query(`
|
||||
UPDATE task_schedules
|
||||
SET last_run_at = NOW(),
|
||||
next_run_at = NOW() + (interval_hours || ' hours')::interval,
|
||||
last_task_count = $2,
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`, [scheduleId]);
|
||||
`, [scheduleId, tasksCreated]);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
message: `Schedule "${schedule.name}" triggered`,
|
||||
task,
|
||||
tasksCreated,
|
||||
stateCode: schedule.state_code,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error running schedule:', error);
|
||||
@@ -1187,6 +1243,142 @@ router.post('/batch/az-stores', async (req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// STATE-BASED CRAWL ENDPOINTS
|
||||
// ============================================================
|
||||
|
||||
/**
 * POST /api/tasks/crawl-state/:stateCode
 * Create product_discovery tasks for all stores in a state
 *
 * This is the primary endpoint for triggering crawls by state.
 * Creates staggered tasks for all crawl-enabled stores in the specified state
 * that have a platform_dispensary_id, ordered by last_fetch_at (never-fetched
 * stores first).
 *
 * Params:
 * - stateCode: State code (e.g., 'AZ', 'CA', 'CO'); upper-cased before lookup
 *
 * Body (optional):
 * - stagger_seconds: number (default: 15) - Seconds between each task.
 *   Must be a finite, non-negative number, otherwise the request is rejected.
 * - priority: number - Documented historically, but this handler does not
 *   forward it to task creation, so it currently has no effect.
 * - method: 'curl' | 'http' | null (default: 'http')
 *
 * Responses:
 * - 400: stagger_seconds is not a finite, non-negative number
 * - 404: no row in `states` matches the state code
 * - 200: state exists but has no eligible stores (tasks_created: 0)
 * - 201: tasks were requested; the payload contains
 *   - tasks_created: Number of tasks created
 *   - stores_in_state: Total eligible stores found for the state
 *   - skipped: stores_in_state minus tasks_created (presumably stores that
 *     already have active tasks - confirm against createStaggeredTasks)
 *   - stagger_seconds, total_duration_seconds, estimated_completion, message
 * - 500: any unexpected error (logged to the console)
 */
router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
  try {
    const stateCode = req.params.stateCode.toUpperCase();

    // req.body may be undefined when the request carries no parsed body;
    // fall back to an empty object so the defaults below still apply.
    const { stagger_seconds = 15, method = 'http' } = req.body || {};

    // stagger_seconds feeds date arithmetic below, so reject anything that is
    // not a usable number instead of producing NaN / invalid dates.
    if (
      typeof stagger_seconds !== 'number' ||
      !Number.isFinite(stagger_seconds) ||
      stagger_seconds < 0
    ) {
      return res.status(400).json({
        error: 'stagger_seconds must be a non-negative number',
        state_code: stateCode,
      });
    }

    // Verify state exists
    const stateResult = await pool.query(`
      SELECT id, code, name FROM states WHERE code = $1
    `, [stateCode]);

    if (stateResult.rows.length === 0) {
      return res.status(404).json({
        error: 'State not found',
        state_code: stateCode,
      });
    }

    const state = stateResult.rows[0];

    // Get all crawl-enabled dispensaries in this state
    const dispensariesResult = await pool.query(`
      SELECT d.id, d.name
      FROM dispensaries d
      WHERE d.state_id = $1
        AND d.crawl_enabled = true
        AND d.platform_dispensary_id IS NOT NULL
      ORDER BY d.last_fetch_at NULLS FIRST, d.id
    `, [state.id]);

    if (dispensariesResult.rows.length === 0) {
      return res.status(200).json({
        success: true,
        message: `No crawl-enabled stores found in ${state.name}`,
        state_code: stateCode,
        state_name: state.name,
        tasks_created: 0,
        stores_in_state: 0,
      });
    }

    const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id);

    // Create staggered tasks
    const result = await taskService.createStaggeredTasks(
      dispensaryIds,
      'product_discovery',
      stagger_seconds,
      'dutchie',
      method
    );

    // N tasks span (N - 1) stagger gaps. Clamp at zero so that a run where
    // every store was skipped (created === 0) does not report a negative
    // duration or a completion time in the past.
    const totalDuration = Math.max(0, result.created - 1) * stagger_seconds;
    const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);

    res.status(201).json({
      success: true,
      state_code: stateCode,
      state_name: state.name,
      tasks_created: result.created,
      stores_in_state: dispensariesResult.rows.length,
      skipped: dispensariesResult.rows.length - result.created,
      stagger_seconds,
      total_duration_seconds: totalDuration,
      estimated_completion: estimatedEndTime.toISOString(),
      message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
    });
  } catch (error: unknown) {
    console.error('Error creating state crawl tasks:', error);
    res.status(500).json({ error: 'Failed to create state crawl tasks' });
  }
});
|
||||
|
||||
/**
 * GET /api/tasks/states
 * List all states with their store counts and crawl status
 *
 * Only states that have at least one dispensary are returned, ordered by
 * dispensary count (largest first). Each row carries:
 * - code, name: the state's identifiers
 * - total_stores: all dispensaries in the state
 * - crawl_enabled_stores: crawl-enabled dispensaries with a platform_dispensary_id
 * - missing_platform_id: crawl-enabled dispensaries lacking a platform_dispensary_id
 * - last_crawl_at: most recent last_fetch_at among the state's dispensaries
 * - active_tasks: product_discovery worker tasks for the state's dispensaries
 *   whose status is pending, claimed or running
 *
 * Responds with { states, total_states }, or 500 on any query failure.
 */
router.get('/states', async (_req: Request, res: Response) => {
  // Per-state aggregate; the correlated subquery counts in-flight
  // product_discovery tasks separately so it is not multiplied by the join.
  const stateSummarySql = `
      SELECT
        s.code,
        s.name,
        COUNT(d.id)::int as total_stores,
        COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NOT NULL)::int as crawl_enabled_stores,
        COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NULL)::int as missing_platform_id,
        MAX(d.last_fetch_at) as last_crawl_at,
        (SELECT COUNT(*) FROM worker_tasks t
         JOIN dispensaries d2 ON t.dispensary_id = d2.id
         WHERE d2.state_id = s.id
           AND t.role = 'product_discovery'
           AND t.status IN ('pending', 'claimed', 'running'))::int as active_tasks
      FROM states s
      LEFT JOIN dispensaries d ON d.state_id = s.id
      GROUP BY s.id, s.code, s.name
      HAVING COUNT(d.id) > 0
      ORDER BY COUNT(d.id) DESC
    `;

  try {
    const { rows } = await pool.query(stateSummarySql);
    res.json({ states: rows, total_states: rows.length });
  } catch (error: unknown) {
    console.error('Error listing states:', error);
    res.status(500).json({ error: 'Failed to list states' });
  }
});
|
||||
|
||||
// ============================================================
|
||||
// TASK POOL MANAGEMENT
|
||||
// ============================================================
|
||||
|
||||
Reference in New Issue
Block a user