fix: Handle product_refresh and payload_fetch in run-now endpoint

The run-now endpoint only fanned out to stores for product_discovery
schedules, not product_refresh or payload_fetch. This caused single
tasks to be created without dispensary_id, which then failed.

Now all crawl roles (product_discovery, product_refresh, payload_fetch)
with state_code properly fan out to individual store tasks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 03:49:10 -07:00
parent 5084cb1a85
commit c969c7385b

View File

@@ -547,8 +547,9 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
const schedule = scheduleResult.rows[0];
let tasksCreated = 0;
// For product_discovery with state_code, fan out to individual stores
if (schedule.role === 'product_discovery' && schedule.state_code) {
// For product crawl roles with state_code, fan out to individual stores
const isCrawlRole = ['product_discovery', 'product_refresh', 'payload_fetch'].includes(schedule.role);
if (isCrawlRole && schedule.state_code) {
// Find stores in this state needing refresh
const storeResult = await pool.query(`
SELECT d.id
@@ -557,11 +558,11 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND s.code = $1
-- No pending/running product_discovery task already
-- No pending/running crawl task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'product_discovery'
AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch')
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.last_fetch_at NULLS FIRST, d.id
@@ -570,10 +571,10 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length > 0) {
// Create staggered tasks for all stores
// Create staggered tasks for all stores (always use product_discovery role)
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
'product_discovery', // Normalize to product_discovery
15, // 15 seconds stagger
schedule.platform || 'dutchie',
schedule.method || 'http'
@@ -588,7 +589,7 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
stateCode: schedule.state_code,
});
}
} else if (schedule.role !== 'product_discovery') {
} else if (!isCrawlRole) {
// For other schedules (store_discovery, analytics_refresh), create a single task
await taskService.createTask({
role: schedule.role,
@@ -598,9 +599,9 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
});
tasksCreated = 1;
} else {
// product_discovery without state_code - shouldn't happen, reject
// Crawl role without state_code - shouldn't happen, reject
return res.status(400).json({
error: 'product_discovery schedules require a state_code',
error: `${schedule.role} schedules require a state_code`,
});
}