# CannaiQ Worker System

## Overview

The Worker System is a role-based task queue that processes background jobs. All tasks go into a single pool, and workers claim tasks based on their assigned role.

### Design Pattern: Single Pool, Role-Based Claiming
```
         ┌─────────────────────────────────────────┐
         │        TASK POOL (worker_tasks)         │
         │                                         │
         │  ┌─────────────────────────────────┐    │
         │  │ role=store_discovery   pending  │    │
         │  │ role=product_resync    pending  │    │
         │  │ role=product_resync    pending  │    │
         │  │ role=product_resync    pending  │    │
         │  │ role=analytics_refresh pending  │    │
         │  │ role=entry_point_disc  pending  │    │
         │  └─────────────────────────────────┘    │
         └─────────────────────────────────────────┘
                              │
         ┌────────────────────┼────────────────────┐
         │                    │                    │
         ▼                    ▼                    ▼
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│      WORKER      │ │      WORKER      │ │      WORKER      │
│  role=product_   │ │  role=product_   │ │  role=store_     │
│      resync      │ │      resync      │ │    discovery     │
│                  │ │                  │ │                  │
│  Claims ONLY     │ │  Claims ONLY     │ │  Claims ONLY     │
│  product_resync  │ │  product_resync  │ │  store_discovery │
│  tasks           │ │  tasks           │ │  tasks           │
└──────────────────┘ └──────────────────┘ └──────────────────┘
```
**Key Points:**

- All tasks go into ONE table (`worker_tasks`)
- Each worker is assigned ONE role at startup
- Workers only claim tasks matching their role
- Multiple workers can share the same role (horizontal scaling)
## Worker Roles

| Role | Purpose | Per-Store? | Schedule |
|---|---|---|---|
| `store_discovery` | Find new dispensaries via GraphQL | No | Weekly |
| `entry_point_discovery` | Resolve platform IDs from menu URLs | Yes | On-demand |
| `product_discovery` | Initial product fetch for new stores | Yes | On-demand |
| `product_resync` | Regular price/stock updates | Yes | Every 4 hours |
| `analytics_refresh` | Refresh materialized views | No | Daily |
## Task Lifecycle

```
pending → claimed → running → completed
                       ↓
                     failed
                       ↓
           (retry if < max_retries)
```
| Status | Meaning |
|---|---|
| `pending` | Waiting to be claimed |
| `claimed` | Worker has claimed it, not yet started |
| `running` | Worker is actively processing |
| `completed` | Successfully finished |
| `failed` | Error occurred |
| `stale` | Worker died (heartbeat timeout) |
## Task Chaining

Tasks automatically create follow-up tasks:

```
store_discovery (finds new stores)
  │
  ├─ Returns newStoreIds[] in result
  ▼
entry_point_discovery (for each new store)
  │
  ├─ Resolves platform_dispensary_id
  ▼
product_discovery (initial crawl)
  │
  ▼
(store enters regular schedule)
  │
  ▼
product_resync (every 4 hours)
```
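A minimal sketch of the chaining step, assuming the store-discovery handler reports `newStoreIds` in its result and follow-up rows are inserted directly into `worker_tasks`; the function and result shape here are illustrative, not the exact handler code:

```typescript
import { Pool } from "pg";

// Illustrative only; the real logic lives in src/tasks/handlers/store-discovery.ts.
interface StoreDiscoveryResult {
  newStoreIds: number[]; // reported in the completed task's result JSONB
}

async function chainEntryPointTasks(pool: Pool, result: StoreDiscoveryResult) {
  // Queue the next link in the chain for each newly discovered store.
  for (const dispensaryId of result.newStoreIds) {
    await pool.query(
      `INSERT INTO worker_tasks (role, dispensary_id, status)
       VALUES ('entry_point_discovery', $1, 'pending')`,
      [dispensaryId]
    );
  }
}
```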
## How Claiming Works

### 1. Worker starts with a role

```bash
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts
```

### 2. Worker loop polls for tasks

```typescript
// Simplified worker loop
while (running) {
  const task = await claimTask(this.role, this.workerId);
  if (!task) {
    await sleep(5000); // No tasks, wait 5 seconds
    continue;
  }
  await processTask(task);
}
```
### 3. SQL function claims atomically

```sql
-- claim_task(role, worker_id)
UPDATE worker_tasks
SET status = 'claimed', worker_id = $2, claimed_at = NOW()
WHERE id = (
  SELECT id FROM worker_tasks
  WHERE role = $1                            -- Filter by worker's role
    AND status = 'pending'
    AND (scheduled_for IS NULL OR scheduled_for <= NOW())
    AND dispensary_id NOT IN (               -- Per-store locking
      SELECT dispensary_id FROM worker_tasks
      WHERE status IN ('claimed', 'running')
    )
  ORDER BY priority DESC, created_at ASC     -- Priority ordering
  LIMIT 1
  FOR UPDATE SKIP LOCKED                     -- Atomic, no race conditions
)
RETURNING *;
```
Key Features:
FOR UPDATE SKIP LOCKED- Prevents race conditions between workers- Role filtering - Worker only sees tasks for its role
- Per-store locking - Only one active task per dispensary
- Priority ordering - Higher priority tasks first
- Scheduled tasks - Respects
scheduled_fortimestamp
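On the TypeScript side, claiming then reduces to a single call to that function. A sketch, assuming `claim_task` is callable as a set-returning function and yields the claimed row (or nothing):

```typescript
import { Pool } from "pg";

interface WorkerTask {
  id: number;
  role: string;
  dispensary_id: number | null;
  priority: number;
  // ...remaining worker_tasks columns
}

// Sketch: invoke claim_task(role, worker_id) and return the claimed row,
// or null when no eligible task exists.
async function claimTask(
  pool: Pool,
  role: string,
  workerId: string
): Promise<WorkerTask | null> {
  const { rows } = await pool.query<WorkerTask>(
    "SELECT * FROM claim_task($1, $2)",
    [role, workerId]
  );
  return rows[0] ?? null;
}
```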
## Heartbeat & Stale Recovery

Workers send heartbeats every 30 seconds while processing:
```typescript
// During task processing
const heartbeat = setInterval(async () => {
  await pool.query(
    'UPDATE worker_tasks SET last_heartbeat_at = NOW() WHERE id = $1',
    [taskId]
  );
}, 30000);
// clearInterval(heartbeat) when the task completes or fails
```
If a worker dies, its tasks are recovered:

```sql
-- recover_stale_tasks(threshold_minutes)
UPDATE worker_tasks
SET status = 'pending', worker_id = NULL, retry_count = retry_count + 1
WHERE status IN ('claimed', 'running')
  AND last_heartbeat_at < NOW() - INTERVAL '10 minutes'
  AND retry_count < max_retries;
```
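Something still has to run that statement periodically. A sketch of a recurring sweep, assuming `recover_stale_tasks` is exposed as a SQL function; the same recovery can also be triggered manually via `POST /api/tasks/recover-stale` (see API Endpoints below):

```typescript
import { Pool } from "pg";

// Sketch: sweep for stale tasks every few minutes.
export function startStaleRecovery(pool: Pool, thresholdMinutes = 10) {
  return setInterval(() => {
    pool
      .query("SELECT recover_stale_tasks($1)", [thresholdMinutes])
      .catch((err) => console.error("[TaskWorker] Stale recovery failed:", err));
  }, 5 * 60 * 1000); // every 5 minutes
}
```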
## Scheduling

### Daily Resync Generation

```sql
SELECT generate_resync_tasks(6, CURRENT_DATE); -- 6 batches = every 4 hours
```

Creates staggered tasks:
| Batch | Time | Stores |
|---|---|---|
| 1 | 00:00 | 1-50 |
| 2 | 04:00 | 51-100 |
| 3 | 08:00 | 101-150 |
| 4 | 12:00 | 151-200 |
| 5 | 16:00 | 201-250 |
| 6 | 20:00 | 251-300 |
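How that daily call is triggered is deployment-specific; one possible wiring, sketched with the `node-cron` package (the dependency and the schedule are assumptions, only the SQL call comes from above):

```typescript
import cron from "node-cron";
import { Pool } from "pg";

const pool = new Pool();

// Sketch: generate the day's staggered resync batches shortly after midnight.
cron.schedule("5 0 * * *", async () => {
  await pool.query("SELECT generate_resync_tasks($1, CURRENT_DATE)", [6]);
  console.log("[scheduler] Generated resync batches for today");
});
```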
## Files

### Core

| File | Purpose |
|---|---|
| `src/tasks/task-service.ts` | Task CRUD, claiming, capacity metrics |
| `src/tasks/task-worker.ts` | Worker loop, heartbeat, handler dispatch |
| `src/routes/tasks.ts` | REST API endpoints |
| `migrations/074_worker_task_queue.sql` | Database schema + SQL functions |
### Handlers

| File | Role |
|---|---|
| `src/tasks/handlers/store-discovery.ts` | `store_discovery` |
| `src/tasks/handlers/entry-point-discovery.ts` | `entry_point_discovery` |
| `src/tasks/handlers/product-discovery.ts` | `product_discovery` |
| `src/tasks/handlers/product-resync.ts` | `product_resync` |
| `src/tasks/handlers/analytics-refresh.ts` | `analytics_refresh` |
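Handler dispatch in `task-worker.ts` presumably boils down to a role-to-handler map; the sketch below illustrates the idea, with the import names and the `WorkerTask` type (from the claiming sketch above) being assumptions rather than the actual exports:

```typescript
// Illustrative registry: maps a worker role to its handler. The import
// names are hypothetical; see the files listed above for the real code.
import { handleStoreDiscovery } from "./handlers/store-discovery";
import { handleProductResync } from "./handlers/product-resync";
// ...one import per handler file listed above

type TaskHandler = (task: WorkerTask) => Promise<unknown>;

const handlers: Record<string, TaskHandler> = {
  store_discovery: handleStoreDiscovery,
  product_resync: handleProductResync,
  // entry_point_discovery, product_discovery, analytics_refresh, ...
};

async function processTask(task: WorkerTask) {
  const handler = handlers[task.role];
  if (!handler) throw new Error(`No handler registered for role: ${task.role}`);
  return handler(task);
}
```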
## Running Workers

### Local Development

```bash
# Start a single worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start multiple workers (different terminals)
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=product_resync WORKER_ID=resync-2 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts
```
### Environment Variables

| Variable | Default | Description |
|---|---|---|
| `WORKER_ROLE` | (required) | Which task role to process |
| `WORKER_ID` | auto-generated | Custom worker identifier |
| `POLL_INTERVAL_MS` | 5000 | How often to check for tasks |
| `HEARTBEAT_INTERVAL_MS` | 30000 | How often to update heartbeat |
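A sketch of how the worker might resolve this configuration at startup; the defaults mirror the table, while the parsing details and ID format are assumptions:

```typescript
import { randomUUID } from "node:crypto";

// Sketch: resolve worker configuration from the environment.
const role = process.env.WORKER_ROLE;
if (!role) {
  throw new Error("WORKER_ROLE is required, e.g. WORKER_ROLE=product_resync");
}

const workerId =
  process.env.WORKER_ID ?? `worker-${role}-${randomUUID().slice(0, 8)}`;
const pollIntervalMs = Number(process.env.POLL_INTERVAL_MS ?? 5000);
const heartbeatIntervalMs = Number(process.env.HEARTBEAT_INTERVAL_MS ?? 30000);
```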
### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 5 # Scale horizontally
  template:
    spec:
      containers:
        - name: worker
          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
          command: ["npx", "tsx", "src/tasks/task-worker.ts"]
          env:
            - name: WORKER_ROLE
              value: "product_resync"
```
## API Endpoints

### Task Management

| Method | Endpoint | Description |
|---|---|---|
| GET | `/api/tasks` | List tasks (with filters) |
| POST | `/api/tasks` | Create a task |
| GET | `/api/tasks/:id` | Get task by ID |
| GET | `/api/tasks/counts` | Counts by status |
| GET | `/api/tasks/capacity` | Capacity metrics |
| POST | `/api/tasks/recover-stale` | Recover dead worker tasks |
### Task Generation

| Method | Endpoint | Description |
|---|---|---|
| POST | `/api/tasks/generate/resync` | Generate daily resync batch |
| POST | `/api/tasks/generate/discovery` | Create store discovery task |
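As an example, queuing an ad-hoc task through the API might look like the sketch below; the host/port and the accepted payload fields (mirroring the `worker_tasks` columns) are assumptions about `src/routes/tasks.ts`:

```typescript
// Sketch: queue a high-priority resync for one store via the REST API.
const res = await fetch("http://localhost:3000/api/tasks", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({
    role: "product_resync",
    dispensary_id: 456,
    priority: 10,
  }),
});
console.log(await res.json());
```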
## Capacity Planning

The `v_worker_capacity` view provides metrics:

```sql
SELECT * FROM v_worker_capacity;
```

| Metric | Description |
|---|---|
| `pending_tasks` | Tasks waiting |
| `ready_tasks` | Tasks ready now (`scheduled_for` passed) |
| `running_tasks` | Tasks being processed |
| `active_workers` | Workers with recent heartbeat |
| `tasks_per_worker_hour` | Throughput estimate |
| `estimated_hours_to_drain` | Time to clear queue |
### Scaling API

```
GET /api/tasks/capacity/product_resync
```

```json
{
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}
```
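The `workers_needed` figures are pending work divided by per-worker throughput over the target window. A sketch of that arithmetic; the ~50 tasks/worker/hour rate is inferred from the sample numbers above, not documented:

```typescript
// Sketch: workers needed to drain the queue within targetHours,
// given observed per-worker throughput.
function workersNeeded(
  pendingTasks: number,
  tasksPerWorkerHour: number,
  targetHours: number
): number {
  return Math.ceil(pendingTasks / (tasksPerWorkerHour * targetHours));
}

// With the sample response above (500 pending, ~50 tasks/worker/hour):
workersNeeded(500, 50, 1); // 10
workersNeeded(500, 50, 4); // 3
workersNeeded(500, 50, 8); // 2
```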
## Database Schema

### worker_tasks

```sql
CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,

  -- Task identification
  role VARCHAR(50) NOT NULL,
  dispensary_id INTEGER REFERENCES dispensaries(id),
  platform VARCHAR(20),

  -- State
  status VARCHAR(20) DEFAULT 'pending',
  priority INTEGER DEFAULT 0,
  scheduled_for TIMESTAMPTZ,

  -- Ownership
  worker_id VARCHAR(100),
  claimed_at TIMESTAMPTZ,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  last_heartbeat_at TIMESTAMPTZ,

  -- Results
  result JSONB,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,

  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
### Key Indexes

```sql
-- Fast claiming by role
CREATE INDEX idx_worker_tasks_pending
  ON worker_tasks(role, priority DESC, created_at ASC)
  WHERE status = 'pending';

-- Prevent duplicate active tasks per store
CREATE UNIQUE INDEX idx_worker_tasks_unique_active_store
  ON worker_tasks(dispensary_id)
  WHERE status IN ('claimed', 'running') AND dispensary_id IS NOT NULL;
```
## Monitoring

### Logs

```
[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully
```

### Health Check

```sql
-- Active workers
SELECT worker_id, role, COUNT(*), MAX(last_heartbeat_at)
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;

-- Task counts by role/status
SELECT role, status, COUNT(*)
FROM worker_tasks
GROUP BY role, status;
```