From 8b3ae4008972be4b29b0a6033ff988504e400466 Mon Sep 17 00:00:00 2001 From: Kelly Date: Sat, 13 Dec 2025 13:23:35 -0700 Subject: [PATCH] feat: Remove Run Now, add source tracking, optimize dashboard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove /run-now endpoint (use task priority instead) - Add source tracking to worker_tasks (source, source_schedule_id, source_metadata) - Parallelize dashboard API calls (Promise.all) - Add 1-5 min caching to /markets/dashboard and /national/summary - Add performance indexes for dashboard queries Migrations: - 104: Task source tracking columns - 105: Dashboard performance indexes 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 59 +++-- .../migrations/104_task_source_tracking.sql | 25 ++ .../105_dashboard_performance_indexes.sql | 25 ++ backend/src/discovery/promotion.ts | 15 +- backend/src/routes/analytics.ts | 41 ++- backend/src/routes/markets.ts | 19 +- backend/src/routes/tasks.ts | 237 ++++-------------- backend/src/services/task-scheduler.ts | 14 +- backend/src/tasks/task-service.ts | 175 ++++--------- cannaiq/src/lib/api.ts | 8 +- cannaiq/src/pages/CrawlSchedulePage.tsx | 24 -- cannaiq/src/pages/Dashboard.tsx | 49 ++-- cannaiq/src/pages/TasksDashboard.tsx | 40 +-- 13 files changed, 271 insertions(+), 460 deletions(-) create mode 100644 backend/migrations/104_task_source_tracking.sql create mode 100644 backend/migrations/105_dashboard_performance_indexes.sql diff --git a/CLAUDE.md b/CLAUDE.md index ebff9867..14cb20b0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,5 +1,8 @@ # Claude Guidelines for CannaiQ +## CURRENT ENVIRONMENT: PRODUCTION +**We are working in PRODUCTION only.** All database queries and API calls should target the remote production environment, not localhost. Use kubectl port-forward or remote DB connections as needed. + ## PERMANENT RULES (NEVER VIOLATE) ### 1. NO DELETE @@ -247,14 +250,14 @@ These binaries mimic real browser TLS fingerprints to avoid detection. --- -## Staggered Task Workflow (Added 2025-12-12) +## Bulk Task Workflow (Updated 2025-12-13) ### Overview -When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting. +Tasks are created with `scheduled_for = NOW()` by default. Worker-level controls handle pacing - no task-level staggering needed. ### How It Works ``` -1. Task created with scheduled_for = NOW() + (index * stagger_seconds) +1. Task created with scheduled_for = NOW() 2. Worker claims task only when scheduled_for <= NOW() 3. Worker runs preflight on EVERY task claim (proxy health check) 4. If preflight passes, worker executes task @@ -263,57 +266,51 @@ When creating many tasks at once (e.g., product refresh for all AZ stores), stag 7. 
Repeat - preflight runs on each new task claim ``` +### Worker-Level Throttling +These controls pace task execution - no staggering at task creation time: + +| Control | Purpose | +|---------|---------| +| `MAX_CONCURRENT_TASKS` | Limits concurrent tasks per pod (default: 3) | +| Working hours | Restricts when tasks run (configurable per schedule) | +| Preflight checks | Ensures proxy health before each task | +| Per-store locking | Only one active task per dispensary | + ### Key Points - **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check -- **Stagger prevents thundering herd**: 15 seconds between tasks is default -- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed +- **Worker controls pacing**: Tasks scheduled for NOW() but claimed based on worker capacity +- **Optional staggering**: Pass `stagger_seconds > 0` if you need explicit delays ### API Endpoints ```bash -# Create staggered tasks for specific dispensary IDs +# Create bulk tasks for specific dispensary IDs POST /api/tasks/batch/staggered { "dispensary_ids": [1, 2, 3, 4], "role": "product_refresh", # or "product_discovery" - "stagger_seconds": 15, # default: 15 + "stagger_seconds": 0, # default: 0 (all NOW) "platform": "dutchie", # default: "dutchie" "method": null # "curl" | "http" | null } -# Create staggered tasks for AZ stores (convenience endpoint) -POST /api/tasks/batch/az-stores +# Create bulk tasks for all stores in a state +POST /api/tasks/crawl-state/:stateCode { - "total_tasks": 24, # default: 24 - "stagger_seconds": 15, # default: 15 - "split_roles": true # default: true (12 refresh, 12 discovery) + "stagger_seconds": 0, # default: 0 (all NOW) + "method": "http" # default: "http" } ``` -### Example: 24 Tasks for AZ Stores +### Example: Tasks for AZ Stores ```bash -curl -X POST http://localhost:3010/api/tasks/batch/az-stores \ - -H "Content-Type: application/json" \ - -d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}' -``` - -Response: -```json -{ - "success": true, - "total": 24, - "product_refresh": 12, - "product_discovery": 12, - "stagger_seconds": 15, - "total_duration_seconds": 345, - "estimated_completion": "2025-12-12T08:40:00.000Z", - "message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)" -} +curl -X POST http://localhost:3010/api/tasks/crawl-state/AZ \ + -H "Content-Type: application/json" ``` ### Related Files | File | Purpose | |------|---------| -| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods | +| `src/tasks/task-service.ts` | `createStaggeredTasks()` method | | `src/routes/tasks.ts` | API endpoints for batch task creation | | `src/tasks/task-worker.ts` | Worker task claiming and preflight logic | diff --git a/backend/migrations/104_task_source_tracking.sql b/backend/migrations/104_task_source_tracking.sql new file mode 100644 index 00000000..107f2aeb --- /dev/null +++ b/backend/migrations/104_task_source_tracking.sql @@ -0,0 +1,25 @@ +-- Migration 104: Add source tracking to worker_tasks +-- Purpose: Track WHERE tasks are created from (schedule vs API endpoint) +-- +-- All automated task creation should be visible in task_schedules. +-- This column helps identify "phantom" tasks created outside the schedule system. 
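+-- Example (hypothetical query, not part of this migration): once the columns
+-- below exist, phantom tasks can be surfaced with something like:
+--   SELECT source, COUNT(*) AS task_count
+--   FROM worker_tasks
+--   WHERE source IS NULL OR source <> 'schedule'
+--   GROUP BY source;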
+
+-- Add source column to worker_tasks
+ALTER TABLE worker_tasks
+ADD COLUMN IF NOT EXISTS source VARCHAR(100);
+
+-- Add source_schedule_id column (references task_schedules.id if from a schedule)
+ALTER TABLE worker_tasks
+ADD COLUMN IF NOT EXISTS source_schedule_id INTEGER REFERENCES task_schedules(id);
+
+-- Add request metadata (IP, user agent) for debugging
+ALTER TABLE worker_tasks
+ADD COLUMN IF NOT EXISTS source_metadata JSONB;
+
+-- Create index for querying by source
+CREATE INDEX IF NOT EXISTS idx_worker_tasks_source ON worker_tasks(source);
+
+-- Comment explaining source values (kept in sync with the TaskSource type in task-service.ts)
+COMMENT ON COLUMN worker_tasks.source IS 'Task creation source: schedule, api_crawl_state, api_batch_staggered, task_chain, manual';
+COMMENT ON COLUMN worker_tasks.source_schedule_id IS 'ID of the schedule that created this task (if source=schedule)';
+COMMENT ON COLUMN worker_tasks.source_metadata IS 'Request metadata: {ip, user_agent, endpoint, timestamp}';
diff --git a/backend/migrations/105_dashboard_performance_indexes.sql b/backend/migrations/105_dashboard_performance_indexes.sql
new file mode 100644
index 00000000..8a9fb0f3
--- /dev/null
+++ b/backend/migrations/105_dashboard_performance_indexes.sql
@@ -0,0 +1,25 @@
+-- Migration 105: Add indexes for dashboard performance
+-- Purpose: Speed up the /dashboard and /national/summary endpoints
+--
+-- These queries were identified as slow:
+--   1. COUNT(*) FROM store_product_snapshots WHERE captured_at >= NOW() - INTERVAL '24 hours'
+--   2. National summary aggregate queries
+
+-- Index for snapshot counts by time (used in dashboard)
+CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_store_product_snapshots_captured_at
+ON store_product_snapshots(captured_at DESC);
+
+-- Index for crawl traces by time and success (used in dashboard)
+CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_crawl_traces_started_success
+ON crawl_orchestration_traces(started_at DESC, success);
+
+-- Partial index for recent failed crawls (faster for dashboard alerts)
+CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_crawl_traces_recent_failures
+ON crawl_orchestration_traces(started_at DESC)
+WHERE success = false;
+
+-- Composite index for store_products aggregations by dispensary
+-- Helps with national summary state metrics query
+CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_store_products_dispensary_brand
+ON store_products(dispensary_id, brand_name_raw)
+WHERE brand_name_raw IS NOT NULL;
diff --git a/backend/src/discovery/promotion.ts b/backend/src/discovery/promotion.ts
index b5f895eb..ed3cb635 100644
--- a/backend/src/discovery/promotion.ts
+++ b/backend/src/discovery/promotion.ts
@@ -151,6 +151,19 @@ function generateSlug(name: string, city: string, state: string): string {
   return base;
 }
 
+/**
+ * Derive menu_type from platform_menu_url pattern
+ */
+function deriveMenuType(url: string | null): string {
+  if (!url) return 'unknown';
+  if (url.includes('/dispensary/')) return 'standalone';
+  if (url.includes('/embedded-menu/')) return 'embedded';
+  if (url.includes('/stores/')) return 'standalone';
+  // Custom domain = embedded widget on store's site
+  if (!url.includes('dutchie.com')) return 'embedded';
+  return 'unknown';
+}
+
 /**
  * Log a promotion action to dutchie_promotion_log
  */
@@ -399,7 +412,7 @@ async function promoteLocation(
       loc.timezone,                  // $15 timezone
       loc.platform_location_id,      // $16 platform_dispensary_id
       loc.platform_menu_url,         // $17 menu_url
-      'dutchie',                     // $18 menu_type
+      deriveMenuType(loc.platform_menu_url), // $18 menu_type
       loc.description,               // $19 description
       loc.logo_image,                // $20 logo_image
       loc.banner_image,              // $21 banner_image
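For context, how `deriveMenuType()` above classifies URLs (the example URLs are hypothetical, not from this patch):

```typescript
// Hypothetical inputs -> outputs for deriveMenuType() as defined above:
deriveMenuType('https://dutchie.com/dispensary/example-store');    // 'standalone'
deriveMenuType('https://dutchie.com/embedded-menu/example-store'); // 'embedded'
deriveMenuType('https://dutchie.com/stores/example-store');        // 'standalone'
deriveMenuType('https://example-store.com/menu');                  // 'embedded' (custom domain)
deriveMenuType(null);                                              // 'unknown'
```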
diff --git a/backend/src/routes/analytics.ts b/backend/src/routes/analytics.ts
index df67d943..03fe02ae 100755
--- a/backend/src/routes/analytics.ts
+++ b/backend/src/routes/analytics.ts
@@ -5,6 +5,29 @@ import { pool } from '../db/pool';
 const router = Router();
 router.use(authMiddleware);
 
+// In-memory cache for expensive queries
+interface CacheEntry {
+  data: any;
+  expiresAt: number;
+}
+const cache: Map<string, CacheEntry> = new Map();
+
+function getCached<T>(key: string): T | null {
+  const entry = cache.get(key);
+  if (entry && entry.expiresAt > Date.now()) {
+    return entry.data as T;
+  }
+  cache.delete(key);
+  return null;
+}
+
+function setCache(key: string, data: any, ttlSeconds: number): void {
+  cache.set(key, {
+    data,
+    expiresAt: Date.now() + ttlSeconds * 1000,
+  });
+}
+
 // Get analytics overview
 router.get('/overview', async (req, res) => {
   try {
@@ -96,10 +119,17 @@ router.get('/products/:id', async (req, res) => {
 /**
  * GET /api/analytics/national/summary
  * National dashboard summary with state-by-state metrics
- * OPTIMIZED: Uses approximate counts and single query for state metrics
+ * OPTIMIZED: Cached for 5 minutes, uses approximate counts
  */
 router.get('/national/summary', async (req, res) => {
   try {
+    // Check cache first (5 minute TTL)
+    const CACHE_KEY = 'national_summary';
+    const cached = getCached<any>(CACHE_KEY);
+    if (cached) {
+      return res.json(cached);
+    }
+
     // Single optimized query for all state metrics
     const { rows: stateMetrics } = await pool.query(`
       SELECT
@@ -144,7 +174,7 @@ router.get('/national/summary', async (req, res) => {
       ) b
     `);
 
-    res.json({
+    const response = {
       success: true,
       data: {
         totalStates: stateMetrics.length,
@@ -165,7 +195,12 @@ router.get('/national/summary', async (req, res) => {
           onSpecialProducts: parseInt(s.on_special_products || '0'),
         })),
       },
-    });
+    };
+
+    // Cache for 5 minutes
+    setCache(CACHE_KEY, response, 300);
+
+    res.json(response);
   } catch (error: any) {
     console.error('[Analytics] Error fetching national summary:', error.message);
     res.status(500).json({ success: false, error: error.message });
   }
 });
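A minimal sketch of the read-through pattern the cache helpers above enable (the handler name and cache key are hypothetical, not part of this patch):

```typescript
// Hypothetical read-through usage of getCached/setCache as defined above:
async function getExpensiveReport(): Promise<object> {
  const KEY = 'expensive_report';                 // hypothetical cache key
  const cached = getCached<object>(KEY);
  if (cached) return cached;                      // serve from cache while TTL is live
  const fresh = { generatedAt: new Date().toISOString() }; // stand-in for a slow query
  setCache(KEY, fresh, 300);                      // cache for 5 minutes
  return fresh;
}
```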
diff --git a/backend/src/routes/markets.ts b/backend/src/routes/markets.ts
index 43c7729d..7b59ce79 100644
--- a/backend/src/routes/markets.ts
+++ b/backend/src/routes/markets.ts
@@ -11,13 +11,21 @@ import { pool } from '../db/pool';
 const router = Router();
 router.use(authMiddleware);
 
+// In-memory cache for dashboard (1 minute TTL)
+let dashboardCache: { data: any; expiresAt: number } | null = null;
+
 /**
  * GET /api/markets/dashboard
  * Dashboard summary with counts for dispensaries, products, brands, etc.
- * Optimized: Uses single query with approximate counts for large tables
+ * Optimized: Cached for 1 minute, uses approximate counts for large tables
  */
 router.get('/dashboard', async (req: Request, res: Response) => {
   try {
+    // Check cache first (1 minute TTL)
+    if (dashboardCache && dashboardCache.expiresAt > Date.now()) {
+      return res.json(dashboardCache.data);
+    }
+
     // Single optimized query for all counts
     const { rows } = await pool.query(`
       SELECT
@@ -31,7 +39,7 @@ router.get('/dashboard', async (req: Request, res: Response) => {
     `);
 
     const r = rows[0];
-    res.json({
+    const data = {
       dispensaryCount: parseInt(r?.dispensary_count || '0', 10),
       productCount: parseInt(r?.product_count || '0', 10),
       brandCount: parseInt(r?.brand_count || '0', 10),
@@ -39,7 +47,12 @@ router.get('/dashboard', async (req: Request, res: Response) => {
       snapshotCount24h: parseInt(r?.snapshot_count_24h || '0', 10),
       lastCrawlTime: r?.last_crawl || null,
       failedJobCount: parseInt(r?.failed_count || '0', 10),
-    });
+    };
+
+    // Cache for 1 minute
+    dashboardCache = { data, expiresAt: Date.now() + 60 * 1000 };
+
+    res.json(data);
   } catch (error: any) {
     console.error('[Markets] Error fetching dashboard:', error.message);
     res.status(500).json({ error: error.message });
   }
 });
diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts
index 9b768a46..7e401ceb 100644
--- a/backend/src/routes/tasks.ts
+++ b/backend/src/routes/tasks.ts
@@ -16,9 +16,11 @@
  *   PUT /api/tasks/schedules/:id - Update schedule
  *   DELETE /api/tasks/schedules/:id - Delete schedule
  *   DELETE /api/tasks/schedules - Bulk delete schedules
- *   POST /api/tasks/schedules/:id/run-now - Trigger schedule immediately
  *   POST /api/tasks/schedules/:id/toggle - Toggle schedule enabled/disabled
  *
+ * Note: "Run Now" was removed - use task priority instead.
+ * Higher priority tasks get picked up first (ORDER BY priority DESC).
+ *
  * Note: Schedule routes are defined BEFORE /:id to avoid route conflicts
  * (Express matches routes in order, and "schedules" would match /:id otherwise)
  */
 import {
   taskService,
   TaskRole,
   TaskStatus,
   TaskFilter,
+  TaskSource,
 } from '../tasks/task-service';
+
+/**
+ * Extract request metadata for source tracking
+ */
+function getRequestMetadata(req: Request): Record<string, string> {
+  return {
+    ip: req.ip || req.socket?.remoteAddress || 'unknown',
+    userAgent: req.get('user-agent') || 'unknown',
+    endpoint: req.originalUrl,
+    method: req.method,
+    timestamp: new Date().toISOString(),
+  };
+}
 import { pool } from '../db/pool';
 import {
   isTaskPoolPaused,
@@ -524,146 +540,6 @@ router.delete('/schedules/:id', async (req: Request, res: Response) => {
   }
 });
 
-/**
- * POST /api/tasks/schedules/:id/run-now
- * Manually trigger a scheduled task to run immediately
- *
- * For product_discovery schedules with state_code, this creates individual
- * tasks for each store in that state (fans out properly).
- */ -router.post('/schedules/:id/run-now', async (req: Request, res: Response) => { - try { - const scheduleId = parseInt(req.params.id, 10); - - // Get the full schedule - const scheduleResult = await pool.query(` - SELECT id, name, role, state_code, dispensary_id, platform, priority, interval_hours, method - FROM task_schedules WHERE id = $1 - `, [scheduleId]); - - if (scheduleResult.rows.length === 0) { - return res.status(404).json({ error: 'Schedule not found' }); - } - - const schedule = scheduleResult.rows[0]; - let tasksCreated = 0; - - const isCrawlRole = ['product_discovery', 'product_refresh', 'payload_fetch'].includes(schedule.role); - - // Single-dispensary schedule (e.g., "Deeply Rooted Hourly") - if (isCrawlRole && schedule.dispensary_id) { - // Check if this specific store can be refreshed (no pending task) - const storeResult = await pool.query(` - SELECT d.id, d.name - FROM dispensaries d - WHERE d.id = $1 - AND d.crawl_enabled = true - AND d.platform_dispensary_id IS NOT NULL - AND NOT EXISTS ( - SELECT 1 FROM worker_tasks t - WHERE t.dispensary_id = d.id - AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch') - AND t.status IN ('pending', 'claimed', 'running') - ) - `, [schedule.dispensary_id]); - - if (storeResult.rows.length > 0) { - await taskService.createTask({ - role: 'product_discovery', - dispensary_id: schedule.dispensary_id, - platform: schedule.platform || 'dutchie', - priority: schedule.priority + 10, - method: schedule.method || 'http', - }); - tasksCreated = 1; - } else { - return res.json({ - success: true, - message: `Store ${schedule.dispensary_id} has a pending task or is disabled`, - tasksCreated: 0, - dispensaryId: schedule.dispensary_id, - }); - } - } - // Per-state schedule (e.g., "AZ Product Refresh") - else if (isCrawlRole && schedule.state_code) { - // Find stores in this state needing refresh - const storeResult = await pool.query(` - SELECT d.id - FROM dispensaries d - JOIN states s ON d.state_id = s.id - WHERE d.crawl_enabled = true - AND d.platform_dispensary_id IS NOT NULL - AND s.code = $1 - -- No pending/running crawl task already - AND NOT EXISTS ( - SELECT 1 FROM worker_tasks t - WHERE t.dispensary_id = d.id - AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch') - AND t.status IN ('pending', 'claimed', 'running') - ) - ORDER BY d.last_fetch_at NULLS FIRST, d.id - `, [schedule.state_code]); - - const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id); - - if (dispensaryIds.length > 0) { - // Create staggered tasks for all stores (always use product_discovery role) - const result = await taskService.createStaggeredTasks( - dispensaryIds, - 'product_discovery', // Normalize to product_discovery - 15, // 15 seconds stagger - schedule.platform || 'dutchie', - schedule.method || 'http' - ); - tasksCreated = result.created; - } else { - // No stores need refresh - return early with message - return res.json({ - success: true, - message: `No ${schedule.state_code} stores need refresh at this time`, - tasksCreated: 0, - stateCode: schedule.state_code, - }); - } - } else if (!isCrawlRole) { - // For other schedules (store_discovery, analytics_refresh), create a single task - await taskService.createTask({ - role: schedule.role, - platform: schedule.platform, - priority: schedule.priority + 10, - method: schedule.method, - }); - tasksCreated = 1; - } else { - // Crawl role without dispensary_id or state_code - reject - return res.status(400).json({ - error: `${schedule.role} schedules require a 
dispensary_id or state_code`, - }); - } - - // Update last_run_at on the schedule - await pool.query(` - UPDATE task_schedules - SET last_run_at = NOW(), - next_run_at = NOW() + (interval_hours || ' hours')::interval, - last_task_count = $2, - updated_at = NOW() - WHERE id = $1 - `, [scheduleId, tasksCreated]); - - res.json({ - success: true, - message: `Schedule "${schedule.name}" triggered`, - tasksCreated, - stateCode: schedule.state_code, - }); - } catch (error: unknown) { - console.error('Error running schedule:', error); - res.status(500).json({ error: 'Failed to run schedule' }); - } -}); - /** * POST /api/tasks/schedules/:id/toggle * Toggle a schedule's enabled status @@ -1275,10 +1151,15 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => { */ router.post('/batch/staggered', async (req: Request, res: Response) => { try { + const requestMetadata = getRequestMetadata(req); + + // Log the request for tracking phantom tasks + console.log(`[TaskAPI] POST /batch/staggered from ${requestMetadata.ip} (${requestMetadata.userAgent})`); + const { dispensary_ids, role, - stagger_seconds = 15, + stagger_seconds = 0, // Default to 0 (no stagger) - worker controls pacing platform = 'dutchie', method = null, } = req.body; @@ -1291,12 +1172,18 @@ router.post('/batch/staggered', async (req: Request, res: Response) => { return res.status(400).json({ error: 'role is required' }); } + console.log(`[TaskAPI] Creating ${dispensary_ids.length} ${role} tasks for dispensaries: ${dispensary_ids.slice(0, 5).join(',')}...`); + const result = await taskService.createStaggeredTasks( dispensary_ids, role as TaskRole, stagger_seconds, platform, - method + method, + { + source: 'api_batch_staggered', + source_metadata: requestMetadata, + } ); const totalDuration = (result.created - 1) * stagger_seconds; @@ -1320,49 +1207,6 @@ router.post('/batch/staggered', async (req: Request, res: Response) => { } }); -/** - * POST /api/tasks/batch/az-stores - * Convenience endpoint to create staggered tasks for Arizona stores - * - * Body: - * - total_tasks: number (default: 24) - Total tasks to create - * - stagger_seconds: number (default: 15) - Seconds between each task - * - split_roles: boolean (default: true) - Split between product_refresh and product_discovery - */ -router.post('/batch/az-stores', async (req: Request, res: Response) => { - try { - const { - total_tasks = 24, - stagger_seconds = 15, - split_roles = true, - } = req.body; - - const result = await taskService.createAZStoreTasks( - total_tasks, - stagger_seconds, - split_roles - ); - - const totalDuration = (result.total - 1) * stagger_seconds; - const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); - - res.status(201).json({ - success: true, - total: result.total, - product_refresh: result.product_refresh, - product_discovery: result.product_discovery, - task_ids: result.taskIds, - stagger_seconds, - total_duration_seconds: totalDuration, - estimated_completion: estimatedEndTime.toISOString(), - message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`, - }); - } catch (error: unknown) { - console.error('Error creating AZ store tasks:', error); - res.status(500).json({ error: 'Failed to create AZ store tasks' }); - } -}); - /** * POST /api/tasks/batch/entry-point-discovery * Create entry_point_discovery tasks for stores missing platform_dispensary_id @@ -1556,13 +1400,13 @@ router.post('/batch/store-discovery', async (req: Request, 
res: Response) => {
  * Create product_discovery tasks for all stores in a state
  *
  * This is the primary endpoint for triggering crawls by state.
- * Creates staggered tasks for all crawl-enabled stores in the specified state.
+ * Creates tasks for all crawl-enabled stores in the specified state.
  *
  * Params:
  * - stateCode: State code (e.g., 'AZ', 'CA', 'CO')
  *
  * Body (optional):
- * - stagger_seconds: number (default: 15) - Seconds between each task
+ * - stagger_seconds: number (default: 0) - Seconds between each task (0 = worker controls pacing)
  * - priority: number (default: 10) - Task priority
  * - method: 'curl' | 'http' | null (default: 'http')
  *
  *
@@ -1574,8 +1418,13 @@ router.post('/batch/store-discovery', async (req: Request, res: Response) => {
 router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
   try {
     const stateCode = req.params.stateCode.toUpperCase();
+    const requestMetadata = getRequestMetadata(req);
+
+    // Log the request for tracking phantom tasks
+    console.log(`[TaskAPI] POST /crawl-state/${stateCode} from ${requestMetadata.ip} (${requestMetadata.userAgent})`);
+
     const {
-      stagger_seconds = 15,
+      stagger_seconds = 0, // Default to 0 (no stagger) - worker controls pacing
       priority = 10,
       method = 'http',
     } = req.body;
@@ -1617,13 +1466,19 @@ router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
     const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id);
 
-    // Create staggered tasks
+    console.log(`[TaskAPI] Creating ${dispensaryIds.length} product_discovery tasks for ${stateCode}`);
+
+    // Create tasks with source tracking
     const result = await taskService.createStaggeredTasks(
       dispensaryIds,
       'product_discovery',
       stagger_seconds,
       'dutchie',
-      method
+      method,
+      {
+        source: 'api_crawl_state',
+        source_metadata: { ...requestMetadata, stateCode },
+      }
     );
 
     const totalDuration = (result.created - 1) * stagger_seconds;
diff --git a/backend/src/services/task-scheduler.ts b/backend/src/services/task-scheduler.ts
index ebcdd8c4..9ec913c0 100644
--- a/backend/src/services/task-scheduler.ts
+++ b/backend/src/services/task-scheduler.ts
@@ -316,13 +316,17 @@ class TaskScheduler {
     }
 
     // Create product_discovery tasks with HTTP transport
-    // Stagger by 15 seconds to prevent overwhelming proxies
+    // No stagger - worker controls pacing
     const { created } = await taskService.createStaggeredTasks(
       dispensaryIds,
       'product_discovery',
-      15, // 15 seconds apart
+      0, // No stagger - worker controls pacing
       schedule.platform || 'dutchie',
-      'http' // Force HTTP transport
+      'http', // Force HTTP transport
+      {
+        source: 'schedule',
+        source_schedule_id: schedule.id,
+      }
     );
 
     return created;
@@ -350,6 +354,8 @@ class TaskScheduler {
       platform: schedule.platform || 'dutchie',
       priority: schedule.priority,
       method: 'http', // Force HTTP transport for browser-based discovery
+      source: 'schedule',
+      source_schedule_id: schedule.id,
     });
 
     return 1;
@@ -375,6 +381,8 @@ class TaskScheduler {
     await taskService.createTask({
       role: 'analytics_refresh',
       priority: schedule.priority,
+      source: 'schedule',
+      source_schedule_id: schedule.id,
     });
 
     return 1;
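For reference, a minimal sketch of the new call shape that both the scheduler above and the routes use (dispensary IDs and metadata values are hypothetical; the six-argument signature matches the createStaggeredTasks changes in task-service.ts below):

```typescript
// Hypothetical caller passing the new source-tracking options through:
const { created, skipped, taskIds } = await taskService.createStaggeredTasks(
  [101, 102, 103],       // dispensary IDs (made up for illustration)
  'product_discovery',   // task role
  0,                     // no stagger - worker controls pacing
  'dutchie',             // platform
  'http',                // transport method
  {
    source: 'api_crawl_state',
    source_metadata: { ip: '203.0.113.7', endpoint: '/api/tasks/crawl-state/AZ' },
  }
);
console.log(`created=${created} skipped=${skipped} ids=${taskIds.join(',')}`);
```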
diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts
index a8a65081..1d3c0e91 100644
--- a/backend/src/tasks/task-service.ts
+++ b/backend/src/tasks/task-service.ts
@@ -77,8 +77,21 @@ export interface CreateTaskParams {
   method?: 'curl' | 'http'; // Transport method: curl=axios/proxy, http=Puppeteer/browser
   scheduled_for?: Date;
   payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
+  // Source tracking - helps identify where tasks come from
+  source?: TaskSource;
+  source_schedule_id?: number;
+  source_metadata?: Record<string, unknown>;
 }
 
+// Task creation sources - all automated tasks should be traceable
+// Note: "Run Now" was removed - use task priority instead
+export type TaskSource =
+  | 'schedule'             // Created by task-scheduler.ts from task_schedules
+  | 'api_crawl_state'      // POST /api/tasks/crawl-state/:stateCode
+  | 'api_batch_staggered'  // POST /api/tasks/batch/staggered
+  | 'task_chain'           // Created by task chaining (e.g., store_discovery -> product_discovery)
+  | 'manual';              // Created via admin UI or direct API call
+
 export interface CapacityMetrics {
   role: string;
   pending_tasks: number;
@@ -108,8 +121,8 @@ class TaskService {
    */
   async createTask(params: CreateTaskParams): Promise<WorkerTask> {
     const result = await pool.query(
-      `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload)
-       VALUES ($1, $2, $3, $4, $5, $6, $7)
+      `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload, source, source_schedule_id, source_metadata)
+       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        RETURNING *`,
       [
         params.role,
@@ -119,6 +132,9 @@
         params.method ?? null, // null = any worker can pick up, 'http' = http-capable workers only, 'curl' = curl workers only
         params.scheduled_for ?? null,
         params.payload ? JSON.stringify(params.payload) : null,
+        params.source ?? null,
+        params.source_schedule_id ?? null,
+        params.source_metadata ? JSON.stringify(params.source_metadata) : null,
       ]
     );
     return result.rows[0] as WorkerTask;
   }
 
   /**
-   * Create multiple tasks with staggered start times.
+   * Create multiple tasks for bulk processing.
    *
-   * STAGGERED TASK WORKFLOW:
-   * =======================
-   * This prevents resource contention and proxy assignment lag when creating
-   * many tasks at once. Each task gets a scheduled_for timestamp offset from
-   * the previous task.
+   * BULK TASK WORKFLOW:
+   * ===================
+   * Creates tasks with scheduled_for = NOW() by default. Worker-level controls
+   * handle pacing (MAX_CONCURRENT_TASKS, working hours, preflight checks).
    *
    * Workflow:
-   * 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
+   * 1. Task is created with scheduled_for = NOW() (or staggered if specified)
    * 2. Worker claims task only when scheduled_for <= NOW()
    * 3. Worker runs preflight check on EVERY task claim
    * 4. If preflight passes, worker executes task
    * 6. Worker finishes task, polls for next available task
    * 7. Repeat - preflight runs again on next task claim
    *
-   * Benefits:
-   * - Prevents all 8 workers from hitting proxies simultaneously
-   * - Reduces API rate limiting / 403 errors
-   * - Spreads resource usage over time
-   * - Each task still runs preflight, ensuring proxy health
+   * Worker-Level Throttling:
+   * - MAX_CONCURRENT_TASKS env var limits concurrent tasks per worker
+   * - Working hours configuration restricts when tasks run
+   * - Preflight checks ensure proxy health before each task
    *
    * @param dispensaryIds - Array of dispensary IDs to create tasks for
    * @param role - Task role (e.g., 'product_refresh', 'product_discovery')
-   * @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
+   * @param staggerSeconds - Seconds between each task's scheduled_for time (default: 0 = all NOW())
    * @param platform - Platform identifier (default: 'dutchie')
    * @param method - Transport method: 'curl' or 'http' (default: null for any)
    * @returns Number of tasks created
    */
   async createStaggeredTasks(
     dispensaryIds: number[],
     role: TaskRole,
-    staggerSeconds: number = 15,
+    staggerSeconds: number = 0,
     platform: string = 'dutchie',
     method: 'curl' | 'http' | null = null,
-    options: { skipRecentHours?: number } = {}
+    options: {
+      skipRecentHours?: number;
+      source?: TaskSource;
+      source_schedule_id?: number;
+      source_metadata?: Record<string, unknown>;
+    } = {}
   ): Promise<{ created: number; skipped: number; taskIds: number[] }> {
     if (dispensaryIds.length === 0) {
       return { created: 0, skipped: 0, taskIds: [] };
     }
 
-    const { skipRecentHours = 4 } = options; // Skip if completed within last 4 hours
+    const {
+      skipRecentHours = 4,
+      source = null,
+      source_schedule_id = null,
+      source_metadata = null,
+    } = options;
 
     // Filter out dispensaries that:
     // 1. Already have a pending/claimed/running task for this role
@@ -786,17 +810,20 @@
         SELECT dispensary_id, ROW_NUMBER() OVER (ORDER BY dispensary_id) - 1 as idx
         FROM eligible_ids
       )
-      INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
+      INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status, source, source_schedule_id, source_metadata)
       SELECT
         $2::varchar as role,
         n.dispensary_id,
         $3::varchar as platform,
         $4::varchar as method,
         NOW() + (n.idx * $5::int * INTERVAL '1 second') as scheduled_for,
-        'pending' as status
+        'pending' as status,
+        $7::varchar as source,
+        $8::int as source_schedule_id,
+        $9::jsonb as source_metadata
       FROM numbered n
       RETURNING id
     `, [dispensaryIds, role, platform, method, staggerSeconds, skipRecentHours, source, source_schedule_id, source_metadata ? JSON.stringify(source_metadata) : null]);
 
     const taskIds = result.rows.map((r: { id: number }) => r.id);
     const skipped = dispensaryIds.length - taskIds.length;
@@ -810,112 +837,6 @@
     return { created: taskIds.length, skipped, taskIds };
   }
 
-  /**
-   * Create a batch of AZ store tasks with automatic distribution.
- * - * This is a convenience method for creating tasks for Arizona stores with: - * - Automatic staggering to prevent resource contention - * - Even distribution across both refresh and discovery roles - * - * @param totalTasks - Total number of tasks to create - * @param staggerSeconds - Seconds between each task's start time - * @param splitRoles - If true, split between product_refresh and product_discovery - * @returns Summary of created tasks - */ - async createAZStoreTasks( - totalTasks: number = 24, - staggerSeconds: number = 15, - splitRoles: boolean = true - ): Promise<{ - total: number; - product_refresh: number; - product_discovery: number; - taskIds: number[]; - }> { - // Get AZ stores with platform_id and menu_url - const storesResult = await pool.query(` - SELECT d.id - FROM dispensaries d - JOIN states s ON d.state_id = s.id - WHERE s.code = 'AZ' - AND d.crawl_enabled = true - AND d.platform_dispensary_id IS NOT NULL - AND d.menu_url IS NOT NULL - ORDER BY d.id - `); - - const storeIds = storesResult.rows.map((r: { id: number }) => r.id); - - if (storeIds.length === 0) { - console.log('[TaskService] No AZ stores found with platform_id and menu_url'); - return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] }; - } - - // Limit tasks to available stores - const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles - const allTaskIds: number[] = []; - - if (splitRoles) { - // Split between refresh and discovery - const tasksPerRole = Math.floor(maxTasks / 2); - const refreshStores = storeIds.slice(0, tasksPerRole); - const discoveryStores = storeIds.slice(0, tasksPerRole); - - // Create refresh tasks first - const refreshResult = await this.createStaggeredTasks( - refreshStores, - 'product_refresh', - staggerSeconds, - 'dutchie' - ); - allTaskIds.push(...refreshResult.taskIds); - - // Create discovery tasks starting after refresh tasks are scheduled - const discoveryStartOffset = tasksPerRole * staggerSeconds; - const discoveryResult = await pool.query(` - WITH task_data AS ( - SELECT - unnest($1::int[]) as dispensary_id, - generate_series(0, array_length($1::int[], 1) - 1) as idx - ) - INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status) - SELECT - 'product_discovery'::varchar as role, - td.dispensary_id, - 'dutchie'::varchar as platform, - NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for, - 'pending' as status - FROM task_data td - ON CONFLICT DO NOTHING - RETURNING id - `, [discoveryStores, discoveryStartOffset, staggerSeconds]); - - allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id)); - - return { - total: allTaskIds.length, - product_refresh: refreshResult.taskIds.length, - product_discovery: discoveryResult.rowCount ?? 0, - taskIds: allTaskIds - }; - } - - // Single role mode - all product_discovery - const result = await this.createStaggeredTasks( - storeIds.slice(0, totalTasks), - 'product_discovery', - staggerSeconds, - 'dutchie' - ); - - return { - total: result.taskIds.length, - product_refresh: 0, - product_discovery: result.taskIds.length, - taskIds: result.taskIds - }; - } - /** * Cleanup stale tasks that are stuck in 'claimed' or 'running' status. 
* diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts index d990f084..87b871e3 100755 --- a/cannaiq/src/lib/api.ts +++ b/cannaiq/src/lib/api.ts @@ -2683,8 +2683,6 @@ class ApiClient { updateSchedule = this.updateDutchieAZSchedule.bind(this); /** @deprecated Use deleteTaskSchedule() - queries task_schedules table */ deleteSchedule = this.deleteDutchieAZSchedule.bind(this); - /** @deprecated Use runTaskScheduleNow() - queries task_schedules table */ - triggerSchedule = this.triggerDutchieAZSchedule.bind(this); /** @deprecated - job_schedules init not needed for task_schedules */ initSchedules = this.initDutchieAZSchedules.bind(this); getScheduleLogs = this.getCrawlScheduleLogs.bind(this); @@ -3061,11 +3059,7 @@ class ApiClient { ); } - async runTaskScheduleNow(id: number) { - return this.request<{ success: boolean; message: string; tasksCreated?: number; stateCode?: string }>(`/api/tasks/schedules/${id}/run-now`, { - method: 'POST', - }); - } + // Note: runTaskScheduleNow() was removed - use task priority instead async toggleTaskSchedule(id: number) { return this.request<{ success: boolean; schedule: { id: number; name: string; enabled: boolean }; message: string }>( diff --git a/cannaiq/src/pages/CrawlSchedulePage.tsx b/cannaiq/src/pages/CrawlSchedulePage.tsx index 461d5827..d22b2c0a 100644 --- a/cannaiq/src/pages/CrawlSchedulePage.tsx +++ b/cannaiq/src/pages/CrawlSchedulePage.tsx @@ -130,15 +130,6 @@ export function CrawlSchedulePage() { } }; - const handleTriggerSchedule = async (id: number) => { - try { - await api.triggerSchedule(id); - await loadData(); - } catch (error) { - console.error('Failed to trigger schedule:', error); - } - }; - const handleToggleEnabled = async (schedule: JobSchedule) => { try { await api.updateSchedule(schedule.id, { enabled: !schedule.enabled }); @@ -538,21 +529,6 @@ export function CrawlSchedulePage() {
-
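For reference, a minimal sketch of the priority-based replacement for "Run Now" (values are hypothetical; the fields match the CreateTaskParams interface added in task-service.ts above, and workers claim higher-priority tasks first per the ORDER BY priority DESC note in tasks.ts):

```typescript
// "Run Now" replacement: create the task with a higher priority so a worker
// claims it ahead of the queue (dispensary_id and priority are hypothetical):
await taskService.createTask({
  role: 'product_discovery',
  dispensary_id: 123,   // hypothetical store
  platform: 'dutchie',
  priority: 100,        // higher than default schedule priority
  method: 'http',
  source: 'manual',     // traceable via the new source column
});
```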