Files
cannaiq/backend/docs/_archive/WORKER_TASK_ARCHITECTURE.md
Kelly 63023a4061 feat: Worker improvements and Run Now duplicate prevention
- Fix Run Now to prevent duplicate task creation
- Add loading state to Run Now button in UI
- Return early when no stores need refresh
- Worker dashboard improvements
- Browser pooling architecture updates
- K8s worker config updates (8 replicas, 3 concurrent tasks)
2025-12-12 20:11:31 -07:00

21 KiB
Raw Blame History

Worker Task Architecture

This document describes the unified task-based worker system that replaces the legacy fragmented job systems.

Overview

The task worker architecture provides a single, unified system for managing all background work in CannaiQ:

  • Store discovery - Find new dispensaries on platforms
  • Entry point discovery - Resolve platform IDs from menu URLs
  • Product discovery - Initial product fetch for new stores
  • Product resync - Regular price/stock updates for existing stores
  • Analytics refresh - Refresh materialized views and analytics

Architecture

Database Tables

worker_tasks - Central task queue

CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,
  role task_role NOT NULL,           -- What type of work
  dispensary_id INTEGER,              -- Which store (if applicable)
  platform VARCHAR(50),               -- Which platform (dutchie, etc.)
  status task_status DEFAULT 'pending',
  priority INTEGER DEFAULT 0,         -- Higher = process first
  scheduled_for TIMESTAMP,            -- Don't process before this time
  worker_id VARCHAR(100),             -- Which worker claimed it
  claimed_at TIMESTAMP,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  last_heartbeat_at TIMESTAMP,        -- For stale detection
  result JSONB,                       -- Output from handler
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

Key indexes:

  • idx_worker_tasks_pending_priority - For efficient task claiming
  • idx_worker_tasks_active_dispensary - Prevents concurrent tasks per store (partial unique index)

Task Roles

Role Purpose Per-Store Scheduled
store_discovery Find new stores on a platform No Daily
entry_point_discovery Resolve platform IDs Yes On-demand
product_discovery Initial product fetch Yes After entry_point
product_resync Price/stock updates Yes Every 4 hours
analytics_refresh Refresh MVs No Daily

Task Lifecycle

pending → claimed → running → completed
                  ↓
                failed
  1. pending - Task is waiting to be picked up
  2. claimed - Worker has claimed it (atomic via SELECT FOR UPDATE SKIP LOCKED)
  3. running - Worker is actively processing
  4. completed - Task finished successfully
  5. failed - Task encountered an error
  6. stale - Task lost its worker (recovered automatically)

Files

Core Files

File Purpose
src/tasks/task-service.ts TaskService - CRUD, claiming, capacity metrics
src/tasks/task-worker.ts TaskWorker - Main worker loop
src/tasks/index.ts Module exports
src/routes/tasks.ts API endpoints
migrations/074_worker_task_queue.sql Database schema

Task Handlers

File Role
src/tasks/handlers/store-discovery.ts store_discovery
src/tasks/handlers/entry-point-discovery.ts entry_point_discovery
src/tasks/handlers/product-discovery.ts product_discovery
src/tasks/handlers/product-resync.ts product_resync
src/tasks/handlers/analytics-refresh.ts analytics_refresh

Running Workers

Environment Variables

Variable Default Description
WORKER_ROLE (required) Which task role to process
WORKER_ID auto-generated Custom worker identifier
POLL_INTERVAL_MS 5000 How often to check for tasks
HEARTBEAT_INTERVAL_MS 30000 How often to update heartbeat

Starting a Worker

# Start a product resync worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start with custom ID
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts

# Start multiple workers for different roles
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts &
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts &

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 3
  template:
    spec:
      containers:
      - name: worker
        image: code.cannabrands.app/creationshop/dispensary-scraper:latest
        command: ["npx", "tsx", "src/tasks/task-worker.ts"]
        env:
        - name: WORKER_ROLE
          value: "product_resync"

API Endpoints

Task Management

Endpoint Method Description
/api/tasks GET List tasks with filters
/api/tasks POST Create a new task
/api/tasks/:id GET Get task by ID
/api/tasks/counts GET Get counts by status
/api/tasks/capacity GET Get capacity metrics
/api/tasks/capacity/:role GET Get role-specific capacity
/api/tasks/recover-stale POST Recover tasks from dead workers

Task Generation

Endpoint Method Description
/api/tasks/generate/resync POST Generate daily resync tasks
/api/tasks/generate/discovery POST Create store discovery task

Migration (from legacy systems)

Endpoint Method Description
/api/tasks/migration/status GET Compare old vs new systems
/api/tasks/migration/disable-old-schedules POST Disable job_schedules
/api/tasks/migration/cancel-pending-crawl-jobs POST Cancel old crawl jobs
/api/tasks/migration/create-resync-tasks POST Create tasks for all stores
/api/tasks/migration/full-migrate POST One-click migration

Role-Specific Endpoints

Endpoint Method Description
/api/tasks/role/:role/last-completion GET Last completion time
/api/tasks/role/:role/recent GET Recent completions
/api/tasks/store/:id/active GET Check if store has active task

Capacity Planning

The v_worker_capacity view provides real-time metrics:

SELECT * FROM v_worker_capacity;

Returns:

  • pending_tasks - Tasks waiting to be claimed
  • ready_tasks - Tasks ready now (scheduled_for is null or past)
  • claimed_tasks - Tasks claimed but not started
  • running_tasks - Tasks actively processing
  • completed_last_hour - Recent completions
  • failed_last_hour - Recent failures
  • active_workers - Workers with recent heartbeats
  • avg_duration_sec - Average task duration
  • tasks_per_worker_hour - Throughput estimate
  • estimated_hours_to_drain - Time to clear queue

Scaling Recommendations

// API: GET /api/tasks/capacity/:role
{
  "role": "product_resync",
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}

Task Chaining

Tasks can automatically create follow-up tasks:

store_discovery → entry_point_discovery → product_discovery
                              ↓
                     (store has platform_dispensary_id)
                              ↓
                     Daily resync tasks

The chainNextTask() method handles this automatically.

Stale Task Recovery

Tasks are considered stale if last_heartbeat_at is older than the threshold (default 10 minutes).

SELECT recover_stale_tasks(10); -- 10 minute threshold

Or via API:

curl -X POST /api/tasks/recover-stale \
  -H 'Content-Type: application/json' \
  -d '{"threshold_minutes": 10}'

Migration from Legacy Systems

Legacy Systems Replaced

  1. job_schedules + job_run_logs - Scheduled job definitions
  2. dispensary_crawl_jobs - Per-dispensary crawl queue
  3. SyncOrchestrator + HydrationWorker - Raw payload processing

Migration Steps

Option 1: One-Click Migration

curl -X POST /api/tasks/migration/full-migrate

This will:

  1. Disable all job_schedules
  2. Cancel pending dispensary_crawl_jobs
  3. Generate resync tasks for all stores
  4. Create discovery and analytics tasks

Option 2: Manual Migration

# 1. Check current status
curl /api/tasks/migration/status

# 2. Disable old schedules
curl -X POST /api/tasks/migration/disable-old-schedules

# 3. Cancel pending crawl jobs
curl -X POST /api/tasks/migration/cancel-pending-crawl-jobs

# 4. Create resync tasks
curl -X POST /api/tasks/migration/create-resync-tasks \
  -H 'Content-Type: application/json' \
  -d '{"state_code": "AZ"}'

# 5. Generate daily resync schedule
curl -X POST /api/tasks/generate/resync \
  -H 'Content-Type: application/json' \
  -d '{"batches_per_day": 6}'

Per-Store Locking

The system prevents concurrent tasks for the same store using a partial unique index:

CREATE UNIQUE INDEX idx_worker_tasks_active_dispensary
ON worker_tasks (dispensary_id)
WHERE dispensary_id IS NOT NULL
AND status IN ('claimed', 'running');

This ensures only one task can be active per store at any time.

Task Priority

Tasks are claimed in priority order (higher first), then by creation time:

ORDER BY priority DESC, created_at ASC

Default priorities:

  • store_discovery: 0
  • entry_point_discovery: 10 (high - new stores)
  • product_discovery: 10 (high - new stores)
  • product_resync: 0
  • analytics_refresh: 0

Scheduled Tasks

Tasks can be scheduled for future execution:

await taskService.createTask({
  role: 'product_resync',
  dispensary_id: 123,
  scheduled_for: new Date('2025-01-10T06:00:00Z'),
});

The generate_resync_tasks() function creates staggered tasks throughout the day:

SELECT generate_resync_tasks(6, '2025-01-10'); -- 6 batches = every 4 hours

Dashboard Integration

The admin dashboard shows task queue status in the main overview:

Task Queue Summary
------------------
Pending:   45
Running:   3
Completed: 1,234
Failed:    12

Full task management is available at /admin/tasks.

Error Handling

Failed tasks include the error message in error_message and can be retried:

-- View failed tasks
SELECT id, role, dispensary_id, error_message, retry_count
FROM worker_tasks
WHERE status = 'failed'
ORDER BY completed_at DESC
LIMIT 20;

-- Retry failed tasks
UPDATE worker_tasks
SET status = 'pending', retry_count = retry_count + 1
WHERE status = 'failed' AND retry_count < max_retries;

Concurrent Task Processing (Added 2024-12)

Workers can now process multiple tasks concurrently within a single worker instance. This improves throughput by utilizing async I/O efficiently.

Architecture

┌─────────────────────────────────────────────────────────────┐
│                         Pod (K8s)                           │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    TaskWorker                        │   │
│  │                                                      │   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │ Task 1  │  │ Task 2  │  │ Task 3  │  (concurrent)│   │
│  │  └─────────┘  └─────────┘  └─────────┘             │   │
│  │                                                      │   │
│  │  Resource Monitor                                    │   │
│  │  ├── Memory: 65% (threshold: 85%)                   │   │
│  │  ├── CPU: 45% (threshold: 90%)                      │   │
│  │  └── Status: Normal                                  │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Environment Variables

Variable Default Description
MAX_CONCURRENT_TASKS 3 Maximum tasks a worker will run concurrently
MEMORY_BACKOFF_THRESHOLD 0.85 Back off when heap memory exceeds 85%
CPU_BACKOFF_THRESHOLD 0.90 Back off when CPU exceeds 90%
BACKOFF_DURATION_MS 10000 How long to wait when backing off (10s)

How It Works

  1. Main Loop: Worker continuously tries to fill up to MAX_CONCURRENT_TASKS
  2. Resource Monitoring: Before claiming a new task, worker checks memory and CPU
  3. Backoff: If resources exceed thresholds, worker pauses and stops claiming new tasks
  4. Concurrent Execution: Tasks run in parallel using Promise - they don't block each other
  5. Graceful Shutdown: On SIGTERM/decommission, worker stops claiming but waits for active tasks

Resource Monitoring

// ResourceStats interface
interface ResourceStats {
  memoryPercent: number;    // Current heap usage as decimal (0.0-1.0)
  memoryMb: number;         // Current heap used in MB
  memoryTotalMb: number;    // Total heap available in MB
  cpuPercent: number;       // CPU usage as percentage (0-100)
  isBackingOff: boolean;    // True if worker is in backoff state
  backoffReason: string;    // Why the worker is backing off
}

Heartbeat Data

Workers report the following in their heartbeat:

{
  "worker_id": "worker-abc123",
  "current_task_id": 456,
  "current_task_ids": [456, 457, 458],
  "active_task_count": 3,
  "max_concurrent_tasks": 3,
  "status": "active",
  "resources": {
    "memory_mb": 256,
    "memory_total_mb": 512,
    "memory_rss_mb": 320,
    "memory_percent": 50,
    "cpu_user_ms": 12500,
    "cpu_system_ms": 3200,
    "cpu_percent": 45,
    "is_backing_off": false,
    "backoff_reason": null
  }
}

Backoff Behavior

When resources exceed thresholds:

  1. Worker logs the backoff reason:

    [TaskWorker] MyWorker backing off: Memory at 87.3% (threshold: 85%)
    
  2. Worker stops claiming new tasks but continues existing tasks

  3. After BACKOFF_DURATION_MS, worker rechecks resources

  4. When resources return to normal:

    [TaskWorker] MyWorker resuming normal operation
    

UI Display

The Workers Dashboard shows:

  • Tasks Column: 2/3 tasks (active/max concurrent)
  • Resources Column: Memory % and CPU % with color coding
    • Green: < 50%
    • Yellow: 50-74%
    • Amber: 75-89%
    • Red: 90%+
  • Backing Off: Orange warning badge when worker is in backoff state

Task Count Badge Details

┌─────────────────────────────────────────────┐
│ Worker: "MyWorker"                          │
│ Tasks: 2/3 tasks  #456, #457                │
│ Resources: 🧠 65%  💻 45%                    │
│ Status: ● Active                            │
└─────────────────────────────────────────────┘

Best Practices

  1. Start Conservative: Use MAX_CONCURRENT_TASKS=3 initially
  2. Monitor Resources: Watch for frequent backoffs in logs
  3. Tune Per Workload: I/O-bound tasks benefit from higher concurrency
  4. Scale Horizontally: Add more pods rather than cranking concurrency too high

Code References

File Purpose
src/tasks/task-worker.ts:68-71 Concurrency environment variables
src/tasks/task-worker.ts:104-111 ResourceStats interface
src/tasks/task-worker.ts:149-179 getResourceStats() method
src/tasks/task-worker.ts:184-196 shouldBackOff() method
src/tasks/task-worker.ts:462-516 mainLoop() with concurrent claiming
src/routes/worker-registry.ts:148-195 Heartbeat endpoint handling
cannaiq/src/pages/WorkersDashboard.tsx:233-305 UI components for resources

Browser Task Memory Limits (Updated 2025-12)

Browser-based tasks (Puppeteer/Chrome) have strict memory constraints that limit concurrency.

Why Browser Tasks Are Different

Each browser task launches a Chrome process. Unlike I/O-bound API calls, browsers consume significant RAM:

Component RAM Usage
Node.js runtime ~150 MB
Chrome browser (base) ~200-250 MB
Dutchie menu page (loaded) ~100-150 MB
Per browser total ~350-450 MB

Memory Math for Pod Limits

Pod memory limit:     2 GB (2000 MB)
Node.js runtime:      -150 MB
Safety buffer:        -100 MB
────────────────────────────────
Available for browsers: 1750 MB

Per browser + page:    ~400 MB

Max browsers: 1750 ÷ 400 = ~4 browsers

Recommended: 3 browsers (leaves headroom for spikes)

MAX_CONCURRENT_TASKS for Browser Tasks

Browsers per Pod RAM Used Risk Level
1 ~500 MB Very safe
2 ~900 MB Safe
3 ~1.3 GB Recommended
4 ~1.7 GB Tight (may OOM)
5+ >2 GB Will OOM crash

CRITICAL: MAX_CONCURRENT_TASKS=3 is the maximum safe value for browser tasks with current pod limits.

Scaling Strategy

Scale horizontally (more pods) rather than vertically (more concurrency per pod):

┌─────────────────────────────────────────────────────────────────────────┐
│ Cluster: 8 pods × 3 browsers = 24 concurrent tasks                       │
│                                                                          │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐       │
│  │ Pod 0       │ │ Pod 1       │ │ Pod 2       │ │ Pod 3       │       │
│  │ 3 browsers  │ │ 3 browsers  │ │ 3 browsers  │ │ 3 browsers  │       │
│  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘       │
│                                                                          │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐       │
│  │ Pod 4       │ │ Pod 5       │ │ Pod 6       │ │ Pod 7       │       │
│  │ 3 browsers  │ │ 3 browsers  │ │ 3 browsers  │ │ 3 browsers  │       │
│  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘       │
└─────────────────────────────────────────────────────────────────────────┘

Browser Lifecycle Per Task

Each task gets a fresh browser with fresh IP/identity:

1. Claim task from queue
2. Get fresh proxy from pool
3. Launch browser with proxy
4. Run preflight (verify IP)
5. Execute scrape
6. Close browser
7. Repeat

This ensures:

  • Fresh IP per task (proxy rotation)
  • Fresh fingerprint per task (UA rotation)
  • No cookie/session bleed between tasks
  • Predictable memory usage

Increasing Capacity

To handle more concurrent tasks:

  1. Add more pods (up to 8 per CLAUDE.md limit)
  2. Increase pod memory (allows 4 browsers per pod):
    resources:
      limits:
        memory: "2.5Gi"  # from 2Gi
    

DO NOT simply increase MAX_CONCURRENT_TASKS without also increasing pod memory limits.

Monitoring

Logs

Workers log to stdout:

[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully

Health Check

Check if workers are active:

SELECT worker_id, role, COUNT(*), MAX(last_heartbeat_at)
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;

Metrics

-- Tasks by status
SELECT status, COUNT(*) FROM worker_tasks GROUP BY status;

-- Tasks by role
SELECT role, status, COUNT(*) FROM worker_tasks GROUP BY role, status;

-- Average duration by role
SELECT role, AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_seconds
FROM worker_tasks
WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '24 hours'
GROUP BY role;