Compare commits

..

12 Commits

Author SHA1 Message Date
Kelly
92f88fdcd6 fix(workers): Increase max concurrent tasks to 15 and add K8s permission rule
- Change MAX_CONCURRENT_TASKS default from 3 to 15
- Add CLAUDE.md rule requiring explicit permission before kubectl commands

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 10:54:33 -07:00
Kelly
832ef1cf83 feat(scheduler): Immutable schedules and HTTP-only pipeline
## Changes
- **Migration 089**: Add is_immutable and method columns to task_schedules
  - Per-state product_discovery schedules (4h default)
  - Store discovery weekly (168h)
  - All schedules use HTTP transport (Puppeteer/browser)
- **Task Scheduler**: HTTP-only product discovery with per-state scheduling
  - Each state has its own immutable schedule
  - Schedules can be edited (interval/priority) but not deleted
- **TasksDashboard UI**: Full immutability support
  - Lock icon for immutable schedules
  - State and Method columns in schedules table
  - Disabled delete for immutable, restricted edit fields
- **Store Discovery HTTP**: Auto-queue product_discovery for new stores
- **Migration 088**: Discovery payloads storage schema

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 09:24:08 -07:00
Kelly
9a24b4896c feat(tasks): Dual transport handlers and self-healing product_refresh
- Rename product-discovery.ts to product-discovery-curl.ts (axios-based)
- Rename payload-fetch.ts to payload-fetch-curl.ts
- Add product-discovery-http.ts (Puppeteer browser-based handler)
- Add method field to CreateTaskParams for transport selection
- Update task-service to insert method column on task creation
- Update task-worker with getHandlerForTask() for dual transport routing
- product_refresh now queues upstream tasks when no payload exists:
  - Has platform_dispensary_id → queues product_discovery (http)
  - No platform_dispensary_id → queues entry_point_discovery

This enables HTTP workers to pick up browser-based tasks while curl
workers handle axios-based tasks, and prevents product_refresh from
failing repeatedly when no crawl has been performed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 03:02:56 -07:00
Kelly
dd8fce6e35 fix(proxy): Convert non-standard proxy URL format and simplify preflight
- CrawlRotator.getProxyUrl() now converts non-standard format (http://host:port:user:pass) to standard format (http://user:pass@host:port)
- Simplify puppeteer preflight to only use ipify.org for IP verification (much lighter than fingerprint.com)
- Remove heavy anti-detect site tests from preflight - not needed, trust stealth plugin
- Fixes 503 errors when using session-based residential proxies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 02:13:51 -07:00
Kelly
f82eed4dc3 feat(workers): Add proxy reload, staggered tasks, and bulk proxy import
- Periodic proxy reload: Workers now reload proxies every 60s to pick up changes
- Staggered task scheduling: New API endpoints for creating tasks with delays
- Bulk proxy import: Script supports multiple URL formats including host:port:user:pass
- Proxy URL column: Migration 086 adds proxy_url for non-standard formats

Key changes:
- crawl-rotator.ts: Added reloadIfStale(), isStale(), setReloadInterval()
- task-worker.ts: Calls reloadIfStale() in main loop
- task-service.ts: Added createStaggeredTasks() and createAZStoreTasks()
- tasks.ts: Added POST /batch/staggered and /batch/az-stores endpoints
- import-proxies.ts: New script for bulk proxy import
- CLAUDE.md: Documented staggered task workflow

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 01:53:15 -07:00
Kelly
6490df9faf feat(tasks): Consolidate schedule management into task_schedules
- Add schedule CRUD endpoints to /api/tasks/schedules
- Add Schedules section to TasksDashboard with edit/delete/bulk actions
- Deprecate job_schedules table (entries disabled in DB)
- Mark CrawlSchedulePage as deprecated (removed from menu)
- Add deprecation comments to legacy schedule methods in api.ts
- Add migration comments to workers.ts explaining consolidation

Key changes:
- Schedule management now at /admin/tasks instead of /admin/schedule
- task_schedules uses interval_hours (simpler than base_interval_minutes + jitter)
- All schedule routes placed before /:id to avoid Express route conflicts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 01:15:21 -07:00
kelly
a077f81c65 Merge pull request 'fix(preflight): Phase 2 - Correct parameter order and add IP/fingerprint reporting' (#56) from feat/preflight-phase2-reporting into master 2025-12-12 07:35:02 +00:00
Kelly
6bcadd9e71 fix(preflight): Correct parameter order and add IP/fingerprint reporting
- Fix update_worker_preflight call to use correct parameter order:
  (worker_id, transport, status, ip, response_ms, error, fingerprint)
- Add proxyIp to both curl and http preflight reports
- Add fingerprint JSONB with timezone, location, and bot detection data
- Log HTTP IP and timezone after preflight completes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:32:45 -07:00
kelly
a77bf8611a Merge pull request 'feat(workers): Preflight phase 1 - Schema, StatefulSet, and timezone matching' (#55) from feat/preflight-phase1-schema into master 2025-12-12 07:30:53 +00:00
Kelly
33feca3138 fix(antidetect): Match browser timezone to proxy IP location
- Add IP geolocation lookup via ip-api.com to get timezone from proxy IP
- Use ipify.org API for reliable proxy IP detection (replaces unreliable fingerprint.com scraping)
- Set browser timezone via CDP Emulation.setTimezoneOverride to match proxy location
- Add detectedTimezone and detectedLocation to preflight result
- Add /api/worker-registry/preflight-test endpoint for smoke testing

Fixes timezone mismatch where browser showed America/Phoenix while proxy was in America/New_York

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:25:39 -07:00
kelly
7d85a97b63 Merge pull request 'feat: Preflight schema and StatefulSet' (#54) from feat/preflight-phase1-schema into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/54
2025-12-12 07:14:40 +00:00
Kelly
ce081effd4 feat(workers): Add preflight schema and StatefulSet
- Migration 085: Add curl_ip, http_ip, fingerprint_data, preflight_status,
  preflight_at columns to worker_registry
- StatefulSet manifest for 8 persistent workers with OnDelete update strategy

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:45:04 -07:00
35 changed files with 4074 additions and 1925 deletions

101
CLAUDE.md
View File

@@ -17,6 +17,35 @@ Never deploy unless user explicitly says: "CLAUDE — DEPLOYMENT IS NOW AUTHORIZ
### 5. DB POOL ONLY
Never import `src/db/migrate.ts` at runtime. Use `src/db/pool.ts` for DB access.
### 6. K8S POD LIMITS — CRITICAL
**MAX 8 PODS** for `scraper-worker` deployment. NEVER EXCEED THIS.
**Pods vs Workers:**
- **Pod** = Kubernetes container instance (MAX 8)
- **Worker** = Concurrent task runner INSIDE a pod (controlled by `MAX_CONCURRENT_TASKS` env var)
- Formula: `8 pods × MAX_CONCURRENT_TASKS = total concurrent workers`
**To increase workers:** Change `MAX_CONCURRENT_TASKS` env var, NOT replicas.
```bash
# CORRECT - increase workers per pod
kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=5
# WRONG - never scale above 8 replicas
kubectl scale deployment/scraper-worker --replicas=20 # NEVER DO THIS
```
**If K8s API returns ServiceUnavailable:** STOP IMMEDIATELY. Do not retry. The cluster is overloaded.
### 7. K8S REQUIRES EXPLICIT PERMISSION
**NEVER run kubectl commands without explicit user permission.**
Before running ANY `kubectl` command (scale, rollout, set env, delete, apply, etc.):
1. Tell the user what you want to do
2. Wait for explicit approval
3. Only then execute the command
This applies to ALL kubectl operations - even read-only ones like `kubectl get pods`.
---
## Quick Reference
@@ -205,6 +234,78 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
---
## Staggered Task Workflow (Added 2025-12-12)
### Overview
When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting.
### How It Works
```
1. Task created with scheduled_for = NOW() + (index * stagger_seconds)
2. Worker claims task only when scheduled_for <= NOW()
3. Worker runs preflight on EVERY task claim (proxy health check)
4. If preflight passes, worker executes task
5. If preflight fails, task released back to pending for another worker
6. Worker finishes task, polls for next available task
7. Repeat - preflight runs on each new task claim
```
### Key Points
- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
- **Stagger prevents thundering herd**: 15 seconds between tasks is default
- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed
### API Endpoints
```bash
# Create staggered tasks for specific dispensary IDs
POST /api/tasks/batch/staggered
{
"dispensary_ids": [1, 2, 3, 4],
"role": "product_refresh", # or "product_discovery"
"stagger_seconds": 15, # default: 15
"platform": "dutchie", # default: "dutchie"
"method": null # "curl" | "http" | null
}
# Create staggered tasks for AZ stores (convenience endpoint)
POST /api/tasks/batch/az-stores
{
"total_tasks": 24, # default: 24
"stagger_seconds": 15, # default: 15
"split_roles": true # default: true (12 refresh, 12 discovery)
}
```
### Example: 24 Tasks for AZ Stores
```bash
curl -X POST http://localhost:3010/api/tasks/batch/az-stores \
-H "Content-Type: application/json" \
-d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}'
```
Response:
```json
{
"success": true,
"total": 24,
"product_refresh": 12,
"product_discovery": 12,
"stagger_seconds": 15,
"total_duration_seconds": 345,
"estimated_completion": "2025-12-12T08:40:00.000Z",
"message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)"
}
```
### Related Files
| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods |
| `src/routes/tasks.ts` | API endpoints for batch task creation |
| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |
---
## Documentation
| Doc | Purpose |

View File

@@ -1,343 +0,0 @@
# CannaiQ Query API
Query raw crawl payload data with flexible filters, sorting, and aggregation.
## Base URL
```
https://cannaiq.co/api/payloads
```
## Authentication
Include your API key in the header:
```
X-API-Key: your-api-key
```
---
## Endpoints
### 1. Query Products
Filter and search products from a store's latest crawl data.
```
GET /api/payloads/store/{dispensaryId}/query
```
#### Query Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `brand` | string | Filter by brand name (partial match) |
| `category` | string | Filter by category (flower, vape, edible, etc.) |
| `subcategory` | string | Filter by subcategory |
| `strain_type` | string | Filter by strain (indica, sativa, hybrid, cbd) |
| `in_stock` | boolean | Filter by stock status (true/false) |
| `price_min` | number | Minimum price |
| `price_max` | number | Maximum price |
| `thc_min` | number | Minimum THC percentage |
| `thc_max` | number | Maximum THC percentage |
| `search` | string | Search product name (partial match) |
| `fields` | string | Comma-separated fields to return |
| `limit` | number | Max results (default 100, max 1000) |
| `offset` | number | Skip results for pagination |
| `sort` | string | Sort by: name, price, thc, brand |
| `order` | string | Sort order: asc, desc |
#### Available Fields
When using `fields` parameter, you can request:
- `id` - Product ID
- `name` - Product name
- `brand` - Brand name
- `category` - Product category
- `subcategory` - Product subcategory
- `strain_type` - Indica/Sativa/Hybrid/CBD
- `price` - Current price
- `price_med` - Medical price
- `price_rec` - Recreational price
- `thc` - THC percentage
- `cbd` - CBD percentage
- `weight` - Product weight/size
- `status` - Stock status
- `in_stock` - Boolean in-stock flag
- `image_url` - Product image
- `description` - Product description
#### Examples
**Get all flower products under $40:**
```
GET /api/payloads/store/112/query?category=flower&price_max=40
```
**Search for "Blue Dream" with high THC:**
```
GET /api/payloads/store/112/query?search=blue+dream&thc_min=20
```
**Get only name and price for Alien Labs products:**
```
GET /api/payloads/store/112/query?brand=Alien+Labs&fields=name,price,thc
```
**Get top 10 highest THC products:**
```
GET /api/payloads/store/112/query?sort=thc&order=desc&limit=10
```
**Paginate through in-stock products:**
```
GET /api/payloads/store/112/query?in_stock=true&limit=50&offset=0
GET /api/payloads/store/112/query?in_stock=true&limit=50&offset=50
```
#### Response
```json
{
"success": true,
"dispensaryId": 112,
"payloadId": 45,
"fetchedAt": "2025-12-11T10:30:00Z",
"query": {
"filters": {
"brand": "Alien Labs",
"category": null,
"price_max": null
},
"sort": "price",
"order": "asc",
"limit": 100,
"offset": 0
},
"pagination": {
"total": 15,
"returned": 15,
"limit": 100,
"offset": 0,
"has_more": false
},
"products": [
{
"id": "507f1f77bcf86cd799439011",
"name": "Alien Labs - Baklava 3.5g",
"brand": "Alien Labs",
"category": "flower",
"strain_type": "hybrid",
"price": 55,
"thc": "28.5",
"in_stock": true
}
]
}
```
---
### 2. Aggregate Data
Group products and calculate metrics.
```
GET /api/payloads/store/{dispensaryId}/aggregate
```
#### Query Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `group_by` | string | **Required.** Field to group by: brand, category, subcategory, strain_type |
| `metrics` | string | Comma-separated metrics (default: count) |
#### Available Metrics
- `count` - Number of products
- `avg_price` - Average price
- `min_price` - Lowest price
- `max_price` - Highest price
- `avg_thc` - Average THC percentage
- `in_stock_count` - Number of in-stock products
#### Examples
**Count products by brand:**
```
GET /api/payloads/store/112/aggregate?group_by=brand
```
**Get price stats by category:**
```
GET /api/payloads/store/112/aggregate?group_by=category&metrics=count,avg_price,min_price,max_price
```
**Get THC averages by strain type:**
```
GET /api/payloads/store/112/aggregate?group_by=strain_type&metrics=count,avg_thc
```
**Brand analysis with stock info:**
```
GET /api/payloads/store/112/aggregate?group_by=brand&metrics=count,avg_price,in_stock_count
```
#### Response
```json
{
"success": true,
"dispensaryId": 112,
"payloadId": 45,
"fetchedAt": "2025-12-11T10:30:00Z",
"groupBy": "brand",
"metrics": ["count", "avg_price"],
"totalProducts": 450,
"groupCount": 85,
"aggregations": [
{
"brand": "Alien Labs",
"count": 15,
"avg_price": 52.33
},
{
"brand": "Connected",
"count": 12,
"avg_price": 48.50
}
]
}
```
---
### 3. Compare Stores (Price Comparison)
Query the same data from multiple stores and compare in your app:
```javascript
// Get flower prices from Store A
const storeA = await fetch('/api/payloads/store/112/query?category=flower&fields=name,brand,price');
// Get flower prices from Store B
const storeB = await fetch('/api/payloads/store/115/query?category=flower&fields=name,brand,price');
// Compare in your app
const dataA = await storeA.json();
const dataB = await storeB.json();
// Find matching products and compare prices
```
---
### 4. Price History
For historical price data, use the snapshots endpoint:
```
GET /api/v1/products/{productId}/history?days=30
```
Or compare payloads over time:
```
GET /api/payloads/store/{dispensaryId}/diff?from={payloadId1}&to={payloadId2}
```
The diff endpoint shows:
- Products added
- Products removed
- Price changes
- Stock changes
---
### 5. List Stores
Get available dispensaries to query:
```
GET /api/stores
```
Returns all stores with their IDs, names, and locations.
---
## Use Cases
### Price Comparison App
```javascript
// 1. Get stores in Arizona
const stores = await fetch('/api/stores?state=AZ').then(r => r.json());
// 2. Query flower prices from each store
const prices = await Promise.all(
stores.map(store =>
fetch(`/api/payloads/store/${store.id}/query?category=flower&fields=name,brand,price`)
.then(r => r.json())
)
);
// 3. Build comparison matrix in your app
```
### Brand Analytics Dashboard
```javascript
// Get brand presence across stores
const brandData = await Promise.all(
storeIds.map(id =>
fetch(`/api/payloads/store/${id}/aggregate?group_by=brand&metrics=count,avg_price`)
.then(r => r.json())
)
);
// Aggregate brand presence across all stores
```
### Deal Finder
```javascript
// Find high-THC flower under $30
const deals = await fetch(
'/api/payloads/store/112/query?category=flower&price_max=30&thc_min=20&in_stock=true&sort=thc&order=desc'
).then(r => r.json());
```
### Inventory Tracker
```javascript
// Get products that went out of stock
const diff = await fetch('/api/payloads/store/112/diff').then(r => r.json());
const outOfStock = diff.details.stockChanges.filter(
p => p.newStatus !== 'Active'
);
```
---
## Rate Limits
- Default: 100 requests/minute per API key
- Contact support for higher limits
## Error Responses
```json
{
"success": false,
"error": "Error message here"
}
```
Common errors:
- `404` - Store or payload not found
- `400` - Missing required parameter
- `401` - Invalid or missing API key
- `429` - Rate limit exceeded

View File

@@ -0,0 +1,77 @@
apiVersion: v1
kind: Service
metadata:
name: scraper-worker
namespace: dispensary-scraper
labels:
app: scraper-worker
spec:
clusterIP: None # Headless service required for StatefulSet
selector:
app: scraper-worker
ports:
- port: 3010
name: http
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: scraper-worker
namespace: dispensary-scraper
spec:
serviceName: scraper-worker
replicas: 8
podManagementPolicy: Parallel # Start all pods at once
updateStrategy:
type: OnDelete # Pods only update when manually deleted - no automatic restarts
selector:
matchLabels:
app: scraper-worker
template:
metadata:
labels:
app: scraper-worker
spec:
terminationGracePeriodSeconds: 60
imagePullSecrets:
- name: regcred
containers:
- name: worker
image: code.cannabrands.app/creationshop/dispensary-scraper:latest
imagePullPolicy: Always
command: ["node"]
args: ["dist/tasks/task-worker.js"]
env:
- name: WORKER_MODE
value: "true"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MAX_CONCURRENT_TASKS
value: "50"
- name: API_BASE_URL
value: http://scraper
- name: NODE_OPTIONS
value: --max-old-space-size=1500
envFrom:
- configMapRef:
name: scraper-config
- secretRef:
name: scraper-secrets
resources:
requests:
cpu: 100m
memory: 1Gi
limits:
cpu: 500m
memory: 2Gi
livenessProbe:
exec:
command:
- /bin/sh
- -c
- pgrep -f 'task-worker' > /dev/null
initialDelaySeconds: 10
periodSeconds: 30
failureThreshold: 3

View File

@@ -0,0 +1,168 @@
-- Migration 085: Add IP and fingerprint columns for preflight reporting
-- These columns were missing from migration 084
-- ===================================================================
-- PART 1: Add IP address columns to worker_registry
-- ===================================================================
-- IP address detected during curl/axios preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS curl_ip VARCHAR(45);
-- IP address detected during http/Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS http_ip VARCHAR(45);
-- ===================================================================
-- PART 2: Add fingerprint data column
-- ===================================================================
-- Browser fingerprint data captured during Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS fingerprint_data JSONB;
-- ===================================================================
-- PART 3: Add combined preflight status/timestamp for convenience
-- ===================================================================
-- Overall preflight status (computed from both transports)
-- Values: 'pending', 'passed', 'partial', 'failed'
-- - 'pending': neither transport tested
-- - 'passed': both transports passed (or http passed for browser-only)
-- - 'partial': at least one passed
-- - 'failed': no transport passed
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_status VARCHAR(20) DEFAULT 'pending';
-- Most recent preflight completion timestamp
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_at TIMESTAMPTZ;
-- ===================================================================
-- PART 4: Update function to set preflight status
-- ===================================================================
CREATE OR REPLACE FUNCTION update_worker_preflight(
p_worker_id VARCHAR(100),
p_transport VARCHAR(10), -- 'curl' or 'http'
p_status VARCHAR(20), -- 'passed', 'failed', 'skipped'
p_ip VARCHAR(45) DEFAULT NULL,
p_response_ms INTEGER DEFAULT NULL,
p_error TEXT DEFAULT NULL,
p_fingerprint JSONB DEFAULT NULL
) RETURNS VOID AS $$
DECLARE
v_curl_status VARCHAR(20);
v_http_status VARCHAR(20);
v_overall_status VARCHAR(20);
BEGIN
IF p_transport = 'curl' THEN
UPDATE worker_registry
SET
preflight_curl_status = p_status,
preflight_curl_at = NOW(),
preflight_curl_ms = p_response_ms,
preflight_curl_error = p_error,
curl_ip = p_ip,
updated_at = NOW()
WHERE worker_id = p_worker_id;
ELSIF p_transport = 'http' THEN
UPDATE worker_registry
SET
preflight_http_status = p_status,
preflight_http_at = NOW(),
preflight_http_ms = p_response_ms,
preflight_http_error = p_error,
http_ip = p_ip,
fingerprint_data = COALESCE(p_fingerprint, fingerprint_data),
updated_at = NOW()
WHERE worker_id = p_worker_id;
END IF;
-- Update overall preflight status
SELECT preflight_curl_status, preflight_http_status
INTO v_curl_status, v_http_status
FROM worker_registry
WHERE worker_id = p_worker_id;
-- Compute overall status
IF v_curl_status = 'passed' AND v_http_status = 'passed' THEN
v_overall_status := 'passed';
ELSIF v_curl_status = 'passed' OR v_http_status = 'passed' THEN
v_overall_status := 'partial';
ELSIF v_curl_status = 'failed' OR v_http_status = 'failed' THEN
v_overall_status := 'failed';
ELSE
v_overall_status := 'pending';
END IF;
UPDATE worker_registry
SET
preflight_status = v_overall_status,
preflight_at = NOW()
WHERE worker_id = p_worker_id;
END;
$$ LANGUAGE plpgsql;
-- ===================================================================
-- PART 5: Update v_active_workers view
-- ===================================================================
DROP VIEW IF EXISTS v_active_workers;
CREATE VIEW v_active_workers AS
SELECT
wr.id,
wr.worker_id,
wr.friendly_name,
wr.role,
wr.status,
wr.pod_name,
wr.hostname,
wr.started_at,
wr.last_heartbeat_at,
wr.last_task_at,
wr.tasks_completed,
wr.tasks_failed,
wr.current_task_id,
-- IP addresses from preflights
wr.curl_ip,
wr.http_ip,
-- Combined preflight status
wr.preflight_status,
wr.preflight_at,
-- Detailed preflight status per transport
wr.preflight_curl_status,
wr.preflight_http_status,
wr.preflight_curl_at,
wr.preflight_http_at,
wr.preflight_curl_error,
wr.preflight_http_error,
wr.preflight_curl_ms,
wr.preflight_http_ms,
-- Fingerprint data
wr.fingerprint_data,
-- Computed fields
EXTRACT(EPOCH FROM (NOW() - wr.last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN wr.status = 'offline' THEN 'offline'
WHEN wr.last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN wr.current_task_id IS NOT NULL THEN 'busy'
ELSE 'ready'
END as health_status,
-- Capability flags (can this worker handle curl/http tasks?)
(wr.preflight_curl_status = 'passed') as can_curl,
(wr.preflight_http_status = 'passed') as can_http
FROM worker_registry wr
WHERE wr.status != 'terminated'
ORDER BY wr.status = 'active' DESC, wr.last_heartbeat_at DESC;
-- ===================================================================
-- Comments
-- ===================================================================
COMMENT ON COLUMN worker_registry.curl_ip IS 'IP address detected during curl/axios preflight';
COMMENT ON COLUMN worker_registry.http_ip IS 'IP address detected during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.fingerprint_data IS 'Browser fingerprint captured during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.preflight_status IS 'Overall preflight status: pending, passed, partial, failed';
COMMENT ON COLUMN worker_registry.preflight_at IS 'Most recent preflight completion timestamp';

View File

@@ -1,59 +0,0 @@
-- Migration 085: Trusted Origins Management
-- Allows admin to manage trusted IPs and domains via UI instead of hardcoded values
-- Trusted origins table (IPs and domains that bypass API key auth)
CREATE TABLE IF NOT EXISTS trusted_origins (
id SERIAL PRIMARY KEY,
-- Origin type: 'ip', 'domain', 'pattern'
origin_type VARCHAR(20) NOT NULL CHECK (origin_type IN ('ip', 'domain', 'pattern')),
-- The actual value
-- For ip: '127.0.0.1', '::1', '192.168.1.0/24'
-- For domain: 'cannaiq.co', 'findadispo.com'
-- For pattern: '^https://.*\.cannabrands\.app$' (regex)
origin_value VARCHAR(255) NOT NULL,
-- Description for admin reference
description TEXT,
-- Active flag
active BOOLEAN DEFAULT true,
-- Audit
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by INTEGER REFERENCES users(id),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(origin_type, origin_value)
);
-- Index for quick lookups
CREATE INDEX IF NOT EXISTS idx_trusted_origins_active ON trusted_origins(active) WHERE active = true;
CREATE INDEX IF NOT EXISTS idx_trusted_origins_type ON trusted_origins(origin_type, active);
-- Seed with current hardcoded values
INSERT INTO trusted_origins (origin_type, origin_value, description) VALUES
-- Trusted IPs (localhost)
('ip', '127.0.0.1', 'Localhost IPv4'),
('ip', '::1', 'Localhost IPv6'),
('ip', '::ffff:127.0.0.1', 'Localhost IPv4-mapped IPv6'),
-- Trusted domains
('domain', 'cannaiq.co', 'CannaiQ production'),
('domain', 'www.cannaiq.co', 'CannaiQ production (www)'),
('domain', 'findadispo.com', 'FindADispo production'),
('domain', 'www.findadispo.com', 'FindADispo production (www)'),
('domain', 'findagram.co', 'Findagram production'),
('domain', 'www.findagram.co', 'Findagram production (www)'),
('domain', 'localhost:3010', 'Local backend dev'),
('domain', 'localhost:8080', 'Local admin dev'),
('domain', 'localhost:5173', 'Local Vite dev'),
-- Pattern-based (regex)
('pattern', '^https://.*\.cannabrands\.app$', 'All cannabrands.app subdomains'),
('pattern', '^https://.*\.cannaiq\.co$', 'All cannaiq.co subdomains')
ON CONFLICT (origin_type, origin_value) DO NOTHING;
-- Add comment
COMMENT ON TABLE trusted_origins IS 'IPs and domains that bypass API key authentication. Managed via /admin.';

View File

@@ -0,0 +1,10 @@
-- Migration 086: Add proxy_url column for alternative URL formats
-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass)
-- This column allows storing the raw URL directly
-- Add proxy_url column - if set, used directly instead of constructing from parts
ALTER TABLE proxies
ADD COLUMN IF NOT EXISTS proxy_url TEXT;
-- Add comment
COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.';

View File

@@ -0,0 +1,30 @@
-- Migration 088: Extend raw_crawl_payloads for discovery payloads
--
-- Enables saving raw store data from Dutchie discovery crawls.
-- Store discovery returns raw dispensary objects - save them for historical analysis.
-- Add payload_type to distinguish product crawls from discovery crawls
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS payload_type VARCHAR(32) NOT NULL DEFAULT 'product';
-- Add state_code for discovery payloads (null for product payloads)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS state_code VARCHAR(10);
-- Add store_count for discovery payloads (alternative to product_count)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS store_count INTEGER;
-- Make dispensary_id nullable for discovery payloads
ALTER TABLE raw_crawl_payloads
ALTER COLUMN dispensary_id DROP NOT NULL;
-- Add index for discovery payload queries
CREATE INDEX IF NOT EXISTS idx_raw_crawl_payloads_type_state
ON raw_crawl_payloads(payload_type, state_code)
WHERE payload_type = 'store_discovery';
-- Comments
COMMENT ON COLUMN raw_crawl_payloads.payload_type IS 'Type: product (default), store_discovery';
COMMENT ON COLUMN raw_crawl_payloads.state_code IS 'State code for discovery payloads (e.g., AZ, MI)';
COMMENT ON COLUMN raw_crawl_payloads.store_count IS 'Number of stores in discovery payload';

View File

@@ -0,0 +1,105 @@
-- Migration 089: Immutable Schedules with Per-State Product Discovery
--
-- Key changes:
-- 1. Add is_immutable column - schedules can be edited but not deleted
-- 2. Add method column - all tasks use 'http' (Puppeteer transport)
-- 3. Store discovery weekly (168h)
-- 4. Per-state product_discovery schedules (4h default)
-- 5. Remove old payload_fetch schedules
-- =====================================================
-- 1) Add new columns to task_schedules
-- =====================================================
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS is_immutable BOOLEAN DEFAULT FALSE;
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS method VARCHAR(10) DEFAULT 'http';
-- =====================================================
-- 2) Update store_discovery to weekly and immutable
-- =====================================================
UPDATE task_schedules
SET interval_hours = 168, -- 7 days
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)'
WHERE name = 'store_discovery_dutchie';
-- Insert if doesn't exist
INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at)
VALUES ('store_discovery_dutchie', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW())
ON CONFLICT (name) DO UPDATE SET
interval_hours = 168,
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)';
-- =====================================================
-- 3) Remove old payload_fetch and product_refresh_all schedules
-- =====================================================
DELETE FROM task_schedules WHERE name IN ('payload_fetch_all', 'product_refresh_all');
-- =====================================================
-- 4) Create per-state product_discovery schedules
-- =====================================================
-- One schedule per state that has dispensaries with active cannabis programs
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours, -- 4 hours default, editable
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable, -- Can edit but not delete
'http' AS method,
CASE WHEN s.is_active THEN TRUE ELSE FALSE END AS enabled,
-- Stagger start times: each state starts 5 minutes after the previous
NOW() + (ROW_NUMBER() OVER (ORDER BY s.code) * INTERVAL '5 minutes') AS next_run_at
FROM states s
WHERE EXISTS (
SELECT 1 FROM dispensaries d
WHERE d.state_id = s.id AND d.crawl_enabled = true
)
ON CONFLICT (name) DO UPDATE SET
is_immutable = TRUE,
method = 'http',
description = EXCLUDED.description;
-- Also create schedules for states that might have stores discovered later
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours,
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable,
'http' AS method,
FALSE AS enabled, -- Disabled until stores exist
NOW() + INTERVAL '1 hour'
FROM states s
WHERE NOT EXISTS (
SELECT 1 FROM task_schedules ts WHERE ts.name = 'product_discovery_' || lower(s.code)
)
ON CONFLICT (name) DO NOTHING;
-- =====================================================
-- 5) Make analytics_refresh immutable
-- =====================================================
UPDATE task_schedules
SET is_immutable = TRUE, method = 'http'
WHERE name = 'analytics_refresh';
-- =====================================================
-- 6) Add index for schedule lookups
-- =====================================================
CREATE INDEX IF NOT EXISTS idx_task_schedules_state_code
ON task_schedules(state_code)
WHERE state_code IS NOT NULL;
-- Comments
COMMENT ON COLUMN task_schedules.is_immutable IS 'If TRUE, schedule cannot be deleted (only edited)';
COMMENT ON COLUMN task_schedules.method IS 'Transport method: http (Puppeteer/browser) or curl (axios)';

View File

@@ -35,8 +35,6 @@
"puppeteer-extra-plugin-stealth": "^2.11.2",
"sharp": "^0.32.0",
"socks-proxy-agent": "^8.0.2",
"swagger-jsdoc": "^6.2.8",
"swagger-ui-express": "^5.0.1",
"user-agents": "^1.1.669",
"uuid": "^9.0.1",
"zod": "^3.22.4"
@@ -49,53 +47,11 @@
"@types/node": "^20.10.5",
"@types/node-cron": "^3.0.11",
"@types/pg": "^8.15.6",
"@types/swagger-jsdoc": "^6.0.4",
"@types/swagger-ui-express": "^4.1.8",
"@types/uuid": "^9.0.7",
"tsx": "^4.7.0",
"typescript": "^5.3.3"
}
},
"node_modules/@apidevtools/json-schema-ref-parser": {
"version": "9.1.2",
"resolved": "https://registry.npmjs.org/@apidevtools/json-schema-ref-parser/-/json-schema-ref-parser-9.1.2.tgz",
"integrity": "sha512-r1w81DpR+KyRWd3f+rk6TNqMgedmAxZP5v5KWlXQWlgMUUtyEJch0DKEci1SorPMiSeM8XPl7MZ3miJ60JIpQg==",
"dependencies": {
"@jsdevtools/ono": "^7.1.3",
"@types/json-schema": "^7.0.6",
"call-me-maybe": "^1.0.1",
"js-yaml": "^4.1.0"
}
},
"node_modules/@apidevtools/openapi-schemas": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/@apidevtools/openapi-schemas/-/openapi-schemas-2.1.0.tgz",
"integrity": "sha512-Zc1AlqrJlX3SlpupFGpiLi2EbteyP7fXmUOGup6/DnkRgjP9bgMM/ag+n91rsv0U1Gpz0H3VILA/o3bW7Ua6BQ==",
"engines": {
"node": ">=10"
}
},
"node_modules/@apidevtools/swagger-methods": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/@apidevtools/swagger-methods/-/swagger-methods-3.0.2.tgz",
"integrity": "sha512-QAkD5kK2b1WfjDS/UQn/qQkbwF31uqRjPTrsCs5ZG9BQGAkjwvqGFjjPqAuzac/IYzpPtRzjCP1WrTuAIjMrXg=="
},
"node_modules/@apidevtools/swagger-parser": {
"version": "10.0.3",
"resolved": "https://registry.npmjs.org/@apidevtools/swagger-parser/-/swagger-parser-10.0.3.tgz",
"integrity": "sha512-sNiLY51vZOmSPFZA5TF35KZ2HbgYklQnTSDnkghamzLb3EkNtcQnrBQEj5AOCxHpTtXpqMCRM1CrmV2rG6nw4g==",
"dependencies": {
"@apidevtools/json-schema-ref-parser": "^9.0.6",
"@apidevtools/openapi-schemas": "^2.0.4",
"@apidevtools/swagger-methods": "^3.0.2",
"@jsdevtools/ono": "^7.1.3",
"call-me-maybe": "^1.0.1",
"z-schema": "^5.0.1"
},
"peerDependencies": {
"openapi-types": ">=7"
}
},
"node_modules/@babel/code-frame": {
"version": "7.27.1",
"resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.27.1.tgz",
@@ -538,11 +494,6 @@
"resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.4.0.tgz",
"integrity": "sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ=="
},
"node_modules/@jsdevtools/ono": {
"version": "7.1.3",
"resolved": "https://registry.npmjs.org/@jsdevtools/ono/-/ono-7.1.3.tgz",
"integrity": "sha512-4JQNk+3mVzK3xh2rqd6RB4J46qUR19azEHBneZyTZM+c456qOrbbM/5xcR8huNCCcbVt7+UmizG6GuUvPvKUYg=="
},
"node_modules/@jsep-plugin/assignment": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/@jsep-plugin/assignment/-/assignment-1.3.0.tgz",
@@ -810,12 +761,6 @@
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"node_modules/@scarf/scarf": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/@scarf/scarf/-/scarf-1.4.0.tgz",
"integrity": "sha512-xxeapPiUXdZAE3che6f3xogoJPeZgig6omHEy1rIY5WVsB3H2BHNnZH+gHG6x91SCWyQCzWGsuL2Hh3ClO5/qQ==",
"hasInstallScript": true
},
"node_modules/@tootallnate/quickjs-emscripten": {
"version": "0.23.0",
"resolved": "https://registry.npmjs.org/@tootallnate/quickjs-emscripten/-/quickjs-emscripten-0.23.0.tgz",
@@ -910,11 +855,6 @@
"resolved": "https://registry.npmjs.org/@types/js-yaml/-/js-yaml-4.0.9.tgz",
"integrity": "sha512-k4MGaQl5TGo/iipqb2UDG2UwjXziSWkh0uysQelTlJpX1qGlpUZYm8PnO4DxG1qBomtJUdYJ6qR6xdIah10JLg=="
},
"node_modules/@types/json-schema": {
"version": "7.0.15",
"resolved": "https://registry.npmjs.org/@types/json-schema/-/json-schema-7.0.15.tgz",
"integrity": "sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA=="
},
"node_modules/@types/jsonwebtoken": {
"version": "9.0.10",
"resolved": "https://registry.npmjs.org/@types/jsonwebtoken/-/jsonwebtoken-9.0.10.tgz",
@@ -1020,22 +960,6 @@
"@types/node": "*"
}
},
"node_modules/@types/swagger-jsdoc": {
"version": "6.0.4",
"resolved": "https://registry.npmjs.org/@types/swagger-jsdoc/-/swagger-jsdoc-6.0.4.tgz",
"integrity": "sha512-W+Xw5epcOZrF/AooUM/PccNMSAFOKWZA5dasNyMujTwsBkU74njSJBpvCCJhHAJ95XRMzQrrW844Btu0uoetwQ==",
"dev": true
},
"node_modules/@types/swagger-ui-express": {
"version": "4.1.8",
"resolved": "https://registry.npmjs.org/@types/swagger-ui-express/-/swagger-ui-express-4.1.8.tgz",
"integrity": "sha512-AhZV8/EIreHFmBV5wAs0gzJUNq9JbbSXgJLQubCC0jtIo6prnI9MIRRxnU4MZX9RB9yXxF1V4R7jtLl/Wcj31g==",
"dev": true,
"dependencies": {
"@types/express": "*",
"@types/serve-static": "*"
}
},
"node_modules/@types/uuid": {
"version": "9.0.8",
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz",
@@ -1510,11 +1434,6 @@
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/call-me-maybe": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/call-me-maybe/-/call-me-maybe-1.0.2.tgz",
"integrity": "sha512-HpX65o1Hnr9HH25ojC1YGs7HCQLq0GCOibSaWER0eNpgJ/Z1MZv2mTc7+xh6WOPxbRVcmgbv4hGU+uSQ/2xFZQ=="
},
"node_modules/callsites": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz",
@@ -1675,14 +1594,6 @@
"node": ">= 0.8"
}
},
"node_modules/commander": {
"version": "6.2.0",
"resolved": "https://registry.npmjs.org/commander/-/commander-6.2.0.tgz",
"integrity": "sha512-zP4jEKbe8SHzKJYQmq8Y9gYjtO/POJLgIdKgV7B9qNmABVFVc+ctqSX6iXh4mCpJfRBOabiZ2YKPg8ciDw6C+Q==",
"engines": {
"node": ">= 6"
}
},
"node_modules/concat-map": {
"version": "0.0.1",
"resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz",
@@ -1952,17 +1863,6 @@
"resolved": "https://registry.npmjs.org/devtools-protocol/-/devtools-protocol-0.0.1232444.tgz",
"integrity": "sha512-pM27vqEfxSxRkTMnF+XCmxSEb6duO5R+t8A9DEEJgy4Wz2RVanje2mmj99B6A3zv2r/qGfYlOvYznUhuokizmg=="
},
"node_modules/doctrine": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/doctrine/-/doctrine-3.0.0.tgz",
"integrity": "sha512-yS+Q5i3hBf7GBkd4KG8a7eBNNWNGLTaEwwYWUijIYM7zrlYDM0BFXHjjPWlWZ1Rg7UaddZeIDmi9jF3HmqiQ2w==",
"dependencies": {
"esutils": "^2.0.2"
},
"engines": {
"node": ">=6.0.0"
}
},
"node_modules/dom-serializer": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/dom-serializer/-/dom-serializer-2.0.0.tgz",
@@ -3358,12 +3258,6 @@
"resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz",
"integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ=="
},
"node_modules/lodash.get": {
"version": "4.4.2",
"resolved": "https://registry.npmjs.org/lodash.get/-/lodash.get-4.4.2.tgz",
"integrity": "sha512-z+Uw/vLuy6gQe8cfaFWD7p0wVv8fJl3mbzXh33RS+0oW2wvUqiRXiQ69gLWSLpgB5/6sU+r6BlQR0MBILadqTQ==",
"deprecated": "This package is deprecated. Use the optional chaining (?.) operator instead."
},
"node_modules/lodash.includes": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz",
@@ -3379,12 +3273,6 @@
"resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz",
"integrity": "sha512-Bz5mupy2SVbPHURB98VAcw+aHh4vRV5IPNhILUCsOzRmsTmSQ17jIuqopAentWoehktxGd9e/hbIXq980/1QJg=="
},
"node_modules/lodash.isequal": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/lodash.isequal/-/lodash.isequal-4.5.0.tgz",
"integrity": "sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==",
"deprecated": "This package is deprecated. Use require('node:util').isDeepStrictEqual instead."
},
"node_modules/lodash.isinteger": {
"version": "4.0.4",
"resolved": "https://registry.npmjs.org/lodash.isinteger/-/lodash.isinteger-4.0.4.tgz",
@@ -3405,11 +3293,6 @@
"resolved": "https://registry.npmjs.org/lodash.isstring/-/lodash.isstring-4.0.1.tgz",
"integrity": "sha512-0wJxfxH1wgO3GrbuP+dTTk7op+6L41QCXbGINEmD+ny/G/eCqGzxyCsh7159S+mgDDcoarnBw6PC1PS5+wUGgw=="
},
"node_modules/lodash.mergewith": {
"version": "4.6.2",
"resolved": "https://registry.npmjs.org/lodash.mergewith/-/lodash.mergewith-4.6.2.tgz",
"integrity": "sha512-GK3g5RPZWTRSeLSpgP8Xhra+pnjBC56q9FZYe1d5RN3TJ35dbkGy3YqBSMbyCrlbi+CM9Z3Jk5yTL7RCsqboyQ=="
},
"node_modules/lodash.once": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz",
@@ -3865,12 +3748,6 @@
"wrappy": "1"
}
},
"node_modules/openapi-types": {
"version": "12.1.3",
"resolved": "https://registry.npmjs.org/openapi-types/-/openapi-types-12.1.3.tgz",
"integrity": "sha512-N4YtSYJqghVu4iek2ZUvcN/0aqH1kRDuNqzcycDxhOUpg7GdvLa2F3DgS6yBNhInhv2r/6I0Flkn7CqL8+nIcw==",
"peer": true
},
"node_modules/openid-client": {
"version": "6.8.1",
"resolved": "https://registry.npmjs.org/openid-client/-/openid-client-6.8.1.tgz",
@@ -5311,78 +5188,6 @@
}
]
},
"node_modules/swagger-jsdoc": {
"version": "6.2.8",
"resolved": "https://registry.npmjs.org/swagger-jsdoc/-/swagger-jsdoc-6.2.8.tgz",
"integrity": "sha512-VPvil1+JRpmJ55CgAtn8DIcpBs0bL5L3q5bVQvF4tAW/k/9JYSj7dCpaYCAv5rufe0vcCbBRQXGvzpkWjvLklQ==",
"dependencies": {
"commander": "6.2.0",
"doctrine": "3.0.0",
"glob": "7.1.6",
"lodash.mergewith": "^4.6.2",
"swagger-parser": "^10.0.3",
"yaml": "2.0.0-1"
},
"bin": {
"swagger-jsdoc": "bin/swagger-jsdoc.js"
},
"engines": {
"node": ">=12.0.0"
}
},
"node_modules/swagger-jsdoc/node_modules/glob": {
"version": "7.1.6",
"resolved": "https://registry.npmjs.org/glob/-/glob-7.1.6.tgz",
"integrity": "sha512-LwaxwyZ72Lk7vZINtNNrywX0ZuLyStrdDtabefZKAY5ZGJhVtgdznluResxNmPitE0SAO+O26sWTHeKSI2wMBA==",
"deprecated": "Glob versions prior to v9 are no longer supported",
"dependencies": {
"fs.realpath": "^1.0.0",
"inflight": "^1.0.4",
"inherits": "2",
"minimatch": "^3.0.4",
"once": "^1.3.0",
"path-is-absolute": "^1.0.0"
},
"engines": {
"node": "*"
},
"funding": {
"url": "https://github.com/sponsors/isaacs"
}
},
"node_modules/swagger-parser": {
"version": "10.0.3",
"resolved": "https://registry.npmjs.org/swagger-parser/-/swagger-parser-10.0.3.tgz",
"integrity": "sha512-nF7oMeL4KypldrQhac8RyHerJeGPD1p2xDh900GPvc+Nk7nWP6jX2FcC7WmkinMoAmoO774+AFXcWsW8gMWEIg==",
"dependencies": {
"@apidevtools/swagger-parser": "10.0.3"
},
"engines": {
"node": ">=10"
}
},
"node_modules/swagger-ui-dist": {
"version": "5.31.0",
"resolved": "https://registry.npmjs.org/swagger-ui-dist/-/swagger-ui-dist-5.31.0.tgz",
"integrity": "sha512-zSUTIck02fSga6rc0RZP3b7J7wgHXwLea8ZjgLA3Vgnb8QeOl3Wou2/j5QkzSGeoz6HusP/coYuJl33aQxQZpg==",
"dependencies": {
"@scarf/scarf": "=1.4.0"
}
},
"node_modules/swagger-ui-express": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/swagger-ui-express/-/swagger-ui-express-5.0.1.tgz",
"integrity": "sha512-SrNU3RiBGTLLmFU8GIJdOdanJTl4TOmT27tt3bWWHppqYmAZ6IDuEuBvMU6nZq0zLEe6b/1rACXCgLZqO6ZfrA==",
"dependencies": {
"swagger-ui-dist": ">=5.0.0"
},
"engines": {
"node": ">= v0.10.32"
},
"peerDependencies": {
"express": ">=4.0.0 || >=5.0.0-beta"
}
},
"node_modules/tar": {
"version": "6.2.1",
"resolved": "https://registry.npmjs.org/tar/-/tar-6.2.1.tgz",
@@ -5601,14 +5406,6 @@
"uuid": "dist/bin/uuid"
}
},
"node_modules/validator": {
"version": "13.15.23",
"resolved": "https://registry.npmjs.org/validator/-/validator-13.15.23.tgz",
"integrity": "sha512-4yoz1kEWqUjzi5zsPbAS/903QXSYp0UOtHsPpp7p9rHAw/W+dkInskAE386Fat3oKRROwO98d9ZB0G4cObgUyw==",
"engines": {
"node": ">= 0.10"
}
},
"node_modules/vary": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",
@@ -5787,14 +5584,6 @@
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="
},
"node_modules/yaml": {
"version": "2.0.0-1",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.0.0-1.tgz",
"integrity": "sha512-W7h5dEhywMKenDJh2iX/LABkbFnBxasD27oyXWDS/feDsxiw0dD5ncXdYXgkvAsXIY2MpW/ZKkr9IU30DBdMNQ==",
"engines": {
"node": ">= 6"
}
},
"node_modules/yargs": {
"version": "17.7.2",
"resolved": "https://registry.npmjs.org/yargs/-/yargs-17.7.2.tgz",
@@ -5829,34 +5618,6 @@
"fd-slicer": "~1.1.0"
}
},
"node_modules/z-schema": {
"version": "5.0.5",
"resolved": "https://registry.npmjs.org/z-schema/-/z-schema-5.0.5.tgz",
"integrity": "sha512-D7eujBWkLa3p2sIpJA0d1pr7es+a7m0vFAnZLlCEKq/Ij2k0MLi9Br2UPxoxdYystm5K1yeBGzub0FlYUEWj2Q==",
"dependencies": {
"lodash.get": "^4.4.2",
"lodash.isequal": "^4.5.0",
"validator": "^13.7.0"
},
"bin": {
"z-schema": "bin/z-schema"
},
"engines": {
"node": ">=8.0.0"
},
"optionalDependencies": {
"commander": "^9.4.1"
}
},
"node_modules/z-schema/node_modules/commander": {
"version": "9.5.0",
"resolved": "https://registry.npmjs.org/commander/-/commander-9.5.0.tgz",
"integrity": "sha512-KRs7WVDKg86PWiuAqhDrAQnTXZKraVcCc6vFdL14qrZ/DcWwuRo7VoiYXalXO7S5GKpqYiVEwCbgFDfxNHKJBQ==",
"optional": true,
"engines": {
"node": "^12.20.0 || >=14"
}
},
"node_modules/zod": {
"version": "3.25.76",
"resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz",

View File

@@ -49,8 +49,6 @@
"puppeteer-extra-plugin-stealth": "^2.11.2",
"sharp": "^0.32.0",
"socks-proxy-agent": "^8.0.2",
"swagger-jsdoc": "^6.2.8",
"swagger-ui-express": "^5.0.1",
"user-agents": "^1.1.669",
"uuid": "^9.0.1",
"zod": "^3.22.4"
@@ -63,8 +61,6 @@
"@types/node": "^20.10.5",
"@types/node-cron": "^3.0.11",
"@types/pg": "^8.15.6",
"@types/swagger-jsdoc": "^6.0.4",
"@types/swagger-ui-express": "^4.1.8",
"@types/uuid": "^9.0.7",
"tsx": "^4.7.0",
"typescript": "^5.3.3"

View File

@@ -7,7 +7,6 @@
*
* NO username/password auth in API. Use tokens only.
*
* Trusted origins are managed via /admin and stored in the trusted_origins table.
* Localhost bypass: curl from 127.0.0.1 gets automatic admin access.
*/
import { Request, Response, NextFunction } from 'express';
@@ -17,8 +16,8 @@ import { pool } from '../db/pool';
const JWT_SECRET = process.env.JWT_SECRET || 'change_this_in_production';
// Fallback trusted origins (used if DB unavailable)
const FALLBACK_TRUSTED_ORIGINS = [
// Trusted origins that bypass auth for internal/same-origin requests
const TRUSTED_ORIGINS = [
'https://cannaiq.co',
'https://www.cannaiq.co',
'https://findadispo.com',
@@ -30,108 +29,31 @@ const FALLBACK_TRUSTED_ORIGINS = [
'http://localhost:5173',
];
const FALLBACK_TRUSTED_PATTERNS = [
/^https:\/\/.*\.cannabrands\.app$/,
/^https:\/\/.*\.cannaiq\.co$/,
// Pattern-based trusted origins (wildcards)
const TRUSTED_ORIGIN_PATTERNS = [
/^https:\/\/.*\.cannabrands\.app$/, // *.cannabrands.app
/^https:\/\/.*\.cannaiq\.co$/, // *.cannaiq.co
];
const FALLBACK_TRUSTED_IPS = [
// Trusted IPs for internal pod-to-pod communication
const TRUSTED_IPS = [
'127.0.0.1',
'::1',
'::ffff:127.0.0.1',
];
// Cache for DB-backed trusted origins
let trustedOriginsCache: {
ips: Set<string>;
domains: Set<string>;
patterns: RegExp[];
loadedAt: Date;
} | null = null;
/**
* Load trusted origins from DB with caching (5 min TTL)
*/
async function loadTrustedOrigins(): Promise<{
ips: Set<string>;
domains: Set<string>;
patterns: RegExp[];
}> {
// Return cached if fresh
if (trustedOriginsCache) {
const age = Date.now() - trustedOriginsCache.loadedAt.getTime();
if (age < 5 * 60 * 1000) {
return trustedOriginsCache;
}
}
try {
const result = await pool.query(`
SELECT origin_type, origin_value
FROM trusted_origins
WHERE active = true
`);
const ips = new Set<string>();
const domains = new Set<string>();
const patterns: RegExp[] = [];
for (const row of result.rows) {
switch (row.origin_type) {
case 'ip':
ips.add(row.origin_value);
break;
case 'domain':
// Store as full origin for comparison
if (!row.origin_value.startsWith('http')) {
domains.add(`https://${row.origin_value}`);
domains.add(`http://${row.origin_value}`);
} else {
domains.add(row.origin_value);
}
break;
case 'pattern':
try {
patterns.push(new RegExp(row.origin_value));
} catch {
console.warn(`[Auth] Invalid trusted origin pattern: ${row.origin_value}`);
}
break;
}
}
trustedOriginsCache = { ips, domains, patterns, loadedAt: new Date() };
return trustedOriginsCache;
} catch (error) {
// DB not available or table doesn't exist - use fallbacks
return {
ips: new Set(FALLBACK_TRUSTED_IPS),
domains: new Set(FALLBACK_TRUSTED_ORIGINS),
patterns: FALLBACK_TRUSTED_PATTERNS,
};
}
}
/**
* Clear trusted origins cache (called when admin updates origins)
*/
export function clearTrustedOriginsCache() {
trustedOriginsCache = null;
}
/**
* Check if request is from a trusted origin/IP
*/
async function isTrustedRequest(req: Request): Promise<boolean> {
const { ips, domains, patterns } = await loadTrustedOrigins();
function isTrustedRequest(req: Request): boolean {
// Check origin header
const origin = req.headers.origin;
if (origin) {
if (domains.has(origin)) {
if (TRUSTED_ORIGINS.includes(origin)) {
return true;
}
for (const pattern of patterns) {
// Check pattern-based origins (wildcards like *.cannabrands.app)
for (const pattern of TRUSTED_ORIGIN_PATTERNS) {
if (pattern.test(origin)) {
return true;
}
@@ -141,15 +63,16 @@ async function isTrustedRequest(req: Request): Promise<boolean> {
// Check referer header (for same-origin requests without CORS)
const referer = req.headers.referer;
if (referer) {
for (const trusted of domains) {
for (const trusted of TRUSTED_ORIGINS) {
if (referer.startsWith(trusted)) {
return true;
}
}
// Check pattern-based referers
try {
const refererUrl = new URL(referer);
const refererOrigin = refererUrl.origin;
for (const pattern of patterns) {
for (const pattern of TRUSTED_ORIGIN_PATTERNS) {
if (pattern.test(refererOrigin)) {
return true;
}
@@ -161,7 +84,7 @@ async function isTrustedRequest(req: Request): Promise<boolean> {
// Check IP for internal requests (pod-to-pod, localhost)
const clientIp = req.ip || req.socket.remoteAddress || '';
if (ips.has(clientIp)) {
if (TRUSTED_IPS.includes(clientIp)) {
return true;
}
@@ -277,7 +200,7 @@ export async function authMiddleware(req: AuthRequest, res: Response, next: Next
}
// No token provided - check trusted origins for API access (WordPress, etc.)
if (await isTrustedRequest(req)) {
if (isTrustedRequest(req)) {
req.user = {
id: 0,
email: 'internal@system',

View File

@@ -147,8 +147,6 @@ import workerRegistryRoutes from './routes/worker-registry';
// Per TASK_WORKFLOW_2024-12-10.md: Raw payload access API
import payloadsRoutes from './routes/payloads';
import k8sRoutes from './routes/k8s';
import trustedOriginsRoutes from './routes/trusted-origins';
// Mark requests from trusted domains (cannaiq.co, findagram.co, findadispo.com)
// These domains can access the API without authentication
@@ -202,10 +200,6 @@ app.use('/api/admin/orchestrator', orchestratorAdminRoutes);
app.use('/api/admin/debug', adminDebugRoutes);
console.log('[AdminDebug] Routes registered at /api/admin/debug');
// Admin routes - trusted origins management (IPs, domains that bypass auth)
app.use('/api/admin/trusted-origins', trustedOriginsRoutes);
console.log('[TrustedOrigins] Routes registered at /api/admin/trusted-origins');
// Admin routes - intelligence (brands, pricing analytics)
app.use('/api/admin/intelligence', intelligenceRoutes);
console.log('[Intelligence] Routes registered at /api/admin/intelligence');

View File

@@ -28,55 +28,8 @@ const router = Router();
const getDbPool = (): Pool => getPool() as unknown as Pool;
/**
* @swagger
* /payloads:
* get:
* summary: List payload metadata
* description: Returns paginated list of raw crawl payload metadata. Does not include the actual payload data.
* tags: [Payloads]
* parameters:
* - in: query
* name: limit
* schema:
* type: integer
* default: 50
* maximum: 100
* description: Number of payloads to return
* - in: query
* name: offset
* schema:
* type: integer
* default: 0
* description: Number of payloads to skip
* - in: query
* name: dispensary_id
* schema:
* type: integer
* description: Filter by dispensary ID
* responses:
* 200:
* description: List of payload metadata
* content:
* application/json:
* schema:
* type: object
* properties:
* success:
* type: boolean
* example: true
* payloads:
* type: array
* items:
* $ref: '#/components/schemas/PayloadMetadata'
* pagination:
* type: object
* properties:
* limit:
* type: integer
* offset:
* type: integer
* 500:
* description: Server error
* GET /api/payloads
* List payload metadata (paginated)
*/
router.get('/', async (req: Request, res: Response) => {
try {
@@ -103,35 +56,8 @@ router.get('/', async (req: Request, res: Response) => {
});
/**
* @swagger
* /payloads/{id}:
* get:
* summary: Get payload metadata by ID
* description: Returns metadata for a specific payload including dispensary name, size, and timestamps.
* tags: [Payloads]
* parameters:
* - in: path
* name: id
* required: true
* schema:
* type: integer
* description: Payload ID
* responses:
* 200:
* description: Payload metadata
* content:
* application/json:
* schema:
* type: object
* properties:
* success:
* type: boolean
* payload:
* $ref: '#/components/schemas/PayloadMetadata'
* 404:
* description: Payload not found
* 500:
* description: Server error
* GET /api/payloads/:id
* Get payload metadata by ID
*/
router.get('/:id', async (req: Request, res: Response) => {
try {
@@ -171,43 +97,8 @@ router.get('/:id', async (req: Request, res: Response) => {
});
/**
* @swagger
* /payloads/{id}/data:
* get:
* summary: Get full payload data
* description: Returns the complete raw crawl payload JSON, decompressed from disk. This includes all products from the crawl.
* tags: [Payloads]
* parameters:
* - in: path
* name: id
* required: true
* schema:
* type: integer
* description: Payload ID
* responses:
* 200:
* description: Full payload data
* content:
* application/json:
* schema:
* type: object
* properties:
* success:
* type: boolean
* metadata:
* $ref: '#/components/schemas/PayloadMetadata'
* data:
* type: object
* description: Raw GraphQL response with products array
* properties:
* products:
* type: array
* items:
* type: object
* 404:
* description: Payload not found
* 500:
* description: Server error
* GET /api/payloads/:id/data
* Get full payload JSON (decompressed from disk)
*/
router.get('/:id/data', async (req: Request, res: Response) => {
try {
@@ -232,48 +123,8 @@ router.get('/:id/data', async (req: Request, res: Response) => {
});
/**
* @swagger
* /payloads/store/{dispensaryId}:
* get:
* summary: List payloads for a store
* description: Returns paginated list of payload metadata for a specific dispensary.
* tags: [Payloads]
* parameters:
* - in: path
* name: dispensaryId
* required: true
* schema:
* type: integer
* description: Dispensary ID
* - in: query
* name: limit
* schema:
* type: integer
* default: 20
* maximum: 100
* - in: query
* name: offset
* schema:
* type: integer
* default: 0
* responses:
* 200:
* description: List of payloads for store
* content:
* application/json:
* schema:
* type: object
* properties:
* success:
* type: boolean
* dispensaryId:
* type: integer
* payloads:
* type: array
* items:
* $ref: '#/components/schemas/PayloadMetadata'
* 500:
* description: Server error
* GET /api/payloads/store/:dispensaryId
* List payloads for a specific store
*/
router.get('/store/:dispensaryId', async (req: Request, res: Response) => {
try {
@@ -301,42 +152,8 @@ router.get('/store/:dispensaryId', async (req: Request, res: Response) => {
});
/**
* @swagger
* /payloads/store/{dispensaryId}/latest:
* get:
* summary: Get latest payload for a store
* description: Returns the most recent raw crawl payload for a dispensary, including full product data.
* tags: [Payloads]
* parameters:
* - in: path
* name: dispensaryId
* required: true
* schema:
* type: integer
* description: Dispensary ID
* responses:
* 200:
* description: Latest payload with full data
* content:
* application/json:
* schema:
* type: object
* properties:
* success:
* type: boolean
* metadata:
* $ref: '#/components/schemas/PayloadMetadata'
* data:
* type: object
* properties:
* products:
* type: array
* items:
* type: object
* 404:
* description: No payloads found for dispensary
* 500:
* description: Server error
* GET /api/payloads/store/:dispensaryId/latest
* Get the latest payload for a store (with full data)
*/
router.get('/store/:dispensaryId/latest', async (req: Request, res: Response) => {
try {
@@ -364,107 +181,12 @@ router.get('/store/:dispensaryId/latest', async (req: Request, res: Response) =>
});
/**
* @swagger
* /payloads/store/{dispensaryId}/diff:
* get:
* summary: Compare two payloads
* description: |
* Compares two crawl payloads for a store and returns the differences.
* If no IDs are provided, compares the two most recent payloads.
* Returns added products, removed products, price changes, and stock changes.
* tags: [Payloads]
* parameters:
* - in: path
* name: dispensaryId
* required: true
* schema:
* type: integer
* description: Dispensary ID
* - in: query
* name: from
* schema:
* type: integer
* description: Older payload ID (optional)
* - in: query
* name: to
* schema:
* type: integer
* description: Newer payload ID (optional)
* responses:
* 200:
* description: Payload diff results
* content:
* application/json:
* schema:
* type: object
* properties:
* success:
* type: boolean
* from:
* type: object
* properties:
* id:
* type: integer
* fetchedAt:
* type: string
* format: date-time
* productCount:
* type: integer
* to:
* type: object
* properties:
* id:
* type: integer
* fetchedAt:
* type: string
* format: date-time
* productCount:
* type: integer
* diff:
* type: object
* properties:
* added:
* type: integer
* removed:
* type: integer
* priceChanges:
* type: integer
* stockChanges:
* type: integer
* details:
* type: object
* properties:
* added:
* type: array
* items:
* type: object
* removed:
* type: array
* items:
* type: object
* priceChanges:
* type: array
* items:
* type: object
* properties:
* id:
* type: string
* name:
* type: string
* oldPrice:
* type: number
* newPrice:
* type: number
* stockChanges:
* type: array
* items:
* type: object
* 400:
* description: Need at least 2 payloads to diff
* 404:
* description: One or both payloads not found
* 500:
* description: Server error
* GET /api/payloads/store/:dispensaryId/diff
* Compare two payloads for a store
*
* Query params:
* - from: payload ID (older)
* - to: payload ID (newer) - optional, defaults to latest
*/
router.get('/store/:dispensaryId/diff', async (req: Request, res: Response) => {
try {
@@ -609,370 +331,4 @@ router.get('/store/:dispensaryId/diff', async (req: Request, res: Response) => {
}
});
/**
* GET /api/payloads/store/:dispensaryId/query
* Query products from the latest payload with flexible filters
*
* Query params:
* - brand: Filter by brand name (partial match)
* - category: Filter by category (exact match)
* - subcategory: Filter by subcategory
* - strain_type: Filter by strain type (indica, sativa, hybrid, cbd)
* - in_stock: Filter by stock status (true/false)
* - price_min: Minimum price
* - price_max: Maximum price
* - thc_min: Minimum THC percentage
* - thc_max: Maximum THC percentage
* - search: Search product name (partial match)
* - fields: Comma-separated list of fields to return
* - limit: Max results (default 100, max 1000)
* - offset: Skip results for pagination
* - sort: Sort field (name, price, thc, brand)
* - order: Sort order (asc, desc)
*/
router.get('/store/:dispensaryId/query', async (req: Request, res: Response) => {
try {
const pool = getDbPool();
const dispensaryId = parseInt(req.params.dispensaryId);
// Get latest payload
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
return res.status(404).json({
success: false,
error: `No payloads found for dispensary ${dispensaryId}`,
});
}
let products = result.payload.products || [];
// Parse query params
const {
brand,
category,
subcategory,
strain_type,
in_stock,
price_min,
price_max,
thc_min,
thc_max,
search,
fields,
limit: limitStr,
offset: offsetStr,
sort,
order,
} = req.query;
// Apply filters
if (brand) {
const brandLower = (brand as string).toLowerCase();
products = products.filter((p: any) =>
p.brand?.name?.toLowerCase().includes(brandLower)
);
}
if (category) {
const catLower = (category as string).toLowerCase();
products = products.filter((p: any) =>
p.category?.toLowerCase() === catLower ||
p.Category?.toLowerCase() === catLower
);
}
if (subcategory) {
const subLower = (subcategory as string).toLowerCase();
products = products.filter((p: any) =>
p.subcategory?.toLowerCase() === subLower ||
p.subCategory?.toLowerCase() === subLower
);
}
if (strain_type) {
const strainLower = (strain_type as string).toLowerCase();
products = products.filter((p: any) =>
p.strainType?.toLowerCase() === strainLower ||
p.strain_type?.toLowerCase() === strainLower
);
}
if (in_stock !== undefined) {
const wantInStock = in_stock === 'true';
products = products.filter((p: any) => {
const status = p.Status || p.status;
const isInStock = status === 'Active' || status === 'In Stock' || status === 'in_stock';
return wantInStock ? isInStock : !isInStock;
});
}
if (price_min !== undefined) {
const min = parseFloat(price_min as string);
products = products.filter((p: any) => {
const price = p.Prices?.[0]?.price || p.price;
return price >= min;
});
}
if (price_max !== undefined) {
const max = parseFloat(price_max as string);
products = products.filter((p: any) => {
const price = p.Prices?.[0]?.price || p.price;
return price <= max;
});
}
if (thc_min !== undefined) {
const min = parseFloat(thc_min as string);
products = products.filter((p: any) => {
const thc = p.potencyThc?.formatted || p.thc || 0;
const thcNum = typeof thc === 'string' ? parseFloat(thc) : thc;
return thcNum >= min;
});
}
if (thc_max !== undefined) {
const max = parseFloat(thc_max as string);
products = products.filter((p: any) => {
const thc = p.potencyThc?.formatted || p.thc || 0;
const thcNum = typeof thc === 'string' ? parseFloat(thc) : thc;
return thcNum <= max;
});
}
if (search) {
const searchLower = (search as string).toLowerCase();
products = products.filter((p: any) =>
p.name?.toLowerCase().includes(searchLower)
);
}
// Sort
if (sort) {
const sortOrder = order === 'desc' ? -1 : 1;
products.sort((a: any, b: any) => {
let aVal: any, bVal: any;
switch (sort) {
case 'name':
aVal = a.name || '';
bVal = b.name || '';
break;
case 'price':
aVal = a.Prices?.[0]?.price || a.price || 0;
bVal = b.Prices?.[0]?.price || b.price || 0;
break;
case 'thc':
aVal = parseFloat(a.potencyThc?.formatted || a.thc || '0');
bVal = parseFloat(b.potencyThc?.formatted || b.thc || '0');
break;
case 'brand':
aVal = a.brand?.name || '';
bVal = b.brand?.name || '';
break;
default:
return 0;
}
if (aVal < bVal) return -1 * sortOrder;
if (aVal > bVal) return 1 * sortOrder;
return 0;
});
}
// Pagination
const totalCount = products.length;
const limit = Math.min(parseInt(limitStr as string) || 100, 1000);
const offset = parseInt(offsetStr as string) || 0;
products = products.slice(offset, offset + limit);
// Field selection - normalize product structure
const normalizedProducts = products.map((p: any) => {
const normalized: any = {
id: p._id || p.id,
name: p.name,
brand: p.brand?.name || p.brandName,
category: p.category || p.Category,
subcategory: p.subcategory || p.subCategory,
strain_type: p.strainType || p.strain_type,
price: p.Prices?.[0]?.price || p.price,
price_med: p.Prices?.[0]?.priceMed || p.priceMed,
price_rec: p.Prices?.[0]?.priceRec || p.priceRec,
thc: p.potencyThc?.formatted || p.thc,
cbd: p.potencyCbd?.formatted || p.cbd,
weight: p.Prices?.[0]?.weight || p.weight,
status: p.Status || p.status,
in_stock: (p.Status || p.status) === 'Active',
image_url: p.image || p.imageUrl || p.image_url,
description: p.description,
};
// If specific fields requested, filter
if (fields) {
const requestedFields = (fields as string).split(',').map(f => f.trim());
const filtered: any = {};
for (const field of requestedFields) {
if (normalized.hasOwnProperty(field)) {
filtered[field] = normalized[field];
}
}
return filtered;
}
return normalized;
});
res.json({
success: true,
dispensaryId,
payloadId: result.metadata.id,
fetchedAt: result.metadata.fetchedAt,
query: {
filters: {
brand: brand || null,
category: category || null,
subcategory: subcategory || null,
strain_type: strain_type || null,
in_stock: in_stock || null,
price_min: price_min || null,
price_max: price_max || null,
thc_min: thc_min || null,
thc_max: thc_max || null,
search: search || null,
},
sort: sort || null,
order: order || 'asc',
limit,
offset,
},
pagination: {
total: totalCount,
returned: normalizedProducts.length,
limit,
offset,
has_more: offset + limit < totalCount,
},
products: normalizedProducts,
});
} catch (error: any) {
console.error('[Payloads] Query error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* GET /api/payloads/store/:dispensaryId/aggregate
* Aggregate data from the latest payload
*
* Query params:
* - group_by: Field to group by (brand, category, subcategory, strain_type)
* - metrics: Comma-separated metrics (count, avg_price, min_price, max_price, avg_thc)
*/
router.get('/store/:dispensaryId/aggregate', async (req: Request, res: Response) => {
try {
const pool = getDbPool();
const dispensaryId = parseInt(req.params.dispensaryId);
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
return res.status(404).json({
success: false,
error: `No payloads found for dispensary ${dispensaryId}`,
});
}
const products = result.payload.products || [];
const groupBy = req.query.group_by as string;
const metricsParam = req.query.metrics as string || 'count';
const metrics = metricsParam.split(',').map(m => m.trim());
if (!groupBy) {
return res.status(400).json({
success: false,
error: 'group_by parameter is required (brand, category, subcategory, strain_type)',
});
}
// Group products
const groups: Map<string, any[]> = new Map();
for (const p of products) {
let key: string;
switch (groupBy) {
case 'brand':
key = p.brand?.name || 'Unknown';
break;
case 'category':
key = p.category || p.Category || 'Unknown';
break;
case 'subcategory':
key = p.subcategory || p.subCategory || 'Unknown';
break;
case 'strain_type':
key = p.strainType || p.strain_type || 'Unknown';
break;
default:
key = 'Unknown';
}
if (!groups.has(key)) {
groups.set(key, []);
}
groups.get(key)!.push(p);
}
// Calculate metrics
const aggregations: any[] = [];
for (const [key, items] of groups) {
const agg: any = { [groupBy]: key };
for (const metric of metrics) {
switch (metric) {
case 'count':
agg.count = items.length;
break;
case 'avg_price':
const prices = items.map(p => p.Prices?.[0]?.price || p.price).filter(p => p != null);
agg.avg_price = prices.length > 0 ? prices.reduce((a, b) => a + b, 0) / prices.length : null;
break;
case 'min_price':
const minPrices = items.map(p => p.Prices?.[0]?.price || p.price).filter(p => p != null);
agg.min_price = minPrices.length > 0 ? Math.min(...minPrices) : null;
break;
case 'max_price':
const maxPrices = items.map(p => p.Prices?.[0]?.price || p.price).filter(p => p != null);
agg.max_price = maxPrices.length > 0 ? Math.max(...maxPrices) : null;
break;
case 'avg_thc':
const thcs = items.map(p => parseFloat(p.potencyThc?.formatted || p.thc || '0')).filter(t => t > 0);
agg.avg_thc = thcs.length > 0 ? thcs.reduce((a, b) => a + b, 0) / thcs.length : null;
break;
case 'in_stock_count':
agg.in_stock_count = items.filter(p => (p.Status || p.status) === 'Active').length;
break;
}
}
aggregations.push(agg);
}
// Sort by count descending
aggregations.sort((a, b) => (b.count || 0) - (a.count || 0));
res.json({
success: true,
dispensaryId,
payloadId: result.metadata.id,
fetchedAt: result.metadata.fetchedAt,
groupBy,
metrics,
totalProducts: products.length,
groupCount: aggregations.length,
aggregations,
});
} catch (error: any) {
console.error('[Payloads] Aggregate error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
export default router;

View File

@@ -3,6 +3,24 @@
*
* Endpoints for managing worker tasks, viewing capacity metrics,
* and generating batch tasks.
*
* SCHEDULE MANAGEMENT (added 2025-12-12):
* This file now contains the canonical schedule management endpoints.
* The job_schedules table has been deprecated and all schedule management
* is now consolidated into task_schedules:
*
* Schedule endpoints:
* GET /api/tasks/schedules - List all schedules
* POST /api/tasks/schedules - Create new schedule
* GET /api/tasks/schedules/:id - Get schedule by ID
* PUT /api/tasks/schedules/:id - Update schedule
* DELETE /api/tasks/schedules/:id - Delete schedule
* DELETE /api/tasks/schedules - Bulk delete schedules
* POST /api/tasks/schedules/:id/run-now - Trigger schedule immediately
* POST /api/tasks/schedules/:id/toggle - Toggle schedule enabled/disabled
*
* Note: Schedule routes are defined BEFORE /:id to avoid route conflicts
* (Express matches routes in order, and "schedules" would match /:id otherwise)
*/
import { Router, Request, Response } from 'express';
@@ -131,6 +149,464 @@ router.get('/capacity/:role', async (req: Request, res: Response) => {
}
});
// ============================================================
// SCHEDULE MANAGEMENT ROUTES
// (Must be before /:id to avoid route conflicts)
// ============================================================
/**
* GET /api/tasks/schedules
* List all task schedules
*
* Returns schedules with is_immutable flag - immutable schedules can only
* have their interval_hours, priority, and enabled fields updated (not deleted).
*/
router.get('/schedules', async (req: Request, res: Response) => {
try {
const enabledOnly = req.query.enabled === 'true';
let query = `
SELECT id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method,
COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
FROM task_schedules
`;
if (enabledOnly) {
query += ` WHERE enabled = true`;
}
query += ` ORDER BY
CASE role
WHEN 'store_discovery' THEN 1
WHEN 'product_discovery' THEN 2
WHEN 'analytics_refresh' THEN 3
ELSE 4
END,
state_code NULLS FIRST,
name`;
const result = await pool.query(query);
res.json({ schedules: result.rows });
} catch (error: unknown) {
console.error('Error listing schedules:', error);
res.status(500).json({ error: 'Failed to list schedules' });
}
});
/**
* DELETE /api/tasks/schedules
* Bulk delete schedules
*
* Immutable schedules are automatically skipped (not deleted).
*
* Body:
* - ids: number[] (required) - array of schedule IDs to delete
* - all: boolean (optional) - if true, delete all non-immutable schedules (ids ignored)
*/
router.delete('/schedules', async (req: Request, res: Response) => {
try {
const { ids, all } = req.body;
let result;
let skippedImmutable: { id: number; name: string }[] = [];
if (all === true) {
// First, find immutable schedules that will be skipped
const immutableResult = await pool.query(`
SELECT id, name FROM task_schedules WHERE is_immutable = true
`);
skippedImmutable = immutableResult.rows;
// Delete all non-immutable schedules
result = await pool.query(`
DELETE FROM task_schedules
WHERE COALESCE(is_immutable, false) = false
RETURNING id, name
`);
} else if (Array.isArray(ids) && ids.length > 0) {
// First, find which of the requested IDs are immutable
const immutableResult = await pool.query(`
SELECT id, name FROM task_schedules
WHERE id = ANY($1) AND is_immutable = true
`, [ids]);
skippedImmutable = immutableResult.rows;
// Delete only non-immutable schedules from the requested IDs
result = await pool.query(`
DELETE FROM task_schedules
WHERE id = ANY($1) AND COALESCE(is_immutable, false) = false
RETURNING id, name
`, [ids]);
} else {
return res.status(400).json({
error: 'Either provide ids array or set all=true',
});
}
res.json({
success: true,
deleted_count: result.rowCount,
deleted: result.rows,
skipped_immutable_count: skippedImmutable.length,
skipped_immutable: skippedImmutable,
message: skippedImmutable.length > 0
? `Deleted ${result.rowCount} schedule(s), skipped ${skippedImmutable.length} immutable schedule(s)`
: `Deleted ${result.rowCount} schedule(s)`,
});
} catch (error: unknown) {
console.error('Error bulk deleting schedules:', error);
res.status(500).json({ error: 'Failed to delete schedules' });
}
});
/**
* POST /api/tasks/schedules
* Create a new schedule
*
* Body:
* - name: string (required, unique)
* - role: TaskRole (required)
* - description: string (optional)
* - enabled: boolean (default true)
* - interval_hours: number (required)
* - priority: number (default 0)
* - state_code: string (optional)
* - platform: string (optional)
*/
router.post('/schedules', async (req: Request, res: Response) => {
try {
const {
name,
role,
description,
enabled = true,
interval_hours,
priority = 0,
state_code,
platform,
} = req.body;
if (!name || !role || !interval_hours) {
return res.status(400).json({
error: 'name, role, and interval_hours are required',
});
}
// Calculate next_run_at based on interval
const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000);
const result = await pool.query(`
INSERT INTO task_schedules
(name, role, description, enabled, interval_hours, priority, state_code, platform, next_run_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
`, [name, role, description, enabled, interval_hours, priority, state_code, platform, nextRunAt]);
res.status(201).json(result.rows[0]);
} catch (error: any) {
if (error.code === '23505') {
// Unique constraint violation
return res.status(409).json({ error: 'A schedule with this name already exists' });
}
console.error('Error creating schedule:', error);
res.status(500).json({ error: 'Failed to create schedule' });
}
});
/**
* GET /api/tasks/schedules/:id
* Get a specific schedule by ID
*/
router.get('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const result = await pool.query(`
SELECT id, name, role, description, enabled, interval_hours,
priority, state_code, platform, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
FROM task_schedules
WHERE id = $1
`, [scheduleId]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
res.json(result.rows[0]);
} catch (error: unknown) {
console.error('Error getting schedule:', error);
res.status(500).json({ error: 'Failed to get schedule' });
}
});
/**
* PUT /api/tasks/schedules/:id
* Update an existing schedule
*
* For IMMUTABLE schedules, only these fields can be updated:
* - enabled (turn on/off)
* - interval_hours (change frequency)
* - priority (change priority)
*
* For regular schedules, all fields can be updated.
*/
router.put('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const {
name,
role,
description,
enabled,
interval_hours,
priority,
state_code,
platform,
} = req.body;
// First check if schedule exists and if it's immutable
const checkResult = await pool.query(`
SELECT id, name, COALESCE(is_immutable, false) as is_immutable
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (checkResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = checkResult.rows[0];
const isImmutable = schedule.is_immutable;
// For immutable schedules, reject attempts to change protected fields
if (isImmutable) {
const protectedFields: string[] = [];
if (name !== undefined) protectedFields.push('name');
if (role !== undefined) protectedFields.push('role');
if (description !== undefined) protectedFields.push('description');
if (state_code !== undefined) protectedFields.push('state_code');
if (platform !== undefined) protectedFields.push('platform');
if (protectedFields.length > 0) {
return res.status(403).json({
error: 'Cannot modify protected fields on immutable schedule',
message: `Schedule "${schedule.name}" is immutable. Only enabled, interval_hours, and priority can be changed.`,
protected_fields: protectedFields,
allowed_fields: ['enabled', 'interval_hours', 'priority'],
});
}
}
// Build dynamic update query
const updates: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// These fields can only be updated on non-immutable schedules
if (!isImmutable) {
if (name !== undefined) {
updates.push(`name = $${paramIndex++}`);
values.push(name);
}
if (role !== undefined) {
updates.push(`role = $${paramIndex++}`);
values.push(role);
}
if (description !== undefined) {
updates.push(`description = $${paramIndex++}`);
values.push(description);
}
if (state_code !== undefined) {
updates.push(`state_code = $${paramIndex++}`);
values.push(state_code || null);
}
if (platform !== undefined) {
updates.push(`platform = $${paramIndex++}`);
values.push(platform || null);
}
}
// These fields can be updated on ALL schedules (including immutable)
if (enabled !== undefined) {
updates.push(`enabled = $${paramIndex++}`);
values.push(enabled);
}
if (interval_hours !== undefined) {
updates.push(`interval_hours = $${paramIndex++}`);
values.push(interval_hours);
// Recalculate next_run_at if interval changed
const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000);
updates.push(`next_run_at = $${paramIndex++}`);
values.push(nextRunAt);
}
if (priority !== undefined) {
updates.push(`priority = $${paramIndex++}`);
values.push(priority);
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No fields to update' });
}
updates.push('updated_at = NOW()');
values.push(scheduleId);
const result = await pool.query(`
UPDATE task_schedules
SET ${updates.join(', ')}
WHERE id = $${paramIndex}
RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method,
COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
`, values);
res.json(result.rows[0]);
} catch (error: any) {
if (error.code === '23505') {
return res.status(409).json({ error: 'A schedule with this name already exists' });
}
console.error('Error updating schedule:', error);
res.status(500).json({ error: 'Failed to update schedule' });
}
});
/**
* DELETE /api/tasks/schedules/:id
* Delete a schedule
*
* Immutable schedules cannot be deleted - they can only be disabled.
*/
router.delete('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// First check if schedule exists and is immutable
const checkResult = await pool.query(`
SELECT id, name, COALESCE(is_immutable, false) as is_immutable
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (checkResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = checkResult.rows[0];
// Prevent deletion of immutable schedules
if (schedule.is_immutable) {
return res.status(403).json({
error: 'Cannot delete immutable schedule',
message: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`,
schedule_id: scheduleId,
is_immutable: true,
});
}
// Delete the schedule
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [scheduleId]);
res.json({
success: true,
message: `Schedule "${schedule.name}" deleted`,
});
} catch (error: unknown) {
console.error('Error deleting schedule:', error);
res.status(500).json({ error: 'Failed to delete schedule' });
}
});
/**
* POST /api/tasks/schedules/:id/run-now
* Manually trigger a scheduled task to run immediately
*/
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// Get the schedule
const scheduleResult = await pool.query(`
SELECT id, name, role, state_code, platform, priority
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (scheduleResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = scheduleResult.rows[0];
// Create a task based on the schedule
const task = await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10, // Boost priority for manual runs
});
// Update last_run_at on the schedule
await pool.query(`
UPDATE task_schedules
SET last_run_at = NOW(),
next_run_at = NOW() + (interval_hours || ' hours')::interval,
updated_at = NOW()
WHERE id = $1
`, [scheduleId]);
res.json({
success: true,
message: `Schedule "${schedule.name}" triggered`,
task,
});
} catch (error: unknown) {
console.error('Error running schedule:', error);
res.status(500).json({ error: 'Failed to run schedule' });
}
});
/**
* POST /api/tasks/schedules/:id/toggle
* Toggle a schedule's enabled status
*/
router.post('/schedules/:id/toggle', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const result = await pool.query(`
UPDATE task_schedules
SET enabled = NOT enabled,
updated_at = NOW()
WHERE id = $1
RETURNING id, name, enabled
`, [scheduleId]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
res.json({
success: true,
schedule: result.rows[0],
message: result.rows[0].enabled
? `Schedule "${result.rows[0].name}" enabled`
: `Schedule "${result.rows[0].name}" disabled`,
});
} catch (error: unknown) {
console.error('Error toggling schedule:', error);
res.status(500).json({ error: 'Failed to toggle schedule' });
}
});
// ============================================================
// TASK-SPECIFIC ROUTES (with :id parameter)
// ============================================================
/**
* GET /api/tasks/:id
* Get a specific task by ID
@@ -598,6 +1074,123 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
}
});
// ============================================================
// STAGGERED BATCH TASK CREATION
// ============================================================
/**
* POST /api/tasks/batch/staggered
* Create multiple tasks with staggered start times
*
* This endpoint prevents resource contention when creating many tasks by
* staggering their scheduled_for timestamps. Each task becomes eligible
* for claiming only after its scheduled time.
*
* WORKFLOW:
* 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task released back to pending for another worker
*
* Body:
* - dispensary_ids: number[] (required) - Array of dispensary IDs
* - role: TaskRole (required) - 'product_refresh' | 'product_discovery'
* - stagger_seconds: number (default: 15) - Seconds between each task start
* - platform: string (default: 'dutchie')
* - method: 'curl' | 'http' | null (default: null)
*/
router.post('/batch/staggered', async (req: Request, res: Response) => {
try {
const {
dispensary_ids,
role,
stagger_seconds = 15,
platform = 'dutchie',
method = null,
} = req.body;
if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
return res.status(400).json({ error: 'dispensary_ids array is required' });
}
if (!role) {
return res.status(400).json({ error: 'role is required' });
}
const result = await taskService.createStaggeredTasks(
dispensary_ids,
role as TaskRole,
stagger_seconds,
platform,
method
);
const totalDuration = (dispensary_ids.length - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
created: result.created,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating staggered tasks:', error);
res.status(500).json({ error: 'Failed to create staggered tasks' });
}
});
/**
* POST /api/tasks/batch/az-stores
* Convenience endpoint to create staggered tasks for Arizona stores
*
* Body:
* - total_tasks: number (default: 24) - Total tasks to create
* - stagger_seconds: number (default: 15) - Seconds between each task
* - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
*/
router.post('/batch/az-stores', async (req: Request, res: Response) => {
try {
const {
total_tasks = 24,
stagger_seconds = 15,
split_roles = true,
} = req.body;
const result = await taskService.createAZStoreTasks(
total_tasks,
stagger_seconds,
split_roles
);
const totalDuration = (result.total - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
total: result.total,
product_refresh: result.product_refresh,
product_discovery: result.product_discovery,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
});
} catch (error: unknown) {
console.error('Error creating AZ store tasks:', error);
res.status(500).json({ error: 'Failed to create AZ store tasks' });
}
});
// ============================================================
// TASK POOL MANAGEMENT
// ============================================================
/**
* GET /api/tasks/pool/status
* Check if task pool is paused

View File

@@ -1,224 +0,0 @@
/**
* Trusted Origins Admin Routes
*
* Manage IPs and domains that bypass API key authentication.
* Available at /api/admin/trusted-origins
*/
import { Router, Response } from 'express';
import { pool } from '../db/pool';
import { AuthRequest, authMiddleware, requireRole, clearTrustedOriginsCache } from '../auth/middleware';
const router = Router();
// All routes require admin auth
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
/**
* GET /api/admin/trusted-origins
* List all trusted origins
*/
router.get('/', async (req: AuthRequest, res: Response) => {
try {
const result = await pool.query(`
SELECT
id,
origin_type,
origin_value,
description,
active,
created_at,
updated_at
FROM trusted_origins
ORDER BY origin_type, origin_value
`);
res.json({
success: true,
origins: result.rows,
counts: {
total: result.rows.length,
active: result.rows.filter(r => r.active).length,
ips: result.rows.filter(r => r.origin_type === 'ip').length,
domains: result.rows.filter(r => r.origin_type === 'domain').length,
patterns: result.rows.filter(r => r.origin_type === 'pattern').length,
},
});
} catch (error: any) {
console.error('[TrustedOrigins] List error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/admin/trusted-origins
* Add a new trusted origin
*/
router.post('/', async (req: AuthRequest, res: Response) => {
try {
const { origin_type, origin_value, description } = req.body;
if (!origin_type || !origin_value) {
return res.status(400).json({
success: false,
error: 'origin_type and origin_value are required',
});
}
if (!['ip', 'domain', 'pattern'].includes(origin_type)) {
return res.status(400).json({
success: false,
error: 'origin_type must be: ip, domain, or pattern',
});
}
// Validate pattern if regex
if (origin_type === 'pattern') {
try {
new RegExp(origin_value);
} catch {
return res.status(400).json({
success: false,
error: 'Invalid regex pattern',
});
}
}
const result = await pool.query(`
INSERT INTO trusted_origins (origin_type, origin_value, description, created_by)
VALUES ($1, $2, $3, $4)
RETURNING id, origin_type, origin_value, description, active, created_at
`, [origin_type, origin_value, description || null, req.user?.id || null]);
// Invalidate cache
clearTrustedOriginsCache();
res.json({
success: true,
origin: result.rows[0],
});
} catch (error: any) {
if (error.code === '23505') {
return res.status(409).json({
success: false,
error: 'This origin already exists',
});
}
console.error('[TrustedOrigins] Add error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* PUT /api/admin/trusted-origins/:id
* Update a trusted origin
*/
router.put('/:id', async (req: AuthRequest, res: Response) => {
try {
const id = parseInt(req.params.id);
const { origin_type, origin_value, description, active } = req.body;
// Validate pattern if regex
if (origin_type === 'pattern' && origin_value) {
try {
new RegExp(origin_value);
} catch {
return res.status(400).json({
success: false,
error: 'Invalid regex pattern',
});
}
}
const result = await pool.query(`
UPDATE trusted_origins
SET
origin_type = COALESCE($1, origin_type),
origin_value = COALESCE($2, origin_value),
description = COALESCE($3, description),
active = COALESCE($4, active),
updated_at = NOW()
WHERE id = $5
RETURNING id, origin_type, origin_value, description, active, updated_at
`, [origin_type, origin_value, description, active, id]);
if (result.rows.length === 0) {
return res.status(404).json({ success: false, error: 'Origin not found' });
}
// Invalidate cache
clearTrustedOriginsCache();
res.json({
success: true,
origin: result.rows[0],
});
} catch (error: any) {
console.error('[TrustedOrigins] Update error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* DELETE /api/admin/trusted-origins/:id
* Delete a trusted origin
*/
router.delete('/:id', async (req: AuthRequest, res: Response) => {
try {
const id = parseInt(req.params.id);
const result = await pool.query(`
DELETE FROM trusted_origins WHERE id = $1 RETURNING id, origin_value
`, [id]);
if (result.rows.length === 0) {
return res.status(404).json({ success: false, error: 'Origin not found' });
}
// Invalidate cache
clearTrustedOriginsCache();
res.json({
success: true,
deleted: result.rows[0],
});
} catch (error: any) {
console.error('[TrustedOrigins] Delete error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/admin/trusted-origins/:id/toggle
* Toggle active status
*/
router.post('/:id/toggle', async (req: AuthRequest, res: Response) => {
try {
const id = parseInt(req.params.id);
const result = await pool.query(`
UPDATE trusted_origins
SET active = NOT active, updated_at = NOW()
WHERE id = $1
RETURNING id, origin_type, origin_value, active
`, [id]);
if (result.rows.length === 0) {
return res.status(404).json({ success: false, error: 'Origin not found' });
}
// Invalidate cache
clearTrustedOriginsCache();
res.json({
success: true,
origin: result.rows[0],
});
} catch (error: any) {
console.error('[TrustedOrigins] Toggle error:', error.message);
res.status(500).json({ success: false, error: error.message });
}
});
export default router;

View File

@@ -23,6 +23,8 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import os from 'os';
import { runPuppeteerPreflightWithRetry } from '../services/puppeteer-preflight';
import { CrawlRotator } from '../services/crawl-rotator';
const router = Router();
@@ -250,12 +252,9 @@ router.post('/deregister', async (req: Request, res: Response) => {
// Release the name back to the pool
await pool.query('SELECT release_worker_name($1)', [worker_id]);
// Mark as terminated
// Delete the worker entry (clean shutdown)
const { rows } = await pool.query(`
UPDATE worker_registry
SET status = 'terminated',
current_task_id = NULL,
updated_at = NOW()
DELETE FROM worker_registry
WHERE worker_id = $1
RETURNING id, friendly_name
`, [worker_id]);
@@ -864,4 +863,58 @@ router.get('/pods', async (_req: Request, res: Response) => {
}
});
// ============================================================
// PREFLIGHT SMOKE TEST
// ============================================================
/**
* POST /api/worker-registry/preflight-test
* Run an HTTP (Puppeteer) preflight test and return results
*
* This is a smoke test endpoint to verify the preflight system works.
* Returns IP, fingerprint data, bot detection results, and products fetched.
*/
router.post('/preflight-test', async (_req: Request, res: Response) => {
try {
console.log('[PreflightTest] Starting HTTP preflight smoke test...');
// Create a temporary CrawlRotator for the test
const crawlRotator = new CrawlRotator();
// Run the Puppeteer preflight (with 1 retry)
const startTime = Date.now();
const result = await runPuppeteerPreflightWithRetry(crawlRotator, 1);
const duration = Date.now() - startTime;
console.log(`[PreflightTest] Completed in ${duration}ms - passed: ${result.passed}`);
res.json({
success: true,
test: 'http_preflight',
duration_ms: duration,
result: {
passed: result.passed,
proxy_ip: result.proxyIp,
fingerprint: result.fingerprint,
bot_detection: result.botDetection,
products_returned: result.productsReturned,
browser_user_agent: result.browserUserAgent,
ip_verified: result.ipVerified,
proxy_available: result.proxyAvailable,
proxy_connected: result.proxyConnected,
antidetect_ready: result.antidetectReady,
response_time_ms: result.responseTimeMs,
error: result.error
}
});
} catch (error: any) {
console.error('[PreflightTest] Error:', error.message);
res.status(500).json({
success: false,
test: 'http_preflight',
error: error.message
});
}
});
export default router;

View File

@@ -4,10 +4,25 @@
* Provider-agnostic worker management and job monitoring.
* Replaces legacy /api/dutchie-az/admin/schedules and /api/dutchie-az/monitor/* routes.
*
* DEPRECATION NOTE (2025-12-12):
* This file still queries job_schedules for backwards compatibility with
* the /api/workers endpoints that display worker status. However, the
* job_schedules table is DEPRECATED - all entries have been disabled.
*
* Schedule management has been consolidated into task_schedules:
* - Use /api/tasks/schedules for schedule CRUD operations
* - Use TasksDashboard.tsx (/admin/tasks) for schedule management UI
* - task_schedules uses interval_hours (simpler than base_interval_minutes + jitter)
*
* The /api/workers endpoints remain useful for:
* - Monitoring active workers and job status
* - K8s scaling controls
* - Job history and logs
*
* Endpoints:
* GET /api/workers - List all workers/schedules
* GET /api/workers/active - List currently active workers
* GET /api/workers/schedule - Get all job schedules
* GET /api/workers/schedule - Get all job schedules (DEPRECATED - use /api/tasks/schedules)
* GET /api/workers/:workerName - Get specific worker details
* GET /api/workers/:workerName/scope - Get worker's scope (states, etc.)
* GET /api/workers/:workerName/stats - Get worker statistics

View File

@@ -0,0 +1,284 @@
/**
* Bulk Proxy Import Script
*
* Imports proxies from various formats into the proxies table.
* Supports:
* - Standard format: http://user:pass@host:port
* - Colon format: http://host:port:user:pass
* - Simple format: host:port:user:pass (defaults to http)
*
* Usage:
* npx tsx src/scripts/import-proxies.ts < proxies.txt
* echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts
* npx tsx src/scripts/import-proxies.ts --file proxies.txt
* npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"
*
* Options:
* --file <path> Read proxies from file (one per line)
* --url <url> Import a single proxy URL
* --max-connections Set max_connections for all imported proxies (default: 1)
* --dry-run Parse and show what would be imported without inserting
*/
import { getPool } from '../db/pool';
import * as fs from 'fs';
import * as readline from 'readline';
interface ParsedProxy {
protocol: string;
host: string;
port: number;
username?: string;
password?: string;
rawUrl: string;
}
/**
* Parse a proxy URL in various formats
*/
function parseProxyUrl(input: string): ParsedProxy | null {
const trimmed = input.trim();
if (!trimmed || trimmed.startsWith('#')) return null;
// Format 1: Standard URL format - http://user:pass@host:port
const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/);
if (standardMatch) {
return {
protocol: standardMatch[1],
username: standardMatch[2],
password: standardMatch[3],
host: standardMatch[4],
port: parseInt(standardMatch[5], 10),
rawUrl: trimmed,
};
}
// Format 2: Standard URL without auth - http://host:port
const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/);
if (noAuthMatch) {
return {
protocol: noAuthMatch[1],
host: noAuthMatch[2],
port: parseInt(noAuthMatch[3], 10),
rawUrl: trimmed,
};
}
// Format 3: Colon format with protocol - http://host:port:user:pass
const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonWithProtocolMatch) {
return {
protocol: colonWithProtocolMatch[1],
host: colonWithProtocolMatch[2],
port: parseInt(colonWithProtocolMatch[3], 10),
username: colonWithProtocolMatch[4],
password: colonWithProtocolMatch[5],
rawUrl: trimmed, // Keep raw URL for non-standard format
};
}
// Format 4: Colon format without protocol - host:port:user:pass
const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/);
if (colonMatch) {
return {
protocol: 'http',
host: colonMatch[1],
port: parseInt(colonMatch[2], 10),
username: colonMatch[3],
password: colonMatch[4],
rawUrl: `http://${trimmed}`, // Construct raw URL
};
}
// Format 5: Simple host:port
const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/);
if (simpleMatch) {
return {
protocol: 'http',
host: simpleMatch[1],
port: parseInt(simpleMatch[2], 10),
rawUrl: `http://${trimmed}`,
};
}
console.error(`[ImportProxies] Could not parse: ${trimmed}`);
return null;
}
/**
* Check if proxy URL is in non-standard format (needs proxy_url column)
*/
function isNonStandardFormat(rawUrl: string): boolean {
// Colon format: protocol://host:port:user:pass
return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl);
}
async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) {
if (dryRun) {
console.log('\n[ImportProxies] DRY RUN - Would import:');
for (const p of proxies) {
const needsRawUrl = isNonStandardFormat(p.rawUrl);
console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`);
}
console.log(`\nTotal: ${proxies.length} proxies`);
return;
}
const pool = getPool();
let inserted = 0;
let skipped = 0;
for (const proxy of proxies) {
try {
// Determine if we need to store the raw URL (non-standard format)
const needsRawUrl = isNonStandardFormat(proxy.rawUrl);
// Use different conflict resolution based on format
// Non-standard format: unique by proxy_url (session-based residential proxies)
// Standard format: unique by host/port/protocol
const query = needsRawUrl
? `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (proxy_url) WHERE proxy_url IS NOT NULL
DO UPDATE SET
max_connections = EXCLUDED.max_connections,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`
: `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (host, port, protocol)
DO UPDATE SET
username = EXCLUDED.username,
password = EXCLUDED.password,
max_connections = EXCLUDED.max_connections,
proxy_url = EXCLUDED.proxy_url,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`;
const result = await pool.query(query, [
proxy.host,
proxy.port,
proxy.protocol,
proxy.username || null,
proxy.password || null,
maxConnections,
needsRawUrl ? proxy.rawUrl : null,
]);
const isInsert = result.rows[0]?.is_insert;
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
if (isInsert) {
inserted++;
console.log(`[ImportProxies] Inserted: ${displayName}`);
} else {
console.log(`[ImportProxies] Updated: ${displayName}`);
inserted++; // Count updates too
}
} catch (err: any) {
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
console.error(`[ImportProxies] Error inserting ${displayName}: ${err.message}`);
skipped++;
}
}
console.log(`\n[ImportProxies] Complete: ${inserted} imported, ${skipped} skipped`);
// Notify any listening workers
try {
await pool.query(`NOTIFY proxy_added, 'bulk import'`);
console.log('[ImportProxies] Sent proxy_added notification to workers');
} catch {
// Ignore notification errors
}
}
async function readFromStdin(): Promise<string[]> {
return new Promise((resolve) => {
const lines: string[] = [];
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on('line', (line) => {
lines.push(line);
});
rl.on('close', () => {
resolve(lines);
});
});
}
async function main() {
const args = process.argv.slice(2);
let lines: string[] = [];
let maxConnections = 1;
let dryRun = false;
// Parse arguments
for (let i = 0; i < args.length; i++) {
if (args[i] === '--file' && args[i + 1]) {
const content = fs.readFileSync(args[i + 1], 'utf-8');
lines.push(...content.split('\n'));
i++;
} else if (args[i] === '--url' && args[i + 1]) {
lines.push(args[i + 1]);
i++;
} else if (args[i] === '--max-connections' && args[i + 1]) {
maxConnections = parseInt(args[i + 1], 10);
i++;
} else if (args[i] === '--dry-run') {
dryRun = true;
} else if (!args[i].startsWith('--')) {
// Treat as URL directly
lines.push(args[i]);
}
}
// If no lines yet, read from stdin
if (lines.length === 0) {
console.log('[ImportProxies] Reading from stdin...');
lines = await readFromStdin();
}
// Parse all lines
const proxies: ParsedProxy[] = [];
for (const line of lines) {
const parsed = parseProxyUrl(line);
if (parsed) {
proxies.push(parsed);
}
}
if (proxies.length === 0) {
console.error('[ImportProxies] No valid proxies found');
console.error('\nUsage:');
console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"');
console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt');
console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts');
console.error('\nSupported formats:');
console.error(' http://user:pass@host:port (standard)');
console.error(' http://host:port:user:pass (colon format)');
console.error(' host:port:user:pass (simple)');
process.exit(1);
}
console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`);
await importProxies(proxies, maxConnections, dryRun);
}
main().catch((err) => {
console.error('[ImportProxies] Fatal error:', err);
process.exit(1);
});

View File

@@ -77,6 +77,11 @@ export interface Proxy {
country?: string;
countryCode?: string;
timezone?: string;
/**
* Raw proxy URL override. If set, used directly instead of constructing from parts.
* Supports non-standard formats like: http://host:port:user:pass
*/
proxyUrl?: string;
}
export interface ProxyStats {
@@ -129,6 +134,10 @@ export class ProxyRotator {
private proxies: Proxy[] = [];
private currentIndex: number = 0;
private lastRotation: Date = new Date();
private lastReloadAt: Date = new Date();
// Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
private reloadIntervalMs: number = 60000;
constructor(pool?: Pool) {
this.pool = pool || null;
@@ -138,6 +147,13 @@ export class ProxyRotator {
this.pool = pool;
}
/**
* Set the reload interval for periodic proxy checks
*/
setReloadInterval(ms: number): void {
this.reloadIntervalMs = ms;
}
/**
* Load proxies from database
*/
@@ -167,22 +183,76 @@ export class ProxyRotator {
state,
country,
country_code as "countryCode",
timezone
timezone,
proxy_url as "proxyUrl"
FROM proxies
WHERE active = true
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
`);
this.proxies = result.rows;
this.lastReloadAt = new Date();
const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`);
} catch (error) {
console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
this.proxies = [];
}
}
/**
* Check if proxy list is stale and needs reload
*/
isStale(): boolean {
const elapsed = Date.now() - this.lastReloadAt.getTime();
return elapsed > this.reloadIntervalMs;
}
/**
* Reload proxies if the cache is stale.
* This ensures workers pick up new proxies or see disabled proxies removed.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
if (!this.isStale()) {
return false;
}
const oldCount = this.proxies.length;
const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const oldIds = new Set(this.proxies.map(p => p.id));
await this.loadProxies();
const newCount = this.proxies.length;
const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const newIds = new Set(this.proxies.map(p => p.id));
// Log changes
const added = this.proxies.filter(p => !oldIds.has(p.id));
const removed = [...oldIds].filter(id => !newIds.has(id));
if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) {
console.log(`[ProxyRotator] Reloaded proxies: ${oldCount}${newCount} proxies, ${oldCapacity}${newCapacity} threads`);
if (added.length > 0) {
console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`);
}
if (removed.length > 0) {
console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`);
}
}
return true;
}
/**
* Get time since last reload in seconds
*/
getSecondsSinceReload(): number {
return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000);
}
/**
* Get next proxy in rotation
*/
@@ -342,8 +412,24 @@ export class ProxyRotator {
/**
* Get proxy URL for HTTP client
* If proxy.proxyUrl is set, uses it directly (supports non-standard formats).
* Otherwise constructs standard format: protocol://user:pass@host:port
*/
getProxyUrl(proxy: Proxy): string {
// If proxyUrl is set, check if it needs conversion from non-standard format
if (proxy.proxyUrl) {
// Check if it's in non-standard format: http://host:port:user:pass
const colonFormatMatch = proxy.proxyUrl.match(/^(https?):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonFormatMatch) {
// Convert to standard format: http://user:pass@host:port
const [, protocol, host, port, username, password] = colonFormatMatch;
return `${protocol}://${encodeURIComponent(username)}:${encodeURIComponent(password)}@${host}:${port}`;
}
// Already in standard format or unknown format - return as-is
return proxy.proxyUrl;
}
// Construct standard format from individual fields
const auth = proxy.username && proxy.password
? `${proxy.username}:${proxy.password}@`
: '';
@@ -584,6 +670,23 @@ export class CrawlRotator {
await this.proxy.loadProxies();
}
/**
* Reload proxy list if stale.
* Workers should call this periodically to pick up proxy changes.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
return this.proxy.reloadIfStale();
}
/**
* Set proxy reload interval in milliseconds.
* Default is 60 seconds.
*/
setProxyReloadInterval(ms: number): void {
this.proxy.setReloadInterval(ms);
}
/**
* Rotate proxy only (get new IP)
*/

View File

@@ -26,6 +26,34 @@ const TEST_PLATFORM_ID = '6405ef617056e8014d79101b';
const FINGERPRINT_DEMO_URL = 'https://demo.fingerprint.com/';
const AMIUNIQUE_URL = 'https://amiunique.org/fingerprint';
// IP geolocation API for timezone lookup (free, no key required)
const IP_API_URL = 'http://ip-api.com/json';
/**
* Look up timezone from IP address using ip-api.com
* Returns IANA timezone (e.g., 'America/New_York') or null on failure
*/
async function getTimezoneFromIp(ip: string): Promise<{ timezone: string; city?: string; region?: string } | null> {
try {
const axios = require('axios');
const response = await axios.get(`${IP_API_URL}/${ip}?fields=status,timezone,city,regionName`, {
timeout: 5000,
});
if (response.data?.status === 'success' && response.data?.timezone) {
return {
timezone: response.data.timezone,
city: response.data.city,
region: response.data.regionName,
};
}
return null;
} catch (err: any) {
console.log(`[PuppeteerPreflight] IP geolocation lookup failed: ${err.message}`);
return null;
}
}
export interface PuppeteerPreflightResult extends PreflightResult {
method: 'http';
/** Number of products returned (proves API access) */
@@ -42,6 +70,13 @@ export interface PuppeteerPreflightResult extends PreflightResult {
expectedProxyIp?: string;
/** Whether IP verification passed (detected IP matches proxy) */
ipVerified?: boolean;
/** Detected timezone from IP geolocation */
detectedTimezone?: string;
/** Detected location from IP geolocation */
detectedLocation?: {
city?: string;
region?: string;
};
}
/**
@@ -136,226 +171,82 @@ export async function runPuppeteerPreflight(
};
// =========================================================================
// STEP 1: Visit fingerprint.com demo to verify anti-detect and get IP
// STEP 1a: Get IP address directly via simple API (more reliable than scraping)
// =========================================================================
console.log(`[PuppeteerPreflight] Testing anti-detect at ${FINGERPRINT_DEMO_URL}...`);
console.log(`[PuppeteerPreflight] Getting proxy IP address...`);
try {
await page.goto(FINGERPRINT_DEMO_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true; // If we got here, proxy is working
// Wait for fingerprint results to load
await page.waitForSelector('[data-test="visitor-id"]', { timeout: 10000 }).catch(() => {});
// Extract fingerprint data from the page
const fingerprintData = await page.evaluate(() => {
// Try to find the IP address displayed on the page
const ipElement = document.querySelector('[data-test="ip-address"]');
const ip = ipElement?.textContent?.trim() || null;
// Try to find bot detection info
const botElement = document.querySelector('[data-test="bot-detected"]');
const botDetected = botElement?.textContent?.toLowerCase().includes('true') || false;
// Try to find visitor ID (proves fingerprinting worked)
const visitorIdElement = document.querySelector('[data-test="visitor-id"]');
const visitorId = visitorIdElement?.textContent?.trim() || null;
// Alternative: look for common UI patterns if data-test attrs not present
let detectedIp = ip;
if (!detectedIp) {
// Look for IP in any element containing IP-like pattern
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
detectedIp = ipMatch ? ipMatch[1] : null;
const ipApiResponse = await page.evaluate(async () => {
try {
const response = await fetch('https://api.ipify.org?format=json');
const data = await response.json();
return { ip: data.ip, error: null };
} catch (err: any) {
return { ip: null, error: err.message };
}
return {
ip: detectedIp,
botDetected,
visitorId,
pageLoaded: !!document.body,
};
});
if (fingerprintData.ip) {
result.proxyIp = fingerprintData.ip;
console.log(`[PuppeteerPreflight] Detected IP: ${fingerprintData.ip}`);
if (ipApiResponse.ip) {
result.proxyIp = ipApiResponse.ip;
result.proxyConnected = true;
console.log(`[PuppeteerPreflight] Detected proxy IP: ${ipApiResponse.ip}`);
// Verify IP matches expected proxy
if (expectedProxyHost) {
// Check if detected IP contains the proxy host (or is close match)
if (fingerprintData.ip === expectedProxyHost ||
expectedProxyHost.includes(fingerprintData.ip) ||
fingerprintData.ip.includes(expectedProxyHost.split('.').slice(0, 3).join('.'))) {
result.ipVerified = true;
console.log(`[PuppeteerPreflight] IP VERIFIED - matches proxy`);
} else {
console.log(`[PuppeteerPreflight] IP mismatch: expected ${expectedProxyHost}, got ${fingerprintData.ip}`);
// Don't fail - residential proxies often show different egress IPs
// Look up timezone from IP
const geoData = await getTimezoneFromIp(ipApiResponse.ip);
if (geoData) {
result.detectedTimezone = geoData.timezone;
result.detectedLocation = { city: geoData.city, region: geoData.region };
console.log(`[PuppeteerPreflight] IP Geolocation: ${geoData.city}, ${geoData.region} (${geoData.timezone})`);
// Set browser timezone to match proxy location via CDP
try {
const client = await page.target().createCDPSession();
await client.send('Emulation.setTimezoneOverride', { timezoneId: geoData.timezone });
console.log(`[PuppeteerPreflight] Browser timezone set to: ${geoData.timezone}`);
} catch (tzErr: any) {
console.log(`[PuppeteerPreflight] Failed to set browser timezone: ${tzErr.message}`);
}
} else {
console.log(`[PuppeteerPreflight] WARNING: Could not determine timezone from IP - timezone mismatch possible`);
}
}
if (fingerprintData.visitorId) {
console.log(`[PuppeteerPreflight] Fingerprint visitor ID: ${fingerprintData.visitorId}`);
}
result.botDetection = {
detected: fingerprintData.botDetected,
};
if (fingerprintData.botDetected) {
console.log(`[PuppeteerPreflight] WARNING: Bot detection triggered!`);
} else {
console.log(`[PuppeteerPreflight] Anti-detect check: NOT detected as bot`);
result.antidetectReady = true;
}
} catch (fpErr: any) {
// Could mean proxy connection failed
console.log(`[PuppeteerPreflight] Fingerprint.com check failed: ${fpErr.message}`);
if (fpErr.message.includes('net::ERR_PROXY') || fpErr.message.includes('ECONNREFUSED')) {
result.error = `Proxy connection failed: ${fpErr.message}`;
return result;
}
// Try fallback: amiunique.org
console.log(`[PuppeteerPreflight] Trying fallback: ${AMIUNIQUE_URL}...`);
try {
await page.goto(AMIUNIQUE_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true;
// Extract IP from amiunique.org page
const amiData = await page.evaluate(() => {
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
return {
ip: ipMatch ? ipMatch[1] : null,
pageLoaded: !!document.body,
};
});
if (amiData.ip) {
result.proxyIp = amiData.ip;
console.log(`[PuppeteerPreflight] Detected IP via amiunique.org: ${amiData.ip}`);
}
result.antidetectReady = true;
console.log(`[PuppeteerPreflight] amiunique.org fallback succeeded`);
} catch (amiErr: any) {
console.log(`[PuppeteerPreflight] amiunique.org fallback also failed: ${amiErr.message}`);
// Continue with Dutchie test anyway
result.proxyConnected = true;
result.antidetectReady = true;
console.log(`[PuppeteerPreflight] IP lookup failed: ${ipApiResponse.error || 'unknown error'}`);
}
} catch (ipErr: any) {
console.log(`[PuppeteerPreflight] IP API error: ${ipErr.message}`);
}
// =========================================================================
// STEP 2: Test Dutchie API access (the real test)
// STEP 2: Preflight complete - proxy verified via ipify.org
// We skip heavy fingerprint.com/amiunique.org tests - just verify proxy works
// The actual Dutchie test happens at task time.
// =========================================================================
const embedUrl = `https://dutchie.com/embedded-menu/${TEST_CNAME}?menuType=rec`;
console.log(`[PuppeteerPreflight] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 30000,
});
// Make GraphQL request from browser context
const graphqlResult = await page.evaluate(
async (platformId: string, hash: string) => {
try {
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active' per CLAUDE.md
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: 0,
perPage: 10, // Just need a few to prove it works
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: hash,
},
};
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const sessionId = 'preflight-' + Date.now();
const response = await fetch(url, {
method: 'GET',
headers: {
Accept: 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
if (!response.ok) {
return { error: `HTTP ${response.status}`, products: 0 };
}
const json = await response.json();
if (json.errors) {
return { error: JSON.stringify(json.errors).slice(0, 200), products: 0 };
}
const products = json?.data?.filteredProducts?.products || [];
return { error: null, products: products.length };
} catch (err: any) {
return { error: err.message || 'Unknown error', products: 0 };
}
},
TEST_PLATFORM_ID,
FILTERED_PRODUCTS_HASH
);
// If we got an IP from ipify.org, proxy is working
if (result.proxyIp) {
result.proxyConnected = true;
result.antidetectReady = true; // Assume stealth plugin is working
}
result.responseTimeMs = Date.now() - startTime;
if (graphqlResult.error) {
result.error = `GraphQL error: ${graphqlResult.error}`;
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
} else if (graphqlResult.products === 0) {
result.error = 'GraphQL returned 0 products';
console.log(`[PuppeteerPreflight] FAILED - No products returned`);
} else {
// If we got here with proxyConnected=true and antidetectReady=true, we're good
if (result.proxyConnected && result.antidetectReady) {
result.passed = true;
result.productsReturned = graphqlResult.products;
console.log(
`[PuppeteerPreflight] PASSED - Got ${graphqlResult.products} products in ${result.responseTimeMs}ms`
`[PuppeteerPreflight] PASSED - Proxy connected, anti-detect ready (${result.responseTimeMs}ms)`
);
if (result.proxyIp) {
console.log(`[PuppeteerPreflight] Browser IP via proxy: ${result.proxyIp}`);
}
} else if (result.proxyConnected) {
// Proxy works but anti-detect check failed - still pass (anti-detect is best-effort)
result.passed = true;
result.antidetectReady = true; // Assume ready since proxy works
console.log(
`[PuppeteerPreflight] PASSED - Proxy connected (anti-detect check skipped, ${result.responseTimeMs}ms)`
);
} else {
result.error = result.error || 'Proxy connection failed';
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
}
} catch (err: any) {
result.error = `Browser error: ${err.message || 'Unknown error'}`;

View File

@@ -26,6 +26,12 @@ interface TaskSchedule {
next_run_at: Date | null;
state_code: string | null;
priority: number;
method: 'curl' | 'http' | null;
is_immutable: boolean;
description: string | null;
platform: string | null;
last_task_count: number | null;
last_error: string | null;
}
class TaskScheduler {
@@ -84,24 +90,22 @@ class TaskScheduler {
/**
* Ensure default schedules exist in the database
* Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist
*
* NOTE: Per-state product_discovery schedules are created by migration 089.
* This only creates core immutable schedules that should exist regardless.
*/
private async ensureDefaultSchedules(): Promise<void> {
// Per TASK_WORKFLOW_2024-12-10.md: Default schedules for task generation
// NOTE: payload_fetch replaces direct product_refresh - it chains to product_refresh
// Core schedules - all use HTTP transport for browser-based scraping
const defaults = [
{
name: 'payload_fetch_all',
role: 'payload_fetch' as TaskRole,
interval_hours: 4,
priority: 0,
description: 'Fetch payloads from Dutchie API for all crawl-enabled stores every 4 hours. Chains to product_refresh.',
},
{
name: 'store_discovery_dutchie',
role: 'store_discovery' as TaskRole,
interval_hours: 24,
interval_hours: 168, // Weekly
priority: 5,
description: 'Discover new Dutchie stores daily',
description: 'Discover new Dutchie stores weekly (HTTP transport)',
method: 'http',
is_immutable: true,
platform: 'dutchie',
},
{
name: 'analytics_refresh',
@@ -109,16 +113,21 @@ class TaskScheduler {
interval_hours: 6,
priority: 0,
description: 'Refresh analytics materialized views every 6 hours',
method: 'http',
is_immutable: true,
platform: null,
},
];
for (const sched of defaults) {
try {
await pool.query(`
INSERT INTO task_schedules (name, role, interval_hours, priority, description, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, true, NOW())
ON CONFLICT (name) DO NOTHING
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description]);
INSERT INTO task_schedules (name, role, interval_hours, priority, description, method, is_immutable, platform, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW())
ON CONFLICT (name) DO UPDATE SET
method = EXCLUDED.method,
is_immutable = EXCLUDED.is_immutable
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description, sched.method, sched.is_immutable, sched.platform]);
} catch (err: any) {
// Table may not exist yet - will be created by migration
if (!err.message.includes('does not exist')) {
@@ -192,16 +201,27 @@ class TaskScheduler {
/**
* Execute a schedule and create tasks
* Per TASK_WORKFLOW_2024-12-10.md: Different logic per role
*
* TRANSPORT MODES:
* - All schedules now use HTTP transport (Puppeteer/browser)
* - Per-state product_discovery schedules process one state at a time
* - Workers must pass HTTP preflight to claim HTTP tasks
*/
private async executeSchedule(schedule: TaskSchedule): Promise<number> {
switch (schedule.role) {
case 'product_discovery':
// Per-state product discovery using HTTP transport
return this.generateProductDiscoveryTasks(schedule);
case 'payload_fetch':
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch replaces direct product_refresh
return this.generatePayloadFetchTasks(schedule);
// DEPRECATED: Legacy payload_fetch redirects to product_discovery
console.log(`[TaskScheduler] payload_fetch is deprecated, using product_discovery instead`);
return this.generateProductDiscoveryTasks(schedule);
case 'product_refresh':
// Legacy - kept for manual triggers, but scheduled crawls use payload_fetch
return this.generatePayloadFetchTasks(schedule);
// DEPRECATED: Legacy product_refresh redirects to product_discovery
console.log(`[TaskScheduler] product_refresh is deprecated, using product_discovery instead`);
return this.generateProductDiscoveryTasks(schedule);
case 'store_discovery':
return this.generateStoreDiscoveryTasks(schedule);
@@ -216,50 +236,69 @@ class TaskScheduler {
}
/**
* Generate payload_fetch tasks for stores that need crawling
* Per TASK_WORKFLOW_2024-12-10.md: payload_fetch hits API, saves to disk, chains to product_refresh
* Generate product_discovery tasks for stores in a specific state
* Uses HTTP transport (Puppeteer/browser) for all tasks
*
* Per-state scheduling allows:
* - Different crawl frequencies per state (e.g., AZ=4h, MI=6h)
* - Better rate limit management (one state at a time)
* - Easier debugging and monitoring per state
*/
private async generatePayloadFetchTasks(schedule: TaskSchedule): Promise<number> {
// Per TASK_WORKFLOW_2024-12-10.md: Find stores needing refresh
private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
// state_code is required for per-state schedules
if (!schedule.state_code) {
console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`);
return 0;
}
// Find stores in this state needing refresh
const result = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
-- No pending/running payload_fetch or product_refresh task already
AND s.code = $1
-- No pending/running product_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role IN ('payload_fetch', 'product_refresh')
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
-- Never fetched OR last fetch > interval ago
AND (
d.last_fetch_at IS NULL
OR d.last_fetch_at < NOW() - ($1 || ' hours')::interval
OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
)
${schedule.state_code ? 'AND d.state_id = (SELECT id FROM states WHERE code = $2)' : ''}
`, schedule.state_code ? [schedule.interval_hours, schedule.state_code] : [schedule.interval_hours]);
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code, schedule.interval_hours]);
const dispensaryIds = result.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length === 0) {
console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
return 0;
}
// Per TASK_WORKFLOW_2024-12-10.md: Create payload_fetch tasks (they chain to product_refresh)
const tasks = dispensaryIds.map((id: number) => ({
role: 'payload_fetch' as TaskRole,
dispensary_id: id,
priority: schedule.priority,
}));
console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);
return taskService.createTasks(tasks);
// Create product_discovery tasks with HTTP transport
// Stagger by 15 seconds to prevent overwhelming proxies
const { created } = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
15, // 15 seconds apart
schedule.platform || 'dutchie',
'http' // Force HTTP transport
);
return created;
}
/**
* Generate store_discovery tasks
* Per TASK_WORKFLOW_2024-12-10.md: One task per platform
* Uses HTTP transport (Puppeteer/browser) for browser-based discovery
*/
private async generateStoreDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
// Check if discovery task already pending
@@ -276,8 +315,9 @@ class TaskScheduler {
await taskService.createTask({
role: 'store_discovery',
platform: 'dutchie',
platform: schedule.platform || 'dutchie',
priority: schedule.priority,
method: 'http', // Force HTTP transport for browser-based discovery
});
return 1;
@@ -310,11 +350,39 @@ class TaskScheduler {
/**
* Get all schedules for dashboard display
* Returns schedules with full metadata including immutability flag
*/
async getSchedules(): Promise<TaskSchedule[]> {
try {
const result = await pool.query(`
SELECT * FROM task_schedules ORDER BY name
SELECT
id,
name,
role,
enabled,
interval_hours,
last_run_at,
next_run_at,
state_code,
priority,
method,
COALESCE(is_immutable, false) as is_immutable,
description,
platform,
last_task_count,
last_error,
created_at,
updated_at
FROM task_schedules
ORDER BY
CASE role
WHEN 'store_discovery' THEN 1
WHEN 'product_discovery' THEN 2
WHEN 'analytics_refresh' THEN 3
ELSE 4
END,
state_code NULLS FIRST,
name
`);
return result.rows as TaskSchedule[];
} catch {
@@ -322,8 +390,24 @@ class TaskScheduler {
}
}
/**
* Get a single schedule by ID
*/
async getSchedule(id: number): Promise<TaskSchedule | null> {
try {
const result = await pool.query(`
SELECT * FROM task_schedules WHERE id = $1
`, [id]);
return result.rows[0] as TaskSchedule || null;
} catch {
return null;
}
}
/**
* Update a schedule
* Allows updating: enabled, interval_hours, priority
* Does NOT allow updating: name, role, state_code, is_immutable
*/
async updateSchedule(id: number, updates: Partial<TaskSchedule>): Promise<void> {
const setClauses: string[] = [];
@@ -355,6 +439,33 @@ class TaskScheduler {
`, values);
}
/**
* Delete a schedule (only if not immutable)
* Returns true if deleted, false if immutable
*/
async deleteSchedule(id: number): Promise<{ deleted: boolean; reason?: string }> {
// Check if schedule is immutable
const result = await pool.query(`
SELECT name, is_immutable FROM task_schedules WHERE id = $1
`, [id]);
if (result.rows.length === 0) {
return { deleted: false, reason: 'Schedule not found' };
}
const schedule = result.rows[0];
if (schedule.is_immutable) {
return {
deleted: false,
reason: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`
};
}
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [id]);
return { deleted: true };
}
/**
* Trigger a schedule to run immediately
*/
@@ -369,6 +480,46 @@ class TaskScheduler {
return this.executeSchedule(result.rows[0] as TaskSchedule);
}
/**
* Get schedule statistics for dashboard
*/
async getScheduleStats(): Promise<{
total: number;
enabled: number;
byRole: Record<string, number>;
byState: Record<string, number>;
}> {
try {
const result = await pool.query(`
SELECT
COUNT(*)::int as total,
SUM(CASE WHEN enabled THEN 1 ELSE 0 END)::int as enabled_count,
role,
state_code
FROM task_schedules
GROUP BY role, state_code
`);
let total = 0;
let enabled = 0;
const byRole: Record<string, number> = {};
const byState: Record<string, number> = {};
for (const row of result.rows) {
total += row.total;
enabled += row.enabled_count;
byRole[row.role] = (byRole[row.role] || 0) + row.total;
if (row.state_code) {
byState[row.state_code] = (byState[row.state_code] || 0) + row.total;
}
}
return { total, enabled, byRole, byState };
} catch {
return { total: 0, enabled: 0, byRole: {}, byState: {} };
}
}
}
// Per TASK_WORKFLOW_2024-12-10.md: Singleton instance

View File

@@ -2,11 +2,18 @@
* Task Handlers Index
*
* Exports all task handlers for the task worker.
*
* Product Discovery:
* - handleProductDiscoveryCurl: curl/axios based (for curl transport)
* - handleProductDiscoveryHttp: Puppeteer browser-based (for http transport)
*/
export { handleProductDiscovery } from './product-discovery';
export { handleProductDiscovery as handleProductDiscoveryCurl } from './product-discovery-curl';
export { handleProductDiscoveryHttp } from './product-discovery-http';
export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-curl';
export { handleProductRefresh } from './product-refresh';
export { handleStoreDiscovery } from './store-discovery';
export { handleStoreDiscoveryHttp } from './store-discovery-http';
export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleWhoami } from './whoami';

View File

@@ -13,7 +13,7 @@
*/
import { TaskContext, TaskResult } from '../task-worker';
import { handlePayloadFetch } from './payload-fetch';
import { handlePayloadFetch } from './payload-fetch-curl';
export async function handleProductDiscovery(ctx: TaskContext): Promise<TaskResult> {
const { task } = ctx;

View File

@@ -0,0 +1,363 @@
/**
* Product Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to fetch products via browser context.
* Based on test-intercept.js pattern from ORGANIC_SCRAPING_GUIDE.md.
*
* This handler:
* 1. Loads dispensary info
* 2. Launches headless browser with proxy (if provided)
* 3. Establishes session by visiting embedded menu
* 4. Fetches ALL products via GraphQL from browser context
* 5. Saves raw payload to filesystem (gzipped)
* 6. Records metadata in raw_crawl_payloads table
* 7. Queues product_refresh task to process the payload
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
// GraphQL hash for FilteredProducts query - MUST match CLAUDE.md
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
return { success: false, error: 'No dispensary_id specified for product_discovery task' };
}
let browser: any = null;
try {
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
FROM dispensaries
WHERE id = $1 AND crawl_enabled = true
`, [dispensaryId]);
if (dispResult.rows.length === 0) {
return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
}
const dispensary = dispResult.rows[0];
const platformId = dispensary.platform_dispensary_id;
if (!platformId) {
return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
}
// Extract cName from menu_url
const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
const cName = cNameMatch ? cNameMatch[1] : 'dispensary';
console.log(`[ProductDiscoveryHTTP] Starting for ${dispensary.name} (ID: ${dispensaryId})`);
console.log(`[ProductDiscoveryHTTP] Platform ID: ${platformId}, cName: ${cName}`);
await ctx.heartbeat();
// ============================================================
// STEP 2: Setup Puppeteer with proxy
// ============================================================
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[ProductDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 3: Establish session by visiting embedded menu
// ============================================================
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// ============================================================
// STEP 3b: Detect and dismiss age gate modal
// ============================================================
try {
// Wait a bit for age gate to appear
await page.waitForTimeout(1500);
// Look for common age gate selectors
const ageGateSelectors = [
'button[data-testid="age-gate-submit"]',
'button:has-text("Yes")',
'button:has-text("I am 21")',
'button:has-text("Enter")',
'[class*="age-gate"] button',
'[class*="AgeGate"] button',
'[data-test="age-gate-button"]',
];
for (const selector of ageGateSelectors) {
try {
const button = await page.$(selector);
if (button) {
await button.click();
console.log(`[ProductDiscoveryHTTP] Age gate dismissed via: ${selector}`);
await page.waitForTimeout(1000); // Wait for modal to close
break;
}
} catch {
// Selector not found, try next
}
}
// Also try evaluating in page context for button with specific text
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch (ageGateErr) {
// Age gate might not be present, continue
console.log(`[ProductDiscoveryHTTP] No age gate detected or already dismissed`);
}
console.log(`[ProductDiscoveryHTTP] Session established, fetching products...`);
await ctx.heartbeat();
// ============================================================
// STEP 4: Fetch ALL products via GraphQL from browser context
// ============================================================
const result = await page.evaluate(async (platformId: string, graphqlHash: string) => {
const allProducts: any[] = [];
const logs: string[] = [];
let pageNum = 0;
const perPage = 100;
let totalCount = 0;
const sessionId = 'browser-session-' + Date.now();
try {
while (pageNum < 30) { // Max 30 pages = 3000 products
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active', not null
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: pageNum,
perPage: perPage,
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: graphqlHash,
},
};
// Build GET URL like the browser does
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
logs.push(`Page ${pageNum}: HTTP ${response.status}`);
if (!response.ok) {
const text = await response.text();
logs.push(`HTTP error: ${response.status} - ${text.slice(0, 200)}`);
break;
}
const json = await response.json();
if (json.errors) {
logs.push(`GraphQL error: ${JSON.stringify(json.errors).slice(0, 200)}`);
break;
}
const data = json?.data?.filteredProducts;
if (!data || !data.products) {
logs.push('No products in response');
break;
}
const products = data.products;
allProducts.push(...products);
if (pageNum === 0) {
totalCount = data.queryInfo?.totalCount || 0;
logs.push(`Total reported: ${totalCount}`);
}
logs.push(`Got ${products.length} products (total: ${allProducts.length}/${totalCount})`);
if (allProducts.length >= totalCount || products.length < perPage) {
break;
}
pageNum++;
// Small delay between pages to be polite
await new Promise(r => setTimeout(r, 200));
}
} catch (err: any) {
logs.push(`Error: ${err.message}`);
}
return { products: allProducts, totalCount, logs };
}, platformId, FILTERED_PRODUCTS_HASH);
// Print logs from browser context
result.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
console.log(`[ProductDiscoveryHTTP] Fetched ${result.products.length} products (API reported ${result.totalCount})`);
await browser.close();
browser = null;
if (result.products.length === 0) {
return {
success: false,
error: 'No products returned from GraphQL',
productsProcessed: 0,
};
}
await ctx.heartbeat();
// ============================================================
// STEP 5: Save raw payload to filesystem
// ============================================================
const rawPayload = {
dispensaryId,
platformId,
cName,
fetchedAt: new Date().toISOString(),
productCount: result.products.length,
products: result.products,
};
const payloadResult = await saveRawPayload(
pool,
dispensaryId,
rawPayload,
null, // crawl_run_id - not using crawl_runs in new system
result.products.length
);
console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
// ============================================================
// STEP 6: Update dispensary last_fetch_at
// ============================================================
await pool.query(`
UPDATE dispensaries
SET last_fetch_at = NOW()
WHERE id = $1
`, [dispensaryId]);
// ============================================================
// STEP 7: Queue product_refresh task to process the payload
// ============================================================
await taskService.createTask({
role: 'product_refresh',
dispensary_id: dispensaryId,
priority: task.priority || 0,
payload: { payload_id: payloadResult.id },
});
console.log(`[ProductDiscoveryHTTP] Queued product_refresh task for payload #${payloadResult.id}`);
return {
success: true,
payloadId: payloadResult.id,
productCount: result.products.length,
sizeBytes: payloadResult.sizeBytes,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductDiscoveryHTTP] Error for dispensary ${dispensaryId}:`, errorMessage);
return {
success: false,
error: errorMessage,
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}

View File

@@ -27,6 +27,7 @@ import {
downloadProductImages,
} from '../../hydration/canonical-upsert';
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
const normalizer = new DutchieNormalizer();
@@ -86,7 +87,37 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// Load latest payload for this dispensary
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
return { success: false, error: `No payload found for dispensary ${dispensaryId}` };
// No payload exists - queue upstream task to fetch products
console.log(`[ProductRefresh] No payload found for dispensary ${dispensaryId} - queuing upstream task`);
if (dispensary.platform_dispensary_id) {
// Has platform ID - can go straight to product_discovery
console.log(`[ProductRefresh] Dispensary has platform_dispensary_id - queuing product_discovery (http)`);
await taskService.createTask({
role: 'product_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
method: 'http', // Use browser-based handler for session proxies
});
return {
success: true,
queued: 'product_discovery',
reason: 'No payload exists - queued product_discovery to fetch initial data',
};
} else {
// No platform ID - need entry_point_discovery first
console.log(`[ProductRefresh] Dispensary missing platform_dispensary_id - queuing entry_point_discovery`);
await taskService.createTask({
role: 'entry_point_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
});
return {
success: true,
queued: 'entry_point_discovery',
reason: 'No payload and no platform_dispensary_id - queued entry_point_discovery to resolve ID',
};
}
}
payloadData = result.payload;
payloadId = result.metadata.id;

View File

@@ -0,0 +1,480 @@
/**
* Store Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to discover stores via browser context.
* Based on product-discovery-http.ts pattern.
*
* This handler:
* 1. Launches headless browser with proxy (if provided)
* 2. Establishes session by visiting Dutchie dispensaries page
* 3. Fetches cities for each state via getAllCitiesByState GraphQL
* 4. Fetches stores for each city via ConsumerDispensaries GraphQL
* 5. Upserts to dutchie_discovery_locations
* 6. Auto-promotes valid locations to dispensaries table
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { upsertLocation } from '../../discovery/location-discovery';
import { promoteDiscoveredLocations } from '../../discovery/promotion';
import { saveDiscoveryPayload } from '../../utils/payload-storage';
// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts
const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6';
const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b';
interface StateWithCities {
name: string;
country: string;
cities: string[];
}
interface DiscoveredLocation {
id: string;
name: string;
slug: string;
cName?: string;
address?: string;
city?: string;
state?: string;
zip?: string;
latitude?: number;
longitude?: number;
offerPickup?: boolean;
offerDelivery?: boolean;
isRecreational?: boolean;
isMedical?: boolean;
phone?: string;
email?: string;
website?: string;
description?: string;
logoImage?: string;
bannerImage?: string;
chainSlug?: string;
enterpriseId?: string;
retailType?: string;
status?: string;
timezone?: string;
location?: {
ln1?: string;
ln2?: string;
city?: string;
state?: string;
zipcode?: string;
country?: string;
geometry?: { coordinates?: [number, number] };
};
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[StoreDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
await page.goto(sessionUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// Handle potential age gate
try {
await page.waitForTimeout(1500);
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch {
// Age gate might not be present
}
console.log(`[StoreDiscoveryHTTP] Session established`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Get states to discover from database
// ============================================================
const statesResult = await pool.query(`
SELECT code FROM states WHERE is_active = true ORDER BY code
`);
const stateCodesToDiscover = statesResult.rows.map((r: { code: string }) => r.code);
if (stateCodesToDiscover.length === 0) {
await browser.close();
return { success: true, storesDiscovered: 0, newStoreIds: [], message: 'No active states to discover' };
}
console.log(`[StoreDiscoveryHTTP] Will discover stores in ${stateCodesToDiscover.length} states`);
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'getAllCitiesByState',
variables: JSON.stringify({}),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
logs.push(`getAllCitiesByState: HTTP ${response.status}`);
if (!response.ok) {
return { states: [], logs };
}
const json = await response.json();
const statesData = json?.data?.statesWithDispensaries || [];
const states: StateWithCities[] = [];
for (const state of statesData) {
if (state && state.name) {
const cities = Array.isArray(state.cities)
? state.cities.filter((c: string | null) => c !== null)
: [];
states.push({
name: state.name,
country: state.country || 'US',
cities,
});
}
}
logs.push(`Found ${states.length} states with cities`);
return { states, logs };
} catch (err: any) {
logs.push(`Error: ${err.message}`);
return { states: [], logs };
}
}, GET_ALL_CITIES_HASH);
statesWithCities.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
if (statesWithCities.states.length === 0) {
await browser.close();
return { success: false, error: 'Failed to fetch states with cities' };
}
await ctx.heartbeat();
// ============================================================
// STEP 5: For each active state, fetch stores for each city
// ============================================================
let totalDiscovered = 0;
let totalUpserted = 0;
const allNewStoreIds: number[] = [];
for (const stateCode of stateCodesToDiscover) {
const stateData = statesWithCities.states.find(
(s: StateWithCities) => s.name.toUpperCase() === stateCode.toUpperCase()
);
if (!stateData || stateData.cities.length === 0) {
console.log(`[StoreDiscoveryHTTP] No cities found for ${stateCode}, skipping`);
continue;
}
console.log(`[StoreDiscoveryHTTP] Discovering ${stateData.cities.length} cities in ${stateCode}...`);
await ctx.heartbeat();
// Accumulate raw store data for this state
const stateRawStores: any[] = [];
const stateCityData: { city: string; stores: any[] }[] = [];
// Fetch stores for each city in this state
for (const city of stateData.cities) {
try {
const cityResult = await page.evaluate(async (
cityName: string,
stateCodeParam: string,
hash: string
) => {
const logs: string[] = [];
const allDispensaries: any[] = [];
let page = 0;
const perPage = 200;
try {
while (page < 5) { // Max 5 pages per city
const variables = {
dispensaryFilter: {
activeOnly: true,
city: cityName,
state: stateCodeParam,
},
page,
perPage,
};
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'ConsumerDispensaries',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
if (!response.ok) {
logs.push(`${cityName}: HTTP ${response.status}`);
break;
}
const json = await response.json();
const dispensaries = json?.data?.filteredDispensaries || [];
if (dispensaries.length === 0) {
break;
}
// Filter to ensure correct state
const stateFiltered = dispensaries.filter((d: any) =>
d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase()
);
allDispensaries.push(...stateFiltered);
if (dispensaries.length < perPage) {
break;
}
page++;
// Small delay between pages
await new Promise(r => setTimeout(r, 100));
}
logs.push(`${cityName}: ${allDispensaries.length} stores`);
} catch (err: any) {
logs.push(`${cityName}: Error - ${err.message}`);
}
return { dispensaries: allDispensaries, logs };
}, city, stateCode, CONSUMER_DISPENSARIES_HASH);
cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
// Accumulate raw store data
stateRawStores.push(...cityResult.dispensaries);
stateCityData.push({ city, stores: cityResult.dispensaries });
// Upsert each discovered location
for (const disp of cityResult.dispensaries) {
try {
const location = normalizeLocation(disp);
if (!location.id) {
continue; // Skip locations without platform ID
}
const result = await upsertLocation(pool, location as any, null);
if (result) {
totalUpserted++;
if (result.isNew) {
totalDiscovered++;
}
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Upsert error for ${disp.name}:`, err.message);
}
}
// Small delay between cities to avoid rate limiting
await new Promise(r => setTimeout(r, 300));
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Error fetching ${city}, ${stateCode}:`, err.message);
}
}
// Heartbeat after each state
await ctx.heartbeat();
// ============================================================
// STEP 5b: Save raw store payload for this state
// ============================================================
if (stateRawStores.length > 0) {
try {
const rawPayload = {
stateCode,
platform,
fetchedAt: new Date().toISOString(),
storeCount: stateRawStores.length,
citiesProcessed: stateCityData.length,
cities: stateCityData,
stores: stateRawStores,
};
const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length);
console.log(`[StoreDiscoveryHTTP] Saved raw payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Failed to save payload for ${stateCode}:`, err.message);
}
}
// Auto-promote valid locations for this state
try {
const promotionResult = await promoteDiscoveredLocations(stateCode);
const promoted = promotionResult.created + promotionResult.updated;
if (promoted > 0) {
console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`);
// newDispensaryIds is returned but not in typed interface
const newIds = (promotionResult as any).newDispensaryIds || [];
allNewStoreIds.push(...newIds);
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Promotion error for ${stateCode}:`, err.message);
}
}
await browser.close();
browser = null;
console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted`);
return {
success: true,
storesDiscovered: totalDiscovered,
storesUpserted: totalUpserted,
statesProcessed: stateCodesToDiscover.length,
newStoreIds: allNewStoreIds,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[StoreDiscoveryHTTP] Error:`, errorMessage);
return {
success: false,
error: errorMessage,
newStoreIds: [],
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}
/**
* Normalize a raw dispensary response to our DiscoveredLocation format
*/
function normalizeLocation(raw: any): DiscoveredLocation {
const loc = raw.location || {};
const coords = loc.geometry?.coordinates || [];
return {
id: raw.id || raw._id || '',
name: raw.name || '',
slug: raw.slug || raw.cName || '',
cName: raw.cName || raw.slug || '',
address: raw.address || loc.ln1 || '',
city: raw.city || loc.city || '',
state: raw.state || loc.state || '',
zip: raw.zip || loc.zipcode || loc.zip || '',
latitude: coords[1] || raw.latitude,
longitude: coords[0] || raw.longitude,
timezone: raw.timezone || '',
offerPickup: raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true,
offerDelivery: raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false,
isRecreational: raw.isRecreational ?? raw.recDispensary ?? true,
isMedical: raw.isMedical ?? raw.medicalDispensary ?? true,
phone: raw.phone || '',
email: raw.email || '',
website: raw.embedBackUrl || '',
description: raw.description || '',
logoImage: raw.logoImage || '',
bannerImage: raw.bannerImage || '',
chainSlug: raw.chain || '',
enterpriseId: raw.retailer?.enterpriseId || '',
retailType: raw.retailType || '',
status: raw.status || '',
location: loc,
};
}

View File

@@ -17,7 +17,8 @@ export {
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
export {
handleProductDiscovery,
handleProductDiscoveryCurl,
handleProductDiscoveryHttp,
handleProductRefresh,
handleStoreDiscovery,
handleEntryPointDiscovery,

View File

@@ -6,12 +6,15 @@
* task-service.ts and routes/tasks.ts.
*
* State is in-memory and resets on server restart.
* By default, the pool is PAUSED (closed) - admin must explicitly start it.
* This prevents workers from immediately grabbing tasks on deploy before
* the system is ready.
* By default, the pool is OPEN - workers start claiming tasks immediately.
* Admin can pause via API endpoint if needed.
*
* Note: Each process (backend, worker) has its own copy of this state.
* The /pool/pause and /pool/resume endpoints only affect the backend process.
* Workers always start with pool open.
*/
let taskPoolPaused = true;
let taskPoolPaused = false;
export function isTaskPoolPaused(): boolean {
return taskPoolPaused;

View File

@@ -73,6 +73,7 @@ export interface CreateTaskParams {
dispensary_id?: number;
platform?: string;
priority?: number;
method?: 'curl' | 'http'; // Transport method: curl=axios/proxy, http=Puppeteer/browser
scheduled_for?: Date;
payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
}
@@ -106,14 +107,15 @@ class TaskService {
*/
async createTask(params: CreateTaskParams): Promise<WorkerTask> {
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for, payload)
VALUES ($1, $2, $3, $4, $5, $6)
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *`,
[
params.role,
params.dispensary_id ?? null,
params.platform ?? null,
params.priority ?? 0,
params.method ?? null, // null = any worker can pick up, 'http' = http-capable workers only, 'curl' = curl workers only
params.scheduled_for ?? null,
params.payload ? JSON.stringify(params.payload) : null,
]
@@ -128,8 +130,8 @@ class TaskService {
if (tasks.length === 0) return 0;
const values = tasks.map((t, i) => {
const base = i * 5;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`;
const base = i * 6;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6})`;
});
const params = tasks.flatMap((t) => [
@@ -137,11 +139,12 @@ class TaskService {
t.dispensary_id ?? null,
t.platform ?? null,
t.priority ?? 0,
t.method ?? null,
t.scheduled_for ?? null,
]);
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for)
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for)
VALUES ${values.join(', ')}
ON CONFLICT DO NOTHING`,
params
@@ -473,15 +476,17 @@ class TaskService {
case 'store_discovery': {
// Per TASK_WORKFLOW_2024-12-10.md: New stores discovered -> create product_discovery tasks
// Skip entry_point_discovery since platform_dispensary_id is set during promotion
// All product_discovery tasks use HTTP transport (Puppeteer/browser)
const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds;
if (newStoreIds && newStoreIds.length > 0) {
console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores`);
console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores (HTTP transport)`);
for (const storeId of newStoreIds) {
await this.createTask({
role: 'product_discovery',
dispensary_id: storeId,
platform: completedTask.platform ?? undefined,
priority: 10, // High priority for new stores
method: 'http', // Force HTTP transport for browser-based scraping
});
}
}
@@ -498,6 +503,7 @@ class TaskService {
dispensary_id: completedTask.dispensary_id,
platform: completedTask.platform ?? undefined,
priority: 10,
method: 'http', // Force HTTP transport
});
}
break;
@@ -522,6 +528,7 @@ class TaskService {
/**
* Create store discovery task for a platform/state
* Uses HTTP transport (Puppeteer/browser) by default
*/
async createStoreDiscoveryTask(
platform: string,
@@ -532,11 +539,13 @@ class TaskService {
role: 'store_discovery',
platform,
priority,
method: 'http', // Force HTTP transport
});
}
/**
* Create entry point discovery task for a specific store
* @deprecated Entry point resolution now happens during store promotion
*/
async createEntryPointTask(
dispensaryId: number,
@@ -548,11 +557,13 @@ class TaskService {
dispensary_id: dispensaryId,
platform,
priority,
method: 'http', // Force HTTP transport
});
}
/**
* Create product discovery task for a specific store
* Uses HTTP transport (Puppeteer/browser) by default
*/
async createProductDiscoveryTask(
dispensaryId: number,
@@ -564,6 +575,7 @@ class TaskService {
dispensary_id: dispensaryId,
platform,
priority,
method: 'http', // Force HTTP transport
});
}
@@ -641,6 +653,248 @@ class TaskService {
return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
}
/**
* Create multiple tasks with staggered start times.
*
* STAGGERED TASK WORKFLOW:
* =======================
* This prevents resource contention and proxy assignment lag when creating
* many tasks at once. Each task gets a scheduled_for timestamp offset from
* the previous task.
*
* Workflow:
* 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight check on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task is released back to pending for another worker
* 6. Worker finishes task, polls for next available task
* 7. Repeat - preflight runs again on next task claim
*
* Benefits:
* - Prevents all 8 workers from hitting proxies simultaneously
* - Reduces API rate limiting / 403 errors
* - Spreads resource usage over time
* - Each task still runs preflight, ensuring proxy health
*
* @param dispensaryIds - Array of dispensary IDs to create tasks for
* @param role - Task role (e.g., 'product_refresh', 'product_discovery')
* @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
* @param platform - Platform identifier (default: 'dutchie')
* @param method - Transport method: 'curl' or 'http' (default: null for any)
* @returns Number of tasks created
*/
async createStaggeredTasks(
dispensaryIds: number[],
role: TaskRole,
staggerSeconds: number = 15,
platform: string = 'dutchie',
method: 'curl' | 'http' | null = null
): Promise<{ created: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) {
return { created: 0, taskIds: [] };
}
// Use a single INSERT with generate_series for efficiency
const result = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
SELECT
$2::varchar as role,
td.dispensary_id,
$3::varchar as platform,
$4::varchar as method,
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds]);
const taskIds = result.rows.map((r: { id: number }) => r.id);
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
return { created: taskIds.length, taskIds };
}
/**
* Create a batch of AZ store tasks with automatic distribution.
*
* This is a convenience method for creating tasks for Arizona stores with:
* - Automatic staggering to prevent resource contention
* - Even distribution across both refresh and discovery roles
*
* @param totalTasks - Total number of tasks to create
* @param staggerSeconds - Seconds between each task's start time
* @param splitRoles - If true, split between product_refresh and product_discovery
* @returns Summary of created tasks
*/
async createAZStoreTasks(
totalTasks: number = 24,
staggerSeconds: number = 15,
splitRoles: boolean = true
): Promise<{
total: number;
product_refresh: number;
product_discovery: number;
taskIds: number[];
}> {
// Get AZ stores with platform_id and menu_url
const storesResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE s.code = 'AZ'
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND d.menu_url IS NOT NULL
ORDER BY d.id
`);
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
if (storeIds.length === 0) {
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
}
// Limit tasks to available stores
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
const allTaskIds: number[] = [];
if (splitRoles) {
// Split between refresh and discovery
const tasksPerRole = Math.floor(maxTasks / 2);
const refreshStores = storeIds.slice(0, tasksPerRole);
const discoveryStores = storeIds.slice(0, tasksPerRole);
// Create refresh tasks first
const refreshResult = await this.createStaggeredTasks(
refreshStores,
'product_refresh',
staggerSeconds,
'dutchie'
);
allTaskIds.push(...refreshResult.taskIds);
// Create discovery tasks starting after refresh tasks are scheduled
const discoveryStartOffset = tasksPerRole * staggerSeconds;
const discoveryResult = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
SELECT
'product_discovery'::varchar as role,
td.dispensary_id,
'dutchie'::varchar as platform,
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
return {
total: allTaskIds.length,
product_refresh: refreshResult.taskIds.length,
product_discovery: discoveryResult.rowCount ?? 0,
taskIds: allTaskIds
};
}
// Single role mode - all product_discovery
const result = await this.createStaggeredTasks(
storeIds.slice(0, totalTasks),
'product_discovery',
staggerSeconds,
'dutchie'
);
return {
total: result.taskIds.length,
product_refresh: 0,
product_discovery: result.taskIds.length,
taskIds: result.taskIds
};
}
/**
* Cleanup stale tasks that are stuck in 'claimed' or 'running' status.
*
* This handles the case where workers crash/restart and leave tasks in-flight.
* These stale tasks block the queue because the claim query excludes dispensary_ids
* that have active tasks.
*
* Called automatically on worker startup and can be called periodically.
*
* @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset
* @returns Object with cleanup stats
*/
async cleanupStaleTasks(staleMinutes: number = 30): Promise<{
cleaned: number;
byStatus: { claimed: number; running: number };
byRole: Record<string, number>;
}> {
// First, get stats on what we're about to clean
const statsResult = await pool.query(`
SELECT status, role, COUNT(*)::int as count
FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
GROUP BY status, role
`, [staleMinutes]);
const byStatus = { claimed: 0, running: 0 };
const byRole: Record<string, number> = {};
for (const row of statsResult.rows) {
const { status, role, count } = row as { status: string; role: string; count: number };
if (status === 'claimed') byStatus.claimed += count;
if (status === 'running') byStatus.running += count;
byRole[role] = (byRole[role] || 0) + count;
}
const totalStale = byStatus.claimed + byStatus.running;
if (totalStale === 0) {
return { cleaned: 0, byStatus, byRole };
}
// Reset stale tasks to pending
const result = await pool.query(`
UPDATE worker_tasks
SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
last_heartbeat_at = NULL,
error_message = CONCAT(COALESCE(error_message, ''), ' [Auto-reset: stale after ', $1, ' min]'),
updated_at = NOW()
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
`, [staleMinutes]);
const cleaned = result.rowCount ?? 0;
if (cleaned > 0) {
console.log(`[TaskService] Cleaned up ${cleaned} stale tasks (claimed: ${byStatus.claimed}, running: ${byStatus.running})`);
console.log(`[TaskService] Stale tasks by role: ${Object.entries(byRole).map(([r, c]) => `${r}:${c}`).join(', ')}`);
}
return { cleaned, byStatus, byRole };
}
/**
* Calculate workers needed to complete tasks within SLA
*/

View File

@@ -11,10 +11,17 @@
* - Workers report heartbeats to worker_registry
* - Workers are ROLE-AGNOSTIC by default (can handle any task type)
*
* Stealth & Anti-Detection:
* PROXIES ARE REQUIRED - workers will fail to start if no proxies available.
* Stealth & Anti-Detection (LAZY INITIALIZATION):
* Workers start IMMEDIATELY without waiting for proxies.
* Stealth systems (proxies, fingerprints, preflights) are initialized
* on first task claim, not at worker startup.
*
* On startup, workers initialize the CrawlRotator which provides:
* This allows workers to:
* - Register and send heartbeats immediately
* - Wait in main loop without blocking on proxy availability
* - Initialize proxies/preflights only when tasks are actually available
*
* On first task claim attempt, workers initialize the CrawlRotator which provides:
* - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy
* - User-Agent rotation: Cycles through realistic browser fingerprints
* - Fingerprint rotation: Changes browser profile on blocks
@@ -34,11 +41,16 @@
*
* Environment:
* WORKER_ROLE - Which task role to process (optional, null = any task)
* WORKER_ID - Optional custom worker ID (auto-generated if not provided)
* POD_NAME - Kubernetes pod name (optional)
* POD_NAME - K8s StatefulSet pod name (PRIMARY - use this for persistent identity)
* WORKER_ID - Custom worker ID (fallback if POD_NAME not set)
* POLL_INTERVAL_MS - How often to check for tasks (default: 5000)
* HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000)
* API_BASE_URL - Backend API URL for registration (default: http://localhost:3010)
*
* Worker Identity:
* Workers use POD_NAME as their worker_id for persistent identity across restarts.
* In K8s StatefulSet, POD_NAME = "scraper-worker-0" through "scraper-worker-7".
* This ensures workers re-register with the same ID instead of creating new entries.
*/
import { Pool } from 'pg';
@@ -57,10 +69,13 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser
// Task handlers by role
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
import { handlePayloadFetch } from './handlers/payload-fetch';
// Dual-transport: curl vs http (browser-based) handlers
import { handlePayloadFetch } from './handlers/payload-fetch-curl';
import { handleProductRefresh } from './handlers/product-refresh';
import { handleProductDiscovery } from './handlers/product-discovery';
import { handleProductDiscovery } from './handlers/product-discovery-curl';
import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleWhoami } from './handlers/whoami';
@@ -83,7 +98,7 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '15');
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
@@ -132,17 +147,47 @@ type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main handler for product crawling
// product_discovery: Main handler for product crawling (has curl and http variants)
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
payload_fetch: handlePayloadFetch, // API fetch -> disk
payload_fetch: handlePayloadFetch, // API fetch -> disk (curl)
product_refresh: handleProductRefresh, // disk -> DB
product_discovery: handleProductDiscovery,
product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
store_discovery: handleStoreDiscovery,
entry_point_discovery: handleEntryPointDiscovery,
analytics_refresh: handleAnalyticsRefresh,
whoami: handleWhoami, // Tests proxy + anti-detect
};
/**
* Get the appropriate handler for a task, considering both role and method.
*
* Dual-transport handlers:
* - product_discovery: curl (axios) or http (Puppeteer)
* - store_discovery: curl (axios) or http (Puppeteer)
*
* Default method is 'http' since all GraphQL queries should use browser transport
* for better TLS fingerprinting and session-based proxy compatibility.
*/
function getHandlerForTask(task: WorkerTask): TaskHandler | undefined {
const role = task.role as TaskRole;
const method = task.method || 'http'; // Default to HTTP for all GraphQL tasks
// product_discovery: dual-transport support
if (role === 'product_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${method})`);
return handleProductDiscoveryHttp;
}
// store_discovery: dual-transport support
if (role === 'store_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for store_discovery (method=${method})`);
return handleStoreDiscoveryHttp;
}
// Default: use the static handler registry (curl-based)
return TASK_HANDLERS[role];
}
/**
* Resource usage stats reported to the registry and used for backoff decisions.
* These values are included in worker heartbeats and displayed in the UI.
@@ -209,6 +254,16 @@ export class TaskWorker {
private preflightCurlResult: CurlPreflightResult | null = null;
private preflightHttpResult: PuppeteerPreflightResult | null = null;
// ==========================================================================
// LAZY INITIALIZATION FLAGS
// ==========================================================================
// Stealth/proxy initialization is deferred until first task claim.
// Workers register immediately and enter main loop without blocking.
// ==========================================================================
private stealthInitialized: boolean = false;
private preflightsCompleted: boolean = false;
private initializingPromise: Promise<void> | null = null;
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -293,9 +348,9 @@ export class TaskWorker {
/**
* Initialize stealth systems (proxy rotation, fingerprints)
* Called once on worker startup before processing any tasks.
* Called LAZILY on first task claim attempt (NOT at worker startup).
*
* IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
* IMPORTANT: Proxies are REQUIRED to claim tasks. This method waits until proxies are available.
* Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added.
*/
private async initializeStealth(): Promise<void> {
@@ -435,35 +490,98 @@ export class TaskWorker {
/**
* Report preflight status to worker_registry
* Function signature: update_worker_preflight(worker_id, transport, status, ip, response_ms, error, fingerprint)
*/
private async reportPreflightStatus(): Promise<void> {
try {
// Update worker_registry directly via SQL (more reliable than API)
// CURL preflight - includes IP address
await this.pool.query(`
SELECT update_worker_preflight($1, 'curl', $2, $3, $4)
SELECT update_worker_preflight($1, 'curl', $2, $3, $4, $5, $6)
`, [
this.workerId,
this.preflightCurlPassed ? 'passed' : 'failed',
this.preflightCurlResult?.proxyIp || null,
this.preflightCurlResult?.responseTimeMs || null,
this.preflightCurlResult?.error || null,
null, // No fingerprint for curl
]);
// HTTP preflight - includes IP, fingerprint, and timezone data
const httpFingerprint = this.preflightHttpResult ? {
...this.preflightHttpResult.fingerprint,
detectedTimezone: (this.preflightHttpResult as any).detectedTimezone,
detectedLocation: (this.preflightHttpResult as any).detectedLocation,
productsReturned: this.preflightHttpResult.productsReturned,
botDetection: (this.preflightHttpResult as any).botDetection,
} : null;
await this.pool.query(`
SELECT update_worker_preflight($1, 'http', $2, $3, $4)
SELECT update_worker_preflight($1, 'http', $2, $3, $4, $5, $6)
`, [
this.workerId,
this.preflightHttpPassed ? 'passed' : 'failed',
this.preflightHttpResult?.proxyIp || null,
this.preflightHttpResult?.responseTimeMs || null,
this.preflightHttpResult?.error || null,
httpFingerprint ? JSON.stringify(httpFingerprint) : null,
]);
console.log(`[TaskWorker] Preflight status reported to worker_registry`);
if (this.preflightHttpResult?.proxyIp) {
console.log(`[TaskWorker] HTTP IP: ${this.preflightHttpResult.proxyIp}, Timezone: ${(this.preflightHttpResult as any).detectedTimezone || 'unknown'}`);
}
} catch (err: any) {
// Non-fatal - worker can still function
console.warn(`[TaskWorker] Could not report preflight status: ${err.message}`);
}
}
/**
* Lazy initialization of stealth systems.
* Called BEFORE claiming first task (not at worker startup).
* This allows workers to register and enter main loop immediately.
*
* Returns true if initialization succeeded, false otherwise.
*/
private async ensureStealthInitialized(): Promise<boolean> {
// Already initialized
if (this.stealthInitialized && this.preflightsCompleted) {
return true;
}
// Already initializing (prevent concurrent init attempts)
if (this.initializingPromise) {
await this.initializingPromise;
return this.stealthInitialized && this.preflightsCompleted;
}
console.log(`[TaskWorker] ${this.friendlyName} lazy-initializing stealth systems (first task claim)...`);
this.initializingPromise = (async () => {
try {
// Initialize proxy/fingerprint rotation
await this.initializeStealth();
this.stealthInitialized = true;
// Run dual-transport preflights
await this.runDualPreflights();
this.preflightsCompleted = true;
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} stealth ready (${preflightMsg})`);
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} stealth init failed: ${err.message}`);
this.stealthInitialized = false;
this.preflightsCompleted = false;
}
})();
await this.initializingPromise;
this.initializingPromise = null;
return this.stealthInitialized && this.preflightsCompleted;
}
/**
* Register worker with the registry (get friendly name)
*/
@@ -597,25 +715,36 @@ export class TaskWorker {
/**
* Start the worker loop
*
* Workers start IMMEDIATELY without blocking on proxy/preflight init.
* Stealth systems are lazy-initialized on first task claim.
* This allows workers to register and send heartbeats even when proxies aren't ready.
*/
async start(): Promise<void> {
this.isRunning = true;
// Initialize stealth systems (proxy rotation, fingerprints)
await this.initializeStealth();
// Register with the API to get a friendly name
// Register with the API to get a friendly name (non-blocking)
await this.register();
// Run dual-transport preflights
await this.runDualPreflights();
// Start registry heartbeat
// Start registry heartbeat immediately
this.startRegistryHeartbeat();
// Cleanup stale tasks on startup (only worker-0 does this to avoid races)
// This handles tasks left in 'claimed'/'running' status when workers restart
if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') {
try {
console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
if (cleanupResult.cleaned > 0) {
console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
}
} catch (err: any) {
console.error(`[TaskWorker] Stale task cleanup error:`, err.message);
}
}
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (${preflightMsg}, max ${this.maxConcurrentTasks} concurrent tasks)`);
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`);
while (this.isRunning) {
try {
@@ -658,6 +787,12 @@ export class TaskWorker {
this.backoffReason = null;
}
// Periodically reload proxies to pick up changes (new proxies, disabled proxies)
// This runs every ~60 seconds (controlled by setProxyReloadInterval)
if (this.stealthInitialized) {
await this.crawlRotator.reloadIfStale();
}
// Check for decommission signal
const shouldDecommission = await this.checkDecommission();
if (shouldDecommission) {
@@ -669,6 +804,20 @@ export class TaskWorker {
// Try to claim more tasks if we have capacity
if (this.canAcceptMoreTasks()) {
// =================================================================
// LAZY INITIALIZATION - Initialize stealth on first task claim
// Workers start immediately and init proxies only when needed
// =================================================================
if (!this.stealthInitialized) {
const initSuccess = await this.ensureStealthInitialized();
if (!initSuccess) {
// Init failed - wait and retry next loop
console.log(`[TaskWorker] ${this.friendlyName} stealth init failed, waiting before retry...`);
await this.sleep(30000);
return;
}
}
// Pass preflight capabilities to only claim compatible tasks
const task = await taskService.claimTask(
this.role,
@@ -681,13 +830,32 @@ export class TaskWorker {
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
// =================================================================
// PREFLIGHT CHECK - CRITICAL: Worker MUST pass before task execution
// Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready
// PREFLIGHT CHECK - Use stored preflight results based on task method
// We already ran dual-transport preflights at startup, so just verify
// the correct preflight passed for this task's required method.
// =================================================================
const preflight = await this.crawlRotator.preflight();
if (!preflight.passed) {
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${preflight.error}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without proxy/anti-detect`);
const taskMethod = task.method || 'http'; // Default to http if not specified
let preflightPassed = false;
let preflightMsg = '';
if (taskMethod === 'http' && this.preflightHttpPassed) {
preflightPassed = true;
preflightMsg = `HTTP preflight passed (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`;
} else if (taskMethod === 'curl' && this.preflightCurlPassed) {
preflightPassed = true;
preflightMsg = `CURL preflight passed (IP: ${this.preflightCurlResult?.proxyIp || 'unknown'})`;
} else if (!task.method && (this.preflightHttpPassed || this.preflightCurlPassed)) {
// No method preference - either transport works
preflightPassed = true;
preflightMsg = this.preflightHttpPassed ? 'HTTP preflight passed' : 'CURL preflight passed';
}
if (!preflightPassed) {
const errorMsg = taskMethod === 'http'
? 'HTTP preflight not passed - cannot execute http tasks'
: 'CURL preflight not passed - cannot execute curl tasks';
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${errorMsg}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without preflight`);
// Release task back to pending so another worker can pick it up
await taskService.releaseTask(task.id);
@@ -697,7 +865,7 @@ export class TaskWorker {
return;
}
console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${task.id} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`);
console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);
this.activeTasks.set(task.id, task);
@@ -741,8 +909,8 @@ export class TaskWorker {
// Mark as running
await taskService.startTask(task.id);
// Get handler for this role
const handler = TASK_HANDLERS[task.role];
// Get handler for this role (considers method for dual-transport)
const handler = getHandlerForTask(task);
if (!handler) {
throw new Error(`No handler registered for role: ${task.role}`);
}
@@ -904,7 +1072,10 @@ async function main(): Promise<void> {
process.exit(1);
}
const workerId = process.env.WORKER_ID;
// Use POD_NAME for persistent identity in K8s StatefulSet
// This ensures workers keep the same ID across restarts
// Falls back to WORKER_ID, then generates UUID if neither is set
const workerId = process.env.POD_NAME || process.env.WORKER_ID;
// Pass null for role-agnostic, or the specific role
const worker = new TaskWorker(role || null, workerId);

View File

@@ -366,6 +366,141 @@ export async function listPayloadMetadata(
}));
}
/**
* Result from saving a discovery payload
*/
export interface SaveDiscoveryPayloadResult {
id: number;
storagePath: string;
sizeBytes: number;
sizeBytesRaw: number;
checksum: string;
}
/**
* Generate storage path for a discovery payload
*
* Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
*/
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
const year = timestamp.getFullYear();
const month = String(timestamp.getMonth() + 1).padStart(2, '0');
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
'discovery',
String(year),
month,
day,
`state_${stateCode.toLowerCase()}_${ts}.json.gz`
);
}
/**
* Save a raw store discovery payload to filesystem and record metadata in DB
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
* @param payload - Raw JSON payload from discovery GraphQL
* @param storeCount - Number of stores in payload
* @returns SaveDiscoveryPayloadResult with file info and DB record ID
*/
export async function saveDiscoveryPayload(
pool: Pool,
stateCode: string,
payload: any,
storeCount: number = 0
): Promise<SaveDiscoveryPayloadResult> {
const timestamp = new Date();
const storagePath = generateDiscoveryStoragePath(stateCode, timestamp);
// Serialize and compress
const jsonStr = JSON.stringify(payload);
const rawSize = Buffer.byteLength(jsonStr, 'utf8');
const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
// Record metadata in DB
const result = await pool.query(`
INSERT INTO raw_crawl_payloads (
payload_type,
state_code,
storage_path,
store_count,
size_bytes,
size_bytes_raw,
fetched_at,
checksum_sha256
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
`, [
'store_discovery',
stateCode.toUpperCase(),
storagePath,
storeCount,
compressedSize,
rawSize,
timestamp,
checksum
]);
console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
return {
id: result.rows[0].id,
storagePath,
sizeBytes: compressedSize,
sizeBytesRaw: rawSize,
checksum
};
}
/**
* Get the latest discovery payload for a state
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
* @returns Parsed payload and metadata, or null if none exists
*/
export async function getLatestDiscoveryPayload(
pool: Pool,
stateCode: string
): Promise<{ payload: any; metadata: any } | null> {
const result = await pool.query(`
SELECT id, state_code, storage_path, store_count, fetched_at
FROM raw_crawl_payloads
WHERE payload_type = 'store_discovery'
AND state_code = $1
ORDER BY fetched_at DESC
LIMIT 1
`, [stateCode.toUpperCase()]);
if (result.rows.length === 0) {
return null;
}
const row = result.rows[0];
const payload = await loadPayloadFromPath(row.storage_path);
return {
payload,
metadata: {
id: row.id,
stateCode: row.state_code,
storeCount: row.store_count,
fetchedAt: row.fetched_at,
storagePath: row.storage_path
}
};
}
/**
* Delete old payloads (for retention policy)
*

View File

@@ -2666,13 +2666,25 @@ class ApiClient {
// Dashboard methods
getMarketDashboard = this.getMarketsDashboard.bind(this);
// Schedule methods (no conflicts)
// ============================================================
// LEGACY SCHEDULE METHODS (DEPRECATED 2025-12-12)
// These use /api/markets/admin/schedules which queries job_schedules
// Use getTaskSchedules(), updateTaskSchedule(), etc. instead
// (defined below, use /api/tasks/schedules which queries task_schedules)
// ============================================================
/** @deprecated Use getTaskSchedules() - queries task_schedules table */
getSchedules = this.getCrawlSchedules.bind(this);
/** @deprecated Use getTaskSchedule() - queries task_schedules table */
getSchedule = this.getDutchieAZSchedule.bind(this);
/** @deprecated Use createTaskSchedule() - queries task_schedules table */
createSchedule = this.createDutchieAZSchedule.bind(this);
/** @deprecated Use updateTaskSchedule() - queries task_schedules table */
updateSchedule = this.updateDutchieAZSchedule.bind(this);
/** @deprecated Use deleteTaskSchedule() - queries task_schedules table */
deleteSchedule = this.deleteDutchieAZSchedule.bind(this);
/** @deprecated Use runTaskScheduleNow() - queries task_schedules table */
triggerSchedule = this.triggerDutchieAZSchedule.bind(this);
/** @deprecated - job_schedules init not needed for task_schedules */
initSchedules = this.initDutchieAZSchedules.bind(this);
getScheduleLogs = this.getCrawlScheduleLogs.bind(this);
getRunLogs = this.getDutchieAZRunLogs.bind(this);
@@ -2976,6 +2988,101 @@ class ApiClient {
{ method: 'POST', body: JSON.stringify({ replicas }) }
);
}
// ==========================================
// Task Schedules API (recurring task definitions)
// ==========================================
async getTaskSchedules(enabledOnly?: boolean) {
const qs = enabledOnly ? '?enabled=true' : '';
return this.request<{ schedules: TaskSchedule[] }>(`/api/tasks/schedules${qs}`);
}
async getTaskSchedule(id: number) {
return this.request<TaskSchedule>(`/api/tasks/schedules/${id}`);
}
async createTaskSchedule(data: {
name: string;
role: string;
description?: string;
enabled?: boolean;
interval_hours: number;
priority?: number;
state_code?: string;
platform?: string;
}) {
return this.request<TaskSchedule>('/api/tasks/schedules', {
method: 'POST',
body: JSON.stringify(data),
});
}
async updateTaskSchedule(id: number, data: Partial<{
name: string;
role: string;
description: string;
enabled: boolean;
interval_hours: number;
priority: number;
state_code: string;
platform: string;
}>) {
return this.request<TaskSchedule>(`/api/tasks/schedules/${id}`, {
method: 'PUT',
body: JSON.stringify(data),
});
}
async deleteTaskSchedule(id: number) {
return this.request<{ success: boolean; message: string }>(`/api/tasks/schedules/${id}`, {
method: 'DELETE',
});
}
async deleteTaskSchedulesBulk(ids?: number[], all?: boolean) {
return this.request<{ success: boolean; deleted_count: number; deleted: { id: number; name: string }[]; message: string }>(
'/api/tasks/schedules',
{
method: 'DELETE',
body: JSON.stringify({ ids, all }),
}
);
}
async runTaskScheduleNow(id: number) {
return this.request<{ success: boolean; message: string; task: any }>(`/api/tasks/schedules/${id}/run-now`, {
method: 'POST',
});
}
async toggleTaskSchedule(id: number) {
return this.request<{ success: boolean; schedule: { id: number; name: string; enabled: boolean }; message: string }>(
`/api/tasks/schedules/${id}/toggle`,
{ method: 'POST' }
);
}
}
// Type for task schedules
export interface TaskSchedule {
id: number;
name: string;
role: string;
description: string | null;
enabled: boolean;
interval_hours: number;
priority: number;
state_code: string | null;
platform: string | null;
method: 'curl' | 'http' | null;
is_immutable: boolean;
last_run_at: string | null;
next_run_at: string | null;
last_task_count: number;
last_error: string | null;
created_at: string;
updated_at: string;
}
export const api = new ApiClient(API_URL);

View File

@@ -1,3 +1,18 @@
/**
* @deprecated 2025-12-12
*
* This page used the legacy job_schedules table which has been deprecated.
* All schedule management has been consolidated into task_schedules and
* is now managed via the /admin/tasks page (TasksDashboard.tsx).
*
* The job_schedules table entries have been disabled and marked deprecated.
* This page is no longer in the navigation menu but kept for reference.
*
* Migration details:
* - job_schedules used base_interval_minutes + jitter_minutes
* - task_schedules uses interval_hours (simpler model)
* - All CRUD operations now via /api/tasks/schedules endpoints
*/
import { useEffect, useState } from 'react';
import { Layout } from '../components/Layout';
import { api } from '../lib/api';

View File

@@ -1,5 +1,5 @@
import { useState, useEffect } from 'react';
import { api } from '../lib/api';
import { api, TaskSchedule } from '../lib/api';
import { Layout } from '../components/Layout';
import {
ListChecks,
@@ -21,6 +21,12 @@ import {
X,
Calendar,
Trash2,
Edit2,
Play,
Pause,
Timer,
Lock,
Globe,
} from 'lucide-react';
interface Task {
@@ -381,10 +387,264 @@ const ROLES = [
'store_discovery',
'entry_point_discovery',
'product_discovery',
'product_refresh',
'analytics_refresh',
];
// ============================================================
// Schedule Edit Modal
// ============================================================
interface ScheduleEditModalProps {
isOpen: boolean;
schedule: TaskSchedule | null;
onClose: () => void;
onSave: () => void;
}
function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditModalProps) {
const [name, setName] = useState('');
const [role, setRole] = useState('product_refresh');
const [description, setDescription] = useState('');
const [enabled, setEnabled] = useState(true);
const [intervalHours, setIntervalHours] = useState(4);
const [priority, setPriority] = useState(0);
const [stateCode, setStateCode] = useState('');
const [platform, setPlatform] = useState('dutchie');
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const isNew = !schedule;
const isImmutable = schedule?.is_immutable ?? false;
useEffect(() => {
if (schedule) {
setName(schedule.name);
setRole(schedule.role);
setDescription(schedule.description || '');
setEnabled(schedule.enabled);
setIntervalHours(schedule.interval_hours);
setPriority(schedule.priority);
setStateCode(schedule.state_code || '');
setPlatform(schedule.platform || 'dutchie');
} else {
// Reset for new schedule
setName('');
setRole('product_refresh');
setDescription('');
setEnabled(true);
setIntervalHours(4);
setPriority(0);
setStateCode('');
setPlatform('dutchie');
}
setError(null);
}, [schedule, isOpen]);
const handleSubmit = async () => {
if (!isImmutable && !name.trim()) {
setError('Name is required');
return;
}
setLoading(true);
setError(null);
try {
// For immutable schedules, only send allowed fields
const data = isImmutable
? {
enabled,
interval_hours: intervalHours,
priority,
}
: {
name: name.trim(),
role,
description: description.trim() || undefined,
enabled,
interval_hours: intervalHours,
priority,
state_code: stateCode.trim() || undefined,
platform: platform.trim() || undefined,
};
if (isNew) {
await api.createTaskSchedule(data as any);
} else {
await api.updateTaskSchedule(schedule!.id, data);
}
onSave();
onClose();
} catch (err: any) {
setError(err.response?.data?.error || err.message || 'Failed to save schedule');
} finally {
setLoading(false);
}
};
if (!isOpen) return null;
return (
<div className="fixed inset-0 z-50 overflow-y-auto">
<div className="flex min-h-full items-center justify-center p-4">
<div className="fixed inset-0 bg-black/50" onClick={onClose} />
<div className="relative bg-white rounded-xl shadow-xl max-w-lg w-full">
<div className="px-6 py-4 border-b border-gray-200 flex items-center justify-between">
<h2 className="text-lg font-semibold text-gray-900">
{isNew ? 'Create Schedule' : 'Edit Schedule'}
</h2>
<button onClick={onClose} className="p-1 hover:bg-gray-100 rounded">
<X className="w-5 h-5 text-gray-500" />
</button>
</div>
<div className="px-6 py-4 space-y-4">
{error && (
<div className="bg-red-50 border border-red-200 rounded-lg p-3 text-red-700 text-sm">
{error}
</div>
)}
{isImmutable && (
<div className="bg-amber-50 border border-amber-200 rounded-lg p-3 flex items-start gap-2">
<Lock className="w-4 h-4 text-amber-600 flex-shrink-0 mt-0.5" />
<div className="text-sm text-amber-800">
<strong>Immutable schedule.</strong> Only <em>Enabled</em>, <em>Interval</em>, and <em>Priority</em> can be modified.
</div>
</div>
)}
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Name *</label>
<input
type="text"
value={name}
onChange={(e) => setName(e.target.value)}
placeholder="e.g., product_refresh_all"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Role *</label>
<select
value={role}
onChange={(e) => setRole(e.target.value)}
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
>
{TASK_ROLES.map(r => (
<option key={r.id} value={r.id}>{r.name}</option>
))}
</select>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Description</label>
<input
type="text"
value={description}
onChange={(e) => setDescription(e.target.value)}
placeholder="Optional description"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Interval (hours) *</label>
<input
type="number"
min="1"
max="168"
value={intervalHours}
onChange={(e) => setIntervalHours(parseInt(e.target.value) || 4)}
className="w-full px-3 py-2 border border-gray-200 rounded-lg"
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Priority</label>
<input
type="number"
min="0"
max="100"
value={priority}
onChange={(e) => setPriority(parseInt(e.target.value) || 0)}
className="w-full px-3 py-2 border border-gray-200 rounded-lg"
/>
</div>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">State Code</label>
<input
type="text"
value={stateCode}
onChange={(e) => setStateCode(e.target.value.toUpperCase())}
placeholder="e.g., AZ"
maxLength={2}
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Platform</label>
<input
type="text"
value={platform}
onChange={(e) => setPlatform(e.target.value)}
placeholder="e.g., dutchie"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
</div>
<div className="flex items-center gap-2">
<input
type="checkbox"
id="enabled"
checked={enabled}
onChange={(e) => setEnabled(e.target.checked)}
className="w-4 h-4 text-emerald-600 rounded"
/>
<label htmlFor="enabled" className="text-sm text-gray-700">Enabled</label>
</div>
</div>
<div className="px-6 py-4 border-t border-gray-200 bg-gray-50 flex justify-end gap-3">
<button onClick={onClose} className="px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 rounded-lg">
Cancel
</button>
<button
onClick={handleSubmit}
disabled={loading}
className="px-4 py-2 text-sm bg-emerald-600 text-white rounded-lg hover:bg-emerald-700 disabled:opacity-50 flex items-center gap-2"
>
{loading && <RefreshCw className="w-4 h-4 animate-spin" />}
{isNew ? 'Create' : 'Save'}
</button>
</div>
</div>
</div>
</div>
);
}
const STATUS_COLORS: Record<string, string> = {
pending: 'bg-yellow-100 text-yellow-800',
claimed: 'bg-blue-100 text-blue-800',
@@ -452,6 +712,13 @@ export default function TasksDashboard() {
const [poolPaused, setPoolPaused] = useState(false);
const [showCreateModal, setShowCreateModal] = useState(false);
// Schedules state
const [schedules, setSchedules] = useState<TaskSchedule[]>([]);
const [showSchedules, setShowSchedules] = useState(true);
const [selectedSchedules, setSelectedSchedules] = useState<Set<number>>(new Set());
const [editingSchedule, setEditingSchedule] = useState<TaskSchedule | null>(null);
const [showScheduleModal, setShowScheduleModal] = useState(false);
// Pagination
const [page, setPage] = useState(0);
const tasksPerPage = 25;
@@ -465,7 +732,7 @@ export default function TasksDashboard() {
const fetchData = async () => {
try {
const [tasksRes, countsRes, capacityRes, poolStatus] = await Promise.all([
const [tasksRes, countsRes, capacityRes, poolStatus, schedulesRes] = await Promise.all([
api.getTasks({
role: roleFilter || undefined,
status: statusFilter || undefined,
@@ -474,12 +741,14 @@ export default function TasksDashboard() {
api.getTaskCounts(),
api.getTaskCapacity(),
api.getTaskPoolStatus(),
api.getTaskSchedules(),
]);
setTasks(tasksRes.tasks || []);
setCounts(countsRes);
setCapacity(capacityRes.metrics || []);
setPoolPaused(poolStatus.paused);
setSchedules(schedulesRes.schedules || []);
setError(null);
} catch (err: any) {
setError(err.message || 'Failed to load tasks');
@@ -488,6 +757,91 @@ export default function TasksDashboard() {
}
};
const handleDeleteSchedule = async (scheduleId: number) => {
if (!confirm('Delete this schedule?')) return;
try {
await api.deleteTaskSchedule(scheduleId);
setSelectedSchedules(prev => {
const next = new Set(prev);
next.delete(scheduleId);
return next;
});
fetchData();
} catch (err: any) {
console.error('Delete schedule error:', err);
alert(err.response?.data?.error || 'Failed to delete schedule');
}
};
const handleBulkDeleteSchedules = async () => {
// Filter out immutable schedules from selection
const deletableIds = Array.from(selectedSchedules).filter(id => {
const schedule = schedules.find(s => s.id === id);
return schedule && !schedule.is_immutable;
});
if (deletableIds.length === 0) {
alert('No deletable schedules selected. Immutable schedules cannot be deleted.');
return;
}
const immutableCount = selectedSchedules.size - deletableIds.length;
const confirmMsg = immutableCount > 0
? `Delete ${deletableIds.length} schedule(s)? (${immutableCount} immutable schedule(s) will be skipped)`
: `Delete ${deletableIds.length} selected schedule(s)?`;
if (!confirm(confirmMsg)) return;
try {
await api.deleteTaskSchedulesBulk(deletableIds);
setSelectedSchedules(new Set());
fetchData();
} catch (err: any) {
console.error('Bulk delete error:', err);
alert(err.response?.data?.error || 'Failed to delete schedules');
}
};
const handleToggleSchedule = async (scheduleId: number) => {
try {
await api.toggleTaskSchedule(scheduleId);
fetchData();
} catch (err: any) {
console.error('Toggle schedule error:', err);
alert(err.response?.data?.error || 'Failed to toggle schedule');
}
};
const handleRunScheduleNow = async (scheduleId: number) => {
try {
const result = await api.runTaskScheduleNow(scheduleId);
alert(result.message);
fetchData();
} catch (err: any) {
console.error('Run schedule error:', err);
alert(err.response?.data?.error || 'Failed to run schedule');
}
};
const toggleSelectSchedule = (id: number) => {
setSelectedSchedules(prev => {
const next = new Set(prev);
if (next.has(id)) {
next.delete(id);
} else {
next.add(id);
}
return next;
});
};
const toggleSelectAllSchedules = () => {
if (selectedSchedules.size === schedules.length) {
setSelectedSchedules(new Set());
} else {
setSelectedSchedules(new Set(schedules.map(s => s.id)));
}
};
const handleDeleteTask = async (taskId: number) => {
if (!confirm('Delete this task?')) return;
try {
@@ -583,6 +937,17 @@ export default function TasksDashboard() {
onTaskCreated={fetchData}
/>
{/* Schedule Edit Modal */}
<ScheduleEditModal
isOpen={showScheduleModal}
schedule={editingSchedule}
onClose={() => {
setShowScheduleModal(false);
setEditingSchedule(null);
}}
onSave={fetchData}
/>
{/* Status Summary Cards */}
<div className="grid grid-cols-2 sm:grid-cols-3 lg:grid-cols-6 gap-4">
{Object.entries(counts || {}).map(([status, count]) => (
@@ -714,6 +1079,238 @@ export default function TasksDashboard() {
)}
</div>
{/* Schedules Section */}
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">
<button
onClick={() => setShowSchedules(!showSchedules)}
className="w-full flex items-center justify-between p-4 hover:bg-gray-50"
>
<div className="flex items-center gap-2">
<Timer className="w-5 h-5 text-emerald-600" />
<span className="font-medium text-gray-900">Schedules ({schedules.length})</span>
</div>
{showSchedules ? (
<ChevronUp className="w-5 h-5 text-gray-400" />
) : (
<ChevronDown className="w-5 h-5 text-gray-400" />
)}
</button>
{showSchedules && (
<div className="border-t border-gray-200">
{/* Schedule Actions */}
<div className="p-4 bg-gray-50 border-b border-gray-200 flex flex-wrap items-center justify-between gap-2">
<div className="flex items-center gap-2">
<button
onClick={() => {
setEditingSchedule(null);
setShowScheduleModal(true);
}}
className="flex items-center gap-1 px-3 py-1.5 text-sm bg-emerald-600 text-white rounded hover:bg-emerald-700"
>
<Plus className="w-4 h-4" />
New Schedule
</button>
{selectedSchedules.size > 0 && (
<button
onClick={handleBulkDeleteSchedules}
className="flex items-center gap-1 px-3 py-1.5 text-sm bg-red-600 text-white rounded hover:bg-red-700"
>
<Trash2 className="w-4 h-4" />
Delete ({selectedSchedules.size})
</button>
)}
</div>
<span className="text-sm text-gray-500">
{schedules.filter(s => s.enabled).length} enabled
</span>
</div>
{schedules.length === 0 ? (
<div className="p-8 text-center text-gray-500">
No schedules configured. Click "New Schedule" to create one.
</div>
) : (
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-gray-200">
<thead className="bg-gray-50">
<tr>
<th className="px-4 py-3 text-left">
<input
type="checkbox"
checked={selectedSchedules.size === schedules.length && schedules.length > 0}
onChange={toggleSelectAllSchedules}
className="w-4 h-4 text-emerald-600 rounded"
/>
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Name
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Role
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
State
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Method
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Interval
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Last Run
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Next Run
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Status
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase w-32">
Actions
</th>
</tr>
</thead>
<tbody className="divide-y divide-gray-200">
{schedules.map((schedule) => (
<tr key={schedule.id} className="hover:bg-gray-50">
<td className="px-4 py-3">
<input
type="checkbox"
checked={selectedSchedules.has(schedule.id)}
onChange={() => toggleSelectSchedule(schedule.id)}
className="w-4 h-4 text-emerald-600 rounded"
disabled={schedule.is_immutable}
/>
</td>
<td className="px-4 py-3">
<div className="flex items-center gap-2">
{schedule.is_immutable && (
<span title="Immutable schedule (cannot be deleted)">
<Lock className="w-3.5 h-3.5 text-amber-500 flex-shrink-0" />
</span>
)}
<div>
<div className="text-sm font-medium text-gray-900">{schedule.name}</div>
{schedule.description && (
<div className="text-xs text-gray-500">{schedule.description}</div>
)}
</div>
</div>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.role.replace(/_/g, ' ')}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.state_code ? (
<span className="inline-flex items-center gap-1 px-2 py-0.5 bg-blue-50 text-blue-700 rounded font-medium">
<Globe className="w-3 h-3" />
{schedule.state_code}
</span>
) : (
<span className="text-gray-400">-</span>
)}
</td>
<td className="px-4 py-3 text-sm">
<span className={`inline-flex items-center px-2 py-0.5 rounded text-xs font-medium ${
schedule.method === 'http'
? 'bg-purple-100 text-purple-700'
: schedule.method === 'curl'
? 'bg-orange-100 text-orange-700'
: 'bg-gray-100 text-gray-600'
}`}>
{schedule.method || 'any'}
</span>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
Every {schedule.interval_hours}h
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.last_run_at ? formatTimeAgo(schedule.last_run_at) : '-'}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.next_run_at ? formatTimeAgo(schedule.next_run_at) : '-'}
</td>
<td className="px-4 py-3">
<span
className={`inline-flex items-center gap-1 px-2 py-1 rounded-full text-xs font-medium ${
schedule.enabled
? 'bg-green-100 text-green-800'
: 'bg-gray-100 text-gray-800'
}`}
>
{schedule.enabled ? (
<>
<Play className="w-3 h-3" />
Active
</>
) : (
<>
<Pause className="w-3 h-3" />
Paused
</>
)}
</span>
</td>
<td className="px-4 py-3">
<div className="flex items-center gap-1">
<button
onClick={() => handleRunScheduleNow(schedule.id)}
className="p-1.5 text-gray-400 hover:text-emerald-600 hover:bg-emerald-50 rounded transition-colors"
title="Run now"
>
<PlayCircle className="w-4 h-4" />
</button>
<button
onClick={() => handleToggleSchedule(schedule.id)}
className={`p-1.5 rounded transition-colors ${
schedule.enabled
? 'text-gray-400 hover:text-yellow-600 hover:bg-yellow-50'
: 'text-gray-400 hover:text-green-600 hover:bg-green-50'
}`}
title={schedule.enabled ? 'Pause' : 'Enable'}
>
{schedule.enabled ? (
<Pause className="w-4 h-4" />
) : (
<Play className="w-4 h-4" />
)}
</button>
<button
onClick={() => {
setEditingSchedule(schedule);
setShowScheduleModal(true);
}}
className="p-1.5 text-gray-400 hover:text-blue-600 hover:bg-blue-50 rounded transition-colors"
title={schedule.is_immutable ? 'Edit (limited fields)' : 'Edit'}
>
<Edit2 className="w-4 h-4" />
</button>
<button
onClick={() => handleDeleteSchedule(schedule.id)}
disabled={schedule.is_immutable}
className={`p-1.5 rounded transition-colors ${
schedule.is_immutable
? 'text-gray-300 cursor-not-allowed'
: 'text-gray-400 hover:text-red-600 hover:bg-red-50'
}`}
title={schedule.is_immutable ? 'Cannot delete immutable schedule' : 'Delete'}
>
<Trash2 className="w-4 h-4" />
</button>
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
)}
</div>
)}
</div>
{/* Filters */}
<div className="flex flex-col sm:flex-row gap-4">
<div className="relative flex-1">