feat: Support per-dispensary schedules (not just per-state)
- Add dispensary_id column to task_schedules table - Update scheduler to handle single-dispensary schedules - Update run-now endpoint to handle single-dispensary schedules - Update frontend modal to pass dispensary_id when 1 store selected - Fix existing "Deeply Rooted Hourly" schedule with dispensary_id=112 Now when you select ONE store and check "Make recurring", it creates a schedule that runs for that specific store every interval. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
12
backend/migrations/103_schedule_dispensary_id.sql
Normal file
12
backend/migrations/103_schedule_dispensary_id.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
-- Migration: 103_schedule_dispensary_id.sql
|
||||
-- Description: Add dispensary_id to task_schedules for per-store schedules
|
||||
-- Created: 2025-12-13
|
||||
|
||||
-- Add dispensary_id column for single-store schedules
|
||||
ALTER TABLE task_schedules
|
||||
ADD COLUMN IF NOT EXISTS dispensary_id INTEGER REFERENCES dispensaries(id);
|
||||
|
||||
-- Index for quick lookups
|
||||
CREATE INDEX IF NOT EXISTS idx_task_schedules_dispensary_id ON task_schedules(dispensary_id);
|
||||
|
||||
COMMENT ON COLUMN task_schedules.dispensary_id IS 'For single-store schedules. If set, only this store is refreshed. If NULL, uses state_code for all stores in state.';
|
||||
BIN
backend/public/downloads/cannaiq-menus-1.7.0.zip
Normal file
BIN
backend/public/downloads/cannaiq-menus-1.7.0.zip
Normal file
Binary file not shown.
@@ -1 +1 @@
|
||||
cannaiq-menus-1.6.0.zip
|
||||
cannaiq-menus-1.7.0.zip
|
||||
@@ -286,6 +286,7 @@ router.post('/schedules', async (req: Request, res: Response) => {
|
||||
interval_hours,
|
||||
priority = 0,
|
||||
state_code,
|
||||
dispensary_id,
|
||||
platform,
|
||||
} = req.body;
|
||||
|
||||
@@ -300,12 +301,12 @@ router.post('/schedules', async (req: Request, res: Response) => {
|
||||
|
||||
const result = await pool.query(`
|
||||
INSERT INTO task_schedules
|
||||
(name, role, description, enabled, interval_hours, priority, state_code, platform, next_run_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
(name, role, description, enabled, interval_hours, priority, state_code, dispensary_id, platform, next_run_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
RETURNING id, name, role, description, enabled, interval_hours,
|
||||
priority, state_code, platform, last_run_at, next_run_at,
|
||||
priority, state_code, dispensary_id, platform, last_run_at, next_run_at,
|
||||
last_task_count, last_error, created_at, updated_at
|
||||
`, [name, role, description, enabled, interval_hours, priority, state_code, platform, nextRunAt]);
|
||||
`, [name, role, description, enabled, interval_hours, priority, state_code, dispensary_id, platform, nextRunAt]);
|
||||
|
||||
res.status(201).json(result.rows[0]);
|
||||
} catch (error: any) {
|
||||
@@ -536,7 +537,7 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
|
||||
|
||||
// Get the full schedule
|
||||
const scheduleResult = await pool.query(`
|
||||
SELECT id, name, role, state_code, platform, priority, interval_hours, method
|
||||
SELECT id, name, role, state_code, dispensary_id, platform, priority, interval_hours, method
|
||||
FROM task_schedules WHERE id = $1
|
||||
`, [scheduleId]);
|
||||
|
||||
@@ -547,9 +548,45 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
|
||||
const schedule = scheduleResult.rows[0];
|
||||
let tasksCreated = 0;
|
||||
|
||||
// For product crawl roles with state_code, fan out to individual stores
|
||||
const isCrawlRole = ['product_discovery', 'product_refresh', 'payload_fetch'].includes(schedule.role);
|
||||
if (isCrawlRole && schedule.state_code) {
|
||||
|
||||
// Single-dispensary schedule (e.g., "Deeply Rooted Hourly")
|
||||
if (isCrawlRole && schedule.dispensary_id) {
|
||||
// Check if this specific store can be refreshed (no pending task)
|
||||
const storeResult = await pool.query(`
|
||||
SELECT d.id, d.name
|
||||
FROM dispensaries d
|
||||
WHERE d.id = $1
|
||||
AND d.crawl_enabled = true
|
||||
AND d.platform_dispensary_id IS NOT NULL
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM worker_tasks t
|
||||
WHERE t.dispensary_id = d.id
|
||||
AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch')
|
||||
AND t.status IN ('pending', 'claimed', 'running')
|
||||
)
|
||||
`, [schedule.dispensary_id]);
|
||||
|
||||
if (storeResult.rows.length > 0) {
|
||||
await taskService.createTask({
|
||||
role: 'product_discovery',
|
||||
dispensary_id: schedule.dispensary_id,
|
||||
platform: schedule.platform || 'dutchie',
|
||||
priority: schedule.priority + 10,
|
||||
method: schedule.method || 'http',
|
||||
});
|
||||
tasksCreated = 1;
|
||||
} else {
|
||||
return res.json({
|
||||
success: true,
|
||||
message: `Store ${schedule.dispensary_id} has a pending task or is disabled`,
|
||||
tasksCreated: 0,
|
||||
dispensaryId: schedule.dispensary_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
// Per-state schedule (e.g., "AZ Product Refresh")
|
||||
else if (isCrawlRole && schedule.state_code) {
|
||||
// Find stores in this state needing refresh
|
||||
const storeResult = await pool.query(`
|
||||
SELECT d.id
|
||||
@@ -599,9 +636,9 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
|
||||
});
|
||||
tasksCreated = 1;
|
||||
} else {
|
||||
// Crawl role without state_code - shouldn't happen, reject
|
||||
// Crawl role without dispensary_id or state_code - reject
|
||||
return res.status(400).json({
|
||||
error: `${schedule.role} schedules require a state_code`,
|
||||
error: `${schedule.role} schedules require a dispensary_id or state_code`,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ interface TaskSchedule {
|
||||
last_run_at: Date | null;
|
||||
next_run_at: Date | null;
|
||||
state_code: string | null;
|
||||
dispensary_id: number | null; // For single-store schedules
|
||||
priority: number;
|
||||
method: 'curl' | 'http' | null;
|
||||
is_immutable: boolean;
|
||||
@@ -245,44 +246,75 @@ class TaskScheduler {
|
||||
* - Easier debugging and monitoring per state
|
||||
*/
|
||||
private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
|
||||
// state_code is required for per-state schedules
|
||||
if (!schedule.state_code) {
|
||||
console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`);
|
||||
let dispensaryIds: number[] = [];
|
||||
|
||||
// Single-dispensary schedule (e.g., "Deeply Rooted Hourly")
|
||||
if (schedule.dispensary_id) {
|
||||
// Check if this specific store needs refresh
|
||||
const result = await pool.query(`
|
||||
SELECT d.id
|
||||
FROM dispensaries d
|
||||
WHERE d.id = $1
|
||||
AND d.crawl_enabled = true
|
||||
AND d.platform_dispensary_id IS NOT NULL
|
||||
-- No pending/running crawl task already
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM worker_tasks t
|
||||
WHERE t.dispensary_id = d.id
|
||||
AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch')
|
||||
AND t.status IN ('pending', 'claimed', 'running')
|
||||
)
|
||||
`, [schedule.dispensary_id]);
|
||||
|
||||
dispensaryIds = result.rows.map((r: { id: number }) => r.id);
|
||||
|
||||
if (dispensaryIds.length === 0) {
|
||||
console.log(`[TaskScheduler] Store ${schedule.dispensary_id} has pending task or is disabled`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
console.log(`[TaskScheduler] Creating task for single store ${schedule.dispensary_id} (${schedule.name})`);
|
||||
}
|
||||
// Per-state schedule (e.g., "AZ Product Refresh")
|
||||
else if (schedule.state_code) {
|
||||
// Find stores in this state needing refresh
|
||||
const result = await pool.query(`
|
||||
SELECT d.id
|
||||
FROM dispensaries d
|
||||
JOIN states s ON d.state_id = s.id
|
||||
WHERE d.crawl_enabled = true
|
||||
AND d.platform_dispensary_id IS NOT NULL
|
||||
AND s.code = $1
|
||||
-- No pending/running product_discovery task already
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM worker_tasks t
|
||||
WHERE t.dispensary_id = d.id
|
||||
AND t.role = 'product_discovery'
|
||||
AND t.status IN ('pending', 'claimed', 'running')
|
||||
)
|
||||
-- Never fetched OR last fetch > interval ago
|
||||
AND (
|
||||
d.last_fetch_at IS NULL
|
||||
OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
|
||||
)
|
||||
ORDER BY d.last_fetch_at NULLS FIRST, d.id
|
||||
`, [schedule.state_code, schedule.interval_hours]);
|
||||
|
||||
dispensaryIds = result.rows.map((r: { id: number }) => r.id);
|
||||
|
||||
if (dispensaryIds.length === 0) {
|
||||
console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);
|
||||
}
|
||||
// No dispensary_id or state_code - invalid schedule
|
||||
else {
|
||||
console.warn(`[TaskScheduler] Schedule ${schedule.name} has no dispensary_id or state_code, skipping`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Find stores in this state needing refresh
|
||||
const result = await pool.query(`
|
||||
SELECT d.id
|
||||
FROM dispensaries d
|
||||
JOIN states s ON d.state_id = s.id
|
||||
WHERE d.crawl_enabled = true
|
||||
AND d.platform_dispensary_id IS NOT NULL
|
||||
AND s.code = $1
|
||||
-- No pending/running product_discovery task already
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM worker_tasks t
|
||||
WHERE t.dispensary_id = d.id
|
||||
AND t.role = 'product_discovery'
|
||||
AND t.status IN ('pending', 'claimed', 'running')
|
||||
)
|
||||
-- Never fetched OR last fetch > interval ago
|
||||
AND (
|
||||
d.last_fetch_at IS NULL
|
||||
OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
|
||||
)
|
||||
ORDER BY d.last_fetch_at NULLS FIRST, d.id
|
||||
`, [schedule.state_code, schedule.interval_hours]);
|
||||
|
||||
const dispensaryIds = result.rows.map((r: { id: number }) => r.id);
|
||||
|
||||
if (dispensaryIds.length === 0) {
|
||||
console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
|
||||
return 0;
|
||||
}
|
||||
|
||||
console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);
|
||||
|
||||
// Create product_discovery tasks with HTTP transport
|
||||
// Stagger by 15 seconds to prevent overwhelming proxies
|
||||
const { created } = await taskService.createStaggeredTasks(
|
||||
|
||||
Reference in New Issue
Block a user