From 832ef1cf83646b0a22229a913c12393a9b135099 Mon Sep 17 00:00:00 2001 From: Kelly Date: Fri, 12 Dec 2025 09:19:57 -0700 Subject: [PATCH] feat(scheduler): Immutable schedules and HTTP-only pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Changes - **Migration 089**: Add is_immutable and method columns to task_schedules - Per-state product_discovery schedules (4h default) - Store discovery weekly (168h) - All schedules use HTTP transport (Puppeteer/browser) - **Task Scheduler**: HTTP-only product discovery with per-state scheduling - Each state has its own immutable schedule - Schedules can be edited (interval/priority) but not deleted - **TasksDashboard UI**: Full immutability support - Lock icon for immutable schedules - State and Method columns in schedules table - Disabled delete for immutable, restricted edit fields - **Store Discovery HTTP**: Auto-queue product_discovery for new stores - **Migration 088**: Discovery payloads storage schema 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/migrations/088_discovery_payloads.sql | 30 ++ .../migrations/089_immutable_schedules.sql | 105 ++++ backend/src/routes/tasks.ts | 170 +++++-- backend/src/services/task-scheduler.ts | 227 +++++++-- backend/src/tasks/handlers/index.ts | 1 + .../tasks/handlers/product-discovery-http.ts | 49 ++ .../tasks/handlers/store-discovery-http.ts | 480 ++++++++++++++++++ backend/src/tasks/task-service.ts | 78 ++- backend/src/tasks/task-worker.ts | 36 +- backend/src/utils/payload-storage.ts | 135 +++++ cannaiq/src/lib/api.ts | 2 + cannaiq/src/pages/TasksDashboard.tsx | 145 +++++- 12 files changed, 1349 insertions(+), 109 deletions(-) create mode 100644 backend/migrations/088_discovery_payloads.sql create mode 100644 backend/migrations/089_immutable_schedules.sql create mode 100644 backend/src/tasks/handlers/store-discovery-http.ts diff --git a/backend/migrations/088_discovery_payloads.sql b/backend/migrations/088_discovery_payloads.sql new file mode 100644 index 00000000..e32e3d20 --- /dev/null +++ b/backend/migrations/088_discovery_payloads.sql @@ -0,0 +1,30 @@ +-- Migration 088: Extend raw_crawl_payloads for discovery payloads +-- +-- Enables saving raw store data from Dutchie discovery crawls. +-- Store discovery returns raw dispensary objects - save them for historical analysis. 
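+--
+-- Illustrative lookup (not run by this migration): once payload_type and
+-- state_code exist, the latest discovery payload for a state can be fetched
+-- with a query like the one getLatestDiscoveryPayload() issues in
+-- backend/src/utils/payload-storage.ts (state code 'AZ' used as an example):
+--
+--   SELECT id, state_code, storage_path, store_count, fetched_at
+--   FROM raw_crawl_payloads
+--   WHERE payload_type = 'store_discovery' AND state_code = 'AZ'
+--   ORDER BY fetched_at DESC
+--   LIMIT 1;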
+ +-- Add payload_type to distinguish product crawls from discovery crawls +ALTER TABLE raw_crawl_payloads +ADD COLUMN IF NOT EXISTS payload_type VARCHAR(32) NOT NULL DEFAULT 'product'; + +-- Add state_code for discovery payloads (null for product payloads) +ALTER TABLE raw_crawl_payloads +ADD COLUMN IF NOT EXISTS state_code VARCHAR(10); + +-- Add store_count for discovery payloads (alternative to product_count) +ALTER TABLE raw_crawl_payloads +ADD COLUMN IF NOT EXISTS store_count INTEGER; + +-- Make dispensary_id nullable for discovery payloads +ALTER TABLE raw_crawl_payloads +ALTER COLUMN dispensary_id DROP NOT NULL; + +-- Add index for discovery payload queries +CREATE INDEX IF NOT EXISTS idx_raw_crawl_payloads_type_state + ON raw_crawl_payloads(payload_type, state_code) + WHERE payload_type = 'store_discovery'; + +-- Comments +COMMENT ON COLUMN raw_crawl_payloads.payload_type IS 'Type: product (default), store_discovery'; +COMMENT ON COLUMN raw_crawl_payloads.state_code IS 'State code for discovery payloads (e.g., AZ, MI)'; +COMMENT ON COLUMN raw_crawl_payloads.store_count IS 'Number of stores in discovery payload'; diff --git a/backend/migrations/089_immutable_schedules.sql b/backend/migrations/089_immutable_schedules.sql new file mode 100644 index 00000000..624cda69 --- /dev/null +++ b/backend/migrations/089_immutable_schedules.sql @@ -0,0 +1,105 @@ +-- Migration 089: Immutable Schedules with Per-State Product Discovery +-- +-- Key changes: +-- 1. Add is_immutable column - schedules can be edited but not deleted +-- 2. Add method column - all tasks use 'http' (Puppeteer transport) +-- 3. Store discovery weekly (168h) +-- 4. Per-state product_discovery schedules (4h default) +-- 5. Remove old payload_fetch schedules + +-- ===================================================== +-- 1) Add new columns to task_schedules +-- ===================================================== +ALTER TABLE task_schedules +ADD COLUMN IF NOT EXISTS is_immutable BOOLEAN DEFAULT FALSE; + +ALTER TABLE task_schedules +ADD COLUMN IF NOT EXISTS method VARCHAR(10) DEFAULT 'http'; + +-- ===================================================== +-- 2) Update store_discovery to weekly and immutable +-- ===================================================== +UPDATE task_schedules +SET interval_hours = 168, -- 7 days + is_immutable = TRUE, + method = 'http', + description = 'Discover new Dutchie stores weekly (HTTP transport)' +WHERE name = 'store_discovery_dutchie'; + +-- Insert if doesn't exist +INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at) +VALUES ('store_discovery_dutchie', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW()) +ON CONFLICT (name) DO UPDATE SET + interval_hours = 168, + is_immutable = TRUE, + method = 'http', + description = 'Discover new Dutchie stores weekly (HTTP transport)'; + +-- ===================================================== +-- 3) Remove old payload_fetch and product_refresh_all schedules +-- ===================================================== +DELETE FROM task_schedules WHERE name IN ('payload_fetch_all', 'product_refresh_all'); + +-- ===================================================== +-- 4) Create per-state product_discovery schedules +-- ===================================================== +-- One schedule per state that has dispensaries with active cannabis programs +INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, 
description, is_immutable, method, enabled, next_run_at) +SELECT + 'product_discovery_' || lower(s.code) AS name, + 'product_discovery' AS role, + s.code AS state_code, + 4 AS interval_hours, -- 4 hours default, editable + 10 AS priority, + 'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description, + TRUE AS is_immutable, -- Can edit but not delete + 'http' AS method, + CASE WHEN s.is_active THEN TRUE ELSE FALSE END AS enabled, + -- Stagger start times: each state starts 5 minutes after the previous + NOW() + (ROW_NUMBER() OVER (ORDER BY s.code) * INTERVAL '5 minutes') AS next_run_at +FROM states s +WHERE EXISTS ( + SELECT 1 FROM dispensaries d + WHERE d.state_id = s.id AND d.crawl_enabled = true +) +ON CONFLICT (name) DO UPDATE SET + is_immutable = TRUE, + method = 'http', + description = EXCLUDED.description; + +-- Also create schedules for states that might have stores discovered later +INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at) +SELECT + 'product_discovery_' || lower(s.code) AS name, + 'product_discovery' AS role, + s.code AS state_code, + 4 AS interval_hours, + 10 AS priority, + 'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description, + TRUE AS is_immutable, + 'http' AS method, + FALSE AS enabled, -- Disabled until stores exist + NOW() + INTERVAL '1 hour' +FROM states s +WHERE NOT EXISTS ( + SELECT 1 FROM task_schedules ts WHERE ts.name = 'product_discovery_' || lower(s.code) +) +ON CONFLICT (name) DO NOTHING; + +-- ===================================================== +-- 5) Make analytics_refresh immutable +-- ===================================================== +UPDATE task_schedules +SET is_immutable = TRUE, method = 'http' +WHERE name = 'analytics_refresh'; + +-- ===================================================== +-- 6) Add index for schedule lookups +-- ===================================================== +CREATE INDEX IF NOT EXISTS idx_task_schedules_state_code + ON task_schedules(state_code) + WHERE state_code IS NOT NULL; + +-- Comments +COMMENT ON COLUMN task_schedules.is_immutable IS 'If TRUE, schedule cannot be deleted (only edited)'; +COMMENT ON COLUMN task_schedules.method IS 'Transport method: http (Puppeteer/browser) or curl (axios)'; diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index b1cbbc4f..6b8783d1 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -157,6 +157,9 @@ router.get('/capacity/:role', async (req: Request, res: Response) => { /** * GET /api/tasks/schedules * List all task schedules + * + * Returns schedules with is_immutable flag - immutable schedules can only + * have their interval_hours, priority, and enabled fields updated (not deleted). 
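+ *
+ * Illustrative response item (fields mirror the SELECT below; values are examples):
+ *   { id, name, role, description, enabled, interval_hours, priority,
+ *     state_code, platform, method, is_immutable, last_run_at, next_run_at,
+ *     last_task_count, last_error, created_at, updated_at }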
*/ router.get('/schedules', async (req: Request, res: Response) => { try { @@ -164,7 +167,9 @@ router.get('/schedules', async (req: Request, res: Response) => { let query = ` SELECT id, name, role, description, enabled, interval_hours, - priority, state_code, platform, last_run_at, next_run_at, + priority, state_code, platform, method, + COALESCE(is_immutable, false) as is_immutable, + last_run_at, next_run_at, last_task_count, last_error, created_at, updated_at FROM task_schedules `; @@ -173,7 +178,15 @@ router.get('/schedules', async (req: Request, res: Response) => { query += ` WHERE enabled = true`; } - query += ` ORDER BY name`; + query += ` ORDER BY + CASE role + WHEN 'store_discovery' THEN 1 + WHEN 'product_discovery' THEN 2 + WHEN 'analytics_refresh' THEN 3 + ELSE 4 + END, + state_code NULLS FIRST, + name`; const result = await pool.query(query); res.json({ schedules: result.rows }); @@ -187,25 +200,45 @@ router.get('/schedules', async (req: Request, res: Response) => { * DELETE /api/tasks/schedules * Bulk delete schedules * + * Immutable schedules are automatically skipped (not deleted). + * * Body: * - ids: number[] (required) - array of schedule IDs to delete - * - all: boolean (optional) - if true, delete all schedules (ids ignored) + * - all: boolean (optional) - if true, delete all non-immutable schedules (ids ignored) */ router.delete('/schedules', async (req: Request, res: Response) => { try { const { ids, all } = req.body; let result; + let skippedImmutable: { id: number; name: string }[] = []; if (all === true) { - // Delete all schedules + // First, find immutable schedules that will be skipped + const immutableResult = await pool.query(` + SELECT id, name FROM task_schedules WHERE is_immutable = true + `); + skippedImmutable = immutableResult.rows; + + // Delete all non-immutable schedules result = await pool.query(` - DELETE FROM task_schedules RETURNING id, name + DELETE FROM task_schedules + WHERE COALESCE(is_immutable, false) = false + RETURNING id, name `); } else if (Array.isArray(ids) && ids.length > 0) { - // Delete specific schedules by IDs + // First, find which of the requested IDs are immutable + const immutableResult = await pool.query(` + SELECT id, name FROM task_schedules + WHERE id = ANY($1) AND is_immutable = true + `, [ids]); + skippedImmutable = immutableResult.rows; + + // Delete only non-immutable schedules from the requested IDs result = await pool.query(` - DELETE FROM task_schedules WHERE id = ANY($1) RETURNING id, name + DELETE FROM task_schedules + WHERE id = ANY($1) AND COALESCE(is_immutable, false) = false + RETURNING id, name `, [ids]); } else { return res.status(400).json({ @@ -217,7 +250,11 @@ router.delete('/schedules', async (req: Request, res: Response) => { success: true, deleted_count: result.rowCount, deleted: result.rows, - message: `Deleted ${result.rowCount} schedule(s)`, + skipped_immutable_count: skippedImmutable.length, + skipped_immutable: skippedImmutable, + message: skippedImmutable.length > 0 + ? 
`Deleted ${result.rowCount} schedule(s), skipped ${skippedImmutable.length} immutable schedule(s)` + : `Deleted ${result.rowCount} schedule(s)`, }); } catch (error: unknown) { console.error('Error bulk deleting schedules:', error); @@ -311,6 +348,13 @@ router.get('/schedules/:id', async (req: Request, res: Response) => { /** * PUT /api/tasks/schedules/:id * Update an existing schedule + * + * For IMMUTABLE schedules, only these fields can be updated: + * - enabled (turn on/off) + * - interval_hours (change frequency) + * - priority (change priority) + * + * For regular schedules, all fields can be updated. */ router.put('/schedules/:id', async (req: Request, res: Response) => { try { @@ -326,23 +370,68 @@ router.put('/schedules/:id', async (req: Request, res: Response) => { platform, } = req.body; + // First check if schedule exists and if it's immutable + const checkResult = await pool.query(` + SELECT id, name, COALESCE(is_immutable, false) as is_immutable + FROM task_schedules WHERE id = $1 + `, [scheduleId]); + + if (checkResult.rows.length === 0) { + return res.status(404).json({ error: 'Schedule not found' }); + } + + const schedule = checkResult.rows[0]; + const isImmutable = schedule.is_immutable; + + // For immutable schedules, reject attempts to change protected fields + if (isImmutable) { + const protectedFields: string[] = []; + if (name !== undefined) protectedFields.push('name'); + if (role !== undefined) protectedFields.push('role'); + if (description !== undefined) protectedFields.push('description'); + if (state_code !== undefined) protectedFields.push('state_code'); + if (platform !== undefined) protectedFields.push('platform'); + + if (protectedFields.length > 0) { + return res.status(403).json({ + error: 'Cannot modify protected fields on immutable schedule', + message: `Schedule "${schedule.name}" is immutable. 
Only enabled, interval_hours, and priority can be changed.`, + protected_fields: protectedFields, + allowed_fields: ['enabled', 'interval_hours', 'priority'], + }); + } + } + // Build dynamic update query const updates: string[] = []; const values: any[] = []; let paramIndex = 1; - if (name !== undefined) { - updates.push(`name = $${paramIndex++}`); - values.push(name); - } - if (role !== undefined) { - updates.push(`role = $${paramIndex++}`); - values.push(role); - } - if (description !== undefined) { - updates.push(`description = $${paramIndex++}`); - values.push(description); + // These fields can only be updated on non-immutable schedules + if (!isImmutable) { + if (name !== undefined) { + updates.push(`name = $${paramIndex++}`); + values.push(name); + } + if (role !== undefined) { + updates.push(`role = $${paramIndex++}`); + values.push(role); + } + if (description !== undefined) { + updates.push(`description = $${paramIndex++}`); + values.push(description); + } + if (state_code !== undefined) { + updates.push(`state_code = $${paramIndex++}`); + values.push(state_code || null); + } + if (platform !== undefined) { + updates.push(`platform = $${paramIndex++}`); + values.push(platform || null); + } } + + // These fields can be updated on ALL schedules (including immutable) if (enabled !== undefined) { updates.push(`enabled = $${paramIndex++}`); values.push(enabled); @@ -360,14 +449,6 @@ router.put('/schedules/:id', async (req: Request, res: Response) => { updates.push(`priority = $${paramIndex++}`); values.push(priority); } - if (state_code !== undefined) { - updates.push(`state_code = $${paramIndex++}`); - values.push(state_code || null); - } - if (platform !== undefined) { - updates.push(`platform = $${paramIndex++}`); - values.push(platform || null); - } if (updates.length === 0) { return res.status(400).json({ error: 'No fields to update' }); @@ -381,14 +462,12 @@ router.put('/schedules/:id', async (req: Request, res: Response) => { SET ${updates.join(', ')} WHERE id = $${paramIndex} RETURNING id, name, role, description, enabled, interval_hours, - priority, state_code, platform, last_run_at, next_run_at, + priority, state_code, platform, method, + COALESCE(is_immutable, false) as is_immutable, + last_run_at, next_run_at, last_task_count, last_error, created_at, updated_at `, values); - if (result.rows.length === 0) { - return res.status(404).json({ error: 'Schedule not found' }); - } - res.json(result.rows[0]); } catch (error: any) { if (error.code === '23505') { @@ -402,22 +481,41 @@ router.put('/schedules/:id', async (req: Request, res: Response) => { /** * DELETE /api/tasks/schedules/:id * Delete a schedule + * + * Immutable schedules cannot be deleted - they can only be disabled. 
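+ *
+ * Illustrative 403 body for an immutable schedule (shape matches the handler below;
+ * the id value is an example):
+ *   { error: 'Cannot delete immutable schedule',
+ *     message: 'Schedule "..." is immutable and cannot be deleted. You can disable it instead.',
+ *     schedule_id: 123, is_immutable: true }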
*/ router.delete('/schedules/:id', async (req: Request, res: Response) => { try { const scheduleId = parseInt(req.params.id, 10); - const result = await pool.query(` - DELETE FROM task_schedules WHERE id = $1 RETURNING id, name + // First check if schedule exists and is immutable + const checkResult = await pool.query(` + SELECT id, name, COALESCE(is_immutable, false) as is_immutable + FROM task_schedules WHERE id = $1 `, [scheduleId]); - if (result.rows.length === 0) { + if (checkResult.rows.length === 0) { return res.status(404).json({ error: 'Schedule not found' }); } + const schedule = checkResult.rows[0]; + + // Prevent deletion of immutable schedules + if (schedule.is_immutable) { + return res.status(403).json({ + error: 'Cannot delete immutable schedule', + message: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`, + schedule_id: scheduleId, + is_immutable: true, + }); + } + + // Delete the schedule + await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [scheduleId]); + res.json({ success: true, - message: `Schedule "${result.rows[0].name}" deleted`, + message: `Schedule "${schedule.name}" deleted`, }); } catch (error: unknown) { console.error('Error deleting schedule:', error); diff --git a/backend/src/services/task-scheduler.ts b/backend/src/services/task-scheduler.ts index 87b1f3e3..a3573361 100644 --- a/backend/src/services/task-scheduler.ts +++ b/backend/src/services/task-scheduler.ts @@ -26,6 +26,12 @@ interface TaskSchedule { next_run_at: Date | null; state_code: string | null; priority: number; + method: 'curl' | 'http' | null; + is_immutable: boolean; + description: string | null; + platform: string | null; + last_task_count: number | null; + last_error: string | null; } class TaskScheduler { @@ -84,24 +90,22 @@ class TaskScheduler { /** * Ensure default schedules exist in the database * Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist + * + * NOTE: Per-state product_discovery schedules are created by migration 089. + * This only creates core immutable schedules that should exist regardless. */ private async ensureDefaultSchedules(): Promise { - // Per TASK_WORKFLOW_2024-12-10.md: Default schedules for task generation - // NOTE: payload_fetch replaces direct product_refresh - it chains to product_refresh + // Core schedules - all use HTTP transport for browser-based scraping const defaults = [ - { - name: 'payload_fetch_all', - role: 'payload_fetch' as TaskRole, - interval_hours: 4, - priority: 0, - description: 'Fetch payloads from Dutchie API for all crawl-enabled stores every 4 hours. 
Chains to product_refresh.', - }, { name: 'store_discovery_dutchie', role: 'store_discovery' as TaskRole, - interval_hours: 24, + interval_hours: 168, // Weekly priority: 5, - description: 'Discover new Dutchie stores daily', + description: 'Discover new Dutchie stores weekly (HTTP transport)', + method: 'http', + is_immutable: true, + platform: 'dutchie', }, { name: 'analytics_refresh', @@ -109,16 +113,21 @@ class TaskScheduler { interval_hours: 6, priority: 0, description: 'Refresh analytics materialized views every 6 hours', + method: 'http', + is_immutable: true, + platform: null, }, ]; for (const sched of defaults) { try { await pool.query(` - INSERT INTO task_schedules (name, role, interval_hours, priority, description, enabled, next_run_at) - VALUES ($1, $2, $3, $4, $5, true, NOW()) - ON CONFLICT (name) DO NOTHING - `, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description]); + INSERT INTO task_schedules (name, role, interval_hours, priority, description, method, is_immutable, platform, enabled, next_run_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW()) + ON CONFLICT (name) DO UPDATE SET + method = EXCLUDED.method, + is_immutable = EXCLUDED.is_immutable + `, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description, sched.method, sched.is_immutable, sched.platform]); } catch (err: any) { // Table may not exist yet - will be created by migration if (!err.message.includes('does not exist')) { @@ -192,16 +201,27 @@ class TaskScheduler { /** * Execute a schedule and create tasks * Per TASK_WORKFLOW_2024-12-10.md: Different logic per role + * + * TRANSPORT MODES: + * - All schedules now use HTTP transport (Puppeteer/browser) + * - Per-state product_discovery schedules process one state at a time + * - Workers must pass HTTP preflight to claim HTTP tasks */ private async executeSchedule(schedule: TaskSchedule): Promise { switch (schedule.role) { + case 'product_discovery': + // Per-state product discovery using HTTP transport + return this.generateProductDiscoveryTasks(schedule); + case 'payload_fetch': - // Per TASK_WORKFLOW_2024-12-10.md: payload_fetch replaces direct product_refresh - return this.generatePayloadFetchTasks(schedule); + // DEPRECATED: Legacy payload_fetch redirects to product_discovery + console.log(`[TaskScheduler] payload_fetch is deprecated, using product_discovery instead`); + return this.generateProductDiscoveryTasks(schedule); case 'product_refresh': - // Legacy - kept for manual triggers, but scheduled crawls use payload_fetch - return this.generatePayloadFetchTasks(schedule); + // DEPRECATED: Legacy product_refresh redirects to product_discovery + console.log(`[TaskScheduler] product_refresh is deprecated, using product_discovery instead`); + return this.generateProductDiscoveryTasks(schedule); case 'store_discovery': return this.generateStoreDiscoveryTasks(schedule); @@ -216,50 +236,69 @@ class TaskScheduler { } /** - * Generate payload_fetch tasks for stores that need crawling - * Per TASK_WORKFLOW_2024-12-10.md: payload_fetch hits API, saves to disk, chains to product_refresh + * Generate product_discovery tasks for stores in a specific state + * Uses HTTP transport (Puppeteer/browser) for all tasks + * + * Per-state scheduling allows: + * - Different crawl frequencies per state (e.g., AZ=4h, MI=6h) + * - Better rate limit management (one state at a time) + * - Easier debugging and monitoring per state */ - private async generatePayloadFetchTasks(schedule: TaskSchedule): Promise { - // Per 
TASK_WORKFLOW_2024-12-10.md: Find stores needing refresh + private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise { + // state_code is required for per-state schedules + if (!schedule.state_code) { + console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`); + return 0; + } + + // Find stores in this state needing refresh const result = await pool.query(` SELECT d.id FROM dispensaries d + JOIN states s ON d.state_id = s.id WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NOT NULL - -- No pending/running payload_fetch or product_refresh task already + AND s.code = $1 + -- No pending/running product_discovery task already AND NOT EXISTS ( SELECT 1 FROM worker_tasks t WHERE t.dispensary_id = d.id - AND t.role IN ('payload_fetch', 'product_refresh') + AND t.role = 'product_discovery' AND t.status IN ('pending', 'claimed', 'running') ) -- Never fetched OR last fetch > interval ago AND ( d.last_fetch_at IS NULL - OR d.last_fetch_at < NOW() - ($1 || ' hours')::interval + OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval ) - ${schedule.state_code ? 'AND d.state_id = (SELECT id FROM states WHERE code = $2)' : ''} - `, schedule.state_code ? [schedule.interval_hours, schedule.state_code] : [schedule.interval_hours]); + ORDER BY d.last_fetch_at NULLS FIRST, d.id + `, [schedule.state_code, schedule.interval_hours]); const dispensaryIds = result.rows.map((r: { id: number }) => r.id); if (dispensaryIds.length === 0) { + console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`); return 0; } - // Per TASK_WORKFLOW_2024-12-10.md: Create payload_fetch tasks (they chain to product_refresh) - const tasks = dispensaryIds.map((id: number) => ({ - role: 'payload_fetch' as TaskRole, - dispensary_id: id, - priority: schedule.priority, - })); + console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`); - return taskService.createTasks(tasks); + // Create product_discovery tasks with HTTP transport + // Stagger by 15 seconds to prevent overwhelming proxies + const { created } = await taskService.createStaggeredTasks( + dispensaryIds, + 'product_discovery', + 15, // 15 seconds apart + schedule.platform || 'dutchie', + 'http' // Force HTTP transport + ); + + return created; } /** * Generate store_discovery tasks - * Per TASK_WORKFLOW_2024-12-10.md: One task per platform + * Uses HTTP transport (Puppeteer/browser) for browser-based discovery */ private async generateStoreDiscoveryTasks(schedule: TaskSchedule): Promise { // Check if discovery task already pending @@ -276,8 +315,9 @@ class TaskScheduler { await taskService.createTask({ role: 'store_discovery', - platform: 'dutchie', + platform: schedule.platform || 'dutchie', priority: schedule.priority, + method: 'http', // Force HTTP transport for browser-based discovery }); return 1; @@ -310,11 +350,39 @@ class TaskScheduler { /** * Get all schedules for dashboard display + * Returns schedules with full metadata including immutability flag */ async getSchedules(): Promise { try { const result = await pool.query(` - SELECT * FROM task_schedules ORDER BY name + SELECT + id, + name, + role, + enabled, + interval_hours, + last_run_at, + next_run_at, + state_code, + priority, + method, + COALESCE(is_immutable, false) as is_immutable, + description, + platform, + last_task_count, + last_error, + created_at, + updated_at + FROM task_schedules + ORDER BY + CASE role + WHEN 'store_discovery' THEN 1 + WHEN 'product_discovery' THEN 2 + 
WHEN 'analytics_refresh' THEN 3 + ELSE 4 + END, + state_code NULLS FIRST, + name `); return result.rows as TaskSchedule[]; } catch { @@ -322,8 +390,24 @@ class TaskScheduler { } } + /** + * Get a single schedule by ID + */ + async getSchedule(id: number): Promise { + try { + const result = await pool.query(` + SELECT * FROM task_schedules WHERE id = $1 + `, [id]); + return result.rows[0] as TaskSchedule || null; + } catch { + return null; + } + } + /** * Update a schedule + * Allows updating: enabled, interval_hours, priority + * Does NOT allow updating: name, role, state_code, is_immutable */ async updateSchedule(id: number, updates: Partial): Promise { const setClauses: string[] = []; @@ -355,6 +439,33 @@ class TaskScheduler { `, values); } + /** + * Delete a schedule (only if not immutable) + * Returns true if deleted, false if immutable + */ + async deleteSchedule(id: number): Promise<{ deleted: boolean; reason?: string }> { + // Check if schedule is immutable + const result = await pool.query(` + SELECT name, is_immutable FROM task_schedules WHERE id = $1 + `, [id]); + + if (result.rows.length === 0) { + return { deleted: false, reason: 'Schedule not found' }; + } + + const schedule = result.rows[0]; + + if (schedule.is_immutable) { + return { + deleted: false, + reason: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.` + }; + } + + await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [id]); + return { deleted: true }; + } + /** * Trigger a schedule to run immediately */ @@ -369,6 +480,46 @@ class TaskScheduler { return this.executeSchedule(result.rows[0] as TaskSchedule); } + + /** + * Get schedule statistics for dashboard + */ + async getScheduleStats(): Promise<{ + total: number; + enabled: number; + byRole: Record; + byState: Record; + }> { + try { + const result = await pool.query(` + SELECT + COUNT(*)::int as total, + SUM(CASE WHEN enabled THEN 1 ELSE 0 END)::int as enabled_count, + role, + state_code + FROM task_schedules + GROUP BY role, state_code + `); + + let total = 0; + let enabled = 0; + const byRole: Record = {}; + const byState: Record = {}; + + for (const row of result.rows) { + total += row.total; + enabled += row.enabled_count; + byRole[row.role] = (byRole[row.role] || 0) + row.total; + if (row.state_code) { + byState[row.state_code] = (byState[row.state_code] || 0) + row.total; + } + } + + return { total, enabled, byRole, byState }; + } catch { + return { total: 0, enabled: 0, byRole: {}, byState: {} }; + } + } } // Per TASK_WORKFLOW_2024-12-10.md: Singleton instance diff --git a/backend/src/tasks/handlers/index.ts b/backend/src/tasks/handlers/index.ts index ecdeac16..91a5c5a2 100644 --- a/backend/src/tasks/handlers/index.ts +++ b/backend/src/tasks/handlers/index.ts @@ -13,6 +13,7 @@ export { handleProductDiscoveryHttp } from './product-discovery-http'; export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-curl'; export { handleProductRefresh } from './product-refresh'; export { handleStoreDiscovery } from './store-discovery'; +export { handleStoreDiscoveryHttp } from './store-discovery-http'; export { handleEntryPointDiscovery } from './entry-point-discovery'; export { handleAnalyticsRefresh } from './analytics-refresh'; export { handleWhoami } from './whoami'; diff --git a/backend/src/tasks/handlers/product-discovery-http.ts b/backend/src/tasks/handlers/product-discovery-http.ts index 56d51555..ac4aedea 100644 --- a/backend/src/tasks/handlers/product-discovery-http.ts +++ 
b/backend/src/tasks/handlers/product-discovery-http.ts @@ -122,6 +122,55 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise { + const buttons = Array.from(document.querySelectorAll('button')); + for (const btn of buttons) { + const text = btn.textContent?.toLowerCase() || ''; + if (text.includes('yes') || text.includes('enter') || text.includes('21')) { + (btn as HTMLButtonElement).click(); + return true; + } + } + return false; + }); + } catch (ageGateErr) { + // Age gate might not be present, continue + console.log(`[ProductDiscoveryHTTP] No age gate detected or already dismissed`); + } + console.log(`[ProductDiscoveryHTTP] Session established, fetching products...`); await ctx.heartbeat(); diff --git a/backend/src/tasks/handlers/store-discovery-http.ts b/backend/src/tasks/handlers/store-discovery-http.ts new file mode 100644 index 00000000..f9d688d6 --- /dev/null +++ b/backend/src/tasks/handlers/store-discovery-http.ts @@ -0,0 +1,480 @@ +/** + * Store Discovery HTTP Handler (Browser-based) + * + * Uses Puppeteer + StealthPlugin to discover stores via browser context. + * Based on product-discovery-http.ts pattern. + * + * This handler: + * 1. Launches headless browser with proxy (if provided) + * 2. Establishes session by visiting Dutchie dispensaries page + * 3. Fetches cities for each state via getAllCitiesByState GraphQL + * 4. Fetches stores for each city via ConsumerDispensaries GraphQL + * 5. Upserts to dutchie_discovery_locations + * 6. Auto-promotes valid locations to dispensaries table + * + * Why browser-based: + * - Works with session-based residential proxies (Evomi) + * - Lower detection risk than curl/axios + * - Real Chrome TLS fingerprint + */ + +import { TaskContext, TaskResult } from '../task-worker'; +import { upsertLocation } from '../../discovery/location-discovery'; +import { promoteDiscoveredLocations } from '../../discovery/promotion'; +import { saveDiscoveryPayload } from '../../utils/payload-storage'; + +// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts +const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6'; +const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b'; + +interface StateWithCities { + name: string; + country: string; + cities: string[]; +} + +interface DiscoveredLocation { + id: string; + name: string; + slug: string; + cName?: string; + address?: string; + city?: string; + state?: string; + zip?: string; + latitude?: number; + longitude?: number; + offerPickup?: boolean; + offerDelivery?: boolean; + isRecreational?: boolean; + isMedical?: boolean; + phone?: string; + email?: string; + website?: string; + description?: string; + logoImage?: string; + bannerImage?: string; + chainSlug?: string; + enterpriseId?: string; + retailType?: string; + status?: string; + timezone?: string; + location?: { + ln1?: string; + ln2?: string; + city?: string; + state?: string; + zipcode?: string; + country?: string; + geometry?: { coordinates?: [number, number] }; + }; +} + +export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise { + const { pool, task, crawlRotator } = ctx; + const platform = task.platform || 'dutchie'; + + let browser: any = null; + + try { + console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`); + + // ============================================================ + // STEP 1: Setup Puppeteer with proxy + // ============================================================ + const 
puppeteer = require('puppeteer-extra'); + const StealthPlugin = require('puppeteer-extra-plugin-stealth'); + puppeteer.use(StealthPlugin()); + + // Get proxy from CrawlRotator if available + let proxyUrl: string | null = null; + if (crawlRotator) { + const currentProxy = crawlRotator.proxy.getCurrent(); + if (currentProxy) { + proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy); + console.log(`[StoreDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`); + } + } + + // Build browser args + const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox']; + if (proxyUrl) { + const proxyUrlParsed = new URL(proxyUrl); + browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`); + } + + browser = await puppeteer.launch({ + headless: 'new', + args: browserArgs, + }); + + const page = await browser.newPage(); + + // Setup proxy auth if needed + if (proxyUrl) { + const proxyUrlParsed = new URL(proxyUrl); + if (proxyUrlParsed.username && proxyUrlParsed.password) { + await page.authenticate({ + username: decodeURIComponent(proxyUrlParsed.username), + password: decodeURIComponent(proxyUrlParsed.password), + }); + } + } + + await ctx.heartbeat(); + + // ============================================================ + // STEP 2: Establish session by visiting dispensaries page + // ============================================================ + const sessionUrl = 'https://dutchie.com/dispensaries'; + console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`); + + await page.goto(sessionUrl, { + waitUntil: 'networkidle2', + timeout: 60000, + }); + + // Handle potential age gate + try { + await page.waitForTimeout(1500); + await page.evaluate(() => { + const buttons = Array.from(document.querySelectorAll('button')); + for (const btn of buttons) { + const text = btn.textContent?.toLowerCase() || ''; + if (text.includes('yes') || text.includes('enter') || text.includes('21')) { + (btn as HTMLButtonElement).click(); + return true; + } + } + return false; + }); + } catch { + // Age gate might not be present + } + + console.log(`[StoreDiscoveryHTTP] Session established`); + + await ctx.heartbeat(); + + // ============================================================ + // STEP 3: Get states to discover from database + // ============================================================ + const statesResult = await pool.query(` + SELECT code FROM states WHERE is_active = true ORDER BY code + `); + const stateCodesToDiscover = statesResult.rows.map((r: { code: string }) => r.code); + + if (stateCodesToDiscover.length === 0) { + await browser.close(); + return { success: true, storesDiscovered: 0, newStoreIds: [], message: 'No active states to discover' }; + } + + console.log(`[StoreDiscoveryHTTP] Will discover stores in ${stateCodesToDiscover.length} states`); + + // ============================================================ + // STEP 4: Fetch cities for each state via GraphQL + // ============================================================ + const statesWithCities = await page.evaluate(async (hash: string) => { + const logs: string[] = []; + try { + const extensions = { + persistedQuery: { version: 1, sha256Hash: hash }, + }; + const qs = new URLSearchParams({ + operationName: 'getAllCitiesByState', + variables: JSON.stringify({}), + extensions: JSON.stringify(extensions), + }); + const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Accept': 'application/json', + 'content-type': 
'application/json', + }, + credentials: 'include', + }); + + logs.push(`getAllCitiesByState: HTTP ${response.status}`); + + if (!response.ok) { + return { states: [], logs }; + } + + const json = await response.json(); + const statesData = json?.data?.statesWithDispensaries || []; + + const states: StateWithCities[] = []; + for (const state of statesData) { + if (state && state.name) { + const cities = Array.isArray(state.cities) + ? state.cities.filter((c: string | null) => c !== null) + : []; + states.push({ + name: state.name, + country: state.country || 'US', + cities, + }); + } + } + + logs.push(`Found ${states.length} states with cities`); + return { states, logs }; + } catch (err: any) { + logs.push(`Error: ${err.message}`); + return { states: [], logs }; + } + }, GET_ALL_CITIES_HASH); + + statesWithCities.logs.forEach((log: string) => console.log(`[Browser] ${log}`)); + + if (statesWithCities.states.length === 0) { + await browser.close(); + return { success: false, error: 'Failed to fetch states with cities' }; + } + + await ctx.heartbeat(); + + // ============================================================ + // STEP 5: For each active state, fetch stores for each city + // ============================================================ + let totalDiscovered = 0; + let totalUpserted = 0; + const allNewStoreIds: number[] = []; + + for (const stateCode of stateCodesToDiscover) { + const stateData = statesWithCities.states.find( + (s: StateWithCities) => s.name.toUpperCase() === stateCode.toUpperCase() + ); + + if (!stateData || stateData.cities.length === 0) { + console.log(`[StoreDiscoveryHTTP] No cities found for ${stateCode}, skipping`); + continue; + } + + console.log(`[StoreDiscoveryHTTP] Discovering ${stateData.cities.length} cities in ${stateCode}...`); + + await ctx.heartbeat(); + + // Accumulate raw store data for this state + const stateRawStores: any[] = []; + const stateCityData: { city: string; stores: any[] }[] = []; + + // Fetch stores for each city in this state + for (const city of stateData.cities) { + try { + const cityResult = await page.evaluate(async ( + cityName: string, + stateCodeParam: string, + hash: string + ) => { + const logs: string[] = []; + const allDispensaries: any[] = []; + let page = 0; + const perPage = 200; + + try { + while (page < 5) { // Max 5 pages per city + const variables = { + dispensaryFilter: { + activeOnly: true, + city: cityName, + state: stateCodeParam, + }, + page, + perPage, + }; + + const extensions = { + persistedQuery: { version: 1, sha256Hash: hash }, + }; + + const qs = new URLSearchParams({ + operationName: 'ConsumerDispensaries', + variables: JSON.stringify(variables), + extensions: JSON.stringify(extensions), + }); + const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Accept': 'application/json', + 'content-type': 'application/json', + }, + credentials: 'include', + }); + + if (!response.ok) { + logs.push(`${cityName}: HTTP ${response.status}`); + break; + } + + const json = await response.json(); + const dispensaries = json?.data?.filteredDispensaries || []; + + if (dispensaries.length === 0) { + break; + } + + // Filter to ensure correct state + const stateFiltered = dispensaries.filter((d: any) => + d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase() + ); + allDispensaries.push(...stateFiltered); + + if (dispensaries.length < perPage) { + break; + } + page++; + + // Small delay between pages + await new Promise(r => 
setTimeout(r, 100)); + } + + logs.push(`${cityName}: ${allDispensaries.length} stores`); + } catch (err: any) { + logs.push(`${cityName}: Error - ${err.message}`); + } + + return { dispensaries: allDispensaries, logs }; + }, city, stateCode, CONSUMER_DISPENSARIES_HASH); + + cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`)); + + // Accumulate raw store data + stateRawStores.push(...cityResult.dispensaries); + stateCityData.push({ city, stores: cityResult.dispensaries }); + + // Upsert each discovered location + for (const disp of cityResult.dispensaries) { + try { + const location = normalizeLocation(disp); + if (!location.id) { + continue; // Skip locations without platform ID + } + + const result = await upsertLocation(pool, location as any, null); + if (result) { + totalUpserted++; + if (result.isNew) { + totalDiscovered++; + } + } + } catch (err: any) { + console.error(`[StoreDiscoveryHTTP] Upsert error for ${disp.name}:`, err.message); + } + } + + // Small delay between cities to avoid rate limiting + await new Promise(r => setTimeout(r, 300)); + } catch (err: any) { + console.error(`[StoreDiscoveryHTTP] Error fetching ${city}, ${stateCode}:`, err.message); + } + } + + // Heartbeat after each state + await ctx.heartbeat(); + + // ============================================================ + // STEP 5b: Save raw store payload for this state + // ============================================================ + if (stateRawStores.length > 0) { + try { + const rawPayload = { + stateCode, + platform, + fetchedAt: new Date().toISOString(), + storeCount: stateRawStores.length, + citiesProcessed: stateCityData.length, + cities: stateCityData, + stores: stateRawStores, + }; + + const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length); + console.log(`[StoreDiscoveryHTTP] Saved raw payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`); + } catch (err: any) { + console.error(`[StoreDiscoveryHTTP] Failed to save payload for ${stateCode}:`, err.message); + } + } + + // Auto-promote valid locations for this state + try { + const promotionResult = await promoteDiscoveredLocations(stateCode); + const promoted = promotionResult.created + promotionResult.updated; + if (promoted > 0) { + console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`); + // newDispensaryIds is returned but not in typed interface + const newIds = (promotionResult as any).newDispensaryIds || []; + allNewStoreIds.push(...newIds); + } + } catch (err: any) { + console.error(`[StoreDiscoveryHTTP] Promotion error for ${stateCode}:`, err.message); + } + } + + await browser.close(); + browser = null; + + console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted`); + + return { + success: true, + storesDiscovered: totalDiscovered, + storesUpserted: totalUpserted, + statesProcessed: stateCodesToDiscover.length, + newStoreIds: allNewStoreIds, + }; + + } catch (error: unknown) { + const errorMessage = error instanceof Error ? 
error.message : 'Unknown error'; + console.error(`[StoreDiscoveryHTTP] Error:`, errorMessage); + return { + success: false, + error: errorMessage, + newStoreIds: [], + }; + } finally { + if (browser) { + await browser.close().catch(() => {}); + } + } +} + +/** + * Normalize a raw dispensary response to our DiscoveredLocation format + */ +function normalizeLocation(raw: any): DiscoveredLocation { + const loc = raw.location || {}; + const coords = loc.geometry?.coordinates || []; + + return { + id: raw.id || raw._id || '', + name: raw.name || '', + slug: raw.slug || raw.cName || '', + cName: raw.cName || raw.slug || '', + address: raw.address || loc.ln1 || '', + city: raw.city || loc.city || '', + state: raw.state || loc.state || '', + zip: raw.zip || loc.zipcode || loc.zip || '', + latitude: coords[1] || raw.latitude, + longitude: coords[0] || raw.longitude, + timezone: raw.timezone || '', + offerPickup: raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true, + offerDelivery: raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false, + isRecreational: raw.isRecreational ?? raw.recDispensary ?? true, + isMedical: raw.isMedical ?? raw.medicalDispensary ?? true, + phone: raw.phone || '', + email: raw.email || '', + website: raw.embedBackUrl || '', + description: raw.description || '', + logoImage: raw.logoImage || '', + bannerImage: raw.bannerImage || '', + chainSlug: raw.chain || '', + enterpriseId: raw.retailer?.enterpriseId || '', + retailType: raw.retailType || '', + status: raw.status || '', + location: loc, + }; +} diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts index 49c76823..148206c4 100644 --- a/backend/src/tasks/task-service.ts +++ b/backend/src/tasks/task-service.ts @@ -476,15 +476,17 @@ class TaskService { case 'store_discovery': { // Per TASK_WORKFLOW_2024-12-10.md: New stores discovered -> create product_discovery tasks // Skip entry_point_discovery since platform_dispensary_id is set during promotion + // All product_discovery tasks use HTTP transport (Puppeteer/browser) const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds; if (newStoreIds && newStoreIds.length > 0) { - console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores`); + console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores (HTTP transport)`); for (const storeId of newStoreIds) { await this.createTask({ role: 'product_discovery', dispensary_id: storeId, platform: completedTask.platform ?? undefined, priority: 10, // High priority for new stores + method: 'http', // Force HTTP transport for browser-based scraping }); } } @@ -501,6 +503,7 @@ class TaskService { dispensary_id: completedTask.dispensary_id, platform: completedTask.platform ?? 
undefined, priority: 10, + method: 'http', // Force HTTP transport }); } break; @@ -525,6 +528,7 @@ class TaskService { /** * Create store discovery task for a platform/state + * Uses HTTP transport (Puppeteer/browser) by default */ async createStoreDiscoveryTask( platform: string, @@ -535,11 +539,13 @@ class TaskService { role: 'store_discovery', platform, priority, + method: 'http', // Force HTTP transport }); } /** * Create entry point discovery task for a specific store + * @deprecated Entry point resolution now happens during store promotion */ async createEntryPointTask( dispensaryId: number, @@ -551,11 +557,13 @@ class TaskService { dispensary_id: dispensaryId, platform, priority, + method: 'http', // Force HTTP transport }); } /** * Create product discovery task for a specific store + * Uses HTTP transport (Puppeteer/browser) by default */ async createProductDiscoveryTask( dispensaryId: number, @@ -567,6 +575,7 @@ class TaskService { dispensary_id: dispensaryId, platform, priority, + method: 'http', // Force HTTP transport }); } @@ -819,6 +828,73 @@ class TaskService { }; } + /** + * Cleanup stale tasks that are stuck in 'claimed' or 'running' status. + * + * This handles the case where workers crash/restart and leave tasks in-flight. + * These stale tasks block the queue because the claim query excludes dispensary_ids + * that have active tasks. + * + * Called automatically on worker startup and can be called periodically. + * + * @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset + * @returns Object with cleanup stats + */ + async cleanupStaleTasks(staleMinutes: number = 30): Promise<{ + cleaned: number; + byStatus: { claimed: number; running: number }; + byRole: Record; + }> { + // First, get stats on what we're about to clean + const statsResult = await pool.query(` + SELECT status, role, COUNT(*)::int as count + FROM worker_tasks + WHERE status IN ('claimed', 'running') + AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1 + GROUP BY status, role + `, [staleMinutes]); + + const byStatus = { claimed: 0, running: 0 }; + const byRole: Record = {}; + + for (const row of statsResult.rows) { + const { status, role, count } = row as { status: string; role: string; count: number }; + if (status === 'claimed') byStatus.claimed += count; + if (status === 'running') byStatus.running += count; + byRole[role] = (byRole[role] || 0) + count; + } + + const totalStale = byStatus.claimed + byStatus.running; + + if (totalStale === 0) { + return { cleaned: 0, byStatus, byRole }; + } + + // Reset stale tasks to pending + const result = await pool.query(` + UPDATE worker_tasks + SET + status = 'pending', + worker_id = NULL, + claimed_at = NULL, + started_at = NULL, + last_heartbeat_at = NULL, + error_message = CONCAT(COALESCE(error_message, ''), ' [Auto-reset: stale after ', $1, ' min]'), + updated_at = NOW() + WHERE status IN ('claimed', 'running') + AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1 + `, [staleMinutes]); + + const cleaned = result.rowCount ?? 
0; + + if (cleaned > 0) { + console.log(`[TaskService] Cleaned up ${cleaned} stale tasks (claimed: ${byStatus.claimed}, running: ${byStatus.running})`); + console.log(`[TaskService] Stale tasks by role: ${Object.entries(byRole).map(([r, c]) => `${r}:${c}`).join(', ')}`); + } + + return { cleaned, byStatus, byRole }; + } + /** * Calculate workers needed to complete tasks within SLA */ diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 0c7271f5..1a8062e6 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -75,6 +75,7 @@ import { handleProductRefresh } from './handlers/product-refresh'; import { handleProductDiscovery } from './handlers/product-discovery-curl'; import { handleProductDiscoveryHttp } from './handlers/product-discovery-http'; import { handleStoreDiscovery } from './handlers/store-discovery'; +import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http'; import { handleEntryPointDiscovery } from './handlers/entry-point-discovery'; import { handleAnalyticsRefresh } from './handlers/analytics-refresh'; import { handleWhoami } from './handlers/whoami'; @@ -160,21 +161,30 @@ const TASK_HANDLERS: Record = { /** * Get the appropriate handler for a task, considering both role and method. * - * For product_discovery: - * - method='http' -> handleProductDiscoveryHttp (browser-based, for Evomi proxies) - * - method='curl' or unspecified -> handleProductDiscovery (curl-based) + * Dual-transport handlers: + * - product_discovery: curl (axios) or http (Puppeteer) + * - store_discovery: curl (axios) or http (Puppeteer) + * + * Default method is 'http' since all GraphQL queries should use browser transport + * for better TLS fingerprinting and session-based proxy compatibility. */ function getHandlerForTask(task: WorkerTask): TaskHandler | undefined { const role = task.role as TaskRole; - const method = task.method || 'curl'; + const method = task.method || 'http'; // Default to HTTP for all GraphQL tasks - // Special handling for product_discovery with method='http' + // product_discovery: dual-transport support if (role === 'product_discovery' && method === 'http') { console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${method})`); return handleProductDiscoveryHttp; } - // Default: use the static handler registry + // store_discovery: dual-transport support + if (role === 'store_discovery' && method === 'http') { + console.log(`[TaskWorker] Using HTTP handler for store_discovery (method=${method})`); + return handleStoreDiscoveryHttp; + } + + // Default: use the static handler registry (curl-based) return TASK_HANDLERS[role]; } @@ -719,6 +729,20 @@ export class TaskWorker { // Start registry heartbeat immediately this.startRegistryHeartbeat(); + // Cleanup stale tasks on startup (only worker-0 does this to avoid races) + // This handles tasks left in 'claimed'/'running' status when workers restart + if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') { + try { + console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`); + const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold + if (cleanupResult.cleaned > 0) { + console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`); + } + } catch (err: any) { + console.error(`[TaskWorker] Stale task cleanup error:`, err.message); + } + } + const roleMsg = this.role ? 
`for role: ${this.role}` : '(role-agnostic - any task)'; console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`); diff --git a/backend/src/utils/payload-storage.ts b/backend/src/utils/payload-storage.ts index cdbede42..b30254ee 100644 --- a/backend/src/utils/payload-storage.ts +++ b/backend/src/utils/payload-storage.ts @@ -366,6 +366,141 @@ export async function listPayloadMetadata( })); } +/** + * Result from saving a discovery payload + */ +export interface SaveDiscoveryPayloadResult { + id: number; + storagePath: string; + sizeBytes: number; + sizeBytesRaw: number; + checksum: string; +} + +/** + * Generate storage path for a discovery payload + * + * Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz + */ +function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string { + const year = timestamp.getFullYear(); + const month = String(timestamp.getMonth() + 1).padStart(2, '0'); + const day = String(timestamp.getDate()).padStart(2, '0'); + const ts = timestamp.getTime(); + + return path.join( + PAYLOAD_BASE_PATH, + 'discovery', + String(year), + month, + day, + `state_${stateCode.toLowerCase()}_${ts}.json.gz` + ); +} + +/** + * Save a raw store discovery payload to filesystem and record metadata in DB + * + * @param pool - Database connection pool + * @param stateCode - State code (e.g., 'AZ', 'MI') + * @param payload - Raw JSON payload from discovery GraphQL + * @param storeCount - Number of stores in payload + * @returns SaveDiscoveryPayloadResult with file info and DB record ID + */ +export async function saveDiscoveryPayload( + pool: Pool, + stateCode: string, + payload: any, + storeCount: number = 0 +): Promise { + const timestamp = new Date(); + const storagePath = generateDiscoveryStoragePath(stateCode, timestamp); + + // Serialize and compress + const jsonStr = JSON.stringify(payload); + const rawSize = Buffer.byteLength(jsonStr, 'utf8'); + const compressed = await gzip(Buffer.from(jsonStr, 'utf8')); + const compressedSize = compressed.length; + const checksum = calculateChecksum(compressed); + + // Write to filesystem + await ensureDir(storagePath); + await fs.promises.writeFile(storagePath, compressed); + + // Record metadata in DB + const result = await pool.query(` + INSERT INTO raw_crawl_payloads ( + payload_type, + state_code, + storage_path, + store_count, + size_bytes, + size_bytes_raw, + fetched_at, + checksum_sha256 + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + RETURNING id + `, [ + 'store_discovery', + stateCode.toUpperCase(), + storagePath, + storeCount, + compressedSize, + rawSize, + timestamp, + checksum + ]); + + console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`); + + return { + id: result.rows[0].id, + storagePath, + sizeBytes: compressedSize, + sizeBytesRaw: rawSize, + checksum + }; +} + +/** + * Get the latest discovery payload for a state + * + * @param pool - Database connection pool + * @param stateCode - State code (e.g., 'AZ', 'MI') + * @returns Parsed payload and metadata, or null if none exists + */ +export async function getLatestDiscoveryPayload( + pool: Pool, + stateCode: string +): Promise<{ payload: any; metadata: any } | null> { + const result = await pool.query(` + SELECT id, state_code, storage_path, store_count, fetched_at + FROM raw_crawl_payloads + WHERE payload_type = 'store_discovery' + AND 
state_code = $1 + ORDER BY fetched_at DESC + LIMIT 1 + `, [stateCode.toUpperCase()]); + + if (result.rows.length === 0) { + return null; + } + + const row = result.rows[0]; + const payload = await loadPayloadFromPath(row.storage_path); + + return { + payload, + metadata: { + id: row.id, + stateCode: row.state_code, + storeCount: row.store_count, + fetchedAt: row.fetched_at, + storagePath: row.storage_path + } + }; +} + /** * Delete old payloads (for retention policy) * diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts index 5ed66c28..1f6ba742 100755 --- a/cannaiq/src/lib/api.ts +++ b/cannaiq/src/lib/api.ts @@ -3075,6 +3075,8 @@ export interface TaskSchedule { priority: number; state_code: string | null; platform: string | null; + method: 'curl' | 'http' | null; + is_immutable: boolean; last_run_at: string | null; next_run_at: string | null; last_task_count: number; diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx index 59022023..e882462f 100644 --- a/cannaiq/src/pages/TasksDashboard.tsx +++ b/cannaiq/src/pages/TasksDashboard.tsx @@ -25,6 +25,8 @@ import { Play, Pause, Timer, + Lock, + Globe, } from 'lucide-react'; interface Task { @@ -385,9 +387,7 @@ const ROLES = [ 'store_discovery', 'entry_point_discovery', 'product_discovery', - 'product_refresh', 'analytics_refresh', - 'payload_fetch', ]; // ============================================================ @@ -414,6 +414,7 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo const [error, setError] = useState(null); const isNew = !schedule; + const isImmutable = schedule?.is_immutable ?? false; useEffect(() => { if (schedule) { @@ -440,7 +441,7 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo }, [schedule, isOpen]); const handleSubmit = async () => { - if (!name.trim()) { + if (!isImmutable && !name.trim()) { setError('Name is required'); return; } @@ -449,16 +450,23 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo setError(null); try { - const data = { - name: name.trim(), - role, - description: description.trim() || undefined, - enabled, - interval_hours: intervalHours, - priority, - state_code: stateCode.trim() || undefined, - platform: platform.trim() || undefined, - }; + // For immutable schedules, only send allowed fields + const data = isImmutable + ? { + enabled, + interval_hours: intervalHours, + priority, + } + : { + name: name.trim(), + role, + description: description.trim() || undefined, + enabled, + interval_hours: intervalHours, + priority, + state_code: stateCode.trim() || undefined, + platform: platform.trim() || undefined, + }; if (isNew) { await api.createTaskSchedule(data as any); @@ -498,6 +506,15 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo )} + {isImmutable && ( +
+ +
+ Immutable schedule. Only Enabled, Interval, and Priority can be modified. +
+
+ )} +
setName(e.target.value)} placeholder="e.g., product_refresh_all" - className="w-full px-3 py-2 border border-gray-200 rounded-lg" + disabled={isImmutable} + className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${ + isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : '' + }`} />
@@ -514,7 +534,10 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo