
Task Workflow Documentation

Date: 2024-12-10

This document describes the complete task/job processing architecture after the 2024-12-10 rewrite.


Complete Architecture

┌─────────────────────────────────────────────────────────────────────────────────┐
│                              KUBERNETES CLUSTER                                  │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                  │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │                         API SERVER POD (scraper)                         │    │
│  │                                                                          │    │
│  │   ┌──────────────────┐     ┌────────────────────────────────────────┐   │    │
│  │   │   Express API    │     │         TaskScheduler                   │   │    │
│  │   │                  │     │   (src/services/task-scheduler.ts)      │   │    │
│  │   │  /api/job-queue  │     │                                         │   │    │
│  │   │  /api/tasks      │     │   • Polls every 60s                     │   │    │
│  │   │  /api/schedules  │     │   • Checks task_schedules table         │   │    │
│  │   └────────┬─────────┘     │   • SELECT FOR UPDATE SKIP LOCKED       │   │    │
│  │            │               │   • Generates tasks when due            │   │    │
│  │            │               └──────────────────┬─────────────────────┘   │    │
│  │            │                                  │                          │    │
│  └────────────┼──────────────────────────────────┼──────────────────────────┘    │
│               │                                  │                               │
│               │         ┌────────────────────────┘                               │
│               │         │                                                        │
│               ▼         ▼                                                        │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │                          POSTGRESQL DATABASE                             │    │
│  │                                                                          │    │
│  │   ┌─────────────────────┐        ┌─────────────────────┐                │    │
│  │   │   task_schedules    │        │    worker_tasks     │                │    │
│  │   │                     │        │                     │                │    │
│  │   │ • product_refresh   │───────►│ • pending tasks     │                │    │
│  │   │ • store_discovery   │ create │ • claimed tasks     │                │    │
│  │   │ • analytics_refresh │ tasks  │ • running tasks     │                │    │
│  │   │                     │        │ • completed tasks   │                │    │
│  │   │ next_run_at         │        │                     │                │    │
│  │   │ last_run_at         │        │ role, dispensary_id │                │    │
│  │   │ interval_hours      │        │ priority, status    │                │    │
│  │   └─────────────────────┘        └──────────┬──────────┘                │    │
│  │                                             │                            │    │
│  └─────────────────────────────────────────────┼────────────────────────────┘    │
│                                                │                                  │
│                         ┌──────────────────────┘                                  │
│                         │ Workers poll for tasks                                  │
│                         │ (SELECT FOR UPDATE SKIP LOCKED)                         │
│                         ▼                                                         │
│  ┌─────────────────────────────────────────────────────────────────────────┐    │
│  │                    WORKER PODS (StatefulSet: scraper-worker)             │    │
│  │                                                                          │    │
│  │   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐    │    │
│  │   │  Worker 0   │  │  Worker 1   │  │  Worker 2   │  │  Worker N   │    │    │
│  │   │             │  │             │  │             │  │             │    │    │
│  │   │ task-worker │  │ task-worker │  │ task-worker │  │ task-worker │    │    │
│  │   │     .ts     │  │     .ts     │  │     .ts     │  │     .ts     │    │    │
│  │   └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘    │    │
│  │                                                                          │    │
│  └──────────────────────────────────────────────────────────────────────────┘    │
│                                                                                  │
└──────────────────────────────────────────────────────────────────────────────────┘

Startup Sequence

┌─────────────────────────────────────────────────────────────────────────────┐
│                        API SERVER STARTUP                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. Express app initializes                                                 │
│                    │                                                         │
│                    ▼                                                         │
│   2. runAutoMigrations()                                                     │
│      • Runs pending migrations (including 079_task_schedules.sql)           │
│                    │                                                         │
│                    ▼                                                         │
│   3. initializeMinio() / initializeImageStorage()                           │
│                    │                                                         │
│                    ▼                                                         │
│   4. cleanupOrphanedJobs()                                                   │
│                    │                                                         │
│                    ▼                                                         │
│   5. taskScheduler.start()  ◄─── NEW (per TASK_WORKFLOW_2024-12-10.md)      │
│      │                                                                       │
│      ├── Recover stale tasks (workers that died)                            │
│      ├── Ensure default schedules exist in task_schedules                   │
│      ├── Check and run any due schedules immediately                        │
│      └── Start 60-second poll interval                                      │
│                    │                                                         │
│                    ▼                                                         │
│   6. app.listen(PORT)                                                        │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────────┐
│                        WORKER POD STARTUP                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. K8s starts pod from StatefulSet                                        │
│                    │                                                         │
│                    ▼                                                         │
│   2. TaskWorker.constructor()                                               │
│      • Create DB pool                                                        │
│      • Create CrawlRotator                                                   │
│                    │                                                         │
│                    ▼                                                         │
│   3. initializeStealth()                                                    │
│      • Load proxies from DB (REQUIRED - fails if none)                      │
│      • Wire rotator to Dutchie client                                       │
│                    │                                                         │
│                    ▼                                                         │
│   4. register() with API                                                    │
│      • Optional - continues if fails                                         │
│                    │                                                         │
│                    ▼                                                         │
│   5. startRegistryHeartbeat() every 30s                                     │
│                    │                                                         │
│                    ▼                                                         │
│   6. processNextTask() loop                                                 │
│      │                                                                       │
│      ├── Poll for pending task (FOR UPDATE SKIP LOCKED)                     │
│      ├── Claim task atomically                                              │
│      ├── Execute handler (product_refresh, store_discovery, etc.)           │
│      ├── Mark complete/failed                                               │
│      ├── Chain next task if applicable                                      │
│      └── Loop                                                               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
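In code, the step-6 loop looks roughly like the sketch below. The helper signatures (`claimTask`, `runHandler`, `markComplete`, `markFailed`) are illustrative stand-ins injected as dependencies, not the real task-worker.ts API:

```typescript
// Minimal worker loop sketch. All helpers are hypothetical stand-ins:
// claimTask would issue the FOR UPDATE SKIP LOCKED query, runHandler would
// dispatch on task.role, and markFailed would apply the retry policy.
type Task = { id: number; role: string; retryCount: number; maxRetries: number };

interface WorkerDeps {
  claimTask: () => Promise<Task | null>;
  runHandler: (task: Task) => Promise<void>;
  markComplete: (task: Task) => Promise<void>;
  markFailed: (task: Task, err: unknown) => Promise<void>;
  sleep: (ms: number) => Promise<void>;
}

async function processLoop(deps: WorkerDeps, maxIterations = Infinity): Promise<void> {
  for (let i = 0; i < maxIterations; i++) {
    const task = await deps.claimTask();
    if (!task) {
      await deps.sleep(5000); // nothing pending: back off before polling again
      continue;
    }
    try {
      await deps.runHandler(task);
      await deps.markComplete(task);
    } catch (err) {
      await deps.markFailed(task, err); // retry/requeue decision lives here
    }
  }
}
```

Injecting the dependencies this way also makes the loop testable without a database.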

Schedule Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│                     SCHEDULER POLL (every 60 seconds)                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   BEGIN TRANSACTION                                                          │
│         │                                                                    │
│         ▼                                                                    │
│   SELECT * FROM task_schedules                                              │
│   WHERE enabled = true AND next_run_at <= NOW()                             │
│   FOR UPDATE SKIP LOCKED  ◄─── Prevents duplicate execution across replicas │
│         │                                                                    │
│         ▼                                                                    │
│   For each due schedule:                                                     │
│         │                                                                    │
│         ├── product_refresh_all                                             │
│         │   └─► Query dispensaries needing crawl                            │
│         │   └─► Create product_refresh tasks in worker_tasks                │
│         │                                                                    │
│         ├── store_discovery_dutchie                                         │
│         │   └─► Create single store_discovery task                          │
│         │                                                                    │
│         └── analytics_refresh                                                │
│             └─► Create single analytics_refresh task                        │
│         │                                                                    │
│         ▼                                                                    │
│   UPDATE task_schedules SET                                                  │
│     last_run_at = NOW(),                                                     │
│     next_run_at = NOW() + interval_hours                                    │
│         │                                                                    │
│         ▼                                                                    │
│   COMMIT                                                                     │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
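The due-check and the next_run_at bump inside that transaction come down to simple date arithmetic. A sketch, with illustrative names rather than the real task-scheduler.ts internals:

```typescript
// Sketch of the schedule due-check and next_run_at computation.
// Field names mirror the task_schedules columns; the helpers are illustrative.
interface Schedule {
  enabled: boolean;
  intervalHours: number;
  nextRunAt: Date | null;
}

// A schedule is due when it is enabled and next_run_at has passed
// (a NULL next_run_at means it has never run, so it is due immediately).
function isDue(s: Schedule, now: Date): boolean {
  return s.enabled && (s.nextRunAt === null || s.nextRunAt.getTime() <= now.getTime());
}

// Mirrors: UPDATE task_schedules SET next_run_at = NOW() + interval_hours
function bumpNextRun(now: Date, intervalHours: number): Date {
  return new Date(now.getTime() + intervalHours * 3600 * 1000);
}
```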

Task Lifecycle

                                    ┌──────────┐
                                    │ SCHEDULE │
                                    │   DUE    │
                                    └────┬─────┘
                                         │
                                         ▼
┌──────────────┐    claim    ┌──────────────┐    start    ┌──────────────┐
│   PENDING    │────────────►│   CLAIMED    │────────────►│   RUNNING    │
└──────────────┘             └──────────────┘             └──────┬───────┘
       ▲                                                        │
       │                                         ┌──────────────┼──────────────┐
       │ retry                                   │              │              │
       │ (if retries < max)                      ▼              ▼              ▼
       │                                  ┌──────────┐   ┌──────────┐   ┌──────────┐
       └──────────────────────────────────│  FAILED  │   │ COMPLETED│   │  STALE   │
                                          └──────────┘   └──────────┘   └────┬─────┘
                                                                              │
                                                              recover_stale_tasks()
                                                                              │
                                                                              ▼
                                                                        ┌──────────┐
                                                                        │ PENDING  │
                                                                        └──────────┘
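The FAILED → PENDING retry edge hinges on retry_count versus max_retries. A minimal sketch of that decision (the function name is illustrative, not from the codebase):

```typescript
// Where does a failed execution land? Back to PENDING if retries remain
// (retry_count < max_retries), otherwise terminally FAILED.
type TaskStatus = "pending" | "claimed" | "running" | "completed" | "failed";

function statusAfterFailure(retryCount: number, maxRetries: number): TaskStatus {
  return retryCount < maxRetries ? "pending" : "failed";
}
```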

Database Tables

task_schedules (NEW - migration 079)

Stores schedule definitions. Survives restarts.

```sql
CREATE TABLE task_schedules (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100) NOT NULL UNIQUE,
  role VARCHAR(50) NOT NULL,        -- product_refresh, store_discovery, etc.
  enabled BOOLEAN DEFAULT TRUE,
  interval_hours INTEGER NOT NULL,  -- How often to run
  priority INTEGER DEFAULT 0,       -- Task priority when created
  state_code VARCHAR(2),            -- Optional filter
  last_run_at TIMESTAMPTZ,          -- When it last ran
  next_run_at TIMESTAMPTZ,          -- When it's due next
  last_task_count INTEGER,          -- Tasks created last run
  last_error TEXT                   -- Error message if failed
);
```

worker_tasks (migration 074)

The task queue. Workers pull from here.

```sql
CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,
  role task_role NOT NULL,          -- What type of work
  dispensary_id INTEGER,            -- Which store (if applicable)
  platform VARCHAR(50),             -- Which platform
  status task_status DEFAULT 'pending',
  priority INTEGER DEFAULT 0,       -- Higher = process first
  scheduled_for TIMESTAMP,          -- Don't process before this time
  worker_id VARCHAR(100),           -- Which worker claimed it
  claimed_at TIMESTAMP,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  last_heartbeat_at TIMESTAMP,      -- For stale detection
  result JSONB,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3
);
```
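Workers claim rows from this table in a single atomic statement. The sketch below shows how such a claim query might be built for node-postgres; the exact SQL in the task service may differ:

```typescript
// Builds a parameterized claim query: pick the highest-priority pending task
// that is due, skip rows other workers currently hold locked, and mark it
// claimed in one statement. Query shape is an illustrative sketch.
function buildClaimQuery(workerId: string): { text: string; values: string[] } {
  return {
    text: `
      UPDATE worker_tasks SET
        status = 'claimed', worker_id = $1, claimed_at = NOW()
      WHERE id = (
        SELECT id FROM worker_tasks
        WHERE status = 'pending'
          AND (scheduled_for IS NULL OR scheduled_for <= NOW())
        ORDER BY priority DESC, id
        LIMIT 1
        FOR UPDATE SKIP LOCKED
      )
      RETURNING *`,
    values: [workerId],
  };
}
```

Running the SELECT with FOR UPDATE SKIP LOCKED inside the UPDATE is what lets many workers poll concurrently without ever claiming the same row.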

Default Schedules

| Name | Role | Interval | Priority | Description |
|------|------|----------|----------|-------------|
| payload_fetch_all | payload_fetch | 4 hours | 0 | Fetch payloads from Dutchie API (chains to product_refresh) |
| store_discovery_dutchie | store_discovery | 24 hours | 5 | Find new Dutchie stores |
| analytics_refresh | analytics_refresh | 6 hours | 0 | Refresh materialized views |

Task Roles

| Role | Description | Creates Tasks For |
|------|-------------|-------------------|
| payload_fetch | NEW - Fetch from Dutchie API, save to disk | Each dispensary needing crawl |
| product_refresh | CHANGED - Read local payload, normalize, upsert to DB | Chained from payload_fetch |
| store_discovery | Find new dispensaries, returns newStoreIds[] | Single task per platform |
| entry_point_discovery | DEPRECATED - Resolve platform IDs | No longer used |
| product_discovery | Initial product fetch for new stores | Chained from store_discovery |
| analytics_refresh | Refresh materialized views | Single global task |

Payload/Refresh Separation (2024-12-10)

The crawl workflow is now split into two phases:

payload_fetch (scheduled every 4h)
  └─► Hit Dutchie GraphQL API
  └─► Save raw JSON to /storage/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
  └─► Record metadata in raw_crawl_payloads table
  └─► Queue product_refresh task with payload_id

product_refresh (chained from payload_fetch)
  └─► Load payload from filesystem (NOT from API)
  └─► Normalize via DutchieNormalizer
  └─► Upsert to store_products
  └─► Create snapshots
  └─► Track missing products
  └─► Download images

Benefits:

  • Retry-friendly: If normalize fails, re-run product_refresh without re-crawling
  • Replay-able: Run product_refresh against any historical payload
  • Faster refreshes: Local file read vs network call
  • Historical diffs: Compare payloads to see what changed between crawls
  • Less API pressure: Only payload_fetch hits Dutchie
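The payload path convention above can be captured in a small helper. This is a sketch: the storage root default and the real payload-storage.ts implementation are assumptions:

```typescript
// Builds /storage/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
// from a dispensary id and fetch time (UTC). The root default is assumed.
function payloadPath(storeId: number, fetchedAt: Date, root = "/storage/payloads"): string {
  const y = fetchedAt.getUTCFullYear();
  const m = String(fetchedAt.getUTCMonth() + 1).padStart(2, "0");
  const d = String(fetchedAt.getUTCDate()).padStart(2, "0");
  return `${root}/${y}/${m}/${d}/store_${storeId}_${fetchedAt.getTime()}.json.gz`;
}
```

Date-partitioned directories keep any single directory small and make "all payloads for a day" a cheap listing.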

Task Chaining

Tasks automatically queue follow-up tasks upon successful completion. This creates two main flows:

Discovery Flow (New Stores)

When store_discovery finds new dispensaries, they automatically get their initial product data:

store_discovery
  └─► Discovers new locations via Dutchie GraphQL
  └─► Auto-promotes valid locations to dispensaries table
  └─► Collects newDispensaryIds[] from promotions
  └─► Returns { newStoreIds: [...] } in result

chainNextTask() detects newStoreIds
  └─► Creates product_discovery task for each new store

product_discovery
  └─► Calls handlePayloadFetch() internally
  └─► payload_fetch hits Dutchie API
  └─► Saves raw JSON to /storage/payloads/
  └─► Queues product_refresh task with payload_id

product_refresh
  └─► Loads payload from filesystem
  └─► Normalizes and upserts to store_products
  └─► Creates snapshots, downloads images

Complete Discovery Chain:

store_discovery → product_discovery → payload_fetch → product_refresh
                        (internal call)    (queues next)

Scheduled Flow (Existing Stores)

For existing stores, payload_fetch_all schedule runs every 4 hours:

TaskScheduler (every 60s)
  └─► Checks task_schedules for due schedules
  └─► payload_fetch_all is due
  └─► Generates payload_fetch task for each dispensary

payload_fetch
  └─► Hits Dutchie GraphQL API
  └─► Saves raw JSON to /storage/payloads/
  └─► Queues product_refresh task with payload_id

product_refresh
  └─► Loads payload from filesystem (NOT API)
  └─► Normalizes via DutchieNormalizer
  └─► Upserts to store_products
  └─► Creates snapshots

Complete Scheduled Chain:

payload_fetch → product_refresh
  (queues)        (reads local)

Chaining Implementation

Task chaining is handled in three places:

  1. Internal chaining (handler calls handler): product_discovery calls handlePayloadFetch() directly.
  2. External chaining (chainNextTask() in task-service.ts): called after task completion; store_discovery queues product_discovery for each newStoreId.
  3. Queue-based chaining (taskService.createTask): payload_fetch queues product_refresh with payload: { payload_id }.
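Taken together, the chaining rules can be modeled as a pure function from a completed task's role and result to the follow-up tasks to enqueue. The result and task shapes below are assumptions for illustration, not the real task-service.ts types:

```typescript
// Hypothetical shapes: the real task result and createTask payloads may differ.
interface TaskResult { newStoreIds?: number[]; payloadId?: number; }
interface NewTask { role: string; dispensaryId?: number; payload?: { payload_id: number }; }

// Map a completed task to its follow-up tasks.
function chainNextTasks(role: string, result: TaskResult): NewTask[] {
  if (role === "store_discovery" && result.newStoreIds?.length) {
    // One product_discovery task per newly promoted store.
    return result.newStoreIds.map((id) => ({ role: "product_discovery", dispensaryId: id }));
  }
  if (role === "payload_fetch" && result.payloadId !== undefined) {
    // Hand the saved payload to the refresh phase by id.
    return [{ role: "product_refresh", payload: { payload_id: result.payloadId } }];
  }
  return []; // terminal roles (product_refresh, analytics_refresh, ...) chain nothing
}
```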

Payload API Endpoints

Raw crawl payloads can be accessed via the Payloads API:

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/payloads | GET | List payload metadata (paginated) |
| /api/payloads/:id | GET | Get payload metadata by ID |
| /api/payloads/:id/data | GET | Get full payload JSON (decompressed) |
| /api/payloads/store/:dispensaryId | GET | List payloads for a store |
| /api/payloads/store/:dispensaryId/latest | GET | Get latest payload for a store |
| /api/payloads/store/:dispensaryId/diff | GET | Diff two payloads for changes |

Payload Diff Response

The diff endpoint returns:

```json
{
  "success": true,
  "from": { "id": 123, "fetchedAt": "...", "productCount": 100 },
  "to": { "id": 456, "fetchedAt": "...", "productCount": 105 },
  "diff": {
    "added": 10,
    "removed": 5,
    "priceChanges": 8,
    "stockChanges": 12
  },
  "details": {
    "added": [...],
    "removed": [...],
    "priceChanges": [...],
    "stockChanges": [...]
  }
}
```
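The counts in the diff response can be derived by keying both payloads' products by id. A sketch, assuming a simplified product shape (the stored payload format is richer than this):

```typescript
// Simplified product shape for illustration only.
interface Product { id: string; price: number; inStock: boolean; }
interface DiffCounts { added: number; removed: number; priceChanges: number; stockChanges: number; }

// Compare two payload snapshots and count the categories the diff endpoint reports.
function diffPayloads(from: Product[], to: Product[]): DiffCounts {
  const fromById = new Map(from.map((p): [string, Product] => [p.id, p]));
  const toById = new Map(to.map((p): [string, Product] => [p.id, p]));
  const counts: DiffCounts = { added: 0, removed: 0, priceChanges: 0, stockChanges: 0 };
  for (const p of to) {
    const prev = fromById.get(p.id);
    if (!prev) { counts.added++; continue; }          // present only in the newer payload
    if (prev.price !== p.price) counts.priceChanges++;
    if (prev.inStock !== p.inStock) counts.stockChanges++;
  }
  for (const p of from) if (!toById.has(p.id)) counts.removed++; // gone from the newer payload
  return counts;
}
```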

API Endpoints

Schedules (NEW)

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/schedules | GET | List all schedules |
| /api/schedules/:id | PUT | Update schedule |
| /api/schedules/:id/trigger | POST | Run schedule immediately |

Task Creation (rewired 2024-12-10)

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/job-queue/enqueue | POST | Create single task |
| /api/job-queue/enqueue-batch | POST | Create batch of tasks |
| /api/job-queue/enqueue-state | POST | Create tasks for a state |
| /api/tasks | POST | Direct task creation |

Task Management

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/tasks | GET | List tasks |
| /api/tasks/:id | GET | Get single task |
| /api/tasks/counts | GET | Task counts by status |
| /api/tasks/recover-stale | POST | Recover stale tasks |

Key Files

| File | Purpose |
|------|---------|
| src/services/task-scheduler.ts | NEW - DB-driven scheduler |
| src/tasks/task-worker.ts | Worker that processes tasks |
| src/tasks/task-service.ts | Task CRUD operations |
| src/tasks/handlers/payload-fetch.ts | NEW - Fetches from API, saves to disk |
| src/tasks/handlers/product-refresh.ts | CHANGED - Reads from disk, processes to DB |
| src/utils/payload-storage.ts | NEW - Payload save/load utilities |
| src/routes/tasks.ts | Task API endpoints |
| src/routes/job-queue.ts | Job Queue UI endpoints (rewired) |
| migrations/079_task_schedules.sql | Schedule table |
| migrations/080_raw_crawl_payloads.sql | Payload metadata table |
| migrations/081_payload_fetch_columns.sql | payload, last_fetch_at columns |
| migrations/074_worker_task_queue.sql | Task queue table |

Legacy Code (DEPRECATED)

| File | Status | Replacement |
|------|--------|-------------|
| src/services/scheduler.ts | DEPRECATED | task-scheduler.ts |
| dispensary_crawl_jobs table | ORPHANED | worker_tasks |
| job_schedules table | LEGACY | task_schedules |

Dashboard Integration

Both pages remain wired to the dashboard:

| Page | Data Source | Actions |
|------|-------------|---------|
| Job Queue | worker_tasks, task_schedules | Create tasks, view schedules |
| Task Queue | worker_tasks | View tasks, recover stale |

Multi-Replica Safety

Because schedule state lives in the database and due rows are claimed with SELECT FOR UPDATE SKIP LOCKED, the scheduler is safe to run across multiple API replicas:

  1. Only one replica executes a given schedule at a time
  2. No duplicate tasks are created
  3. State survives pod restarts - it lives in the DB, not in memory
  4. Self-healing - stale tasks are recovered on startup

```sql
-- This query is atomic across all API server replicas
SELECT * FROM task_schedules
WHERE enabled = true AND next_run_at <= NOW()
FOR UPDATE SKIP LOCKED;
```

Worker Scaling (K8s)

Workers run as a StatefulSet in Kubernetes. You can scale from the admin UI or CLI.

From Admin UI

The Workers page (/admin/workers) provides:

  • Current replica count display
  • Scale up/down buttons
  • Target replica input

API Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| /api/workers/k8s/replicas | GET | Get current/desired replica counts |
| /api/workers/k8s/scale | POST | Scale to N replicas (body: { replicas: N }) |

From CLI

```bash
# View current replicas
kubectl get statefulset scraper-worker -n dispensary-scraper

# Scale to 10 workers
kubectl scale statefulset scraper-worker -n dispensary-scraper --replicas=10

# Scale down to 3 workers
kubectl scale statefulset scraper-worker -n dispensary-scraper --replicas=3
```

Configuration

Environment variables for the API server:

| Variable | Default | Description |
|----------|---------|-------------|
| K8S_NAMESPACE | dispensary-scraper | Kubernetes namespace |
| K8S_WORKER_STATEFULSET | scraper-worker | StatefulSet name |

RBAC Requirements

The API server pod needs these K8s permissions:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: worker-scaler
  namespace: dispensary-scraper
rules:
- apiGroups: ["apps"]
  resources: ["statefulsets"]
  verbs: ["get", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: scraper-worker-scaler
  namespace: dispensary-scraper
subjects:
- kind: ServiceAccount
  name: default
  namespace: dispensary-scraper
roleRef:
  kind: Role
  name: worker-scaler
  apiGroup: rbac.authorization.k8s.io
```