cannaiq/backend/docs/TASK_WORKFLOW_2024-12-10.md
Kelly 4949b22457 feat(tasks): Refactor task workflow with payload/refresh separation
Major changes:
- Split crawl into payload_fetch (API → disk) and product_refresh (disk → DB)
- Add task chaining: store_discovery → product_discovery → payload_fetch → product_refresh
- Add payload storage utilities for gzipped JSON on filesystem
- Add /api/payloads endpoints for payload access and diffing
- Add DB-driven TaskScheduler with schedule persistence
- Track newDispensaryIds through discovery promotion for chaining
- Add stealth improvements: HTTP fingerprinting, proxy rotation enhancements
- Add Workers dashboard K8s scaling controls

New files:
- src/tasks/handlers/payload-fetch.ts - Fetches from API, saves to disk
- src/services/task-scheduler.ts - DB-driven schedule management
- src/utils/payload-storage.ts - Payload save/load utilities
- src/routes/payloads.ts - Payload API endpoints
- src/services/http-fingerprint.ts - Browser fingerprint generation
- docs/TASK_WORKFLOW_2024-12-10.md - Complete workflow documentation

Migrations:
- 078: Proxy consecutive 403 tracking
- 079: task_schedules table
- 080: raw_crawl_payloads table
- 081: payload column and last_fetch_at

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 22:15:35 -07:00


# Task Workflow Documentation
**Date: 2024-12-10**

This document describes the complete task/job processing architecture after the 2024-12-10 rewrite.

---
## Complete Architecture
```
┌─────────────────────────────────────────────────────────────────────────────────┐
│ KUBERNETES CLUSTER │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ API SERVER POD (scraper) │ │
│ │ │ │
│ │ ┌──────────────────┐ ┌────────────────────────────────────────┐ │ │
│ │ │ Express API │ │ TaskScheduler │ │ │
│ │ │ │ │ (src/services/task-scheduler.ts) │ │ │
│ │ │ /api/job-queue │ │ │ │ │
│ │ │ /api/tasks │ │ • Polls every 60s │ │ │
│ │ │ /api/schedules │ │ • Checks task_schedules table │ │ │
│ │ └────────┬─────────┘ │ • SELECT FOR UPDATE SKIP LOCKED │ │ │
│ │ │ │ • Generates tasks when due │ │ │
│ │ │ └──────────────────┬─────────────────────┘ │ │
│ │ │ │ │ │
│ └────────────┼──────────────────────────────────┼──────────────────────────┘ │
│ │ │ │
│ │ ┌────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ POSTGRESQL DATABASE │ │
│ │ │ │
│ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │
│ │ │ task_schedules │ │ worker_tasks │ │ │
│ │ │ │ │ │ │ │
│ │ │ • payload_fetch │───────►│ • pending tasks │ │ │
│ │ │ • store_discovery │ create │ • claimed tasks │ │ │
│ │ │ • analytics_refresh │ tasks │ • running tasks │ │ │
│ │ │ │ │ • completed tasks │ │ │
│ │ │ next_run_at │ │ │ │ │
│ │ │ last_run_at │ │ role, dispensary_id │ │ │
│ │ │ interval_hours │ │ priority, status │ │ │
│ │ └─────────────────────┘ └──────────┬──────────┘ │ │
│ │ │ │ │
│ └─────────────────────────────────────────────┼────────────────────────────┘ │
│ │ │
│ ┌──────────────────────┘ │
│ │ Workers poll for tasks │
│ │ (SELECT FOR UPDATE SKIP LOCKED) │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ WORKER PODS (StatefulSet: scraper-worker) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Worker 0 │ │ Worker 1 │ │ Worker 2 │ │ Worker N │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ task-worker │ │ task-worker │ │ task-worker │ │ task-worker │ │ │
│ │ │ .ts │ │ .ts │ │ .ts │ │ .ts │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────────┘
```
---
## Startup Sequence
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ API SERVER STARTUP │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. Express app initializes │
│ │ │
│ ▼ │
│ 2. runAutoMigrations() │
│ • Runs pending migrations (including 079_task_schedules.sql) │
│ │ │
│ ▼ │
│ 3. initializeMinio() / initializeImageStorage() │
│ │ │
│ ▼ │
│ 4. cleanupOrphanedJobs() │
│ │ │
│ ▼ │
│ 5. taskScheduler.start() ◄─── NEW (per TASK_WORKFLOW_2024-12-10.md) │
│ │ │
│ ├── Recover stale tasks (workers that died) │
│ ├── Ensure default schedules exist in task_schedules │
│ ├── Check and run any due schedules immediately │
│ └── Start 60-second poll interval │
│ │ │
│ ▼ │
│ 6. app.listen(PORT) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│ WORKER POD STARTUP │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. K8s starts pod from StatefulSet │
│ │ │
│ ▼ │
│ 2. TaskWorker.constructor() │
│ • Create DB pool │
│ • Create CrawlRotator │
│ │ │
│ ▼ │
│ 3. initializeStealth() │
│ • Load proxies from DB (REQUIRED - fails if none) │
│ • Wire rotator to Dutchie client │
│ │ │
│ ▼ │
│ 4. register() with API │
│ • Optional - continues if fails │
│ │ │
│ ▼ │
│ 5. startRegistryHeartbeat() every 30s │
│ │ │
│ ▼ │
│ 6. processNextTask() loop │
│ │ │
│ ├── Poll for pending task (FOR UPDATE SKIP LOCKED) │
│ ├── Claim task atomically │
│ ├── Execute handler (product_refresh, store_discovery, etc.) │
│ ├── Mark complete/failed │
│ ├── Chain next task if applicable │
│ └── Loop │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
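Step 6 of the worker startup can be pictured as the function below. This is a minimal sketch, not the contents of `task-worker.ts`: `Task`, `TaskStore`, `Handler`, and the `processNextTask` signature shown here are hypothetical names introduced for illustration, and the database is replaced by an injected store so the control flow stands on its own.

```typescript
// Hypothetical shapes for illustration; the real worker_tasks row has more columns.
interface Task {
  id: number;
  role: string;
  dispensaryId: number | null;
}

// Assumed abstraction over the queue; in the real system these would be SQL statements.
interface TaskStore {
  claimNext(workerId: string): Promise<Task | null>; // poll + claim as one atomic step
  complete(taskId: number, result: unknown): Promise<void>;
  fail(taskId: number, error: string): Promise<void>;
  chainNext(task: Task, result: unknown): Promise<void>;
}

type Handler = (task: Task) => Promise<unknown>;

// Processes at most one task. Returns false when there was nothing to claim.
async function processNextTask(
  workerId: string,
  store: TaskStore,
  handlers: Record<string, Handler>,
): Promise<boolean> {
  const task = await store.claimNext(workerId);
  if (task === null) return false;

  const handler = handlers[task.role];
  if (handler === undefined) {
    await store.fail(task.id, `no handler registered for role ${task.role}`);
    return true;
  }

  try {
    const result = await handler(task);
    await store.complete(task.id, result);
    await store.chainNext(task, result); // chain only after a successful run
  } catch (err) {
    await store.fail(task.id, err instanceof Error ? err.message : String(err));
  }
  return true;
}
```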
---
## Schedule Flow
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ SCHEDULER POLL (every 60 seconds) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ BEGIN TRANSACTION │
│ │ │
│ ▼ │
│ SELECT * FROM task_schedules │
│ WHERE enabled = true AND next_run_at <= NOW() │
│ FOR UPDATE SKIP LOCKED ◄─── Prevents duplicate execution across replicas │
│ │ │
│ ▼ │
│ For each due schedule: │
│ │ │
│ ├── payload_fetch_all │
│ │ └─► Query dispensaries needing crawl │
│ │ └─► Create payload_fetch tasks in worker_tasks │
│ │ │
│ ├── store_discovery_dutchie │
│ │ └─► Create single store_discovery task │
│ │ │
│ └── analytics_refresh │
│ └─► Create single analytics_refresh task │
│ │ │
│ ▼ │
│ UPDATE task_schedules SET │
│ last_run_at = NOW(), │
│ next_run_at = NOW() + interval_hours * INTERVAL '1 hour' │
│ │ │
│ ▼ │
│ COMMIT │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
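The poll maps onto a single database transaction. The following is a rough sketch of that shape, assuming a client that exposes a `query(sql, params)` method bound to one connection; `Db`, `DueSchedule`, `pollSchedules`, and the `createTasks` callback are hypothetical names, not the API of `task-scheduler.ts`. The interval arithmetic multiplies `interval_hours` by `INTERVAL '1 hour'`, because PostgreSQL has no operator for adding a bare integer to a timestamp.

```typescript
// Minimal stand-in for a database client holding one connection (hypothetical interface).
interface Db {
  query(sql: string, params?: unknown[]): Promise<{ rows: unknown[] }>;
}

interface DueSchedule {
  id: number;
  name: string;
  role: string;
  interval_hours: number;
}

// Runs one poll. `createTasks` stands in for the per-schedule task generation
// and returns how many tasks it created.
async function pollSchedules(
  db: Db,
  createTasks: (schedule: DueSchedule) => Promise<number>,
): Promise<number> {
  await db.query("BEGIN");
  try {
    // Row locks taken here are held until COMMIT/ROLLBACK; other replicas skip them.
    const due = await db.query(
      `SELECT id, name, role, interval_hours
         FROM task_schedules
        WHERE enabled = true AND next_run_at <= NOW()
          FOR UPDATE SKIP LOCKED`,
    );
    let created = 0;
    for (const schedule of due.rows as DueSchedule[]) {
      const count = await createTasks(schedule);
      created += count;
      await db.query(
        `UPDATE task_schedules
            SET last_run_at = NOW(),
                next_run_at = NOW() + interval_hours * INTERVAL '1 hour',
                last_task_count = $1
          WHERE id = $2`,
        [count, schedule.id],
      );
    }
    await db.query("COMMIT");
    return created;
  } catch (err) {
    // Any failure releases the locks and leaves next_run_at untouched.
    await db.query("ROLLBACK");
    throw err;
  }
}
```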
---
## Task Lifecycle
```
  ┌──────────┐
  │ SCHEDULE │
  │   DUE    │
  └────┬─────┘
       │
       ▼
┌──────────────┐   claim    ┌──────────────┐   start    ┌──────────────┐
│   PENDING    │───────────►│   CLAIMED    │───────────►│   RUNNING    │
└──────────────┘            └──────────────┘            └──────┬───────┘
       ▲                                                       │
       │                                        ┌──────────────┼──────────────┐
       │  retry                                 │              │              │
       │  (if retries < max)                    ▼              ▼              ▼
       │                                   ┌──────────┐   ┌──────────┐   ┌──────────┐
       └───────────────────────────────────│  FAILED  │   │ COMPLETED│   │  STALE   │
                                           └──────────┘   └──────────┘   └────┬─────┘
                                                                              │
                                                                    recover_stale_tasks()
                                                                              │
                                                                              ▼
                                                                         ┌──────────┐
                                                                         │ PENDING  │
                                                                         └──────────┘
```
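The lifecycle can also be read as a transition table. The sketch below encodes the arrows in the diagram as a pure function; the event names (`claim`, `start`, `succeed`, `fail`, `heartbeat_timeout`, `retry`, `recover`) and the treatment of `stale` as a status value are assumptions made for illustration and may not match the `task_status` enum.

```typescript
type TaskStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "stale";

// Hypothetical event names; the diagram only labels "claim", "start" and "retry".
type TaskEvent =
  | "claim"
  | "start"
  | "succeed"
  | "fail"
  | "heartbeat_timeout"
  | "retry"
  | "recover";

// Returns the next status, or null when the transition is not allowed.
function nextStatus(
  status: TaskStatus,
  event: TaskEvent,
  retryCount = 0,
  maxRetries = 3,
): TaskStatus | null {
  switch (`${status}:${event}`) {
    case "pending:claim":
      return "claimed";
    case "claimed:start":
      return "running";
    case "running:succeed":
      return "completed";
    case "running:fail":
      return "failed";
    case "running:heartbeat_timeout":
      return "stale";
    case "failed:retry":
      // A failed task goes back to pending only while retries remain.
      return retryCount < maxRetries ? "pending" : null;
    case "stale:recover":
      return "pending";
    default:
      return null;
  }
}
```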
---
## Database Tables
### task_schedules (NEW - migration 079)
Stores schedule definitions. Survives restarts.
```sql
CREATE TABLE task_schedules (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL UNIQUE,
role VARCHAR(50) NOT NULL, -- product_refresh, store_discovery, etc.
enabled BOOLEAN DEFAULT TRUE,
interval_hours INTEGER NOT NULL, -- How often to run
priority INTEGER DEFAULT 0, -- Task priority when created
state_code VARCHAR(2), -- Optional filter
last_run_at TIMESTAMPTZ, -- When it last ran
next_run_at TIMESTAMPTZ, -- When it's due next
last_task_count INTEGER, -- Tasks created last run
last_error TEXT -- Error message if failed
);
```
### worker_tasks (migration 074)
The task queue. Workers pull from here.
```sql
CREATE TABLE worker_tasks (
id SERIAL PRIMARY KEY,
role task_role NOT NULL, -- What type of work
dispensary_id INTEGER, -- Which store (if applicable)
platform VARCHAR(50), -- Which platform
status task_status DEFAULT 'pending',
priority INTEGER DEFAULT 0, -- Higher = process first
scheduled_for TIMESTAMP, -- Don't process before this time
worker_id VARCHAR(100), -- Which worker claimed it
claimed_at TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
last_heartbeat_at TIMESTAMP, -- For stale detection
result JSONB,
error_message TEXT,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3
);
```
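A claim against this table is typically a single statement that selects one eligible row with `FOR UPDATE SKIP LOCKED` and updates it in place. The builder below is a sketch of that pattern using the columns above, not the query in `task-worker.ts`; `SqlQuery` and `buildClaimQuery` are hypothetical names, and breaking priority ties by `id` is an assumption.

```typescript
interface SqlQuery {
  text: string;
  values: unknown[];
}

// Builds a claim statement for one worker. The inner SELECT picks the highest
// priority pending task that is due and not locked by another worker; the outer
// UPDATE marks it as claimed and returns it.
function buildClaimQuery(workerId: string): SqlQuery {
  const text = `
    UPDATE worker_tasks
       SET status = 'claimed', worker_id = $1, claimed_at = NOW()
     WHERE id = (
             SELECT id
               FROM worker_tasks
              WHERE status = 'pending'
                AND (scheduled_for IS NULL OR scheduled_for <= NOW())
              ORDER BY priority DESC, id ASC
              LIMIT 1
                FOR UPDATE SKIP LOCKED
           )
    RETURNING id, role, dispensary_id`;
  return { text, values: [workerId] };
}
```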
---
## Default Schedules
| Name | Role | Interval | Priority | Description |
|------|------|----------|----------|-------------|
| `payload_fetch_all` | payload_fetch | 4 hours | 0 | Fetch payloads from Dutchie API (chains to product_refresh) |
| `store_discovery_dutchie` | store_discovery | 24 hours | 5 | Find new Dutchie stores |
| `analytics_refresh` | analytics_refresh | 6 hours | 0 | Refresh materialized views |
---
## Task Roles
| Role | Description | Creates Tasks For |
|------|-------------|-------------------|
| `payload_fetch` | **NEW** - Fetch from Dutchie API, save to disk | Each dispensary needing crawl |
| `product_refresh` | **CHANGED** - Read local payload, normalize, upsert to DB | Chained from payload_fetch |
| `store_discovery` | Find new dispensaries, returns newStoreIds[] | Single task per platform |
| `entry_point_discovery` | **DEPRECATED** - Resolve platform IDs | No longer used |
| `product_discovery` | Initial product fetch for new stores | Chained from store_discovery |
| `analytics_refresh` | Refresh materialized views | Single global task |
### Payload/Refresh Separation (2024-12-10)
The crawl workflow is now split into two phases:
```
payload_fetch (scheduled every 4h)
└─► Hit Dutchie GraphQL API
└─► Save raw JSON to /storage/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
└─► Record metadata in raw_crawl_payloads table
└─► Queue product_refresh task with payload_id
product_refresh (chained from payload_fetch)
└─► Load payload from filesystem (NOT from API)
└─► Normalize via DutchieNormalizer
└─► Upsert to store_products
└─► Create snapshots
└─► Track missing products
└─► Download images
```
**Benefits:**
- **Retry-friendly**: If normalize fails, re-run product_refresh without re-crawling
- **Replay-able**: Run product_refresh against any historical payload
- **Faster refreshes**: Local file read vs network call
- **Historical diffs**: Compare payloads to see what changed between crawls
- **Less API pressure**: Only payload_fetch hits Dutchie
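
The storage path pattern can be illustrated with a small helper. This is only a sketch: it assumes UTC date components, zero-padded month and day, and epoch milliseconds for `{ts}`, none of which this document specifies, and `buildPayloadPath` is a hypothetical name rather than a function in `payload-storage.ts`.

```typescript
// Builds {root}/{year}/{month}/{day}/store_{id}_{ts}.json.gz for one fetch.
// Assumptions: UTC dates, two-digit month/day, {ts} as epoch milliseconds.
function buildPayloadPath(
  storeId: number,
  fetchedAt: Date,
  root = "/storage/payloads",
): string {
  const year = fetchedAt.getUTCFullYear();
  const month = String(fetchedAt.getUTCMonth() + 1).padStart(2, "0");
  const day = String(fetchedAt.getUTCDate()).padStart(2, "0");
  const ts = fetchedAt.getTime();
  return `${root}/${year}/${month}/${day}/store_${storeId}_${ts}.json.gz`;
}
```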
---
## Task Chaining
Tasks automatically queue follow-up tasks upon successful completion. This creates two main flows:
### Discovery Flow (New Stores)
When `store_discovery` finds new dispensaries, they automatically get their initial product data:
```
store_discovery
└─► Discovers new locations via Dutchie GraphQL
└─► Auto-promotes valid locations to dispensaries table
└─► Collects newDispensaryIds[] from promotions
└─► Returns { newStoreIds: [...] } in result
chainNextTask() detects newStoreIds
└─► Creates product_discovery task for each new store
product_discovery
└─► Calls handlePayloadFetch() internally
└─► payload_fetch hits Dutchie API
└─► Saves raw JSON to /storage/payloads/
└─► Queues product_refresh task with payload_id
product_refresh
└─► Loads payload from filesystem
└─► Normalizes and upserts to store_products
└─► Creates snapshots, downloads images
```
**Complete Discovery Chain:**
```
store_discovery → product_discovery → payload_fetch → product_refresh
(internal call) (queues next)
```
### Scheduled Flow (Existing Stores)
For existing stores, `payload_fetch_all` schedule runs every 4 hours:
```
TaskScheduler (every 60s)
└─► Checks task_schedules for due schedules
└─► payload_fetch_all is due
└─► Generates payload_fetch task for each dispensary
payload_fetch
└─► Hits Dutchie GraphQL API
└─► Saves raw JSON to /storage/payloads/
└─► Queues product_refresh task with payload_id
product_refresh
└─► Loads payload from filesystem (NOT API)
└─► Normalizes via DutchieNormalizer
└─► Upserts to store_products
└─► Creates snapshots
```
**Complete Scheduled Chain:**
```
payload_fetch → product_refresh
(queues) (reads local)
```
### Chaining Implementation
Task chaining is handled in three places:
1. **Internal chaining (handler calls handler):**
   - `product_discovery` calls `handlePayloadFetch()` directly
2. **External chaining (chainNextTask() in task-service.ts):**
   - Called after task completion
   - `store_discovery` → queues `product_discovery` for each newStoreId
3. **Queue-based chaining (taskService.createTask):**
   - `payload_fetch` queues `product_refresh` with `payload: { payload_id }`
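
The external chaining rule can be sketched as a pure function that maps a completed task to the follow-up tasks it should queue. `CompletedTask`, `NewTask`, and `tasksToChain` are hypothetical names; only the `store_discovery` → `product_discovery` rule and the `newStoreIds` result field come from this document.

```typescript
interface CompletedTask {
  role: string;
  result: unknown; // JSONB result column, shape depends on the role
}

interface NewTask {
  role: string;
  dispensaryId: number;
}

// Returns the tasks to queue after `task` completed successfully.
// Only store_discovery chains here; payload_fetch queues product_refresh itself.
function tasksToChain(task: CompletedTask): NewTask[] {
  if (task.role !== "store_discovery") return [];

  const result = task.result as { newStoreIds?: unknown } | null | undefined;
  const raw = result?.newStoreIds;
  const ids: unknown[] = Array.isArray(raw) ? raw : [];

  return ids
    .filter((id): id is number => typeof id === "number")
    .map((id) => ({ role: "product_discovery", dispensaryId: id }));
}
```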
---
## Payload API Endpoints
Raw crawl payloads can be accessed via the Payloads API:
| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/payloads` | GET | List payload metadata (paginated) |
| `GET /api/payloads/:id` | GET | Get payload metadata by ID |
| `GET /api/payloads/:id/data` | GET | Get full payload JSON (decompressed) |
| `GET /api/payloads/store/:dispensaryId` | GET | List payloads for a store |
| `GET /api/payloads/store/:dispensaryId/latest` | GET | Get latest payload for a store |
| `GET /api/payloads/store/:dispensaryId/diff` | GET | Diff two payloads for changes |
### Payload Diff Response
The diff endpoint returns:
```json
{
"success": true,
"from": { "id": 123, "fetchedAt": "...", "productCount": 100 },
"to": { "id": 456, "fetchedAt": "...", "productCount": 105 },
"diff": {
"added": 10,
"removed": 5,
"priceChanges": 8,
"stockChanges": 12
},
"details": {
"added": [...],
"removed": [...],
"priceChanges": [...],
"stockChanges": [...]
}
}
```
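A response of this shape could be produced by comparing two product lists keyed by ID. The sketch below assumes a deliberately small product shape (`id`, `price`, `inStock`); real payloads carry many more fields, and `ProductLike`, `Change`, and `diffPayloads` are hypothetical names rather than the code behind the endpoint.

```typescript
// Hypothetical minimal product shape used only for this illustration.
interface ProductLike {
  id: string;
  price: number;
  inStock: boolean;
}

interface Change<T> {
  id: string;
  from: T;
  to: T;
}

interface PayloadDiff {
  diff: { added: number; removed: number; priceChanges: number; stockChanges: number };
  details: {
    added: ProductLike[];
    removed: ProductLike[];
    priceChanges: Change<number>[];
    stockChanges: Change<boolean>[];
  };
}

function diffPayloads(from: ProductLike[], to: ProductLike[]): PayloadDiff {
  const before = new Map<string, ProductLike>();
  for (const p of from) before.set(p.id, p);
  const afterIds = new Set<string>();
  for (const p of to) afterIds.add(p.id);

  const details: PayloadDiff["details"] = {
    added: [],
    removed: [],
    priceChanges: [],
    stockChanges: [],
  };

  for (const next of to) {
    const prev = before.get(next.id);
    if (prev === undefined) {
      details.added.push(next); // present only in the newer payload
      continue;
    }
    if (prev.price !== next.price) {
      details.priceChanges.push({ id: next.id, from: prev.price, to: next.price });
    }
    if (prev.inStock !== next.inStock) {
      details.stockChanges.push({ id: next.id, from: prev.inStock, to: next.inStock });
    }
  }
  for (const prev of from) {
    if (!afterIds.has(prev.id)) details.removed.push(prev); // gone from the newer payload
  }

  return {
    diff: {
      added: details.added.length,
      removed: details.removed.length,
      priceChanges: details.priceChanges.length,
      stockChanges: details.stockChanges.length,
    },
    details,
  };
}
```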
---
## API Endpoints
### Schedules (NEW)
| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/schedules` | GET | List all schedules |
| `PUT /api/schedules/:id` | PUT | Update schedule |
| `POST /api/schedules/:id/trigger` | POST | Run schedule immediately |
### Task Creation (rewired 2024-12-10)
| Endpoint | Method | Description |
|----------|--------|-------------|
| `POST /api/job-queue/enqueue` | POST | Create single task |
| `POST /api/job-queue/enqueue-batch` | POST | Create batch tasks |
| `POST /api/job-queue/enqueue-state` | POST | Create tasks for state |
| `POST /api/tasks` | POST | Direct task creation |
### Task Management
| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/tasks` | GET | List tasks |
| `GET /api/tasks/:id` | GET | Get single task |
| `GET /api/tasks/counts` | GET | Task counts by status |
| `POST /api/tasks/recover-stale` | POST | Recover stale tasks |
---
## Key Files
| File | Purpose |
|------|---------|
| `src/services/task-scheduler.ts` | **NEW** - DB-driven scheduler |
| `src/tasks/task-worker.ts` | Worker that processes tasks |
| `src/tasks/task-service.ts` | Task CRUD operations |
| `src/tasks/handlers/payload-fetch.ts` | **NEW** - Fetches from API, saves to disk |
| `src/tasks/handlers/product-refresh.ts` | **CHANGED** - Reads from disk, processes to DB |
| `src/utils/payload-storage.ts` | **NEW** - Payload save/load utilities |
| `src/routes/tasks.ts` | Task API endpoints |
| `src/routes/job-queue.ts` | Job Queue UI endpoints (rewired) |
| `migrations/079_task_schedules.sql` | Schedule table |
| `migrations/080_raw_crawl_payloads.sql` | Payload metadata table |
| `migrations/081_payload_fetch_columns.sql` | payload, last_fetch_at columns |
| `migrations/074_worker_task_queue.sql` | Task queue table |
---
## Legacy Code (DEPRECATED)
| File | Status | Replacement |
|------|--------|-------------|
| `src/services/scheduler.ts` | DEPRECATED | `task-scheduler.ts` |
| `dispensary_crawl_jobs` table | ORPHANED | `worker_tasks` |
| `job_schedules` table | LEGACY | `task_schedules` |
---
## Dashboard Integration
Both pages remain wired to the dashboard:
| Page | Data Source | Actions |
|------|-------------|---------|
| **Job Queue** | `worker_tasks`, `task_schedules` | Create tasks, view schedules |
| **Task Queue** | `worker_tasks` | View tasks, recover stale |
---
## Multi-Replica Safety
The scheduler keeps all schedule state in PostgreSQL and locks due rows with `SELECT FOR UPDATE SKIP LOCKED`. Together these ensure:
1. **Only one replica** executes a given schedule at a time
2. **No duplicate tasks** are created for the same run
3. **Survives pod restarts** - state lives in the DB, not in memory
4. **Self-healing** - stale tasks are recovered on startup
```sql
-- Runs inside a transaction: rows locked by one replica are skipped by the others until COMMIT
SELECT * FROM task_schedules
WHERE enabled = true AND next_run_at <= NOW()
FOR UPDATE SKIP LOCKED
```
---
## Worker Scaling (K8s)
Workers run as a StatefulSet in Kubernetes. You can scale from the admin UI or CLI.
### From Admin UI
The Workers page (`/admin/workers`) provides:
- Current replica count display
- Scale up/down buttons
- Target replica input
### API Endpoints
| Endpoint | Method | Description |
|----------|--------|-------------|
| `GET /api/workers/k8s/replicas` | GET | Get current/desired replica counts |
| `POST /api/workers/k8s/scale` | POST | Scale to N replicas (body: `{ replicas: N }`) |
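
A client call to the scale endpoint might be assembled as follows. This is a sketch under assumptions: `buildScaleRequest` and `ScaleRequest` are hypothetical names, the base URL is a placeholder, and any authentication headers the API requires are not shown because this document does not describe them. Only the path and the `{ replicas: N }` body come from the table above.

```typescript
interface ScaleRequest {
  url: string;
  init: {
    method: "POST";
    headers: Record<string, string>;
    body: string;
  };
}

// Builds the request for POST /api/workers/k8s/scale without sending it.
function buildScaleRequest(baseUrl: string, replicas: number): ScaleRequest {
  if (!Number.isInteger(replicas) || replicas < 0) {
    throw new RangeError(`replicas must be a non-negative integer, got ${replicas}`);
  }
  return {
    url: `${baseUrl.replace(/\/+$/, "")}/api/workers/k8s/scale`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ replicas }),
    },
  };
}
```

The result can be passed straight to `fetch(url, init)` in an environment that provides it; validating the replica count before sending avoids a round trip for obviously bad input.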
### From CLI
```bash
# View current replicas
kubectl get statefulset scraper-worker -n dispensary-scraper
# Scale to 10 workers
kubectl scale statefulset scraper-worker -n dispensary-scraper --replicas=10
# Scale down to 3 workers
kubectl scale statefulset scraper-worker -n dispensary-scraper --replicas=3
```
### Configuration
Environment variables for the API server:
| Variable | Default | Description |
|----------|---------|-------------|
| `K8S_NAMESPACE` | `dispensary-scraper` | Kubernetes namespace |
| `K8S_WORKER_STATEFULSET` | `scraper-worker` | StatefulSet name |
### RBAC Requirements
The API server pod needs these K8s permissions:
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: worker-scaler
namespace: dispensary-scraper
rules:
- apiGroups: ["apps"]
resources: ["statefulsets"]
verbs: ["get", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: scraper-worker-scaler
namespace: dispensary-scraper
subjects:
- kind: ServiceAccount
name: default
namespace: dispensary-scraper
roleRef:
kind: Role
name: worker-scaler
apiGroup: rbac.authorization.k8s.io
```