# Task Workflow Documentation

**Date: 2024-12-10**

This document describes the complete task/job processing architecture after the 2024-12-10 rewrite.

---

## Complete Architecture

```
KUBERNETES CLUSTER
│
├── API SERVER POD (scraper)
│   ├── Express API
│   │     /api/job-queue
│   │     /api/tasks
│   │     /api/schedules
│   └── TaskScheduler (src/services/task-scheduler.ts)
│         • Polls every 60s
│         • Checks task_schedules table
│         • SELECT FOR UPDATE SKIP LOCKED
│         • Generates tasks when due
│
├── POSTGRESQL DATABASE
│   ├── task_schedules ──(create tasks)──► worker_tasks
│   │     • payload_fetch                    • pending tasks
│   │     • store_discovery                  • claimed tasks
│   │     • analytics_refresh                • running tasks
│   │       next_run_at                      • completed tasks
│   │       last_run_at                        role, dispensary_id
│   │       interval_hours                     priority, status
│   └── Workers poll worker_tasks for work
│         (SELECT FOR UPDATE SKIP LOCKED)
│
└── WORKER PODS (StatefulSet: scraper-worker)
    ├── Worker 0 (task-worker.ts)
    ├── Worker 1 (task-worker.ts)
    ├── Worker 2 (task-worker.ts)
    └── Worker N (task-worker.ts)
```

---

## Startup Sequence

```
API SERVER STARTUP
──────────────────
1. Express app initializes
2. runAutoMigrations()
     • Runs pending migrations (including 079_task_schedules.sql)
3. initializeMinio() / initializeImageStorage()
4. cleanupOrphanedJobs()
5. taskScheduler.start()        ◄── NEW (per TASK_WORKFLOW_2024-12-10.md)
     ├── Recover stale tasks (workers that died)
     ├── Ensure default schedules exist in task_schedules
     ├── Check and run any due schedules immediately
     └── Start 60-second poll interval
6. app.listen(PORT)
```

```
WORKER POD STARTUP
──────────────────
1. K8s starts pod from StatefulSet
2. TaskWorker.constructor()
     • Create DB pool
     • Create CrawlRotator
3. initializeStealth()
     • Load proxies from DB (REQUIRED - fails if none)
     • Wire rotator to Dutchie client
4. register() with API
     • Optional - continues if it fails
5. startRegistryHeartbeat() every 30s
6. processNextTask() loop
     ├── Poll for pending task (FOR UPDATE SKIP LOCKED)
     ├── Claim task atomically
     ├── Execute handler (product_refresh, store_discovery, etc.)
     ├── Mark complete/failed
     ├── Chain next task if applicable
     └── Loop
```
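Step 6 of the worker loop claims work with the same `FOR UPDATE SKIP LOCKED` pattern the scheduler uses. Below is a minimal sketch of that claim step, assuming a `pg` `Pool` and the `worker_tasks` columns documented later in this file; the function name and ordering details are illustrative, not the actual code in `src/tasks/task-worker.ts`.

```typescript
import { Pool } from "pg";

// Hypothetical sketch: atomically claim the highest-priority pending task.
// Column names follow the worker_tasks table in this document; everything else
// (function name, ORDER BY choice) is an assumption for illustration.
export async function claimNextTask(pool: Pool, workerId: string) {
  const { rows } = await pool.query(
    `UPDATE worker_tasks
        SET status = 'claimed', worker_id = $1, claimed_at = NOW()
      WHERE id = (
        SELECT id FROM worker_tasks
         WHERE status = 'pending'
           AND (scheduled_for IS NULL OR scheduled_for <= NOW())
         ORDER BY priority DESC, id
         LIMIT 1
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`,
    [workerId]
  );
  return rows[0] ?? null; // null when there is no pending work
}
```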
---

## Schedule Flow

```
SCHEDULER POLL (every 60 seconds)
─────────────────────────────────
BEGIN TRANSACTION
   │
   ▼
SELECT * FROM task_schedules
WHERE enabled = true AND next_run_at <= NOW()
FOR UPDATE SKIP LOCKED        ◄── Prevents duplicate execution across replicas
   │
   ▼
For each due schedule:
   ├── payload_fetch_all
   │     └─► Query dispensaries needing crawl
   │           └─► Create payload_fetch tasks in worker_tasks
   ├── store_discovery_dutchie
   │     └─► Create single store_discovery task
   └── analytics_refresh
         └─► Create single analytics_refresh task
   │
   ▼
UPDATE task_schedules SET
  last_run_at = NOW(),
  next_run_at = NOW() + interval_hours
   │
   ▼
COMMIT
```

---

## Task Lifecycle

```
SCHEDULE DUE
  └─► PENDING
        └─► (claim) CLAIMED
              └─► (start) RUNNING
                    ├─► COMPLETED
                    ├─► FAILED ──(retry, if retries < max)──► back to PENDING
                    └─► STALE  ──(recover_stale_tasks())───► back to PENDING
```
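The `STALE` to `PENDING` transition above is what `recover_stale_tasks()` performs (run at scheduler startup and exposed via `POST /api/tasks/recover-stale`). A sketch of the idea, assuming staleness is detected via `last_heartbeat_at`; the 10-minute threshold and the exact query are assumptions, not the actual implementation.

```typescript
import { Pool } from "pg";

// Hypothetical sketch: requeue tasks whose worker stopped heartbeating.
// The 10-minute window is an assumed value for illustration only.
export async function recoverStaleTasks(pool: Pool): Promise<number> {
  const result = await pool.query(
    `UPDATE worker_tasks
        SET status = 'pending', worker_id = NULL, claimed_at = NULL, started_at = NULL
      WHERE status IN ('claimed', 'running')
        AND last_heartbeat_at < NOW() - INTERVAL '10 minutes'
        AND retry_count < max_retries`
  );
  return result.rowCount ?? 0; // number of tasks returned to the queue
}
```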
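Tying the Schedule Flow above together: the whole poll runs inside one transaction so `FOR UPDATE SKIP LOCKED` can serialize replicas. A condensed TypeScript sketch of that transaction, assuming a `pg` `Pool`; per-role task generation is elided, and the real logic lives in `src/services/task-scheduler.ts`.

```typescript
import { Pool } from "pg";

// Hypothetical sketch of one scheduler poll. What each schedule creates is
// covered in the Schedule Flow diagram and is elided here.
export async function runDueSchedules(pool: Pool): Promise<void> {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");

    const { rows: due } = await client.query(
      `SELECT * FROM task_schedules
        WHERE enabled = true AND next_run_at <= NOW()
        FOR UPDATE SKIP LOCKED`
    );

    for (const schedule of due) {
      // ... insert worker_tasks rows appropriate to schedule.role ...
      await client.query(
        `UPDATE task_schedules
            SET last_run_at = NOW(),
                next_run_at = NOW() + make_interval(hours => interval_hours)
          WHERE id = $1`,
        [schedule.id]
      );
    }

    await client.query("COMMIT");
  } catch (err) {
    await client.query("ROLLBACK");
    throw err;
  } finally {
    client.release();
  }
}
```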
---

## Database Tables

### task_schedules (NEW - migration 079)

Stores schedule definitions. Survives restarts.

```sql
CREATE TABLE task_schedules (
  id               SERIAL PRIMARY KEY,
  name             VARCHAR(100) NOT NULL UNIQUE,
  role             VARCHAR(50) NOT NULL,   -- product_refresh, store_discovery, etc.
  enabled          BOOLEAN DEFAULT TRUE,
  interval_hours   INTEGER NOT NULL,       -- How often to run
  priority         INTEGER DEFAULT 0,      -- Task priority when created
  state_code       VARCHAR(2),             -- Optional filter
  last_run_at      TIMESTAMPTZ,            -- When it last ran
  next_run_at      TIMESTAMPTZ,            -- When it's due next
  last_task_count  INTEGER,                -- Tasks created last run
  last_error       TEXT                    -- Error message if failed
);
```

### worker_tasks (migration 074)

The task queue. Workers pull from here.

```sql
CREATE TABLE worker_tasks (
  id                 SERIAL PRIMARY KEY,
  role               task_role NOT NULL,            -- What type of work
  dispensary_id      INTEGER,                       -- Which store (if applicable)
  platform           VARCHAR(50),                   -- Which platform
  status             task_status DEFAULT 'pending',
  priority           INTEGER DEFAULT 0,             -- Higher = process first
  scheduled_for      TIMESTAMP,                     -- Don't process before this time
  worker_id          VARCHAR(100),                  -- Which worker claimed it
  claimed_at         TIMESTAMP,
  started_at         TIMESTAMP,
  completed_at       TIMESTAMP,
  last_heartbeat_at  TIMESTAMP,                     -- For stale detection
  result             JSONB,
  error_message      TEXT,
  retry_count        INTEGER DEFAULT 0,
  max_retries        INTEGER DEFAULT 3
);
```

---

## Default Schedules

| Name | Role | Interval | Priority | Description |
|------|------|----------|----------|-------------|
| `payload_fetch_all` | payload_fetch | 4 hours | 0 | Fetch payloads from Dutchie API (chains to product_refresh) |
| `store_discovery_dutchie` | store_discovery | 24 hours | 5 | Find new Dutchie stores |
| `analytics_refresh` | analytics_refresh | 6 hours | 0 | Refresh materialized views |

---

## Task Roles

| Role | Description | Creates Tasks For |
|------|-------------|-------------------|
| `payload_fetch` | **NEW** - Fetch from Dutchie API, save to disk | Each dispensary needing crawl |
| `product_refresh` | **CHANGED** - Read local payload, normalize, upsert to DB | Chained from payload_fetch |
| `store_discovery` | Find new dispensaries, returns newStoreIds[] | Single task per platform |
| `entry_point_discovery` | **DEPRECATED** - Resolve platform IDs | No longer used |
| `product_discovery` | Initial product fetch for new stores | Chained from store_discovery |
| `analytics_refresh` | Refresh materialized views | Single global task |

### Payload/Refresh Separation (2024-12-10)

The crawl workflow is now split into two phases:

```
payload_fetch (scheduled every 4h)
  └─► Hit Dutchie GraphQL API
  └─► Save raw JSON to /storage/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
  └─► Record metadata in raw_crawl_payloads table
  └─► Queue product_refresh task with payload_id

product_refresh (chained from payload_fetch)
  └─► Load payload from filesystem (NOT from API)
  └─► Normalize via DutchieNormalizer
  └─► Upsert to store_products
  └─► Create snapshots
  └─► Track missing products
  └─► Download images
```

**Benefits:**

- **Retry-friendly**: If normalize fails, re-run product_refresh without re-crawling
- **Replay-able**: Run product_refresh against any historical payload
- **Faster refreshes**: Local file read vs network call
- **Historical diffs**: Compare payloads to see what changed between crawls
- **Less API pressure**: Only payload_fetch hits Dutchie
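A sketch of the payload_fetch half of this split, showing the save-then-chain step. It assumes Node's `fs`/`zlib`, that migration 080 gives `raw_crawl_payloads` columns roughly like `dispensary_id`, `file_path`, and `fetched_at`, and that migration 081's `payload` column on `worker_tasks` carries the `{ payload_id }` handoff. The function name `storeAndChainPayload` is illustrative; the real code lives in `src/utils/payload-storage.ts` and `src/tasks/handlers/payload-fetch.ts`.

```typescript
import { Pool } from "pg";
import { gzipSync } from "zlib";
import { mkdirSync, writeFileSync } from "fs";

// Hypothetical sketch: gzip the raw payload to disk, record its metadata, and
// queue the chained product_refresh task pointing at the new payload_id.
export async function storeAndChainPayload(
  pool: Pool,
  dispensaryId: number,
  rawPayload: unknown
): Promise<number> {
  const now = new Date();
  const dir = `/storage/payloads/${now.getFullYear()}/${now.getMonth() + 1}/${now.getDate()}`;
  const filePath = `${dir}/store_${dispensaryId}_${now.getTime()}.json.gz`;

  mkdirSync(dir, { recursive: true });
  writeFileSync(filePath, gzipSync(JSON.stringify(rawPayload)));

  // Assumed raw_crawl_payloads columns (migration 080).
  const { rows } = await pool.query(
    `INSERT INTO raw_crawl_payloads (dispensary_id, file_path, fetched_at)
     VALUES ($1, $2, NOW()) RETURNING id`,
    [dispensaryId, filePath]
  );
  const payloadId: number = rows[0].id;

  // Queue-based chaining: product_refresh will read this payload from disk.
  // Assumes a payload JSONB column on worker_tasks (migration 081).
  await pool.query(
    `INSERT INTO worker_tasks (role, dispensary_id, status, payload)
     VALUES ('product_refresh', $1, 'pending', $2)`,
    [dispensaryId, JSON.stringify({ payload_id: payloadId })]
  );

  return payloadId;
}
```

In the real workflow this handoff goes through `taskService.createTask` (see Chaining Implementation below) rather than a raw INSERT.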
---

## Task Chaining

Tasks automatically queue follow-up tasks upon successful completion. This creates two main flows:

### Discovery Flow (New Stores)

When `store_discovery` finds new dispensaries, they automatically get their initial product data:

```
store_discovery
  └─► Discovers new locations via Dutchie GraphQL
  └─► Auto-promotes valid locations to dispensaries table
  └─► Collects newDispensaryIds[] from promotions
  └─► Returns { newStoreIds: [...] } in result

chainNextTask() detects newStoreIds
  └─► Creates product_discovery task for each new store

product_discovery
  └─► Calls handlePayloadFetch() internally
        └─► payload_fetch hits Dutchie API
        └─► Saves raw JSON to /storage/payloads/
        └─► Queues product_refresh task with payload_id

product_refresh
  └─► Loads payload from filesystem
  └─► Normalizes and upserts to store_products
  └─► Creates snapshots, downloads images
```

**Complete Discovery Chain:**

```
store_discovery ──► product_discovery ──► payload_fetch ──► product_refresh
                                    (internal call)     (queues next)
```

### Scheduled Flow (Existing Stores)

For existing stores, the `payload_fetch_all` schedule runs every 4 hours:

```
TaskScheduler (every 60s)
  └─► Checks task_schedules for due schedules
  └─► payload_fetch_all is due
        └─► Generates payload_fetch task for each dispensary

payload_fetch
  └─► Hits Dutchie GraphQL API
  └─► Saves raw JSON to /storage/payloads/
  └─► Queues product_refresh task with payload_id

product_refresh
  └─► Loads payload from filesystem (NOT API)
  └─► Normalizes via DutchieNormalizer
  └─► Upserts to store_products
  └─► Creates snapshots
```

**Complete Scheduled Chain:**

```
payload_fetch ──► product_refresh
   (queues)        (reads local)
```

### Chaining Implementation

Task chaining is handled in three places:

1. **Internal chaining (handler calls handler):**
   - `product_discovery` calls `handlePayloadFetch()` directly
2. **External chaining (`chainNextTask()` in task-service.ts):**
   - Called after task completion
   - `store_discovery` → queues `product_discovery` for each newStoreId (see the sketch after this list)
3. **Queue-based chaining (`taskService.createTask`):**
   - `payload_fetch` queues `product_refresh` with `payload: { payload_id }`
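A sketch of the external chaining step (item 2 above): what `chainNextTask()` might do when a completed `store_discovery` task reports `newStoreIds` in its result. The body is illustrative; the actual implementation is in `src/tasks/task-service.ts`.

```typescript
import { Pool } from "pg";

interface CompletedTask {
  id: number;
  role: string;
  result: { newStoreIds?: number[] } | null;
}

// Hypothetical sketch of chainNextTask(): queue a product_discovery task for
// every store a store_discovery run just promoted.
export async function chainNextTask(pool: Pool, task: CompletedTask): Promise<void> {
  if (task.role === "store_discovery" && task.result?.newStoreIds?.length) {
    for (const dispensaryId of task.result.newStoreIds) {
      await pool.query(
        `INSERT INTO worker_tasks (role, dispensary_id, status)
         VALUES ('product_discovery', $1, 'pending')`,
        [dispensaryId]
      );
    }
  }
  // payload_fetch → product_refresh is queue-based (item 3), and
  // product_discovery → payload_fetch is an internal handler call (item 1),
  // so neither is handled here.
}
```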
---

## Payload API Endpoints

Raw crawl payloads can be accessed via the Payloads API:

| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/payloads` | GET | List payload metadata (paginated) |
| `GET /api/payloads/:id` | GET | Get payload metadata by ID |
| `GET /api/payloads/:id/data` | GET | Get full payload JSON (decompressed) |
| `GET /api/payloads/store/:dispensaryId` | GET | List payloads for a store |
| `GET /api/payloads/store/:dispensaryId/latest` | GET | Get latest payload for a store |
| `GET /api/payloads/store/:dispensaryId/diff` | GET | Diff two payloads for changes |

### Payload Diff Response

The diff endpoint returns:

```json
{
  "success": true,
  "from": { "id": 123, "fetchedAt": "...", "productCount": 100 },
  "to": { "id": 456, "fetchedAt": "...", "productCount": 105 },
  "diff": {
    "added": 10,
    "removed": 5,
    "priceChanges": 8,
    "stockChanges": 12
  },
  "details": {
    "added": [...],
    "removed": [...],
    "priceChanges": [...],
    "stockChanges": [...]
  }
}
```
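The same response expressed as a TypeScript shape for API consumers. Field names come from the example above; the per-product element types inside `details` are not specified in this document, so they are typed loosely.

```typescript
// Shape of the GET /api/payloads/store/:dispensaryId/diff response documented above.
export interface PayloadSummary {
  id: number;
  fetchedAt: string;
  productCount: number;
}

export interface PayloadDiffResponse {
  success: boolean;
  from: PayloadSummary;
  to: PayloadSummary;
  diff: {
    added: number;
    removed: number;
    priceChanges: number;
    stockChanges: number;
  };
  // Per-product detail arrays; element shapes are not documented here.
  details: {
    added: unknown[];
    removed: unknown[];
    priceChanges: unknown[];
    stockChanges: unknown[];
  };
}
```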
---

## API Endpoints

### Schedules (NEW)

| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/schedules` | GET | List all schedules |
| `PUT /api/schedules/:id` | PUT | Update schedule |
| `POST /api/schedules/:id/trigger` | POST | Run schedule immediately |

### Task Creation (rewired 2024-12-10)

| Endpoint | Method | Description |
|----------|--------|-------------|
| `POST /api/job-queue/enqueue` | POST | Create single task |
| `POST /api/job-queue/enqueue-batch` | POST | Create batch tasks |
| `POST /api/job-queue/enqueue-state` | POST | Create tasks for a state |
| `POST /api/tasks` | POST | Direct task creation |

### Task Management

| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/tasks` | GET | List tasks |
| `GET /api/tasks/:id` | GET | Get single task |
| `GET /api/tasks/counts` | GET | Task counts by status |
| `POST /api/tasks/recover-stale` | POST | Recover stale tasks |

---

## Key Files

| File | Purpose |
|------|---------|
| `src/services/task-scheduler.ts` | **NEW** - DB-driven scheduler |
| `src/tasks/task-worker.ts` | Worker that processes tasks |
| `src/tasks/task-service.ts` | Task CRUD operations |
| `src/tasks/handlers/payload-fetch.ts` | **NEW** - Fetches from API, saves to disk |
| `src/tasks/handlers/product-refresh.ts` | **CHANGED** - Reads from disk, processes to DB |
| `src/utils/payload-storage.ts` | **NEW** - Payload save/load utilities |
| `src/routes/tasks.ts` | Task API endpoints |
| `src/routes/job-queue.ts` | Job Queue UI endpoints (rewired) |
| `migrations/079_task_schedules.sql` | Schedule table |
| `migrations/080_raw_crawl_payloads.sql` | Payload metadata table |
| `migrations/081_payload_fetch_columns.sql` | payload, last_fetch_at columns |
| `migrations/074_worker_task_queue.sql` | Task queue table |

---

## Legacy Code (DEPRECATED)

| File | Status | Replacement |
|------|--------|-------------|
| `src/services/scheduler.ts` | DEPRECATED | `task-scheduler.ts` |
| `dispensary_crawl_jobs` table | ORPHANED | `worker_tasks` |
| `job_schedules` table | LEGACY | `task_schedules` |

---

## Dashboard Integration

Both pages remain wired to the dashboard:

| Page | Data Source | Actions |
|------|-------------|---------|
| **Job Queue** | `worker_tasks`, `task_schedules` | Create tasks, view schedules |
| **Task Queue** | `worker_tasks` | View tasks, recover stale |

---

## Multi-Replica Safety

The scheduler uses `SELECT FOR UPDATE SKIP LOCKED` and database-backed state to ensure:

1. **Only one replica** executes a schedule at a time
2. **No duplicate tasks** are created
3. **Survives pod restarts** - state lives in the DB, not in memory
4. **Self-healing** - stale tasks are recovered on startup

```sql
-- This query is atomic across all API server replicas
SELECT * FROM task_schedules
WHERE enabled = true
  AND next_run_at <= NOW()
FOR UPDATE SKIP LOCKED
```

---

## Worker Scaling (K8s)

Workers run as a StatefulSet in Kubernetes. You can scale from the admin UI or the CLI.

### From Admin UI

The Workers page (`/admin/workers`) provides:

- Current replica count display
- Scale up/down buttons
- Target replica input

### API Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/workers/k8s/replicas` | GET | Get current/desired replica counts |
| `POST /api/workers/k8s/scale` | POST | Scale to N replicas (body: `{ replicas: N }`) |

### From CLI

```bash
# View current replicas
kubectl get statefulset scraper-worker -n dispensary-scraper

# Scale to 10 workers
kubectl scale statefulset scraper-worker -n dispensary-scraper --replicas=10

# Scale down to 3 workers
kubectl scale statefulset scraper-worker -n dispensary-scraper --replicas=3
```

### Configuration

Environment variables for the API server:

| Variable | Default | Description |
|----------|---------|-------------|
| `K8S_NAMESPACE` | `dispensary-scraper` | Kubernetes namespace |
| `K8S_WORKER_STATEFULSET` | `scraper-worker` | StatefulSet name |

### RBAC Requirements

The API server pod needs these K8s permissions:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: worker-scaler
  namespace: dispensary-scraper
rules:
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: scraper-worker-scaler
  namespace: dispensary-scraper
subjects:
  - kind: ServiceAccount
    name: default
    namespace: dispensary-scraper
roleRef:
  kind: Role
  name: worker-scaler
  apiGroup: rbac.authorization.k8s.io
```
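The scale endpoint above can also be driven programmatically. A minimal sketch using Node 18+'s built-in `fetch`; `API_BASE` is a placeholder, and any authentication the API server requires is out of scope here.

```typescript
// Scale the scraper-worker StatefulSet via the documented endpoint.
// API_BASE is a placeholder; substitute the real API server URL.
const API_BASE = "http://localhost:3000";

async function scaleWorkers(replicas: number): Promise<void> {
  const res = await fetch(`${API_BASE}/api/workers/k8s/scale`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ replicas }),
  });
  if (!res.ok) {
    throw new Error(`Scale request failed: ${res.status}`);
  }
}

// Example: scale to 10 workers, mirroring the kubectl command above.
scaleWorkers(10).catch(console.error);
```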