Compare commits


2 Commits

Author SHA1 Message Date
Kelly
2513e22171 fix(security): Add auth middleware to unprotected API endpoints
Security audit identified 8 endpoint groups that were publicly accessible
without authentication. Added authMiddleware and requireRole where appropriate.

Protected endpoints:
- /api/payloads/* - authMiddleware (trusted origins or API token)
- /api/job-queue/* - authMiddleware + requireRole('admin')
- /api/workers/* - authMiddleware
- /api/worker-registry/* - authMiddleware (pods access via trusted IPs)
- /api/k8s/* - authMiddleware + requireRole('admin')
- /api/pipeline/* - authMiddleware + requireRole('admin')
- /api/tasks/* - authMiddleware + requireRole('admin')
- /api/admin/orchestrator/* - authMiddleware + requireRole('admin')

Also:
- Added API_SECURITY.md documentation
- Filter AI settings from /settings page (managed in /ai-settings)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-12 00:30:08 -07:00
Kelly
e17b3b225a feat(k8s): Add StatefulSet for persistent workers
- Add scraper-worker-statefulset.yaml with 8 persistent pods
- updateStrategy: OnDelete prevents automatic restarts
- Workers maintain stable identity across restarts
- Document worker architecture in CLAUDE.md
- Add worker registry API endpoint documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:30:08 -07:00
32 changed files with 569 additions and 3788 deletions

CLAUDE.md
View File

@@ -17,35 +17,6 @@ Never deploy unless user explicitly says: "CLAUDE — DEPLOYMENT IS NOW AUTHORIZ
### 5. DB POOL ONLY
Never import `src/db/migrate.ts` at runtime. Use `src/db/pool.ts` for DB access.
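For illustration, a minimal sketch of this rule (assumes `pool` is the pg pool exported by `src/db/pool.ts`, as the route files import it):
```typescript
import { pool } from './db/pool'; // shared pg pool from src/db/pool.ts

// Hypothetical health check - all runtime DB access goes through the pool.
export async function dbHealthy(): Promise<boolean> {
  const { rows } = await pool.query('SELECT 1 AS ok');
  return rows[0]?.ok === 1;
}

// WRONG at runtime: import anything from './db/migrate'
```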
### 6. K8S POD LIMITS — CRITICAL
**MAX 8 PODS** for `scraper-worker` deployment. NEVER EXCEED THIS.
**Pods vs Workers:**
- **Pod** = Kubernetes container instance (MAX 8)
- **Worker** = Concurrent task runner INSIDE a pod (controlled by `MAX_CONCURRENT_TASKS` env var)
- Formula: `8 pods × MAX_CONCURRENT_TASKS = total concurrent workers`
**To increase workers:** Change `MAX_CONCURRENT_TASKS` env var, NOT replicas.
```bash
# CORRECT - increase workers per pod
kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=5
# WRONG - never scale above 8 replicas
kubectl scale deployment/scraper-worker --replicas=20 # NEVER DO THIS
```
**If K8s API returns ServiceUnavailable:** STOP IMMEDIATELY. Do not retry. The cluster is overloaded.
### 7. K8S REQUIRES EXPLICIT PERMISSION
**NEVER run kubectl commands without explicit user permission.**
Before running ANY `kubectl` command (scale, rollout, set env, delete, apply, etc.):
1. Tell the user what you want to do
2. Wait for explicit approval
3. Only then execute the command
This applies to ALL kubectl operations - even read-only ones like `kubectl get pods`.
---
## Quick Reference
@@ -234,75 +205,55 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
---
-## Staggered Task Workflow (Added 2025-12-12)
-### Overview
-When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting.
-### How It Works
-```
-1. Task created with scheduled_for = NOW() + (index * stagger_seconds)
-2. Worker claims task only when scheduled_for <= NOW()
-3. Worker runs preflight on EVERY task claim (proxy health check)
-4. If preflight passes, worker executes task
-5. If preflight fails, task released back to pending for another worker
-6. Worker finishes task, polls for next available task
-7. Repeat - preflight runs on each new task claim
-```
-### Key Points
-- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
-- **Stagger prevents thundering herd**: 15 seconds between tasks is default
-- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed
-### API Endpoints
-```bash
-# Create staggered tasks for specific dispensary IDs
-POST /api/tasks/batch/staggered
-{
-  "dispensary_ids": [1, 2, 3, 4],
-  "role": "product_refresh",   # or "product_discovery"
-  "stagger_seconds": 15,       # default: 15
-  "platform": "dutchie",       # default: "dutchie"
-  "method": null               # "curl" | "http" | null
-}
-
-# Create staggered tasks for AZ stores (convenience endpoint)
-POST /api/tasks/batch/az-stores
-{
-  "total_tasks": 24,           # default: 24
-  "stagger_seconds": 15,       # default: 15
-  "split_roles": true          # default: true (12 refresh, 12 discovery)
-}
-```
-### Example: 24 Tasks for AZ Stores
-```bash
-curl -X POST http://localhost:3010/api/tasks/batch/az-stores \
-  -H "Content-Type: application/json" \
-  -d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}'
-```
-Response:
-```json
-{
-  "success": true,
-  "total": 24,
-  "product_refresh": 12,
-  "product_discovery": 12,
-  "stagger_seconds": 15,
-  "total_duration_seconds": 345,
-  "estimated_completion": "2025-12-12T08:40:00.000Z",
-  "message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)"
-}
-```
-### Related Files
-| File | Purpose |
-|------|---------|
-| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods |
-| `src/routes/tasks.ts` | API endpoints for batch task creation |
-| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |
+## Worker Architecture (Kubernetes)
+### Persistent Workers (StatefulSet)
+Workers run as a **StatefulSet** with 8 persistent pods. They maintain identity across restarts.
+**Pod Names**: `scraper-worker-0` through `scraper-worker-7`
+**Key Properties**:
+- `updateStrategy: OnDelete` - Pods only update when manually deleted (no automatic restarts)
+- `podManagementPolicy: Parallel` - All pods start simultaneously
+- Workers register with their pod name as identity
+**K8s Manifest**: `backend/k8s/scraper-worker-statefulset.yaml`
+### Worker Lifecycle
+1. **Startup**: Worker registers in `worker_registry` table with pod name
+2. **Preflight**: Runs dual-transport preflights (curl + http), reports IPs and fingerprint
+3. **Task Loop**: Polls for tasks, executes them, reports status
+4. **Shutdown**: Graceful 60-second termination period
+### NEVER Restart Workers Unnecessarily
+**Claude must NOT**:
+- Restart workers unless explicitly requested
+- Use `kubectl rollout restart` on workers
+- Use `kubectl set image` on workers (this triggers restart)
+**To update worker code** (only when user authorizes):
+1. Build and push new image with version tag
+2. Update StatefulSet image reference
+3. Manually delete pods one at a time when ready: `kubectl delete pod scraper-worker-0 -n dispensary-scraper`
+### Worker Registry API
+**Endpoint**: `GET /api/worker-registry/workers`
+**Response Fields**:
+| Field | Description |
+|-------|-------------|
+| `pod_name` | Kubernetes pod name |
+| `worker_id` | Internal worker UUID |
+| `status` | active, idle, offline |
+| `curl_ip` | IP from curl preflight |
+| `http_ip` | IP from Puppeteer preflight |
+| `preflight_status` | pending, passed, failed |
+| `preflight_at` | Timestamp of last preflight |
+| `fingerprint_data` | Browser fingerprint JSON |
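For illustration, a sketch of consuming this endpoint, with the response typed from the field table above (the `{ workers: [...] }` envelope and base URL are assumptions):
```typescript
interface RegistryWorker {
  pod_name: string;                       // Kubernetes pod name
  worker_id: string;                      // internal worker UUID
  status: 'active' | 'idle' | 'offline';
  curl_ip: string | null;                 // IP from curl preflight
  http_ip: string | null;                 // IP from Puppeteer preflight
  preflight_status: 'pending' | 'passed' | 'failed';
  preflight_at: string | null;            // timestamp of last preflight
  fingerprint_data: Record<string, unknown> | null;
}

// Hypothetical envelope shape - adjust to the actual response.
const res = await fetch('http://localhost:3010/api/worker-registry/workers');
const { workers } = (await res.json()) as { workers: RegistryWorker[] };
console.log(workers.map((w) => `${w.pod_name}: ${w.status}`).join('\n'));
```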
---

View File

@@ -0,0 +1,175 @@
# API Security Documentation
This document describes the authentication and authorization configuration for all CannaiQ API endpoints.
## Authentication Methods
### 1. Trusted Origins (No Token Required)
Requests from trusted sources are automatically authenticated with the `internal` role:
**Trusted IPs:**
- `127.0.0.1` (localhost IPv4)
- `::1` (localhost IPv6)
- `::ffff:127.0.0.1` (IPv4-mapped IPv6)
**Trusted Domains:**
- `https://cannaiq.co`
- `https://www.cannaiq.co`
- `https://findadispo.com`
- `https://www.findadispo.com`
- `https://findagram.co`
- `https://www.findagram.co`
- `http://localhost:3010`
- `http://localhost:8080`
- `http://localhost:5173`
**Trusted Patterns:**
- `*.cannabrands.app`
- `*.cannaiq.co`
**Internal Header:**
- `X-Internal-Request` header matching `INTERNAL_REQUEST_SECRET` env var
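For illustration, a minimal sketch of an in-cluster call asserting internal trust via the header (the base URL is an assumption):
```typescript
// Hypothetical internal service call; the header value must match
// the INTERNAL_REQUEST_SECRET env var on the API server.
const res = await fetch('http://localhost:3010/api/workers', {
  headers: { 'X-Internal-Request': process.env.INTERNAL_REQUEST_SECRET ?? '' },
});
console.log(res.status); // 200 if trusted, 401 otherwise
```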
### 2. Bearer Token Authentication
External requests must include a valid token:
```
Authorization: Bearer <token>
```
**Token Types:**
- **JWT Token**: User session tokens (7-day expiry)
- **API Token**: Long-lived tokens for integrations (stored in `api_tokens` table)
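For illustration, a minimal external-client sketch (host and token are placeholders):
```typescript
// Hypothetical external integration using a long-lived API token.
const res = await fetch('https://cannaiq.co/api/products', {
  headers: { Authorization: `Bearer ${process.env.CANNAIQ_API_TOKEN}` },
});
if (res.status === 401) throw new Error('Token missing, expired, or revoked');
const products = await res.json();
```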
## Authorization Levels
### Public (No Auth)
Routes accessible without authentication:
- `GET /health` - Health check
- `GET /api/health/*` - Comprehensive health endpoints
- `GET /outbound-ip` - Server's outbound IP
- `GET /api/v1/deals` - Public deals endpoint
### Authenticated (Trusted Origin or Token)
Routes requiring authentication but no specific role:
| Route | Description |
|-------|-------------|
| `/api/payloads/*` | Raw crawl payload access |
| `/api/workers/*` | Worker monitoring |
| `/api/worker-registry/*` | Worker registration and heartbeats |
| `/api/stores/*` | Store CRUD |
| `/api/products/*` | Product listing |
| `/api/dispensaries/*` | Dispensary data |
### Admin Only (Requires `admin` or `superadmin` role)
Routes restricted to administrators:
| Route | Description |
|-------|-------------|
| `/api/job-queue/*` | Job queue management |
| `/api/k8s/*` | Kubernetes control (scaling) |
| `/api/pipeline/*` | Pipeline stage transitions |
| `/api/tasks/*` | Task queue management |
| `/api/admin/orchestrator/*` | Orchestrator dashboard |
| `/api/admin/trusted-origins/*` | Manage trusted origins |
| `/api/admin/debug/*` | Debug endpoints |
**Note:** The `internal` role (localhost/trusted origins) bypasses role checks, granting automatic admin access for local development and internal services.
## Endpoint Security Matrix
| Endpoint Group | Auth Required | Role Required | Notes |
|----------------|---------------|---------------|-------|
| `/api/payloads/*` | Yes | None | Query API for raw crawl data |
| `/api/job-queue/*` | Yes | admin | Legacy job queue (deprecated) |
| `/api/workers/*` | Yes | None | Worker status monitoring |
| `/api/worker-registry/*` | Yes | None | Workers register via trusted IPs |
| `/api/k8s/*` | Yes | admin | K8s scaling controls |
| `/api/pipeline/*` | Yes | admin | Store pipeline transitions |
| `/api/tasks/*` | Yes | admin | Task queue CRUD |
| `/api/admin/orchestrator/*` | Yes | admin | Orchestrator metrics/alerts |
| `/api/admin/trusted-origins/*` | Yes | admin | Auth bypass management |
| `/api/v1/*` | Varies | Varies | Public API (per-endpoint) |
| `/api/consumer/*` | Varies | Varies | Consumer features |
## Implementation Details
### Middleware Stack
```typescript
// Authentication middleware - validates token or trusted origin
import { authMiddleware } from '../auth/middleware';
// Role requirement middleware - checks user role
import { requireRole } from '../auth/middleware';
// Usage in route files:
router.use(authMiddleware); // All routes need auth
router.use(requireRole('admin', 'superadmin')); // Admin-only routes
```
### Auth Middleware Flow
```
Request → Check Bearer Token
├─ Valid JWT → Set user from token → Continue
├─ Valid API Token → Set user as api_token role → Continue
└─ No Token → Check Trusted Origin
├─ Trusted → Set user as internal role → Continue
└─ Not Trusted → 401 Unauthorized
```
### Role Check Flow
```
Request → authMiddleware → requireRole('admin')
├─ role === 'internal' → Continue (bypass)
├─ role in ['admin', 'superadmin'] → Continue
└─ else → 403 Forbidden
```
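For illustration, a condensed sketch of both flows (the helper functions are placeholders; the real logic lives in `src/auth/middleware.ts`):
```typescript
import { Request, Response, NextFunction } from 'express';

// Placeholder helpers - real implementations live in src/auth/middleware.ts.
const verifyJwt = (_token: string): { role: string } | null => null;
const lookupApiToken = (_token: string): { role: string } | null => null;
const isTrustedOrigin = (_req: Request): boolean => false;

export function authMiddleware(req: Request, res: Response, next: NextFunction) {
  const token = req.headers.authorization?.replace(/^Bearer\s+/i, '');
  const user = token ? verifyJwt(token) ?? lookupApiToken(token) : null;
  if (user) {
    (req as any).user = user;                 // valid JWT or API token
    return next();
  }
  if (isTrustedOrigin(req)) {
    (req as any).user = { role: 'internal' }; // trusted IP, origin, or header
    return next();
  }
  res.status(401).json({ error: 'Unauthorized' });
}

export function requireRole(...roles: string[]) {
  return (req: Request, res: Response, next: NextFunction) => {
    const role = (req as any).user?.role;
    if (role === 'internal' || roles.includes(role)) return next(); // internal bypasses
    res.status(403).json({ error: 'Forbidden' });
  };
}
```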
## Worker Pod Authentication
Worker pods (in Kubernetes) authenticate via:
1. **Internal IP**: Pods communicate via cluster IPs, which are trusted
2. **Internal Header**: Optional `X-Internal-Request` header for explicit trust
Endpoints used by workers:
- `POST /api/worker-registry/register` - Report for duty
- `POST /api/worker-registry/heartbeat` - Stay alive
- `POST /api/worker-registry/deregister` - Graceful shutdown
- `POST /api/worker-registry/task-completed` - Report task completion
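For illustration, a sketch of the worker-side lifecycle calls (the service hostname, heartbeat interval, and `POD_NAME` env var are assumptions):
```typescript
// Hypothetical worker client; pods are trusted by cluster IP, so no token is sent.
const BASE = 'http://scraper-api:3010/api/worker-registry'; // assumed service name

async function post(path: string, body: unknown): Promise<unknown> {
  const res = await fetch(`${BASE}${path}`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });
  if (!res.ok) throw new Error(`${path} failed: ${res.status}`);
  return res.json();
}

const worker_id = process.env.POD_NAME ?? 'scraper-worker-0';

await post('/register', { worker_id });                                     // report for duty
const beat = setInterval(() => post('/heartbeat', { worker_id }), 30_000);  // stay alive
process.on('SIGTERM', async () => {                                        // graceful shutdown
  clearInterval(beat);
  await post('/deregister', { worker_id });
  process.exit(0);
});
```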
## API Token Management
API tokens are managed via:
- `GET /api/api-tokens` - List tokens
- `POST /api/api-tokens` - Create token
- `DELETE /api/api-tokens/:id` - Revoke token
Token properties:
- `token`: The bearer token value
- `name`: Human-readable identifier
- `rate_limit`: Requests per minute
- `expires_at`: Optional expiration
- `active`: Enable/disable toggle
- `allowed_endpoints`: Optional endpoint restrictions
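For illustration, a sketch of creating a token (admin credentials and field values are placeholders):
```typescript
// Hypothetical admin call creating a long-lived integration token.
const res = await fetch('http://localhost:3010/api/api-tokens', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${process.env.ADMIN_JWT}`, // admin session token
  },
  body: JSON.stringify({
    name: 'analytics-export',           // human-readable identifier
    rate_limit: 60,                     // requests per minute
    expires_at: '2026-01-01T00:00:00Z', // optional expiration
  }),
});
const created = await res.json();
console.log(created.token); // bearer token value - store it securely
```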
## Security Best Practices
1. **Never expose tokens in URLs** - Use Authorization header
2. **Use HTTPS in production** - All traffic encrypted
3. **Rotate API tokens periodically** - Set expiration dates
4. **Monitor rate limits** - Prevent abuse
5. **Audit access logs** - Track API usage via `api_usage_logs` table
## Related Files
- `src/auth/middleware.ts` - Auth middleware implementation
- `src/routes/api-tokens.ts` - Token management endpoints
- `src/middleware/apiTokenTracker.ts` - Usage tracking
- `src/middleware/trustedDomains.ts` - Domain trust markers

View File

@@ -1,10 +0,0 @@
-- Migration 086: Add proxy_url column for alternative URL formats
-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass)
-- This column allows storing the raw URL directly
-- Add proxy_url column - if set, used directly instead of constructing from parts
ALTER TABLE proxies
ADD COLUMN IF NOT EXISTS proxy_url TEXT;
-- Add comment
COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.';

View File

@@ -1,30 +0,0 @@
-- Migration 088: Extend raw_crawl_payloads for discovery payloads
--
-- Enables saving raw store data from Dutchie discovery crawls.
-- Store discovery returns raw dispensary objects - save them for historical analysis.
-- Add payload_type to distinguish product crawls from discovery crawls
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS payload_type VARCHAR(32) NOT NULL DEFAULT 'product';
-- Add state_code for discovery payloads (null for product payloads)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS state_code VARCHAR(10);
-- Add store_count for discovery payloads (alternative to product_count)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS store_count INTEGER;
-- Make dispensary_id nullable for discovery payloads
ALTER TABLE raw_crawl_payloads
ALTER COLUMN dispensary_id DROP NOT NULL;
-- Add index for discovery payload queries
CREATE INDEX IF NOT EXISTS idx_raw_crawl_payloads_type_state
ON raw_crawl_payloads(payload_type, state_code)
WHERE payload_type = 'store_discovery';
-- Comments
COMMENT ON COLUMN raw_crawl_payloads.payload_type IS 'Type: product (default), store_discovery';
COMMENT ON COLUMN raw_crawl_payloads.state_code IS 'State code for discovery payloads (e.g., AZ, MI)';
COMMENT ON COLUMN raw_crawl_payloads.store_count IS 'Number of stores in discovery payload';

View File

@@ -1,105 +0,0 @@
-- Migration 089: Immutable Schedules with Per-State Product Discovery
--
-- Key changes:
-- 1. Add is_immutable column - schedules can be edited but not deleted
-- 2. Add method column - all tasks use 'http' (Puppeteer transport)
-- 3. Store discovery weekly (168h)
-- 4. Per-state product_discovery schedules (4h default)
-- 5. Remove old payload_fetch schedules
-- =====================================================
-- 1) Add new columns to task_schedules
-- =====================================================
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS is_immutable BOOLEAN DEFAULT FALSE;
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS method VARCHAR(10) DEFAULT 'http';
-- =====================================================
-- 2) Update store_discovery to weekly and immutable
-- =====================================================
UPDATE task_schedules
SET interval_hours = 168, -- 7 days
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)'
WHERE name = 'store_discovery_dutchie';
-- Insert if doesn't exist
INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at)
VALUES ('store_discovery_dutchie', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW())
ON CONFLICT (name) DO UPDATE SET
interval_hours = 168,
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)';
-- =====================================================
-- 3) Remove old payload_fetch and product_refresh_all schedules
-- =====================================================
DELETE FROM task_schedules WHERE name IN ('payload_fetch_all', 'product_refresh_all');
-- =====================================================
-- 4) Create per-state product_discovery schedules
-- =====================================================
-- One schedule per state that has dispensaries with active cannabis programs
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours, -- 4 hours default, editable
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable, -- Can edit but not delete
'http' AS method,
CASE WHEN s.is_active THEN TRUE ELSE FALSE END AS enabled,
-- Stagger start times: each state starts 5 minutes after the previous
NOW() + (ROW_NUMBER() OVER (ORDER BY s.code) * INTERVAL '5 minutes') AS next_run_at
FROM states s
WHERE EXISTS (
SELECT 1 FROM dispensaries d
WHERE d.state_id = s.id AND d.crawl_enabled = true
)
ON CONFLICT (name) DO UPDATE SET
is_immutable = TRUE,
method = 'http',
description = EXCLUDED.description;
-- Also create schedules for states that might have stores discovered later
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours,
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable,
'http' AS method,
FALSE AS enabled, -- Disabled until stores exist
NOW() + INTERVAL '1 hour'
FROM states s
WHERE NOT EXISTS (
SELECT 1 FROM task_schedules ts WHERE ts.name = 'product_discovery_' || lower(s.code)
)
ON CONFLICT (name) DO NOTHING;
-- =====================================================
-- 5) Make analytics_refresh immutable
-- =====================================================
UPDATE task_schedules
SET is_immutable = TRUE, method = 'http'
WHERE name = 'analytics_refresh';
-- =====================================================
-- 6) Add index for schedule lookups
-- =====================================================
CREATE INDEX IF NOT EXISTS idx_task_schedules_state_code
ON task_schedules(state_code)
WHERE state_code IS NOT NULL;
-- Comments
COMMENT ON COLUMN task_schedules.is_immutable IS 'If TRUE, schedule cannot be deleted (only edited)';
COMMENT ON COLUMN task_schedules.method IS 'Transport method: http (Puppeteer/browser) or curl (axios)';

View File

@@ -15,9 +15,14 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router();
// All job-queue routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// In-memory queue state (would be in Redis in production)
let queuePaused = false;

View File

@@ -7,9 +7,14 @@
import { Router, Request, Response } from 'express';
import * as k8s from '@kubernetes/client-node';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router();
// K8s control routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// K8s client setup - lazy initialization
let appsApi: k8s.AppsV1Api | null = null;
let k8sError: string | null = null;

View File

@@ -11,9 +11,14 @@ import { getLatestTrace, getTracesForDispensary, getTraceById } from '../service
import { getProviderDisplayName } from '../utils/provider-display';
import * as fs from 'fs';
import * as path from 'path';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router();
// Orchestrator admin routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// ============================================================
// ORCHESTRATOR METRICS
// ============================================================

View File

@@ -21,9 +21,13 @@ import {
listPayloadMetadata,
} from '../utils/payload-storage';
import { Pool } from 'pg';
import { authMiddleware } from '../auth/middleware';
const router = Router();
// All payload routes require authentication (trusted origins or API token)
router.use(authMiddleware);
// Get pool instance for queries
const getDbPool = (): Pool => getPool() as unknown as Pool;

View File

@@ -18,9 +18,14 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router();
// Pipeline routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// Valid stages
const STAGES = ['discovered', 'validated', 'promoted', 'sandbox', 'production', 'failing'] as const;
type Stage = typeof STAGES[number];

View File

@@ -3,24 +3,6 @@
*
* Endpoints for managing worker tasks, viewing capacity metrics,
* and generating batch tasks.
*
* SCHEDULE MANAGEMENT (added 2025-12-12):
* This file now contains the canonical schedule management endpoints.
* The job_schedules table has been deprecated and all schedule management
* is now consolidated into task_schedules:
*
* Schedule endpoints:
* GET /api/tasks/schedules - List all schedules
* POST /api/tasks/schedules - Create new schedule
* GET /api/tasks/schedules/:id - Get schedule by ID
* PUT /api/tasks/schedules/:id - Update schedule
* DELETE /api/tasks/schedules/:id - Delete schedule
* DELETE /api/tasks/schedules - Bulk delete schedules
* POST /api/tasks/schedules/:id/run-now - Trigger schedule immediately
* POST /api/tasks/schedules/:id/toggle - Toggle schedule enabled/disabled
*
* Note: Schedule routes are defined BEFORE /:id to avoid route conflicts
* (Express matches routes in order, and "schedules" would match /:id otherwise)
*/
import { Router, Request, Response } from 'express';
@@ -37,9 +19,14 @@ import {
resumeTaskPool,
getTaskPoolStatus,
} from '../tasks/task-pool-state';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router();
// Task routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
/**
* GET /api/tasks
* List tasks with optional filters
@@ -149,464 +136,6 @@ router.get('/capacity/:role', async (req: Request, res: Response) => {
}
});
// ============================================================
// SCHEDULE MANAGEMENT ROUTES
// (Must be before /:id to avoid route conflicts)
// ============================================================
/**
* GET /api/tasks/schedules
* List all task schedules
*
* Returns schedules with is_immutable flag - immutable schedules can only
* have their interval_hours, priority, and enabled fields updated (not deleted).
*/
router.get('/schedules', async (req: Request, res: Response) => {
try {
const enabledOnly = req.query.enabled === 'true';
let query = `
SELECT id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method,
COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
FROM task_schedules
`;
if (enabledOnly) {
query += ` WHERE enabled = true`;
}
query += ` ORDER BY
CASE role
WHEN 'store_discovery' THEN 1
WHEN 'product_discovery' THEN 2
WHEN 'analytics_refresh' THEN 3
ELSE 4
END,
state_code NULLS FIRST,
name`;
const result = await pool.query(query);
res.json({ schedules: result.rows });
} catch (error: unknown) {
console.error('Error listing schedules:', error);
res.status(500).json({ error: 'Failed to list schedules' });
}
});
/**
* DELETE /api/tasks/schedules
* Bulk delete schedules
*
* Immutable schedules are automatically skipped (not deleted).
*
* Body:
* - ids: number[] (required) - array of schedule IDs to delete
* - all: boolean (optional) - if true, delete all non-immutable schedules (ids ignored)
*/
router.delete('/schedules', async (req: Request, res: Response) => {
try {
const { ids, all } = req.body;
let result;
let skippedImmutable: { id: number; name: string }[] = [];
if (all === true) {
// First, find immutable schedules that will be skipped
const immutableResult = await pool.query(`
SELECT id, name FROM task_schedules WHERE is_immutable = true
`);
skippedImmutable = immutableResult.rows;
// Delete all non-immutable schedules
result = await pool.query(`
DELETE FROM task_schedules
WHERE COALESCE(is_immutable, false) = false
RETURNING id, name
`);
} else if (Array.isArray(ids) && ids.length > 0) {
// First, find which of the requested IDs are immutable
const immutableResult = await pool.query(`
SELECT id, name FROM task_schedules
WHERE id = ANY($1) AND is_immutable = true
`, [ids]);
skippedImmutable = immutableResult.rows;
// Delete only non-immutable schedules from the requested IDs
result = await pool.query(`
DELETE FROM task_schedules
WHERE id = ANY($1) AND COALESCE(is_immutable, false) = false
RETURNING id, name
`, [ids]);
} else {
return res.status(400).json({
error: 'Either provide ids array or set all=true',
});
}
res.json({
success: true,
deleted_count: result.rowCount,
deleted: result.rows,
skipped_immutable_count: skippedImmutable.length,
skipped_immutable: skippedImmutable,
message: skippedImmutable.length > 0
? `Deleted ${result.rowCount} schedule(s), skipped ${skippedImmutable.length} immutable schedule(s)`
: `Deleted ${result.rowCount} schedule(s)`,
});
} catch (error: unknown) {
console.error('Error bulk deleting schedules:', error);
res.status(500).json({ error: 'Failed to delete schedules' });
}
});
/**
* POST /api/tasks/schedules
* Create a new schedule
*
* Body:
* - name: string (required, unique)
* - role: TaskRole (required)
* - description: string (optional)
* - enabled: boolean (default true)
* - interval_hours: number (required)
* - priority: number (default 0)
* - state_code: string (optional)
* - platform: string (optional)
*/
router.post('/schedules', async (req: Request, res: Response) => {
try {
const {
name,
role,
description,
enabled = true,
interval_hours,
priority = 0,
state_code,
platform,
} = req.body;
if (!name || !role || !interval_hours) {
return res.status(400).json({
error: 'name, role, and interval_hours are required',
});
}
// Calculate next_run_at based on interval
const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000);
const result = await pool.query(`
INSERT INTO task_schedules
(name, role, description, enabled, interval_hours, priority, state_code, platform, next_run_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
`, [name, role, description, enabled, interval_hours, priority, state_code, platform, nextRunAt]);
res.status(201).json(result.rows[0]);
} catch (error: any) {
if (error.code === '23505') {
// Unique constraint violation
return res.status(409).json({ error: 'A schedule with this name already exists' });
}
console.error('Error creating schedule:', error);
res.status(500).json({ error: 'Failed to create schedule' });
}
});
/**
* GET /api/tasks/schedules/:id
* Get a specific schedule by ID
*/
router.get('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const result = await pool.query(`
SELECT id, name, role, description, enabled, interval_hours,
priority, state_code, platform, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
FROM task_schedules
WHERE id = $1
`, [scheduleId]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
res.json(result.rows[0]);
} catch (error: unknown) {
console.error('Error getting schedule:', error);
res.status(500).json({ error: 'Failed to get schedule' });
}
});
/**
* PUT /api/tasks/schedules/:id
* Update an existing schedule
*
* For IMMUTABLE schedules, only these fields can be updated:
* - enabled (turn on/off)
* - interval_hours (change frequency)
* - priority (change priority)
*
* For regular schedules, all fields can be updated.
*/
router.put('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const {
name,
role,
description,
enabled,
interval_hours,
priority,
state_code,
platform,
} = req.body;
// First check if schedule exists and if it's immutable
const checkResult = await pool.query(`
SELECT id, name, COALESCE(is_immutable, false) as is_immutable
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (checkResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = checkResult.rows[0];
const isImmutable = schedule.is_immutable;
// For immutable schedules, reject attempts to change protected fields
if (isImmutable) {
const protectedFields: string[] = [];
if (name !== undefined) protectedFields.push('name');
if (role !== undefined) protectedFields.push('role');
if (description !== undefined) protectedFields.push('description');
if (state_code !== undefined) protectedFields.push('state_code');
if (platform !== undefined) protectedFields.push('platform');
if (protectedFields.length > 0) {
return res.status(403).json({
error: 'Cannot modify protected fields on immutable schedule',
message: `Schedule "${schedule.name}" is immutable. Only enabled, interval_hours, and priority can be changed.`,
protected_fields: protectedFields,
allowed_fields: ['enabled', 'interval_hours', 'priority'],
});
}
}
// Build dynamic update query
const updates: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// These fields can only be updated on non-immutable schedules
if (!isImmutable) {
if (name !== undefined) {
updates.push(`name = $${paramIndex++}`);
values.push(name);
}
if (role !== undefined) {
updates.push(`role = $${paramIndex++}`);
values.push(role);
}
if (description !== undefined) {
updates.push(`description = $${paramIndex++}`);
values.push(description);
}
if (state_code !== undefined) {
updates.push(`state_code = $${paramIndex++}`);
values.push(state_code || null);
}
if (platform !== undefined) {
updates.push(`platform = $${paramIndex++}`);
values.push(platform || null);
}
}
// These fields can be updated on ALL schedules (including immutable)
if (enabled !== undefined) {
updates.push(`enabled = $${paramIndex++}`);
values.push(enabled);
}
if (interval_hours !== undefined) {
updates.push(`interval_hours = $${paramIndex++}`);
values.push(interval_hours);
// Recalculate next_run_at if interval changed
const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000);
updates.push(`next_run_at = $${paramIndex++}`);
values.push(nextRunAt);
}
if (priority !== undefined) {
updates.push(`priority = $${paramIndex++}`);
values.push(priority);
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No fields to update' });
}
updates.push('updated_at = NOW()');
values.push(scheduleId);
const result = await pool.query(`
UPDATE task_schedules
SET ${updates.join(', ')}
WHERE id = $${paramIndex}
RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method,
COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at
`, values);
res.json(result.rows[0]);
} catch (error: any) {
if (error.code === '23505') {
return res.status(409).json({ error: 'A schedule with this name already exists' });
}
console.error('Error updating schedule:', error);
res.status(500).json({ error: 'Failed to update schedule' });
}
});
/**
* DELETE /api/tasks/schedules/:id
* Delete a schedule
*
* Immutable schedules cannot be deleted - they can only be disabled.
*/
router.delete('/schedules/:id', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// First check if schedule exists and is immutable
const checkResult = await pool.query(`
SELECT id, name, COALESCE(is_immutable, false) as is_immutable
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (checkResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = checkResult.rows[0];
// Prevent deletion of immutable schedules
if (schedule.is_immutable) {
return res.status(403).json({
error: 'Cannot delete immutable schedule',
message: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`,
schedule_id: scheduleId,
is_immutable: true,
});
}
// Delete the schedule
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [scheduleId]);
res.json({
success: true,
message: `Schedule "${schedule.name}" deleted`,
});
} catch (error: unknown) {
console.error('Error deleting schedule:', error);
res.status(500).json({ error: 'Failed to delete schedule' });
}
});
/**
* POST /api/tasks/schedules/:id/run-now
* Manually trigger a scheduled task to run immediately
*/
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// Get the schedule
const scheduleResult = await pool.query(`
SELECT id, name, role, state_code, platform, priority
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (scheduleResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = scheduleResult.rows[0];
// Create a task based on the schedule
const task = await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10, // Boost priority for manual runs
});
// Update last_run_at on the schedule
await pool.query(`
UPDATE task_schedules
SET last_run_at = NOW(),
next_run_at = NOW() + (interval_hours || ' hours')::interval,
updated_at = NOW()
WHERE id = $1
`, [scheduleId]);
res.json({
success: true,
message: `Schedule "${schedule.name}" triggered`,
task,
});
} catch (error: unknown) {
console.error('Error running schedule:', error);
res.status(500).json({ error: 'Failed to run schedule' });
}
});
/**
* POST /api/tasks/schedules/:id/toggle
* Toggle a schedule's enabled status
*/
router.post('/schedules/:id/toggle', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
const result = await pool.query(`
UPDATE task_schedules
SET enabled = NOT enabled,
updated_at = NOW()
WHERE id = $1
RETURNING id, name, enabled
`, [scheduleId]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
res.json({
success: true,
schedule: result.rows[0],
message: result.rows[0].enabled
? `Schedule "${result.rows[0].name}" enabled`
: `Schedule "${result.rows[0].name}" disabled`,
});
} catch (error: unknown) {
console.error('Error toggling schedule:', error);
res.status(500).json({ error: 'Failed to toggle schedule' });
}
});
// ============================================================
// TASK-SPECIFIC ROUTES (with :id parameter)
// ============================================================
/**
* GET /api/tasks/:id
* Get a specific task by ID
@@ -1074,123 +603,6 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
}
});
// ============================================================
// STAGGERED BATCH TASK CREATION
// ============================================================
/**
* POST /api/tasks/batch/staggered
* Create multiple tasks with staggered start times
*
* This endpoint prevents resource contention when creating many tasks by
* staggering their scheduled_for timestamps. Each task becomes eligible
* for claiming only after its scheduled time.
*
* WORKFLOW:
* 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task released back to pending for another worker
*
* Body:
* - dispensary_ids: number[] (required) - Array of dispensary IDs
* - role: TaskRole (required) - 'product_refresh' | 'product_discovery'
* - stagger_seconds: number (default: 15) - Seconds between each task start
* - platform: string (default: 'dutchie')
* - method: 'curl' | 'http' | null (default: null)
*/
router.post('/batch/staggered', async (req: Request, res: Response) => {
try {
const {
dispensary_ids,
role,
stagger_seconds = 15,
platform = 'dutchie',
method = null,
} = req.body;
if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
return res.status(400).json({ error: 'dispensary_ids array is required' });
}
if (!role) {
return res.status(400).json({ error: 'role is required' });
}
const result = await taskService.createStaggeredTasks(
dispensary_ids,
role as TaskRole,
stagger_seconds,
platform,
method
);
const totalDuration = (dispensary_ids.length - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
created: result.created,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating staggered tasks:', error);
res.status(500).json({ error: 'Failed to create staggered tasks' });
}
});
/**
* POST /api/tasks/batch/az-stores
* Convenience endpoint to create staggered tasks for Arizona stores
*
* Body:
* - total_tasks: number (default: 24) - Total tasks to create
* - stagger_seconds: number (default: 15) - Seconds between each task
* - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
*/
router.post('/batch/az-stores', async (req: Request, res: Response) => {
try {
const {
total_tasks = 24,
stagger_seconds = 15,
split_roles = true,
} = req.body;
const result = await taskService.createAZStoreTasks(
total_tasks,
stagger_seconds,
split_roles
);
const totalDuration = (result.total - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
total: result.total,
product_refresh: result.product_refresh,
product_discovery: result.product_discovery,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
});
} catch (error: unknown) {
console.error('Error creating AZ store tasks:', error);
res.status(500).json({ error: 'Failed to create AZ store tasks' });
}
});
// ============================================================
// TASK POOL MANAGEMENT
// ============================================================
/**
* GET /api/tasks/pool/status
* Check if task pool is paused

View File

@@ -23,11 +23,14 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import os from 'os';
import { runPuppeteerPreflightWithRetry } from '../services/puppeteer-preflight';
import { CrawlRotator } from '../services/crawl-rotator';
import { authMiddleware } from '../auth/middleware';
const router = Router();
// Worker registry routes require authentication
// Note: Internal workers (pods) can access via trusted IP (localhost, in-cluster)
router.use(authMiddleware);
// ============================================================
// WORKER REGISTRATION
// ============================================================
@@ -252,9 +255,12 @@ router.post('/deregister', async (req: Request, res: Response) => {
// Release the name back to the pool
await pool.query('SELECT release_worker_name($1)', [worker_id]);
-    // Delete the worker entry (clean shutdown)
+    // Mark as terminated
    const { rows } = await pool.query(`
-      DELETE FROM worker_registry
+      UPDATE worker_registry
+      SET status = 'terminated',
+          current_task_id = NULL,
+          updated_at = NOW()
WHERE worker_id = $1
RETURNING id, friendly_name
`, [worker_id]);
@@ -863,58 +869,4 @@ router.get('/pods', async (_req: Request, res: Response) => {
}
});
// ============================================================
// PREFLIGHT SMOKE TEST
// ============================================================
/**
* POST /api/worker-registry/preflight-test
* Run an HTTP (Puppeteer) preflight test and return results
*
* This is a smoke test endpoint to verify the preflight system works.
* Returns IP, fingerprint data, bot detection results, and products fetched.
*/
router.post('/preflight-test', async (_req: Request, res: Response) => {
try {
console.log('[PreflightTest] Starting HTTP preflight smoke test...');
// Create a temporary CrawlRotator for the test
const crawlRotator = new CrawlRotator();
// Run the Puppeteer preflight (with 1 retry)
const startTime = Date.now();
const result = await runPuppeteerPreflightWithRetry(crawlRotator, 1);
const duration = Date.now() - startTime;
console.log(`[PreflightTest] Completed in ${duration}ms - passed: ${result.passed}`);
res.json({
success: true,
test: 'http_preflight',
duration_ms: duration,
result: {
passed: result.passed,
proxy_ip: result.proxyIp,
fingerprint: result.fingerprint,
bot_detection: result.botDetection,
products_returned: result.productsReturned,
browser_user_agent: result.browserUserAgent,
ip_verified: result.ipVerified,
proxy_available: result.proxyAvailable,
proxy_connected: result.proxyConnected,
antidetect_ready: result.antidetectReady,
response_time_ms: result.responseTimeMs,
error: result.error
}
});
} catch (error: any) {
console.error('[PreflightTest] Error:', error.message);
res.status(500).json({
success: false,
test: 'http_preflight',
error: error.message
});
}
});
export default router;

View File

@@ -4,25 +4,10 @@
* Provider-agnostic worker management and job monitoring.
* Replaces legacy /api/dutchie-az/admin/schedules and /api/dutchie-az/monitor/* routes.
*
* DEPRECATION NOTE (2025-12-12):
* This file still queries job_schedules for backwards compatibility with
* the /api/workers endpoints that display worker status. However, the
* job_schedules table is DEPRECATED - all entries have been disabled.
*
* Schedule management has been consolidated into task_schedules:
* - Use /api/tasks/schedules for schedule CRUD operations
* - Use TasksDashboard.tsx (/admin/tasks) for schedule management UI
* - task_schedules uses interval_hours (simpler than base_interval_minutes + jitter)
*
* The /api/workers endpoints remain useful for:
* - Monitoring active workers and job status
* - K8s scaling controls
* - Job history and logs
*
* Endpoints:
* GET /api/workers - List all workers/schedules
* GET /api/workers/active - List currently active workers
- * GET /api/workers/schedule - Get all job schedules (DEPRECATED - use /api/tasks/schedules)
+ * GET /api/workers/schedule - Get all job schedules
* GET /api/workers/:workerName - Get specific worker details
* GET /api/workers/:workerName/scope - Get worker's scope (states, etc.)
* GET /api/workers/:workerName/stats - Get worker statistics
@@ -41,9 +26,13 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import * as k8s from '@kubernetes/client-node';
import { authMiddleware } from '../auth/middleware';
const router = Router();
// All worker routes require authentication (trusted origins or API token)
router.use(authMiddleware);
// ============================================================
// K8S SCALING CONFIGURATION (added 2024-12-10)
// Per TASK_WORKFLOW_2024-12-10.md: Admin can scale workers from UI

View File

@@ -1,284 +0,0 @@
/**
* Bulk Proxy Import Script
*
* Imports proxies from various formats into the proxies table.
* Supports:
* - Standard format: http://user:pass@host:port
* - Colon format: http://host:port:user:pass
* - Simple format: host:port:user:pass (defaults to http)
*
* Usage:
* npx tsx src/scripts/import-proxies.ts < proxies.txt
* echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts
* npx tsx src/scripts/import-proxies.ts --file proxies.txt
* npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"
*
* Options:
* --file <path> Read proxies from file (one per line)
* --url <url> Import a single proxy URL
* --max-connections Set max_connections for all imported proxies (default: 1)
* --dry-run Parse and show what would be imported without inserting
*/
import { getPool } from '../db/pool';
import * as fs from 'fs';
import * as readline from 'readline';
interface ParsedProxy {
protocol: string;
host: string;
port: number;
username?: string;
password?: string;
rawUrl: string;
}
/**
* Parse a proxy URL in various formats
*/
function parseProxyUrl(input: string): ParsedProxy | null {
const trimmed = input.trim();
if (!trimmed || trimmed.startsWith('#')) return null;
// Format 1: Standard URL format - http://user:pass@host:port
const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/);
if (standardMatch) {
return {
protocol: standardMatch[1],
username: standardMatch[2],
password: standardMatch[3],
host: standardMatch[4],
port: parseInt(standardMatch[5], 10),
rawUrl: trimmed,
};
}
// Format 2: Standard URL without auth - http://host:port
const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/);
if (noAuthMatch) {
return {
protocol: noAuthMatch[1],
host: noAuthMatch[2],
port: parseInt(noAuthMatch[3], 10),
rawUrl: trimmed,
};
}
// Format 3: Colon format with protocol - http://host:port:user:pass
const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonWithProtocolMatch) {
return {
protocol: colonWithProtocolMatch[1],
host: colonWithProtocolMatch[2],
port: parseInt(colonWithProtocolMatch[3], 10),
username: colonWithProtocolMatch[4],
password: colonWithProtocolMatch[5],
rawUrl: trimmed, // Keep raw URL for non-standard format
};
}
// Format 4: Colon format without protocol - host:port:user:pass
const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/);
if (colonMatch) {
return {
protocol: 'http',
host: colonMatch[1],
port: parseInt(colonMatch[2], 10),
username: colonMatch[3],
password: colonMatch[4],
rawUrl: `http://${trimmed}`, // Construct raw URL
};
}
// Format 5: Simple host:port
const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/);
if (simpleMatch) {
return {
protocol: 'http',
host: simpleMatch[1],
port: parseInt(simpleMatch[2], 10),
rawUrl: `http://${trimmed}`,
};
}
console.error(`[ImportProxies] Could not parse: ${trimmed}`);
return null;
}
/**
* Check if proxy URL is in non-standard format (needs proxy_url column)
*/
function isNonStandardFormat(rawUrl: string): boolean {
// Colon format: protocol://host:port:user:pass
return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl);
}
async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) {
if (dryRun) {
console.log('\n[ImportProxies] DRY RUN - Would import:');
for (const p of proxies) {
const needsRawUrl = isNonStandardFormat(p.rawUrl);
console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`);
}
console.log(`\nTotal: ${proxies.length} proxies`);
return;
}
const pool = getPool();
let inserted = 0;
let skipped = 0;
for (const proxy of proxies) {
try {
// Determine if we need to store the raw URL (non-standard format)
const needsRawUrl = isNonStandardFormat(proxy.rawUrl);
// Use different conflict resolution based on format
// Non-standard format: unique by proxy_url (session-based residential proxies)
// Standard format: unique by host/port/protocol
const query = needsRawUrl
? `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (proxy_url) WHERE proxy_url IS NOT NULL
DO UPDATE SET
max_connections = EXCLUDED.max_connections,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`
: `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (host, port, protocol)
DO UPDATE SET
username = EXCLUDED.username,
password = EXCLUDED.password,
max_connections = EXCLUDED.max_connections,
proxy_url = EXCLUDED.proxy_url,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`;
const result = await pool.query(query, [
proxy.host,
proxy.port,
proxy.protocol,
proxy.username || null,
proxy.password || null,
maxConnections,
needsRawUrl ? proxy.rawUrl : null,
]);
const isInsert = result.rows[0]?.is_insert;
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
if (isInsert) {
inserted++;
console.log(`[ImportProxies] Inserted: ${displayName}`);
} else {
console.log(`[ImportProxies] Updated: ${displayName}`);
inserted++; // Count updates too
}
} catch (err: any) {
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
console.error(`[ImportProxies] Error inserting ${displayName}: ${err.message}`);
skipped++;
}
}
console.log(`\n[ImportProxies] Complete: ${inserted} imported, ${skipped} skipped`);
// Notify any listening workers
try {
await pool.query(`NOTIFY proxy_added, 'bulk import'`);
console.log('[ImportProxies] Sent proxy_added notification to workers');
} catch {
// Ignore notification errors
}
}
async function readFromStdin(): Promise<string[]> {
return new Promise((resolve) => {
const lines: string[] = [];
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on('line', (line) => {
lines.push(line);
});
rl.on('close', () => {
resolve(lines);
});
});
}
async function main() {
const args = process.argv.slice(2);
let lines: string[] = [];
let maxConnections = 1;
let dryRun = false;
// Parse arguments
for (let i = 0; i < args.length; i++) {
if (args[i] === '--file' && args[i + 1]) {
const content = fs.readFileSync(args[i + 1], 'utf-8');
lines.push(...content.split('\n'));
i++;
} else if (args[i] === '--url' && args[i + 1]) {
lines.push(args[i + 1]);
i++;
} else if (args[i] === '--max-connections' && args[i + 1]) {
maxConnections = parseInt(args[i + 1], 10);
i++;
} else if (args[i] === '--dry-run') {
dryRun = true;
} else if (!args[i].startsWith('--')) {
// Treat as URL directly
lines.push(args[i]);
}
}
// If no lines yet, read from stdin
if (lines.length === 0) {
console.log('[ImportProxies] Reading from stdin...');
lines = await readFromStdin();
}
// Parse all lines
const proxies: ParsedProxy[] = [];
for (const line of lines) {
const parsed = parseProxyUrl(line);
if (parsed) {
proxies.push(parsed);
}
}
if (proxies.length === 0) {
console.error('[ImportProxies] No valid proxies found');
console.error('\nUsage:');
console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"');
console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt');
console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts');
console.error('\nSupported formats:');
console.error(' http://user:pass@host:port (standard)');
console.error(' http://host:port:user:pass (colon format)');
console.error(' host:port:user:pass (simple)');
process.exit(1);
}
console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`);
await importProxies(proxies, maxConnections, dryRun);
}
main().catch((err) => {
console.error('[ImportProxies] Fatal error:', err);
process.exit(1);
});

View File

@@ -77,11 +77,6 @@ export interface Proxy {
country?: string;
countryCode?: string;
timezone?: string;
/**
* Raw proxy URL override. If set, used directly instead of constructing from parts.
* Supports non-standard formats like: http://host:port:user:pass
*/
proxyUrl?: string;
}
export interface ProxyStats {
@@ -134,10 +129,6 @@ export class ProxyRotator {
private proxies: Proxy[] = [];
private currentIndex: number = 0;
private lastRotation: Date = new Date();
private lastReloadAt: Date = new Date();
// Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
private reloadIntervalMs: number = 60000;
constructor(pool?: Pool) {
this.pool = pool || null;
@@ -147,13 +138,6 @@ export class ProxyRotator {
this.pool = pool;
}
/**
* Set the reload interval for periodic proxy checks
*/
setReloadInterval(ms: number): void {
this.reloadIntervalMs = ms;
}
/**
* Load proxies from database
*/
@@ -183,76 +167,22 @@ export class ProxyRotator {
state,
country,
country_code as "countryCode",
-        timezone,
-        proxy_url as "proxyUrl"
+        timezone
FROM proxies
WHERE active = true
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
`);
this.proxies = result.rows;
-      this.lastReloadAt = new Date();
      const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
-      console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`);
+      console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
} catch (error) {
console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
this.proxies = [];
}
}
/**
* Check if proxy list is stale and needs reload
*/
isStale(): boolean {
const elapsed = Date.now() - this.lastReloadAt.getTime();
return elapsed > this.reloadIntervalMs;
}
/**
* Reload proxies if the cache is stale.
* This ensures workers pick up new proxies or see disabled proxies removed.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
if (!this.isStale()) {
return false;
}
const oldCount = this.proxies.length;
const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const oldIds = new Set(this.proxies.map(p => p.id));
await this.loadProxies();
const newCount = this.proxies.length;
const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const newIds = new Set(this.proxies.map(p => p.id));
// Log changes
const added = this.proxies.filter(p => !oldIds.has(p.id));
const removed = [...oldIds].filter(id => !newIds.has(id));
if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) {
console.log(`[ProxyRotator] Reloaded proxies: ${oldCount} → ${newCount} proxies, ${oldCapacity} → ${newCapacity} threads`);
if (added.length > 0) {
console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`);
}
if (removed.length > 0) {
console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`);
}
}
return true;
}
/**
* Get time since last reload in seconds
*/
getSecondsSinceReload(): number {
return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000);
}
/**
* Get next proxy in rotation
*/
@@ -412,24 +342,8 @@ export class ProxyRotator {
/**
* Get proxy URL for HTTP client
* If proxy.proxyUrl is set, uses it directly (supports non-standard formats).
* Otherwise constructs standard format: protocol://user:pass@host:port
*/
getProxyUrl(proxy: Proxy): string {
// If proxyUrl is set, check if it needs conversion from non-standard format
if (proxy.proxyUrl) {
// Check if it's in non-standard format: http://host:port:user:pass
const colonFormatMatch = proxy.proxyUrl.match(/^(https?):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonFormatMatch) {
// Convert to standard format: http://user:pass@host:port
const [, protocol, host, port, username, password] = colonFormatMatch;
return `${protocol}://${encodeURIComponent(username)}:${encodeURIComponent(password)}@${host}:${port}`;
}
// Already in standard format or unknown format - return as-is
return proxy.proxyUrl;
}
// Construct standard format from individual fields
const auth = proxy.username && proxy.password
? `${proxy.username}:${proxy.password}@`
: '';
@@ -670,23 +584,6 @@ export class CrawlRotator {
await this.proxy.loadProxies();
}
/**
* Reload proxy list if stale.
* Workers should call this periodically to pick up proxy changes.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
return this.proxy.reloadIfStale();
}
/**
* Set proxy reload interval in milliseconds.
* Default is 60 seconds.
*/
setProxyReloadInterval(ms: number): void {
this.proxy.setReloadInterval(ms);
}
/**
* Rotate proxy only (get new IP)
*/

View File

@@ -26,34 +26,6 @@ const TEST_PLATFORM_ID = '6405ef617056e8014d79101b';
const FINGERPRINT_DEMO_URL = 'https://demo.fingerprint.com/';
const AMIUNIQUE_URL = 'https://amiunique.org/fingerprint';
// IP geolocation API for timezone lookup (free, no key required)
const IP_API_URL = 'http://ip-api.com/json';
/**
* Look up timezone from IP address using ip-api.com
* Returns IANA timezone (e.g., 'America/New_York') or null on failure
*/
async function getTimezoneFromIp(ip: string): Promise<{ timezone: string; city?: string; region?: string } | null> {
try {
const axios = require('axios');
const response = await axios.get(`${IP_API_URL}/${ip}?fields=status,timezone,city,regionName`, {
timeout: 5000,
});
if (response.data?.status === 'success' && response.data?.timezone) {
return {
timezone: response.data.timezone,
city: response.data.city,
region: response.data.regionName,
};
}
return null;
} catch (err: any) {
console.log(`[PuppeteerPreflight] IP geolocation lookup failed: ${err.message}`);
return null;
}
}
export interface PuppeteerPreflightResult extends PreflightResult {
method: 'http';
/** Number of products returned (proves API access) */
@@ -70,13 +42,6 @@ export interface PuppeteerPreflightResult extends PreflightResult {
expectedProxyIp?: string;
/** Whether IP verification passed (detected IP matches proxy) */
ipVerified?: boolean;
/** Detected timezone from IP geolocation */
detectedTimezone?: string;
/** Detected location from IP geolocation */
detectedLocation?: {
city?: string;
region?: string;
};
}
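For orientation, an invented example of a passing result; it is typed `Partial<>` because only the fields visible in this diff are shown:

```typescript
// Invented values; Partial<> since the base PreflightResult fields
// are not all visible in this diff.
const examplePass: Partial<PuppeteerPreflightResult> = {
  method: 'http',
  passed: true,
  proxyConnected: true,
  antidetectReady: true,
  proxyIp: '203.0.113.42',
  expectedProxyIp: '203.0.113.42',
  ipVerified: true,
  productsReturned: 10,
  responseTimeMs: 4120,
};
```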
/**
@@ -171,82 +136,226 @@ export async function runPuppeteerPreflight(
};
// =========================================================================
// STEP 1a: Get IP address directly via simple API (more reliable than scraping)
// STEP 1: Visit fingerprint.com demo to verify anti-detect and get IP
// =========================================================================
console.log(`[PuppeteerPreflight] Getting proxy IP address...`);
console.log(`[PuppeteerPreflight] Testing anti-detect at ${FINGERPRINT_DEMO_URL}...`);
try {
const ipApiResponse = await page.evaluate(async () => {
try {
const response = await fetch('https://api.ipify.org?format=json');
const data = await response.json();
return { ip: data.ip, error: null };
} catch (err: any) {
return { ip: null, error: err.message };
}
await page.goto(FINGERPRINT_DEMO_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
if (ipApiResponse.ip) {
result.proxyIp = ipApiResponse.ip;
result.proxyConnected = true;
console.log(`[PuppeteerPreflight] Detected proxy IP: ${ipApiResponse.ip}`);
result.proxyConnected = true; // If we got here, proxy is working
// Look up timezone from IP
const geoData = await getTimezoneFromIp(ipApiResponse.ip);
if (geoData) {
result.detectedTimezone = geoData.timezone;
result.detectedLocation = { city: geoData.city, region: geoData.region };
console.log(`[PuppeteerPreflight] IP Geolocation: ${geoData.city}, ${geoData.region} (${geoData.timezone})`);
// Wait for fingerprint results to load
await page.waitForSelector('[data-test="visitor-id"]', { timeout: 10000 }).catch(() => {});
// Set browser timezone to match proxy location via CDP
try {
const client = await page.target().createCDPSession();
await client.send('Emulation.setTimezoneOverride', { timezoneId: geoData.timezone });
console.log(`[PuppeteerPreflight] Browser timezone set to: ${geoData.timezone}`);
} catch (tzErr: any) {
console.log(`[PuppeteerPreflight] Failed to set browser timezone: ${tzErr.message}`);
}
} else {
console.log(`[PuppeteerPreflight] WARNING: Could not determine timezone from IP - timezone mismatch possible`);
// Extract fingerprint data from the page
const fingerprintData = await page.evaluate(() => {
// Try to find the IP address displayed on the page
const ipElement = document.querySelector('[data-test="ip-address"]');
const ip = ipElement?.textContent?.trim() || null;
// Try to find bot detection info
const botElement = document.querySelector('[data-test="bot-detected"]');
const botDetected = botElement?.textContent?.toLowerCase().includes('true') || false;
// Try to find visitor ID (proves fingerprinting worked)
const visitorIdElement = document.querySelector('[data-test="visitor-id"]');
const visitorId = visitorIdElement?.textContent?.trim() || null;
// Alternative: look for common UI patterns if data-test attrs not present
let detectedIp = ip;
if (!detectedIp) {
// Look for IP in any element containing IP-like pattern
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
detectedIp = ipMatch ? ipMatch[1] : null;
}
return {
ip: detectedIp,
botDetected,
visitorId,
pageLoaded: !!document.body,
};
});
if (fingerprintData.ip) {
result.proxyIp = fingerprintData.ip;
console.log(`[PuppeteerPreflight] Detected IP: ${fingerprintData.ip}`);
// Verify IP matches expected proxy
if (expectedProxyHost) {
// Check if detected IP contains the proxy host (or is close match)
if (fingerprintData.ip === expectedProxyHost ||
expectedProxyHost.includes(fingerprintData.ip) ||
fingerprintData.ip.includes(expectedProxyHost.split('.').slice(0, 3).join('.'))) {
result.ipVerified = true;
console.log(`[PuppeteerPreflight] IP VERIFIED - matches proxy`);
} else {
console.log(`[PuppeteerPreflight] IP mismatch: expected ${expectedProxyHost}, got ${fingerprintData.ip}`);
// Don't fail - residential proxies often show different egress IPs
}
}
} else {
console.log(`[PuppeteerPreflight] IP lookup failed: ${ipApiResponse.error || 'unknown error'}`);
}
} catch (ipErr: any) {
console.log(`[PuppeteerPreflight] IP API error: ${ipErr.message}`);
if (fingerprintData.visitorId) {
console.log(`[PuppeteerPreflight] Fingerprint visitor ID: ${fingerprintData.visitorId}`);
}
result.botDetection = {
detected: fingerprintData.botDetected,
};
if (fingerprintData.botDetected) {
console.log(`[PuppeteerPreflight] WARNING: Bot detection triggered!`);
} else {
console.log(`[PuppeteerPreflight] Anti-detect check: NOT detected as bot`);
result.antidetectReady = true;
}
} catch (fpErr: any) {
// Could mean proxy connection failed
console.log(`[PuppeteerPreflight] Fingerprint.com check failed: ${fpErr.message}`);
if (fpErr.message.includes('net::ERR_PROXY') || fpErr.message.includes('ECONNREFUSED')) {
result.error = `Proxy connection failed: ${fpErr.message}`;
return result;
}
// Try fallback: amiunique.org
console.log(`[PuppeteerPreflight] Trying fallback: ${AMIUNIQUE_URL}...`);
try {
await page.goto(AMIUNIQUE_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true;
// Extract IP from amiunique.org page
const amiData = await page.evaluate(() => {
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
return {
ip: ipMatch ? ipMatch[1] : null,
pageLoaded: !!document.body,
};
});
if (amiData.ip) {
result.proxyIp = amiData.ip;
console.log(`[PuppeteerPreflight] Detected IP via amiunique.org: ${amiData.ip}`);
}
result.antidetectReady = true;
console.log(`[PuppeteerPreflight] amiunique.org fallback succeeded`);
} catch (amiErr: any) {
console.log(`[PuppeteerPreflight] amiunique.org fallback also failed: ${amiErr.message}`);
// Continue with Dutchie test anyway
result.proxyConnected = true;
result.antidetectReady = true;
}
}
// =========================================================================
// STEP 2: Preflight complete - proxy verified via ipify.org
// We skip heavy fingerprint.com/amiunique.org tests - just verify proxy works
// The actual Dutchie test happens at task time.
// STEP 2: Test Dutchie API access (the real test)
// =========================================================================
const embedUrl = `https://dutchie.com/embedded-menu/${TEST_CNAME}?menuType=rec`;
console.log(`[PuppeteerPreflight] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 30000,
});
// Make GraphQL request from browser context
const graphqlResult = await page.evaluate(
async (platformId: string, hash: string) => {
try {
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active' per CLAUDE.md
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: 0,
perPage: 10, // Just need a few to prove it works
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: hash,
},
};
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const sessionId = 'preflight-' + Date.now();
const response = await fetch(url, {
method: 'GET',
headers: {
Accept: 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
if (!response.ok) {
return { error: `HTTP ${response.status}`, products: 0 };
}
const json = await response.json();
if (json.errors) {
return { error: JSON.stringify(json.errors).slice(0, 200), products: 0 };
}
const products = json?.data?.filteredProducts?.products || [];
return { error: null, products: products.length };
} catch (err: any) {
return { error: err.message || 'Unknown error', products: 0 };
}
},
TEST_PLATFORM_ID,
FILTERED_PRODUCTS_HASH
);
// If we got an IP from ipify.org, proxy is working
if (result.proxyIp) {
result.proxyConnected = true;
result.antidetectReady = true; // Assume stealth plugin is working
}
result.responseTimeMs = Date.now() - startTime;
// If we got here with proxyConnected=true and antidetectReady=true, we're good
if (result.proxyConnected && result.antidetectReady) {
if (graphqlResult.error) {
result.error = `GraphQL error: ${graphqlResult.error}`;
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
} else if (graphqlResult.products === 0) {
result.error = 'GraphQL returned 0 products';
console.log(`[PuppeteerPreflight] FAILED - No products returned`);
} else {
result.passed = true;
result.productsReturned = graphqlResult.products;
console.log(
`[PuppeteerPreflight] PASSED - Proxy connected, anti-detect ready (${result.responseTimeMs}ms)`
`[PuppeteerPreflight] PASSED - Got ${graphqlResult.products} products in ${result.responseTimeMs}ms`
);
if (result.proxyIp) {
console.log(`[PuppeteerPreflight] Browser IP via proxy: ${result.proxyIp}`);
}
} else if (result.proxyConnected) {
// Proxy works but anti-detect check failed - still pass (anti-detect is best-effort)
result.passed = true;
result.antidetectReady = true; // Assume ready since proxy works
console.log(
`[PuppeteerPreflight] PASSED - Proxy connected (anti-detect check skipped, ${result.responseTimeMs}ms)`
);
} else {
result.error = result.error || 'Proxy connection failed';
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
}
} catch (err: any) {
result.error = `Browser error: ${err.message || 'Unknown error'}`;

View File

@@ -26,12 +26,6 @@ interface TaskSchedule {
next_run_at: Date | null;
state_code: string | null;
priority: number;
method: 'curl' | 'http' | null;
is_immutable: boolean;
description: string | null;
platform: string | null;
last_task_count: number | null;
last_error: string | null;
}
class TaskScheduler {
@@ -90,22 +84,24 @@ class TaskScheduler {
/**
* Ensure default schedules exist in the database
* Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist
*
* NOTE: Per-state product_discovery schedules are created by migration 089.
* This only creates core immutable schedules that should exist regardless.
*/
private async ensureDefaultSchedules(): Promise<void> {
// Core schedules - all use HTTP transport for browser-based scraping
// Per TASK_WORKFLOW_2024-12-10.md: Default schedules for task generation
// NOTE: payload_fetch replaces direct product_refresh - it chains to product_refresh
const defaults = [
{
name: 'payload_fetch_all',
role: 'payload_fetch' as TaskRole,
interval_hours: 4,
priority: 0,
description: 'Fetch payloads from Dutchie API for all crawl-enabled stores every 4 hours. Chains to product_refresh.',
},
{
name: 'store_discovery_dutchie',
role: 'store_discovery' as TaskRole,
interval_hours: 168, // Weekly
interval_hours: 24,
priority: 5,
description: 'Discover new Dutchie stores weekly (HTTP transport)',
method: 'http',
is_immutable: true,
platform: 'dutchie',
description: 'Discover new Dutchie stores daily',
},
{
name: 'analytics_refresh',
@@ -113,21 +109,16 @@ class TaskScheduler {
interval_hours: 6,
priority: 0,
description: 'Refresh analytics materialized views every 6 hours',
method: 'http',
is_immutable: true,
platform: null,
},
];
for (const sched of defaults) {
try {
await pool.query(`
INSERT INTO task_schedules (name, role, interval_hours, priority, description, method, is_immutable, platform, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW())
ON CONFLICT (name) DO UPDATE SET
method = EXCLUDED.method,
is_immutable = EXCLUDED.is_immutable
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description, sched.method, sched.is_immutable, sched.platform]);
INSERT INTO task_schedules (name, role, interval_hours, priority, description, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, true, NOW())
ON CONFLICT (name) DO NOTHING
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description]);
} catch (err: any) {
// Table may not exist yet - will be created by migration
if (!err.message.includes('does not exist')) {
@@ -201,27 +192,16 @@ class TaskScheduler {
/**
* Execute a schedule and create tasks
* Per TASK_WORKFLOW_2024-12-10.md: Different logic per role
*
* TRANSPORT MODES:
* - All schedules now use HTTP transport (Puppeteer/browser)
* - Per-state product_discovery schedules process one state at a time
* - Workers must pass HTTP preflight to claim HTTP tasks
*/
private async executeSchedule(schedule: TaskSchedule): Promise<number> {
switch (schedule.role) {
case 'product_discovery':
// Per-state product discovery using HTTP transport
return this.generateProductDiscoveryTasks(schedule);
case 'payload_fetch':
// DEPRECATED: Legacy payload_fetch redirects to product_discovery
console.log(`[TaskScheduler] payload_fetch is deprecated, using product_discovery instead`);
return this.generateProductDiscoveryTasks(schedule);
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch replaces direct product_refresh
return this.generatePayloadFetchTasks(schedule);
case 'product_refresh':
// DEPRECATED: Legacy product_refresh redirects to product_discovery
console.log(`[TaskScheduler] product_refresh is deprecated, using product_discovery instead`);
return this.generateProductDiscoveryTasks(schedule);
// Legacy - kept for manual triggers, but scheduled crawls use payload_fetch
return this.generatePayloadFetchTasks(schedule);
case 'store_discovery':
return this.generateStoreDiscoveryTasks(schedule);
@@ -236,69 +216,50 @@ class TaskScheduler {
}
/**
* Generate product_discovery tasks for stores in a specific state
* Uses HTTP transport (Puppeteer/browser) for all tasks
*
* Per-state scheduling allows:
* - Different crawl frequencies per state (e.g., AZ=4h, MI=6h)
* - Better rate limit management (one state at a time)
* - Easier debugging and monitoring per state
* Generate payload_fetch tasks for stores that need crawling
* Per TASK_WORKFLOW_2024-12-10.md: payload_fetch hits API, saves to disk, chains to product_refresh
*/
private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
// state_code is required for per-state schedules
if (!schedule.state_code) {
console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`);
return 0;
}
// Find stores in this state needing refresh
private async generatePayloadFetchTasks(schedule: TaskSchedule): Promise<number> {
// Per TASK_WORKFLOW_2024-12-10.md: Find stores needing refresh
const result = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND s.code = $1
-- No pending/running product_discovery task already
-- No pending/running payload_fetch or product_refresh task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'product_discovery'
AND t.role IN ('payload_fetch', 'product_refresh')
AND t.status IN ('pending', 'claimed', 'running')
)
-- Never fetched OR last fetch > interval ago
AND (
d.last_fetch_at IS NULL
OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
OR d.last_fetch_at < NOW() - ($1 || ' hours')::interval
)
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code, schedule.interval_hours]);
${schedule.state_code ? 'AND d.state_id = (SELECT id FROM states WHERE code = $2)' : ''}
`, schedule.state_code ? [schedule.interval_hours, schedule.state_code] : [schedule.interval_hours]);
const dispensaryIds = result.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length === 0) {
console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
return 0;
}
console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);
// Per TASK_WORKFLOW_2024-12-10.md: Create payload_fetch tasks (they chain to product_refresh)
const tasks = dispensaryIds.map((id: number) => ({
role: 'payload_fetch' as TaskRole,
dispensary_id: id,
priority: schedule.priority,
}));
// Create product_discovery tasks with HTTP transport
// Stagger by 15 seconds to prevent overwhelming proxies
const { created } = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
15, // 15 seconds apart
schedule.platform || 'dutchie',
'http' // Force HTTP transport
);
return created;
return taskService.createTasks(tasks);
}
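The staleness filter above leans on Postgres casting a concatenated string to an interval. A minimal standalone sketch of that idiom, reusing the table and column names from the query (the `pg` Pool wiring is assumed):

```typescript
import { Pool } from 'pg';

// Count stores whose last fetch is older than `hours`, using the same
// ('N' || ' hours')::interval idiom as the scheduler query above.
async function countStaleStores(pool: Pool, hours: number): Promise<number> {
  const { rows } = await pool.query(
    `SELECT COUNT(*)::int AS stale
       FROM dispensaries
      WHERE last_fetch_at IS NULL
         OR last_fetch_at < NOW() - ($1 || ' hours')::interval`,
    [hours]
  );
  return rows[0].stale;
}
```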
/**
* Generate store_discovery tasks
* Uses HTTP transport (Puppeteer/browser) for browser-based discovery
* Per TASK_WORKFLOW_2024-12-10.md: One task per platform
*/
private async generateStoreDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
// Check if discovery task already pending
@@ -315,9 +276,8 @@ class TaskScheduler {
await taskService.createTask({
role: 'store_discovery',
platform: schedule.platform || 'dutchie',
platform: 'dutchie',
priority: schedule.priority,
method: 'http', // Force HTTP transport for browser-based discovery
});
return 1;
@@ -350,39 +310,11 @@ class TaskScheduler {
/**
* Get all schedules for dashboard display
* Returns schedules with full metadata including immutability flag
*/
async getSchedules(): Promise<TaskSchedule[]> {
try {
const result = await pool.query(`
SELECT
id,
name,
role,
enabled,
interval_hours,
last_run_at,
next_run_at,
state_code,
priority,
method,
COALESCE(is_immutable, false) as is_immutable,
description,
platform,
last_task_count,
last_error,
created_at,
updated_at
FROM task_schedules
ORDER BY
CASE role
WHEN 'store_discovery' THEN 1
WHEN 'product_discovery' THEN 2
WHEN 'analytics_refresh' THEN 3
ELSE 4
END,
state_code NULLS FIRST,
name
SELECT * FROM task_schedules ORDER BY name
`);
return result.rows as TaskSchedule[];
} catch {
@@ -390,24 +322,8 @@ class TaskScheduler {
}
}
/**
* Get a single schedule by ID
*/
async getSchedule(id: number): Promise<TaskSchedule | null> {
try {
const result = await pool.query(`
SELECT * FROM task_schedules WHERE id = $1
`, [id]);
return result.rows[0] as TaskSchedule || null;
} catch {
return null;
}
}
/**
* Update a schedule
* Allows updating: enabled, interval_hours, priority
* Does NOT allow updating: name, role, state_code, is_immutable
*/
async updateSchedule(id: number, updates: Partial<TaskSchedule>): Promise<void> {
const setClauses: string[] = [];
@@ -439,33 +355,6 @@ class TaskScheduler {
`, values);
}
/**
* Delete a schedule (only if not immutable)
* Returns true if deleted, false if immutable
*/
async deleteSchedule(id: number): Promise<{ deleted: boolean; reason?: string }> {
// Check if schedule is immutable
const result = await pool.query(`
SELECT name, is_immutable FROM task_schedules WHERE id = $1
`, [id]);
if (result.rows.length === 0) {
return { deleted: false, reason: 'Schedule not found' };
}
const schedule = result.rows[0];
if (schedule.is_immutable) {
return {
deleted: false,
reason: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`
};
}
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [id]);
return { deleted: true };
}
/**
* Trigger a schedule to run immediately
*/
@@ -480,46 +369,6 @@ class TaskScheduler {
return this.executeSchedule(result.rows[0] as TaskSchedule);
}
/**
* Get schedule statistics for dashboard
*/
async getScheduleStats(): Promise<{
total: number;
enabled: number;
byRole: Record<string, number>;
byState: Record<string, number>;
}> {
try {
const result = await pool.query(`
SELECT
COUNT(*)::int as total,
SUM(CASE WHEN enabled THEN 1 ELSE 0 END)::int as enabled_count,
role,
state_code
FROM task_schedules
GROUP BY role, state_code
`);
let total = 0;
let enabled = 0;
const byRole: Record<string, number> = {};
const byState: Record<string, number> = {};
for (const row of result.rows) {
total += row.total;
enabled += row.enabled_count;
byRole[row.role] = (byRole[row.role] || 0) + row.total;
if (row.state_code) {
byState[row.state_code] = (byState[row.state_code] || 0) + row.total;
}
}
return { total, enabled, byRole, byState };
} catch {
return { total: 0, enabled: 0, byRole: {}, byState: {} };
}
}
}
// Per TASK_WORKFLOW_2024-12-10.md: Singleton instance

View File

@@ -2,18 +2,11 @@
* Task Handlers Index
*
* Exports all task handlers for the task worker.
*
* Product Discovery:
* - handleProductDiscoveryCurl: curl/axios based (for curl transport)
* - handleProductDiscoveryHttp: Puppeteer browser-based (for http transport)
*/
export { handleProductDiscovery as handleProductDiscoveryCurl } from './product-discovery-curl';
export { handleProductDiscoveryHttp } from './product-discovery-http';
export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-curl';
export { handleProductDiscovery } from './product-discovery';
export { handleProductRefresh } from './product-refresh';
export { handleStoreDiscovery } from './store-discovery';
export { handleStoreDiscoveryHttp } from './store-discovery-http';
export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleWhoami } from './whoami';

View File

@@ -1,363 +0,0 @@
/**
* Product Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to fetch products via browser context.
* Based on test-intercept.js pattern from ORGANIC_SCRAPING_GUIDE.md.
*
* This handler:
* 1. Loads dispensary info
* 2. Launches headless browser with proxy (if provided)
* 3. Establishes session by visiting embedded menu
* 4. Fetches ALL products via GraphQL from browser context
* 5. Saves raw payload to filesystem (gzipped)
* 6. Records metadata in raw_crawl_payloads table
* 7. Queues product_refresh task to process the payload
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
// GraphQL hash for FilteredProducts query - MUST match CLAUDE.md
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
return { success: false, error: 'No dispensary_id specified for product_discovery task' };
}
let browser: any = null;
try {
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
FROM dispensaries
WHERE id = $1 AND crawl_enabled = true
`, [dispensaryId]);
if (dispResult.rows.length === 0) {
return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
}
const dispensary = dispResult.rows[0];
const platformId = dispensary.platform_dispensary_id;
if (!platformId) {
return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
}
// Extract cName from menu_url
const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
const cName = cNameMatch ? cNameMatch[1] : 'dispensary';
console.log(`[ProductDiscoveryHTTP] Starting for ${dispensary.name} (ID: ${dispensaryId})`);
console.log(`[ProductDiscoveryHTTP] Platform ID: ${platformId}, cName: ${cName}`);
await ctx.heartbeat();
// ============================================================
// STEP 2: Setup Puppeteer with proxy
// ============================================================
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[ProductDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 3: Establish session by visiting embedded menu
// ============================================================
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// ============================================================
// STEP 3b: Detect and dismiss age gate modal
// ============================================================
try {
// Wait a bit for age gate to appear
await page.waitForTimeout(1500);
// Look for common age gate selectors
const ageGateSelectors = [
'button[data-testid="age-gate-submit"]',
'button:has-text("Yes")',
'button:has-text("I am 21")',
'button:has-text("Enter")',
'[class*="age-gate"] button',
'[class*="AgeGate"] button',
'[data-test="age-gate-button"]',
];
for (const selector of ageGateSelectors) {
try {
const button = await page.$(selector);
if (button) {
await button.click();
console.log(`[ProductDiscoveryHTTP] Age gate dismissed via: ${selector}`);
await page.waitForTimeout(1000); // Wait for modal to close
break;
}
} catch {
// Selector not found, try next
}
}
// Also try evaluating in page context for button with specific text
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch (ageGateErr) {
// Age gate might not be present, continue
console.log(`[ProductDiscoveryHTTP] No age gate detected or already dismissed`);
}
console.log(`[ProductDiscoveryHTTP] Session established, fetching products...`);
await ctx.heartbeat();
// ============================================================
// STEP 4: Fetch ALL products via GraphQL from browser context
// ============================================================
const result = await page.evaluate(async (platformId: string, graphqlHash: string) => {
const allProducts: any[] = [];
const logs: string[] = [];
let pageNum = 0;
const perPage = 100;
let totalCount = 0;
const sessionId = 'browser-session-' + Date.now();
try {
while (pageNum < 30) { // Max 30 pages = 3000 products
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active', not null
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: pageNum,
perPage: perPage,
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: graphqlHash,
},
};
// Build GET URL like the browser does
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
logs.push(`Page ${pageNum}: HTTP ${response.status}`);
if (!response.ok) {
const text = await response.text();
logs.push(`HTTP error: ${response.status} - ${text.slice(0, 200)}`);
break;
}
const json = await response.json();
if (json.errors) {
logs.push(`GraphQL error: ${JSON.stringify(json.errors).slice(0, 200)}`);
break;
}
const data = json?.data?.filteredProducts;
if (!data || !data.products) {
logs.push('No products in response');
break;
}
const products = data.products;
allProducts.push(...products);
if (pageNum === 0) {
totalCount = data.queryInfo?.totalCount || 0;
logs.push(`Total reported: ${totalCount}`);
}
logs.push(`Got ${products.length} products (total: ${allProducts.length}/${totalCount})`);
if (allProducts.length >= totalCount || products.length < perPage) {
break;
}
pageNum++;
// Small delay between pages to be polite
await new Promise(r => setTimeout(r, 200));
}
} catch (err: any) {
logs.push(`Error: ${err.message}`);
}
return { products: allProducts, totalCount, logs };
}, platformId, FILTERED_PRODUCTS_HASH);
// Print logs from browser context
result.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
console.log(`[ProductDiscoveryHTTP] Fetched ${result.products.length} products (API reported ${result.totalCount})`);
await browser.close();
browser = null;
if (result.products.length === 0) {
return {
success: false,
error: 'No products returned from GraphQL',
productsProcessed: 0,
};
}
await ctx.heartbeat();
// ============================================================
// STEP 5: Save raw payload to filesystem
// ============================================================
const rawPayload = {
dispensaryId,
platformId,
cName,
fetchedAt: new Date().toISOString(),
productCount: result.products.length,
products: result.products,
};
const payloadResult = await saveRawPayload(
pool,
dispensaryId,
rawPayload,
null, // crawl_run_id - not using crawl_runs in new system
result.products.length
);
console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
// ============================================================
// STEP 6: Update dispensary last_fetch_at
// ============================================================
await pool.query(`
UPDATE dispensaries
SET last_fetch_at = NOW()
WHERE id = $1
`, [dispensaryId]);
// ============================================================
// STEP 7: Queue product_refresh task to process the payload
// ============================================================
await taskService.createTask({
role: 'product_refresh',
dispensary_id: dispensaryId,
priority: task.priority || 0,
payload: { payload_id: payloadResult.id },
});
console.log(`[ProductDiscoveryHTTP] Queued product_refresh task for payload #${payloadResult.id}`);
return {
success: true,
payloadId: payloadResult.id,
productCount: result.products.length,
sizeBytes: payloadResult.sizeBytes,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductDiscoveryHTTP] Error for dispensary ${dispensaryId}:`, errorMessage);
return {
success: false,
error: errorMessage,
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}

View File

@@ -13,7 +13,7 @@
*/
import { TaskContext, TaskResult } from '../task-worker';
import { handlePayloadFetch } from './payload-fetch-curl';
import { handlePayloadFetch } from './payload-fetch';
export async function handleProductDiscovery(ctx: TaskContext): Promise<TaskResult> {
const { task } = ctx;

View File

@@ -27,7 +27,6 @@ import {
downloadProductImages,
} from '../../hydration/canonical-upsert';
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
const normalizer = new DutchieNormalizer();
@@ -87,37 +86,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// Load latest payload for this dispensary
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
// No payload exists - queue upstream task to fetch products
console.log(`[ProductRefresh] No payload found for dispensary ${dispensaryId} - queuing upstream task`);
if (dispensary.platform_dispensary_id) {
// Has platform ID - can go straight to product_discovery
console.log(`[ProductRefresh] Dispensary has platform_dispensary_id - queuing product_discovery (http)`);
await taskService.createTask({
role: 'product_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
method: 'http', // Use browser-based handler for session proxies
});
return {
success: true,
queued: 'product_discovery',
reason: 'No payload exists - queued product_discovery to fetch initial data',
};
} else {
// No platform ID - need entry_point_discovery first
console.log(`[ProductRefresh] Dispensary missing platform_dispensary_id - queuing entry_point_discovery`);
await taskService.createTask({
role: 'entry_point_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
});
return {
success: true,
queued: 'entry_point_discovery',
reason: 'No payload and no platform_dispensary_id - queued entry_point_discovery to resolve ID',
};
}
return { success: false, error: `No payload found for dispensary ${dispensaryId}` };
}
payloadData = result.payload;
payloadId = result.metadata.id;

View File

@@ -1,480 +0,0 @@
/**
* Store Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to discover stores via browser context.
* Based on product-discovery-http.ts pattern.
*
* This handler:
* 1. Launches headless browser with proxy (if provided)
* 2. Establishes session by visiting Dutchie dispensaries page
* 3. Fetches cities for each state via getAllCitiesByState GraphQL
* 4. Fetches stores for each city via ConsumerDispensaries GraphQL
* 5. Upserts to dutchie_discovery_locations
* 6. Auto-promotes valid locations to dispensaries table
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { upsertLocation } from '../../discovery/location-discovery';
import { promoteDiscoveredLocations } from '../../discovery/promotion';
import { saveDiscoveryPayload } from '../../utils/payload-storage';
// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts
const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6';
const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b';
interface StateWithCities {
name: string;
country: string;
cities: string[];
}
interface DiscoveredLocation {
id: string;
name: string;
slug: string;
cName?: string;
address?: string;
city?: string;
state?: string;
zip?: string;
latitude?: number;
longitude?: number;
offerPickup?: boolean;
offerDelivery?: boolean;
isRecreational?: boolean;
isMedical?: boolean;
phone?: string;
email?: string;
website?: string;
description?: string;
logoImage?: string;
bannerImage?: string;
chainSlug?: string;
enterpriseId?: string;
retailType?: string;
status?: string;
timezone?: string;
location?: {
ln1?: string;
ln2?: string;
city?: string;
state?: string;
zipcode?: string;
country?: string;
geometry?: { coordinates?: [number, number] };
};
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[StoreDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
await page.goto(sessionUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// Handle potential age gate
try {
await page.waitForTimeout(1500);
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch {
// Age gate might not be present
}
console.log(`[StoreDiscoveryHTTP] Session established`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Get states to discover from database
// ============================================================
const statesResult = await pool.query(`
SELECT code FROM states WHERE is_active = true ORDER BY code
`);
const stateCodesToDiscover = statesResult.rows.map((r: { code: string }) => r.code);
if (stateCodesToDiscover.length === 0) {
await browser.close();
return { success: true, storesDiscovered: 0, newStoreIds: [], message: 'No active states to discover' };
}
console.log(`[StoreDiscoveryHTTP] Will discover stores in ${stateCodesToDiscover.length} states`);
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'getAllCitiesByState',
variables: JSON.stringify({}),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
logs.push(`getAllCitiesByState: HTTP ${response.status}`);
if (!response.ok) {
return { states: [], logs };
}
const json = await response.json();
const statesData = json?.data?.statesWithDispensaries || [];
const states: StateWithCities[] = [];
for (const state of statesData) {
if (state && state.name) {
const cities = Array.isArray(state.cities)
? state.cities.filter((c: string | null) => c !== null)
: [];
states.push({
name: state.name,
country: state.country || 'US',
cities,
});
}
}
logs.push(`Found ${states.length} states with cities`);
return { states, logs };
} catch (err: any) {
logs.push(`Error: ${err.message}`);
return { states: [], logs };
}
}, GET_ALL_CITIES_HASH);
statesWithCities.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
if (statesWithCities.states.length === 0) {
await browser.close();
return { success: false, error: 'Failed to fetch states with cities' };
}
await ctx.heartbeat();
// ============================================================
// STEP 5: For each active state, fetch stores for each city
// ============================================================
let totalDiscovered = 0;
let totalUpserted = 0;
const allNewStoreIds: number[] = [];
for (const stateCode of stateCodesToDiscover) {
const stateData = statesWithCities.states.find(
(s: StateWithCities) => s.name.toUpperCase() === stateCode.toUpperCase()
);
if (!stateData || stateData.cities.length === 0) {
console.log(`[StoreDiscoveryHTTP] No cities found for ${stateCode}, skipping`);
continue;
}
console.log(`[StoreDiscoveryHTTP] Discovering ${stateData.cities.length} cities in ${stateCode}...`);
await ctx.heartbeat();
// Accumulate raw store data for this state
const stateRawStores: any[] = [];
const stateCityData: { city: string; stores: any[] }[] = [];
// Fetch stores for each city in this state
for (const city of stateData.cities) {
try {
const cityResult = await page.evaluate(async (
cityName: string,
stateCodeParam: string,
hash: string
) => {
const logs: string[] = [];
const allDispensaries: any[] = [];
let page = 0;
const perPage = 200;
try {
while (page < 5) { // Max 5 pages per city
const variables = {
dispensaryFilter: {
activeOnly: true,
city: cityName,
state: stateCodeParam,
},
page,
perPage,
};
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'ConsumerDispensaries',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
if (!response.ok) {
logs.push(`${cityName}: HTTP ${response.status}`);
break;
}
const json = await response.json();
const dispensaries = json?.data?.filteredDispensaries || [];
if (dispensaries.length === 0) {
break;
}
// Filter to ensure correct state
const stateFiltered = dispensaries.filter((d: any) =>
d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase()
);
allDispensaries.push(...stateFiltered);
if (dispensaries.length < perPage) {
break;
}
page++;
// Small delay between pages
await new Promise(r => setTimeout(r, 100));
}
logs.push(`${cityName}: ${allDispensaries.length} stores`);
} catch (err: any) {
logs.push(`${cityName}: Error - ${err.message}`);
}
return { dispensaries: allDispensaries, logs };
}, city, stateCode, CONSUMER_DISPENSARIES_HASH);
cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
// Accumulate raw store data
stateRawStores.push(...cityResult.dispensaries);
stateCityData.push({ city, stores: cityResult.dispensaries });
// Upsert each discovered location
for (const disp of cityResult.dispensaries) {
try {
const location = normalizeLocation(disp);
if (!location.id) {
continue; // Skip locations without platform ID
}
const result = await upsertLocation(pool, location as any, null);
if (result) {
totalUpserted++;
if (result.isNew) {
totalDiscovered++;
}
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Upsert error for ${disp.name}:`, err.message);
}
}
// Small delay between cities to avoid rate limiting
await new Promise(r => setTimeout(r, 300));
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Error fetching ${city}, ${stateCode}:`, err.message);
}
}
// Heartbeat after each state
await ctx.heartbeat();
// ============================================================
// STEP 5b: Save raw store payload for this state
// ============================================================
if (stateRawStores.length > 0) {
try {
const rawPayload = {
stateCode,
platform,
fetchedAt: new Date().toISOString(),
storeCount: stateRawStores.length,
citiesProcessed: stateCityData.length,
cities: stateCityData,
stores: stateRawStores,
};
const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length);
console.log(`[StoreDiscoveryHTTP] Saved raw payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Failed to save payload for ${stateCode}:`, err.message);
}
}
// Auto-promote valid locations for this state
try {
const promotionResult = await promoteDiscoveredLocations(stateCode);
const promoted = promotionResult.created + promotionResult.updated;
if (promoted > 0) {
console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`);
// newDispensaryIds is returned but not in typed interface
const newIds = (promotionResult as any).newDispensaryIds || [];
allNewStoreIds.push(...newIds);
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Promotion error for ${stateCode}:`, err.message);
}
}
await browser.close();
browser = null;
console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted`);
return {
success: true,
storesDiscovered: totalDiscovered,
storesUpserted: totalUpserted,
statesProcessed: stateCodesToDiscover.length,
newStoreIds: allNewStoreIds,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[StoreDiscoveryHTTP] Error:`, errorMessage);
return {
success: false,
error: errorMessage,
newStoreIds: [],
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}
/**
* Normalize a raw dispensary response to our DiscoveredLocation format
*/
function normalizeLocation(raw: any): DiscoveredLocation {
const loc = raw.location || {};
const coords = loc.geometry?.coordinates || [];
return {
id: raw.id || raw._id || '',
name: raw.name || '',
slug: raw.slug || raw.cName || '',
cName: raw.cName || raw.slug || '',
address: raw.address || loc.ln1 || '',
city: raw.city || loc.city || '',
state: raw.state || loc.state || '',
zip: raw.zip || loc.zipcode || loc.zip || '',
latitude: coords[1] || raw.latitude,
longitude: coords[0] || raw.longitude,
timezone: raw.timezone || '',
offerPickup: raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true,
offerDelivery: raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false,
isRecreational: raw.isRecreational ?? raw.recDispensary ?? true,
isMedical: raw.isMedical ?? raw.medicalDispensary ?? true,
phone: raw.phone || '',
email: raw.email || '',
website: raw.embedBackUrl || '',
description: raw.description || '',
logoImage: raw.logoImage || '',
bannerImage: raw.bannerImage || '',
chainSlug: raw.chain || '',
enterpriseId: raw.retailer?.enterpriseId || '',
retailType: raw.retailType || '',
status: raw.status || '',
location: loc,
};
}

View File

@@ -17,8 +17,7 @@ export {
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
export {
handleProductDiscoveryCurl,
handleProductDiscoveryHttp,
handleProductDiscovery,
handleProductRefresh,
handleStoreDiscovery,
handleEntryPointDiscovery,

View File

@@ -6,15 +6,12 @@
* task-service.ts and routes/tasks.ts.
*
* State is in-memory and resets on server restart.
* By default, the pool is OPEN - workers start claiming tasks immediately.
* Admin can pause via API endpoint if needed.
*
* Note: Each process (backend, worker) has its own copy of this state.
* The /pool/pause and /pool/resume endpoints only affect the backend process.
* Workers always start with pool open.
* By default, the pool is PAUSED (closed) - admin must explicitly start it.
* This prevents workers from immediately grabbing tasks on deploy before
* the system is ready.
*/
let taskPoolPaused = false;
let taskPoolPaused = true;
export function isTaskPoolPaused(): boolean {
return taskPoolPaused;

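A sketch of how a worker might honor the paused default (the import path and claim callback are assumptions; only `isTaskPoolPaused` comes from this file):

```typescript
import { isTaskPoolPaused } from './task-pool'; // path assumed

// Hypothetical gate around task claiming.
async function maybeClaim<T>(claim: () => Promise<T | null>): Promise<T | null> {
  if (isTaskPoolPaused()) {
    // Pool starts paused on deploy; an admin must explicitly resume it.
    return null;
  }
  return claim();
}
```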
View File

@@ -73,7 +73,6 @@ export interface CreateTaskParams {
dispensary_id?: number;
platform?: string;
priority?: number;
method?: 'curl' | 'http'; // Transport method: curl=axios/proxy, http=Puppeteer/browser
scheduled_for?: Date;
payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
}
@@ -107,15 +106,14 @@ class TaskService {
*/
async createTask(params: CreateTaskParams): Promise<WorkerTask> {
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for, payload)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *`,
[
params.role,
params.dispensary_id ?? null,
params.platform ?? null,
params.priority ?? 0,
params.method ?? null, // null = any worker can pick up, 'http' = http-capable workers only, 'curl' = curl workers only
params.scheduled_for ?? null,
params.payload ? JSON.stringify(params.payload) : null,
]
@@ -130,8 +128,8 @@ class TaskService {
if (tasks.length === 0) return 0;
const values = tasks.map((t, i) => {
const base = i * 6;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6})`;
const base = i * 5;
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`;
});
const params = tasks.flatMap((t) => [
@@ -139,12 +137,11 @@ class TaskService {
t.dispensary_id ?? null,
t.platform ?? null,
t.priority ?? 0,
t.method ?? null,
t.scheduled_for ?? null,
]);
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for)
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for)
VALUES ${values.join(', ')}
ON CONFLICT DO NOTHING`,
params
@@ -476,17 +473,15 @@ class TaskService {
case 'store_discovery': {
// Per TASK_WORKFLOW_2024-12-10.md: New stores discovered -> create product_discovery tasks
// Skip entry_point_discovery since platform_dispensary_id is set during promotion
// All product_discovery tasks use HTTP transport (Puppeteer/browser)
const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds;
if (newStoreIds && newStoreIds.length > 0) {
console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores (HTTP transport)`);
console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores`);
for (const storeId of newStoreIds) {
await this.createTask({
role: 'product_discovery',
dispensary_id: storeId,
platform: completedTask.platform ?? undefined,
priority: 10, // High priority for new stores
method: 'http', // Force HTTP transport for browser-based scraping
});
}
}
@@ -503,7 +498,6 @@ class TaskService {
dispensary_id: completedTask.dispensary_id,
platform: completedTask.platform ?? undefined,
priority: 10,
method: 'http', // Force HTTP transport
});
}
break;
@@ -528,7 +522,6 @@ class TaskService {
/**
* Create store discovery task for a platform/state
* Uses HTTP transport (Puppeteer/browser) by default
*/
async createStoreDiscoveryTask(
platform: string,
@@ -539,13 +532,11 @@ class TaskService {
role: 'store_discovery',
platform,
priority,
method: 'http', // Force HTTP transport
});
}
/**
* Create entry point discovery task for a specific store
* @deprecated Entry point resolution now happens during store promotion
*/
async createEntryPointTask(
dispensaryId: number,
@@ -557,13 +548,11 @@ class TaskService {
dispensary_id: dispensaryId,
platform,
priority,
method: 'http', // Force HTTP transport
});
}
/**
* Create product discovery task for a specific store
* Uses HTTP transport (Puppeteer/browser) by default
*/
async createProductDiscoveryTask(
dispensaryId: number,
@@ -575,7 +564,6 @@ class TaskService {
dispensary_id: dispensaryId,
platform,
priority,
method: 'http', // Force HTTP transport
});
}
@@ -653,248 +641,6 @@ class TaskService {
return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
}
/**
* Create multiple tasks with staggered start times.
*
* STAGGERED TASK WORKFLOW:
* =======================
* This prevents resource contention and proxy assignment lag when creating
* many tasks at once. Each task gets a scheduled_for timestamp offset from
* the previous task.
*
* Workflow:
* 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight check on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task is released back to pending for another worker
* 6. Worker finishes task, polls for next available task
* 7. Repeat - preflight runs again on next task claim
*
* Benefits:
* - Prevents all 8 workers from hitting proxies simultaneously
* - Reduces API rate limiting / 403 errors
* - Spreads resource usage over time
* - Each task still runs preflight, ensuring proxy health
*
* @param dispensaryIds - Array of dispensary IDs to create tasks for
* @param role - Task role (e.g., 'product_refresh', 'product_discovery')
* @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
* @param platform - Platform identifier (default: 'dutchie')
* @param method - Transport method: 'curl' or 'http' (default: null for any)
* @returns Number of tasks created
*/
async createStaggeredTasks(
dispensaryIds: number[],
role: TaskRole,
staggerSeconds: number = 15,
platform: string = 'dutchie',
method: 'curl' | 'http' | null = null
): Promise<{ created: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) {
return { created: 0, taskIds: [] };
}
// Use a single INSERT with generate_series for efficiency
const result = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
SELECT
$2::varchar as role,
td.dispensary_id,
$3::varchar as platform,
$4::varchar as method,
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds]);
const taskIds = result.rows.map((r: { id: number }) => r.id);
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
return { created: taskIds.length, taskIds };
}
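A usage sketch of the method above with the arithmetic spelled out (IDs invented): at a 15-second stagger, task k is scheduled at NOW() + k × 15s, so a 24-task batch spans the first 345 seconds:

```typescript
// Inside an async context; dispensary IDs are made up for illustration.
const ids = Array.from({ length: 24 }, (_, i) => 100 + i);
const { created } = await taskService.createStaggeredTasks(
  ids,
  'product_discovery',
  15,        // staggerSeconds: task k claimable after k * 15s
  'dutchie',
  'http'     // only http-capable workers may claim these
);
console.log(`Created ${created} tasks; last claimable in ~${(ids.length - 1) * 15}s`);
```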
/**
* Create a batch of AZ store tasks with automatic distribution.
*
* This is a convenience method for creating tasks for Arizona stores with:
* - Automatic staggering to prevent resource contention
* - Even distribution across both refresh and discovery roles
*
* @param totalTasks - Total number of tasks to create
* @param staggerSeconds - Seconds between each task's start time
* @param splitRoles - If true, split between product_refresh and product_discovery
* @returns Summary of created tasks
*/
async createAZStoreTasks(
totalTasks: number = 24,
staggerSeconds: number = 15,
splitRoles: boolean = true
): Promise<{
total: number;
product_refresh: number;
product_discovery: number;
taskIds: number[];
}> {
// Get AZ stores with platform_id and menu_url
const storesResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE s.code = 'AZ'
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND d.menu_url IS NOT NULL
ORDER BY d.id
`);
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
if (storeIds.length === 0) {
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
}
// Limit tasks to available stores
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
const allTaskIds: number[] = [];
if (splitRoles) {
// Split between refresh and discovery
const tasksPerRole = Math.floor(maxTasks / 2);
// Both roles draw from the same leading slice of stores: each selected
// store gets one refresh task and one discovery task.
const refreshStores = storeIds.slice(0, tasksPerRole);
const discoveryStores = storeIds.slice(0, tasksPerRole);
// Create refresh tasks first
const refreshResult = await this.createStaggeredTasks(
refreshStores,
'product_refresh',
staggerSeconds,
'dutchie'
);
allTaskIds.push(...refreshResult.taskIds);
// Create discovery tasks starting after refresh tasks are scheduled
const discoveryStartOffset = tasksPerRole * staggerSeconds;
const discoveryResult = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
SELECT
'product_discovery'::varchar as role,
td.dispensary_id,
'dutchie'::varchar as platform,
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
return {
total: allTaskIds.length,
product_refresh: refreshResult.taskIds.length,
product_discovery: discoveryResult.rowCount ?? 0,
taskIds: allTaskIds
};
}
// Single role mode - all product_discovery
const result = await this.createStaggeredTasks(
storeIds.slice(0, totalTasks),
'product_discovery',
staggerSeconds,
'dutchie'
);
return {
total: result.taskIds.length,
product_refresh: 0,
product_discovery: result.taskIds.length,
taskIds: result.taskIds
};
}
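A worked example of the timeline this produces with the defaults (24 tasks, 15s stagger, split roles), assuming at least 12 eligible AZ stores:

```typescript
import { taskService } from './task-service'; // path assumed

async function queueAZ(): Promise<void> {
  // tasksPerRole = floor(24 / 2) = 12
  // product_refresh:   scheduled_for = NOW() + 0s, 15s, ..., 165s
  // discoveryStartOffset = 12 * 15 = 180s
  // product_discovery: scheduled_for = NOW() + 180s, 195s, ..., 345s
  const summary = await taskService.createAZStoreTasks(24, 15, true);
  console.log(summary); // { total: 24, product_refresh: 12, product_discovery: 12, ... }
}
```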
/**
* Clean up stale tasks that are stuck in 'claimed' or 'running' status.
*
* This handles the case where workers crash/restart and leave tasks in-flight.
* These stale tasks block the queue because the claim query excludes dispensary_ids
* that have active tasks.
*
* Called automatically on worker startup and can be called periodically.
*
* @param staleMinutes - Tasks with no activity for this long (based on last_heartbeat_at, claimed_at, or created_at) are reset
* @returns Object with cleanup stats
*/
async cleanupStaleTasks(staleMinutes: number = 30): Promise<{
cleaned: number;
byStatus: { claimed: number; running: number };
byRole: Record<string, number>;
}> {
// First, get stats on what we're about to clean
const statsResult = await pool.query(`
SELECT status, role, COUNT(*)::int as count
FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
GROUP BY status, role
`, [staleMinutes]);
const byStatus = { claimed: 0, running: 0 };
const byRole: Record<string, number> = {};
for (const row of statsResult.rows) {
const { status, role, count } = row as { status: string; role: string; count: number };
if (status === 'claimed') byStatus.claimed += count;
if (status === 'running') byStatus.running += count;
byRole[role] = (byRole[role] || 0) + count;
}
const totalStale = byStatus.claimed + byStatus.running;
if (totalStale === 0) {
return { cleaned: 0, byStatus, byRole };
}
// Reset stale tasks to pending
const result = await pool.query(`
UPDATE worker_tasks
SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
last_heartbeat_at = NULL,
error_message = CONCAT(COALESCE(error_message, ''), ' [Auto-reset: stale after ', $1, ' min]'),
updated_at = NOW()
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
`, [staleMinutes]);
const cleaned = result.rowCount ?? 0;
if (cleaned > 0) {
console.log(`[TaskService] Cleaned up ${cleaned} stale tasks (claimed: ${byStatus.claimed}, running: ${byStatus.running})`);
console.log(`[TaskService] Stale tasks by role: ${Object.entries(byRole).map(([r, c]) => `${r}:${c}`).join(', ')}`);
}
return { cleaned, byStatus, byRole };
}
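A sketch of a periodic sweep to complement the startup call shown further down; the 30-minute threshold matches the default used there:

```typescript
import { taskService } from './task-service'; // path assumed

// Sweep every 10 minutes; tasks without a heartbeat for 30+ minutes go back to pending.
setInterval(async () => {
  try {
    const { cleaned, byStatus, byRole } = await taskService.cleanupStaleTasks(30);
    if (cleaned > 0) {
      console.log(`[Cleanup] reset ${cleaned} stale tasks`, { byStatus, byRole });
    }
  } catch (err) {
    console.error('[Cleanup] sweep failed:', err);
  }
}, 10 * 60 * 1000);
```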
/**
* Calculate workers needed to complete tasks within SLA
*/

View File

@@ -11,17 +11,10 @@
* - Workers report heartbeats to worker_registry
* - Workers are ROLE-AGNOSTIC by default (can handle any task type)
*
* Stealth & Anti-Detection (LAZY INITIALIZATION):
* Workers start IMMEDIATELY without waiting for proxies.
* Stealth systems (proxies, fingerprints, preflights) are initialized
* on first task claim, not at worker startup.
* Stealth & Anti-Detection:
* PROXIES ARE REQUIRED - workers will fail to start if no proxies available.
*
* This allows workers to:
* - Register and send heartbeats immediately
* - Wait in main loop without blocking on proxy availability
* - Initialize proxies/preflights only when tasks are actually available
*
* On first task claim attempt, workers initialize the CrawlRotator which provides:
* On startup, workers initialize the CrawlRotator which provides:
* - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy
* - User-Agent rotation: Cycles through realistic browser fingerprints
* - Fingerprint rotation: Changes browser profile on blocks
@@ -41,16 +34,11 @@
*
* Environment:
* WORKER_ROLE - Which task role to process (optional, null = any task)
* POD_NAME - K8s StatefulSet pod name (PRIMARY - use this for persistent identity)
* WORKER_ID - Custom worker ID (fallback if POD_NAME not set)
* WORKER_ID - Optional custom worker ID (auto-generated if not provided)
* POD_NAME - Kubernetes pod name (optional)
* POLL_INTERVAL_MS - How often to check for tasks (default: 5000)
* HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000)
* API_BASE_URL - Backend API URL for registration (default: http://localhost:3010)
*
* Worker Identity:
* Workers use POD_NAME as their worker_id for persistent identity across restarts.
* In K8s StatefulSet, POD_NAME = "scraper-worker-0" through "scraper-worker-7".
* This ensures workers re-register with the same ID instead of creating new entries.
*/
import { Pool } from 'pg';
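A sketch of how the documented environment block might resolve to runtime config; the precedence follows the StatefulSet variant (POD_NAME first) and the defaults match the comment:

```typescript
import { randomUUID } from 'node:crypto';

// Illustrative resolution of the env vars documented above.
export const workerConfig = {
  workerId:
    process.env.POD_NAME ??    // e.g. "scraper-worker-3" under the StatefulSet
    process.env.WORKER_ID ??   // explicit override
    randomUUID(),              // ephemeral fallback
  pollIntervalMs: parseInt(process.env.POLL_INTERVAL_MS ?? '5000', 10),
  heartbeatIntervalMs: parseInt(process.env.HEARTBEAT_INTERVAL_MS ?? '30000', 10),
  apiBaseUrl: process.env.API_BASE_URL ?? 'http://localhost:3010',
};
```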
@@ -69,13 +57,10 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser
// Task handlers by role
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
// Dual-transport: curl vs http (browser-based) handlers
import { handlePayloadFetch } from './handlers/payload-fetch-curl';
import { handlePayloadFetch } from './handlers/payload-fetch';
import { handleProductRefresh } from './handlers/product-refresh';
import { handleProductDiscovery } from './handlers/product-discovery-curl';
import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
import { handleProductDiscovery } from './handlers/product-discovery';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleWhoami } from './handlers/whoami';
@@ -98,7 +83,7 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '15');
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
@@ -147,47 +132,17 @@ type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main handler for product crawling (has curl and http variants)
// product_discovery: Main handler for product crawling
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
payload_fetch: handlePayloadFetch, // API fetch -> disk (curl)
payload_fetch: handlePayloadFetch, // API fetch -> disk
product_refresh: handleProductRefresh, // disk -> DB
product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
product_discovery: handleProductDiscovery,
store_discovery: handleStoreDiscovery,
entry_point_discovery: handleEntryPointDiscovery,
analytics_refresh: handleAnalyticsRefresh,
whoami: handleWhoami, // Tests proxy + anti-detect
};
/**
* Get the appropriate handler for a task, considering both role and method.
*
* Dual-transport handlers:
* - product_discovery: curl (axios) or http (Puppeteer)
* - store_discovery: curl (axios) or http (Puppeteer)
*
* Default method is 'http' since all GraphQL queries should use browser transport
* for better TLS fingerprinting and session-based proxy compatibility.
*/
function getHandlerForTask(task: WorkerTask): TaskHandler | undefined {
const role = task.role as TaskRole;
const method = task.method || 'http'; // Default to HTTP for all GraphQL tasks
// product_discovery: dual-transport support
if (role === 'product_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${method})`);
return handleProductDiscoveryHttp;
}
// store_discovery: dual-transport support
if (role === 'store_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for store_discovery (method=${method})`);
return handleStoreDiscoveryHttp;
}
// Default: use the static handler registry (curl-based)
return TASK_HANDLERS[role];
}
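For illustration, how dispatch behaves for the dual-transport role; the task literals are hypothetical and assume the in-file WorkerTask type:

```typescript
// Hypothetical tasks - only role and method matter for dispatch.
const httpTask: Partial<WorkerTask> = { id: 42, role: 'product_discovery', method: 'http' };
getHandlerForTask(httpTask as WorkerTask);  // -> handleProductDiscoveryHttp

const curlTask: Partial<WorkerTask> = { id: 43, role: 'product_discovery', method: 'curl' };
getHandlerForTask(curlTask as WorkerTask);  // -> TASK_HANDLERS.product_discovery (curl)

const bareTask: Partial<WorkerTask> = { id: 44, role: 'product_discovery' };
getHandlerForTask(bareTask as WorkerTask);  // -> handleProductDiscoveryHttp (method defaults to 'http')
```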
/**
* Resource usage stats reported to the registry and used for backoff decisions.
* These values are included in worker heartbeats and displayed in the UI.
@@ -254,16 +209,6 @@ export class TaskWorker {
private preflightCurlResult: CurlPreflightResult | null = null;
private preflightHttpResult: PuppeteerPreflightResult | null = null;
// ==========================================================================
// LAZY INITIALIZATION FLAGS
// ==========================================================================
// Stealth/proxy initialization is deferred until first task claim.
// Workers register immediately and enter main loop without blocking.
// ==========================================================================
private stealthInitialized: boolean = false;
private preflightsCompleted: boolean = false;
private initializingPromise: Promise<void> | null = null;
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -348,9 +293,9 @@ export class TaskWorker {
/**
* Initialize stealth systems (proxy rotation, fingerprints)
* Called LAZILY on first task claim attempt (NOT at worker startup).
* Called once on worker startup before processing any tasks.
*
* IMPORTANT: Proxies are REQUIRED to claim tasks. This method waits until proxies are available.
* IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
* Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added.
*/
private async initializeStealth(): Promise<void> {
@@ -490,98 +435,35 @@ export class TaskWorker {
/**
* Report preflight status to worker_registry
* Function signature: update_worker_preflight(worker_id, transport, status, ip, response_ms, error, fingerprint)
*/
private async reportPreflightStatus(): Promise<void> {
try {
// Update worker_registry directly via SQL (more reliable than API)
// CURL preflight - includes IP address
await this.pool.query(`
SELECT update_worker_preflight($1, 'curl', $2, $3, $4, $5, $6)
SELECT update_worker_preflight($1, 'curl', $2, $3, $4)
`, [
this.workerId,
this.preflightCurlPassed ? 'passed' : 'failed',
this.preflightCurlResult?.proxyIp || null,
this.preflightCurlResult?.responseTimeMs || null,
this.preflightCurlResult?.error || null,
null, // No fingerprint for curl
]);
// HTTP preflight - includes IP, fingerprint, and timezone data
const httpFingerprint = this.preflightHttpResult ? {
...this.preflightHttpResult.fingerprint,
detectedTimezone: (this.preflightHttpResult as any).detectedTimezone,
detectedLocation: (this.preflightHttpResult as any).detectedLocation,
productsReturned: this.preflightHttpResult.productsReturned,
botDetection: (this.preflightHttpResult as any).botDetection,
} : null;
await this.pool.query(`
SELECT update_worker_preflight($1, 'http', $2, $3, $4, $5, $6)
SELECT update_worker_preflight($1, 'http', $2, $3, $4)
`, [
this.workerId,
this.preflightHttpPassed ? 'passed' : 'failed',
this.preflightHttpResult?.proxyIp || null,
this.preflightHttpResult?.responseTimeMs || null,
this.preflightHttpResult?.error || null,
httpFingerprint ? JSON.stringify(httpFingerprint) : null,
]);
console.log(`[TaskWorker] Preflight status reported to worker_registry`);
if (this.preflightHttpResult?.proxyIp) {
console.log(`[TaskWorker] HTTP IP: ${this.preflightHttpResult.proxyIp}, Timezone: ${(this.preflightHttpResult as any).detectedTimezone || 'unknown'}`);
}
} catch (err: any) {
// Non-fatal - worker can still function
console.warn(`[TaskWorker] Could not report preflight status: ${err.message}`);
}
}
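For reference, a standalone sketch of the 7-argument call shape documented above (the 5-argument variant in this diff drops the error and fingerprint columns):

```typescript
import { Pool } from 'pg';

// Illustrative: report a passed curl preflight.
// Signature per the comment above:
// update_worker_preflight(worker_id, transport, status, ip, response_ms, error, fingerprint)
async function reportCurlPass(pool: Pool, workerId: string, ip: string, ms: number): Promise<void> {
  await pool.query(
    `SELECT update_worker_preflight($1, 'curl', $2, $3, $4, $5, $6)`,
    [workerId, 'passed', ip, ms, null, null]
  );
}
```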
/**
* Lazy initialization of stealth systems.
* Called BEFORE claiming first task (not at worker startup).
* This allows workers to register and enter main loop immediately.
*
* Returns true if initialization succeeded, false otherwise.
*/
private async ensureStealthInitialized(): Promise<boolean> {
// Already initialized
if (this.stealthInitialized && this.preflightsCompleted) {
return true;
}
// Already initializing (prevent concurrent init attempts)
if (this.initializingPromise) {
await this.initializingPromise;
return this.stealthInitialized && this.preflightsCompleted;
}
console.log(`[TaskWorker] ${this.friendlyName} lazy-initializing stealth systems (first task claim)...`);
this.initializingPromise = (async () => {
try {
// Initialize proxy/fingerprint rotation
await this.initializeStealth();
this.stealthInitialized = true;
// Run dual-transport preflights
await this.runDualPreflights();
this.preflightsCompleted = true;
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} stealth ready (${preflightMsg})`);
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} stealth init failed: ${err.message}`);
this.stealthInitialized = false;
this.preflightsCompleted = false;
}
})();
await this.initializingPromise;
this.initializingPromise = null;
return this.stealthInitialized && this.preflightsCompleted;
}
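The initializingPromise guard is a general single-flight pattern; a minimal standalone sketch of the same idea:

```typescript
// Single-flight lazy initializer: concurrent callers share one in-progress
// init instead of racing, mirroring initializingPromise above.
class SingleFlightInit {
  private done = false;
  private inflight: Promise<void> | null = null;

  async ensure(init: () => Promise<void>): Promise<boolean> {
    if (this.done) return true;
    if (!this.inflight) {
      this.inflight = init()
        .then(() => { this.done = true; })
        .catch(() => { this.done = false; })
        .finally(() => { this.inflight = null; });
    }
    await this.inflight;
    return this.done;
  }
}
```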
/**
* Register worker with the registry (get friendly name)
*/
@@ -715,36 +597,25 @@ export class TaskWorker {
/**
* Start the worker loop
*
* Workers start IMMEDIATELY without blocking on proxy/preflight init.
* Stealth systems are lazy-initialized on first task claim.
* This allows workers to register and send heartbeats even when proxies aren't ready.
*/
async start(): Promise<void> {
this.isRunning = true;
// Register with the API to get a friendly name (non-blocking)
// Initialize stealth systems (proxy rotation, fingerprints)
await this.initializeStealth();
// Register with the API to get a friendly name
await this.register();
// Start registry heartbeat immediately
// Run dual-transport preflights
await this.runDualPreflights();
// Start registry heartbeat
this.startRegistryHeartbeat();
// Clean up stale tasks on startup (only worker-0 does this, to avoid races)
// This handles tasks left in 'claimed'/'running' status when workers restart
if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') {
try {
console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
if (cleanupResult.cleaned > 0) {
console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
}
} catch (err: any) {
console.error(`[TaskWorker] Stale task cleanup error:`, err.message);
}
}
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`);
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (${preflightMsg}, max ${this.maxConcurrentTasks} concurrent tasks)`);
while (this.isRunning) {
try {
@@ -787,12 +658,6 @@ export class TaskWorker {
this.backoffReason = null;
}
// Periodically reload proxies to pick up changes (new proxies, disabled proxies)
// This runs every ~60 seconds (controlled by setProxyReloadInterval)
if (this.stealthInitialized) {
await this.crawlRotator.reloadIfStale();
}
// Check for decommission signal
const shouldDecommission = await this.checkDecommission();
if (shouldDecommission) {
@@ -804,20 +669,6 @@ export class TaskWorker {
// Try to claim more tasks if we have capacity
if (this.canAcceptMoreTasks()) {
// =================================================================
// LAZY INITIALIZATION - Initialize stealth on first task claim
// Workers start immediately and init proxies only when needed
// =================================================================
if (!this.stealthInitialized) {
const initSuccess = await this.ensureStealthInitialized();
if (!initSuccess) {
// Init failed - wait and retry next loop
console.log(`[TaskWorker] ${this.friendlyName} stealth init failed, waiting before retry...`);
await this.sleep(30000);
return;
}
}
// Pass preflight capabilities to only claim compatible tasks
const task = await taskService.claimTask(
this.role,
@@ -830,32 +681,13 @@ export class TaskWorker {
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
// =================================================================
// PREFLIGHT CHECK - Use stored preflight results based on task method
// We already ran dual-transport preflights at startup, so just verify
// the correct preflight passed for this task's required method.
// PREFLIGHT CHECK - CRITICAL: Worker MUST pass before task execution
// Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready
// =================================================================
const taskMethod = task.method || 'http'; // Default to http if not specified
let preflightPassed = false;
let preflightMsg = '';
if (taskMethod === 'http' && this.preflightHttpPassed) {
preflightPassed = true;
preflightMsg = `HTTP preflight passed (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`;
} else if (taskMethod === 'curl' && this.preflightCurlPassed) {
preflightPassed = true;
preflightMsg = `CURL preflight passed (IP: ${this.preflightCurlResult?.proxyIp || 'unknown'})`;
} else if (!task.method && (this.preflightHttpPassed || this.preflightCurlPassed)) {
// No method preference - either transport works
preflightPassed = true;
preflightMsg = this.preflightHttpPassed ? 'HTTP preflight passed' : 'CURL preflight passed';
}
if (!preflightPassed) {
const errorMsg = taskMethod === 'http'
? 'HTTP preflight not passed - cannot execute http tasks'
: 'CURL preflight not passed - cannot execute curl tasks';
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${errorMsg}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without preflight`);
const preflight = await this.crawlRotator.preflight();
if (!preflight.passed) {
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${preflight.error}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without proxy/anti-detect`);
// Release task back to pending so another worker can pick it up
await taskService.releaseTask(task.id);
@@ -865,7 +697,7 @@ export class TaskWorker {
return;
}
console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);
console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${task.id} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`);
this.activeTasks.set(task.id, task);
@@ -909,8 +741,8 @@ export class TaskWorker {
// Mark as running
await taskService.startTask(task.id);
// Get handler for this role (considers method for dual-transport)
const handler = getHandlerForTask(task);
// Get handler for this role
const handler = TASK_HANDLERS[task.role];
if (!handler) {
throw new Error(`No handler registered for role: ${task.role}`);
}
@@ -1072,10 +904,7 @@ async function main(): Promise<void> {
process.exit(1);
}
// Use POD_NAME for persistent identity in K8s StatefulSet
// This ensures workers keep the same ID across restarts
// Falls back to WORKER_ID, then generates UUID if neither is set
const workerId = process.env.POD_NAME || process.env.WORKER_ID;
const workerId = process.env.WORKER_ID;
// Pass null for role-agnostic, or the specific role
const worker = new TaskWorker(role || null, workerId);

View File

@@ -366,141 +366,6 @@ export async function listPayloadMetadata(
}));
}
/**
* Result from saving a discovery payload
*/
export interface SaveDiscoveryPayloadResult {
id: number;
storagePath: string;
sizeBytes: number;
sizeBytesRaw: number;
checksum: string;
}
/**
* Generate storage path for a discovery payload
*
* Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
*/
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
const year = timestamp.getFullYear();
const month = String(timestamp.getMonth() + 1).padStart(2, '0');
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
'discovery',
String(year),
month,
day,
`state_${stateCode.toLowerCase()}_${ts}.json.gz`
);
}
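A worked example of the resulting path, assuming PAYLOAD_BASE_PATH is '/storage/payloads' (per the format comment) and a UTC host, since getFullYear/getMonth/getDate use local time:

```typescript
const p = generateDiscoveryStoragePath('AZ', new Date('2025-12-12T07:30:00Z'));
// p === '/storage/payloads/discovery/2025/12/12/state_az_1765524600000.json.gz'
```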
/**
* Save a raw store discovery payload to the filesystem and record metadata in the DB
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
* @param payload - Raw JSON payload from discovery GraphQL
* @param storeCount - Number of stores in payload
* @returns SaveDiscoveryPayloadResult with file info and DB record ID
*/
export async function saveDiscoveryPayload(
pool: Pool,
stateCode: string,
payload: any,
storeCount: number = 0
): Promise<SaveDiscoveryPayloadResult> {
const timestamp = new Date();
const storagePath = generateDiscoveryStoragePath(stateCode, timestamp);
// Serialize and compress
const jsonStr = JSON.stringify(payload);
const rawSize = Buffer.byteLength(jsonStr, 'utf8');
const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
// Record metadata in DB
const result = await pool.query(`
INSERT INTO raw_crawl_payloads (
payload_type,
state_code,
storage_path,
store_count,
size_bytes,
size_bytes_raw,
fetched_at,
checksum_sha256
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
`, [
'store_discovery',
stateCode.toUpperCase(),
storagePath,
storeCount,
compressedSize,
rawSize,
timestamp,
checksum
]);
console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
return {
id: result.rows[0].id,
storagePath,
sizeBytes: compressedSize,
sizeBytesRaw: rawSize,
checksum
};
}
/**
* Get the latest discovery payload for a state
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
* @returns Parsed payload and metadata, or null if none exists
*/
export async function getLatestDiscoveryPayload(
pool: Pool,
stateCode: string
): Promise<{ payload: any; metadata: any } | null> {
const result = await pool.query(`
SELECT id, state_code, storage_path, store_count, fetched_at
FROM raw_crawl_payloads
WHERE payload_type = 'store_discovery'
AND state_code = $1
ORDER BY fetched_at DESC
LIMIT 1
`, [stateCode.toUpperCase()]);
if (result.rows.length === 0) {
return null;
}
const row = result.rows[0];
const payload = await loadPayloadFromPath(row.storage_path);
return {
payload,
metadata: {
id: row.id,
stateCode: row.state_code,
storeCount: row.store_count,
fetchedAt: row.fetched_at,
storagePath: row.storage_path
}
};
}
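A round-trip sketch, assuming a configured pg Pool and that these functions are this module's exports (import path assumed):

```typescript
import { Pool } from 'pg';
import { saveDiscoveryPayload, getLatestDiscoveryPayload } from './payload-storage'; // path assumed

async function roundTrip(pool: Pool): Promise<void> {
  const payload = { stores: [{ id: 'abc' }, { id: 'def' }] }; // hypothetical shape
  const saved = await saveDiscoveryPayload(pool, 'az', payload, 2);
  console.log(saved.storagePath, `${saved.sizeBytes}B compressed`);

  // State code is upper-cased on both write and read, so 'az' and 'AZ' match.
  const latest = await getLatestDiscoveryPayload(pool, 'AZ');
  console.log(latest?.metadata.storeCount); // 2
}
```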
/**
* Delete old payloads (for retention policy)
*

View File

@@ -2666,25 +2666,13 @@ class ApiClient {
// Dashboard methods
getMarketDashboard = this.getMarketsDashboard.bind(this);
// ============================================================
// LEGACY SCHEDULE METHODS (DEPRECATED 2025-12-12)
// These use /api/markets/admin/schedules which queries job_schedules
// Use getTaskSchedules(), updateTaskSchedule(), etc. instead
// (defined below, use /api/tasks/schedules which queries task_schedules)
// ============================================================
/** @deprecated Use getTaskSchedules() - queries task_schedules table */
// Schedule methods (no conflicts)
getSchedules = this.getCrawlSchedules.bind(this);
/** @deprecated Use getTaskSchedule() - queries task_schedules table */
getSchedule = this.getDutchieAZSchedule.bind(this);
/** @deprecated Use createTaskSchedule() - queries task_schedules table */
createSchedule = this.createDutchieAZSchedule.bind(this);
/** @deprecated Use updateTaskSchedule() - queries task_schedules table */
updateSchedule = this.updateDutchieAZSchedule.bind(this);
/** @deprecated Use deleteTaskSchedule() - queries task_schedules table */
deleteSchedule = this.deleteDutchieAZSchedule.bind(this);
/** @deprecated Use runTaskScheduleNow() - queries task_schedules table */
triggerSchedule = this.triggerDutchieAZSchedule.bind(this);
/** @deprecated - job_schedules init not needed for task_schedules */
initSchedules = this.initDutchieAZSchedules.bind(this);
getScheduleLogs = this.getCrawlScheduleLogs.bind(this);
getRunLogs = this.getDutchieAZRunLogs.bind(this);
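An illustrative before/after of the migration these deprecation notes describe, assuming the exported api singleton:

```typescript
async function migrateExample(): Promise<void> {
  // Before (deprecated): legacy alias backed by job_schedules.
  await api.getSchedules();

  // After: Task Schedules API backed by task_schedules.
  const { schedules } = await api.getTaskSchedules(true); // enabled only
  if (schedules.length > 0) {
    await api.updateTaskSchedule(schedules[0].id, { interval_hours: 6 });
  }
}
```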
@@ -2988,101 +2976,6 @@ class ApiClient {
{ method: 'POST', body: JSON.stringify({ replicas }) }
);
}
// ==========================================
// Task Schedules API (recurring task definitions)
// ==========================================
async getTaskSchedules(enabledOnly?: boolean) {
const qs = enabledOnly ? '?enabled=true' : '';
return this.request<{ schedules: TaskSchedule[] }>(`/api/tasks/schedules${qs}`);
}
async getTaskSchedule(id: number) {
return this.request<TaskSchedule>(`/api/tasks/schedules/${id}`);
}
async createTaskSchedule(data: {
name: string;
role: string;
description?: string;
enabled?: boolean;
interval_hours: number;
priority?: number;
state_code?: string;
platform?: string;
}) {
return this.request<TaskSchedule>('/api/tasks/schedules', {
method: 'POST',
body: JSON.stringify(data),
});
}
async updateTaskSchedule(id: number, data: Partial<{
name: string;
role: string;
description: string;
enabled: boolean;
interval_hours: number;
priority: number;
state_code: string;
platform: string;
}>) {
return this.request<TaskSchedule>(`/api/tasks/schedules/${id}`, {
method: 'PUT',
body: JSON.stringify(data),
});
}
async deleteTaskSchedule(id: number) {
return this.request<{ success: boolean; message: string }>(`/api/tasks/schedules/${id}`, {
method: 'DELETE',
});
}
async deleteTaskSchedulesBulk(ids?: number[], all?: boolean) {
return this.request<{ success: boolean; deleted_count: number; deleted: { id: number; name: string }[]; message: string }>(
'/api/tasks/schedules',
{
method: 'DELETE',
body: JSON.stringify({ ids, all }),
}
);
}
async runTaskScheduleNow(id: number) {
return this.request<{ success: boolean; message: string; task: any }>(`/api/tasks/schedules/${id}/run-now`, {
method: 'POST',
});
}
async toggleTaskSchedule(id: number) {
return this.request<{ success: boolean; schedule: { id: number; name: string; enabled: boolean }; message: string }>(
`/api/tasks/schedules/${id}/toggle`,
{ method: 'POST' }
);
}
}
// Type for task schedules
export interface TaskSchedule {
id: number;
name: string;
role: string;
description: string | null;
enabled: boolean;
interval_hours: number;
priority: number;
state_code: string | null;
platform: string | null;
method: 'curl' | 'http' | null;
is_immutable: boolean;
last_run_at: string | null;
next_run_at: string | null;
last_task_count: number;
last_error: string | null;
created_at: string;
updated_at: string;
}
export const api = new ApiClient(API_URL);
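A usage sketch of the schedule methods via the exported singleton; the import path and field values are hypothetical:

```typescript
import { api, TaskSchedule } from './api'; // path assumed

async function ensureAZRefreshSchedule(): Promise<TaskSchedule> {
  const created = await api.createTaskSchedule({
    name: 'az_product_refresh',
    role: 'product_refresh',
    interval_hours: 4,
    state_code: 'AZ',
    platform: 'dutchie',
  });
  await api.runTaskScheduleNow(created.id); // seed the first run immediately
  return created;
}
```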

View File

@@ -1,18 +1,3 @@
/**
* @deprecated 2025-12-12
*
* This page used the legacy job_schedules table which has been deprecated.
* All schedule management has been consolidated into task_schedules and
* is now managed via the /admin/tasks page (TasksDashboard.tsx).
*
* The job_schedules table entries have been disabled and marked deprecated.
* This page is no longer in the navigation menu but kept for reference.
*
* Migration details:
* - job_schedules used base_interval_minutes + jitter_minutes
* - task_schedules uses interval_hours (simpler model)
* - All CRUD operations now via /api/tasks/schedules endpoints
*/
import { useEffect, useState } from 'react';
import { Layout } from '../components/Layout';
import { api } from '../lib/api';

View File

@@ -14,11 +14,27 @@ export function Settings() {
loadSettings();
}, []);
// AI-related settings are managed in /ai-settings; filter them out here
const AI_SETTING_KEYS = [
'ai_model',
'ai_provider',
'anthropic_api_key',
'openai_api_key',
'anthropic_model',
'openai_model',
'anthropic_enabled',
'openai_enabled',
];
const loadSettings = async () => {
setLoading(true);
try {
const data = await api.getSettings();
setSettings(data.settings);
// Filter out AI settings - those are managed in /ai-settings
const filteredSettings = (data.settings || []).filter(
(s: any) => !AI_SETTING_KEYS.includes(s.key)
);
setSettings(filteredSettings);
} catch (error) {
console.error('Failed to load settings:', error);
} finally {

View File

@@ -1,5 +1,5 @@
import { useState, useEffect } from 'react';
import { api, TaskSchedule } from '../lib/api';
import { api } from '../lib/api';
import { Layout } from '../components/Layout';
import {
ListChecks,
@@ -21,12 +21,6 @@ import {
X,
Calendar,
Trash2,
Edit2,
Play,
Pause,
Timer,
Lock,
Globe,
} from 'lucide-react';
interface Task {
@@ -387,264 +381,10 @@ const ROLES = [
'store_discovery',
'entry_point_discovery',
'product_discovery',
'product_refresh',
'analytics_refresh',
];
// ============================================================
// Schedule Edit Modal
// ============================================================
interface ScheduleEditModalProps {
isOpen: boolean;
schedule: TaskSchedule | null;
onClose: () => void;
onSave: () => void;
}
function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditModalProps) {
const [name, setName] = useState('');
const [role, setRole] = useState('product_refresh');
const [description, setDescription] = useState('');
const [enabled, setEnabled] = useState(true);
const [intervalHours, setIntervalHours] = useState(4);
const [priority, setPriority] = useState(0);
const [stateCode, setStateCode] = useState('');
const [platform, setPlatform] = useState('dutchie');
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const isNew = !schedule;
const isImmutable = schedule?.is_immutable ?? false;
useEffect(() => {
if (schedule) {
setName(schedule.name);
setRole(schedule.role);
setDescription(schedule.description || '');
setEnabled(schedule.enabled);
setIntervalHours(schedule.interval_hours);
setPriority(schedule.priority);
setStateCode(schedule.state_code || '');
setPlatform(schedule.platform || 'dutchie');
} else {
// Reset for new schedule
setName('');
setRole('product_refresh');
setDescription('');
setEnabled(true);
setIntervalHours(4);
setPriority(0);
setStateCode('');
setPlatform('dutchie');
}
setError(null);
}, [schedule, isOpen]);
const handleSubmit = async () => {
if (!isImmutable && !name.trim()) {
setError('Name is required');
return;
}
setLoading(true);
setError(null);
try {
// For immutable schedules, only send allowed fields
const data = isImmutable
? {
enabled,
interval_hours: intervalHours,
priority,
}
: {
name: name.trim(),
role,
description: description.trim() || undefined,
enabled,
interval_hours: intervalHours,
priority,
state_code: stateCode.trim() || undefined,
platform: platform.trim() || undefined,
};
if (isNew) {
await api.createTaskSchedule(data as any);
} else {
await api.updateTaskSchedule(schedule!.id, data);
}
onSave();
onClose();
} catch (err: any) {
setError(err.response?.data?.error || err.message || 'Failed to save schedule');
} finally {
setLoading(false);
}
};
if (!isOpen) return null;
return (
<div className="fixed inset-0 z-50 overflow-y-auto">
<div className="flex min-h-full items-center justify-center p-4">
<div className="fixed inset-0 bg-black/50" onClick={onClose} />
<div className="relative bg-white rounded-xl shadow-xl max-w-lg w-full">
<div className="px-6 py-4 border-b border-gray-200 flex items-center justify-between">
<h2 className="text-lg font-semibold text-gray-900">
{isNew ? 'Create Schedule' : 'Edit Schedule'}
</h2>
<button onClick={onClose} className="p-1 hover:bg-gray-100 rounded">
<X className="w-5 h-5 text-gray-500" />
</button>
</div>
<div className="px-6 py-4 space-y-4">
{error && (
<div className="bg-red-50 border border-red-200 rounded-lg p-3 text-red-700 text-sm">
{error}
</div>
)}
{isImmutable && (
<div className="bg-amber-50 border border-amber-200 rounded-lg p-3 flex items-start gap-2">
<Lock className="w-4 h-4 text-amber-600 flex-shrink-0 mt-0.5" />
<div className="text-sm text-amber-800">
<strong>Immutable schedule.</strong> Only <em>Enabled</em>, <em>Interval</em>, and <em>Priority</em> can be modified.
</div>
</div>
)}
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Name *</label>
<input
type="text"
value={name}
onChange={(e) => setName(e.target.value)}
placeholder="e.g., product_refresh_all"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Role *</label>
<select
value={role}
onChange={(e) => setRole(e.target.value)}
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
>
{TASK_ROLES.map(r => (
<option key={r.id} value={r.id}>{r.name}</option>
))}
</select>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Description</label>
<input
type="text"
value={description}
onChange={(e) => setDescription(e.target.value)}
placeholder="Optional description"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Interval (hours) *</label>
<input
type="number"
min="1"
max="168"
value={intervalHours}
onChange={(e) => setIntervalHours(parseInt(e.target.value) || 4)}
className="w-full px-3 py-2 border border-gray-200 rounded-lg"
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Priority</label>
<input
type="number"
min="0"
max="100"
value={priority}
onChange={(e) => setPriority(parseInt(e.target.value) || 0)}
className="w-full px-3 py-2 border border-gray-200 rounded-lg"
/>
</div>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">State Code</label>
<input
type="text"
value={stateCode}
onChange={(e) => setStateCode(e.target.value.toUpperCase())}
placeholder="e.g., AZ"
maxLength={2}
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-1">Platform</label>
<input
type="text"
value={platform}
onChange={(e) => setPlatform(e.target.value)}
placeholder="e.g., dutchie"
disabled={isImmutable}
className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
}`}
/>
</div>
</div>
<div className="flex items-center gap-2">
<input
type="checkbox"
id="enabled"
checked={enabled}
onChange={(e) => setEnabled(e.target.checked)}
className="w-4 h-4 text-emerald-600 rounded"
/>
<label htmlFor="enabled" className="text-sm text-gray-700">Enabled</label>
</div>
</div>
<div className="px-6 py-4 border-t border-gray-200 bg-gray-50 flex justify-end gap-3">
<button onClick={onClose} className="px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 rounded-lg">
Cancel
</button>
<button
onClick={handleSubmit}
disabled={loading}
className="px-4 py-2 text-sm bg-emerald-600 text-white rounded-lg hover:bg-emerald-700 disabled:opacity-50 flex items-center gap-2"
>
{loading && <RefreshCw className="w-4 h-4 animate-spin" />}
{isNew ? 'Create' : 'Save'}
</button>
</div>
</div>
</div>
</div>
);
}
const STATUS_COLORS: Record<string, string> = {
pending: 'bg-yellow-100 text-yellow-800',
claimed: 'bg-blue-100 text-blue-800',
@@ -712,13 +452,6 @@ export default function TasksDashboard() {
const [poolPaused, setPoolPaused] = useState(false);
const [showCreateModal, setShowCreateModal] = useState(false);
// Schedules state
const [schedules, setSchedules] = useState<TaskSchedule[]>([]);
const [showSchedules, setShowSchedules] = useState(true);
const [selectedSchedules, setSelectedSchedules] = useState<Set<number>>(new Set());
const [editingSchedule, setEditingSchedule] = useState<TaskSchedule | null>(null);
const [showScheduleModal, setShowScheduleModal] = useState(false);
// Pagination
const [page, setPage] = useState(0);
const tasksPerPage = 25;
@@ -732,7 +465,7 @@ export default function TasksDashboard() {
const fetchData = async () => {
try {
const [tasksRes, countsRes, capacityRes, poolStatus, schedulesRes] = await Promise.all([
const [tasksRes, countsRes, capacityRes, poolStatus] = await Promise.all([
api.getTasks({
role: roleFilter || undefined,
status: statusFilter || undefined,
@@ -741,14 +474,12 @@ export default function TasksDashboard() {
api.getTaskCounts(),
api.getTaskCapacity(),
api.getTaskPoolStatus(),
api.getTaskSchedules(),
]);
setTasks(tasksRes.tasks || []);
setCounts(countsRes);
setCapacity(capacityRes.metrics || []);
setPoolPaused(poolStatus.paused);
setSchedules(schedulesRes.schedules || []);
setError(null);
} catch (err: any) {
setError(err.message || 'Failed to load tasks');
@@ -757,91 +488,6 @@ export default function TasksDashboard() {
}
};
const handleDeleteSchedule = async (scheduleId: number) => {
if (!confirm('Delete this schedule?')) return;
try {
await api.deleteTaskSchedule(scheduleId);
setSelectedSchedules(prev => {
const next = new Set(prev);
next.delete(scheduleId);
return next;
});
fetchData();
} catch (err: any) {
console.error('Delete schedule error:', err);
alert(err.response?.data?.error || 'Failed to delete schedule');
}
};
const handleBulkDeleteSchedules = async () => {
// Filter out immutable schedules from selection
const deletableIds = Array.from(selectedSchedules).filter(id => {
const schedule = schedules.find(s => s.id === id);
return schedule && !schedule.is_immutable;
});
if (deletableIds.length === 0) {
alert('No deletable schedules selected. Immutable schedules cannot be deleted.');
return;
}
const immutableCount = selectedSchedules.size - deletableIds.length;
const confirmMsg = immutableCount > 0
? `Delete ${deletableIds.length} schedule(s)? (${immutableCount} immutable schedule(s) will be skipped)`
: `Delete ${deletableIds.length} selected schedule(s)?`;
if (!confirm(confirmMsg)) return;
try {
await api.deleteTaskSchedulesBulk(deletableIds);
setSelectedSchedules(new Set());
fetchData();
} catch (err: any) {
console.error('Bulk delete error:', err);
alert(err.response?.data?.error || 'Failed to delete schedules');
}
};
const handleToggleSchedule = async (scheduleId: number) => {
try {
await api.toggleTaskSchedule(scheduleId);
fetchData();
} catch (err: any) {
console.error('Toggle schedule error:', err);
alert(err.response?.data?.error || 'Failed to toggle schedule');
}
};
const handleRunScheduleNow = async (scheduleId: number) => {
try {
const result = await api.runTaskScheduleNow(scheduleId);
alert(result.message);
fetchData();
} catch (err: any) {
console.error('Run schedule error:', err);
alert(err.response?.data?.error || 'Failed to run schedule');
}
};
const toggleSelectSchedule = (id: number) => {
setSelectedSchedules(prev => {
const next = new Set(prev);
if (next.has(id)) {
next.delete(id);
} else {
next.add(id);
}
return next;
});
};
const toggleSelectAllSchedules = () => {
if (selectedSchedules.size === schedules.length) {
setSelectedSchedules(new Set());
} else {
setSelectedSchedules(new Set(schedules.map(s => s.id)));
}
};
const handleDeleteTask = async (taskId: number) => {
if (!confirm('Delete this task?')) return;
try {
@@ -937,17 +583,6 @@ export default function TasksDashboard() {
onTaskCreated={fetchData}
/>
{/* Schedule Edit Modal */}
<ScheduleEditModal
isOpen={showScheduleModal}
schedule={editingSchedule}
onClose={() => {
setShowScheduleModal(false);
setEditingSchedule(null);
}}
onSave={fetchData}
/>
{/* Status Summary Cards */}
<div className="grid grid-cols-2 sm:grid-cols-3 lg:grid-cols-6 gap-4">
{Object.entries(counts || {}).map(([status, count]) => (
@@ -1079,238 +714,6 @@ export default function TasksDashboard() {
)}
</div>
{/* Schedules Section */}
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">
<button
onClick={() => setShowSchedules(!showSchedules)}
className="w-full flex items-center justify-between p-4 hover:bg-gray-50"
>
<div className="flex items-center gap-2">
<Timer className="w-5 h-5 text-emerald-600" />
<span className="font-medium text-gray-900">Schedules ({schedules.length})</span>
</div>
{showSchedules ? (
<ChevronUp className="w-5 h-5 text-gray-400" />
) : (
<ChevronDown className="w-5 h-5 text-gray-400" />
)}
</button>
{showSchedules && (
<div className="border-t border-gray-200">
{/* Schedule Actions */}
<div className="p-4 bg-gray-50 border-b border-gray-200 flex flex-wrap items-center justify-between gap-2">
<div className="flex items-center gap-2">
<button
onClick={() => {
setEditingSchedule(null);
setShowScheduleModal(true);
}}
className="flex items-center gap-1 px-3 py-1.5 text-sm bg-emerald-600 text-white rounded hover:bg-emerald-700"
>
<Plus className="w-4 h-4" />
New Schedule
</button>
{selectedSchedules.size > 0 && (
<button
onClick={handleBulkDeleteSchedules}
className="flex items-center gap-1 px-3 py-1.5 text-sm bg-red-600 text-white rounded hover:bg-red-700"
>
<Trash2 className="w-4 h-4" />
Delete ({selectedSchedules.size})
</button>
)}
</div>
<span className="text-sm text-gray-500">
{schedules.filter(s => s.enabled).length} enabled
</span>
</div>
{schedules.length === 0 ? (
<div className="p-8 text-center text-gray-500">
No schedules configured. Click "New Schedule" to create one.
</div>
) : (
<div className="overflow-x-auto">
<table className="min-w-full divide-y divide-gray-200">
<thead className="bg-gray-50">
<tr>
<th className="px-4 py-3 text-left">
<input
type="checkbox"
checked={selectedSchedules.size === schedules.length && schedules.length > 0}
onChange={toggleSelectAllSchedules}
className="w-4 h-4 text-emerald-600 rounded"
/>
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Name
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Role
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
State
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Method
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Interval
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Last Run
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Next Run
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
Status
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase w-32">
Actions
</th>
</tr>
</thead>
<tbody className="divide-y divide-gray-200">
{schedules.map((schedule) => (
<tr key={schedule.id} className="hover:bg-gray-50">
<td className="px-4 py-3">
<input
type="checkbox"
checked={selectedSchedules.has(schedule.id)}
onChange={() => toggleSelectSchedule(schedule.id)}
className="w-4 h-4 text-emerald-600 rounded"
disabled={schedule.is_immutable}
/>
</td>
<td className="px-4 py-3">
<div className="flex items-center gap-2">
{schedule.is_immutable && (
<span title="Immutable schedule (cannot be deleted)">
<Lock className="w-3.5 h-3.5 text-amber-500 flex-shrink-0" />
</span>
)}
<div>
<div className="text-sm font-medium text-gray-900">{schedule.name}</div>
{schedule.description && (
<div className="text-xs text-gray-500">{schedule.description}</div>
)}
</div>
</div>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.role.replace(/_/g, ' ')}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.state_code ? (
<span className="inline-flex items-center gap-1 px-2 py-0.5 bg-blue-50 text-blue-700 rounded font-medium">
<Globe className="w-3 h-3" />
{schedule.state_code}
</span>
) : (
<span className="text-gray-400">-</span>
)}
</td>
<td className="px-4 py-3 text-sm">
<span className={`inline-flex items-center px-2 py-0.5 rounded text-xs font-medium ${
schedule.method === 'http'
? 'bg-purple-100 text-purple-700'
: schedule.method === 'curl'
? 'bg-orange-100 text-orange-700'
: 'bg-gray-100 text-gray-600'
}`}>
{schedule.method || 'any'}
</span>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
Every {schedule.interval_hours}h
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.last_run_at ? formatTimeAgo(schedule.last_run_at) : '-'}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.next_run_at ? formatTimeAgo(schedule.next_run_at) : '-'}
</td>
<td className="px-4 py-3">
<span
className={`inline-flex items-center gap-1 px-2 py-1 rounded-full text-xs font-medium ${
schedule.enabled
? 'bg-green-100 text-green-800'
: 'bg-gray-100 text-gray-800'
}`}
>
{schedule.enabled ? (
<>
<Play className="w-3 h-3" />
Active
</>
) : (
<>
<Pause className="w-3 h-3" />
Paused
</>
)}
</span>
</td>
<td className="px-4 py-3">
<div className="flex items-center gap-1">
<button
onClick={() => handleRunScheduleNow(schedule.id)}
className="p-1.5 text-gray-400 hover:text-emerald-600 hover:bg-emerald-50 rounded transition-colors"
title="Run now"
>
<PlayCircle className="w-4 h-4" />
</button>
<button
onClick={() => handleToggleSchedule(schedule.id)}
className={`p-1.5 rounded transition-colors ${
schedule.enabled
? 'text-gray-400 hover:text-yellow-600 hover:bg-yellow-50'
: 'text-gray-400 hover:text-green-600 hover:bg-green-50'
}`}
title={schedule.enabled ? 'Pause' : 'Enable'}
>
{schedule.enabled ? (
<Pause className="w-4 h-4" />
) : (
<Play className="w-4 h-4" />
)}
</button>
<button
onClick={() => {
setEditingSchedule(schedule);
setShowScheduleModal(true);
}}
className="p-1.5 text-gray-400 hover:text-blue-600 hover:bg-blue-50 rounded transition-colors"
title={schedule.is_immutable ? 'Edit (limited fields)' : 'Edit'}
>
<Edit2 className="w-4 h-4" />
</button>
<button
onClick={() => handleDeleteSchedule(schedule.id)}
disabled={schedule.is_immutable}
className={`p-1.5 rounded transition-colors ${
schedule.is_immutable
? 'text-gray-300 cursor-not-allowed'
: 'text-gray-400 hover:text-red-600 hover:bg-red-50'
}`}
title={schedule.is_immutable ? 'Cannot delete immutable schedule' : 'Delete'}
>
<Trash2 className="w-4 h-4" />
</button>
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
)}
</div>
)}
</div>
{/* Filters */}
<div className="flex flex-col sm:flex-row gap-4">
<div className="relative flex-1">