cannaiq/docs/WORKER_SYSTEM.md
Kelly 56cc171287 feat: Stealth worker system with mandatory proxy rotation
## Worker System
- Role-agnostic workers that can handle any task type
- Pod-based architecture with StatefulSet (5-15 pods, 5 workers each)
- Custom pod names (Aethelgard, Xylos, Kryll, etc.)
- Worker registry with friendly names and resource monitoring
- Hub-and-spoke visualization on JobQueue page

## Stealth & Anti-Detection (REQUIRED)
- Proxies are MANDATORY - workers fail to start without active proxies
- CrawlRotator initializes on worker startup
- Loads proxies from `proxies` table
- Auto-rotates proxy + fingerprint on 403 errors
- 12 browser fingerprints (Chrome, Firefox, Safari, Edge)
- Locale/timezone matching for geographic consistency

## Task System
- Renamed product_resync → product_refresh
- Task chaining: store_discovery → entry_point → product_discovery
- Priority-based claiming with FOR UPDATE SKIP LOCKED
- Heartbeat and stale task recovery

## UI Updates
- JobQueue: Pod visualization, resource monitoring on hover
- WorkersDashboard: Simplified worker list
- Removed unused filters from task list

## Other
- IP2Location service for visitor analytics
- Findagram consumer features scaffolding
- Documentation updates

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 00:44:59 -07:00

CannaiQ Worker System

Overview

The Worker System is a role-based task queue that processes background jobs. All tasks go into a single pool, and workers claim tasks based on their assigned role.


Design Pattern: Single Pool, Role-Based Claiming

                    ┌─────────────────────────────────────────┐
                    │           TASK POOL (worker_tasks)      │
                    │                                         │
                    │  ┌─────────────────────────────────┐   │
                    │  │ role=store_discovery    pending │   │
                    │  │ role=product_resync     pending │   │
                    │  │ role=product_resync     pending │   │
                    │  │ role=product_resync     pending │   │
                    │  │ role=analytics_refresh  pending │   │
                    │  │ role=entry_point_disc   pending │   │
                    │  └─────────────────────────────────┘   │
                    └─────────────────────────────────────────┘
                                       │
          ┌────────────────────────────┼────────────────────────────┐
          │                            │                            │
          ▼                            ▼                            ▼
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│ WORKER           │      │ WORKER           │      │ WORKER           │
│ role=product_    │      │ role=product_    │      │ role=store_      │
│      resync      │      │      resync      │      │      discovery   │
│                  │      │                  │      │                  │
│ Claims ONLY      │      │ Claims ONLY      │      │ Claims ONLY      │
│ product_resync   │      │ product_resync   │      │ store_discovery  │
│ tasks            │      │ tasks            │      │ tasks            │
└──────────────────┘      └──────────────────┘      └──────────────────┘

Key Points:

  • All tasks go into ONE table (worker_tasks)
  • Each worker is assigned ONE role at startup
  • Workers only claim tasks matching their role
  • Multiple workers can share the same role (horizontal scaling)

Worker Roles

| Role | Purpose | Per-Store? | Schedule |
|------|---------|------------|----------|
| store_discovery | Find new dispensaries via GraphQL | No | Weekly |
| entry_point_discovery | Resolve platform IDs from menu URLs | Yes | On-demand |
| product_discovery | Initial product fetch for new stores | Yes | On-demand |
| product_resync | Regular price/stock updates | Yes | Every 4 hours |
| analytics_refresh | Refresh materialized views | No | Daily |
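
The role table can be mirrored as a typed lookup so that a worker rejects an unknown role at startup. This is a minimal sketch: `ROLE_INFO` and `parseRole` are hypothetical names for illustration and are not taken from the codebase.

```typescript
// Hypothetical typed registry mirroring the Worker Roles table.
type WorkerRole =
  | "store_discovery"
  | "entry_point_discovery"
  | "product_discovery"
  | "product_resync"
  | "analytics_refresh";

interface RoleInfo {
  perStore: boolean; // true when each task targets a single dispensary
  schedule: string;  // cadence as listed in the table
}

const ROLE_INFO: Record<WorkerRole, RoleInfo> = {
  store_discovery: { perStore: false, schedule: "weekly" },
  entry_point_discovery: { perStore: true, schedule: "on-demand" },
  product_discovery: { perStore: true, schedule: "on-demand" },
  product_resync: { perStore: true, schedule: "every 4 hours" },
  analytics_refresh: { perStore: false, schedule: "daily" },
};

// Returns the role when the string names a known role, otherwise throws.
function parseRole(value: string | undefined): WorkerRole {
  if (value !== undefined && Object.prototype.hasOwnProperty.call(ROLE_INFO, value)) {
    return value as WorkerRole;
  }
  throw new Error(`Unknown or missing WORKER_ROLE: ${String(value)}`);
}
```

Failing fast on a mistyped role avoids a worker that polls forever for tasks that will never exist.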

Task Lifecycle

pending → claimed → running → completed
                        ↓
                      failed
                        ↓
                      (retry if < max_retries)

| Status | Meaning |
|--------|---------|
| pending | Waiting to be claimed |
| claimed | Worker has claimed, not yet started |
| running | Worker is actively processing |
| completed | Successfully finished |
| failed | Error occurred |
| stale | Worker died (heartbeat timeout) |
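
The lifecycle can be read as a small state machine. The sketch below encodes only the transitions this document shows: the forward path, retry after failure, and stale recovery returning a task to `pending`. The document does not show which transition sets the `stale` status, so it is listed with no transitions. `canTransition` is a hypothetical helper.

```typescript
type TaskStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "stale";

// Allowed next states, per the lifecycle diagram and the stale-recovery query.
const ALLOWED: Record<TaskStatus, TaskStatus[]> = {
  pending: ["claimed"],
  claimed: ["running", "pending"],              // back to pending via stale recovery
  running: ["completed", "failed", "pending"],  // back to pending via stale recovery
  completed: [],
  failed: ["pending"],                          // retry
  stale: [],                                    // not specified in this document
};

// Returning to "pending" is a retry, so it is only allowed while retries remain.
function canTransition(
  from: TaskStatus,
  to: TaskStatus,
  retryCount: number = 0,
  maxRetries: number = 3
): boolean {
  if (ALLOWED[from].indexOf(to) === -1) return false;
  if (to === "pending") return retryCount < maxRetries;
  return true;
}
```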

Task Chaining

Tasks automatically create follow-up tasks:

store_discovery (finds new stores)
       │
       ├─ Returns newStoreIds[] in result
       ▼
entry_point_discovery (for each new store)
       │
       ├─ Resolves platform_dispensary_id
       ▼
product_discovery (initial crawl)
       │
       ▼
(store enters regular schedule)
       │
       ▼
product_resync (every 4 hours)
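
The chain above can be sketched as a pure function that maps a completed task to the follow-up tasks it should enqueue. The field name `newStoreIds` comes from the diagram; `followUpTasks` and the surrounding types are hypothetical and only illustrate the idea.

```typescript
type ChainRole =
  | "store_discovery"
  | "entry_point_discovery"
  | "product_discovery"
  | "product_resync";

interface CompletedTask {
  role: ChainRole;
  dispensaryId: number | null;
  result: { newStoreIds?: number[] };
}

interface NewTask {
  role: ChainRole;
  dispensaryId: number;
}

// Returns the tasks to enqueue once `task` has completed.
function followUpTasks(task: CompletedTask): NewTask[] {
  switch (task.role) {
    case "store_discovery":
      // One entry_point_discovery task per newly found store.
      return (task.result.newStoreIds ?? []).map(
        (id): NewTask => ({ role: "entry_point_discovery", dispensaryId: id })
      );
    case "entry_point_discovery":
      // Once the platform ID is resolved, run the initial crawl.
      return task.dispensaryId === null
        ? []
        : [{ role: "product_discovery", dispensaryId: task.dispensaryId }];
    default:
      // After product_discovery the store is picked up by the regular resync schedule.
      return [];
  }
}
```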

How Claiming Works

1. Worker starts with a role

WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

2. Worker loop polls for tasks

// Simplified worker loop
while (running) {
  const task = await claimTask(this.role, this.workerId);

  if (!task) {
    await sleep(5000);  // No tasks, wait 5 seconds
    continue;
  }

  await processTask(task);
}

3. SQL function claims atomically

-- claim_task(role, worker_id)
UPDATE worker_tasks
SET status = 'claimed', worker_id = $2, claimed_at = NOW()
WHERE id = (
  SELECT id FROM worker_tasks
  WHERE role = $1                              -- Filter by worker's role
    AND status = 'pending'
    AND (scheduled_for IS NULL OR scheduled_for <= NOW())
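    AND (dispensary_id IS NULL OR dispensary_id NOT IN (   -- Per-store locking
      SELECT dispensary_id FROM worker_tasks
      WHERE status IN ('claimed', 'running')
        AND dispensary_id IS NOT NULL          -- A NULL in this list would make NOT IN match nothing
    ))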
  ORDER BY priority DESC, created_at ASC       -- Priority ordering
  LIMIT 1
  FOR UPDATE SKIP LOCKED                       -- Atomic, no race conditions
)
RETURNING *;

Key Features:

  • FOR UPDATE SKIP LOCKED - Prevents race conditions between workers
  • Role filtering - Worker only sees tasks for its role
  • Per-store locking - Only one active task per dispensary
  • Priority ordering - Higher priority tasks first
  • Scheduled tasks - Respects scheduled_for timestamp
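
The selection rules listed above can be modeled in memory to make them easier to test. This sketch covers role filtering, scheduling, per-store locking, and ordering; it does not model the row locking that `FOR UPDATE SKIP LOCKED` provides in the database. `pickClaimable` and `QueuedTask` are hypothetical names.

```typescript
interface QueuedTask {
  id: number;
  role: string;
  status: "pending" | "claimed" | "running" | "completed" | "failed";
  priority: number;
  createdAt: number;           // epoch milliseconds
  scheduledFor: number | null; // epoch milliseconds, or null for "run any time"
  dispensaryId: number | null;
}

// Returns the task a worker with `role` would claim at time `now`, if any.
function pickClaimable(tasks: QueuedTask[], role: string, now: number): QueuedTask | undefined {
  // Stores that already have a claimed or running task are locked.
  const busyStores = new Set<number>();
  for (const t of tasks) {
    if ((t.status === "claimed" || t.status === "running") && t.dispensaryId !== null) {
      busyStores.add(t.dispensaryId);
    }
  }

  const candidates = tasks.filter(
    (t) =>
      t.role === role &&
      t.status === "pending" &&
      (t.scheduledFor === null || t.scheduledFor <= now) &&
      (t.dispensaryId === null || !busyStores.has(t.dispensaryId))
  );

  // Higher priority first, then oldest first.
  candidates.sort((a, b) => b.priority - a.priority || a.createdAt - b.createdAt);
  return candidates[0];
}
```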

Heartbeat & Stale Recovery

Workers send heartbeats every 30 seconds while processing:

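// During task processing
const heartbeat = setInterval(async () => {  // async is required to use await in the callback
  try {
    await pool.query(
      'UPDATE worker_tasks SET last_heartbeat_at = NOW() WHERE id = $1',
      [taskId]
    );
  } catch (err) {
    console.error('Heartbeat update failed', err);  // A failed heartbeat should not crash the worker
  }
}, 30000);
clearInterval(heartbeat);  // Call this once the task finishes, on success or failure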

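If a worker dies, its heartbeats stop. Tasks whose last heartbeat is older than the threshold are returned to the pending pool, as long as retries remain: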

-- recover_stale_tasks(threshold_minutes)
UPDATE worker_tasks
SET status = 'pending', worker_id = NULL, retry_count = retry_count + 1
WHERE status IN ('claimed', 'running')
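  AND COALESCE(last_heartbeat_at, claimed_at)   -- Fall back to claimed_at if no heartbeat was ever sent
      < NOW() - INTERVAL '10 minutes'           -- threshold_minutes, 10 in this example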
  AND retry_count < max_retries;
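
The recovery rule can also be expressed as a pure function, which is convenient for unit tests. This is a sketch, not the project's implementation: `recoverStaleTasks` is a hypothetical name, timestamps are epoch milliseconds, and falling back to the claim time when no heartbeat has been recorded is an assumption of this sketch.

```typescript
interface HeartbeatTask {
  id: number;
  status: "pending" | "claimed" | "running" | "completed" | "failed";
  workerId: string | null;
  claimedAt: number | null;       // epoch milliseconds
  lastHeartbeatAt: number | null; // epoch milliseconds
  retryCount: number;
  maxRetries: number;
}

// Resets stale tasks to pending in place and returns the IDs that were recovered.
function recoverStaleTasks(
  tasks: HeartbeatTask[],
  now: number,
  thresholdMinutes: number = 10
): number[] {
  const cutoff = now - thresholdMinutes * 60000;
  const recovered: number[] = [];
  for (const t of tasks) {
    const active = t.status === "claimed" || t.status === "running";
    // Assumption: a task that never sent a heartbeat is aged from its claim time.
    const lastSeen = t.lastHeartbeatAt ?? t.claimedAt;
    if (active && lastSeen !== null && lastSeen < cutoff && t.retryCount < t.maxRetries) {
      t.status = "pending";
      t.workerId = null;
      t.retryCount += 1;
      recovered.push(t.id);
    }
  }
  return recovered;
}
```

Tasks that have used up their retries are left untouched, matching the `retry_count < max_retries` condition in the query.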

Scheduling

Daily Resync Generation

SELECT generate_resync_tasks(6, CURRENT_DATE);  -- 6 batches = every 4 hours

Creates staggered tasks:

| Batch | Time | Stores |
|-------|-------|---------|
| 1 | 00:00 | 1-50 |
| 2 | 04:00 | 51-100 |
| 3 | 08:00 | 101-150 |
| 4 | 12:00 | 151-200 |
| 5 | 16:00 | 201-250 |
| 6 | 20:00 | 251-300 |
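
The staggering in the table can be reproduced with a few lines of arithmetic. This sketch assumes stores are split into contiguous, evenly sized ranges and that batches are spread evenly over 24 hours; the actual logic of `generate_resync_tasks` is not shown in this document, and `planBatches` is a hypothetical name.

```typescript
interface Batch {
  batch: number;      // 1-based batch number
  time: string;       // start time as HH:MM
  firstStore: number; // 1-based position of the first store in the batch
  lastStore: number;  // 1-based position of the last store in the batch
}

function planBatches(storeCount: number, batches: number): Batch[] {
  const pad = (n: number): string => (n < 10 ? "0" + n : String(n));
  const size = Math.ceil(storeCount / batches);
  const plan: Batch[] = [];
  for (let i = 0; i < batches; i++) {
    const firstStore = i * size + 1;
    if (firstStore > storeCount) break; // fewer stores than batches
    const minutes = Math.floor((i * 24 * 60) / batches);
    plan.push({
      batch: i + 1,
      time: `${pad(Math.floor(minutes / 60))}:${pad(minutes % 60)}`,
      firstStore,
      lastStore: Math.min((i + 1) * size, storeCount),
    });
  }
  return plan;
}
```

With 300 stores and 6 batches, `planBatches(300, 6)` yields the same rows as the table.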

Files

Core

| File | Purpose |
|------|---------|
| src/tasks/task-service.ts | Task CRUD, claiming, capacity metrics |
| src/tasks/task-worker.ts | Worker loop, heartbeat, handler dispatch |
| src/routes/tasks.ts | REST API endpoints |
| migrations/074_worker_task_queue.sql | Database schema + SQL functions |

Handlers

| File | Role |
|------|------|
| src/tasks/handlers/store-discovery.ts | store_discovery |
| src/tasks/handlers/entry-point-discovery.ts | entry_point_discovery |
| src/tasks/handlers/product-discovery.ts | product_discovery |
| src/tasks/handlers/product-resync.ts | product_resync |
| src/tasks/handlers/analytics-refresh.ts | analytics_refresh |

Running Workers

Local Development

# Start a single worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start multiple workers (different terminals)
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=product_resync WORKER_ID=resync-2 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts

Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| WORKER_ROLE | (required) | Which task role to process |
| WORKER_ID | auto-generated | Custom worker identifier |
| POLL_INTERVAL_MS | 5000 | How often to check for tasks |
| HEARTBEAT_INTERVAL_MS | 30000 | How often to update heartbeat |
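
A worker could turn these variables into a typed configuration as sketched below. `loadConfig` is a hypothetical helper. The auto-generated ID format is modeled on the sample log line `worker-product_resync-a1b2c3d4`; how the real worker generates the suffix is an assumption.

```typescript
interface WorkerConfig {
  role: string;
  workerId: string;
  pollIntervalMs: number;
  heartbeatIntervalMs: number;
}

// Parses a positive integer, falling back to the default on missing or invalid input.
function intOrDefault(raw: string | undefined, fallback: number): number {
  const n = parseInt(raw ?? "", 10);
  return isNaN(n) || n <= 0 ? fallback : n;
}

function loadConfig(env: Record<string, string | undefined>): WorkerConfig {
  const role = env["WORKER_ROLE"];
  if (!role) {
    throw new Error("WORKER_ROLE is required");
  }
  // Assumed ID scheme: "worker-<role>-<random hex suffix>".
  const suffix = Math.random().toString(16).slice(2, 10);
  return {
    role,
    workerId: env["WORKER_ID"] ?? `worker-${role}-${suffix}`,
    pollIntervalMs: intOrDefault(env["POLL_INTERVAL_MS"], 5000),
    heartbeatIntervalMs: intOrDefault(env["HEARTBEAT_INTERVAL_MS"], 30000),
  };
}
```

In a Node.js process this would typically be called once at startup as `loadConfig(process.env)`.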

Kubernetes

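apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 5  # Scale horizontally
  selector:    # Required for apps/v1 Deployments
    matchLabels:
      app: task-worker-resync  # Illustrative label; must match the template labels
  template:
    metadata:
      labels:
        app: task-worker-resync
    spec:
      containers:
      - name: worker
        image: code.cannabrands.app/creationshop/dispensary-scraper:latest
        command: ["npx", "tsx", "src/tasks/task-worker.ts"]
        env:
        - name: WORKER_ROLE
          value: "product_resync"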

API Endpoints

Task Management

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /api/tasks | List tasks (with filters) |
| POST | /api/tasks | Create a task |
| GET | /api/tasks/:id | Get task by ID |
| GET | /api/tasks/counts | Counts by status |
| GET | /api/tasks/capacity | Capacity metrics |
| POST | /api/tasks/recover-stale | Recover dead worker tasks |

Task Generation

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/tasks/generate/resync | Generate daily resync batch |
| POST | /api/tasks/generate/discovery | Create store discovery task |

Capacity Planning

The v_worker_capacity view provides metrics:

SELECT * FROM v_worker_capacity;

| Metric | Description |
|--------|-------------|
| pending_tasks | Tasks waiting |
| ready_tasks | Tasks ready now (scheduled_for passed) |
| running_tasks | Tasks being processed |
| active_workers | Workers with recent heartbeat |
| tasks_per_worker_hour | Throughput estimate |
| estimated_hours_to_drain | Time to clear queue |

Scaling API

GET /api/tasks/capacity/product_resync
{
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}
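
The `workers_needed` figures are consistent with dividing the backlog by per-worker throughput over the target window and rounding up. The sketch below assumes that formula and a throughput of 50 tasks per worker per hour, a value chosen only because it reproduces the sample response; neither is confirmed by this document, and `workersNeeded` is a hypothetical name.

```typescript
// Workers required to drain `pendingTasks` within `targetHours`,
// given an estimated throughput per worker.
function workersNeeded(
  pendingTasks: number,
  tasksPerWorkerHour: number,
  targetHours: number
): number {
  if (tasksPerWorkerHour <= 0 || targetHours <= 0) {
    throw new RangeError("throughput and target window must be positive");
  }
  return Math.ceil(pendingTasks / (tasksPerWorkerHour * targetHours));
}

// Reproduces the sample response under the assumed throughput of 50.
const sample = {
  for_1_hour: workersNeeded(500, 50, 1),  // 10
  for_4_hours: workersNeeded(500, 50, 4), // 3
  for_8_hours: workersNeeded(500, 50, 8), // 2
};
```

Comparing `active_workers` with the figure for the desired window shows whether to scale up. In the sample, 3 active workers are enough to drain the queue in 4 hours but not in 1.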

Database Schema

worker_tasks

CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,

  -- Task identification
  role VARCHAR(50) NOT NULL,
  dispensary_id INTEGER REFERENCES dispensaries(id),
  platform VARCHAR(20),

  -- State
  status VARCHAR(20) DEFAULT 'pending',
  priority INTEGER DEFAULT 0,
  scheduled_for TIMESTAMPTZ,

  -- Ownership
  worker_id VARCHAR(100),
  claimed_at TIMESTAMPTZ,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  last_heartbeat_at TIMESTAMPTZ,

  -- Results
  result JSONB,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,

  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);
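
On the application side, a row of this table might be typed as follows. This is a sketch: `WorkerTaskRow` and `newTaskDefaults` are hypothetical names, columns with a DEFAULT are treated as always populated (an assumption, since the schema does not mark them NOT NULL), and TIMESTAMPTZ values are assumed to arrive as `Date` objects.

```typescript
interface WorkerTaskRow {
  id: number;
  // Task identification
  role: string;
  dispensary_id: number | null;
  platform: string | null;
  // State
  status: string;
  priority: number;
  scheduled_for: Date | null;
  // Ownership
  worker_id: string | null;
  claimed_at: Date | null;
  started_at: Date | null;
  completed_at: Date | null;
  last_heartbeat_at: Date | null;
  // Results
  result: unknown;
  error_message: string | null;
  retry_count: number;
  max_retries: number;
  created_at: Date;
  updated_at: Date;
}

// Builds a new task with the same defaults the schema applies on INSERT.
function newTaskDefaults(
  role: string,
  dispensaryId: number | null = null,
  now: Date = new Date()
): Omit<WorkerTaskRow, "id"> {
  return {
    role,
    dispensary_id: dispensaryId,
    platform: null,
    status: "pending",
    priority: 0,
    scheduled_for: null,
    worker_id: null,
    claimed_at: null,
    started_at: null,
    completed_at: null,
    last_heartbeat_at: null,
    result: null,
    error_message: null,
    retry_count: 0,
    max_retries: 3,
    created_at: now,
    updated_at: now,
  };
}
```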

Key Indexes

-- Fast claiming by role
CREATE INDEX idx_worker_tasks_pending
  ON worker_tasks(role, priority DESC, created_at ASC)
  WHERE status = 'pending';

-- Prevent duplicate active tasks per store
CREATE UNIQUE INDEX idx_worker_tasks_unique_active_store
  ON worker_tasks(dispensary_id)
  WHERE status IN ('claimed', 'running') AND dispensary_id IS NOT NULL;

Monitoring

Logs

[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully

Health Check

-- Active workers
SELECT worker_id, role, COUNT(*), MAX(last_heartbeat_at)
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;

-- Task counts by role/status
SELECT role, status, COUNT(*)
FROM worker_tasks
GROUP BY role, status;