# CannaiQ Worker System

## Overview

The Worker System is a role-based task queue that processes background jobs. All tasks go into a single pool, and workers claim tasks based on their assigned role.

---

## Design Pattern: Single Pool, Role-Based Claiming

```
┌─────────────────────────────────────────┐
│        TASK POOL (worker_tasks)         │
│                                         │
│   ┌─────────────────────────────────┐   │
│   │ role=store_discovery   pending  │   │
│   │ role=product_resync    pending  │   │
│   │ role=product_resync    pending  │   │
│   │ role=product_resync    pending  │   │
│   │ role=analytics_refresh pending  │   │
│   │ role=entry_point_disc  pending  │   │
│   └─────────────────────────────────┘   │
└─────────────────────────────────────────┘
                     │
      ┌──────────────┼───────────────┐
      │              │               │
      ▼              ▼               ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│      WORKER      │ │      WORKER      │ │      WORKER      │
│  role=product_   │ │  role=product_   │ │  role=store_     │
│      resync      │ │      resync      │ │    discovery     │
│                  │ │                  │ │                  │
│  Claims ONLY     │ │  Claims ONLY     │ │  Claims ONLY     │
│  product_resync  │ │  product_resync  │ │  store_discovery │
│  tasks           │ │  tasks           │ │  tasks           │
└──────────────────┘ └──────────────────┘ └──────────────────┘
```

**Key Points:**
- All tasks go into ONE table (`worker_tasks`)
- Each worker is assigned ONE role at startup
- Workers only claim tasks matching their role
- Multiple workers can share the same role (horizontal scaling)

---

## Worker Roles

| Role | Purpose | Per-Store? | Schedule |
|------|---------|------------|----------|
| `store_discovery` | Find new dispensaries via GraphQL | No | Weekly |
| `entry_point_discovery` | Resolve platform IDs from menu URLs | Yes | On-demand |
| `product_discovery` | Initial product fetch for new stores | Yes | On-demand |
| `product_resync` | Regular price/stock updates | Yes | Every 4 hours |
| `analytics_refresh` | Refresh materialized views | No | Daily |

---

## Task Lifecycle

```
pending → claimed → running → completed
                       ↓
                     failed
                       ↓
          (retry if < max_retries)
```

| Status | Meaning |
|--------|---------|
| `pending` | Waiting to be claimed |
| `claimed` | Worker has claimed it, not yet started |
| `running` | Worker is actively processing |
| `completed` | Successfully finished |
| `failed` | Error occurred |
| `stale` | Worker died (heartbeat timeout) |

---

## Task Chaining

Tasks automatically create follow-up tasks:

```
store_discovery (finds new stores)
  │
  ├─ Returns newStoreIds[] in result
  ▼
entry_point_discovery (for each new store)
  │
  ├─ Resolves platform_dispensary_id
  ▼
product_discovery (initial crawl)
  │
  ▼
(store enters regular schedule)
  │
  ▼
product_resync (every 4 hours)
```

---

## How Claiming Works

### 1. Worker starts with a role

```bash
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts
```

### 2. Worker loop polls for tasks

```typescript
// Simplified worker loop
while (running) {
  const task = await claimTask(this.role, this.workerId);
  if (!task) {
    await sleep(5000); // No tasks, wait 5 seconds
    continue;
  }
  await processTask(task);
}
```
### 3. SQL function claims atomically

```sql
-- claim_task(role, worker_id)
UPDATE worker_tasks
SET status = 'claimed',
    worker_id = $2,
    claimed_at = NOW()
WHERE id = (
  SELECT id FROM worker_tasks
  WHERE role = $1            -- Filter by worker's role
    AND status = 'pending'
    AND (scheduled_for IS NULL OR scheduled_for <= NOW())
    -- Per-store locking (NULL-safe: NOT IN matches nothing if the list contains NULL)
    AND (dispensary_id IS NULL OR dispensary_id NOT IN (
      SELECT dispensary_id FROM worker_tasks
      WHERE status IN ('claimed', 'running')
        AND dispensary_id IS NOT NULL
    ))
  ORDER BY priority DESC, created_at ASC  -- Priority ordering
  LIMIT 1
  FOR UPDATE SKIP LOCKED                  -- Atomic, no race conditions
)
RETURNING *;
```

**Key Features:**
- `FOR UPDATE SKIP LOCKED` - Prevents race conditions between workers
- Role filtering - Worker only sees tasks for its role
- Per-store locking - Only one active task per dispensary
- Priority ordering - Higher-priority tasks first
- Scheduled tasks - Respects the `scheduled_for` timestamp

---

## Heartbeat & Stale Recovery

Workers send heartbeats every 30 seconds while processing:

```typescript
// During task processing (the callback must be async to use await)
setInterval(async () => {
  await pool.query(
    'UPDATE worker_tasks SET last_heartbeat_at = NOW() WHERE id = $1',
    [taskId]
  );
}, 30000);
```

If a worker dies, its tasks are recovered:

```sql
-- recover_stale_tasks(threshold_minutes), shown with a 10-minute threshold
UPDATE worker_tasks
SET status = 'pending',
    worker_id = NULL,
    retry_count = retry_count + 1
WHERE status IN ('claimed', 'running')
  AND last_heartbeat_at < NOW() - INTERVAL '10 minutes'
  AND retry_count < max_retries;
```

---

## Scheduling

### Daily Resync Generation

```sql
SELECT generate_resync_tasks(6, CURRENT_DATE); -- 6 batches = every 4 hours
```

Creates staggered tasks:

| Batch | Time | Stores |
|-------|------|--------|
| 1 | 00:00 | 1-50 |
| 2 | 04:00 | 51-100 |
| 3 | 08:00 | 101-150 |
| 4 | 12:00 | 151-200 |
| 5 | 16:00 | 201-250 |
| 6 | 20:00 | 251-300 |

---

## Files

### Core

| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | Task CRUD, claiming, capacity metrics |
| `src/tasks/task-worker.ts` | Worker loop, heartbeat, handler dispatch |
| `src/routes/tasks.ts` | REST API endpoints |
| `migrations/074_worker_task_queue.sql` | Database schema + SQL functions |

### Handlers

| File | Role |
|------|------|
| `src/tasks/handlers/store-discovery.ts` | `store_discovery` |
| `src/tasks/handlers/entry-point-discovery.ts` | `entry_point_discovery` |
| `src/tasks/handlers/product-discovery.ts` | `product_discovery` |
| `src/tasks/handlers/product-resync.ts` | `product_resync` |
| `src/tasks/handlers/analytics-refresh.ts` | `analytics_refresh` |

---

## Running Workers

### Local Development

```bash
# Start a single worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start multiple workers (different terminals)
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=product_resync WORKER_ID=resync-2 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts
```

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `WORKER_ROLE` | (required) | Which task role to process |
| `WORKER_ID` | auto-generated | Custom worker identifier |
| `POLL_INTERVAL_MS` | 5000 | How often to check for tasks |
| `HEARTBEAT_INTERVAL_MS` | 30000 | How often to update the heartbeat |

### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 5  # Scale horizontally
  template:
    spec:
      containers:
        - name: worker
          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
          command: ["npx", "tsx", "src/tasks/task-worker.ts"]
          env:
            - name: WORKER_ROLE
              value: "product_resync"
```

---

## API Endpoints

### Task Management

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/tasks` | List tasks (with filters) |
| POST | `/api/tasks` | Create a task |
| GET | `/api/tasks/:id` | Get task by ID |
| GET | `/api/tasks/counts` | Counts by status |
| GET | `/api/tasks/capacity` | Capacity metrics |
| POST | `/api/tasks/recover-stale` | Recover dead worker tasks |
### Task Generation

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/api/tasks/generate/resync` | Generate daily resync batch |
| POST | `/api/tasks/generate/discovery` | Create store discovery task |

---

## Capacity Planning

The `v_worker_capacity` view provides metrics:

```sql
SELECT * FROM v_worker_capacity;
```

| Metric | Description |
|--------|-------------|
| `pending_tasks` | Tasks waiting |
| `ready_tasks` | Tasks ready now (`scheduled_for` has passed) |
| `running_tasks` | Tasks being processed |
| `active_workers` | Workers with a recent heartbeat |
| `tasks_per_worker_hour` | Throughput estimate |
| `estimated_hours_to_drain` | Time to clear the queue |

### Scaling API

```bash
GET /api/tasks/capacity/product_resync
```

```json
{
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}
```

---

## Database Schema

### worker_tasks

```sql
CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,

  -- Task identification
  role VARCHAR(50) NOT NULL,
  dispensary_id INTEGER REFERENCES dispensaries(id),
  platform VARCHAR(20),

  -- State
  status VARCHAR(20) DEFAULT 'pending',
  priority INTEGER DEFAULT 0,
  scheduled_for TIMESTAMPTZ,

  -- Ownership
  worker_id VARCHAR(100),
  claimed_at TIMESTAMPTZ,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  last_heartbeat_at TIMESTAMPTZ,

  -- Results
  result JSONB,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,

  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

### Key Indexes

```sql
-- Fast claiming by role
CREATE INDEX idx_worker_tasks_pending
  ON worker_tasks(role, priority DESC, created_at ASC)
  WHERE status = 'pending';

-- Prevent duplicate active tasks per store
CREATE UNIQUE INDEX idx_worker_tasks_unique_active_store
  ON worker_tasks(dispensary_id)
  WHERE status IN ('claimed', 'running') AND dispensary_id IS NOT NULL;
```

---

## Monitoring
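Stale-task recovery compares each task's last heartbeat against a threshold (10 minutes in the recovery query above). The predicate can be sketched in TypeScript; `isStale` is a hypothetical helper name, not a function from the codebase:

```typescript
// A claimed/running task counts as stale when its last heartbeat is
// older than the threshold (mirrors recover_stale_tasks' WHERE clause).
function isStale(
  lastHeartbeatAt: Date,
  now: Date,
  thresholdMinutes: number = 10
): boolean {
  const ageMs = now.getTime() - lastHeartbeatAt.getTime();
  return ageMs > thresholdMinutes * 60_000;
}

const now = new Date("2025-01-01T00:15:00Z");
console.log(isStale(new Date("2025-01-01T00:02:00Z"), now)); // 13 min old → true
console.log(isStale(new Date("2025-01-01T00:10:00Z"), now)); // 5 min old → false
```

With the default 30-second heartbeat interval, a 10-minute threshold tolerates roughly 20 missed heartbeats before a task is reclaimed.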
### Logs

```
[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully
```

### Health Check

```sql
-- Active workers
SELECT worker_id, role, COUNT(*), MAX(last_heartbeat_at)
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;

-- Task counts by role/status
SELECT role, status, COUNT(*)
FROM worker_tasks
GROUP BY role, status;
```
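The `workers_needed` figures returned by the capacity endpoint reduce to ceiling division over queue depth and throughput. A sketch, assuming a throughput of 50 tasks per worker-hour (consistent with the example capacity response earlier; `workersNeeded` is an illustrative name, not from the codebase):

```typescript
// Workers needed to drain a queue within a target window, assuming each
// worker completes `tasksPerWorkerHour` tasks per hour.
function workersNeeded(
  pendingTasks: number,
  tasksPerWorkerHour: number,
  targetHours: number
): number {
  return Math.ceil(pendingTasks / (tasksPerWorkerHour * targetHours));
}

// Reproduces the example capacity response (500 pending, ~50 tasks/worker-hour):
console.log(workersNeeded(500, 50, 1)); // 10
console.log(workersNeeded(500, 50, 4)); // 3
console.log(workersNeeded(500, 50, 8)); // 2
```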