# Worker Task Architecture

This document describes the unified task-based worker system that replaces the legacy fragmented job systems.

## Overview

The task worker architecture provides a single, unified system for managing all background work in CannaiQ:

- **Store discovery** - Find new dispensaries on platforms
- **Entry point discovery** - Resolve platform IDs from menu URLs
- **Product discovery** - Initial product fetch for new stores
- **Product resync** - Regular price/stock updates for existing stores
- **Analytics refresh** - Refresh materialized views and analytics

## Architecture

### Database Tables

**`worker_tasks`** - Central task queue

```sql
CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,
  role task_role NOT NULL,              -- What type of work
  dispensary_id INTEGER,                -- Which store (if applicable)
  platform VARCHAR(50),                 -- Which platform (dutchie, etc.)
  status task_status DEFAULT 'pending',
  priority INTEGER DEFAULT 0,           -- Higher = process first
  scheduled_for TIMESTAMP,              -- Don't process before this time
  worker_id VARCHAR(100),               -- Which worker claimed it
  claimed_at TIMESTAMP,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  last_heartbeat_at TIMESTAMP,          -- For stale detection
  result JSONB,                         -- Output from handler
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);
```

**Key indexes:**

- `idx_worker_tasks_pending_priority` - For efficient task claiming
- `idx_worker_tasks_active_dispensary` - Prevents concurrent tasks per store (partial unique index)

### Task Roles

| Role | Purpose | Per-Store | Scheduled |
|------|---------|-----------|-----------|
| `store_discovery` | Find new stores on a platform | No | Daily |
| `entry_point_discovery` | Resolve platform IDs | Yes | On-demand |
| `product_discovery` | Initial product fetch | Yes | After entry_point |
| `product_resync` | Price/stock updates | Yes | Every 4 hours |
| `analytics_refresh` | Refresh MVs | No | Daily |

### Task Lifecycle

```
pending → claimed → running → completed
                       ↓
                     failed
```

1. **pending** - Task is waiting to be picked up
2. **claimed** - Worker has claimed it (atomic via SELECT FOR UPDATE SKIP LOCKED)
3. **running** - Worker is actively processing
4. **completed** - Task finished successfully
5. **failed** - Task encountered an error
6. **stale** - Task lost its worker (recovered automatically)

## Files

### Core Files

| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | TaskService - CRUD, claiming, capacity metrics |
| `src/tasks/task-worker.ts` | TaskWorker - Main worker loop |
| `src/tasks/index.ts` | Module exports |
| `src/routes/tasks.ts` | API endpoints |
| `migrations/074_worker_task_queue.sql` | Database schema |

### Task Handlers

| File | Role |
|------|------|
| `src/tasks/handlers/store-discovery.ts` | `store_discovery` |
| `src/tasks/handlers/entry-point-discovery.ts` | `entry_point_discovery` |
| `src/tasks/handlers/product-discovery.ts` | `product_discovery` |
| `src/tasks/handlers/product-resync.ts` | `product_resync` |
| `src/tasks/handlers/analytics-refresh.ts` | `analytics_refresh` |

## Running Workers

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `WORKER_ROLE` | (required) | Which task role to process |
| `WORKER_ID` | auto-generated | Custom worker identifier |
| `POLL_INTERVAL_MS` | 5000 | How often to check for tasks |
| `HEARTBEAT_INTERVAL_MS` | 30000 | How often to update heartbeat |

### Starting a Worker

```bash
# Start a product resync worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start with custom ID
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts

# Start multiple workers for different roles
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts &
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts &
```

### Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 3
  selector:              # selector/labels added here; required for a valid Deployment
    matchLabels:
      app: task-worker-resync
  template:
    metadata:
      labels:
        app: task-worker-resync
    spec:
      containers:
        - name: worker
          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
          command: ["npx", "tsx", "src/tasks/task-worker.ts"]
          env:
            - name: WORKER_ROLE
              value: "product_resync"
```

## API Endpoints

### Task Management

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks` | GET | List tasks with filters |
| `/api/tasks` | POST | Create a new task |
| `/api/tasks/:id` | GET | Get task by ID |
| `/api/tasks/counts` | GET | Get counts by status |
| `/api/tasks/capacity` | GET | Get capacity metrics |
| `/api/tasks/capacity/:role` | GET | Get role-specific capacity |
| `/api/tasks/recover-stale` | POST | Recover tasks from dead workers |

### Task Generation

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks/generate/resync` | POST | Generate daily resync tasks |
| `/api/tasks/generate/discovery` | POST | Create store discovery task |

### Migration (from legacy systems)

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks/migration/status` | GET | Compare old vs new systems |
| `/api/tasks/migration/disable-old-schedules` | POST | Disable job_schedules |
| `/api/tasks/migration/cancel-pending-crawl-jobs` | POST | Cancel old crawl jobs |
| `/api/tasks/migration/create-resync-tasks` | POST | Create tasks for all stores |
| `/api/tasks/migration/full-migrate` | POST | One-click migration |

### Role-Specific Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks/role/:role/last-completion` | GET | Last completion time |
| `/api/tasks/role/:role/recent` | GET | Recent completions |
| `/api/tasks/store/:id/active` | GET | Check if store has active task |

## Capacity Planning

The `v_worker_capacity` view provides real-time metrics:

```sql
SELECT * FROM v_worker_capacity;
```

Returns:

- `pending_tasks` - Tasks waiting to be claimed
- `ready_tasks` - Tasks ready now (scheduled_for is null or past)
- `claimed_tasks` - Tasks claimed but not started
- `running_tasks` - Tasks actively processing
- `completed_last_hour` - Recent completions
- `failed_last_hour` - Recent failures
- `active_workers` - Workers with recent heartbeats
- `avg_duration_sec` - Average task duration
- `tasks_per_worker_hour` - Throughput estimate
- `estimated_hours_to_drain` - Time to clear queue

### Scaling Recommendations

```javascript
// API: GET /api/tasks/capacity/:role
{
  "role": "product_resync",
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}
```

## Task Chaining

Tasks can automatically create follow-up tasks:

```
store_discovery → entry_point_discovery → product_discovery
                                               ↓
                            (store has platform_dispensary_id)
                                               ↓
                                      Daily resync tasks
```

The `chainNextTask()` method handles this automatically.

## Stale Task Recovery

Tasks are considered stale if `last_heartbeat_at` is older than the threshold (default 10 minutes).

```sql
SELECT recover_stale_tasks(10);  -- 10 minute threshold
```

Or via API:

```bash
curl -X POST /api/tasks/recover-stale \
  -H 'Content-Type: application/json' \
  -d '{"threshold_minutes": 10}'
```

## Migration from Legacy Systems

### Legacy Systems Replaced

1. **job_schedules + job_run_logs** - Scheduled job definitions
2. **dispensary_crawl_jobs** - Per-dispensary crawl queue
3. **SyncOrchestrator + HydrationWorker** - Raw payload processing

### Migration Steps

**Option 1: One-Click Migration**

```bash
curl -X POST /api/tasks/migration/full-migrate
```

This will:

1. Disable all job_schedules
2. Cancel pending dispensary_crawl_jobs
3. Generate resync tasks for all stores
4. Create discovery and analytics tasks

**Option 2: Manual Migration**

```bash
# 1. Check current status
curl /api/tasks/migration/status

# 2. Disable old schedules
curl -X POST /api/tasks/migration/disable-old-schedules

# 3. Cancel pending crawl jobs
curl -X POST /api/tasks/migration/cancel-pending-crawl-jobs

# 4. Create resync tasks
curl -X POST /api/tasks/migration/create-resync-tasks \
  -H 'Content-Type: application/json' \
  -d '{"state_code": "AZ"}'

# 5. Generate daily resync schedule
curl -X POST /api/tasks/generate/resync \
  -H 'Content-Type: application/json' \
  -d '{"batches_per_day": 6}'
```

## Per-Store Locking

The system prevents concurrent tasks for the same store using a partial unique index:

```sql
CREATE UNIQUE INDEX idx_worker_tasks_active_dispensary
ON worker_tasks (dispensary_id)
WHERE dispensary_id IS NOT NULL AND status IN ('claimed', 'running');
```

This ensures only one task can be active per store at any time.

## Task Priority

Tasks are claimed in priority order (higher first), then by creation time:

```sql
ORDER BY priority DESC, created_at ASC
```

Default priorities:

- `store_discovery`: 0
- `entry_point_discovery`: 10 (high - new stores)
- `product_discovery`: 10 (high - new stores)
- `product_resync`: 0
- `analytics_refresh`: 0

## Scheduled Tasks

Tasks can be scheduled for future execution:

```javascript
await taskService.createTask({
  role: 'product_resync',
  dispensary_id: 123,
  scheduled_for: new Date('2025-01-10T06:00:00Z'),
});
```

The `generate_resync_tasks()` function creates staggered tasks throughout the day:

```sql
SELECT generate_resync_tasks(6, '2025-01-10');  -- 6 batches = every 4 hours
```

## Dashboard Integration

The admin dashboard shows task queue status in the main overview:

```
Task Queue Summary
------------------
Pending:       45
Running:        3
Completed:  1,234
Failed:        12
```

Full task management is available at `/admin/tasks`.
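The staggering performed by `generate_resync_tasks()` can be illustrated with a small sketch. `batchStartTimes` below is a hypothetical helper, not part of `src/tasks` — the real logic lives in the SQL function — but it shows the arithmetic: N batches per day means one batch every 24/N hours.

```typescript
// Hypothetical helper: compute the UTC start time of each resync batch
// for a given day. 6 batches/day → one batch every 4 hours.
function batchStartTimes(batchesPerDay: number, day: string): Date[] {
  const intervalMs = (24 / batchesPerDay) * 3600 * 1000;
  const start = new Date(`${day}T00:00:00Z`);
  return Array.from(
    { length: batchesPerDay },
    (_, i) => new Date(start.getTime() + i * intervalMs),
  );
}

// 6 batches = every 4 hours: 00:00, 04:00, 08:00, 12:00, 16:00, 20:00 UTC
const times = batchStartTimes(6, '2025-01-10');
console.log(times.map((t) => t.toISOString()));
```

Each store's resync task would then get a `scheduled_for` drawn from one of these batch start times, spreading load evenly across the day.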
## Error Handling

Failed tasks include the error message in `error_message` and can be retried:

```sql
-- View failed tasks
SELECT id, role, dispensary_id, error_message, retry_count
FROM worker_tasks
WHERE status = 'failed'
ORDER BY completed_at DESC
LIMIT 20;

-- Retry failed tasks
UPDATE worker_tasks
SET status = 'pending', retry_count = retry_count + 1
WHERE status = 'failed' AND retry_count < max_retries;
```

## Monitoring

### Logs

Workers log to stdout:

```
[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully
```

### Health Check

Check if workers are active:

```sql
SELECT worker_id, role, COUNT(*), MAX(last_heartbeat_at)
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;
```

### Metrics

```sql
-- Tasks by status
SELECT status, COUNT(*) FROM worker_tasks GROUP BY status;

-- Tasks by role
SELECT role, status, COUNT(*) FROM worker_tasks GROUP BY role, status;

-- Average duration by role
SELECT role,
       AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) AS avg_seconds
FROM worker_tasks
WHERE status = 'completed'
  AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY role;
```
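The retry UPDATE in the error-handling section encodes a single rule: a failed task is eligible for retry while `retry_count < max_retries`. A minimal sketch of the same check in TypeScript — the `TaskRow` shape here is illustrative, not the actual type exported from `src/tasks`:

```typescript
// Illustrative row shape; the real data lives in worker_tasks (see schema above).
interface TaskRow {
  status: 'pending' | 'claimed' | 'running' | 'completed' | 'failed';
  retry_count: number;
  max_retries: number;
}

// Mirrors: WHERE status = 'failed' AND retry_count < max_retries
function isRetriable(task: TaskRow): boolean {
  return task.status === 'failed' && task.retry_count < task.max_retries;
}

console.log(isRetriable({ status: 'failed', retry_count: 1, max_retries: 3 })); // true
console.log(isRetriable({ status: 'failed', retry_count: 3, max_retries: 3 })); // false
```

With the default `max_retries` of 3, a task gets at most three retry passes before it stays permanently failed and requires manual intervention.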
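The `workers_needed` figures in the Capacity Planning example follow from simple arithmetic: workers = ceil(pending / (throughput × target hours)). This sketch reproduces the example response under the assumption of roughly 50 tasks per worker-hour; in practice the throughput comes from `v_worker_capacity`, and the actual computation lives in TaskService:

```typescript
// Workers required to drain `pending` tasks within `targetHours`,
// given observed per-worker throughput (tasks per worker-hour).
function workersNeeded(
  pending: number,
  tasksPerWorkerHour: number,
  targetHours: number,
): number {
  return Math.ceil(pending / (tasksPerWorkerHour * targetHours));
}

// With 500 pending tasks and an assumed ~50 tasks/worker-hour:
console.log(workersNeeded(500, 50, 1)); // 10
console.log(workersNeeded(500, 50, 4)); // 3  (ceil(500 / 200))
console.log(workersNeeded(500, 50, 8)); // 2  (ceil(500 / 400))
```

The same numbers appear in the `/api/tasks/capacity/:role` example above, which is why adding workers helps most when the queue is deep relative to throughput.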