Compare commits


18 Commits

Author SHA1 Message Date
Kelly
822d2b0609 feat: Idempotent entry_point_discovery with bulk endpoint
- Track id_resolution_status, attempts, and errors in handler
- Add POST /api/tasks/batch/entry-point-discovery endpoint
- Skip already-resolved stores, retry failed with force flag
2025-12-12 20:27:36 -07:00
Kelly
dfd36dacf8 fix: Show next run time correctly for schedules 2025-12-12 20:21:15 -07:00
Kelly
4ea7139ed5 feat: Add step reporting to all task handlers
Added updateStep() calls to:
- payload-fetch-curl: loading → preflight → fetching → saving
- product-refresh: loading → normalizing → upserting
- store-discovery-http: starting → preflight → navigating → fetching

This enables real-time visibility of worker progress in the dashboard.
2025-12-12 20:14:00 -07:00
Kelly
63023a4061 feat: Worker improvements and Run Now duplicate prevention
- Fix Run Now to prevent duplicate task creation
- Add loading state to Run Now button in UI
- Return early when no stores need refresh
- Worker dashboard improvements
- Browser pooling architecture updates
- K8s worker config updates (8 replicas, 3 concurrent tasks)
2025-12-12 20:11:31 -07:00
Kelly
c98c409f59 feat: Add MinIO/S3 support for payload storage
- Update payload-storage.ts to use MinIO when configured
- Payloads stored at: cannaiq/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
- Falls back to local filesystem when MINIO_* env vars not set
- Enables shared storage across all worker pods
- Fixes ephemeral storage issue where payloads were lost on pod restart

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 11:30:57 -07:00
Kelly
92f88fdcd6 fix(workers): Increase max concurrent tasks to 15 and add K8s permission rule
- Change MAX_CONCURRENT_TASKS default from 3 to 15
- Add CLAUDE.md rule requiring explicit permission before kubectl commands

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 10:54:33 -07:00
Kelly
832ef1cf83 feat(scheduler): Immutable schedules and HTTP-only pipeline
## Changes
- **Migration 089**: Add is_immutable and method columns to task_schedules
  - Per-state product_discovery schedules (4h default)
  - Store discovery weekly (168h)
  - All schedules use HTTP transport (Puppeteer/browser)
- **Task Scheduler**: HTTP-only product discovery with per-state scheduling
  - Each state has its own immutable schedule
  - Schedules can be edited (interval/priority) but not deleted
- **TasksDashboard UI**: Full immutability support
  - Lock icon for immutable schedules
  - State and Method columns in schedules table
  - Disabled delete for immutable, restricted edit fields
- **Store Discovery HTTP**: Auto-queue product_discovery for new stores
- **Migration 088**: Discovery payloads storage schema

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 09:24:08 -07:00
Kelly
9a24b4896c feat(tasks): Dual transport handlers and self-healing product_refresh
- Rename product-discovery.ts to product-discovery-curl.ts (axios-based)
- Rename payload-fetch.ts to payload-fetch-curl.ts
- Add product-discovery-http.ts (Puppeteer browser-based handler)
- Add method field to CreateTaskParams for transport selection
- Update task-service to insert method column on task creation
- Update task-worker with getHandlerForTask() for dual transport routing
- product_refresh now queues upstream tasks when no payload exists:
  - Has platform_dispensary_id → queues product_discovery (http)
  - No platform_dispensary_id → queues entry_point_discovery

This enables HTTP workers to pick up browser-based tasks while curl
workers handle axios-based tasks, and prevents product_refresh from
failing repeatedly when no crawl has been performed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 03:02:56 -07:00
Kelly
dd8fce6e35 fix(proxy): Convert non-standard proxy URL format and simplify preflight
- CrawlRotator.getProxyUrl() now converts non-standard format (http://host:port:user:pass) to standard format (http://user:pass@host:port)
- Simplify puppeteer preflight to only use ipify.org for IP verification (much lighter than fingerprint.com)
- Remove heavy anti-detect site tests from preflight - not needed, trust stealth plugin
- Fixes 503 errors when using session-based residential proxies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 02:13:51 -07:00
Kelly
f82eed4dc3 feat(workers): Add proxy reload, staggered tasks, and bulk proxy import
- Periodic proxy reload: Workers now reload proxies every 60s to pick up changes
- Staggered task scheduling: New API endpoints for creating tasks with delays
- Bulk proxy import: Script supports multiple URL formats including host:port:user:pass
- Proxy URL column: Migration 086 adds proxy_url for non-standard formats

Key changes:
- crawl-rotator.ts: Added reloadIfStale(), isStale(), setReloadInterval()
- task-worker.ts: Calls reloadIfStale() in main loop
- task-service.ts: Added createStaggeredTasks() and createAZStoreTasks()
- tasks.ts: Added POST /batch/staggered and /batch/az-stores endpoints
- import-proxies.ts: New script for bulk proxy import
- CLAUDE.md: Documented staggered task workflow

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 01:53:15 -07:00
Kelly
6490df9faf feat(tasks): Consolidate schedule management into task_schedules
- Add schedule CRUD endpoints to /api/tasks/schedules
- Add Schedules section to TasksDashboard with edit/delete/bulk actions
- Deprecate job_schedules table (entries disabled in DB)
- Mark CrawlSchedulePage as deprecated (removed from menu)
- Add deprecation comments to legacy schedule methods in api.ts
- Add migration comments to workers.ts explaining consolidation

Key changes:
- Schedule management now at /admin/tasks instead of /admin/schedule
- task_schedules uses interval_hours (simpler than base_interval_minutes + jitter)
- All schedule routes placed before /:id to avoid Express route conflicts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 01:15:21 -07:00
kelly
a077f81c65 Merge pull request 'fix(preflight): Phase 2 - Correct parameter order and add IP/fingerprint reporting' (#56) from feat/preflight-phase2-reporting into master 2025-12-12 07:35:02 +00:00
Kelly
6bcadd9e71 fix(preflight): Correct parameter order and add IP/fingerprint reporting
- Fix update_worker_preflight call to use correct parameter order:
  (worker_id, transport, status, ip, response_ms, error, fingerprint)
- Add proxyIp to both curl and http preflight reports
- Add fingerprint JSONB with timezone, location, and bot detection data
- Log HTTP IP and timezone after preflight completes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:32:45 -07:00
kelly
a77bf8611a Merge pull request 'feat(workers): Preflight phase 1 - Schema, StatefulSet, and timezone matching' (#55) from feat/preflight-phase1-schema into master 2025-12-12 07:30:53 +00:00
Kelly
33feca3138 fix(antidetect): Match browser timezone to proxy IP location
- Add IP geolocation lookup via ip-api.com to get timezone from proxy IP
- Use ipify.org API for reliable proxy IP detection (replaces unreliable fingerprint.com scraping)
- Set browser timezone via CDP Emulation.setTimezoneOverride to match proxy location
- Add detectedTimezone and detectedLocation to preflight result
- Add /api/worker-registry/preflight-test endpoint for smoke testing

Fixes timezone mismatch where browser showed America/Phoenix while proxy was in America/New_York

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:25:39 -07:00
kelly
7d85a97b63 Merge pull request 'feat: Preflight schema and StatefulSet' (#54) from feat/preflight-phase1-schema into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/54
2025-12-12 07:14:40 +00:00
Kelly
ce081effd4 feat(workers): Add preflight schema and StatefulSet
- Migration 085: Add curl_ip, http_ip, fingerprint_data, preflight_status,
  preflight_at columns to worker_registry
- StatefulSet manifest for 8 persistent workers with OnDelete update strategy

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:45:04 -07:00
kelly
2ed088b4d8 Merge pull request 'feat(api): Add preflight columns to worker registry API response' (#50) from feat/preflight-api-fields into master 2025-12-12 06:22:06 +00:00
31 changed files with 5066 additions and 535 deletions

CLAUDE.md

@@ -17,6 +17,48 @@ Never deploy unless user explicitly says: "CLAUDE — DEPLOYMENT IS NOW AUTHORIZ
### 5. DB POOL ONLY
Never import `src/db/migrate.ts` at runtime. Use `src/db/pool.ts` for DB access.
### 6. K8S POD LIMITS — CRITICAL
**MAX 8 PODS** for `scraper-worker` deployment. NEVER EXCEED THIS.
**Pods vs Workers:**
- **Pod** = Kubernetes container instance (MAX 8)
- **Worker** = Concurrent task runner INSIDE a pod (controlled by `MAX_CONCURRENT_TASKS` env var)
- Formula: `8 pods × MAX_CONCURRENT_TASKS = total concurrent workers`
**Browser Task Memory Limits:**
- Each Puppeteer/Chrome browser uses ~400 MB RAM
- Pod memory limit is 2 GB
- **MAX_CONCURRENT_TASKS=3** is the safe maximum for browser tasks
- More than 3 concurrent browsers per pod = OOM crash
| Browsers | RAM Used | Status |
|----------|----------|--------|
| 3 | ~1.3 GB | Safe (recommended) |
| 4 | ~1.7 GB | Risky |
| 5+ | >2 GB | OOM crash |
**To increase throughput:** Add more pods (up to 8), NOT more concurrent tasks per pod.
```bash
# CORRECT - scale pods (up to 8)
kubectl scale deployment/scraper-worker -n dispensary-scraper --replicas=8
# WRONG - will cause OOM crashes
kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=10
```
**If K8s API returns ServiceUnavailable:** STOP IMMEDIATELY. Do not retry. The cluster is overloaded.
### 7. K8S REQUIRES EXPLICIT PERMISSION
**NEVER run kubectl commands without explicit user permission.**
Before running ANY `kubectl` command (scale, rollout, set env, delete, apply, etc.):
1. Tell the user what you want to do
2. Wait for explicit approval
3. Only then execute the command
This applies to ALL kubectl operations - even read-only ones like `kubectl get pods`.
---
## Quick Reference
@@ -205,6 +247,78 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
---
## Staggered Task Workflow (Added 2025-12-12)
### Overview
When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting.
### How It Works
```
1. Task created with scheduled_for = NOW() + (index * stagger_seconds)
2. Worker claims task only when scheduled_for <= NOW()
3. Worker runs preflight on EVERY task claim (proxy health check)
4. If preflight passes, worker executes task
5. If preflight fails, task released back to pending for another worker
6. Worker finishes task, polls for next available task
7. Repeat - preflight runs on each new task claim
```
### Key Points
- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
- **Stagger prevents thundering herd**: the default gap between tasks is 15 seconds (see the sketch below)
- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed
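The scheduling arithmetic is small enough to sketch. A minimal TypeScript illustration of the `scheduled_for` computation above (illustrative only, not the actual `task-service.ts` code):

```typescript
// Sketch: staggered start times, scheduled_for = NOW() + (index * stagger_seconds)
function staggerSchedule(count: number, staggerSeconds = 15): Date[] {
  const now = Date.now();
  return Array.from({ length: count }, (_, i) =>
    new Date(now + i * staggerSeconds * 1000)
  );
}

// 24 tasks, 15 s apart: first starts immediately, last at +345 s (23 × 15)
const starts = staggerSchedule(24);
console.log(starts[0].toISOString(), starts[23].toISOString());
```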
### API Endpoints
```bash
# Create staggered tasks for specific dispensary IDs
POST /api/tasks/batch/staggered
{
"dispensary_ids": [1, 2, 3, 4],
"role": "product_refresh", # or "product_discovery"
"stagger_seconds": 15, # default: 15
"platform": "dutchie", # default: "dutchie"
"method": null # "curl" | "http" | null
}
# Create staggered tasks for AZ stores (convenience endpoint)
POST /api/tasks/batch/az-stores
{
"total_tasks": 24, # default: 24
"stagger_seconds": 15, # default: 15
"split_roles": true # default: true (12 refresh, 12 discovery)
}
```
### Example: 24 Tasks for AZ Stores
```bash
curl -X POST http://localhost:3010/api/tasks/batch/az-stores \
-H "Content-Type: application/json" \
-d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}'
```
Response:
```json
{
"success": true,
"total": 24,
"product_refresh": 12,
"product_discovery": 12,
"stagger_seconds": 15,
"total_duration_seconds": 345,
"estimated_completion": "2025-12-12T08:40:00.000Z",
"message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)"
}
```
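Note the duration math: with 24 tasks 15 seconds apart, the last task starts (24 - 1) × 15 = 345 seconds after the first, which is exactly the `total_duration_seconds` reported above.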
### Related Files
| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods |
| `src/routes/tasks.ts` | API endpoints for batch task creation |
| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |
---
## Documentation
| Doc | Purpose |


@@ -504,6 +504,103 @@ The Workers Dashboard shows:
| `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling |
| `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources |
## Browser Task Memory Limits (Updated 2025-12)
Browser-based tasks (Puppeteer/Chrome) have strict memory constraints that limit concurrency.
### Why Browser Tasks Are Different
Each browser task launches a Chrome process. Unlike I/O-bound API calls, browsers consume significant RAM:
| Component | RAM Usage |
|-----------|-----------|
| Node.js runtime | ~150 MB |
| Chrome browser (base) | ~200-250 MB |
| Dutchie menu page (loaded) | ~100-150 MB |
| **Per browser total** | **~350-450 MB** |
### Memory Math for Pod Limits
```
Pod memory limit: 2 GB (2000 MB)
Node.js runtime: -150 MB
Safety buffer: -100 MB
────────────────────────────────
Available for browsers: 1750 MB
Per browser + page: ~400 MB
Max browsers: 1750 ÷ 400 = ~4 browsers
Recommended: 3 browsers (leaves headroom for spikes)
```
### MAX_CONCURRENT_TASKS for Browser Tasks
| Browsers per Pod | RAM Used | Risk Level |
|------------------|----------|------------|
| 1 | ~500 MB | Very safe |
| 2 | ~900 MB | Safe |
| **3** | **~1.3 GB** | **Recommended** |
| 4 | ~1.7 GB | Tight (may OOM) |
| 5+ | >2 GB | Will OOM crash |
**CRITICAL**: `MAX_CONCURRENT_TASKS=3` is the maximum safe value for browser tasks with current pod limits.
### Scaling Strategy
Scale **horizontally** (more pods) rather than vertically (more concurrency per pod):
```
┌─────────────────────────────────────────────────────────────────────────┐
│ Cluster: 8 pods × 3 browsers = 24 concurrent tasks │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Pod 0 │ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ │
│ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Pod 4 │ │ Pod 5 │ │ Pod 6 │ │ Pod 7 │ │
│ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
```
### Browser Lifecycle Per Task
Each task gets a fresh browser with fresh IP/identity:
```
1. Claim task from queue
2. Get fresh proxy from pool
3. Launch browser with proxy
4. Run preflight (verify IP)
5. Execute scrape
6. Close browser
7. Repeat
```
This ensures:
- Fresh IP per task (proxy rotation)
- Fresh fingerprint per task (UA rotation)
- No cookie/session bleed between tasks
- Predictable memory usage
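A minimal Puppeteer sketch of this lifecycle, assuming a hypothetical `getFreshProxy()` helper (the actual worker code differs; this only illustrates the launch → preflight → scrape → close loop):

```typescript
import puppeteer from 'puppeteer';

// Hypothetical helper: hands out the next proxy from the pool.
declare function getFreshProxy(): Promise<{
  host: string; port: number; username: string; password: string;
}>;

async function runTaskWithFreshBrowser(menuUrl: string): Promise<string> {
  const proxy = await getFreshProxy();
  // Fresh browser per task: fresh IP, fresh fingerprint, no session bleed
  const browser = await puppeteer.launch({
    args: [`--proxy-server=http://${proxy.host}:${proxy.port}`],
  });
  try {
    const page = await browser.newPage();
    await page.authenticate({ username: proxy.username, password: proxy.password });

    // Preflight: verify the egress IP (ipify.org, as in the preflight commits)
    await page.goto('https://api.ipify.org?format=json', { waitUntil: 'networkidle2' });
    const { ip } = JSON.parse(await page.evaluate(() => document.body.innerText));
    console.log(`[preflight] egress IP ${ip}`);

    // Execute the actual scrape
    await page.goto(menuUrl, { waitUntil: 'networkidle2' });
    return await page.content();
  } finally {
    await browser.close(); // releases the ~400 MB Chrome footprint
  }
}
```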
### Increasing Capacity
To handle more concurrent tasks:
1. **Add more pods** (up to 8 per CLAUDE.md limit)
2. **Increase pod memory** (allows 4 browsers per pod):
```yaml
resources:
limits:
memory: "2.5Gi" # from 2Gi
```
**DO NOT** simply increase `MAX_CONCURRENT_TASKS` without also increasing pod memory limits.
## Monitoring
### Logs


@@ -0,0 +1,77 @@
apiVersion: v1
kind: Service
metadata:
  name: scraper-worker
  namespace: dispensary-scraper
  labels:
    app: scraper-worker
spec:
  clusterIP: None  # Headless service required for StatefulSet
  selector:
    app: scraper-worker
  ports:
    - port: 3010
      name: http
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: scraper-worker
  namespace: dispensary-scraper
spec:
  serviceName: scraper-worker
  replicas: 8
  podManagementPolicy: Parallel  # Start all pods at once
  updateStrategy:
    type: OnDelete  # Pods only update when manually deleted - no automatic restarts
  selector:
    matchLabels:
      app: scraper-worker
  template:
    metadata:
      labels:
        app: scraper-worker
    spec:
      terminationGracePeriodSeconds: 60
      imagePullSecrets:
        - name: regcred
      containers:
        - name: worker
          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
          imagePullPolicy: Always
          command: ["node"]
          args: ["dist/tasks/task-worker.js"]
          env:
            - name: WORKER_MODE
              value: "true"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: MAX_CONCURRENT_TASKS
              value: "50"
            - name: API_BASE_URL
              value: http://scraper
            - name: NODE_OPTIONS
              value: --max-old-space-size=1500
          envFrom:
            - configMapRef:
                name: scraper-config
            - secretRef:
                name: scraper-secrets
          resources:
            requests:
              cpu: 100m
              memory: 1Gi
            limits:
              cpu: 500m
              memory: 2Gi
          livenessProbe:
            exec:
              command:
                - /bin/sh
                - -c
                - pgrep -f 'task-worker' > /dev/null
            initialDelaySeconds: 10
            periodSeconds: 30
            failureThreshold: 3
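With `updateStrategy: OnDelete`, pushing a new image restarts nothing; a pod only picks up `:latest` when it is deleted and recreated by the controller. A hedged one-pod-at-a-time rollout (per CLAUDE.md rule 7, ask for explicit permission before running any kubectl command):

```bash
# Delete one pod; the StatefulSet controller recreates it and pulls :latest
kubectl delete pod scraper-worker-0 -n dispensary-scraper
# Wait for Ready, then repeat for scraper-worker-1 through scraper-worker-7
```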


@@ -0,0 +1,168 @@
-- Migration 085: Add IP and fingerprint columns for preflight reporting
-- These columns were missing from migration 084
-- ===================================================================
-- PART 1: Add IP address columns to worker_registry
-- ===================================================================
-- IP address detected during curl/axios preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS curl_ip VARCHAR(45);
-- IP address detected during http/Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS http_ip VARCHAR(45);
-- ===================================================================
-- PART 2: Add fingerprint data column
-- ===================================================================
-- Browser fingerprint data captured during Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS fingerprint_data JSONB;
-- ===================================================================
-- PART 3: Add combined preflight status/timestamp for convenience
-- ===================================================================
-- Overall preflight status (computed from both transports)
-- Values: 'pending', 'passed', 'partial', 'failed'
-- - 'pending': neither transport tested
-- - 'passed': both transports passed (or http passed for browser-only)
-- - 'partial': at least one passed
-- - 'failed': no transport passed
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_status VARCHAR(20) DEFAULT 'pending';
-- Most recent preflight completion timestamp
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_at TIMESTAMPTZ;
-- ===================================================================
-- PART 4: Update function to set preflight status
-- ===================================================================
CREATE OR REPLACE FUNCTION update_worker_preflight(
  p_worker_id VARCHAR(100),
  p_transport VARCHAR(10),  -- 'curl' or 'http'
  p_status VARCHAR(20),     -- 'passed', 'failed', 'skipped'
  p_ip VARCHAR(45) DEFAULT NULL,
  p_response_ms INTEGER DEFAULT NULL,
  p_error TEXT DEFAULT NULL,
  p_fingerprint JSONB DEFAULT NULL
) RETURNS VOID AS $$
DECLARE
  v_curl_status VARCHAR(20);
  v_http_status VARCHAR(20);
  v_overall_status VARCHAR(20);
BEGIN
  IF p_transport = 'curl' THEN
    UPDATE worker_registry
    SET
      preflight_curl_status = p_status,
      preflight_curl_at = NOW(),
      preflight_curl_ms = p_response_ms,
      preflight_curl_error = p_error,
      curl_ip = p_ip,
      updated_at = NOW()
    WHERE worker_id = p_worker_id;
  ELSIF p_transport = 'http' THEN
    UPDATE worker_registry
    SET
      preflight_http_status = p_status,
      preflight_http_at = NOW(),
      preflight_http_ms = p_response_ms,
      preflight_http_error = p_error,
      http_ip = p_ip,
      fingerprint_data = COALESCE(p_fingerprint, fingerprint_data),
      updated_at = NOW()
    WHERE worker_id = p_worker_id;
  END IF;

  -- Update overall preflight status
  SELECT preflight_curl_status, preflight_http_status
  INTO v_curl_status, v_http_status
  FROM worker_registry
  WHERE worker_id = p_worker_id;

  -- Compute overall status
  IF v_curl_status = 'passed' AND v_http_status = 'passed' THEN
    v_overall_status := 'passed';
  ELSIF v_curl_status = 'passed' OR v_http_status = 'passed' THEN
    v_overall_status := 'partial';
  ELSIF v_curl_status = 'failed' OR v_http_status = 'failed' THEN
    v_overall_status := 'failed';
  ELSE
    v_overall_status := 'pending';
  END IF;

  UPDATE worker_registry
  SET
    preflight_status = v_overall_status,
    preflight_at = NOW()
  WHERE worker_id = p_worker_id;
END;
$$ LANGUAGE plpgsql;
-- ===================================================================
-- PART 5: Update v_active_workers view
-- ===================================================================
DROP VIEW IF EXISTS v_active_workers;
CREATE VIEW v_active_workers AS
SELECT
  wr.id,
  wr.worker_id,
  wr.friendly_name,
  wr.role,
  wr.status,
  wr.pod_name,
  wr.hostname,
  wr.started_at,
  wr.last_heartbeat_at,
  wr.last_task_at,
  wr.tasks_completed,
  wr.tasks_failed,
  wr.current_task_id,
  -- IP addresses from preflights
  wr.curl_ip,
  wr.http_ip,
  -- Combined preflight status
  wr.preflight_status,
  wr.preflight_at,
  -- Detailed preflight status per transport
  wr.preflight_curl_status,
  wr.preflight_http_status,
  wr.preflight_curl_at,
  wr.preflight_http_at,
  wr.preflight_curl_error,
  wr.preflight_http_error,
  wr.preflight_curl_ms,
  wr.preflight_http_ms,
  -- Fingerprint data
  wr.fingerprint_data,
  -- Computed fields
  EXTRACT(EPOCH FROM (NOW() - wr.last_heartbeat_at)) AS seconds_since_heartbeat,
  CASE
    WHEN wr.status = 'offline' THEN 'offline'
    WHEN wr.last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
    WHEN wr.current_task_id IS NOT NULL THEN 'busy'
    ELSE 'ready'
  END AS health_status,
  -- Capability flags (can this worker handle curl/http tasks?)
  (wr.preflight_curl_status = 'passed') AS can_curl,
  (wr.preflight_http_status = 'passed') AS can_http
FROM worker_registry wr
WHERE wr.status != 'terminated'
ORDER BY wr.status = 'active' DESC, wr.last_heartbeat_at DESC;
-- ===================================================================
-- Comments
-- ===================================================================
COMMENT ON COLUMN worker_registry.curl_ip IS 'IP address detected during curl/axios preflight';
COMMENT ON COLUMN worker_registry.http_ip IS 'IP address detected during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.fingerprint_data IS 'Browser fingerprint captured during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.preflight_status IS 'Overall preflight status: pending, passed, partial, failed';
COMMENT ON COLUMN worker_registry.preflight_at IS 'Most recent preflight completion timestamp';
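A hedged example of how a single preflight report flows through the function and surfaces in the view (the worker ID and values are illustrative):

```sql
-- Report a passed HTTP preflight for one worker
SELECT update_worker_preflight(
  'scraper-worker-0',                        -- p_worker_id (illustrative)
  'http',                                    -- p_transport
  'passed',                                  -- p_status
  '203.0.113.7',                             -- p_ip
  842,                                       -- p_response_ms
  NULL,                                      -- p_error
  '{"timezone": "America/New_York"}'::jsonb  -- p_fingerprint
);

-- The view then exposes the combined status and capability flags
SELECT worker_id, preflight_status, can_curl, can_http
FROM v_active_workers
WHERE worker_id = 'scraper-worker-0';
```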


@@ -0,0 +1,10 @@
-- Migration 086: Add proxy_url column for alternative URL formats
-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass)
-- This column allows storing the raw URL directly
-- Add proxy_url column - if set, used directly instead of constructing from parts
ALTER TABLE proxies
ADD COLUMN IF NOT EXISTS proxy_url TEXT;
-- Add comment
COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.';
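For reference, commit dd8fce6e35 describes the conversion `CrawlRotator.getProxyUrl()` applies when only the parts are stored. A sketch of that normalization (illustrative, not the actual `crawl-rotator.ts` code):

```typescript
// http://host:port:user:pass  ->  http://user:pass@host:port
function normalizeProxyUrl(raw: string): string {
  const match = raw.match(/^(https?):\/\/([^:/]+):(\d+):([^:]+):(.+)$/);
  if (!match) return raw; // already standard; pass through untouched
  const [, scheme, host, port, user, pass] = match;
  return `${scheme}://${encodeURIComponent(user)}:${encodeURIComponent(pass)}@${host}:${port}`;
}

// normalizeProxyUrl('http://proxy.example.com:8080:alice:s3cret')
// => 'http://alice:s3cret@proxy.example.com:8080'
```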


@@ -0,0 +1,30 @@
-- Migration 088: Extend raw_crawl_payloads for discovery payloads
--
-- Enables saving raw store data from Dutchie discovery crawls.
-- Store discovery returns raw dispensary objects - save them for historical analysis.
-- Add payload_type to distinguish product crawls from discovery crawls
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS payload_type VARCHAR(32) NOT NULL DEFAULT 'product';
-- Add state_code for discovery payloads (null for product payloads)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS state_code VARCHAR(10);
-- Add store_count for discovery payloads (alternative to product_count)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS store_count INTEGER;
-- Make dispensary_id nullable for discovery payloads
ALTER TABLE raw_crawl_payloads
ALTER COLUMN dispensary_id DROP NOT NULL;
-- Add index for discovery payload queries
CREATE INDEX IF NOT EXISTS idx_raw_crawl_payloads_type_state
ON raw_crawl_payloads(payload_type, state_code)
WHERE payload_type = 'store_discovery';
-- Comments
COMMENT ON COLUMN raw_crawl_payloads.payload_type IS 'Type: product (default), store_discovery';
COMMENT ON COLUMN raw_crawl_payloads.state_code IS 'State code for discovery payloads (e.g., AZ, MI)';
COMMENT ON COLUMN raw_crawl_payloads.store_count IS 'Number of stores in discovery payload';
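A hedged example query against the new columns (it assumes a `created_at` timestamp column on `raw_crawl_payloads`, which this migration does not show):

```sql
-- Recent store-discovery payloads for one state;
-- the WHERE clause matches the partial index idx_raw_crawl_payloads_type_state
SELECT id, state_code, store_count, created_at
FROM raw_crawl_payloads
WHERE payload_type = 'store_discovery'
  AND state_code = 'AZ'
ORDER BY created_at DESC
LIMIT 10;
```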


@@ -0,0 +1,105 @@
-- Migration 089: Immutable Schedules with Per-State Product Discovery
--
-- Key changes:
-- 1. Add is_immutable column - schedules can be edited but not deleted
-- 2. Add method column - all tasks use 'http' (Puppeteer transport)
-- 3. Store discovery weekly (168h)
-- 4. Per-state product_discovery schedules (4h default)
-- 5. Remove old payload_fetch schedules
-- =====================================================
-- 1) Add new columns to task_schedules
-- =====================================================
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS is_immutable BOOLEAN DEFAULT FALSE;
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS method VARCHAR(10) DEFAULT 'http';
-- =====================================================
-- 2) Update store_discovery to weekly and immutable
-- =====================================================
UPDATE task_schedules
SET interval_hours = 168, -- 7 days
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)'
WHERE name = 'store_discovery_dutchie';
-- Insert if it doesn't exist
INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at)
VALUES ('store_discovery_dutchie', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW())
ON CONFLICT (name) DO UPDATE SET
interval_hours = 168,
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)';
-- =====================================================
-- 3) Remove old payload_fetch and product_refresh_all schedules
-- =====================================================
DELETE FROM task_schedules WHERE name IN ('payload_fetch_all', 'product_refresh_all');
-- =====================================================
-- 4) Create per-state product_discovery schedules
-- =====================================================
-- One schedule per state that has dispensaries with active cannabis programs
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours, -- 4 hours default, editable
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable, -- Can edit but not delete
'http' AS method,
CASE WHEN s.is_active THEN TRUE ELSE FALSE END AS enabled,
-- Stagger start times: each state starts 5 minutes after the previous
NOW() + (ROW_NUMBER() OVER (ORDER BY s.code) * INTERVAL '5 minutes') AS next_run_at
FROM states s
WHERE EXISTS (
SELECT 1 FROM dispensaries d
WHERE d.state_id = s.id AND d.crawl_enabled = true
)
ON CONFLICT (name) DO UPDATE SET
is_immutable = TRUE,
method = 'http',
description = EXCLUDED.description;
-- Also create schedules for states that might have stores discovered later
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours,
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable,
'http' AS method,
FALSE AS enabled, -- Disabled until stores exist
NOW() + INTERVAL '1 hour'
FROM states s
WHERE NOT EXISTS (
SELECT 1 FROM task_schedules ts WHERE ts.name = 'product_discovery_' || lower(s.code)
)
ON CONFLICT (name) DO NOTHING;
-- =====================================================
-- 5) Make analytics_refresh immutable
-- =====================================================
UPDATE task_schedules
SET is_immutable = TRUE, method = 'http'
WHERE name = 'analytics_refresh';
-- =====================================================
-- 6) Add index for schedule lookups
-- =====================================================
CREATE INDEX IF NOT EXISTS idx_task_schedules_state_code
ON task_schedules(state_code)
WHERE state_code IS NOT NULL;
-- Comments
COMMENT ON COLUMN task_schedules.is_immutable IS 'If TRUE, schedule cannot be deleted (only edited)';
COMMENT ON COLUMN task_schedules.method IS 'Transport method: http (Puppeteer/browser) or curl (axios)';
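After this migration, the per-state fan-out can be inspected directly; for example:

```sql
-- One immutable schedule per state, e.g. product_discovery_az
SELECT name, state_code, interval_hours, enabled, next_run_at
FROM task_schedules
WHERE role = 'product_discovery'
ORDER BY state_code;
```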


@@ -3,6 +3,24 @@
*
* Endpoints for managing worker tasks, viewing capacity metrics,
* and generating batch tasks.
*
* SCHEDULE MANAGEMENT (added 2025-12-12):
* This file now contains the canonical schedule management endpoints.
* The job_schedules table has been deprecated and all schedule management
* is now consolidated into task_schedules:
*
* Schedule endpoints:
* GET /api/tasks/schedules - List all schedules
* POST /api/tasks/schedules - Create new schedule
* GET /api/tasks/schedules/:id - Get schedule by ID
* PUT /api/tasks/schedules/:id - Update schedule
* DELETE /api/tasks/schedules/:id - Delete schedule
* DELETE /api/tasks/schedules - Bulk delete schedules
* POST /api/tasks/schedules/:id/run-now - Trigger schedule immediately
* POST /api/tasks/schedules/:id/toggle - Toggle schedule enabled/disabled
*
* Note: Schedule routes are defined BEFORE /:id to avoid route conflicts
* (Express matches routes in order, and "schedules" would match /:id otherwise)
*/
import { Router, Request, Response } from 'express';
@@ -131,6 +149,520 @@ router.get('/capacity/:role', async (req: Request, res: Response) => {
}
});
// ============================================================
// SCHEDULE MANAGEMENT ROUTES
// (Must be before /:id to avoid route conflicts)
// ============================================================
/**
* GET /api/tasks/schedules
* List all task schedules
*
* Returns schedules with is_immutable flag - immutable schedules can only
* have their interval_hours, priority, and enabled fields updated (not deleted).
*/
router.get('/schedules', async (req: Request, res: Response) => {
try {
const enabledOnly = req.query.enabled === 'true';
let query = `
SELECT id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method,
COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
FROM task_schedules
`;
if (enabledOnly) {
query += ` WHERE enabled = true`;
}
query += ` ORDER BY
CASE role
WHEN 'store_discovery' THEN 1
WHEN 'product_discovery' THEN 2
WHEN 'analytics_refresh' THEN 3
ELSE 4
END,
state_code NULLS FIRST,
name`;
const result = await pool.query(query);
res.json({ schedules: result.rows });
} catch (error: unknown) {
console.error('Error listing schedules:', error);
res.status(500).json({ error: 'Failed to list schedules' });
}
});
/**
* DELETE /api/tasks/schedules
* Bulk delete schedules
*
* Immutable schedules are automatically skipped (not deleted).
*
* Body:
* - ids: number[] (required) - array of schedule IDs to delete
* - all: boolean (optional) - if true, delete all non-immutable schedules (ids ignored)
*/
router.delete('/schedules', async (req: Request, res: Response) => {
try {
const { ids, all } = req.body;
let result;
let skippedImmutable: { id: number; name: string }[] = [];
if (all === true) {
// First, find immutable schedules that will be skipped
const immutableResult = await pool.query(`
SELECT id, name FROM task_schedules WHERE is_immutable = true
`);
skippedImmutable = immutableResult.rows;
// Delete all non-immutable schedules
result = await pool.query(`
DELETE FROM task_schedules
WHERE COALESCE(is_immutable, false) = false
RETURNING id, name
`);
} else if (Array.isArray(ids) && ids.length > 0) {
// First, find which of the requested IDs are immutable
const immutableResult = await pool.query(`
SELECT id, name FROM task_schedules
WHERE id = ANY($1) AND is_immutable = true
`, [ids]);
skippedImmutable = immutableResult.rows;
// Delete only non-immutable schedules from the requested IDs
result = await pool.query(`
DELETE FROM task_schedules
WHERE id = ANY($1) AND COALESCE(is_immutable, false) = false
RETURNING id, name
`, [ids]);
} else {
return res.status(400).json({
error: 'Either provide ids array or set all=true',
});
}
res.json({
success: true,
deleted_count: result.rowCount,
deleted: result.rows,
skipped_immutable_count: skippedImmutable.length,
skipped_immutable: skippedImmutable,
message: skippedImmutable.length > 0
? `Deleted ${result.rowCount} schedule(s), skipped ${skippedImmutable.length} immutable schedule(s)`
: `Deleted ${result.rowCount} schedule(s)`,
});
} catch (error: unknown) {
console.error('Error bulk deleting schedules:', error);
res.status(500).json({ error: 'Failed to delete schedules' });
}
});
/**
* POST /api/tasks/schedules
* Create a new schedule
*
* Body:
* - name: string (required, unique)
* - role: TaskRole (required)
* - description: string (optional)
* - enabled: boolean (default true)
* - interval_hours: number (required)
* - priority: number (default 0)
* - state_code: string (optional)
* - platform: string (optional)
*/
router.post('/schedules', async (req: Request, res: Response) => {
try {
const {
name,
role,
description,
enabled = true,
interval_hours,
priority = 0,
state_code,
platform,
} = req.body;
if (!name || !role || !interval_hours) {
return res.status(400).json({
error: 'name, role, and interval_hours are required',
});
}
// Calculate next_run_at based on interval
const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000);
const result = await pool.query(`
INSERT INTO task_schedules
(name, role, description, enabled, interval_hours, priority, state_code, platform, next_run_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
`, [name, role, description, enabled, interval_hours, priority, state_code, platform, nextRunAt]);
res.status(201).json(result.rows[0]);
} catch (error: any) {
if (error.code === '23505') {
// Unique constraint violation
return res.status(409).json({ error: 'A schedule with this name already exists' });
}
console.error('Error creating schedule:', error);
res.status(500).json({ error: 'Failed to create schedule' });
}
});
/**
* GET /api/tasks/schedules/:id
* Get a specific schedule by ID
*/
router.get('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const result = await pool.query(`
SELECT id, name, role, description, enabled, interval_hours,
priority, state_code, platform, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
FROM task_schedules
WHERE id = $1
`, [scheduleId]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
res.json(result.rows[0]);
} catch (error: unknown) {
console.error('Error getting schedule:', error);
res.status(500).json({ error: 'Failed to get schedule' });
}
});
/**
* PUT /api/tasks/schedules/:id
* Update an existing schedule
*
* For IMMUTABLE schedules, only these fields can be updated:
* - enabled (turn on/off)
* - interval_hours (change frequency)
* - priority (change priority)
*
* For regular schedules, all fields can be updated.
*/
router.put('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const {
name,
role,
description,
enabled,
interval_hours,
priority,
state_code,
platform,
} = req.body;
// First check if schedule exists and if it's immutable
const checkResult = await pool.query(`
SELECT id, name, COALESCE(is_immutable, false) as is_immutable
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (checkResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = checkResult.rows[0];
const isImmutable = schedule.is_immutable;
// For immutable schedules, reject attempts to change protected fields
if (isImmutable) {
const protectedFields: string[] = [];
if (name !== undefined) protectedFields.push('name');
if (role !== undefined) protectedFields.push('role');
if (description !== undefined) protectedFields.push('description');
if (state_code !== undefined) protectedFields.push('state_code');
if (platform !== undefined) protectedFields.push('platform');
if (protectedFields.length > 0) {
return res.status(403).json({
error: 'Cannot modify protected fields on immutable schedule',
message: `Schedule "${schedule.name}" is immutable. Only enabled, interval_hours, and priority can be changed.`,
protected_fields: protectedFields,
allowed_fields: ['enabled', 'interval_hours', 'priority'],
});
}
}
// Build dynamic update query
const updates: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// These fields can only be updated on non-immutable schedules
if (!isImmutable) {
if (name !== undefined) {
updates.push(`name = $${paramIndex++}`);
values.push(name);
}
if (role !== undefined) {
updates.push(`role = $${paramIndex++}`);
values.push(role);
}
if (description !== undefined) {
updates.push(`description = $${paramIndex++}`);
values.push(description);
}
if (state_code !== undefined) {
updates.push(`state_code = $${paramIndex++}`);
values.push(state_code || null);
}
if (platform !== undefined) {
updates.push(`platform = $${paramIndex++}`);
values.push(platform || null);
}
}
// These fields can be updated on ALL schedules (including immutable)
if (enabled !== undefined) {
updates.push(`enabled = $${paramIndex++}`);
values.push(enabled);
}
if (interval_hours !== undefined) {
updates.push(`interval_hours = $${paramIndex++}`);
values.push(interval_hours);
// Recalculate next_run_at if interval changed
const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000);
updates.push(`next_run_at = $${paramIndex++}`);
values.push(nextRunAt);
}
if (priority !== undefined) {
updates.push(`priority = $${paramIndex++}`);
values.push(priority);
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No fields to update' });
}
updates.push('updated_at = NOW()');
values.push(scheduleId);
const result = await pool.query(`
UPDATE task_schedules
SET ${updates.join(', ')}
WHERE id = $${paramIndex}
RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method,
COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
`, values);
res.json(result.rows[0]);
} catch (error: any) {
if (error.code === '23505') {
return res.status(409).json({ error: 'A schedule with this name already exists' });
}
console.error('Error updating schedule:', error);
res.status(500).json({ error: 'Failed to update schedule' });
}
});
/**
* DELETE /api/tasks/schedules/:id
* Delete a schedule
*
* Immutable schedules cannot be deleted - they can only be disabled.
*/
router.delete('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// First check if schedule exists and is immutable
const checkResult = await pool.query(`
SELECT id, name, COALESCE(is_immutable, false) as is_immutable
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (checkResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = checkResult.rows[0];
// Prevent deletion of immutable schedules
if (schedule.is_immutable) {
return res.status(403).json({
error: 'Cannot delete immutable schedule',
message: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`,
schedule_id: scheduleId,
is_immutable: true,
});
}
// Delete the schedule
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [scheduleId]);
res.json({
success: true,
message: `Schedule "${schedule.name}" deleted`,
});
} catch (error: unknown) {
console.error('Error deleting schedule:', error);
res.status(500).json({ error: 'Failed to delete schedule' });
}
});
/**
* POST /api/tasks/schedules/:id/run-now
* Manually trigger a scheduled task to run immediately
*
* For product_discovery schedules with state_code, this creates individual
* tasks for each store in that state (fans out properly).
*/
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// Get the full schedule
const scheduleResult = await pool.query(`
SELECT id, name, role, state_code, platform, priority, interval_hours, method
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (scheduleResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = scheduleResult.rows[0];
let tasksCreated = 0;
// For product_discovery with state_code, fan out to individual stores
if (schedule.role === 'product_discovery' && schedule.state_code) {
// Find stores in this state needing refresh
const storeResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND s.code = $1
-- No pending/running product_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code]);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length > 0) {
// Create staggered tasks for all stores
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
15, // 15 seconds stagger
schedule.platform || 'dutchie',
schedule.method || 'http'
);
tasksCreated = result.created;
} else {
// No stores need refresh - return early with message
return res.json({
success: true,
message: `No ${schedule.state_code} stores need refresh at this time`,
tasksCreated: 0,
stateCode: schedule.state_code,
});
}
} else if (schedule.role !== 'product_discovery') {
// For other schedules (store_discovery, analytics_refresh), create a single task
await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10,
method: schedule.method,
});
tasksCreated = 1;
} else {
// product_discovery without state_code - shouldn't happen, reject
return res.status(400).json({
error: 'product_discovery schedules require a state_code',
});
}
// Update last_run_at on the schedule
await pool.query(`
UPDATE task_schedules
SET last_run_at = NOW(),
next_run_at = NOW() + (interval_hours || ' hours')::interval,
last_task_count = $2,
updated_at = NOW()
WHERE id = $1
`, [scheduleId, tasksCreated]);
res.json({
success: true,
message: `Schedule "${schedule.name}" triggered`,
tasksCreated,
stateCode: schedule.state_code,
});
} catch (error: unknown) {
console.error('Error running schedule:', error);
res.status(500).json({ error: 'Failed to run schedule' });
}
});
/**
* POST /api/tasks/schedules/:id/toggle
* Toggle a schedule's enabled status
*/
router.post('/schedules/:id/toggle', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const result = await pool.query(`
UPDATE task_schedules
SET enabled = NOT enabled,
updated_at = NOW()
WHERE id = $1
RETURNING id, name, enabled
`, [scheduleId]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
res.json({
success: true,
schedule: result.rows[0],
message: result.rows[0].enabled
? `Schedule "${result.rows[0].name}" enabled`
: `Schedule "${result.rows[0].name}" disabled`,
});
} catch (error: unknown) {
console.error('Error toggling schedule:', error);
res.status(500).json({ error: 'Failed to toggle schedule' });
}
});
// ============================================================
// TASK-SPECIFIC ROUTES (with :id parameter)
// ============================================================
/**
* GET /api/tasks/:id
* Get a specific task by ID
@@ -598,6 +1130,342 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
}
});
// ============================================================
// STAGGERED BATCH TASK CREATION
// ============================================================
/**
* POST /api/tasks/batch/staggered
* Create multiple tasks with staggered start times
*
* This endpoint prevents resource contention when creating many tasks by
* staggering their scheduled_for timestamps. Each task becomes eligible
* for claiming only after its scheduled time.
*
* WORKFLOW:
* 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task released back to pending for another worker
*
* Body:
* - dispensary_ids: number[] (required) - Array of dispensary IDs
* - role: TaskRole (required) - 'product_refresh' | 'product_discovery'
* - stagger_seconds: number (default: 15) - Seconds between each task start
* - platform: string (default: 'dutchie')
* - method: 'curl' | 'http' | null (default: null)
*/
router.post('/batch/staggered', async (req: Request, res: Response) => {
try {
const {
dispensary_ids,
role,
stagger_seconds = 15,
platform = 'dutchie',
method = null,
} = req.body;
if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
return res.status(400).json({ error: 'dispensary_ids array is required' });
}
if (!role) {
return res.status(400).json({ error: 'role is required' });
}
const result = await taskService.createStaggeredTasks(
dispensary_ids,
role as TaskRole,
stagger_seconds,
platform,
method
);
const totalDuration = (dispensary_ids.length - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
created: result.created,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating staggered tasks:', error);
res.status(500).json({ error: 'Failed to create staggered tasks' });
}
});
/**
* POST /api/tasks/batch/az-stores
* Convenience endpoint to create staggered tasks for Arizona stores
*
* Body:
* - total_tasks: number (default: 24) - Total tasks to create
* - stagger_seconds: number (default: 15) - Seconds between each task
* - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
*/
router.post('/batch/az-stores', async (req: Request, res: Response) => {
try {
const {
total_tasks = 24,
stagger_seconds = 15,
split_roles = true,
} = req.body;
const result = await taskService.createAZStoreTasks(
total_tasks,
stagger_seconds,
split_roles
);
const totalDuration = (result.total - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
total: result.total,
product_refresh: result.product_refresh,
product_discovery: result.product_discovery,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
});
} catch (error: unknown) {
console.error('Error creating AZ store tasks:', error);
res.status(500).json({ error: 'Failed to create AZ store tasks' });
}
});
/**
* POST /api/tasks/batch/entry-point-discovery
* Create entry_point_discovery tasks for stores missing platform_dispensary_id
*
* This is idempotent - stores that already have platform_dispensary_id are skipped.
* Only creates tasks for stores with menu_url set and crawl_enabled = true.
*
* Body (optional):
* - state_code: string (optional) - Filter by state code
* - stagger_seconds: number (default: 5) - Seconds between tasks
* - force: boolean (default: false) - Re-run even for previously failed stores
*/
router.post('/batch/entry-point-discovery', async (req: Request, res: Response) => {
try {
const {
state_code,
stagger_seconds = 5,
force = false,
} = req.body;
// Find stores that need entry point discovery
const storeResult = await pool.query(`
SELECT d.id, d.name, d.menu_url
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.menu_url IS NOT NULL
AND d.platform_dispensary_id IS NULL
${state_code ? 'AND s.code = $1' : ''}
${!force ? "AND (d.id_resolution_status IS NULL OR d.id_resolution_status = 'pending')" : ''}
-- No pending/running entry_point_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'entry_point_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.id
`, state_code ? [state_code.toUpperCase()] : []);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length === 0) {
return res.json({
success: true,
message: state_code
? `No ${state_code.toUpperCase()} stores need entry point discovery`
: 'No stores need entry point discovery',
tasks_created: 0,
});
}
// Create staggered tasks
const taskIds: number[] = [];
for (let i = 0; i < dispensaryIds.length; i++) {
const scheduledFor = new Date(Date.now() + i * stagger_seconds * 1000);
const result = await pool.query(`
INSERT INTO worker_tasks (role, dispensary_id, priority, scheduled_for, method)
VALUES ('entry_point_discovery', $1, 10, $2, 'http')
RETURNING id
`, [dispensaryIds[i], scheduledFor]);
taskIds.push(result.rows[0].id);
}
const totalDuration = (dispensaryIds.length - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.json({
success: true,
tasks_created: taskIds.length,
task_ids: taskIds,
stores: storeResult.rows.map((r: { id: number; name: string }) => ({ id: r.id, name: r.name })),
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${taskIds.length} entry_point_discovery tasks${state_code ? ` for ${state_code.toUpperCase()}` : ''}`,
});
} catch (error: unknown) {
console.error('Error creating entry point discovery tasks:', error);
res.status(500).json({ error: 'Failed to create entry point discovery tasks' });
}
});
// ============================================================
// STATE-BASED CRAWL ENDPOINTS
// ============================================================
/**
* POST /api/tasks/crawl-state/:stateCode
* Create product_discovery tasks for all stores in a state
*
* This is the primary endpoint for triggering crawls by state.
* Creates staggered tasks for all crawl-enabled stores in the specified state.
*
* Params:
* - stateCode: State code (e.g., 'AZ', 'CA', 'CO')
*
* Body (optional):
* - stagger_seconds: number (default: 15) - Seconds between each task
* - priority: number (default: 10) - Task priority
* - method: 'curl' | 'http' | null (default: 'http')
*
* Returns:
* - tasks_created: Number of tasks created
* - stores_in_state: Total stores found for the state
* - skipped: Number skipped (already have active tasks)
*/
router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
try {
const stateCode = req.params.stateCode.toUpperCase();
const {
stagger_seconds = 15,
priority = 10,
method = 'http',
} = req.body;
// Verify state exists
const stateResult = await pool.query(`
SELECT id, code, name FROM states WHERE code = $1
`, [stateCode]);
if (stateResult.rows.length === 0) {
return res.status(404).json({
error: 'State not found',
state_code: stateCode,
});
}
const state = stateResult.rows[0];
// Get all crawl-enabled dispensaries in this state
const dispensariesResult = await pool.query(`
SELECT d.id, d.name
FROM dispensaries d
WHERE d.state_id = $1
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [state.id]);
if (dispensariesResult.rows.length === 0) {
return res.status(200).json({
success: true,
message: `No crawl-enabled stores found in ${state.name}`,
state_code: stateCode,
state_name: state.name,
tasks_created: 0,
stores_in_state: 0,
});
}
const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id);
// Create staggered tasks
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
stagger_seconds,
'dutchie',
method
);
const totalDuration = (result.created - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
state_code: stateCode,
state_name: state.name,
tasks_created: result.created,
stores_in_state: dispensariesResult.rows.length,
skipped: dispensariesResult.rows.length - result.created,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating state crawl tasks:', error);
res.status(500).json({ error: 'Failed to create state crawl tasks' });
}
});
/**
* GET /api/tasks/states
* List all states with their store counts and crawl status
*/
router.get('/states', async (_req: Request, res: Response) => {
try {
const result = await pool.query(`
SELECT
s.code,
s.name,
COUNT(d.id)::int as total_stores,
COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NOT NULL)::int as crawl_enabled_stores,
COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NULL)::int as missing_platform_id,
MAX(d.last_fetch_at) as last_crawl_at,
(SELECT COUNT(*) FROM worker_tasks t
JOIN dispensaries d2 ON t.dispensary_id = d2.id
WHERE d2.state_id = s.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running'))::int as active_tasks
FROM states s
LEFT JOIN dispensaries d ON d.state_id = s.id
GROUP BY s.id, s.code, s.name
HAVING COUNT(d.id) > 0
ORDER BY COUNT(d.id) DESC
`);
res.json({
states: result.rows,
total_states: result.rows.length,
});
} catch (error: unknown) {
console.error('Error listing states:', error);
res.status(500).json({ error: 'Failed to list states' });
}
});
// ============================================================
// TASK POOL MANAGEMENT
// ============================================================
/**
* GET /api/tasks/pool/status
* Check if task pool is paused


@@ -23,6 +23,8 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import os from 'os';
import { runPuppeteerPreflightWithRetry } from '../services/puppeteer-preflight';
import { CrawlRotator } from '../services/crawl-rotator';
const router = Router();
@@ -153,7 +155,12 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
active_task_count,
max_concurrent_tasks,
status = 'active',
- resources
+ resources,
+ // Step tracking fields
+ current_step,
+ current_step_detail,
+ current_step_started_at,
+ task_steps,
} = req.body;
if (!worker_id) {
@@ -166,6 +173,11 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
if (current_task_ids) metadata.current_task_ids = current_task_ids;
if (active_task_count !== undefined) metadata.active_task_count = active_task_count;
if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks;
// Step tracking
if (current_step) metadata.current_step = current_step;
if (current_step_detail) metadata.current_step_detail = current_step_detail;
if (current_step_started_at) metadata.current_step_started_at = current_step_started_at;
if (task_steps) metadata.task_steps = task_steps;
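// Illustrative heartbeat fragment (values are examples only):
//   { current_step: 'fetching', current_step_detail: 'Executing GraphQL query',
//     current_step_started_at: '2025-12-12T20:00:00Z' }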
// Store resources in metadata jsonb column
const { rows } = await pool.query(`
@@ -250,12 +262,9 @@ router.post('/deregister', async (req: Request, res: Response) => {
// Release the name back to the pool
await pool.query('SELECT release_worker_name($1)', [worker_id]);
// Mark as terminated
// Delete the worker entry (clean shutdown)
const { rows } = await pool.query(`
UPDATE worker_registry
SET status = 'terminated',
current_task_id = NULL,
updated_at = NOW()
DELETE FROM worker_registry
WHERE worker_id = $1
RETURNING id, friendly_name
`, [worker_id]);
@@ -864,4 +873,58 @@ router.get('/pods', async (_req: Request, res: Response) => {
}
});
// ============================================================
// PREFLIGHT SMOKE TEST
// ============================================================
/**
* POST /api/worker-registry/preflight-test
* Run an HTTP (Puppeteer) preflight test and return results
*
* This is a smoke test endpoint to verify the preflight system works.
* Returns IP, fingerprint data, bot detection results, and products fetched.
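*
* Example (host and port are illustrative):
*   curl -X POST http://localhost:3000/api/worker-registry/preflight-test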
*/
router.post('/preflight-test', async (_req: Request, res: Response) => {
try {
console.log('[PreflightTest] Starting HTTP preflight smoke test...');
// Create a temporary CrawlRotator for the test
const crawlRotator = new CrawlRotator();
// Run the Puppeteer preflight (with 1 retry)
const startTime = Date.now();
const result = await runPuppeteerPreflightWithRetry(crawlRotator, 1);
const duration = Date.now() - startTime;
console.log(`[PreflightTest] Completed in ${duration}ms - passed: ${result.passed}`);
res.json({
success: true,
test: 'http_preflight',
duration_ms: duration,
result: {
passed: result.passed,
proxy_ip: result.proxyIp,
fingerprint: result.fingerprint,
bot_detection: result.botDetection,
products_returned: result.productsReturned,
browser_user_agent: result.browserUserAgent,
ip_verified: result.ipVerified,
proxy_available: result.proxyAvailable,
proxy_connected: result.proxyConnected,
antidetect_ready: result.antidetectReady,
response_time_ms: result.responseTimeMs,
error: result.error
}
});
} catch (error: any) {
console.error('[PreflightTest] Error:', error.message);
res.status(500).json({
success: false,
test: 'http_preflight',
error: error.message
});
}
});
export default router;

View File

@@ -4,10 +4,25 @@
* Provider-agnostic worker management and job monitoring.
* Replaces legacy /api/dutchie-az/admin/schedules and /api/dutchie-az/monitor/* routes.
*
* DEPRECATION NOTE (2025-12-12):
* This file still queries job_schedules for backwards compatibility with
* the /api/workers endpoints that display worker status. However, the
* job_schedules table is DEPRECATED - all entries have been disabled.
*
* Schedule management has been consolidated into task_schedules:
* - Use /api/tasks/schedules for schedule CRUD operations
* - Use TasksDashboard.tsx (/admin/tasks) for schedule management UI
* - task_schedules uses interval_hours (simpler than base_interval_minutes + jitter)
*
* The /api/workers endpoints remain useful for:
* - Monitoring active workers and job status
* - K8s scaling controls
* - Job history and logs
*
* Endpoints:
* GET /api/workers - List all workers/schedules
* GET /api/workers/active - List currently active workers
* GET /api/workers/schedule - Get all job schedules
* GET /api/workers/schedule - Get all job schedules (DEPRECATED - use /api/tasks/schedules)
* GET /api/workers/:workerName - Get specific worker details
* GET /api/workers/:workerName/scope - Get worker's scope (states, etc.)
* GET /api/workers/:workerName/stats - Get worker statistics
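*
* Migration sketch (routes per the notes above; illustrative):
*   before: GET /api/workers/schedule   -> job_schedules (deprecated)
*   after:  GET /api/tasks/schedules    -> task_schedules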

View File

@@ -0,0 +1,284 @@
/**
* Bulk Proxy Import Script
*
* Imports proxies from various formats into the proxies table.
* Supports:
* - Standard format: http://user:pass@host:port
* - Colon format: http://host:port:user:pass
* - Simple format: host:port:user:pass (defaults to http)
*
* Usage:
* npx tsx src/scripts/import-proxies.ts < proxies.txt
* echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts
* npx tsx src/scripts/import-proxies.ts --file proxies.txt
* npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"
*
* Options:
* --file <path> Read proxies from file (one per line)
* --url <url> Import a single proxy URL
* --max-connections Set max_connections for all imported proxies (default: 1)
* --dry-run Parse and show what would be imported without inserting
*/
import { getPool } from '../db/pool';
import * as fs from 'fs';
import * as readline from 'readline';
interface ParsedProxy {
protocol: string;
host: string;
port: number;
username?: string;
password?: string;
rawUrl: string;
}
/**
* Parse a proxy URL in various formats
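*
* e.g. parseProxyUrl('1.2.3.4:8080:user:pw') returns
*   { protocol: 'http', host: '1.2.3.4', port: 8080, username: 'user',
*     password: 'pw', rawUrl: 'http://1.2.3.4:8080:user:pw' }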
*/
function parseProxyUrl(input: string): ParsedProxy | null {
const trimmed = input.trim();
if (!trimmed || trimmed.startsWith('#')) return null;
// Format 1: Standard URL format - http://user:pass@host:port
const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/);
if (standardMatch) {
return {
protocol: standardMatch[1],
username: standardMatch[2],
password: standardMatch[3],
host: standardMatch[4],
port: parseInt(standardMatch[5], 10),
rawUrl: trimmed,
};
}
// Format 2: Standard URL without auth - http://host:port
const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/);
if (noAuthMatch) {
return {
protocol: noAuthMatch[1],
host: noAuthMatch[2],
port: parseInt(noAuthMatch[3], 10),
rawUrl: trimmed,
};
}
// Format 3: Colon format with protocol - http://host:port:user:pass
const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonWithProtocolMatch) {
return {
protocol: colonWithProtocolMatch[1],
host: colonWithProtocolMatch[2],
port: parseInt(colonWithProtocolMatch[3], 10),
username: colonWithProtocolMatch[4],
password: colonWithProtocolMatch[5],
rawUrl: trimmed, // Keep raw URL for non-standard format
};
}
// Format 4: Colon format without protocol - host:port:user:pass
const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/);
if (colonMatch) {
return {
protocol: 'http',
host: colonMatch[1],
port: parseInt(colonMatch[2], 10),
username: colonMatch[3],
password: colonMatch[4],
rawUrl: `http://${trimmed}`, // Construct raw URL
};
}
// Format 5: Simple host:port
const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/);
if (simpleMatch) {
return {
protocol: 'http',
host: simpleMatch[1],
port: parseInt(simpleMatch[2], 10),
rawUrl: `http://${trimmed}`,
};
}
console.error(`[ImportProxies] Could not parse: ${trimmed}`);
return null;
}
/**
* Check if proxy URL is in non-standard format (needs proxy_url column)
*/
function isNonStandardFormat(rawUrl: string): boolean {
// Colon format: protocol://host:port:user:pass
return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl);
}
async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) {
if (dryRun) {
console.log('\n[ImportProxies] DRY RUN - Would import:');
for (const p of proxies) {
const needsRawUrl = isNonStandardFormat(p.rawUrl);
console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`);
}
console.log(`\nTotal: ${proxies.length} proxies`);
return;
}
const pool = getPool();
let inserted = 0;
let skipped = 0;
for (const proxy of proxies) {
try {
// Determine if we need to store the raw URL (non-standard format)
const needsRawUrl = isNonStandardFormat(proxy.rawUrl);
// Use different conflict resolution based on format
// Non-standard format: unique by proxy_url (session-based residential proxies)
// Standard format: unique by host/port/protocol
const query = needsRawUrl
? `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (proxy_url) WHERE proxy_url IS NOT NULL
DO UPDATE SET
max_connections = EXCLUDED.max_connections,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`
: `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (host, port, protocol)
DO UPDATE SET
username = EXCLUDED.username,
password = EXCLUDED.password,
max_connections = EXCLUDED.max_connections,
proxy_url = EXCLUDED.proxy_url,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`;
const result = await pool.query(query, [
proxy.host,
proxy.port,
proxy.protocol,
proxy.username || null,
proxy.password || null,
maxConnections,
needsRawUrl ? proxy.rawUrl : null,
]);
const isInsert = result.rows[0]?.is_insert;
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
if (isInsert) {
inserted++;
console.log(`[ImportProxies] Inserted: ${displayName}`);
} else {
console.log(`[ImportProxies] Updated: ${displayName}`);
inserted++; // Count updates too
}
} catch (err: any) {
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
console.error(`[ImportProxies] Error inserting ${displayName}: ${err.message}`);
skipped++;
}
}
console.log(`\n[ImportProxies] Complete: ${inserted} imported (inserted or updated), ${skipped} failed`);
// Notify any listening workers
try {
await pool.query(`NOTIFY proxy_added, 'bulk import'`);
console.log('[ImportProxies] Sent proxy_added notification to workers');
} catch {
// Ignore notification errors
}
}
async function readFromStdin(): Promise<string[]> {
return new Promise((resolve) => {
const lines: string[] = [];
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on('line', (line) => {
lines.push(line);
});
rl.on('close', () => {
resolve(lines);
});
});
}
async function main() {
const args = process.argv.slice(2);
let lines: string[] = [];
let maxConnections = 1;
let dryRun = false;
// Parse arguments
for (let i = 0; i < args.length; i++) {
if (args[i] === '--file' && args[i + 1]) {
const content = fs.readFileSync(args[i + 1], 'utf-8');
lines.push(...content.split('\n'));
i++;
} else if (args[i] === '--url' && args[i + 1]) {
lines.push(args[i + 1]);
i++;
} else if (args[i] === '--max-connections' && args[i + 1]) {
maxConnections = parseInt(args[i + 1], 10);
i++;
} else if (args[i] === '--dry-run') {
dryRun = true;
} else if (!args[i].startsWith('--')) {
// Treat as URL directly
lines.push(args[i]);
}
}
// If no lines yet, read from stdin
if (lines.length === 0) {
console.log('[ImportProxies] Reading from stdin...');
lines = await readFromStdin();
}
// Parse all lines
const proxies: ParsedProxy[] = [];
for (const line of lines) {
const parsed = parseProxyUrl(line);
if (parsed) {
proxies.push(parsed);
}
}
if (proxies.length === 0) {
console.error('[ImportProxies] No valid proxies found');
console.error('\nUsage:');
console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"');
console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt');
console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts');
console.error('\nSupported formats:');
console.error(' http://user:pass@host:port (standard)');
console.error(' http://host:port:user:pass (colon format)');
console.error(' host:port:user:pass (simple)');
process.exit(1);
}
console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`);
await importProxies(proxies, maxConnections, dryRun);
}
main().catch((err) => {
console.error('[ImportProxies] Fatal error:', err);
process.exit(1);
});

View File

@@ -77,6 +77,11 @@ export interface Proxy {
country?: string;
countryCode?: string;
timezone?: string;
/**
* Raw proxy URL override. If set, used directly instead of constructing from parts.
* Supports non-standard formats like: http://host:port:user:pass
*/
proxyUrl?: string;
}
export interface ProxyStats {
@@ -129,6 +134,10 @@ export class ProxyRotator {
private proxies: Proxy[] = [];
private currentIndex: number = 0;
private lastRotation: Date = new Date();
private lastReloadAt: Date = new Date();
// Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
private reloadIntervalMs: number = 60000;
constructor(pool?: Pool) {
this.pool = pool || null;
@@ -138,6 +147,13 @@ export class ProxyRotator {
this.pool = pool;
}
/**
* Set the reload interval for periodic proxy checks
*/
setReloadInterval(ms: number): void {
this.reloadIntervalMs = ms;
}
/**
* Load proxies from database
*/
@@ -167,22 +183,76 @@ export class ProxyRotator {
state,
country,
country_code as "countryCode",
timezone
timezone,
proxy_url as "proxyUrl"
FROM proxies
WHERE active = true
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
`);
this.proxies = result.rows;
this.lastReloadAt = new Date();
const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`);
} catch (error) {
console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
this.proxies = [];
}
}
/**
* Check if proxy list is stale and needs reload
*/
isStale(): boolean {
const elapsed = Date.now() - this.lastReloadAt.getTime();
return elapsed > this.reloadIntervalMs;
}
/**
* Reload proxies if the cache is stale.
* This ensures workers pick up new proxies or see disabled proxies removed.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
if (!this.isStale()) {
return false;
}
const oldCount = this.proxies.length;
const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const oldIds = new Set(this.proxies.map(p => p.id));
await this.loadProxies();
const newCount = this.proxies.length;
const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const newIds = new Set(this.proxies.map(p => p.id));
// Log changes
const added = this.proxies.filter(p => !oldIds.has(p.id));
const removed = [...oldIds].filter(id => !newIds.has(id));
if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) {
console.log(`[ProxyRotator] Reloaded proxies: ${oldCount}${newCount} proxies, ${oldCapacity}${newCapacity} threads`);
if (added.length > 0) {
console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`);
}
if (removed.length > 0) {
console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`);
}
}
return true;
}
/**
* Get time since last reload in seconds
*/
getSecondsSinceReload(): number {
return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000);
}
/**
* Get next proxy in rotation
*/
@@ -342,8 +412,24 @@ export class ProxyRotator {
/**
* Get proxy URL for HTTP client
* If proxy.proxyUrl is set, uses it directly (supports non-standard formats).
* Otherwise constructs standard format: protocol://user:pass@host:port
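*
* e.g. proxyUrl 'http://1.2.3.4:8080:user:pw' is rewritten to
* 'http://user:pw@1.2.3.4:8080' (credentials URL-encoded).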
*/
getProxyUrl(proxy: Proxy): string {
// If proxyUrl is set, check if it needs conversion from non-standard format
if (proxy.proxyUrl) {
// Check if it's in non-standard format: http://host:port:user:pass
const colonFormatMatch = proxy.proxyUrl.match(/^(https?):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonFormatMatch) {
// Convert to standard format: http://user:pass@host:port
const [, protocol, host, port, username, password] = colonFormatMatch;
return `${protocol}://${encodeURIComponent(username)}:${encodeURIComponent(password)}@${host}:${port}`;
}
// Already in standard format or unknown format - return as-is
return proxy.proxyUrl;
}
// Construct standard format from individual fields
const auth = proxy.username && proxy.password
? `${proxy.username}:${proxy.password}@`
: '';
@@ -584,6 +670,23 @@ export class CrawlRotator {
await this.proxy.loadProxies();
}
/**
* Reload proxy list if stale.
* Workers should call this periodically to pick up proxy changes.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
return this.proxy.reloadIfStale();
}
/**
* Set proxy reload interval in milliseconds.
* Default is 60 seconds.
*/
setProxyReloadInterval(ms: number): void {
this.proxy.setReloadInterval(ms);
}
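// Usage sketch (illustrative, assuming a long-lived worker process):
//   const rotator = new CrawlRotator();
//   setInterval(() => rotator.reloadIfStale().catch(() => {}), 30_000);
// keeps the proxy cache fresh without restarting the worker.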
/**
* Rotate proxy only (get new IP)
*/

View File

@@ -26,6 +26,34 @@ const TEST_PLATFORM_ID = '6405ef617056e8014d79101b';
const FINGERPRINT_DEMO_URL = 'https://demo.fingerprint.com/';
const AMIUNIQUE_URL = 'https://amiunique.org/fingerprint';
// IP geolocation API for timezone lookup (free, no key required)
const IP_API_URL = 'http://ip-api.com/json';
/**
* Look up timezone from IP address using ip-api.com
* Returns IANA timezone (e.g., 'America/New_York') or null on failure
*/
async function getTimezoneFromIp(ip: string): Promise<{ timezone: string; city?: string; region?: string } | null> {
try {
const axios = require('axios');
const response = await axios.get(`${IP_API_URL}/${ip}?fields=status,timezone,city,regionName`, {
timeout: 5000,
});
if (response.data?.status === 'success' && response.data?.timezone) {
return {
timezone: response.data.timezone,
city: response.data.city,
region: response.data.regionName,
};
}
return null;
} catch (err: any) {
console.log(`[PuppeteerPreflight] IP geolocation lookup failed: ${err.message}`);
return null;
}
}
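// For reference, a successful ip-api.com response for the requested fields looks
// like { status: 'success', timezone: 'America/Phoenix', city: 'Phoenix',
// regionName: 'Arizona' } (illustrative values).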
export interface PuppeteerPreflightResult extends PreflightResult {
method: 'http';
/** Number of products returned (proves API access) */
@@ -42,6 +70,13 @@ export interface PuppeteerPreflightResult extends PreflightResult {
expectedProxyIp?: string;
/** Whether IP verification passed (detected IP matches proxy) */
ipVerified?: boolean;
/** Detected timezone from IP geolocation */
detectedTimezone?: string;
/** Detected location from IP geolocation */
detectedLocation?: {
city?: string;
region?: string;
};
}
/**
@@ -136,226 +171,82 @@ export async function runPuppeteerPreflight(
};
// =========================================================================
// STEP 1: Visit fingerprint.com demo to verify anti-detect and get IP
// STEP 1a: Get IP address directly via simple API (more reliable than scraping)
// =========================================================================
console.log(`[PuppeteerPreflight] Testing anti-detect at ${FINGERPRINT_DEMO_URL}...`);
console.log(`[PuppeteerPreflight] Getting proxy IP address...`);
try {
await page.goto(FINGERPRINT_DEMO_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true; // If we got here, proxy is working
// Wait for fingerprint results to load
await page.waitForSelector('[data-test="visitor-id"]', { timeout: 10000 }).catch(() => {});
// Extract fingerprint data from the page
const fingerprintData = await page.evaluate(() => {
// Try to find the IP address displayed on the page
const ipElement = document.querySelector('[data-test="ip-address"]');
const ip = ipElement?.textContent?.trim() || null;
// Try to find bot detection info
const botElement = document.querySelector('[data-test="bot-detected"]');
const botDetected = botElement?.textContent?.toLowerCase().includes('true') || false;
// Try to find visitor ID (proves fingerprinting worked)
const visitorIdElement = document.querySelector('[data-test="visitor-id"]');
const visitorId = visitorIdElement?.textContent?.trim() || null;
// Alternative: look for common UI patterns if data-test attrs not present
let detectedIp = ip;
if (!detectedIp) {
// Look for IP in any element containing IP-like pattern
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
detectedIp = ipMatch ? ipMatch[1] : null;
}
return {
ip: detectedIp,
botDetected,
visitorId,
pageLoaded: !!document.body,
};
});
if (fingerprintData.ip) {
result.proxyIp = fingerprintData.ip;
console.log(`[PuppeteerPreflight] Detected IP: ${fingerprintData.ip}`);
// Verify IP matches expected proxy
if (expectedProxyHost) {
// Check if detected IP contains the proxy host (or is close match)
if (fingerprintData.ip === expectedProxyHost ||
expectedProxyHost.includes(fingerprintData.ip) ||
fingerprintData.ip.includes(expectedProxyHost.split('.').slice(0, 3).join('.'))) {
result.ipVerified = true;
console.log(`[PuppeteerPreflight] IP VERIFIED - matches proxy`);
} else {
console.log(`[PuppeteerPreflight] IP mismatch: expected ${expectedProxyHost}, got ${fingerprintData.ip}`);
// Don't fail - residential proxies often show different egress IPs
}
}
}
if (fingerprintData.visitorId) {
console.log(`[PuppeteerPreflight] Fingerprint visitor ID: ${fingerprintData.visitorId}`);
}
result.botDetection = {
detected: fingerprintData.botDetected,
};
if (fingerprintData.botDetected) {
console.log(`[PuppeteerPreflight] WARNING: Bot detection triggered!`);
} else {
console.log(`[PuppeteerPreflight] Anti-detect check: NOT detected as bot`);
result.antidetectReady = true;
}
} catch (fpErr: any) {
// Could mean proxy connection failed
console.log(`[PuppeteerPreflight] Fingerprint.com check failed: ${fpErr.message}`);
if (fpErr.message.includes('net::ERR_PROXY') || fpErr.message.includes('ECONNREFUSED')) {
result.error = `Proxy connection failed: ${fpErr.message}`;
return result;
}
// Try fallback: amiunique.org
console.log(`[PuppeteerPreflight] Trying fallback: ${AMIUNIQUE_URL}...`);
const ipApiResponse = await page.evaluate(async () => {
try {
await page.goto(AMIUNIQUE_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true;
// Extract IP from amiunique.org page
const amiData = await page.evaluate(() => {
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
return {
ip: ipMatch ? ipMatch[1] : null,
pageLoaded: !!document.body,
};
});
if (amiData.ip) {
result.proxyIp = amiData.ip;
console.log(`[PuppeteerPreflight] Detected IP via amiunique.org: ${amiData.ip}`);
}
result.antidetectReady = true;
console.log(`[PuppeteerPreflight] amiunique.org fallback succeeded`);
} catch (amiErr: any) {
console.log(`[PuppeteerPreflight] amiunique.org fallback also failed: ${amiErr.message}`);
// Continue with Dutchie test anyway
result.proxyConnected = true;
result.antidetectReady = true;
}
}
// =========================================================================
// STEP 2: Test Dutchie API access (the real test)
// =========================================================================
const embedUrl = `https://dutchie.com/embedded-menu/${TEST_CNAME}?menuType=rec`;
console.log(`[PuppeteerPreflight] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 30000,
});
// Make GraphQL request from browser context
const graphqlResult = await page.evaluate(
async (platformId: string, hash: string) => {
try {
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active' per CLAUDE.md
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: 0,
perPage: 10, // Just need a few to prove it works
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: hash,
},
};
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const sessionId = 'preflight-' + Date.now();
const response = await fetch(url, {
method: 'GET',
headers: {
Accept: 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
if (!response.ok) {
return { error: `HTTP ${response.status}`, products: 0 };
}
const json = await response.json();
if (json.errors) {
return { error: JSON.stringify(json.errors).slice(0, 200), products: 0 };
}
const products = json?.data?.filteredProducts?.products || [];
return { error: null, products: products.length };
const response = await fetch('https://api.ipify.org?format=json');
const data = await response.json();
return { ip: data.ip, error: null };
} catch (err: any) {
return { error: err.message || 'Unknown error', products: 0 };
return { ip: null, error: err.message };
}
},
TEST_PLATFORM_ID,
FILTERED_PRODUCTS_HASH
);
});
if (ipApiResponse.ip) {
result.proxyIp = ipApiResponse.ip;
result.proxyConnected = true;
console.log(`[PuppeteerPreflight] Detected proxy IP: ${ipApiResponse.ip}`);
// Look up timezone from IP
const geoData = await getTimezoneFromIp(ipApiResponse.ip);
if (geoData) {
result.detectedTimezone = geoData.timezone;
result.detectedLocation = { city: geoData.city, region: geoData.region };
console.log(`[PuppeteerPreflight] IP Geolocation: ${geoData.city}, ${geoData.region} (${geoData.timezone})`);
// Set browser timezone to match proxy location via CDP
try {
const client = await page.target().createCDPSession();
await client.send('Emulation.setTimezoneOverride', { timezoneId: geoData.timezone });
console.log(`[PuppeteerPreflight] Browser timezone set to: ${geoData.timezone}`);
} catch (tzErr: any) {
console.log(`[PuppeteerPreflight] Failed to set browser timezone: ${tzErr.message}`);
}
} else {
console.log(`[PuppeteerPreflight] WARNING: Could not determine timezone from IP - timezone mismatch possible`);
}
} else {
console.log(`[PuppeteerPreflight] IP lookup failed: ${ipApiResponse.error || 'unknown error'}`);
}
} catch (ipErr: any) {
console.log(`[PuppeteerPreflight] IP API error: ${ipErr.message}`);
}
// =========================================================================
// STEP 2: Preflight complete - proxy verified via ipify.org
// We skip heavy fingerprint.com/amiunique.org tests - just verify proxy works
// The actual Dutchie test happens at task time.
// =========================================================================
// If we got an IP from ipify.org, proxy is working
if (result.proxyIp) {
result.proxyConnected = true;
result.antidetectReady = true; // Assume stealth plugin is working
}
result.responseTimeMs = Date.now() - startTime;
if (graphqlResult.error) {
result.error = `GraphQL error: ${graphqlResult.error}`;
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
} else if (graphqlResult.products === 0) {
result.error = 'GraphQL returned 0 products';
console.log(`[PuppeteerPreflight] FAILED - No products returned`);
} else {
// If we got here with proxyConnected=true and antidetectReady=true, we're good
if (result.proxyConnected && result.antidetectReady) {
result.passed = true;
result.productsReturned = graphqlResult.products;
console.log(
`[PuppeteerPreflight] PASSED - Got ${graphqlResult.products} products in ${result.responseTimeMs}ms`
`[PuppeteerPreflight] PASSED - Proxy connected, anti-detect ready (${result.responseTimeMs}ms)`
);
if (result.proxyIp) {
console.log(`[PuppeteerPreflight] Browser IP via proxy: ${result.proxyIp}`);
}
} else if (result.proxyConnected) {
// Proxy works but anti-detect check failed - still pass (anti-detect is best-effort)
result.passed = true;
result.antidetectReady = true; // Assume ready since proxy works
console.log(
`[PuppeteerPreflight] PASSED - Proxy connected (anti-detect check skipped, ${result.responseTimeMs}ms)`
);
} else {
result.error = result.error || 'Proxy connection failed';
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
}
} catch (err: any) {
result.error = `Browser error: ${err.message || 'Unknown error'}`;

View File

@@ -26,6 +26,12 @@ interface TaskSchedule {
next_run_at: Date | null;
state_code: string | null;
priority: number;
method: 'curl' | 'http' | null;
is_immutable: boolean;
description: string | null;
platform: string | null;
last_task_count: number | null;
last_error: string | null;
}
class TaskScheduler {
@@ -84,24 +90,22 @@ class TaskScheduler {
/**
* Ensure default schedules exist in the database
* Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist
*
* NOTE: Per-state product_discovery schedules are created by migration 089.
* This only creates core immutable schedules that should exist regardless.
*/
private async ensureDefaultSchedules(): Promise<void> {
// Per TASK_WORKFLOW_2024-12-10.md: Default schedules for task generation
// NOTE: payload_fetch replaces direct product_refresh - it chains to product_refresh
// Core schedules - all use HTTP transport for browser-based scraping
const defaults = [
{
name: 'payload_fetch_all',
role: 'payload_fetch' as TaskRole,
interval_hours: 4,
priority: 0,
description: 'Fetch payloads from Dutchie API for all crawl-enabled stores every 4 hours. Chains to product_refresh.',
},
{
name: 'store_discovery_dutchie',
role: 'store_discovery' as TaskRole,
interval_hours: 24,
interval_hours: 168, // Weekly
priority: 5,
description: 'Discover new Dutchie stores daily',
description: 'Discover new Dutchie stores weekly (HTTP transport)',
method: 'http',
is_immutable: true,
platform: 'dutchie',
},
{
name: 'analytics_refresh',
@@ -109,16 +113,21 @@ class TaskScheduler {
interval_hours: 6,
priority: 0,
description: 'Refresh analytics materialized views every 6 hours',
method: 'http',
is_immutable: true,
platform: null,
},
];
for (const sched of defaults) {
try {
await pool.query(`
INSERT INTO task_schedules (name, role, interval_hours, priority, description, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, true, NOW())
ON CONFLICT (name) DO NOTHING
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description]);
INSERT INTO task_schedules (name, role, interval_hours, priority, description, method, is_immutable, platform, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW())
ON CONFLICT (name) DO UPDATE SET
method = EXCLUDED.method,
is_immutable = EXCLUDED.is_immutable
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description, sched.method, sched.is_immutable, sched.platform]);
} catch (err: any) {
// Table may not exist yet - will be created by migration
if (!err.message.includes('does not exist')) {
@@ -192,16 +201,27 @@ class TaskScheduler {
/**
* Execute a schedule and create tasks
* Per TASK_WORKFLOW_2024-12-10.md: Different logic per role
*
* TRANSPORT MODES:
* - All schedules now use HTTP transport (Puppeteer/browser)
* - Per-state product_discovery schedules process one state at a time
* - Workers must pass HTTP preflight to claim HTTP tasks
*/
private async executeSchedule(schedule: TaskSchedule): Promise<number> {
switch (schedule.role) {
case 'product_discovery':
// Per-state product discovery using HTTP transport
return this.generateProductDiscoveryTasks(schedule);
case 'payload_fetch':
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch replaces direct product_refresh
return this.generatePayloadFetchTasks(schedule);
// DEPRECATED: Legacy payload_fetch redirects to product_discovery
console.log(`[TaskScheduler] payload_fetch is deprecated, using product_discovery instead`);
return this.generateProductDiscoveryTasks(schedule);
case 'product_refresh':
// Legacy - kept for manual triggers, but scheduled crawls use payload_fetch
return this.generatePayloadFetchTasks(schedule);
// DEPRECATED: Legacy product_refresh redirects to product_discovery
console.log(`[TaskScheduler] product_refresh is deprecated, using product_discovery instead`);
return this.generateProductDiscoveryTasks(schedule);
case 'store_discovery':
return this.generateStoreDiscoveryTasks(schedule);
@@ -216,50 +236,69 @@ class TaskScheduler {
}
/**
* Generate payload_fetch tasks for stores that need crawling
* Per TASK_WORKFLOW_2024-12-10.md: payload_fetch hits API, saves to disk, chains to product_refresh
* Generate product_discovery tasks for stores in a specific state
* Uses HTTP transport (Puppeteer/browser) for all tasks
*
* Per-state scheduling allows:
* - Different crawl frequencies per state (e.g., AZ=4h, MI=6h)
* - Better rate limit management (one state at a time)
* - Easier debugging and monitoring per state
*/
private async generatePayloadFetchTasks(schedule: TaskSchedule): Promise<number> {
// Per TASK_WORKFLOW_2024-12-10.md: Find stores needing refresh
private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
// state_code is required for per-state schedules
if (!schedule.state_code) {
console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`);
return 0;
}
// Find stores in this state needing refresh
const result = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
-- No pending/running payload_fetch or product_refresh task already
AND s.code = $1
-- No pending/running product_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role IN ('payload_fetch', 'product_refresh')
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
-- Never fetched OR last fetch > interval ago
AND (
d.last_fetch_at IS NULL
OR d.last_fetch_at < NOW() - ($1 || ' hours')::interval
OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
)
${schedule.state_code ? 'AND d.state_id = (SELECT id FROM states WHERE code = $2)' : ''}
`, schedule.state_code ? [schedule.interval_hours, schedule.state_code] : [schedule.interval_hours]);
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code, schedule.interval_hours]);
const dispensaryIds = result.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length === 0) {
console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
return 0;
}
// Per TASK_WORKFLOW_2024-12-10.md: Create payload_fetch tasks (they chain to product_refresh)
const tasks = dispensaryIds.map((id: number) => ({
role: 'payload_fetch' as TaskRole,
dispensary_id: id,
priority: schedule.priority,
}));
console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);
return taskService.createTasks(tasks);
// Create product_discovery tasks with HTTP transport
// Stagger by 15 seconds to prevent overwhelming proxies
const { created } = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
15, // 15 seconds apart
schedule.platform || 'dutchie',
'http' // Force HTTP transport
);
return created;
}
/**
* Generate store_discovery tasks
* Per TASK_WORKFLOW_2024-12-10.md: One task per platform
* Uses HTTP transport (Puppeteer/browser) for browser-based discovery
*/
private async generateStoreDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
// Check if discovery task already pending
@@ -276,8 +315,9 @@ class TaskScheduler {
await taskService.createTask({
role: 'store_discovery',
platform: 'dutchie',
platform: schedule.platform || 'dutchie',
priority: schedule.priority,
method: 'http', // Force HTTP transport for browser-based discovery
});
return 1;
@@ -310,11 +350,39 @@ class TaskScheduler {
/**
* Get all schedules for dashboard display
* Returns schedules with full metadata including immutability flag
*/
async getSchedules(): Promise<TaskSchedule[]> {
try {
const result = await pool.query(`
SELECT * FROM task_schedules ORDER BY name
SELECT
id,
name,
role,
enabled,
interval_hours,
last_run_at,
next_run_at,
state_code,
priority,
method,
COALESCE(is_immutable, false) as is_immutable,
description,
platform,
last_task_count,
last_error,
created_at,
updated_at
FROM task_schedules
ORDER BY
CASE role
WHEN 'store_discovery' THEN 1
WHEN 'product_discovery' THEN 2
WHEN 'analytics_refresh' THEN 3
ELSE 4
END,
state_code NULLS FIRST,
name
`);
return result.rows as TaskSchedule[];
} catch {
@@ -322,8 +390,24 @@ class TaskScheduler {
}
}
/**
* Get a single schedule by ID
*/
async getSchedule(id: number): Promise<TaskSchedule | null> {
try {
const result = await pool.query(`
SELECT * FROM task_schedules WHERE id = $1
`, [id]);
return result.rows[0] as TaskSchedule || null;
} catch {
return null;
}
}
/**
* Update a schedule
* Allows updating: enabled, interval_hours, priority
* Does NOT allow updating: name, role, state_code, is_immutable
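*
* e.g. updateSchedule(12, { interval_hours: 6, priority: 1 }) (the id is illustrative)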
*/
async updateSchedule(id: number, updates: Partial<TaskSchedule>): Promise<void> {
const setClauses: string[] = [];
@@ -355,6 +439,33 @@ class TaskScheduler {
`, values);
}
/**
* Delete a schedule (only if not immutable)
* Returns true if deleted, false if immutable
*/
async deleteSchedule(id: number): Promise<{ deleted: boolean; reason?: string }> {
// Check if schedule is immutable
const result = await pool.query(`
SELECT name, is_immutable FROM task_schedules WHERE id = $1
`, [id]);
if (result.rows.length === 0) {
return { deleted: false, reason: 'Schedule not found' };
}
const schedule = result.rows[0];
if (schedule.is_immutable) {
return {
deleted: false,
reason: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`
};
}
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [id]);
return { deleted: true };
}
/**
* Trigger a schedule to run immediately
*/
@@ -369,6 +480,46 @@ class TaskScheduler {
return this.executeSchedule(result.rows[0] as TaskSchedule);
}
/**
* Get schedule statistics for dashboard
*/
async getScheduleStats(): Promise<{
total: number;
enabled: number;
byRole: Record<string, number>;
byState: Record<string, number>;
}> {
try {
const result = await pool.query(`
SELECT
COUNT(*)::int as total,
SUM(CASE WHEN enabled THEN 1 ELSE 0 END)::int as enabled_count,
role,
state_code
FROM task_schedules
GROUP BY role, state_code
`);
let total = 0;
let enabled = 0;
const byRole: Record<string, number> = {};
const byState: Record<string, number> = {};
for (const row of result.rows) {
total += row.total;
enabled += row.enabled_count;
byRole[row.role] = (byRole[row.role] || 0) + row.total;
if (row.state_code) {
byState[row.state_code] = (byState[row.state_code] || 0) + row.total;
}
}
return { total, enabled, byRole, byState };
} catch {
return { total: 0, enabled: 0, byRole: {}, byState: {} };
}
}
}
// Per TASK_WORKFLOW_2024-12-10.md: Singleton instance

View File

@@ -41,9 +41,16 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
const dispensary = dispResult.rows[0];
// If already has platform_dispensary_id, we're done
// If already has platform_dispensary_id, we're done (idempotent)
if (dispensary.platform_dispensary_id) {
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID: ${dispensary.platform_dispensary_id}`);
// Update last_id_resolution_at to show we checked it
await pool.query(`
UPDATE dispensaries
SET last_id_resolution_at = NOW(),
id_resolution_status = 'resolved'
WHERE id = $1
`, [dispensaryId]);
return {
success: true,
alreadyResolved: true,
@@ -51,6 +58,15 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
};
}
// Increment attempt counter
await pool.query(`
UPDATE dispensaries
SET id_resolution_attempts = COALESCE(id_resolution_attempts, 0) + 1,
last_id_resolution_at = NOW(),
id_resolution_status = 'pending'
WHERE id = $1
`, [dispensaryId]);
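// id_resolution_status lifecycle: 'pending' while an attempt is in flight,
// then 'resolved' on success or 'failed' on error (set below).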
const menuUrl = dispensary.menu_url;
if (!menuUrl) {
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
@@ -114,7 +130,7 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
console.log(`[EntryPointDiscovery] Failed to resolve ${slug}: ${reason}`);
// Mark as failed resolution but keep menu_type as dutchie
// Mark as failed resolution
await pool.query(`
UPDATE dispensaries
SET
@@ -123,9 +139,11 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
WHEN $2 = 403 THEN 'blocked'
ELSE 'dutchie'
END,
id_resolution_status = 'failed',
id_resolution_error = $3,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, result.httpStatus || 0]);
`, [dispensaryId, result.httpStatus || 0, reason]);
return {
success: false,
@@ -149,6 +167,8 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
platform_dispensary_id = $2,
menu_type = 'dutchie',
crawl_enabled = true,
id_resolution_status = 'resolved',
id_resolution_error = NULL,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, platformId]);

View File

@@ -2,11 +2,18 @@
* Task Handlers Index
*
* Exports all task handlers for the task worker.
*
* Product Discovery:
* - handleProductDiscoveryCurl: curl/axios based (for curl transport)
* - handleProductDiscoveryHttp: Puppeteer browser-based (for http transport)
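*
* Transport is selected per task via the method column ('curl' | 'http');
* the routing itself lives in task-worker.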
*/
export { handleProductDiscovery } from './product-discovery';
export { handleProductDiscovery as handleProductDiscoveryCurl } from './product-discovery-curl';
export { handleProductDiscoveryHttp } from './product-discovery-http';
export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-curl';
export { handleProductRefresh } from './product-refresh';
export { handleStoreDiscovery } from './store-discovery';
export { handleStoreDiscoveryHttp } from './store-discovery-http';
export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleWhoami } from './whoami';

View File

@@ -28,7 +28,7 @@ import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -39,6 +39,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -67,6 +68,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 2: Start stealth session
// ============================================================
updateStep('preflight', 'Starting stealth session');
const session = startSession();
console.log(`[PayloadFetch] Session started: ${session.sessionId}`);
@@ -75,6 +77,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 3: Fetch products via GraphQL (Status: 'All')
// ============================================================
updateStep('fetching', 'Executing GraphQL query');
const allProducts: any[] = [];
let page = 0;
let totalCount = 0;
@@ -162,6 +165,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// STEP 4: Save raw payload to filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Metadata/Payload separation
// ============================================================
updateStep('saving', `Saving ${allProducts.length} products`);
const rawPayload = {
dispensaryId,
platformId,

View File

@@ -13,7 +13,7 @@
*/
import { TaskContext, TaskResult } from '../task-worker';
import { handlePayloadFetch } from './payload-fetch';
import { handlePayloadFetch } from './payload-fetch-curl';
export async function handleProductDiscovery(ctx: TaskContext): Promise<TaskResult> {
const { task } = ctx;

View File

@@ -0,0 +1,368 @@
/**
* Product Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to fetch products via browser context.
* Based on test-intercept.js pattern from ORGANIC_SCRAPING_GUIDE.md.
*
* This handler:
* 1. Loads dispensary info
* 2. Launches headless browser with proxy (if provided)
* 3. Establishes session by visiting embedded menu
* 4. Fetches ALL products via GraphQL from browser context
* 5. Saves raw payload to filesystem (gzipped)
* 6. Records metadata in raw_crawl_payloads table
* 7. Queues product_refresh task to process the payload
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
// GraphQL hash for FilteredProducts query - MUST match CLAUDE.md
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
return { success: false, error: 'No dispensary_id specified for product_discovery task' };
}
let browser: any = null;
try {
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
FROM dispensaries
WHERE id = $1 AND crawl_enabled = true
`, [dispensaryId]);
if (dispResult.rows.length === 0) {
return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
}
const dispensary = dispResult.rows[0];
const platformId = dispensary.platform_dispensary_id;
if (!platformId) {
return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
}
// Extract cName from menu_url
const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
const cName = cNameMatch ? cNameMatch[1] : 'dispensary';
console.log(`[ProductDiscoveryHTTP] Starting for ${dispensary.name} (ID: ${dispensaryId})`);
console.log(`[ProductDiscoveryHTTP] Platform ID: ${platformId}, cName: ${cName}`);
await ctx.heartbeat();
// ============================================================
// STEP 2: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', `Launching browser for ${dispensary.name}`);
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[ProductDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 3: Establish session by visiting embedded menu
// ============================================================
updateStep('navigating', `Loading menu page`);
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// ============================================================
// STEP 3b: Detect and dismiss age gate modal
// ============================================================
try {
// Wait a bit for age gate to appear
await page.waitForTimeout(1500);
// Look for common age gate selectors
const ageGateSelectors = [
'button[data-testid="age-gate-submit"]',
'button:has-text("Yes")',
'button:has-text("I am 21")',
'button:has-text("Enter")',
'[class*="age-gate"] button',
'[class*="AgeGate"] button',
'[data-test="age-gate-button"]',
];
let dismissed = false;
for (const selector of ageGateSelectors) {
try {
const button = await page.$(selector);
if (button) {
await button.click();
console.log(`[ProductDiscoveryHTTP] Age gate dismissed via: ${selector}`);
await page.waitForTimeout(1000); // Wait for modal to close
dismissed = true;
break;
}
} catch {
// Selector not found, try next
}
}
// Fallback: click a button by text in page context ("Yes" / "Enter" / "I am 21").
// Skipped if a selector already dismissed the gate, so we don't click unrelated buttons.
if (!dismissed) {
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
}
} catch {
// Age gate might not be present, continue
console.log(`[ProductDiscoveryHTTP] No age gate detected or already dismissed`);
}
console.log(`[ProductDiscoveryHTTP] Session established, fetching products...`);
await ctx.heartbeat();
// ============================================================
// STEP 4: Fetch ALL products via GraphQL from browser context
// ============================================================
updateStep('fetching', `Executing GraphQL query`);
const result = await page.evaluate(async (platformId: string, graphqlHash: string) => {
const allProducts: any[] = [];
const logs: string[] = [];
let pageNum = 0;
const perPage = 100;
let totalCount = 0;
const sessionId = 'browser-session-' + Date.now();
try {
while (pageNum < 30) { // Max 30 pages = 3000 products
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active', not null
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: pageNum,
perPage: perPage,
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: graphqlHash,
},
};
// Build GET URL like the browser does
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
logs.push(`Page ${pageNum}: HTTP ${response.status}`);
if (!response.ok) {
const text = await response.text();
logs.push(`HTTP error: ${response.status} - ${text.slice(0, 200)}`);
break;
}
const json = await response.json();
if (json.errors) {
logs.push(`GraphQL error: ${JSON.stringify(json.errors).slice(0, 200)}`);
break;
}
const data = json?.data?.filteredProducts;
if (!data || !data.products) {
logs.push('No products in response');
break;
}
const products = data.products;
allProducts.push(...products);
if (pageNum === 0) {
totalCount = data.queryInfo?.totalCount || 0;
logs.push(`Total reported: ${totalCount}`);
}
logs.push(`Got ${products.length} products (total: ${allProducts.length}/${totalCount})`);
if (allProducts.length >= totalCount || products.length < perPage) {
break;
}
pageNum++;
// Small delay between pages to be polite
await new Promise(r => setTimeout(r, 200));
}
} catch (err: any) {
logs.push(`Error: ${err.message}`);
}
return { products: allProducts, totalCount, logs };
}, platformId, FILTERED_PRODUCTS_HASH);
// Print logs from browser context
result.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
console.log(`[ProductDiscoveryHTTP] Fetched ${result.products.length} products (API reported ${result.totalCount})`);
await browser.close();
browser = null;
if (result.products.length === 0) {
return {
success: false,
error: 'No products returned from GraphQL',
productsProcessed: 0,
};
}
await ctx.heartbeat();
// ============================================================
// STEP 5: Save raw payload to filesystem
// ============================================================
updateStep('saving', `Saving ${result.products.length} products`);
const rawPayload = {
dispensaryId,
platformId,
cName,
fetchedAt: new Date().toISOString(),
productCount: result.products.length,
products: result.products,
};
const payloadResult = await saveRawPayload(
pool,
dispensaryId,
rawPayload,
null, // crawl_run_id - not using crawl_runs in new system
result.products.length
);
console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
// ============================================================
// STEP 6: Update dispensary last_fetch_at
// ============================================================
await pool.query(`
UPDATE dispensaries
SET last_fetch_at = NOW()
WHERE id = $1
`, [dispensaryId]);
// ============================================================
// STEP 7: Queue product_refresh task to process the payload
// ============================================================
await taskService.createTask({
role: 'product_refresh',
dispensary_id: dispensaryId,
priority: task.priority || 0,
payload: { payload_id: payloadResult.id },
});
console.log(`[ProductDiscoveryHTTP] Queued product_refresh task for payload #${payloadResult.id}`);
return {
success: true,
payloadId: payloadResult.id,
productCount: result.products.length,
sizeBytes: payloadResult.sizeBytes,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductDiscoveryHTTP] Error for dispensary ${dispensaryId}:`, errorMessage);
return {
success: false,
error: errorMessage,
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}

View File

@@ -27,11 +27,12 @@ import {
downloadProductImages,
} from '../../hydration/canonical-upsert';
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
const normalizer = new DutchieNormalizer();
export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -42,6 +43,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -67,6 +69,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// STEP 2: Load payload from filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Read local payload, not API
// ============================================================
updateStep('loading', 'Loading payload from storage');
let payloadData: any;
let payloadId: number;
@@ -86,7 +89,37 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// Load latest payload for this dispensary
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
return { success: false, error: `No payload found for dispensary ${dispensaryId}` };
// No payload exists - queue upstream task to fetch products
console.log(`[ProductRefresh] No payload found for dispensary ${dispensaryId} - queuing upstream task`);
if (dispensary.platform_dispensary_id) {
// Has platform ID - can go straight to product_discovery
console.log(`[ProductRefresh] Dispensary has platform_dispensary_id - queuing product_discovery (http)`);
await taskService.createTask({
role: 'product_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
method: 'http', // Use browser-based handler for session proxies
});
return {
success: true,
queued: 'product_discovery',
reason: 'No payload exists - queued product_discovery to fetch initial data',
};
} else {
// No platform ID - need entry_point_discovery first
console.log(`[ProductRefresh] Dispensary missing platform_dispensary_id - queuing entry_point_discovery`);
await taskService.createTask({
role: 'entry_point_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
});
return {
success: true,
queued: 'entry_point_discovery',
reason: 'No payload and no platform_dispensary_id - queued entry_point_discovery to resolve ID',
};
}
}
payloadData = result.payload;
payloadId = result.metadata.id;
@@ -111,6 +144,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 3: Normalize data
// ============================================================
updateStep('normalizing', `Normalizing ${allProducts.length} products`);
console.log(`[ProductRefresh] Normalizing ${allProducts.length} products...`);
// Build RawPayload for the normalizer
@@ -154,6 +188,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 4: Upsert to canonical tables
// ============================================================
updateStep('upserting', `Saving ${normalizationResult.products.length} products to DB`);
console.log(`[ProductRefresh] Upserting to store_products...`);
const upsertResult = await upsertStoreProducts(

View File

@@ -0,0 +1,484 @@
/**
* Store Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to discover stores via browser context.
* Based on product-discovery-http.ts pattern.
*
* This handler:
* 1. Launches headless browser with proxy (if provided)
* 2. Establishes session by visiting Dutchie dispensaries page
* 3. Fetches cities for each state via getAllCitiesByState GraphQL
* 4. Fetches stores for each city via ConsumerDispensaries GraphQL
* 5. Upserts to dutchie_discovery_locations
* 6. Auto-promotes valid locations to dispensaries table
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { upsertLocation } from '../../discovery/location-discovery';
import { promoteDiscoveredLocations } from '../../discovery/promotion';
import { saveDiscoveryPayload } from '../../utils/payload-storage';
// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts
const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6';
const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b';
interface StateWithCities {
name: string;
country: string;
cities: string[];
}
interface DiscoveredLocation {
id: string;
name: string;
slug: string;
cName?: string;
address?: string;
city?: string;
state?: string;
zip?: string;
latitude?: number;
longitude?: number;
offerPickup?: boolean;
offerDelivery?: boolean;
isRecreational?: boolean;
isMedical?: boolean;
phone?: string;
email?: string;
website?: string;
description?: string;
logoImage?: string;
bannerImage?: string;
chainSlug?: string;
enterpriseId?: string;
retailType?: string;
status?: string;
timezone?: string;
location?: {
ln1?: string;
ln2?: string;
city?: string;
state?: string;
zipcode?: string;
country?: string;
geometry?: { coordinates?: [number, number] };
};
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator, updateStep } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
updateStep('starting', 'Initializing store discovery');
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', 'Launching browser');
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[StoreDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
updateStep('navigating', 'Loading session page');
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
await page.goto(sessionUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// Handle potential age gate
try {
await page.waitForTimeout(1500);
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch {
// Age gate might not be present
}
console.log(`[StoreDiscoveryHTTP] Session established`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Get states to discover from database
// ============================================================
const statesResult = await pool.query(`
SELECT code FROM states WHERE is_active = true ORDER BY code
`);
const stateCodesToDiscover = statesResult.rows.map((r: { code: string }) => r.code);
if (stateCodesToDiscover.length === 0) {
await browser.close();
return { success: true, storesDiscovered: 0, newStoreIds: [], message: 'No active states to discover' };
}
console.log(`[StoreDiscoveryHTTP] Will discover stores in ${stateCodesToDiscover.length} states`);
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
updateStep('fetching', `Fetching cities for ${stateCodesToDiscover.length} states`);
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'getAllCitiesByState',
variables: JSON.stringify({}),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
logs.push(`getAllCitiesByState: HTTP ${response.status}`);
if (!response.ok) {
return { states: [], logs };
}
const json = await response.json();
const statesData = json?.data?.statesWithDispensaries || [];
const states: StateWithCities[] = [];
for (const state of statesData) {
if (state && state.name) {
const cities = Array.isArray(state.cities)
? state.cities.filter((c: string | null) => c !== null)
: [];
states.push({
name: state.name,
country: state.country || 'US',
cities,
});
}
}
logs.push(`Found ${states.length} states with cities`);
return { states, logs };
} catch (err: any) {
logs.push(`Error: ${err.message}`);
return { states: [], logs };
}
}, GET_ALL_CITIES_HASH);
statesWithCities.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
if (statesWithCities.states.length === 0) {
await browser.close();
return { success: false, error: 'Failed to fetch states with cities' };
}
await ctx.heartbeat();
// ============================================================
// STEP 5: For each active state, fetch stores for each city
// ============================================================
let totalDiscovered = 0;
let totalUpserted = 0;
const allNewStoreIds: number[] = [];
for (const stateCode of stateCodesToDiscover) {
const stateData = statesWithCities.states.find(
(s: StateWithCities) => s.name.toUpperCase() === stateCode.toUpperCase()
);
if (!stateData || stateData.cities.length === 0) {
console.log(`[StoreDiscoveryHTTP] No cities found for ${stateCode}, skipping`);
continue;
}
console.log(`[StoreDiscoveryHTTP] Discovering ${stateData.cities.length} cities in ${stateCode}...`);
await ctx.heartbeat();
// Accumulate raw store data for this state
const stateRawStores: any[] = [];
const stateCityData: { city: string; stores: any[] }[] = [];
// Fetch stores for each city in this state
for (const city of stateData.cities) {
try {
const cityResult = await page.evaluate(async (
cityName: string,
stateCodeParam: string,
hash: string
) => {
const logs: string[] = [];
const allDispensaries: any[] = [];
let page = 0;
const perPage = 200;
try {
while (page < 5) { // Max 5 pages per city
const variables = {
dispensaryFilter: {
activeOnly: true,
city: cityName,
state: stateCodeParam,
},
page,
perPage,
};
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'ConsumerDispensaries',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
if (!response.ok) {
logs.push(`${cityName}: HTTP ${response.status}`);
break;
}
const json = await response.json();
const dispensaries = json?.data?.filteredDispensaries || [];
if (dispensaries.length === 0) {
break;
}
// Filter to ensure correct state
const stateFiltered = dispensaries.filter((d: any) =>
d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase()
);
allDispensaries.push(...stateFiltered);
if (dispensaries.length < perPage) {
break;
}
page++;
// Small delay between pages
await new Promise(r => setTimeout(r, 100));
}
logs.push(`${cityName}: ${allDispensaries.length} stores`);
} catch (err: any) {
logs.push(`${cityName}: Error - ${err.message}`);
}
return { dispensaries: allDispensaries, logs };
}, city, stateCode, CONSUMER_DISPENSARIES_HASH);
cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
// Accumulate raw store data
stateRawStores.push(...cityResult.dispensaries);
stateCityData.push({ city, stores: cityResult.dispensaries });
// Upsert each discovered location
for (const disp of cityResult.dispensaries) {
try {
const location = normalizeLocation(disp);
if (!location.id) {
continue; // Skip locations without platform ID
}
const result = await upsertLocation(pool, location as any, null);
if (result) {
totalUpserted++;
if (result.isNew) {
totalDiscovered++;
}
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Upsert error for ${disp.name}:`, err.message);
}
}
// Small delay between cities to avoid rate limiting
await new Promise(r => setTimeout(r, 300));
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Error fetching ${city}, ${stateCode}:`, err.message);
}
}
// Heartbeat after each state
await ctx.heartbeat();
// ============================================================
// STEP 5b: Save raw store payload for this state
// ============================================================
if (stateRawStores.length > 0) {
try {
const rawPayload = {
stateCode,
platform,
fetchedAt: new Date().toISOString(),
storeCount: stateRawStores.length,
citiesProcessed: stateCityData.length,
cities: stateCityData,
stores: stateRawStores,
};
const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length);
console.log(`[StoreDiscoveryHTTP] Saved raw payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Failed to save payload for ${stateCode}:`, err.message);
}
}
// Auto-promote valid locations for this state
try {
const promotionResult = await promoteDiscoveredLocations(stateCode);
const promoted = promotionResult.created + promotionResult.updated;
if (promoted > 0) {
console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`);
// newDispensaryIds is present at runtime but missing from the typed interface, hence the cast
const newIds = (promotionResult as any).newDispensaryIds || [];
allNewStoreIds.push(...newIds);
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Promotion error for ${stateCode}:`, err.message);
}
}
await browser.close();
browser = null;
console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted`);
return {
success: true,
storesDiscovered: totalDiscovered,
storesUpserted: totalUpserted,
statesProcessed: stateCodesToDiscover.length,
newStoreIds: allNewStoreIds,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[StoreDiscoveryHTTP] Error:`, errorMessage);
return {
success: false,
error: errorMessage,
newStoreIds: [],
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}
/**
* Normalize a raw dispensary response to our DiscoveredLocation format
*/
function normalizeLocation(raw: any): DiscoveredLocation {
const loc = raw.location || {};
const coords = loc.geometry?.coordinates || [];
return {
id: raw.id || raw._id || '',
name: raw.name || '',
slug: raw.slug || raw.cName || '',
cName: raw.cName || raw.slug || '',
address: raw.address || loc.ln1 || '',
city: raw.city || loc.city || '',
state: raw.state || loc.state || '',
zip: raw.zip || loc.zipcode || loc.zip || '',
latitude: coords[1] || raw.latitude,
longitude: coords[0] || raw.longitude,
timezone: raw.timezone || '',
offerPickup: raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true,
offerDelivery: raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false,
isRecreational: raw.isRecreational ?? raw.recDispensary ?? true,
isMedical: raw.isMedical ?? raw.medicalDispensary ?? true,
phone: raw.phone || '',
email: raw.email || '',
website: raw.embedBackUrl || '',
description: raw.description || '',
logoImage: raw.logoImage || '',
bannerImage: raw.bannerImage || '',
chainSlug: raw.chain || '',
enterpriseId: raw.retailer?.enterpriseId || '',
retailType: raw.retailType || '',
status: raw.status || '',
location: loc,
};
}
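// Usage sketch (the raw object is invented, but uses only fields read above):
const exampleRaw = {
  id: 'abc123',
  name: 'Example Dispensary',
  slug: 'example-dispensary',
  location: {
    ln1: '123 Main St', city: 'Phoenix', state: 'AZ', zipcode: '85001',
    geometry: { coordinates: [-112.07, 33.45] }, // GeoJSON order: [lng, lat]
  },
};
const exampleLoc = normalizeLocation(exampleRaw);
// exampleLoc.longitude === -112.07, exampleLoc.latitude === 33.45
// exampleLoc.address === '123 Main St', exampleLoc.zip === '85001'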

View File

@@ -17,7 +17,8 @@ export {
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
export {
handleProductDiscovery,
handleProductDiscoveryCurl,
handleProductDiscoveryHttp,
handleProductRefresh,
handleStoreDiscovery,
handleEntryPointDiscovery,

View File

@@ -6,12 +6,15 @@
* task-service.ts and routes/tasks.ts.
*
* State is in-memory and resets on server restart.
* By default, the pool is PAUSED (closed) - admin must explicitly start it.
* This prevents workers from immediately grabbing tasks on deploy before
* the system is ready.
* By default, the pool is OPEN - workers start claiming tasks immediately.
* Admin can pause via API endpoint if needed.
*
* Note: Each process (backend, worker) has its own copy of this state.
* The /pool/pause and /pool/resume endpoints only affect the backend process.
* Workers always start with pool open.
*/
let taskPoolPaused = true;
let taskPoolPaused = false;
export function isTaskPoolPaused(): boolean {
return taskPoolPaused;
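
// Consumption sketch (import path is an assumption; only isTaskPoolPaused is shown above):
// a worker's poll loop can consult the in-memory flag before hitting the queue.
import { isTaskPoolPaused } from './task-pool-state';

async function pollOnce(): Promise<void> {
  if (isTaskPoolPaused()) {
    return; // pool closed: skip claiming this cycle
  }
  // ...claim and execute a task here...
}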

View File

@@ -73,6 +73,7 @@ export interface CreateTaskParams {
dispensary_id?: number;
platform?: string;
priority?: number;
method?: 'curl' | 'http'; // Transport method: curl=axios/proxy, http=Puppeteer/browser
scheduled_for?: Date;
payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
}
@@ -106,14 +107,15 @@ class TaskService {
*/
async createTask(params: CreateTaskParams): Promise<WorkerTask> {
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for, payload)
VALUES ($1, $2, $3, $4, $5, $6)
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *`,
[
params.role,
params.dispensary_id ?? null,
params.platform ?? null,
params.priority ?? 0,
params.method ?? null, // null = any worker can pick up, 'http' = http-capable workers only, 'curl' = curl workers only
params.scheduled_for ?? null,
params.payload ? JSON.stringify(params.payload) : null,
]
@@ -128,8 +130,8 @@ class TaskService {
if (tasks.length === 0) return 0;
const values = tasks.map((t, i) => {
const base = i * 5;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`;
const base = i * 6;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6})`;
});
const params = tasks.flatMap((t) => [
@@ -137,11 +139,12 @@ class TaskService {
t.dispensary_id ?? null,
t.platform ?? null,
t.priority ?? 0,
t.method ?? null,
t.scheduled_for ?? null,
]);
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for)
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for)
VALUES ${values.join(', ')}
ON CONFLICT DO NOTHING`,
params
@@ -268,52 +271,69 @@ class TaskService {
}
/**
* Mark a task as failed, with auto-retry if under max_retries
* Returns true if task was re-queued for retry, false if permanently failed
* Determine if an error is a "soft failure" (transient) that should be requeued
* Soft failures: timeouts, connection issues, browser launch issues
* Hard failures: business logic errors like "No products returned"
*/
async failTask(taskId: number, errorMessage: string): Promise<boolean> {
// Get current retry state
const result = await pool.query(
`SELECT retry_count, max_retries FROM worker_tasks WHERE id = $1`,
[taskId]
);
private isSoftFailure(errorMessage: string): boolean {
const softFailurePatterns = [
/timeout/i,
/timed out/i,
/connection.*terminated/i,
/connection.*refused/i,
/ECONNRESET/i,
/ECONNREFUSED/i,
/ETIMEDOUT/i,
/socket hang up/i,
/WS endpoint/i,
/browser process/i,
/Failed to launch/i,
/Navigation.*exceeded/i,
/net::ERR_/i,
/ENOENT.*storage/i, // Storage path issues (transient)
/ENOENT.*payload/i, // Payload path issues (transient)
];
if (result.rows.length === 0) {
return false;
return softFailurePatterns.some(pattern => pattern.test(errorMessage));
}
const { retry_count, max_retries } = result.rows[0];
const newRetryCount = (retry_count || 0) + 1;
/**
* Mark a task as failed
*
* Soft failures (timeouts, connection issues): Requeue back to pending for later pickup
* Hard failures (business logic errors): Mark as failed permanently
*/
async failTask(taskId: number, errorMessage: string): Promise<boolean> {
const isSoft = this.isSoftFailure(errorMessage);
if (newRetryCount < (max_retries || 3)) {
// Re-queue for retry - reset to pending with incremented retry_count
if (isSoft) {
// Soft failure: put back in queue immediately for another worker
await pool.query(
`UPDATE worker_tasks
SET status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
retry_count = $2,
error_message = $3,
error_message = $2,
scheduled_for = NULL,
updated_at = NOW()
WHERE id = $1`,
[taskId, newRetryCount, `Retry ${newRetryCount}: ${errorMessage}`]
[taskId, `Requeued: ${errorMessage}`]
);
console.log(`[TaskService] Task ${taskId} queued for retry ${newRetryCount}/${max_retries || 3}`);
console.log(`[TaskService] Task ${taskId} requeued for another worker`);
return true;
}
// Max retries exceeded - mark as permanently failed
// Hard failure: mark as permanently failed
await pool.query(
`UPDATE worker_tasks
SET status = 'failed',
completed_at = NOW(),
retry_count = $2,
error_message = $3
error_message = $2
WHERE id = $1`,
[taskId, newRetryCount, `Failed after ${newRetryCount} attempts: ${errorMessage}`]
[taskId, `Hard failure: ${errorMessage}`]
);
console.log(`[TaskService] Task ${taskId} permanently failed after ${newRetryCount} attempts`);
console.log(`[TaskService] Task ${taskId} hard failed: ${errorMessage}`);
return false;
}
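// Classification sketch (task IDs and error strings are illustrative):
// a timeout matches /ETIMEDOUT/i and is requeued; a business-logic error is terminal.
await taskService.failTask(42, 'connect ETIMEDOUT 10.0.0.5:8080'); // soft -> requeued, returns true
await taskService.failTask(43, 'No products returned');            // hard -> 'failed', returns false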
@@ -473,15 +493,17 @@ class TaskService {
case 'store_discovery': {
// Per TASK_WORKFLOW_2024-12-10.md: New stores discovered -> create product_discovery tasks
// Skip entry_point_discovery since platform_dispensary_id is set during promotion
// All product_discovery tasks use HTTP transport (Puppeteer/browser)
const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds;
if (newStoreIds && newStoreIds.length > 0) {
console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores`);
console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores (HTTP transport)`);
for (const storeId of newStoreIds) {
await this.createTask({
role: 'product_discovery',
dispensary_id: storeId,
platform: completedTask.platform ?? undefined,
priority: 10, // High priority for new stores
method: 'http', // Force HTTP transport for browser-based scraping
});
}
}
@@ -498,6 +520,7 @@ class TaskService {
dispensary_id: completedTask.dispensary_id,
platform: completedTask.platform ?? undefined,
priority: 10,
method: 'http', // Force HTTP transport
});
}
break;
@@ -522,6 +545,7 @@ class TaskService {
/**
* Create store discovery task for a platform/state
* Uses HTTP transport (Puppeteer/browser) by default
*/
async createStoreDiscoveryTask(
platform: string,
@@ -532,11 +556,13 @@ class TaskService {
role: 'store_discovery',
platform,
priority,
method: 'http', // Force HTTP transport
});
}
/**
* Create entry point discovery task for a specific store
* @deprecated Entry point resolution now happens during store promotion
*/
async createEntryPointTask(
dispensaryId: number,
@@ -548,11 +574,13 @@ class TaskService {
dispensary_id: dispensaryId,
platform,
priority,
method: 'http', // Force HTTP transport
});
}
/**
* Create product discovery task for a specific store
* Uses HTTP transport (Puppeteer/browser) by default
*/
async createProductDiscoveryTask(
dispensaryId: number,
@@ -564,6 +592,7 @@ class TaskService {
dispensary_id: dispensaryId,
platform,
priority,
method: 'http', // Force HTTP transport
});
}
@@ -641,6 +670,248 @@ class TaskService {
return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
}
/**
* Create multiple tasks with staggered start times.
*
* STAGGERED TASK WORKFLOW:
* =======================
* This prevents resource contention and proxy assignment lag when creating
* many tasks at once. Each task gets a scheduled_for timestamp offset from
* the previous task.
*
* Workflow:
* 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight check on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task is released back to pending for another worker
* 6. Worker finishes task, polls for next available task
* 7. Repeat - preflight runs again on next task claim
*
* Benefits:
* - Prevents all 8 workers from hitting proxies simultaneously
* - Reduces API rate limiting / 403 errors
* - Spreads resource usage over time
* - Each task still runs preflight, ensuring proxy health
*
* @param dispensaryIds - Array of dispensary IDs to create tasks for
* @param role - Task role (e.g., 'product_refresh', 'product_discovery')
* @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
* @param platform - Platform identifier (default: 'dutchie')
* @param method - Transport method: 'curl' or 'http' (default: null for any)
* @returns Number of tasks created
*/
async createStaggeredTasks(
dispensaryIds: number[],
role: TaskRole,
staggerSeconds: number = 15,
platform: string = 'dutchie',
method: 'curl' | 'http' | null = null
): Promise<{ created: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) {
return { created: 0, taskIds: [] };
}
// Use a single INSERT with generate_series for efficiency
const result = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
SELECT
$2::varchar as role,
td.dispensary_id,
$3::varchar as platform,
$4::varchar as method,
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds]);
const taskIds = result.rows.map((r: { id: number }) => r.id);
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
return { created: taskIds.length, taskIds };
}
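// Usage sketch (dispensary IDs hypothetical): three refreshes, 15s apart.
// Resulting scheduled_for values: NOW(), NOW()+15s, NOW()+30s.
const { created, taskIds } = await taskService.createStaggeredTasks(
  [101, 102, 103],
  'product_refresh',
  15,
  'dutchie'
);
console.log(`[Example] queued ${created} staggered tasks:`, taskIds);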
/**
* Create a batch of AZ store tasks with automatic distribution.
*
* This is a convenience method for creating tasks for Arizona stores with:
* - Automatic staggering to prevent resource contention
* - Even distribution across both refresh and discovery roles
*
* @param totalTasks - Total number of tasks to create
* @param staggerSeconds - Seconds between each task's start time
* @param splitRoles - If true, split between product_refresh and product_discovery
* @returns Summary of created tasks
*/
async createAZStoreTasks(
totalTasks: number = 24,
staggerSeconds: number = 15,
splitRoles: boolean = true
): Promise<{
total: number;
product_refresh: number;
product_discovery: number;
taskIds: number[];
}> {
// Get AZ stores with platform_id and menu_url
const storesResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE s.code = 'AZ'
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND d.menu_url IS NOT NULL
ORDER BY d.id
`);
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
if (storeIds.length === 0) {
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
}
// Limit tasks to available stores
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
const allTaskIds: number[] = [];
if (splitRoles) {
// Split between refresh and discovery
const tasksPerRole = Math.floor(maxTasks / 2);
const refreshStores = storeIds.slice(0, tasksPerRole);
const discoveryStores = storeIds.slice(0, tasksPerRole);
// Create refresh tasks first
const refreshResult = await this.createStaggeredTasks(
refreshStores,
'product_refresh',
staggerSeconds,
'dutchie'
);
allTaskIds.push(...refreshResult.taskIds);
// Create discovery tasks starting after refresh tasks are scheduled
const discoveryStartOffset = tasksPerRole * staggerSeconds;
const discoveryResult = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
SELECT
'product_discovery'::varchar as role,
td.dispensary_id,
'dutchie'::varchar as platform,
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
return {
total: allTaskIds.length,
product_refresh: refreshResult.taskIds.length,
product_discovery: discoveryResult.rowCount ?? 0,
taskIds: allTaskIds
};
}
// Single role mode - all product_discovery
const result = await this.createStaggeredTasks(
storeIds.slice(0, totalTasks),
'product_discovery',
staggerSeconds,
'dutchie'
);
return {
total: result.taskIds.length,
product_refresh: 0,
product_discovery: result.taskIds.length,
taskIds: result.taskIds
};
}
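// Usage sketch: 24 AZ tasks, 15s apart, split across both roles.
// Expected shape, assuming at least 12 eligible stores:
// { total: 24, product_refresh: 12, product_discovery: 12, taskIds: [...] }
const azSummary = await taskService.createAZStoreTasks(24, 15, true);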
/**
* Cleanup stale tasks that are stuck in 'claimed' or 'running' status.
*
* This handles the case where workers crash/restart and leave tasks in-flight.
* These stale tasks block the queue because the claim query excludes dispensary_ids
* that have active tasks.
*
* Called automatically on worker startup and can be called periodically.
*
* @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset
* @returns Object with cleanup stats
*/
async cleanupStaleTasks(staleMinutes: number = 30): Promise<{
cleaned: number;
byStatus: { claimed: number; running: number };
byRole: Record<string, number>;
}> {
// First, get stats on what we're about to clean
const statsResult = await pool.query(`
SELECT status, role, COUNT(*)::int as count
FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
GROUP BY status, role
`, [staleMinutes]);
const byStatus = { claimed: 0, running: 0 };
const byRole: Record<string, number> = {};
for (const row of statsResult.rows) {
const { status, role, count } = row as { status: string; role: string; count: number };
if (status === 'claimed') byStatus.claimed += count;
if (status === 'running') byStatus.running += count;
byRole[role] = (byRole[role] || 0) + count;
}
const totalStale = byStatus.claimed + byStatus.running;
if (totalStale === 0) {
return { cleaned: 0, byStatus, byRole };
}
// Reset stale tasks to pending
const result = await pool.query(`
UPDATE worker_tasks
SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
last_heartbeat_at = NULL,
error_message = CONCAT(COALESCE(error_message, ''), ' [Auto-reset: stale after ', $1, ' min]'),
updated_at = NOW()
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
`, [staleMinutes]);
const cleaned = result.rowCount ?? 0;
if (cleaned > 0) {
console.log(`[TaskService] Cleaned up ${cleaned} stale tasks (claimed: ${byStatus.claimed}, running: ${byStatus.running})`);
console.log(`[TaskService] Stale tasks by role: ${Object.entries(byRole).map(([r, c]) => `${r}:${c}`).join(', ')}`);
}
return { cleaned, byStatus, byRole };
}
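// Periodic safety-net sketch (the 10-minute interval is an assumption, not repo config):
// re-running cleanup means crashed workers can't wedge the queue between restarts.
setInterval(() => {
  taskService.cleanupStaleTasks(30).catch((err: any) => {
    console.error('[TaskService] periodic stale cleanup failed:', err.message);
  });
}, 10 * 60 * 1000);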
/**
* Calculate workers needed to complete tasks within SLA
*/

View File

@@ -11,10 +11,17 @@
* - Workers report heartbeats to worker_registry
* - Workers are ROLE-AGNOSTIC by default (can handle any task type)
*
* Stealth & Anti-Detection:
* PROXIES ARE REQUIRED - workers will fail to start if no proxies available.
* Stealth & Anti-Detection (LAZY INITIALIZATION):
* Workers start IMMEDIATELY without waiting for proxies.
* Stealth systems (proxies, fingerprints, preflights) are initialized
* on first task claim, not at worker startup.
*
* On startup, workers initialize the CrawlRotator which provides:
* This allows workers to:
* - Register and send heartbeats immediately
* - Wait in main loop without blocking on proxy availability
* - Initialize proxies/preflights only when tasks are actually available
*
* On first task claim attempt, workers initialize the CrawlRotator which provides:
* - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy
* - User-Agent rotation: Cycles through realistic browser fingerprints
* - Fingerprint rotation: Changes browser profile on blocks
@@ -34,11 +41,16 @@
*
* Environment:
* WORKER_ROLE - Which task role to process (optional, null = any task)
* WORKER_ID - Optional custom worker ID (auto-generated if not provided)
* POD_NAME - Kubernetes pod name (optional)
* POD_NAME - K8s StatefulSet pod name (PRIMARY - use this for persistent identity)
* WORKER_ID - Custom worker ID (fallback if POD_NAME not set)
* POLL_INTERVAL_MS - How often to check for tasks (default: 5000)
* HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000)
* API_BASE_URL - Backend API URL for registration (default: http://localhost:3010)
*
* Worker Identity:
* Workers use POD_NAME as their worker_id for persistent identity across restarts.
* In K8s StatefulSet, POD_NAME = "scraper-worker-0" through "scraper-worker-7".
* This ensures workers re-register with the same ID instead of creating new entries.
*/
import { Pool } from 'pg';
@@ -57,10 +69,13 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser
// Task handlers by role
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
import { handlePayloadFetch } from './handlers/payload-fetch';
// Dual-transport: curl vs http (browser-based) handlers
import { handlePayloadFetch } from './handlers/payload-fetch-curl';
import { handleProductRefresh } from './handlers/product-refresh';
import { handleProductDiscovery } from './handlers/product-discovery';
import { handleProductDiscovery } from './handlers/product-discovery-curl';
import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleWhoami } from './handlers/whoami';
@@ -82,7 +97,11 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// =============================================================================
// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
// Browser tasks (Puppeteer) use ~400MB RAM each. With 2GB pod limit:
// - 3 browsers = ~1.3GB = SAFE
// - 4 browsers = ~1.7GB = RISKY
// - 5+ browsers = OOM CRASH
// See: docs/WORKER_TASK_ARCHITECTURE.md#browser-task-memory-limits
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
@@ -116,6 +135,8 @@ export interface TaskContext {
task: WorkerTask;
heartbeat: () => Promise<void>;
crawlRotator?: CrawlRotator;
/** Update the current step being executed (shown in dashboard) */
updateStep: (step: string, detail?: string) => void;
}
export interface TaskResult {
@@ -132,17 +153,47 @@ type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main handler for product crawling
// product_discovery: Main handler for product crawling (has curl and http variants)
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
payload_fetch: handlePayloadFetch, // API fetch -> disk
payload_fetch: handlePayloadFetch, // API fetch -> disk (curl)
product_refresh: handleProductRefresh, // disk -> DB
product_discovery: handleProductDiscovery,
product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
store_discovery: handleStoreDiscovery,
entry_point_discovery: handleEntryPointDiscovery,
analytics_refresh: handleAnalyticsRefresh,
whoami: handleWhoami, // Tests proxy + anti-detect
};
/**
* Get the appropriate handler for a task, considering both role and method.
*
* Dual-transport handlers:
* - product_discovery: curl (axios) or http (Puppeteer)
* - store_discovery: curl (axios) or http (Puppeteer)
*
* Default method is 'http' since all GraphQL queries should use browser transport
* for better TLS fingerprinting and session-based proxy compatibility.
*/
function getHandlerForTask(task: WorkerTask): TaskHandler | undefined {
const role = task.role as TaskRole;
const method = task.method || 'http'; // Default to HTTP for all GraphQL tasks
// product_discovery: dual-transport support
if (role === 'product_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${method})`);
return handleProductDiscoveryHttp;
}
// store_discovery: dual-transport support
if (role === 'store_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for store_discovery (method=${method})`);
return handleStoreDiscoveryHttp;
}
// Default: use the static handler registry (curl-based)
return TASK_HANDLERS[role];
}
/**
* Resource usage stats reported to the registry and used for backoff decisions.
* These values are included in worker heartbeats and displayed in the UI.
@@ -209,6 +260,28 @@ export class TaskWorker {
private preflightCurlResult: CurlPreflightResult | null = null;
private preflightHttpResult: PuppeteerPreflightResult | null = null;
// ==========================================================================
// LAZY INITIALIZATION FLAGS
// ==========================================================================
// Stealth/proxy initialization is deferred until first task claim.
// Workers register immediately and enter main loop without blocking.
// ==========================================================================
private stealthInitialized: boolean = false;
private preflightsCompleted: boolean = false;
private initializingPromise: Promise<void> | null = null;
// ==========================================================================
// STEP TRACKING FOR DASHBOARD VISIBILITY
// ==========================================================================
// Workers report their current step in heartbeats so the dashboard can show
// real-time progress like "preflight", "loading page", "processing products"
// ==========================================================================
private currentStep: string = 'idle';
private currentStepDetail: string | null = null;
private currentStepStartedAt: Date | null = null;
/** Map of task ID -> step info for concurrent tasks */
private taskSteps: Map<number, { step: string; detail: string | null; startedAt: Date }> = new Map();
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -291,11 +364,70 @@ export class TaskWorker {
return this.activeTasks.size < this.maxConcurrentTasks;
}
// ==========================================================================
// STEP TRACKING METHODS
// ==========================================================================
/**
* Update the current step for a task (for dashboard visibility)
* @param taskId - The task ID to update
* @param step - Short step name (e.g., "preflight", "loading", "processing")
* @param detail - Optional detail (e.g., "Verifying IP 1.2.3.4")
*/
public updateTaskStep(taskId: number, step: string, detail?: string): void {
this.taskSteps.set(taskId, {
step,
detail: detail || null,
startedAt: new Date(),
});
// Also update the "primary" step for single-task backwards compat
if (this.activeTasks.size === 1 || taskId === Array.from(this.activeTasks.keys())[0]) {
this.currentStep = step;
this.currentStepDetail = detail || null;
this.currentStepStartedAt = new Date();
}
console.log(`[TaskWorker] Step: ${step}${detail ? ` - ${detail}` : ''} (task #${taskId})`);
}
/**
* Clear step tracking for a task (when task completes)
*/
private clearTaskStep(taskId: number): void {
this.taskSteps.delete(taskId);
// Reset primary step if no more active tasks
if (this.activeTasks.size === 0) {
this.currentStep = 'idle';
this.currentStepDetail = null;
this.currentStepStartedAt = null;
}
}
/**
* Get current step info for all active tasks (for heartbeat)
*/
private getTaskStepsInfo(): Array<{
task_id: number;
step: string;
detail: string | null;
elapsed_ms: number;
}> {
const now = Date.now();
return Array.from(this.taskSteps.entries()).map(([taskId, info]) => ({
task_id: taskId,
step: info.step,
detail: info.detail,
elapsed_ms: now - info.startedAt.getTime(),
}));
}
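// Illustrative output (values invented) as serialized into the heartbeat body:
// [{ task_id: 87, step: 'fetching', detail: 'Fetching cities for 12 states', elapsed_ms: 4200 }]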
/**
* Initialize stealth systems (proxy rotation, fingerprints)
* Called once on worker startup before processing any tasks.
* Called LAZILY on first task claim attempt (NOT at worker startup).
*
* IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
* IMPORTANT: Proxies are REQUIRED to claim tasks. This method waits until proxies are available.
* Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added.
*/
private async initializeStealth(): Promise<void> {
@@ -435,35 +567,98 @@ export class TaskWorker {
/**
* Report preflight status to worker_registry
* Function signature: update_worker_preflight(worker_id, transport, status, ip, response_ms, error, fingerprint)
*/
private async reportPreflightStatus(): Promise<void> {
try {
// Update worker_registry directly via SQL (more reliable than API)
// CURL preflight - includes IP address
await this.pool.query(`
SELECT update_worker_preflight($1, 'curl', $2, $3, $4)
SELECT update_worker_preflight($1, 'curl', $2, $3, $4, $5, $6)
`, [
this.workerId,
this.preflightCurlPassed ? 'passed' : 'failed',
this.preflightCurlResult?.proxyIp || null,
this.preflightCurlResult?.responseTimeMs || null,
this.preflightCurlResult?.error || null,
null, // No fingerprint for curl
]);
// HTTP preflight - includes IP, fingerprint, and timezone data
const httpFingerprint = this.preflightHttpResult ? {
...this.preflightHttpResult.fingerprint,
detectedTimezone: (this.preflightHttpResult as any).detectedTimezone,
detectedLocation: (this.preflightHttpResult as any).detectedLocation,
productsReturned: this.preflightHttpResult.productsReturned,
botDetection: (this.preflightHttpResult as any).botDetection,
} : null;
await this.pool.query(`
SELECT update_worker_preflight($1, 'http', $2, $3, $4)
SELECT update_worker_preflight($1, 'http', $2, $3, $4, $5, $6)
`, [
this.workerId,
this.preflightHttpPassed ? 'passed' : 'failed',
this.preflightHttpResult?.proxyIp || null,
this.preflightHttpResult?.responseTimeMs || null,
this.preflightHttpResult?.error || null,
httpFingerprint ? JSON.stringify(httpFingerprint) : null,
]);
console.log(`[TaskWorker] Preflight status reported to worker_registry`);
if (this.preflightHttpResult?.proxyIp) {
console.log(`[TaskWorker] HTTP IP: ${this.preflightHttpResult.proxyIp}, Timezone: ${(this.preflightHttpResult as any).detectedTimezone || 'unknown'}`);
}
} catch (err: any) {
// Non-fatal - worker can still function
console.warn(`[TaskWorker] Could not report preflight status: ${err.message}`);
}
}
/**
* Lazy initialization of stealth systems.
* Called BEFORE claiming first task (not at worker startup).
* This allows workers to register and enter main loop immediately.
*
* Returns true if initialization succeeded, false otherwise.
*/
private async ensureStealthInitialized(): Promise<boolean> {
// Already initialized
if (this.stealthInitialized && this.preflightsCompleted) {
return true;
}
// Already initializing (prevent concurrent init attempts)
if (this.initializingPromise) {
await this.initializingPromise;
return this.stealthInitialized && this.preflightsCompleted;
}
console.log(`[TaskWorker] ${this.friendlyName} lazy-initializing stealth systems (first task claim)...`);
this.initializingPromise = (async () => {
try {
// Initialize proxy/fingerprint rotation
await this.initializeStealth();
this.stealthInitialized = true;
// Run dual-transport preflights
await this.runDualPreflights();
this.preflightsCompleted = true;
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} stealth ready (${preflightMsg})`);
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} stealth init failed: ${err.message}`);
this.stealthInitialized = false;
this.preflightsCompleted = false;
}
})();
await this.initializingPromise;
this.initializingPromise = null;
return this.stealthInitialized && this.preflightsCompleted;
}
/**
* Register worker with the registry (get friendly name)
*/
@@ -517,7 +712,7 @@ export class TaskWorker {
}
/**
* Send heartbeat to registry with resource usage and proxy location
* Send heartbeat to registry with resource usage, proxy location, and step info
*/
private async sendRegistryHeartbeat(): Promise<void> {
try {
@@ -529,6 +724,9 @@ export class TaskWorker {
// Get array of active task IDs
const activeTaskIds = Array.from(this.activeTasks.keys());
// Get step info for all active tasks
const taskSteps = this.getTaskStepsInfo();
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -539,6 +737,11 @@ export class TaskWorker {
active_task_count: this.activeTasks.size,
max_concurrent_tasks: this.maxConcurrentTasks,
status: this.activeTasks.size > 0 ? 'active' : 'idle',
// Step tracking for dashboard visibility
current_step: this.currentStep,
current_step_detail: this.currentStepDetail,
current_step_started_at: this.currentStepStartedAt?.toISOString() || null,
task_steps: taskSteps, // Per-task step info for concurrent workers
resources: {
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
@@ -597,25 +800,36 @@ export class TaskWorker {
/**
* Start the worker loop
*
* Workers start IMMEDIATELY without blocking on proxy/preflight init.
* Stealth systems are lazy-initialized on first task claim.
* This allows workers to register and send heartbeats even when proxies aren't ready.
*/
async start(): Promise<void> {
this.isRunning = true;
// Initialize stealth systems (proxy rotation, fingerprints)
await this.initializeStealth();
// Register with the API to get a friendly name
// Register with the API to get a friendly name (non-blocking)
await this.register();
// Run dual-transport preflights
await this.runDualPreflights();
// Start registry heartbeat
// Start registry heartbeat immediately
this.startRegistryHeartbeat();
// Cleanup stale tasks on startup (only worker-0 does this to avoid races)
// This handles tasks left in 'claimed'/'running' status when workers restart
if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') {
try {
console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
if (cleanupResult.cleaned > 0) {
console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
}
} catch (err: any) {
console.error(`[TaskWorker] Stale task cleanup error:`, err.message);
}
}
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (${preflightMsg}, max ${this.maxConcurrentTasks} concurrent tasks)`);
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`);
while (this.isRunning) {
try {
@@ -658,6 +872,12 @@ export class TaskWorker {
this.backoffReason = null;
}
// Periodically reload proxies to pick up changes (new proxies, disabled proxies)
// This runs every ~60 seconds (controlled by setProxyReloadInterval)
if (this.stealthInitialized) {
await this.crawlRotator.reloadIfStale();
}
// Check for decommission signal
const shouldDecommission = await this.checkDecommission();
if (shouldDecommission) {
@@ -669,6 +889,20 @@ export class TaskWorker {
// Try to claim more tasks if we have capacity
if (this.canAcceptMoreTasks()) {
// =================================================================
// LAZY INITIALIZATION - Initialize stealth on first task claim
// Workers start immediately and init proxies only when needed
// =================================================================
if (!this.stealthInitialized) {
const initSuccess = await this.ensureStealthInitialized();
if (!initSuccess) {
// Init failed - wait and retry next loop
console.log(`[TaskWorker] ${this.friendlyName} stealth init failed, waiting before retry...`);
await this.sleep(30000);
return;
}
}
// Pass preflight capabilities to only claim compatible tasks
const task = await taskService.claimTask(
this.role,
@@ -681,13 +915,32 @@ export class TaskWorker {
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
// =================================================================
// PREFLIGHT CHECK - CRITICAL: Worker MUST pass before task execution
// Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready
// PREFLIGHT CHECK - Use stored preflight results based on task method
// Dual-transport preflights already ran during lazy stealth init, so just
// verify the correct preflight passed for this task's required method.
// =================================================================
const preflight = await this.crawlRotator.preflight();
if (!preflight.passed) {
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${preflight.error}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without proxy/anti-detect`);
const taskMethod = task.method || 'http'; // Default to http if not specified
let preflightPassed = false;
let preflightMsg = '';
if (taskMethod === 'http' && this.preflightHttpPassed) {
preflightPassed = true;
preflightMsg = `HTTP preflight passed (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`;
} else if (taskMethod === 'curl' && this.preflightCurlPassed) {
preflightPassed = true;
preflightMsg = `CURL preflight passed (IP: ${this.preflightCurlResult?.proxyIp || 'unknown'})`;
} else if (!task.method && (this.preflightHttpPassed || this.preflightCurlPassed)) {
// No method preference - either transport works
preflightPassed = true;
preflightMsg = this.preflightHttpPassed ? 'HTTP preflight passed' : 'CURL preflight passed';
}
if (!preflightPassed) {
const errorMsg = taskMethod === 'http'
? 'HTTP preflight not passed - cannot execute http tasks'
: 'CURL preflight not passed - cannot execute curl tasks';
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${errorMsg}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without preflight`);
// Release task back to pending so another worker can pick it up
await taskService.releaseTask(task.id);
@@ -697,7 +950,7 @@ export class TaskWorker {
return;
}
console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${task.id} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`);
console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);
this.activeTasks.set(task.id, task);
@@ -741,13 +994,13 @@ export class TaskWorker {
// Mark as running
await taskService.startTask(task.id);
// Get handler for this role
const handler = TASK_HANDLERS[task.role];
// Get handler for this role (considers method for dual-transport)
const handler = getHandlerForTask(task);
if (!handler) {
throw new Error(`No handler registered for role: ${task.role}`);
}
// Create context
// Create context with step tracking
const ctx: TaskContext = {
pool: this.pool,
workerId: this.workerId,
@@ -756,12 +1009,21 @@ export class TaskWorker {
await taskService.heartbeat(task.id);
},
crawlRotator: this.crawlRotator,
updateStep: (step: string, detail?: string) => {
this.updateTaskStep(task.id, step, detail);
},
};
// Initialize step tracking for this task
this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);
// Execute the task
const result = await handler(ctx);
if (result.success) {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as completed
await taskService.completeTask(task.id, result);
await this.reportTaskCompletion(true);
@@ -777,12 +1039,18 @@ export class TaskWorker {
console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`);
}
} else {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as failed
await taskService.failTask(task.id, result.error || 'Unknown error');
await this.reportTaskCompletion(false);
console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`);
}
} catch (error: any) {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as failed
await taskService.failTask(task.id, error.message);
await this.reportTaskCompletion(false);
@@ -904,7 +1172,10 @@ async function main(): Promise<void> {
process.exit(1);
}
const workerId = process.env.WORKER_ID;
// Use POD_NAME for persistent identity in K8s StatefulSet
// This ensures workers keep the same ID across restarts
// Falls back to WORKER_ID, then generates UUID if neither is set
const workerId = process.env.POD_NAME || process.env.WORKER_ID;
// Pass null for role-agnostic, or the specific role
const worker = new TaskWorker(role || null, workerId);

View File

@@ -5,10 +5,13 @@
*
* Design Pattern: Metadata/Payload Separation
* - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable
* - Payload on filesystem: Gzipped JSON at storage_path
* - Payload stored in MinIO/S3 (or local filesystem as fallback): Gzipped JSON
*
* Storage structure:
* /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* Storage structure (MinIO):
* cannaiq/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*
* Storage structure (Local fallback):
* ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*
* Benefits:
* - Compare any two crawls to see what changed
@@ -16,6 +19,7 @@
* - Debug issues by seeing exactly what the API returned
* - DB stays small, backups stay fast
* - ~90% compression (1.5MB -> 150KB per crawl)
* - Shared storage accessible by all worker pods (MinIO)
*/
import * as fs from 'fs';
@@ -24,13 +28,47 @@ import * as zlib from 'zlib';
import { promisify } from 'util';
import { Pool } from 'pg';
import * as crypto from 'crypto';
import * as Minio from 'minio';
const gzip = promisify(zlib.gzip);
const gunzip = promisify(zlib.gunzip);
// Base path for payload storage (matches image storage pattern)
// Base path for payload storage (used for local fallback and as key prefix in MinIO)
const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads';
// MinIO configuration
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT;
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '443');
const MINIO_USE_SSL = process.env.MINIO_USE_SSL === 'true';
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY;
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY;
const MINIO_BUCKET = process.env.MINIO_BUCKET || 'cannaiq';
// Check if MinIO is configured
const useMinIO = !!(MINIO_ENDPOINT && MINIO_ACCESS_KEY && MINIO_SECRET_KEY);
let minioClient: Minio.Client | null = null;
function getMinioClient(): Minio.Client {
if (!minioClient && useMinIO) {
minioClient = new Minio.Client({
endPoint: MINIO_ENDPOINT!,
port: MINIO_PORT,
useSSL: MINIO_USE_SSL,
accessKey: MINIO_ACCESS_KEY!,
secretKey: MINIO_SECRET_KEY!,
});
}
return minioClient!;
}
// Log which storage backend we're using
if (useMinIO) {
console.log(`[PayloadStorage] Using MinIO storage: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`);
} else {
console.log(`[PayloadStorage] Using local filesystem storage: ${PAYLOAD_BASE_PATH}`);
}
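// Example environment for the MinIO backend (values are placeholders):
//   MINIO_ENDPOINT=minio.example.internal
//   MINIO_PORT=443
//   MINIO_USE_SSL=true
//   MINIO_ACCESS_KEY=<access key>
//   MINIO_SECRET_KEY=<secret key>
//   MINIO_BUCKET=cannaiq        (default)
// Leaving MINIO_ENDPOINT / keys unset falls back to PAYLOAD_STORAGE_PATH on local disk.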
/**
* Result from saving a payload
*/
@@ -58,9 +96,10 @@ export interface LoadPayloadResult {
}
/**
* Generate storage path for a payload
* Generate storage path/key for a payload
*
* Format: /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* MinIO format: payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* Local format: ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*/
function generateStoragePath(dispensaryId: number, timestamp: Date): string {
const year = timestamp.getFullYear();
@@ -68,13 +107,15 @@ function generateStoragePath(dispensaryId: number, timestamp: Date): string {
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
String(year),
month,
day,
`store_${dispensaryId}_${ts}.json.gz`
);
const relativePath = `payloads/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`;
if (useMinIO) {
// MinIO uses forward slashes, no leading slash
return relativePath;
} else {
// Local filesystem uses OS-specific path
return path.join(PAYLOAD_BASE_PATH, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`);
}
}
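// Example output (sketch): for dispensary 42 on 2025-12-12 this yields the MinIO
// key "payloads/2025/12/12/store_42_<ms>.json.gz" or the local path
// "<PAYLOAD_BASE_PATH>/2025/12/12/store_42_<ms>.json.gz".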
/**
@@ -93,7 +134,7 @@ function calculateChecksum(data: Buffer): string {
}
/**
* Save a raw crawl payload to MinIO/S3 (or filesystem) and record metadata in DB
*
* @param pool - Database connection pool
* @param dispensaryId - ID of the dispensary
@@ -119,9 +160,19 @@ export async function saveRawPayload(
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to storage backend
if (useMinIO) {
// Upload to MinIO
const client = getMinioClient();
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
'Content-Type': 'application/gzip',
'Content-Encoding': 'gzip',
});
} else {
// Write to local filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
}
// Record metadata in DB
const result = await pool.query(`
@@ -147,7 +198,8 @@ export async function saveRawPayload(
checksum
]);
const backend = useMinIO ? 'MinIO' : 'local';
console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
return {
id: result.rows[0].id,
@@ -196,13 +248,32 @@ export async function loadRawPayloadById(
}
/**
 * Load a raw payload directly from storage path (MinIO or filesystem)
 *
* @param storagePath - Path/key to gzipped JSON file
* @returns Parsed JSON payload
*/
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
let compressed: Buffer;
// Determine if path looks like MinIO key (starts with payloads/) or local path
const isMinIOPath = storagePath.startsWith('payloads/') && useMinIO;
if (isMinIOPath) {
// Download from MinIO
const client = getMinioClient();
const chunks: Buffer[] = [];
const stream = await client.getObject(MINIO_BUCKET, storagePath);
for await (const chunk of stream) {
chunks.push(chunk as Buffer);
}
compressed = Buffer.concat(chunks);
} else {
// Read from local filesystem
compressed = await fs.promises.readFile(storagePath);
}
const decompressed = await gunzip(compressed);
return JSON.parse(decompressed.toString('utf8'));
}
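// Usage sketch: the backend is inferred from the stored path shape (MinIO keys
// start with "payloads/" and require MinIO to be configured), so callers can pass
// whatever raw_crawl_payloads.storage_path recorded:
//   const data = await loadPayloadFromPath('payloads/2025/12/12/store_42_1765569600000.json.gz');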
@@ -366,6 +437,152 @@ export async function listPayloadMetadata(
}));
}
/**
* Result from saving a discovery payload
*/
export interface SaveDiscoveryPayloadResult {
id: number;
storagePath: string;
sizeBytes: number;
sizeBytesRaw: number;
checksum: string;
}
/**
* Generate storage path/key for a discovery payload
*
* MinIO format: payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
* Local format: ./storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
*/
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
const year = timestamp.getFullYear();
const month = String(timestamp.getMonth() + 1).padStart(2, '0');
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json.gz`;
if (useMinIO) {
return relativePath;
} else {
return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json.gz`);
}
}
/**
* Save a raw store discovery payload to MinIO/S3 (or filesystem) and record metadata in DB
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
* @param payload - Raw JSON payload from discovery GraphQL
* @param storeCount - Number of stores in payload
* @returns SaveDiscoveryPayloadResult with file info and DB record ID
*/
export async function saveDiscoveryPayload(
pool: Pool,
stateCode: string,
payload: any,
storeCount: number = 0
): Promise<SaveDiscoveryPayloadResult> {
const timestamp = new Date();
const storagePath = generateDiscoveryStoragePath(stateCode, timestamp);
// Serialize and compress
const jsonStr = JSON.stringify(payload);
const rawSize = Buffer.byteLength(jsonStr, 'utf8');
const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to storage backend
if (useMinIO) {
// Upload to MinIO
const client = getMinioClient();
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
'Content-Type': 'application/gzip',
'Content-Encoding': 'gzip',
});
} else {
// Write to local filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
}
// Record metadata in DB
const result = await pool.query(`
INSERT INTO raw_crawl_payloads (
payload_type,
state_code,
storage_path,
store_count,
size_bytes,
size_bytes_raw,
fetched_at,
checksum_sha256
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
`, [
'store_discovery',
stateCode.toUpperCase(),
storagePath,
storeCount,
compressedSize,
rawSize,
timestamp,
checksum
]);
const backend = useMinIO ? 'MinIO' : 'local';
console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
return {
id: result.rows[0].id,
storagePath,
sizeBytes: compressedSize,
sizeBytesRaw: rawSize,
checksum
};
}
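// Call sketch (assumes `pool` is a pg Pool and `json` is the discovery GraphQL
// response; the store-count accessor is illustrative, not a known response shape):
//   const saved = await saveDiscoveryPayload(pool, 'AZ', json, json?.stores?.length ?? 0);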
/**
* Get the latest discovery payload for a state
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
* @returns Parsed payload and metadata, or null if none exists
*/
export async function getLatestDiscoveryPayload(
pool: Pool,
stateCode: string
): Promise<{ payload: any; metadata: any } | null> {
const result = await pool.query(`
SELECT id, state_code, storage_path, store_count, fetched_at
FROM raw_crawl_payloads
WHERE payload_type = 'store_discovery'
AND state_code = $1
ORDER BY fetched_at DESC
LIMIT 1
`, [stateCode.toUpperCase()]);
if (result.rows.length === 0) {
return null;
}
const row = result.rows[0];
const payload = await loadPayloadFromPath(row.storage_path);
return {
payload,
metadata: {
id: row.id,
stateCode: row.state_code,
storeCount: row.store_count,
fetchedAt: row.fetched_at,
storagePath: row.storage_path
}
};
}
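// Usage sketch: reuse the most recent discovery crawl instead of re-fetching.
//   const latest = await getLatestDiscoveryPayload(pool, 'AZ');
//   if (latest) {
//     console.log(`${latest.metadata.storeCount} stores fetched at ${latest.metadata.fetchedAt}`);
//   }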
/**
* Delete old payloads (for retention policy)
*

View File

@@ -2666,13 +2666,25 @@ class ApiClient {
// Dashboard methods
getMarketDashboard = this.getMarketsDashboard.bind(this);
// ============================================================
// LEGACY SCHEDULE METHODS (DEPRECATED 2025-12-12)
// These use /api/markets/admin/schedules which queries job_schedules
// Use getTaskSchedules(), updateTaskSchedule(), etc. instead
// (defined below, use /api/tasks/schedules which queries task_schedules)
// ============================================================
/** @deprecated Use getTaskSchedules() - queries task_schedules table */
getSchedules = this.getCrawlSchedules.bind(this);
/** @deprecated Use getTaskSchedule() - queries task_schedules table */
getSchedule = this.getDutchieAZSchedule.bind(this);
/** @deprecated Use createTaskSchedule() - queries task_schedules table */
createSchedule = this.createDutchieAZSchedule.bind(this);
/** @deprecated Use updateTaskSchedule() - queries task_schedules table */
updateSchedule = this.updateDutchieAZSchedule.bind(this);
/** @deprecated Use deleteTaskSchedule() - queries task_schedules table */
deleteSchedule = this.deleteDutchieAZSchedule.bind(this);
/** @deprecated Use runTaskScheduleNow() - queries task_schedules table */
triggerSchedule = this.triggerDutchieAZSchedule.bind(this);
/** @deprecated - job_schedules init not needed for task_schedules */
initSchedules = this.initDutchieAZSchedules.bind(this);
getScheduleLogs = this.getCrawlScheduleLogs.bind(this);
getRunLogs = this.getDutchieAZRunLogs.bind(this);
@@ -2976,6 +2988,101 @@ class ApiClient {
{ method: 'POST', body: JSON.stringify({ replicas }) }
);
}
// ==========================================
// Task Schedules API (recurring task definitions)
// ==========================================
async getTaskSchedules(enabledOnly?: boolean) {
const qs = enabledOnly ? '?enabled=true' : '';
return this.request<{ schedules: TaskSchedule[] }>(`/api/tasks/schedules${qs}`);
}
async getTaskSchedule(id: number) {
return this.request<TaskSchedule>(`/api/tasks/schedules/${id}`);
}
async createTaskSchedule(data: {
name: string;
role: string;
description?: string;
enabled?: boolean;
interval_hours: number;
priority?: number;
state_code?: string;
platform?: string;
}) {
return this.request<TaskSchedule>('/api/tasks/schedules', {
method: 'POST',
body: JSON.stringify(data),
});
}
async updateTaskSchedule(id: number, data: Partial<{
name: string;
role: string;
description: string;
enabled: boolean;
interval_hours: number;
priority: number;
state_code: string;
platform: string;
}>) {
return this.request<TaskSchedule>(`/api/tasks/schedules/${id}`, {
method: 'PUT',
body: JSON.stringify(data),
});
}
async deleteTaskSchedule(id: number) {
return this.request<{ success: boolean; message: string }>(`/api/tasks/schedules/${id}`, {
method: 'DELETE',
});
}
async deleteTaskSchedulesBulk(ids?: number[], all?: boolean) {
return this.request<{ success: boolean; deleted_count: number; deleted: { id: number; name: string }[]; message: string }>(
'/api/tasks/schedules',
{
method: 'DELETE',
body: JSON.stringify({ ids, all }),
}
);
}
async runTaskScheduleNow(id: number) {
return this.request<{ success: boolean; message: string; tasksCreated?: number; stateCode?: string }>(`/api/tasks/schedules/${id}/run-now`, {
method: 'POST',
});
}
async toggleTaskSchedule(id: number) {
return this.request<{ success: boolean; schedule: { id: number; name: string; enabled: boolean }; message: string }>(
`/api/tasks/schedules/${id}/toggle`,
{ method: 'POST' }
);
}
}
// Type for task schedules
export interface TaskSchedule {
id: number;
name: string;
role: string;
description: string | null;
enabled: boolean;
interval_hours: number;
priority: number;
state_code: string | null;
platform: string | null;
method: 'curl' | 'http' | null;
is_immutable: boolean;
last_run_at: string | null;
next_run_at: string | null;
last_task_count: number;
last_error: string | null;
created_at: string;
updated_at: string;
}
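// Migration sketch for callers of the deprecated aliases above (method names and
// shapes match the class definitions; the ids/values are illustrative):
//   const { schedules } = await api.getTaskSchedules(true); // enabled only
//   await api.updateTaskSchedule(schedules[0].id, { interval_hours: 6 });
//   await api.runTaskScheduleNow(schedules[0].id);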
export const api = new ApiClient(API_URL);

View File

@@ -1,3 +1,18 @@
/**
* @deprecated 2025-12-12
*
* This page used the legacy job_schedules table which has been deprecated.
* All schedule management has been consolidated into task_schedules and
* is now managed via the /admin/tasks page (TasksDashboard.tsx).
*
* The job_schedules table entries have been disabled and marked deprecated.
* This page is no longer in the navigation menu but kept for reference.
*
* Migration details:
* - job_schedules used base_interval_minutes + jitter_minutes
* - task_schedules uses interval_hours (simpler model)
* - All CRUD operations now via /api/tasks/schedules endpoints
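 *   (example: base_interval_minutes = 240 maps to interval_hours = 4; jitter has
 *   no equivalent in the simpler model)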
*/
import { useEffect, useState } from 'react';
import { Layout } from '../components/Layout';
import { api } from '../lib/api';

View File

@@ -1,5 +1,5 @@
import { useState, useEffect } from 'react';
import { api, TaskSchedule } from '../lib/api';
import { Layout } from '../components/Layout';
import {
ListChecks,
@@ -21,6 +21,12 @@ import {
X,
Calendar,
Trash2,
Edit2,
Play,
Pause,
Timer,
Lock,
Globe,
} from 'lucide-react';
interface Task {
@@ -381,10 +387,264 @@ const ROLES = [
'store_discovery',
'entry_point_discovery',
'product_discovery',
'product_refresh',
'analytics_refresh',
];
// ============================================================
// Schedule Edit Modal
// ============================================================
interface ScheduleEditModalProps {
isOpen: boolean;
schedule: TaskSchedule | null;
onClose: () => void;
onSave: () => void;
}
function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditModalProps) {
const [name, setName] = useState('');
const [role, setRole] = useState('product_refresh');
const [description, setDescription] = useState('');
const [enabled, setEnabled] = useState(true);
const [intervalHours, setIntervalHours] = useState(4);
const [priority, setPriority] = useState(0);
const [stateCode, setStateCode] = useState('');
const [platform, setPlatform] = useState('dutchie');
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const isNew = !schedule;
const isImmutable = schedule?.is_immutable ?? false;
useEffect(() => {
if (schedule) {
setName(schedule.name);
setRole(schedule.role);
setDescription(schedule.description || '');
setEnabled(schedule.enabled);
setIntervalHours(schedule.interval_hours);
setPriority(schedule.priority);
setStateCode(schedule.state_code || '');
setPlatform(schedule.platform || 'dutchie');
} else {
// Reset for new schedule
setName('');
setRole('product_refresh');
setDescription('');
setEnabled(true);
setIntervalHours(4);
setPriority(0);
setStateCode('');
setPlatform('dutchie');
}
setError(null);
}, [schedule, isOpen]);
const handleSubmit = async () => {
if (!isImmutable && !name.trim()) {
setError('Name is required');
return;
}
setLoading(true);
setError(null);
try {
// For immutable schedules, only send allowed fields
const data = isImmutable
? {
enabled,
interval_hours: intervalHours,
priority,
}
: {
name: name.trim(),
role,
description: description.trim() || undefined,
enabled,
interval_hours: intervalHours,
priority,
state_code: stateCode.trim() || undefined,
platform: platform.trim() || undefined,
};
if (isNew) {
await api.createTaskSchedule(data as any);
} else {
await api.updateTaskSchedule(schedule!.id, data);
}
onSave();
onClose();
} catch (err: any) {
setError(err.response?.data?.error || err.message || 'Failed to save schedule');
} finally {
setLoading(false);
}
};
if (!isOpen) return null;
return (
<div className="fixed inset-0 z-50 overflow-y-auto">
<div className="flex min-h-full items-center justify-center p-4">
<div className="fixed inset-0 bg-black/50" onClick={onClose} />
<div className="relative bg-white rounded-xl shadow-xl max-w-lg w-full">
<div className="px-6 py-4 border-b border-gray-200 flex items-center justify-between">
<h2 className="text-lg font-semibold text-gray-900">
{isNew ? 'Create Schedule' : 'Edit Schedule'}
</h2>
<button onClick={onClose} className="p-1 hover:bg-gray-100 rounded">
<X className="w-5 h-5 text-gray-500" />
</button>
</div>
<div className="px-6 py-4 space-y-4">
{error && (
<div className="bg-red-50 border border-red-200 rounded-lg p-3 text-red-700 text-sm">
{error}
</div>
)}
{isImmutable && (
<div className="bg-amber-50 border border-amber-200 rounded-lg p-3 flex items-start gap-2">
<Lock className="w-4 h-4 text-amber-600 flex-shrink-0 mt-0.5" />
<div className="text-sm text-amber-800">
<strong>Immutable schedule.</strong> Only <em>Enabled</em>, <em>Interval</em>, and <em>Priority</em> can be modified.
</div>
</div>
)}
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Name *</label>
<input
type="text"
value={name}
onChange={(e) => setName(e.target.value)}
placeholder="e.g., product_refresh_all"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Role *</label>
<select
value={role}
onChange={(e) => setRole(e.target.value)}
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
>
{TASK_ROLES.map(r => (
<option key={r.id} value={r.id}>{r.name}</option>
))}
</select>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Description</label>
<input
type="text"
value={description}
onChange={(e) => setDescription(e.target.value)}
placeholder="Optional description"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Interval (hours) *</label>
<input
type="number"
min="1"
max="168"
value={intervalHours}
onChange={(e) => setIntervalHours(parseInt(e.target.value) || 4)}
className="w-full px-3 py-2 border border-gray-200 rounded-lg"
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Priority</label>
<input
type="number"
min="0"
max="100"
value={priority}
onChange={(e) => setPriority(parseInt(e.target.value) || 0)}
className="w-full px-3 py-2 border border-gray-200 rounded-lg"
/>
</div>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">State Code</label>
<input
type="text"
value={stateCode}
onChange={(e) => setStateCode(e.target.value.toUpperCase())}
placeholder="e.g., AZ"
maxLength={2}
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Platform</label>
<input
type="text"
value={platform}
onChange={(e) => setPlatform(e.target.value)}
placeholder="e.g., dutchie"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
</div>
<div className="flex items-center gap-2">
<input
type="checkbox"
id="enabled"
checked={enabled}
onChange={(e) => setEnabled(e.target.checked)}
className="w-4 h-4 text-emerald-600 rounded"
/>
<label htmlFor="enabled" className="text-sm text-gray-700">Enabled</label>
</div>
</div>
<div className="px-6 py-4 border-t border-gray-200 bg-gray-50 flex justify-end gap-3">
<button onClick={onClose} className="px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 rounded-lg">
Cancel
</button>
<button
onClick={handleSubmit}
disabled={loading}
className="px-4 py-2 text-sm bg-emerald-600 text-white rounded-lg hover:bg-emerald-700 disabled:opacity-50 flex items-center gap-2"
>
{loading && <RefreshCw className="w-4 h-4 animate-spin" />}
{isNew ? 'Create' : 'Save'}
</button>
</div>
</div>
</div>
</div>
);
}
const STATUS_COLORS: Record<string, string> = {
pending: 'bg-yellow-100 text-yellow-800',
claimed: 'bg-blue-100 text-blue-800',
@@ -443,6 +703,28 @@ function formatTimeAgo(dateStr: string | null): string {
return `${Math.floor(diff / 86400)}d ago`;
}
function formatNextRun(dateStr: string | null): string {
if (!dateStr) return '-';
const date = new Date(dateStr);
const now = new Date();
const diff = (date.getTime() - now.getTime()) / 1000;
// If in the past, show "overdue"
if (diff < 0) {
const absDiff = Math.abs(diff);
if (absDiff < 60) return 'overdue';
if (absDiff < 3600) return `${Math.floor(absDiff / 60)}m overdue`;
if (absDiff < 86400) return `${Math.floor(absDiff / 3600)}h overdue`;
return `${Math.floor(absDiff / 86400)}d overdue`;
}
// Future time
if (diff < 60) return `in ${Math.round(diff)}s`;
if (diff < 3600) return `in ${Math.floor(diff / 60)}m`;
if (diff < 86400) return `in ${Math.floor(diff / 3600)}h ${Math.floor((diff % 3600) / 60)}m`;
return `in ${Math.floor(diff / 86400)}d ${Math.floor((diff % 86400) / 3600)}h`;
}
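// Example (sketch): a next_run_at 90 minutes away renders "in 1h 30m"; one that
// passed 90 minutes ago renders "1h overdue".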
export default function TasksDashboard() {
const [tasks, setTasks] = useState<Task[]>([]);
const [counts, setCounts] = useState<TaskCounts | null>(null);
@@ -452,6 +734,14 @@ export default function TasksDashboard() {
const [poolPaused, setPoolPaused] = useState(false);
const [showCreateModal, setShowCreateModal] = useState(false);
// Schedules state
const [schedules, setSchedules] = useState<TaskSchedule[]>([]);
const [showSchedules, setShowSchedules] = useState(true);
const [selectedSchedules, setSelectedSchedules] = useState<Set<number>>(new Set());
const [editingSchedule, setEditingSchedule] = useState<TaskSchedule | null>(null);
const [showScheduleModal, setShowScheduleModal] = useState(false);
const [runningScheduleId, setRunningScheduleId] = useState<number | null>(null);
// Pagination
const [page, setPage] = useState(0);
const tasksPerPage = 25;
@@ -465,7 +755,7 @@ export default function TasksDashboard() {
const fetchData = async () => {
try {
const [tasksRes, countsRes, capacityRes, poolStatus, schedulesRes] = await Promise.all([
api.getTasks({
role: roleFilter || undefined,
status: statusFilter || undefined,
@@ -474,12 +764,14 @@ export default function TasksDashboard() {
api.getTaskCounts(),
api.getTaskCapacity(),
api.getTaskPoolStatus(),
api.getTaskSchedules(),
]);
setTasks(tasksRes.tasks || []);
setCounts(countsRes);
setCapacity(capacityRes.metrics || []);
setPoolPaused(poolStatus.paused);
setSchedules(schedulesRes.schedules || []);
setError(null);
} catch (err: any) {
setError(err.message || 'Failed to load tasks');
@@ -488,6 +780,95 @@ export default function TasksDashboard() {
}
};
const handleDeleteSchedule = async (scheduleId: number) => {
if (!confirm('Delete this schedule?')) return;
try {
await api.deleteTaskSchedule(scheduleId);
setSelectedSchedules(prev => {
const next = new Set(prev);
next.delete(scheduleId);
return next;
});
fetchData();
} catch (err: any) {
console.error('Delete schedule error:', err);
alert(err.response?.data?.error || 'Failed to delete schedule');
}
};
const handleBulkDeleteSchedules = async () => {
// Filter out immutable schedules from selection
const deletableIds = Array.from(selectedSchedules).filter(id => {
const schedule = schedules.find(s => s.id === id);
return schedule && !schedule.is_immutable;
});
if (deletableIds.length === 0) {
alert('No deletable schedules selected. Immutable schedules cannot be deleted.');
return;
}
const immutableCount = selectedSchedules.size - deletableIds.length;
const confirmMsg = immutableCount > 0
? `Delete ${deletableIds.length} schedule(s)? (${immutableCount} immutable schedule(s) will be skipped)`
: `Delete ${deletableIds.length} selected schedule(s)?`;
if (!confirm(confirmMsg)) return;
try {
await api.deleteTaskSchedulesBulk(deletableIds);
setSelectedSchedules(new Set());
fetchData();
} catch (err: any) {
console.error('Bulk delete error:', err);
alert(err.response?.data?.error || 'Failed to delete schedules');
}
};
const handleToggleSchedule = async (scheduleId: number) => {
try {
await api.toggleTaskSchedule(scheduleId);
fetchData();
} catch (err: any) {
console.error('Toggle schedule error:', err);
alert(err.response?.data?.error || 'Failed to toggle schedule');
}
};
const handleRunScheduleNow = async (scheduleId: number) => {
if (runningScheduleId !== null) return; // Prevent duplicate clicks
setRunningScheduleId(scheduleId);
try {
const result = await api.runTaskScheduleNow(scheduleId) as { success: boolean; message: string; tasksCreated?: number };
alert(result.message + (result.tasksCreated ? ` (${result.tasksCreated} tasks created)` : ''));
fetchData();
} catch (err: any) {
console.error('Run schedule error:', err);
alert(err.response?.data?.error || 'Failed to run schedule');
} finally {
setRunningScheduleId(null);
}
};
const toggleSelectSchedule = (id: number) => {
setSelectedSchedules(prev => {
const next = new Set(prev);
if (next.has(id)) {
next.delete(id);
} else {
next.add(id);
}
return next;
});
};
const toggleSelectAllSchedules = () => {
if (selectedSchedules.size === schedules.length) {
setSelectedSchedules(new Set());
} else {
setSelectedSchedules(new Set(schedules.map(s => s.id)));
}
};
const handleDeleteTask = async (taskId: number) => {
if (!confirm('Delete this task?')) return;
try {
@@ -583,6 +964,17 @@ export default function TasksDashboard() {
onTaskCreated={fetchData}
/>
{/* Schedule Edit Modal */}
<ScheduleEditModal
isOpen={showScheduleModal}
schedule={editingSchedule}
onClose={() => {
setShowScheduleModal(false);
setEditingSchedule(null);
}}
onSave={fetchData}
/>
{/* Status Summary Cards */}
<div className="grid grid-cols-2 sm:grid-cols-3 lg:grid-cols-6 gap-4">
{Object.entries(counts || {}).map(([status, count]) => (
@@ -681,18 +1073,18 @@ export default function TasksDashboard() {
{formatDuration(metric.avg_duration_sec)}
</td>
<td className="px-4 py-3 text-sm text-right text-gray-600">
{metric.tasks_per_worker_hour ? Number(metric.tasks_per_worker_hour).toFixed(1) : '-'}
</td>
<td className="px-4 py-3 text-sm text-right">
{metric.estimated_hours_to_drain ? (
<span
className={
Number(metric.estimated_hours_to_drain) > 4
? 'text-red-600 font-medium'
: 'text-gray-600'
}
>
{Number(metric.estimated_hours_to_drain).toFixed(1)}h
</span>
) : (
'-'
@@ -714,6 +1106,245 @@ export default function TasksDashboard() {
)}
</div>
{/* Schedules Section */}
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">
<button
onClick={() => setShowSchedules(!showSchedules)}
className="w-full flex items-center justify-between p-4 hover:bg-gray-50"
>
<div className="flex items-center gap-2">
<Timer className="w-5 h-5 text-emerald-600" />
<span className="font-medium text-gray-900">Schedules ({schedules.length})</span>
</div>
{showSchedules ? (
<ChevronUp className="w-5 h-5 text-gray-400" />
) : (
<ChevronDown className="w-5 h-5 text-gray-400" />
)}
</button>
{showSchedules && (
<div className="border-t border-gray-200">
{/* Schedule Actions */}
<div className="p-4 bg-gray-50 border-b border-gray-200 flex flex-wrap items-center justify-between gap-2">
<div className="flex items-center gap-2">
<button
onClick={() => {
setEditingSchedule(null);
setShowScheduleModal(true);
}}
className="flex items-center gap-1 px-3 py-1.5 text-sm bg-emerald-600 text-white rounded hover:bg-emerald-700"
>
<Plus className="w-4 h-4" />
New Schedule
</button>
{selectedSchedules.size > 0 && (
<button
onClick={handleBulkDeleteSchedules}
className="flex items-center gap-1 px-3 py-1.5 text-sm bg-red-600 text-white rounded hover:bg-red-700"
>
<Trash2 className="w-4 h-4" />
Delete ({selectedSchedules.size})
</button>
)}
</div>
<span className="text-sm text-gray-500">
{schedules.filter(s => s.enabled).length} enabled
</span>
</div>
{schedules.length === 0 ? (
<div className="p-8 text-center text-gray-500">
No schedules configured. Click "New Schedule" to create one.
</div>
) : (
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-gray-200">
<thead className="bg-gray-50">
<tr>
<th className="px-4 py-3 text-left">
<input
type="checkbox"
checked={selectedSchedules.size === schedules.length && schedules.length > 0}
onChange={toggleSelectAllSchedules}
className="w-4 h-4 text-emerald-600 rounded"
/>
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Name
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Role
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
State
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Method
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Interval
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Last Run
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Next Run
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Status
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase w-32">
Actions
</th>
</tr>
</thead>
<tbody className="divide-y divide-gray-200">
{schedules.map((schedule) => (
<tr key={schedule.id} className="hover:bg-gray-50">
<td className="px-4 py-3">
<input
type="checkbox"
checked={selectedSchedules.has(schedule.id)}
onChange={() => toggleSelectSchedule(schedule.id)}
className="w-4 h-4 text-emerald-600 rounded"
disabled={schedule.is_immutable}
/>
</td>
<td className="px-4 py-3">
<div className="flex items-center gap-2">
{schedule.is_immutable && (
<span title="Immutable schedule (cannot be deleted)">
<Lock className="w-3.5 h-3.5 text-amber-500 flex-shrink-0" />
</span>
)}
<div>
<div className="text-sm font-medium text-gray-900">{schedule.name}</div>
{schedule.description && (
<div className="text-xs text-gray-500">{schedule.description}</div>
)}
</div>
</div>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.role.replace(/_/g, ' ')}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.state_code ? (
<span className="inline-flex items-center gap-1 px-2 py-0.5 bg-blue-50 text-blue-700 rounded font-medium">
<Globe className="w-3 h-3" />
{schedule.state_code}
</span>
) : (
<span className="text-gray-400">-</span>
)}
</td>
<td className="px-4 py-3 text-sm">
<span className={`inline-flex items-center px-2 py-0.5 rounded text-xs font-medium ${
schedule.method === 'http'
? 'bg-purple-100 text-purple-700'
: schedule.method === 'curl'
? 'bg-orange-100 text-orange-700'
: 'bg-gray-100 text-gray-600'
}`}>
{schedule.method || 'any'}
</span>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
Every {schedule.interval_hours}h
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.last_run_at ? formatTimeAgo(schedule.last_run_at) : '-'}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.next_run_at ? formatNextRun(schedule.next_run_at) : '-'}
</td>
<td className="px-4 py-3">
<span
className={`inline-flex items-center gap-1 px-2 py-1 rounded-full text-xs font-medium ${
schedule.enabled
? 'bg-green-100 text-green-800'
: 'bg-gray-100 text-gray-800'
}`}
>
{schedule.enabled ? (
<>
<Play className="w-3 h-3" />
Active
</>
) : (
<>
<Pause className="w-3 h-3" />
Paused
</>
)}
</span>
</td>
<td className="px-4 py-3">
<div className="flex items-center gap-1">
<button
onClick={() => handleRunScheduleNow(schedule.id)}
disabled={runningScheduleId !== null}
className={`p-1.5 rounded transition-colors ${
runningScheduleId === schedule.id
? 'text-emerald-600 bg-emerald-50 cursor-wait'
: runningScheduleId !== null
? 'text-gray-300 cursor-not-allowed'
: 'text-gray-400 hover:text-emerald-600 hover:bg-emerald-50'
}`}
title={runningScheduleId === schedule.id ? 'Running...' : 'Run now'}
>
<PlayCircle className={`w-4 h-4 ${runningScheduleId === schedule.id ? 'animate-pulse' : ''}`} />
</button>
<button
onClick={() => handleToggleSchedule(schedule.id)}
className={`p-1.5 rounded transition-colors ${
schedule.enabled
? 'text-gray-400 hover:text-yellow-600 hover:bg-yellow-50'
: 'text-gray-400 hover:text-green-600 hover:bg-green-50'
}`}
title={schedule.enabled ? 'Pause' : 'Enable'}
>
{schedule.enabled ? (
<Pause className="w-4 h-4" />
) : (
<Play className="w-4 h-4" />
)}
</button>
<button
onClick={() => {
setEditingSchedule(schedule);
setShowScheduleModal(true);
}}
className="p-1.5 text-gray-400 hover:text-blue-600 hover:bg-blue-50 rounded transition-colors"
title={schedule.is_immutable ? 'Edit (limited fields)' : 'Edit'}
>
<Edit2 className="w-4 h-4" />
</button>
<button
onClick={() => handleDeleteSchedule(schedule.id)}
disabled={schedule.is_immutable}
className={`p-1.5 rounded transition-colors ${
schedule.is_immutable
? 'text-gray-300 cursor-not-allowed'
: 'text-gray-400 hover:text-red-600 hover:bg-red-50'
}`}
title={schedule.is_immutable ? 'Cannot delete immutable schedule' : 'Delete'}
>
<Trash2 className="w-4 h-4" />
</button>
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
)}
</div>
)}
</div>
{/* Filters */}
<div className="flex flex-col sm:flex-row gap-4">
<div className="relative flex-1">

View File

@@ -78,6 +78,16 @@ interface Worker {
timezone?: string;
isRotating?: boolean;
};
// Step tracking
current_step?: string;
current_step_detail?: string;
current_step_started_at?: string;
task_steps?: Array<{
task_id: number;
step: string;
detail: string | null;
elapsed_ms: number;
}>;
} | null;
}
@@ -87,11 +97,24 @@ interface Task {
role: string;
dispensary_id: number | null;
dispensary_name?: string;
dispensary_slug?: string;
status: string;
priority: number;
started_at: string | null;
completed_at: string | null;
claimed_by: string | null;
worker_id: string | null;
error_message?: string | null;
result?: {
success?: boolean;
productsProcessed?: number;
snapshotsCreated?: number;
newProducts?: number;
updatedProducts?: number;
storesDiscovered?: number;
markedOos?: number;
error?: string;
} | null;
}
function formatRelativeTime(dateStr: string | null): string {
@@ -349,7 +372,59 @@ function TransportBadge({ worker }: { worker: Worker }) {
);
}
// Step badge showing current step with detail
function StepBadge({ worker }: { worker: Worker }) {
const step = worker.metadata?.current_step;
const detail = worker.metadata?.current_step_detail;
const startedAt = worker.metadata?.current_step_started_at;
const taskSteps = worker.metadata?.task_steps;
if (!step || step === 'idle') {
return null;
}
// Calculate elapsed time
let elapsedStr = '';
if (startedAt) {
const elapsed = Date.now() - new Date(startedAt).getTime();
if (elapsed < 60000) {
elapsedStr = `${Math.round(elapsed / 1000)}s`;
} else {
elapsedStr = `${Math.round(elapsed / 60000)}m`;
}
}
// Step colors
const getStepColor = (s: string) => {
if (s.includes('preflight')) return 'text-yellow-600 bg-yellow-50';
if (s.includes('loading') || s.includes('navigating')) return 'text-blue-600 bg-blue-50';
if (s.includes('processing') || s.includes('normalizing')) return 'text-purple-600 bg-purple-50';
if (s.includes('saving') || s.includes('upserting')) return 'text-emerald-600 bg-emerald-50';
if (s.includes('error') || s.includes('failed')) return 'text-red-600 bg-red-50';
return 'text-gray-600 bg-gray-50';
};
const colorClass = getStepColor(step);
// Build tooltip with all task steps if concurrent
const tooltipLines = taskSteps?.map(ts =>
`Task #${ts.task_id}: ${ts.step}${ts.detail ? ` - ${ts.detail}` : ''} (${Math.round(ts.elapsed_ms / 1000)}s)`
) || [];
return (
<div
className={`inline-flex items-center gap-1.5 px-2 py-1 rounded text-xs font-medium ${colorClass}`}
title={tooltipLines.length > 0 ? tooltipLines.join('\n') : undefined}
>
<span className="animate-pulse"></span>
<span className="font-semibold">{step}</span>
{detail && <span className="text-gray-500 truncate max-w-[120px]">- {detail}</span>}
{elapsedStr && <span className="text-gray-400">({elapsedStr})</span>}
</div>
);
}
// Task count badge showing active/max concurrent tasks with task details
function TaskCountBadge({ worker, tasks }: { worker: Worker; tasks: Task[] }) {
const activeCount = worker.active_task_count ?? (worker.current_task_id ? 1 : 0);
const maxCount = worker.max_concurrent_tasks ?? 1;
@@ -359,20 +434,34 @@ function TaskCountBadge({ worker, tasks }: { worker: Worker; tasks: Task[] }) {
return <span className="text-gray-400 text-sm">Idle</span>;
}
// Get task details for display
const activeTasks = taskIds.map(id => tasks.find(t => t.id === id)).filter(Boolean) as Task[];
// Build tooltip with full details
const tooltipLines = activeTasks.map(task =>
`#${task.id}: ${task.role}${task.dispensary_name ? ` - ${task.dispensary_name}` : ''}`
);
// Show first task details inline
const firstTask = activeTasks[0];
const roleLabel = firstTask?.role?.replace(/_/g, ' ') || 'task';
const storeName = firstTask?.dispensary_name;
return (
<div className="flex items-center gap-2" title={taskNames}>
<div className="flex flex-col gap-0.5" title={tooltipLines.join('\n')}>
<span className="text-sm font-medium text-blue-600">
{activeCount}/{maxCount} active
</span>
{firstTask && (
<span className="text-xs text-gray-500 truncate max-w-[140px]">
{roleLabel}{storeName ? `: ${storeName}` : ''}
</span>
)}
{activeTasks.length > 1 && (
<span className="text-xs text-gray-400">+{activeTasks.length - 1} more</span>
)}
{/* Show current step */}
<StepBadge worker={worker} />
</div>
);
}
@@ -507,6 +596,175 @@ function groupWorkersByPod(workers: Worker[]): Map<string, Worker[]> {
return pods;
}
// Calculate task duration in seconds
function getTaskDuration(task: Task): number | null {
if (!task.started_at) return null;
const start = new Date(task.started_at);
const end = task.completed_at ? new Date(task.completed_at) : new Date();
return Math.round((end.getTime() - start.getTime()) / 1000);
}
// Format duration for display
function formatTaskDuration(seconds: number | null): string {
if (seconds === null) return '-';
if (seconds < 60) return `${seconds}s`;
const mins = Math.floor(seconds / 60);
const secs = seconds % 60;
if (mins < 60) return `${mins}m ${secs}s`;
const hrs = Math.floor(mins / 60);
return `${hrs}h ${mins % 60}m`;
}
// Get friendly worker name from worker_id
function getWorkerShortName(workerId: string | null): string {
if (!workerId) return 'Unknown';
// Extract last part after the hash (e.g., "scraper-worker-75b8b9b5c9-46p4j" -> "46p4j")
const parts = workerId.split('-');
return parts[parts.length - 1] || workerId.slice(-8);
}
// Live Activity Panel - shows recent task completions and failures
function LiveActivityPanel({
recentTasks,
runningTasks
}: {
recentTasks: Task[];
runningTasks: Task[];
}) {
// Combine running and recent completed/failed, sort by most recent activity
const allActivity = [
...runningTasks.map(t => ({ ...t, activityType: 'running' as const })),
...recentTasks.map(t => ({ ...t, activityType: t.status as 'completed' | 'failed' })),
].sort((a, b) => {
const aTime = a.activityType === 'running' ? a.started_at : a.completed_at;
const bTime = b.activityType === 'running' ? b.started_at : b.completed_at;
if (!aTime || !bTime) return 0;
return new Date(bTime).getTime() - new Date(aTime).getTime();
}).slice(0, 15); // Show max 15 items
const getRoleIcon = (role: string) => {
switch (role) {
case 'product_refresh': return '🔄';
case 'product_discovery': return '🔍';
case 'store_discovery': return '🏪';
case 'entry_point_discovery': return '🎯';
case 'analytics_refresh': return '📊';
default: return '📋';
}
}
};
const getStatusConfig = (status: string) => {
switch (status) {
case 'running':
return { bg: 'bg-blue-50', border: 'border-blue-200', icon: '🔵', text: 'text-blue-700' };
case 'completed':
return { bg: 'bg-emerald-50', border: 'border-emerald-200', icon: '🟢', text: 'text-emerald-700' };
case 'failed':
return { bg: 'bg-red-50', border: 'border-red-200', icon: '🔴', text: 'text-red-700' };
default:
return { bg: 'bg-gray-50', border: 'border-gray-200', icon: '⚪', text: 'text-gray-700' };
}
};
const getResultSummary = (task: Task): string => {
if (!task.result) return '';
const parts: string[] = [];
if (task.result.productsProcessed) parts.push(`${task.result.productsProcessed} products`);
if (task.result.newProducts) parts.push(`${task.result.newProducts} new`);
if (task.result.storesDiscovered) parts.push(`${task.result.storesDiscovered} stores`);
if (task.result.markedOos && task.result.markedOos > 0) parts.push(`${task.result.markedOos} OOS`);
return parts.length > 0 ? ` - ${parts.join(', ')}` : '';
};
return (
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">
<div className="px-4 py-3 border-b border-gray-200 bg-gray-50">
<div className="flex items-center justify-between">
<h3 className="text-sm font-semibold text-gray-900 flex items-center gap-2">
<Activity className="w-4 h-4 text-blue-500" />
Live Activity
</h3>
<span className="text-xs text-gray-500">
{runningTasks.length} running, {recentTasks.length} recent
</span>
</div>
</div>
<div className="divide-y divide-gray-100 max-h-[400px] overflow-y-auto">
{allActivity.length === 0 ? (
<div className="px-4 py-8 text-center text-gray-500">
<Activity className="w-8 h-8 mx-auto mb-2 text-gray-300" />
<p className="text-sm">No recent activity</p>
</div>
) : (
allActivity.map((task) => {
const config = getStatusConfig(task.activityType);
const duration = getTaskDuration(task);
const workerName = getWorkerShortName(task.worker_id);
return (
<div
key={`${task.id}-${task.activityType}`}
className={`px-4 py-3 ${config.bg} ${task.activityType === 'running' ? 'animate-pulse' : ''}`}
>
<div className="flex items-start gap-3">
<span className="text-lg flex-shrink-0">{config.icon}</span>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2 flex-wrap">
<span className="text-xs font-medium text-gray-500 bg-gray-100 px-1.5 py-0.5 rounded">
{workerName}
</span>
<span className={`text-sm font-medium ${config.text}`}>
{task.activityType === 'running' ? 'working on' : task.activityType}
</span>
<span className="text-sm text-gray-700">
{getRoleIcon(task.role)} {task.role.replace(/_/g, ' ')}
</span>
</div>
{task.dispensary_name && (
<p className="text-sm text-gray-900 font-medium mt-1 truncate">
{task.dispensary_name}
</p>
)}
<div className="flex items-center gap-3 mt-1 text-xs text-gray-500">
{duration !== null && (
<span className="flex items-center gap-1">
<Timer className="w-3 h-3" />
{formatTaskDuration(duration)}
</span>
)}
{task.activityType === 'completed' && task.result && (
<span className="text-emerald-600 font-medium">
{getResultSummary(task)}
</span>
)}
{task.activityType === 'failed' && task.error_message && (
<span className="text-red-600 truncate max-w-[200px]" title={task.error_message}>
{task.error_message.slice(0, 50)}...
</span>
)}
{task.completed_at && (
<span className="text-gray-400">
{formatRelativeTime(task.completed_at)}
</span>
)}
</div>
</div>
<span className="text-xs text-gray-400 flex-shrink-0">
#{task.id}
</span>
</div>
</div>
);
})
)}
</div>
</div>
);
}
// Format estimated time remaining
function formatEstimatedTime(hours: number): string {
if (hours < 1) {
@@ -524,7 +782,8 @@ function formatEstimatedTime(hours: number): string {
export function WorkersDashboard() {
const [workers, setWorkers] = useState<Worker[]>([]);
const [tasks, setTasks] = useState<Task[]>([]); // Running tasks
const [recentTasks, setRecentTasks] = useState<Task[]>([]); // Recent completed/failed
const [pendingTaskCount, setPendingTaskCount] = useState<number>(0);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
@@ -538,15 +797,28 @@ export function WorkersDashboard() {
const fetchData = useCallback(async () => {
try {
// Fetch workers from registry, running tasks, recent tasks, and task counts
const [workersRes, tasksRes, recentCompletedRes, recentFailedRes, countsRes] = await Promise.all([
api.get('/api/worker-registry/workers'),
api.get('/api/tasks?status=running&limit=100'),
api.get('/api/tasks?status=completed&limit=10'),
api.get('/api/tasks?status=failed&limit=5'),
api.get('/api/tasks/counts'),
]);
setWorkers(workersRes.data.workers || []);
setTasks(tasksRes.data.tasks || []);
// Combine recent completed and failed, sort by completion time
const recentCompleted = recentCompletedRes.data.tasks || [];
const recentFailed = recentFailedRes.data.tasks || [];
const combined = [...recentCompleted, ...recentFailed].sort((a, b) => {
const aTime = a.completed_at ? new Date(a.completed_at).getTime() : 0;
const bTime = b.completed_at ? new Date(b.completed_at).getTime() : 0;
return bTime - aTime;
});
setRecentTasks(combined.slice(0, 15));
setPendingTaskCount(countsRes.data?.pending || 0);
setError(null);
} catch (err: any) {
@@ -764,8 +1036,15 @@ export function WorkersDashboard() {
);
})()}
{/* Two Column Layout: Live Activity + Worker Pods */}
<div className="grid grid-cols-1 lg:grid-cols-3 gap-6">
{/* Live Activity Panel - Takes 1/3 width on large screens */}
<div className="lg:col-span-1">
<LiveActivityPanel recentTasks={recentTasks} runningTasks={tasks} />
</div>
{/* Worker Pods Visualization - Takes 2/3 width on large screens */}
<div className="lg:col-span-2 bg-white rounded-lg border border-gray-200 overflow-hidden">
<div className="px-4 py-3 border-b border-gray-200 bg-gray-50">
<div className="flex items-center justify-between">
<div>
@@ -912,6 +1191,7 @@ export function WorkersDashboard() {
</div>
)}
</div>
</div>
{/* Workers Table */}
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">

View File

@@ -12,7 +12,10 @@ metadata:
name: scraper-worker
namespace: dispensary-scraper
spec:
# MAX 8 PODS - See CLAUDE.md rule #6
# Each pod runs up to MAX_CONCURRENT_TASKS browsers (~400MB each)
# Scale pods for throughput, not concurrent tasks per pod
replicas: 8
selector:
matchLabels:
app: scraper-worker
@@ -44,6 +47,10 @@ spec:
value: "http://scraper"
- name: NODE_OPTIONS
value: "--max-old-space-size=1500"
# Browser memory limits - see docs/WORKER_TASK_ARCHITECTURE.md
# 3 browsers × ~400MB = ~1.3GB (safe for 2GB pod limit)
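# Cluster-wide ceiling (sketch math): 8 replicas x 3 tasks = 24 concurrent browsers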
- name: MAX_CONCURRENT_TASKS
value: "3"
resources:
requests:
memory: "1Gi"
@@ -61,169 +68,3 @@ spec:
periodSeconds: 30
failureThreshold: 3
terminationGracePeriodSeconds: 60
# (Removed in this commit: the unused "ALTERNATIVE: StatefulSet with multiple
# workers per pod" section, i.e. the pod-names ConfigMap, the worker-pod
# StatefulSet running 5 workers per pod, its headless Service, and the
# pending-tasks HPA.)