# Worker Task Architecture

This document describes the unified task-based worker system that replaces the legacy fragmented job systems.

## Overview

The task worker architecture provides a single, unified system for managing all background work in CannaiQ:

- **Store discovery** - Find new dispensaries on platforms
- **Entry point discovery** - Resolve platform IDs from menu URLs
- **Product discovery** - Initial product fetch for new stores
- **Product resync** - Regular price/stock updates for existing stores
- **Analytics refresh** - Refresh materialized views and analytics

## Architecture

### Database Tables

**`worker_tasks`** - Central task queue

```sql
CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,
  role task_role NOT NULL,              -- What type of work
  dispensary_id INTEGER,                -- Which store (if applicable)
  platform VARCHAR(50),                 -- Which platform (dutchie, etc.)
  status task_status DEFAULT 'pending',
  priority INTEGER DEFAULT 0,           -- Higher = process first
  scheduled_for TIMESTAMP,              -- Don't process before this time
  worker_id VARCHAR(100),               -- Which worker claimed it
  claimed_at TIMESTAMP,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  last_heartbeat_at TIMESTAMP,          -- For stale detection
  result JSONB,                         -- Output from handler
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);
```

**Key indexes:**

- `idx_worker_tasks_pending_priority` - For efficient task claiming
- `idx_worker_tasks_active_dispensary` - Prevents concurrent tasks per store (partial unique index)

### Task Roles

| Role | Purpose | Per-Store | Scheduled |
|------|---------|-----------|-----------|
| `store_discovery` | Find new stores on a platform | No | Daily |
| `entry_point_discovery` | Resolve platform IDs | Yes | On-demand |
| `product_discovery` | Initial product fetch | Yes | After entry_point |
| `product_resync` | Price/stock updates | Yes | Every 4 hours |
| `analytics_refresh` | Refresh MVs | No | Daily |
### Task Lifecycle

```
pending → claimed → running → completed
                       ↓
                    failed
```

1. **pending** - Task is waiting to be picked up
2. **claimed** - Worker has claimed it (atomic via SELECT FOR UPDATE SKIP LOCKED)
3. **running** - Worker is actively processing
4. **completed** - Task finished successfully
5. **failed** - Task encountered an error
6. **stale** - Task lost its worker (recovered automatically)

## Files

### Core Files

| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | TaskService - CRUD, claiming, capacity metrics |
| `src/tasks/task-worker.ts` | TaskWorker - Main worker loop |
| `src/tasks/index.ts` | Module exports |
| `src/routes/tasks.ts` | API endpoints |
| `migrations/074_worker_task_queue.sql` | Database schema |

### Task Handlers

| File | Role |
|------|------|
| `src/tasks/handlers/store-discovery.ts` | `store_discovery` |
| `src/tasks/handlers/entry-point-discovery.ts` | `entry_point_discovery` |
| `src/tasks/handlers/product-discovery.ts` | `product_discovery` |
| `src/tasks/handlers/product-resync.ts` | `product_resync` |
| `src/tasks/handlers/analytics-refresh.ts` | `analytics_refresh` |

## Running Workers

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `WORKER_ROLE` | (required) | Which task role to process |
| `WORKER_ID` | auto-generated | Custom worker identifier |
| `POLL_INTERVAL_MS` | 5000 | How often to check for tasks |
| `HEARTBEAT_INTERVAL_MS` | 30000 | How often to update heartbeat |

### Starting a Worker

```bash
# Start a product resync worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start with a custom ID
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts

# Start multiple workers for different roles
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts &
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts &
```

### Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: worker
          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
          command: ["npx", "tsx", "src/tasks/task-worker.ts"]
          env:
            - name: WORKER_ROLE
              value: "product_resync"
```

## API Endpoints

### Task Management

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks` | GET | List tasks with filters |
| `/api/tasks` | POST | Create a new task |
| `/api/tasks/:id` | GET | Get task by ID |
| `/api/tasks/counts` | GET | Get counts by status |
| `/api/tasks/capacity` | GET | Get capacity metrics |
| `/api/tasks/capacity/:role` | GET | Get role-specific capacity |
| `/api/tasks/recover-stale` | POST | Recover tasks from dead workers |

### Task Generation

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks/generate/resync` | POST | Generate daily resync tasks |
| `/api/tasks/generate/discovery` | POST | Create store discovery task |

### Migration (from legacy systems)

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks/migration/status` | GET | Compare old vs new systems |
| `/api/tasks/migration/disable-old-schedules` | POST | Disable job_schedules |
| `/api/tasks/migration/cancel-pending-crawl-jobs` | POST | Cancel old crawl jobs |
| `/api/tasks/migration/create-resync-tasks` | POST | Create tasks for all stores |
| `/api/tasks/migration/full-migrate` | POST | One-click migration |

### Role-Specific Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/tasks/role/:role/last-completion` | GET | Last completion time |
| `/api/tasks/role/:role/recent` | GET | Recent completions |
| `/api/tasks/store/:id/active` | GET | Check if store has active task |

## Capacity Planning

The `v_worker_capacity` view provides real-time metrics:

```sql
SELECT * FROM v_worker_capacity;
```

Returns:

- `pending_tasks` - Tasks waiting to be claimed
- `ready_tasks` - Tasks ready
now (scheduled_for is null or past)
- `claimed_tasks` - Tasks claimed but not started
- `running_tasks` - Tasks actively processing
- `completed_last_hour` - Recent completions
- `failed_last_hour` - Recent failures
- `active_workers` - Workers with recent heartbeats
- `avg_duration_sec` - Average task duration
- `tasks_per_worker_hour` - Throughput estimate
- `estimated_hours_to_drain` - Time to clear queue

### Scaling Recommendations

```javascript
// API: GET /api/tasks/capacity/:role
{
  "role": "product_resync",
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}
```

## Task Chaining

Tasks can automatically create follow-up tasks:

```
store_discovery → entry_point_discovery → product_discovery
                        ↓ (store has platform_dispensary_id)
                        ↓
                  Daily resync tasks
```

The `chainNextTask()` method handles this automatically.

## Stale Task Recovery

Tasks are considered stale if `last_heartbeat_at` is older than the threshold (default 10 minutes).

```sql
SELECT recover_stale_tasks(10);  -- 10 minute threshold
```

Or via API:

```bash
curl -X POST /api/tasks/recover-stale \
  -H 'Content-Type: application/json' \
  -d '{"threshold_minutes": 10}'
```

## Migration from Legacy Systems

### Legacy Systems Replaced

1. **job_schedules + job_run_logs** - Scheduled job definitions
2. **dispensary_crawl_jobs** - Per-dispensary crawl queue
3. **SyncOrchestrator + HydrationWorker** - Raw payload processing

### Migration Steps

**Option 1: One-Click Migration**

```bash
curl -X POST /api/tasks/migration/full-migrate
```

This will:

1. Disable all job_schedules
2. Cancel pending dispensary_crawl_jobs
3. Generate resync tasks for all stores
4. Create discovery and analytics tasks

**Option 2: Manual Migration**

```bash
# 1. Check current status
curl /api/tasks/migration/status

# 2. Disable old schedules
curl -X POST /api/tasks/migration/disable-old-schedules
# 3. Cancel pending crawl jobs
curl -X POST /api/tasks/migration/cancel-pending-crawl-jobs

# 4. Create resync tasks
curl -X POST /api/tasks/migration/create-resync-tasks \
  -H 'Content-Type: application/json' \
  -d '{"state_code": "AZ"}'

# 5. Generate daily resync schedule
curl -X POST /api/tasks/generate/resync \
  -H 'Content-Type: application/json' \
  -d '{"batches_per_day": 6}'
```

## Per-Store Locking

The system prevents concurrent tasks for the same store using a partial unique index:

```sql
CREATE UNIQUE INDEX idx_worker_tasks_active_dispensary
ON worker_tasks (dispensary_id)
WHERE dispensary_id IS NOT NULL
  AND status IN ('claimed', 'running');
```

This ensures only one task can be active per store at any time.

## Task Priority

Tasks are claimed in priority order (higher first), then by creation time:

```sql
ORDER BY priority DESC, created_at ASC
```

Default priorities:

- `store_discovery`: 0
- `entry_point_discovery`: 10 (high - new stores)
- `product_discovery`: 10 (high - new stores)
- `product_resync`: 0
- `analytics_refresh`: 0

## Scheduled Tasks

Tasks can be scheduled for future execution:

```javascript
await taskService.createTask({
  role: 'product_resync',
  dispensary_id: 123,
  scheduled_for: new Date('2025-01-10T06:00:00Z'),
});
```

The `generate_resync_tasks()` function creates staggered tasks throughout the day:

```sql
SELECT generate_resync_tasks(6, '2025-01-10');  -- 6 batches = every 4 hours
```

## Dashboard Integration

The admin dashboard shows task queue status in the main overview:

```
Task Queue Summary
------------------
Pending:    45
Running:    3
Completed:  1,234
Failed:     12
```

Full task management is available at `/admin/tasks`.
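## Claiming Semantics (Illustrative)

The claiming rules — priority order, `scheduled_for` readiness, and the per-store lock — can be sketched as a pure in-memory function. This is an illustrative model only, not the real `TaskService` code: the actual implementation enforces these rules in SQL with `SELECT ... FOR UPDATE SKIP LOCKED` and the partial unique index, and the `Task` shape and `nextClaimableTask` name here are assumptions for the sketch.

```typescript
// Illustrative model of the claiming rules; the real system does this in SQL.
type TaskStatus = 'pending' | 'claimed' | 'running' | 'completed' | 'failed';

interface Task {
  id: number;
  dispensary_id: number | null;
  status: TaskStatus;
  priority: number;
  scheduled_for: Date | null;
  created_at: Date;
}

function nextClaimableTask(tasks: Task[], now: Date): Task | undefined {
  // Per-store lock: stores with a claimed/running task are excluded,
  // mirroring the partial unique index on (dispensary_id).
  const activeStores = new Set(
    tasks
      .filter((t) => (t.status === 'claimed' || t.status === 'running') && t.dispensary_id !== null)
      .map((t) => t.dispensary_id)
  );

  return tasks
    .filter(
      (t) =>
        t.status === 'pending' &&
        // Readiness: scheduled_for is null or in the past
        (t.scheduled_for === null || t.scheduled_for <= now) &&
        (t.dispensary_id === null || !activeStores.has(t.dispensary_id))
    )
    // ORDER BY priority DESC, created_at ASC
    .sort(
      (a, b) => b.priority - a.priority || a.created_at.getTime() - b.created_at.getTime()
    )[0];
}
```

A high-priority task scheduled for the future, or targeting a store with an active task, is skipped in favor of the best ready candidate.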
## Error Handling

Failed tasks include the error message in `error_message` and can be retried:

```sql
-- View failed tasks
SELECT id, role, dispensary_id, error_message, retry_count
FROM worker_tasks
WHERE status = 'failed'
ORDER BY completed_at DESC
LIMIT 20;

-- Retry failed tasks
UPDATE worker_tasks
SET status = 'pending', retry_count = retry_count + 1
WHERE status = 'failed' AND retry_count < max_retries;
```

## Concurrent Task Processing (Added 2024-12)

Workers can now process multiple tasks concurrently within a single worker instance. This improves throughput by making efficient use of async I/O.

### Architecture

```
┌─────────────────────────────────────────────────────────────┐
│  Pod (K8s)                                                  │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  TaskWorker                                         │    │
│  │                                                     │    │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │    │
│  │  │ Task 1  │  │ Task 2  │  │ Task 3  │ (concurrent)│    │
│  │  └─────────┘  └─────────┘  └─────────┘             │    │
│  │                                                     │    │
│  │  Resource Monitor                                   │    │
│  │  ├── Memory: 65% (threshold: 85%)                   │    │
│  │  ├── CPU: 45% (threshold: 90%)                      │    │
│  │  └── Status: Normal                                 │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
```

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `MAX_CONCURRENT_TASKS` | 3 | Maximum tasks a worker will run concurrently |
| `MEMORY_BACKOFF_THRESHOLD` | 0.85 | Back off when heap memory exceeds 85% |
| `CPU_BACKOFF_THRESHOLD` | 0.90 | Back off when CPU exceeds 90% |
| `BACKOFF_DURATION_MS` | 10000 | How long to wait when backing off (10s) |

### How It Works

1. **Main Loop**: The worker continuously tries to fill up to `MAX_CONCURRENT_TASKS` slots
2. **Resource Monitoring**: Before claiming a new task, the worker checks memory and CPU
3. **Backoff**: If resources exceed thresholds, the worker pauses and stops claiming new tasks
4. **Concurrent Execution**: Tasks run as independent promises, so they don't block each other
5. **Graceful Shutdown**: On SIGTERM/decommission, the worker stops claiming new tasks but waits for active tasks to finish

### Resource Monitoring

```typescript
// ResourceStats interface
interface ResourceStats {
  memoryPercent: number;   // Current heap usage as decimal (0.0-1.0)
  memoryMb: number;        // Current heap used in MB
  memoryTotalMb: number;   // Total heap available in MB
  cpuPercent: number;      // CPU usage as percentage (0-100)
  isBackingOff: boolean;   // True if worker is in backoff state
  backoffReason: string;   // Why the worker is backing off
}
```

### Heartbeat Data

Workers report the following in their heartbeat:

```json
{
  "worker_id": "worker-abc123",
  "current_task_id": 456,
  "current_task_ids": [456, 457, 458],
  "active_task_count": 3,
  "max_concurrent_tasks": 3,
  "status": "active",
  "resources": {
    "memory_mb": 256,
    "memory_total_mb": 512,
    "memory_rss_mb": 320,
    "memory_percent": 50,
    "cpu_user_ms": 12500,
    "cpu_system_ms": 3200,
    "cpu_percent": 45,
    "is_backing_off": false,
    "backoff_reason": null
  }
}
```

### Backoff Behavior

When resources exceed thresholds:

1. The worker logs the backoff reason:

   ```
   [TaskWorker] MyWorker backing off: Memory at 87.3% (threshold: 85%)
   ```

2. The worker stops claiming new tasks but continues existing tasks
3. After `BACKOFF_DURATION_MS`, the worker rechecks resources
4. When resources return to normal:

   ```
   [TaskWorker] MyWorker resuming normal operation
   ```

### UI Display

The Workers Dashboard shows:

- **Tasks Column**: `2/3 tasks` (active/max concurrent)
- **Resources Column**: Memory % and CPU % with color coding
  - Green: < 50%
  - Yellow: 50-74%
  - Amber: 75-89%
  - Red: 90%+
- **Backing Off**: Orange warning badge when the worker is in backoff state

### Task Count Badge Details

```
┌─────────────────────────────────────────────┐
│ Worker: "MyWorker"                          │
│ Tasks: 2/3 tasks  #456, #457                │
│ Resources: 🧠 65%  💻 45%                   │
│ Status: ● Active                            │
└─────────────────────────────────────────────┘
```

### Best Practices

1. **Start Conservative**: Use `MAX_CONCURRENT_TASKS=3` initially
2. **Monitor Resources**: Watch for frequent backoffs in logs
3. **Tune Per Workload**: I/O-bound tasks benefit from higher concurrency
4. **Scale Horizontally**: Add more pods rather than cranking concurrency too high

### Code References

| File | Purpose |
|------|---------|
| `src/tasks/task-worker.ts:68-71` | Concurrency environment variables |
| `src/tasks/task-worker.ts:104-111` | ResourceStats interface |
| `src/tasks/task-worker.ts:149-179` | getResourceStats() method |
| `src/tasks/task-worker.ts:184-196` | shouldBackOff() method |
| `src/tasks/task-worker.ts:462-516` | mainLoop() with concurrent claiming |
| `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling |
| `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources |

## Browser Task Memory Limits (Updated 2025-12)

Browser-based tasks (Puppeteer/Chrome) have strict memory constraints that limit concurrency.

### Why Browser Tasks Are Different

Each browser task launches a Chrome process. Unlike I/O-bound API calls, browsers consume significant RAM:

| Component | RAM Usage |
|-----------|-----------|
| Node.js runtime | ~150 MB |
| Chrome browser (base) | ~200-250 MB |
| Dutchie menu page (loaded) | ~100-150 MB |
| **Per browser total** | **~350-450 MB** |

### Memory Math for Pod Limits

```
Pod memory limit:          2 GB (2000 MB)
Node.js runtime:           -150 MB
Safety buffer:             -100 MB
────────────────────────────────
Available for browsers:    1750 MB

Per browser + page:        ~400 MB
Max browsers:              1750 ÷ 400 = ~4 browsers
Recommended:               3 browsers (leaves headroom for spikes)
```

### MAX_CONCURRENT_TASKS for Browser Tasks

| Browsers per Pod | RAM Used | Risk Level |
|------------------|----------|------------|
| 1 | ~500 MB | Very safe |
| 2 | ~900 MB | Safe |
| **3** | **~1.3 GB** | **Recommended** |
| 4 | ~1.7 GB | Tight (may OOM) |
| 5+ | >2 GB | Will OOM crash |

**CRITICAL**: `MAX_CONCURRENT_TASKS=3` is the maximum safe value for browser tasks with current pod limits.
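The memory math above can be captured as a small helper. This is a sketch of the sizing arithmetic only — the defaults are the document's estimates, and `browserBudget` is a hypothetical name, not a function in the codebase:

```typescript
// Pod-sizing arithmetic for browser tasks; defaults mirror the doc's estimates.
interface PodBudget {
  availableMb: number;   // RAM left for browsers after runtime + buffer
  maxBrowsers: number;   // hard ceiling before OOM risk
  recommended: number;   // one below the ceiling, headroom for spikes
}

function browserBudget(
  podLimitMb: number,
  nodeRuntimeMb = 150,   // Node.js runtime
  safetyBufferMb = 100,  // safety buffer
  perBrowserMb = 400     // browser + loaded page
): PodBudget {
  const availableMb = podLimitMb - nodeRuntimeMb - safetyBufferMb;
  const maxBrowsers = Math.floor(availableMb / perBrowserMb);
  return { availableMb, maxBrowsers, recommended: Math.max(1, maxBrowsers - 1) };
}
```

With a 2 GB pod limit this yields 1750 MB available, a ceiling of 4 browsers, and a recommended `MAX_CONCURRENT_TASKS` of 3 — matching the table above.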
### Scaling Strategy

Scale **horizontally** (more pods) rather than vertically (more concurrency per pod):

```
┌─────────────────────────────────────────────────────────────────────────┐
│  Cluster: 8 pods × 3 browsers = 24 concurrent tasks                     │
│                                                                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │   Pod 0     │  │   Pod 1     │  │   Pod 2     │  │   Pod 3     │     │
│  │ 3 browsers  │  │ 3 browsers  │  │ 3 browsers  │  │ 3 browsers  │     │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘     │
│                                                                         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐     │
│  │   Pod 4     │  │   Pod 5     │  │   Pod 6     │  │   Pod 7     │     │
│  │ 3 browsers  │  │ 3 browsers  │  │ 3 browsers  │  │ 3 browsers  │     │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘     │
└─────────────────────────────────────────────────────────────────────────┘
```

### Browser Lifecycle Per Task

Each task gets a fresh browser with a fresh IP/identity:

```
1. Claim task from queue
2. Get fresh proxy from pool
3. Launch browser with proxy
4. Run preflight (verify IP)
5. Execute scrape
6. Close browser
7. Repeat
```

This ensures:

- Fresh IP per task (proxy rotation)
- Fresh fingerprint per task (UA rotation)
- No cookie/session bleed between tasks
- Predictable memory usage

### Increasing Capacity

To handle more concurrent tasks:

1. **Add more pods** (up to 8 per CLAUDE.md limit)
2. **Increase pod memory** (allows 4 browsers per pod):

   ```yaml
   resources:
     limits:
       memory: "2.5Gi"  # from 2Gi
   ```

**DO NOT** simply increase `MAX_CONCURRENT_TASKS` without also increasing pod memory limits.
## Monitoring

### Logs

Workers log to stdout:

```
[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully
```

### Health Check

Check if workers are active:

```sql
SELECT worker_id, role, COUNT(*), MAX(last_heartbeat_at)
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;
```

### Metrics

```sql
-- Tasks by status
SELECT status, COUNT(*) FROM worker_tasks GROUP BY status;

-- Tasks by role
SELECT role, status, COUNT(*) FROM worker_tasks GROUP BY role, status;

-- Average duration by role
SELECT role,
       AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) AS avg_seconds
FROM worker_tasks
WHERE status = 'completed'
  AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY role;
```
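The backoff decision described under Concurrent Task Processing can be sketched against the documented `ResourceStats` shape and default thresholds. This is an illustrative sketch, not the actual `shouldBackOff()` in `src/tasks/task-worker.ts`; `backoffReason` is a hypothetical name:

```typescript
// Sketch of the backoff check; thresholds mirror the documented defaults.
interface ResourceStats {
  memoryPercent: number; // heap usage as decimal (0.0-1.0)
  cpuPercent: number;    // CPU usage as percentage (0-100)
}

const MEMORY_BACKOFF_THRESHOLD = 0.85; // MEMORY_BACKOFF_THRESHOLD default
const CPU_BACKOFF_THRESHOLD = 0.9;     // CPU_BACKOFF_THRESHOLD default

// Returns a human-readable reason when the worker should back off, else null.
function backoffReason(stats: ResourceStats): string | null {
  if (stats.memoryPercent > MEMORY_BACKOFF_THRESHOLD) {
    return `Memory at ${(stats.memoryPercent * 100).toFixed(1)}% (threshold: ${
      MEMORY_BACKOFF_THRESHOLD * 100
    }%)`;
  }
  if (stats.cpuPercent > CPU_BACKOFF_THRESHOLD * 100) {
    return `CPU at ${stats.cpuPercent.toFixed(1)}% (threshold: ${
      CPU_BACKOFF_THRESHOLD * 100
    }%)`;
  }
  return null; // resources normal, keep claiming tasks
}
```

A non-null reason is what the worker would log (e.g. the "backing off: Memory at 87.3% (threshold: 85%)" line shown earlier) while it stops claiming new tasks.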