# CannaiQ Worker System

## Overview

The Worker System is a role-based task queue that processes background jobs. All tasks go into a single pool, and workers claim tasks based on their assigned role.

---

## Design Pattern: Single Pool, Role-Based Claiming

```
           ┌──────────────────────────────────────────┐
           │         TASK POOL (worker_tasks)         │
           │                                          │
           │  ┌────────────────────────────────────┐  │
           │  │ role=store_discovery       pending │  │
           │  │ role=product_resync        pending │  │
           │  │ role=product_resync        pending │  │
           │  │ role=product_resync        pending │  │
           │  │ role=analytics_refresh     pending │  │
           │  │ role=entry_point_discovery pending │  │
           │  └────────────────────────────────────┘  │
           └──────────────────────────────────────────┘
                                 │
         ┌───────────────────────┼───────────────────────┐
         │                       │                       │
         ▼                       ▼                       ▼
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│ WORKER           │    │ WORKER           │    │ WORKER           │
│ role=product_    │    │ role=product_    │    │ role=store_      │
│ resync           │    │ resync           │    │ discovery        │
│                  │    │                  │    │                  │
│ Claims ONLY      │    │ Claims ONLY      │    │ Claims ONLY      │
│ product_resync   │    │ product_resync   │    │ store_discovery  │
│ tasks            │    │ tasks            │    │ tasks            │
└──────────────────┘    └──────────────────┘    └──────────────────┘
```

**Key Points:**

- All tasks go into ONE table (`worker_tasks`)
- Each worker is assigned ONE role at startup
- Workers only claim tasks matching their role
- Multiple workers can share the same role (horizontal scaling)

---

## Worker Roles

| Role | Purpose | Per-Store? | Schedule |
|------|---------|------------|----------|
| `store_discovery` | Find new dispensaries via GraphQL | No | Weekly |
| `entry_point_discovery` | Resolve platform IDs from menu URLs | Yes | On-demand |
| `product_discovery` | Initial product fetch for new stores | Yes | On-demand |
| `product_resync` | Regular price/stock updates | Yes | Every 4 hours |
| `analytics_refresh` | Refresh materialized views | No | Daily |

---

## Task Lifecycle

```
pending → claimed → running → completed
                       ↓
                     failed
                       ↓
     (retry if retry_count < max_retries)
```

| Status | Meaning |
|--------|---------|
| `pending` | Waiting to be claimed |
| `claimed` | Worker has claimed the task but not yet started it |
| `running` | Worker is actively processing |
| `completed` | Successfully finished |
| `failed` | Error occurred |
| `stale` | Worker died (heartbeat timeout) |

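
The lifecycle can also be written as a transition table. The sketch below is hypothetical and is not the project's code: `TaskStatus`, `ALLOWED` and `canTransition()` are illustrative names. The edges are inferred from the diagram, the status table, and the stale-recovery SQL further down, which moves `claimed`/`running` tasks back to `pending`. Treating every return to `pending` as a retry capped by `max_retries` is an assumption.

```typescript
// Hypothetical sketch: status transitions inferred from the lifecycle diagram and status table.
type TaskStatus = "pending" | "claimed" | "running" | "completed" | "failed" | "stale";

interface TaskState {
  status: TaskStatus;
  retryCount: number;
  maxRetries: number;
}

const ALLOWED: Record<TaskStatus, readonly TaskStatus[]> = {
  pending: ["claimed"],
  // Stale recovery can put claimed/running tasks straight back to pending
  claimed: ["running", "stale", "pending"],
  running: ["completed", "failed", "stale", "pending"],
  completed: [],
  // Assumption: failed and stale tasks are re-queued as pending while retries remain
  failed: ["pending"],
  stale: ["pending"],
};

// True if `task` may move from its current status to `next`.
function canTransition(task: TaskState, next: TaskStatus): boolean {
  if (!ALLOWED[task.status].includes(next)) return false;
  // Every edge back to pending is a retry, so it is capped by max_retries
  if (next === "pending") return task.retryCount < task.maxRetries;
  return true;
}
```
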

---

## Task Chaining

Tasks automatically create follow-up tasks:

```
store_discovery (finds new stores)
    │
    ├─ Returns newStoreIds[] in result
    ▼
entry_point_discovery (for each new store)
    │
    ├─ Resolves platform_dispensary_id
    ▼
product_discovery (initial crawl)
    │
    ▼
(store enters regular schedule)
    │
    ▼
product_resync (every 4 hours)
```

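
The chaining rules can be expressed as a pure function that maps a completed task to the tasks it should enqueue. This is a hypothetical sketch: `NewTask`, `CompletedTask` and `followUpTasks()` are illustrative names. Only `newStoreIds[]` comes from the diagram; the `platformDispensaryId` result field is an assumed shape.

```typescript
// Hypothetical sketch of the chaining rules in the diagram above.
type Role =
  | "store_discovery"
  | "entry_point_discovery"
  | "product_discovery"
  | "product_resync"
  | "analytics_refresh";

interface NewTask {
  role: Role;
  dispensaryId: number;
}

interface CompletedTask {
  role: Role;
  dispensaryId: number | null;
  // Assumed result shape; only newStoreIds[] is documented
  result: { newStoreIds?: number[]; platformDispensaryId?: string | null };
}

function followUpTasks(task: CompletedTask): NewTask[] {
  switch (task.role) {
    case "store_discovery":
      // One entry_point_discovery task per newly found store
      return (task.result.newStoreIds ?? []).map(
        (id): NewTask => ({ role: "entry_point_discovery", dispensaryId: id }),
      );
    case "entry_point_discovery":
      // Continue to the initial crawl only once the platform ID is resolved
      if (task.dispensaryId !== null && task.result.platformDispensaryId) {
        return [{ role: "product_discovery", dispensaryId: task.dispensaryId }];
      }
      return [];
    default:
      // Assumption: product_discovery hands the store to the regular resync
      // schedule instead of enqueueing a task directly
      return [];
  }
}
```

Keeping the rules in one pure function makes the chain easy to unit-test without a database.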

---

## How Claiming Works

### 1. Worker starts with a role

```bash
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts
```

### 2. Worker loop polls for tasks

```typescript
// Simplified worker loop; claimTask() and processTask() are defined elsewhere
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

while (running) {
  // Atomically claim the next eligible task for this worker's role
  const task = await claimTask(this.role, this.workerId);

  if (!task) {
    await sleep(5000); // No tasks available: wait POLL_INTERVAL_MS (default 5000) before polling again
    continue;
  }

  await processTask(task);
}
```

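### 3. SQL function claims atomically

```sql
-- claim_task(role, worker_id)
UPDATE worker_tasks
SET status = 'claimed', worker_id = $2, claimed_at = NOW()
WHERE id = (
  SELECT t.id FROM worker_tasks t
  WHERE t.role = $1                                 -- Filter by worker's role
    AND t.status = 'pending'
    AND (t.scheduled_for IS NULL OR t.scheduled_for <= NOW())
    AND (t.dispensary_id IS NULL OR NOT EXISTS (     -- Per-store locking
      SELECT 1 FROM worker_tasks busy
      WHERE busy.dispensary_id = t.dispensary_id
        AND busy.status IN ('claimed', 'running')
    ))
  ORDER BY t.priority DESC, t.created_at ASC        -- Priority ordering
  LIMIT 1
  FOR UPDATE SKIP LOCKED                            -- Skip rows other workers have locked
)
RETURNING *;
```

**Key Features:**

- `FOR UPDATE SKIP LOCKED` - A worker skips any row another transaction has already locked instead of waiting on it, so two workers never claim the same task
- Role filtering - Worker only sees tasks for its role
- Per-store locking - Only one `claimed`/`running` task per dispensary; tasks with no `dispensary_id` (e.g. `store_discovery`) are never blocked. `NOT EXISTS` is used because `NOT IN` matches nothing once its subquery returns a NULL `dispensary_id`
- Priority ordering - Higher priority first, then oldest first
- Scheduled tasks - Respects `scheduled_for` timestamp

The `NOT EXISTS` check cannot see a claim that another worker has not yet committed. Two workers can therefore still pick different tasks for the same store at the same instant. The unique index `idx_worker_tasks_unique_active_store` (see Database Schema) rejects the second claim.
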
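
To make the selection rules concrete, here is a hypothetical in-memory TypeScript model of the filters and ordering listed under Key Features: role, status, schedule, per-store lock, and priority. `TaskRow` and `pickNextTask()` are illustrative names. The model has no concurrency control; that is what `FOR UPDATE SKIP LOCKED` provides in the database. It is not how workers actually claim.

```typescript
// Hypothetical in-memory model of claim_task's selection rules (no locking).
interface TaskRow {
  id: number;
  role: string;
  status: "pending" | "claimed" | "running" | "completed" | "failed" | "stale";
  dispensaryId: number | null;
  priority: number;
  scheduledFor: Date | null;
  createdAt: Date;
}

function pickNextTask(tasks: TaskRow[], role: string, now: Date): TaskRow | undefined {
  // Stores that already have a claimed or running task are locked
  const busyStores = new Set(
    tasks
      .filter((t) => (t.status === "claimed" || t.status === "running") && t.dispensaryId !== null)
      .map((t) => t.dispensaryId),
  );

  return tasks
    .filter(
      (t) =>
        t.role === role &&
        t.status === "pending" &&
        (t.scheduledFor === null || t.scheduledFor.getTime() <= now.getTime()) &&
        // In this sketch, tasks without a store are never blocked
        (t.dispensaryId === null || !busyStores.has(t.dispensaryId)),
    )
    // ORDER BY priority DESC, created_at ASC
    .sort((a, b) => b.priority - a.priority || a.createdAt.getTime() - b.createdAt.getTime())[0];
}
```
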

---

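## Heartbeat & Stale Recovery

Workers send heartbeats every 30 seconds while processing:

```typescript
// During task processing: refresh the heartbeat every HEARTBEAT_INTERVAL_MS (default 30000)
const heartbeat = setInterval(() => {
  pool
    .query('UPDATE worker_tasks SET last_heartbeat_at = NOW() WHERE id = $1', [task.id])
    .catch((err) => console.error('Heartbeat update failed', err)); // Don't crash the worker
}, 30000);

try {
  await processTask(task);
} finally {
  clearInterval(heartbeat); // Stop heartbeating once the task finishes or fails
}
```
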
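If a worker dies, its tasks are recovered:

```sql
-- recover_stale_tasks(threshold_minutes), e.g. 10
UPDATE worker_tasks
SET status = 'pending', worker_id = NULL, retry_count = retry_count + 1
WHERE status IN ('claimed', 'running')
  AND COALESCE(last_heartbeat_at, claimed_at) < NOW() - make_interval(mins => $1)
  AND retry_count < max_retries;
```

`COALESCE` falls back to `claimed_at` for tasks whose worker died before sending its first heartbeat. Without it, `NULL < ...` is never true and such tasks would never be recovered. Tasks that have already used all their retries are not matched and remain `claimed`/`running`.
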
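
The recovery condition as a small TypeScript predicate, for example for a dashboard that flags tasks about to be re-queued. `StaleCheckRow` and `isRecoverable()` are hypothetical names. The threshold is in minutes, matching `recover_stale_tasks(threshold_minutes)`. A task that never sent a heartbeat is treated as last seen at `claimed_at`, so a worker that dies right after claiming is still caught.

```typescript
// Hypothetical predicate for the stale-task recovery condition.
interface StaleCheckRow {
  status: string;
  claimedAt: Date | null;
  lastHeartbeatAt: Date | null;
  retryCount: number;
  maxRetries: number;
}

function isRecoverable(row: StaleCheckRow, now: Date, thresholdMinutes: number): boolean {
  if (row.status !== "claimed" && row.status !== "running") return false;
  if (row.retryCount >= row.maxRetries) return false; // Out of retries
  // Fall back to the claim time when no heartbeat was ever recorded
  const lastSeen = row.lastHeartbeatAt ?? row.claimedAt;
  if (lastSeen === null) return false; // Like SQL, a missing timestamp never compares as older
  return lastSeen.getTime() < now.getTime() - thresholdMinutes * 60_000;
}
```
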

---

## Scheduling

### Daily Resync Generation

```sql
SELECT generate_resync_tasks(6, CURRENT_DATE); -- 6 batches, one every 4 hours (24 / 6)
```

Creates staggered tasks (example for 300 stores):

| Batch | Time | Stores |
|-------|------|--------|
| 1 | 00:00 | 1-50 |
| 2 | 04:00 | 51-100 |
| 3 | 08:00 | 101-150 |
| 4 | 12:00 | 151-200 |
| 5 | 16:00 | 201-250 |
| 6 | 20:00 | 251-300 |

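
The staggering arithmetic works as follows. With `n` batches per day, batch `k` (0-based) starts `k × 24/n` hours after midnight and takes an even slice of the store list. Below is a hypothetical TypeScript sketch of that split. `planResyncBatches()` and `ResyncBatch` are illustrative names. Spreading any remainder over the first batches is an assumption, not necessarily what `generate_resync_tasks` does.

```typescript
// Hypothetical sketch: stagger stores into N evenly spaced daily batches.
interface ResyncBatch {
  batch: number;      // 1-based batch number
  scheduledFor: Date; // batch start time
  storeIds: number[];
}

function planResyncBatches(storeIds: number[], batches: number, day: Date): ResyncBatch[] {
  const hoursBetween = 24 / batches; // 6 batches -> every 4 hours
  const baseSize = Math.floor(storeIds.length / batches);
  const remainder = storeIds.length % batches;

  const result: ResyncBatch[] = [];
  let offset = 0;
  for (let k = 0; k < batches; k++) {
    // The first `remainder` batches take one extra store each
    const size = baseSize + (k < remainder ? 1 : 0);
    const scheduledFor = new Date(day.getTime() + k * hoursBetween * 3_600_000);
    result.push({ batch: k + 1, scheduledFor, storeIds: storeIds.slice(offset, offset + size) });
    offset += size;
  }
  return result;
}
```

With 300 stores and 6 batches starting at midnight UTC, this reproduces the table above: batch 2 runs at 04:00 with stores 51-100.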

---

## Files

### Core

| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | Task CRUD, claiming, capacity metrics |
| `src/tasks/task-worker.ts` | Worker loop, heartbeat, handler dispatch |
| `src/routes/tasks.ts` | REST API endpoints |
| `migrations/074_worker_task_queue.sql` | Database schema + SQL functions |

### Handlers

| File | Role |
|------|------|
| `src/tasks/handlers/store-discovery.ts` | `store_discovery` |
| `src/tasks/handlers/entry-point-discovery.ts` | `entry_point_discovery` |
| `src/tasks/handlers/product-discovery.ts` | `product_discovery` |
| `src/tasks/handlers/product-resync.ts` | `product_resync` |
| `src/tasks/handlers/analytics-refresh.ts` | `analytics_refresh` |

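
`task-worker.ts` is listed as handling handler dispatch, i.e. routing each claimed task to the handler for its role. Below is a minimal sketch of such a registry. It assumes a hypothetical `TaskHandler` signature and uses stub handlers in place of the real modules listed above. `ClaimedTask`, `HANDLERS` and `dispatch()` are illustrative names.

```typescript
// Hypothetical role -> handler registry; the real handlers live in src/tasks/handlers/.
type Role =
  | "store_discovery"
  | "entry_point_discovery"
  | "product_discovery"
  | "product_resync"
  | "analytics_refresh";

interface ClaimedTask {
  id: number;
  role: string;
  dispensaryId: number | null;
}

type TaskHandler = (task: ClaimedTask) => Promise<Record<string, unknown>>;

// Stub handler that only reports which role processed the task
const stub = (role: Role): TaskHandler => async (task) => ({ handledBy: role, taskId: task.id });

const HANDLERS: Record<Role, TaskHandler> = {
  store_discovery: stub("store_discovery"),
  entry_point_discovery: stub("entry_point_discovery"),
  product_discovery: stub("product_discovery"),
  product_resync: stub("product_resync"),
  analytics_refresh: stub("analytics_refresh"),
};

function isRole(value: string): value is Role {
  // hasOwnProperty avoids matching inherited keys such as "toString"
  return Object.prototype.hasOwnProperty.call(HANDLERS, value);
}

async function dispatch(task: ClaimedTask): Promise<Record<string, unknown>> {
  if (!isRole(task.role)) {
    throw new Error(`No handler registered for role "${task.role}"`);
  }
  return HANDLERS[task.role](task);
}
```

Typing the map as `Record<Role, TaskHandler>` makes the compiler reject a registry that forgets one of the five roles.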

---

## Running Workers

### Local Development

```bash
# Start a single worker
WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

# Start multiple workers (different terminals)
WORKER_ROLE=product_resync WORKER_ID=resync-1 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=product_resync WORKER_ID=resync-2 npx tsx src/tasks/task-worker.ts
WORKER_ROLE=store_discovery npx tsx src/tasks/task-worker.ts
```

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `WORKER_ROLE` | (required) | Which task role to process |
| `WORKER_ID` | auto-generated | Custom worker identifier |
| `POLL_INTERVAL_MS` | 5000 | How long to wait before polling again when no task is available (ms) |
| `HEARTBEAT_INTERVAL_MS` | 30000 | How often to update the heartbeat (ms) |

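
A sketch of how these variables might be read and validated at startup. `loadWorkerConfig()` and `WorkerConfig` are hypothetical names. The auto-generated ID imitates the `worker-product_resync-a1b2c3d4` format in the Monitoring logs, but generating it from 4 random bytes is an assumption.

```typescript
import { randomBytes } from "node:crypto";

// Hypothetical startup config loader for the variables in the table above.
const ROLES = [
  "store_discovery",
  "entry_point_discovery",
  "product_discovery",
  "product_resync",
  "analytics_refresh",
] as const;
type Role = (typeof ROLES)[number];

interface WorkerConfig {
  role: Role;
  workerId: string;
  pollIntervalMs: number;
  heartbeatIntervalMs: number;
}

function parseMs(raw: string | undefined, fallback: number, name: string): number {
  if (raw === undefined || raw === "") return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${name} must be a positive integer (ms), got "${raw}"`);
  }
  return value;
}

function loadWorkerConfig(env: Record<string, string | undefined> = process.env): WorkerConfig {
  const role = env.WORKER_ROLE;
  if (!role || !(ROLES as readonly string[]).includes(role)) {
    throw new Error(`WORKER_ROLE must be one of: ${ROLES.join(", ")}`);
  }
  return {
    role: role as Role,
    // e.g. worker-product_resync-a1b2c3d4 (8 random hex characters)
    workerId: env.WORKER_ID || `worker-${role}-${randomBytes(4).toString("hex")}`,
    pollIntervalMs: parseMs(env.POLL_INTERVAL_MS, 5000, "POLL_INTERVAL_MS"),
    heartbeatIntervalMs: parseMs(env.HEARTBEAT_INTERVAL_MS, 30000, "HEARTBEAT_INTERVAL_MS"),
  };
}
```

Failing fast on a missing or misspelled `WORKER_ROLE` keeps a misconfigured pod from idling silently, since it would otherwise never match any task.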
### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-worker-resync
spec:
  replicas: 5 # Scale horizontally
  selector:
    matchLabels:
      app: task-worker-resync # apps/v1 requires a selector matching the pod labels
  template:
    metadata:
      labels:
        app: task-worker-resync
    spec:
      containers:
        - name: worker
          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
          command: ["npx", "tsx", "src/tasks/task-worker.ts"]
          env:
            - name: WORKER_ROLE
              value: "product_resync"
```

---

## API Endpoints

### Task Management

| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | `/api/tasks` | List tasks (with filters) |
| POST | `/api/tasks` | Create a task |
| GET | `/api/tasks/:id` | Get task by ID |
| GET | `/api/tasks/counts` | Counts by status |
| GET | `/api/tasks/capacity` | Capacity metrics |
| POST | `/api/tasks/recover-stale` | Recover dead worker tasks |

### Task Generation

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | `/api/tasks/generate/resync` | Generate daily resync batch |
| POST | `/api/tasks/generate/discovery` | Create store discovery task |

---

## Capacity Planning

The `v_worker_capacity` view provides metrics:

```sql
SELECT * FROM v_worker_capacity;
```

| Metric | Description |
|--------|-------------|
| `pending_tasks` | Tasks waiting |
| `ready_tasks` | Pending tasks whose `scheduled_for` time has passed (claimable now) |
| `running_tasks` | Tasks being processed |
| `active_workers` | Workers with a recent heartbeat |
| `tasks_per_worker_hour` | Throughput estimate |
| `estimated_hours_to_drain` | Estimated time to clear the queue |

### Scaling API

```http
GET /api/tasks/capacity/product_resync
```

```json
{
  "pending_tasks": 500,
  "active_workers": 3,
  "workers_needed": {
    "for_1_hour": 10,
    "for_4_hours": 3,
    "for_8_hours": 2
  }
}
```

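
The `workers_needed` figures are consistent with dividing the backlog by per-worker throughput over each window and rounding up. The sketch below assumes 50 tasks per worker-hour. That value is assumed, not documented, but it reproduces the example response: 500 / 50 = 10, ⌈500 / 200⌉ = 3, ⌈500 / 400⌉ = 2. `workersNeeded()` is a hypothetical helper, not the endpoint's implementation.

```typescript
// Hypothetical helper: workers needed to drain a backlog within each target window.
function workersNeeded(
  pendingTasks: number,
  tasksPerWorkerHour: number,
  targetHours: number[],
): Record<string, number> {
  if (tasksPerWorkerHour <= 0) throw new Error("tasksPerWorkerHour must be positive");
  const result: Record<string, number> = {};
  for (const hours of targetHours) {
    // One worker clears tasksPerWorkerHour * hours tasks in the window; round up
    result[`for_${hours}_hour${hours === 1 ? "" : "s"}`] = Math.ceil(
      pendingTasks / (tasksPerWorkerHour * hours),
    );
  }
  return result;
}
```

For example, `workersNeeded(500, 50, [1, 4, 8])` gives `{ for_1_hour: 10, for_4_hours: 3, for_8_hours: 2 }`.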

---

## Database Schema

### worker_tasks

```sql
CREATE TABLE worker_tasks (
  id SERIAL PRIMARY KEY,

  -- Task identification
  role VARCHAR(50) NOT NULL,
  dispensary_id INTEGER REFERENCES dispensaries(id),
  platform VARCHAR(20),

  -- State
  status VARCHAR(20) DEFAULT 'pending',
  priority INTEGER DEFAULT 0,
  scheduled_for TIMESTAMPTZ,

  -- Ownership
  worker_id VARCHAR(100),
  claimed_at TIMESTAMPTZ,
  started_at TIMESTAMPTZ,
  completed_at TIMESTAMPTZ,
  last_heartbeat_at TIMESTAMPTZ,

  -- Results
  result JSONB,
  error_message TEXT,
  retry_count INTEGER DEFAULT 0,
  max_retries INTEGER DEFAULT 3,

  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

### Key Indexes

```sql
-- Fast claiming by role
CREATE INDEX idx_worker_tasks_pending
  ON worker_tasks(role, priority DESC, created_at ASC)
  WHERE status = 'pending';

-- Prevent duplicate active tasks per store
CREATE UNIQUE INDEX idx_worker_tasks_unique_active_store
  ON worker_tasks(dispensary_id)
  WHERE status IN ('claimed', 'running') AND dispensary_id IS NOT NULL;
```

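
Because of the unique partial index, a claim that races another worker for the same store fails with a unique-violation error (SQLSTATE `23505`) instead of creating a second active task. A worker can treat that as "nothing claimed this time" and poll again. This is a hedged sketch: `claimOrSkip()` is a hypothetical wrapper. It assumes the database driver exposes the SQLSTATE on `err.code`, as node-postgres does.

```typescript
// Hypothetical wrapper: treat a lost per-store race as "no task claimed".
const UNIQUE_VIOLATION = "23505"; // PostgreSQL SQLSTATE for unique_violation

function isUniqueViolation(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { code?: unknown }).code === UNIQUE_VIOLATION
  );
}

async function claimOrSkip<T>(claim: () => Promise<T | null>): Promise<T | null> {
  try {
    return await claim();
  } catch (err) {
    // Another worker already holds an active task for this store; try again on the next poll
    if (isUniqueViolation(err)) return null;
    throw err; // Anything else is a real failure
  }
}
```

If the claim runs inside an explicit transaction, that transaction is aborted by the error and must be rolled back before the connection is reused.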

---

## Monitoring

### Logs

```
[TaskWorker] Starting worker worker-product_resync-a1b2c3d4 for role: product_resync
[TaskWorker] Claimed task 123 (product_resync) for dispensary 456
[TaskWorker] Task 123 completed successfully
```

### Health Check

```sql
-- Active workers: those that recorded a task heartbeat in the last 5 minutes
-- (a worker that has not processed anything recently will not appear)
SELECT worker_id, role, COUNT(*) AS tasks, MAX(last_heartbeat_at) AS last_heartbeat
FROM worker_tasks
WHERE last_heartbeat_at > NOW() - INTERVAL '5 minutes'
GROUP BY worker_id, role;

-- Task counts by role/status
SELECT role, status, COUNT(*)
FROM worker_tasks
GROUP BY role, status;
```