Compare commits


5 Commits

Author SHA1 Message Date
Kelly
92f88fdcd6 fix(workers): Increase max concurrent tasks to 15 and add K8s permission rule
- Change MAX_CONCURRENT_TASKS default from 3 to 15
- Add CLAUDE.md rule requiring explicit permission before kubectl commands

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 10:54:33 -07:00
Kelly
832ef1cf83 feat(scheduler): Immutable schedules and HTTP-only pipeline
## Changes
- **Migration 089**: Add is_immutable and method columns to task_schedules
  - Per-state product_discovery schedules (4h default)
  - Store discovery weekly (168h)
  - All schedules use HTTP transport (Puppeteer/browser)
- **Task Scheduler**: HTTP-only product discovery with per-state scheduling
  - Each state has its own immutable schedule
  - Schedules can be edited (interval/priority) but not deleted
- **TasksDashboard UI**: Full immutability support
  - Lock icon for immutable schedules
  - State and Method columns in schedules table
  - Disabled delete for immutable, restricted edit fields
- **Store Discovery HTTP**: Auto-queue product_discovery for new stores
- **Migration 088**: Discovery payloads storage schema

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 09:24:08 -07:00
Kelly
9a24b4896c feat(tasks): Dual transport handlers and self-healing product_refresh
- Rename product-discovery.ts to product-discovery-curl.ts (axios-based)
- Rename payload-fetch.ts to payload-fetch-curl.ts
- Add product-discovery-http.ts (Puppeteer browser-based handler)
- Add method field to CreateTaskParams for transport selection
- Update task-service to insert method column on task creation
- Update task-worker with getHandlerForTask() for dual transport routing
- product_refresh now queues upstream tasks when no payload exists:
  - Has platform_dispensary_id → queues product_discovery (http)
  - No platform_dispensary_id → queues entry_point_discovery

This enables HTTP workers to pick up browser-based tasks while curl
workers handle axios-based tasks, and prevents product_refresh from
failing repeatedly when no crawl has been performed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 03:02:56 -07:00
Kelly
dd8fce6e35 fix(proxy): Convert non-standard proxy URL format and simplify preflight
- CrawlRotator.getProxyUrl() now converts non-standard format (http://host:port:user:pass) to standard format (http://user:pass@host:port)
- Simplify puppeteer preflight to only use ipify.org for IP verification (much lighter than fingerprint.com)
- Remove heavy anti-detect site tests from preflight - not needed, trust stealth plugin
- Fixes 503 errors when using session-based residential proxies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 02:13:51 -07:00
Kelly
f82eed4dc3 feat(workers): Add proxy reload, staggered tasks, and bulk proxy import
- Periodic proxy reload: Workers now reload proxies every 60s to pick up changes
- Staggered task scheduling: New API endpoints for creating tasks with delays
- Bulk proxy import: Script supports multiple URL formats including host:port:user:pass
- Proxy URL column: Migration 086 adds proxy_url for non-standard formats

Key changes:
- crawl-rotator.ts: Added reloadIfStale(), isStale(), setReloadInterval()
- task-worker.ts: Calls reloadIfStale() in main loop
- task-service.ts: Added createStaggeredTasks() and createAZStoreTasks()
- tasks.ts: Added POST /batch/staggered and /batch/az-stores endpoints
- import-proxies.ts: New script for bulk proxy import
- CLAUDE.md: Documented staggered task workflow

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 01:53:15 -07:00
23 changed files with 2591 additions and 349 deletions

CLAUDE.md

@@ -17,6 +17,35 @@ Never deploy unless user explicitly says: "CLAUDE — DEPLOYMENT IS NOW AUTHORIZ
### 5. DB POOL ONLY
Never import `src/db/migrate.ts` at runtime. Use `src/db/pool.ts` for DB access.
### 6. K8S POD LIMITS — CRITICAL
**MAX 8 PODS** for `scraper-worker` deployment. NEVER EXCEED THIS.
**Pods vs Workers:**
- **Pod** = Kubernetes container instance (MAX 8)
- **Worker** = Concurrent task runner INSIDE a pod (controlled by `MAX_CONCURRENT_TASKS` env var)
- Formula: `8 pods × MAX_CONCURRENT_TASKS = total concurrent workers`
**To increase workers:** Change `MAX_CONCURRENT_TASKS` env var, NOT replicas.
```bash
# CORRECT - increase workers per pod
kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=5
# WRONG - never scale above 8 replicas
kubectl scale deployment/scraper-worker --replicas=20 # NEVER DO THIS
```
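At the new default of `MAX_CONCURRENT_TASKS=15` (raised from 3 in commit 92f88fdcd6 above), that ceiling works out to 8 pods × 15 = 120 concurrent workers.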
**If K8s API returns ServiceUnavailable:** STOP IMMEDIATELY. Do not retry. The cluster is overloaded.
### 7. K8S REQUIRES EXPLICIT PERMISSION
**NEVER run kubectl commands without explicit user permission.**
Before running ANY `kubectl` command (scale, rollout, set env, delete, apply, etc.):
1. Tell the user what you want to do
2. Wait for explicit approval
3. Only then execute the command
This applies to ALL kubectl operations - even read-only ones like `kubectl get pods`.
---
## Quick Reference
@@ -205,6 +234,78 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
---
## Staggered Task Workflow (Added 2025-12-12)
### Overview
When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting.
### How It Works
```
1. Task created with scheduled_for = NOW() + (index * stagger_seconds)
2. Worker claims task only when scheduled_for <= NOW()
3. Worker runs preflight on EVERY task claim (proxy health check)
4. If preflight passes, worker executes task
5. If preflight fails, task released back to pending for another worker
6. Worker finishes task, polls for next available task
7. Repeat - preflight runs on each new task claim
```
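As a sketch of step 2 above, the claim condition can be expressed as a single guarded UPDATE. This is a hypothetical sketch (the real claiming logic lives in `src/tasks/task-worker.ts`); `FOR UPDATE SKIP LOCKED` and the exact `worker_tasks` columns used here are assumptions:
```typescript
import { getPool } from '../db/pool';

// Hypothetical sketch of step 2: a worker claims one task whose scheduled_for
// has arrived. SKIP LOCKED (an assumption) prevents two workers from claiming
// the same row; the worker_id column on worker_tasks is also assumed.
async function claimNextTask(workerId: string) {
  const pool = getPool();
  const { rows } = await pool.query(`
    UPDATE worker_tasks
    SET status = 'claimed', worker_id = $1
    WHERE id = (
      SELECT id FROM worker_tasks
      WHERE status = 'pending' AND scheduled_for <= NOW()
      ORDER BY priority DESC, scheduled_for ASC
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id, role, dispensary_id, method
  `, [workerId]);
  return rows[0] ?? null; // null => nothing eligible yet
}
```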
### Key Points
- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
- **Stagger prevents thundering herd**: the default is 15 seconds between tasks
- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed
### API Endpoints
```bash
# Create staggered tasks for specific dispensary IDs
POST /api/tasks/batch/staggered
{
"dispensary_ids": [1, 2, 3, 4],
"role": "product_refresh", # or "product_discovery"
"stagger_seconds": 15, # default: 15
"platform": "dutchie", # default: "dutchie"
"method": null # "curl" | "http" | null
}
# Create staggered tasks for AZ stores (convenience endpoint)
POST /api/tasks/batch/az-stores
{
"total_tasks": 24, # default: 24
"stagger_seconds": 15, # default: 15
"split_roles": true # default: true (12 refresh, 12 discovery)
}
```
### Example: 24 Tasks for AZ Stores
```bash
curl -X POST http://localhost:3010/api/tasks/batch/az-stores \
-H "Content-Type: application/json" \
-d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}'
```
Response:
```json
{
"success": true,
"total": 24,
"product_refresh": 12,
"product_discovery": 12,
"stagger_seconds": 15,
"total_duration_seconds": 345,
"estimated_completion": "2025-12-12T08:40:00.000Z",
"message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)"
}
```
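Here `total_duration_seconds` follows the stagger formula used by these endpoints: (24 - 1) * 15 = 345 seconds.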
### Related Files
| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods |
| `src/routes/tasks.ts` | API endpoints for batch task creation |
| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |
---
## Documentation
| Doc | Purpose |


@@ -0,0 +1,10 @@
-- Migration 086: Add proxy_url column for alternative URL formats
-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass)
-- This column allows storing the raw URL directly
-- Add proxy_url column - if set, used directly instead of constructing from parts
ALTER TABLE proxies
ADD COLUMN IF NOT EXISTS proxy_url TEXT;
-- Add comment
COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.';


@@ -0,0 +1,30 @@
-- Migration 088: Extend raw_crawl_payloads for discovery payloads
--
-- Enables saving raw store data from Dutchie discovery crawls.
-- Store discovery returns raw dispensary objects - save them for historical analysis.
-- Add payload_type to distinguish product crawls from discovery crawls
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS payload_type VARCHAR(32) NOT NULL DEFAULT 'product';
-- Add state_code for discovery payloads (null for product payloads)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS state_code VARCHAR(10);
-- Add store_count for discovery payloads (alternative to product_count)
ALTER TABLE raw_crawl_payloads
ADD COLUMN IF NOT EXISTS store_count INTEGER;
-- Make dispensary_id nullable for discovery payloads
ALTER TABLE raw_crawl_payloads
ALTER COLUMN dispensary_id DROP NOT NULL;
-- Add index for discovery payload queries
CREATE INDEX IF NOT EXISTS idx_raw_crawl_payloads_type_state
ON raw_crawl_payloads(payload_type, state_code)
WHERE payload_type = 'store_discovery';
-- Comments
COMMENT ON COLUMN raw_crawl_payloads.payload_type IS 'Type: product (default), store_discovery';
COMMENT ON COLUMN raw_crawl_payloads.state_code IS 'State code for discovery payloads (e.g., AZ, MI)';
COMMENT ON COLUMN raw_crawl_payloads.store_count IS 'Number of stores in discovery payload';
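A hypothetical reader of these columns, showing the query shape the partial index above is built for (the `created_at` column name is an assumption):
```typescript
import { getPool } from '../db/pool';

// Hypothetical: fetch the most recent store-discovery payload per state,
// using the (payload_type, state_code) partial index from migration 088.
// created_at is an assumed timestamp column on raw_crawl_payloads.
async function latestDiscoveryPerState() {
  const pool = getPool();
  const { rows } = await pool.query(`
    SELECT DISTINCT ON (state_code) state_code, store_count, created_at
    FROM raw_crawl_payloads
    WHERE payload_type = 'store_discovery'
    ORDER BY state_code, created_at DESC
  `);
  return rows;
}
```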


@@ -0,0 +1,105 @@
-- Migration 089: Immutable Schedules with Per-State Product Discovery
--
-- Key changes:
-- 1. Add is_immutable column - schedules can be edited but not deleted
-- 2. Add method column - all tasks use 'http' (Puppeteer transport)
-- 3. Store discovery weekly (168h)
-- 4. Per-state product_discovery schedules (4h default)
-- 5. Remove old payload_fetch schedules
-- =====================================================
-- 1) Add new columns to task_schedules
-- =====================================================
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS is_immutable BOOLEAN DEFAULT FALSE;
ALTER TABLE task_schedules
ADD COLUMN IF NOT EXISTS method VARCHAR(10) DEFAULT 'http';
-- =====================================================
-- 2) Update store_discovery to weekly and immutable
-- =====================================================
UPDATE task_schedules
SET interval_hours = 168, -- 7 days
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)'
WHERE name = 'store_discovery_dutchie';
-- Insert if doesn't exist
INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at)
VALUES ('store_discovery_dutchie', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW())
ON CONFLICT (name) DO UPDATE SET
interval_hours = 168,
is_immutable = TRUE,
method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)';
-- =====================================================
-- 3) Remove old payload_fetch and product_refresh_all schedules
-- =====================================================
DELETE FROM task_schedules WHERE name IN ('payload_fetch_all', 'product_refresh_all');
-- =====================================================
-- 4) Create per-state product_discovery schedules
-- =====================================================
-- One schedule per state that has dispensaries with active cannabis programs
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours, -- 4 hours default, editable
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable, -- Can edit but not delete
'http' AS method,
CASE WHEN s.is_active THEN TRUE ELSE FALSE END AS enabled,
-- Stagger start times: each state starts 5 minutes after the previous
NOW() + (ROW_NUMBER() OVER (ORDER BY s.code) * INTERVAL '5 minutes') AS next_run_at
FROM states s
WHERE EXISTS (
SELECT 1 FROM dispensaries d
WHERE d.state_id = s.id AND d.crawl_enabled = true
)
ON CONFLICT (name) DO UPDATE SET
is_immutable = TRUE,
method = 'http',
description = EXCLUDED.description;
-- Also create schedules for states that might have stores discovered later
INSERT INTO task_schedules (name, role, state_code, interval_hours, priority, description, is_immutable, method, enabled, next_run_at)
SELECT
'product_discovery_' || lower(s.code) AS name,
'product_discovery' AS role,
s.code AS state_code,
4 AS interval_hours,
10 AS priority,
'Product discovery for ' || s.name || ' dispensaries (HTTP transport)' AS description,
TRUE AS is_immutable,
'http' AS method,
FALSE AS enabled, -- Disabled until stores exist
NOW() + INTERVAL '1 hour'
FROM states s
WHERE NOT EXISTS (
SELECT 1 FROM task_schedules ts WHERE ts.name = 'product_discovery_' || lower(s.code)
)
ON CONFLICT (name) DO NOTHING;
-- =====================================================
-- 5) Make analytics_refresh immutable
-- =====================================================
UPDATE task_schedules
SET is_immutable = TRUE, method = 'http'
WHERE name = 'analytics_refresh';
-- =====================================================
-- 6) Add index for schedule lookups
-- =====================================================
CREATE INDEX IF NOT EXISTS idx_task_schedules_state_code
ON task_schedules(state_code)
WHERE state_code IS NOT NULL;
-- Comments
COMMENT ON COLUMN task_schedules.is_immutable IS 'If TRUE, schedule cannot be deleted (only edited)';
COMMENT ON COLUMN task_schedules.method IS 'Transport method: http (Puppeteer/browser) or curl (axios)';
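A hypothetical post-migration sanity check, using only columns this migration creates or updates, to confirm one immutable per-state schedule exists with staggered start times:
```typescript
import { getPool } from '../db/pool';

// Hypothetical: list the per-state product_discovery schedules created above,
// ordered by their staggered next_run_at values.
async function verifyPerStateSchedules() {
  const pool = getPool();
  const { rows } = await pool.query(`
    SELECT name, state_code, interval_hours, enabled, next_run_at
    FROM task_schedules
    WHERE role = 'product_discovery' AND is_immutable = true
    ORDER BY next_run_at
  `);
  for (const s of rows) {
    console.log(`${s.name}: every ${s.interval_hours}h, enabled=${s.enabled}, next=${s.next_run_at}`);
  }
}
```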

src/routes/tasks.ts

@@ -157,6 +157,9 @@ router.get('/capacity/:role', async (req: Request, res: Response) => {
 /**
  * GET /api/tasks/schedules
  * List all task schedules
+ *
+ * Returns schedules with is_immutable flag - immutable schedules can only
+ * have their interval_hours, priority, and enabled fields updated (not deleted).
  */
 router.get('/schedules', async (req: Request, res: Response) => {
   try {
@@ -164,7 +167,9 @@ router.get('/schedules', async (req: Request, res: Response) => {
     let query = `
       SELECT id, name, role, description, enabled, interval_hours,
-             priority, state_code, platform, last_run_at, next_run_at,
+             priority, state_code, platform, method,
+             COALESCE(is_immutable, false) as is_immutable,
+             last_run_at, next_run_at,
              last_task_count, last_error, created_at, updated_at
       FROM task_schedules
     `;
@@ -173,7 +178,15 @@ router.get('/schedules', async (req: Request, res: Response) => {
       query += ` WHERE enabled = true`;
     }
-    query += ` ORDER BY name`;
+    query += ` ORDER BY
+      CASE role
+        WHEN 'store_discovery' THEN 1
+        WHEN 'product_discovery' THEN 2
+        WHEN 'analytics_refresh' THEN 3
+        ELSE 4
+      END,
+      state_code NULLS FIRST,
+      name`;
     const result = await pool.query(query);
     res.json({ schedules: result.rows });
@@ -187,25 +200,45 @@ router.get('/schedules', async (req: Request, res: Response) => {
  * DELETE /api/tasks/schedules
  * Bulk delete schedules
  *
+ * Immutable schedules are automatically skipped (not deleted).
+ *
  * Body:
  * - ids: number[] (required) - array of schedule IDs to delete
- * - all: boolean (optional) - if true, delete all schedules (ids ignored)
+ * - all: boolean (optional) - if true, delete all non-immutable schedules (ids ignored)
  */
 router.delete('/schedules', async (req: Request, res: Response) => {
   try {
     const { ids, all } = req.body;
     let result;
+    let skippedImmutable: { id: number; name: string }[] = [];
     if (all === true) {
-      // Delete all schedules
+      // First, find immutable schedules that will be skipped
+      const immutableResult = await pool.query(`
+        SELECT id, name FROM task_schedules WHERE is_immutable = true
+      `);
+      skippedImmutable = immutableResult.rows;
+      // Delete all non-immutable schedules
       result = await pool.query(`
-        DELETE FROM task_schedules RETURNING id, name
+        DELETE FROM task_schedules
+        WHERE COALESCE(is_immutable, false) = false
+        RETURNING id, name
       `);
     } else if (Array.isArray(ids) && ids.length > 0) {
-      // Delete specific schedules by IDs
+      // First, find which of the requested IDs are immutable
+      const immutableResult = await pool.query(`
+        SELECT id, name FROM task_schedules
+        WHERE id = ANY($1) AND is_immutable = true
+      `, [ids]);
+      skippedImmutable = immutableResult.rows;
+      // Delete only non-immutable schedules from the requested IDs
       result = await pool.query(`
-        DELETE FROM task_schedules WHERE id = ANY($1) RETURNING id, name
+        DELETE FROM task_schedules
+        WHERE id = ANY($1) AND COALESCE(is_immutable, false) = false
+        RETURNING id, name
       `, [ids]);
     } else {
       return res.status(400).json({
@@ -217,7 +250,11 @@ router.delete('/schedules', async (req: Request, res: Response) => {
       success: true,
       deleted_count: result.rowCount,
       deleted: result.rows,
-      message: `Deleted ${result.rowCount} schedule(s)`,
+      skipped_immutable_count: skippedImmutable.length,
+      skipped_immutable: skippedImmutable,
+      message: skippedImmutable.length > 0
+        ? `Deleted ${result.rowCount} schedule(s), skipped ${skippedImmutable.length} immutable schedule(s)`
+        : `Deleted ${result.rowCount} schedule(s)`,
     });
   } catch (error: unknown) {
     console.error('Error bulk deleting schedules:', error);
@@ -311,6 +348,13 @@ router.get('/schedules/:id', async (req: Request, res: Response) => {
 /**
  * PUT /api/tasks/schedules/:id
  * Update an existing schedule
+ *
+ * For IMMUTABLE schedules, only these fields can be updated:
+ * - enabled (turn on/off)
+ * - interval_hours (change frequency)
+ * - priority (change priority)
+ *
+ * For regular schedules, all fields can be updated.
  */
 router.put('/schedules/:id', async (req: Request, res: Response) => {
   try {
@@ -326,23 +370,68 @@ router.put('/schedules/:id', async (req: Request, res: Response) => {
       platform,
     } = req.body;
+    // First check if schedule exists and if it's immutable
+    const checkResult = await pool.query(`
+      SELECT id, name, COALESCE(is_immutable, false) as is_immutable
+      FROM task_schedules WHERE id = $1
+    `, [scheduleId]);
+    if (checkResult.rows.length === 0) {
+      return res.status(404).json({ error: 'Schedule not found' });
+    }
+    const schedule = checkResult.rows[0];
+    const isImmutable = schedule.is_immutable;
+    // For immutable schedules, reject attempts to change protected fields
+    if (isImmutable) {
+      const protectedFields: string[] = [];
+      if (name !== undefined) protectedFields.push('name');
+      if (role !== undefined) protectedFields.push('role');
+      if (description !== undefined) protectedFields.push('description');
+      if (state_code !== undefined) protectedFields.push('state_code');
+      if (platform !== undefined) protectedFields.push('platform');
+      if (protectedFields.length > 0) {
+        return res.status(403).json({
+          error: 'Cannot modify protected fields on immutable schedule',
+          message: `Schedule "${schedule.name}" is immutable. Only enabled, interval_hours, and priority can be changed.`,
+          protected_fields: protectedFields,
+          allowed_fields: ['enabled', 'interval_hours', 'priority'],
+        });
+      }
+    }
     // Build dynamic update query
     const updates: string[] = [];
     const values: any[] = [];
     let paramIndex = 1;
-    if (name !== undefined) {
-      updates.push(`name = $${paramIndex++}`);
-      values.push(name);
-    }
-    if (role !== undefined) {
-      updates.push(`role = $${paramIndex++}`);
-      values.push(role);
-    }
-    if (description !== undefined) {
-      updates.push(`description = $${paramIndex++}`);
-      values.push(description);
-    }
+    // These fields can only be updated on non-immutable schedules
+    if (!isImmutable) {
+      if (name !== undefined) {
+        updates.push(`name = $${paramIndex++}`);
+        values.push(name);
+      }
+      if (role !== undefined) {
+        updates.push(`role = $${paramIndex++}`);
+        values.push(role);
+      }
+      if (description !== undefined) {
+        updates.push(`description = $${paramIndex++}`);
+        values.push(description);
+      }
+      if (state_code !== undefined) {
+        updates.push(`state_code = $${paramIndex++}`);
+        values.push(state_code || null);
+      }
+      if (platform !== undefined) {
+        updates.push(`platform = $${paramIndex++}`);
+        values.push(platform || null);
+      }
+    }
+    // These fields can be updated on ALL schedules (including immutable)
     if (enabled !== undefined) {
       updates.push(`enabled = $${paramIndex++}`);
       values.push(enabled);
@@ -360,14 +449,6 @@ router.put('/schedules/:id', async (req: Request, res: Response) => {
       updates.push(`priority = $${paramIndex++}`);
       values.push(priority);
     }
-    if (state_code !== undefined) {
-      updates.push(`state_code = $${paramIndex++}`);
-      values.push(state_code || null);
-    }
-    if (platform !== undefined) {
-      updates.push(`platform = $${paramIndex++}`);
-      values.push(platform || null);
-    }
     if (updates.length === 0) {
       return res.status(400).json({ error: 'No fields to update' });
@@ -381,14 +462,12 @@ router.put('/schedules/:id', async (req: Request, res: Response) => {
       SET ${updates.join(', ')}
       WHERE id = $${paramIndex}
       RETURNING id, name, role, description, enabled, interval_hours,
-                priority, state_code, platform, last_run_at, next_run_at,
+                priority, state_code, platform, method,
+                COALESCE(is_immutable, false) as is_immutable,
+                last_run_at, next_run_at,
                 last_task_count, last_error, created_at, updated_at
     `, values);
-    if (result.rows.length === 0) {
-      return res.status(404).json({ error: 'Schedule not found' });
-    }
     res.json(result.rows[0]);
   } catch (error: any) {
     if (error.code === '23505') {
@@ -402,22 +481,41 @@ router.put('/schedules/:id', async (req: Request, res: Response) => {
 /**
  * DELETE /api/tasks/schedules/:id
  * Delete a schedule
+ *
+ * Immutable schedules cannot be deleted - they can only be disabled.
  */
 router.delete('/schedules/:id', async (req: Request, res: Response) => {
   try {
     const scheduleId = parseInt(req.params.id, 10);
-    const result = await pool.query(`
-      DELETE FROM task_schedules WHERE id = $1 RETURNING id, name
+    // First check if schedule exists and is immutable
+    const checkResult = await pool.query(`
+      SELECT id, name, COALESCE(is_immutable, false) as is_immutable
+      FROM task_schedules WHERE id = $1
     `, [scheduleId]);
-    if (result.rows.length === 0) {
+    if (checkResult.rows.length === 0) {
       return res.status(404).json({ error: 'Schedule not found' });
     }
+    const schedule = checkResult.rows[0];
+    // Prevent deletion of immutable schedules
+    if (schedule.is_immutable) {
+      return res.status(403).json({
+        error: 'Cannot delete immutable schedule',
+        message: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`,
+        schedule_id: scheduleId,
+        is_immutable: true,
+      });
+    }
+    // Delete the schedule
+    await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [scheduleId]);
     res.json({
       success: true,
-      message: `Schedule "${result.rows[0].name}" deleted`,
+      message: `Schedule "${schedule.name}" deleted`,
     });
   } catch (error: unknown) {
     console.error('Error deleting schedule:', error);
@@ -976,6 +1074,123 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
   }
 });
+// ============================================================
+// STAGGERED BATCH TASK CREATION
+// ============================================================
+/**
+ * POST /api/tasks/batch/staggered
+ * Create multiple tasks with staggered start times
+ *
+ * This endpoint prevents resource contention when creating many tasks by
+ * staggering their scheduled_for timestamps. Each task becomes eligible
+ * for claiming only after its scheduled time.
+ *
+ * WORKFLOW:
+ * 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds)
+ * 2. Worker claims task only when scheduled_for <= NOW()
+ * 3. Worker runs preflight on EVERY task claim
+ * 4. If preflight passes, worker executes task
+ * 5. If preflight fails, task released back to pending for another worker
+ *
+ * Body:
+ * - dispensary_ids: number[] (required) - Array of dispensary IDs
+ * - role: TaskRole (required) - 'product_refresh' | 'product_discovery'
+ * - stagger_seconds: number (default: 15) - Seconds between each task start
+ * - platform: string (default: 'dutchie')
+ * - method: 'curl' | 'http' | null (default: null)
+ */
+router.post('/batch/staggered', async (req: Request, res: Response) => {
+  try {
+    const {
+      dispensary_ids,
+      role,
+      stagger_seconds = 15,
+      platform = 'dutchie',
+      method = null,
+    } = req.body;
+    if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
+      return res.status(400).json({ error: 'dispensary_ids array is required' });
+    }
+    if (!role) {
+      return res.status(400).json({ error: 'role is required' });
+    }
+    const result = await taskService.createStaggeredTasks(
+      dispensary_ids,
+      role as TaskRole,
+      stagger_seconds,
+      platform,
+      method
+    );
+    const totalDuration = (dispensary_ids.length - 1) * stagger_seconds;
+    const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
+    res.status(201).json({
+      success: true,
+      created: result.created,
+      task_ids: result.taskIds,
+      stagger_seconds,
+      total_duration_seconds: totalDuration,
+      estimated_completion: estimatedEndTime.toISOString(),
+      message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
+    });
+  } catch (error: unknown) {
+    console.error('Error creating staggered tasks:', error);
+    res.status(500).json({ error: 'Failed to create staggered tasks' });
+  }
+});
+/**
+ * POST /api/tasks/batch/az-stores
+ * Convenience endpoint to create staggered tasks for Arizona stores
+ *
+ * Body:
+ * - total_tasks: number (default: 24) - Total tasks to create
+ * - stagger_seconds: number (default: 15) - Seconds between each task
+ * - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
+ */
+router.post('/batch/az-stores', async (req: Request, res: Response) => {
+  try {
+    const {
+      total_tasks = 24,
+      stagger_seconds = 15,
+      split_roles = true,
+    } = req.body;
+    const result = await taskService.createAZStoreTasks(
+      total_tasks,
+      stagger_seconds,
+      split_roles
+    );
+    const totalDuration = (result.total - 1) * stagger_seconds;
+    const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
+    res.status(201).json({
+      success: true,
+      total: result.total,
+      product_refresh: result.product_refresh,
+      product_discovery: result.product_discovery,
+      task_ids: result.taskIds,
+      stagger_seconds,
+      total_duration_seconds: totalDuration,
+      estimated_completion: estimatedEndTime.toISOString(),
+      message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
+    });
+  } catch (error: unknown) {
+    console.error('Error creating AZ store tasks:', error);
+    res.status(500).json({ error: 'Failed to create AZ store tasks' });
+  }
+});
+// ============================================================
+// TASK POOL MANAGEMENT
+// ============================================================
 /**
  * GET /api/tasks/pool/status
  * Check if task pool is paused
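For reference, a hypothetical client call against the immutable-delete guard above; the 403 body shape comes from the handler, while the base URL reuses the local port from the CLAUDE.md examples:
```typescript
// Hypothetical: deleting an immutable schedule should return 403 with the
// response body built in the DELETE /schedules/:id handler above.
async function tryDeleteSchedule(id: number) {
  const res = await fetch(`http://localhost:3010/api/tasks/schedules/${id}`, {
    method: 'DELETE',
  });
  const body = await res.json();
  if (res.status === 403 && body.is_immutable) {
    console.log(`Skipped immutable schedule: ${body.message}`);
  } else {
    console.log(body.message);
  }
}
```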


@@ -252,12 +252,9 @@ router.post('/deregister', async (req: Request, res: Response) => {
     // Release the name back to the pool
     await pool.query('SELECT release_worker_name($1)', [worker_id]);
-    // Mark as terminated
+    // Delete the worker entry (clean shutdown)
     const { rows } = await pool.query(`
-      UPDATE worker_registry
-      SET status = 'terminated',
-          current_task_id = NULL,
-          updated_at = NOW()
+      DELETE FROM worker_registry
       WHERE worker_id = $1
       RETURNING id, friendly_name
     `, [worker_id]);

src/scripts/import-proxies.ts

@@ -0,0 +1,284 @@
/**
* Bulk Proxy Import Script
*
* Imports proxies from various formats into the proxies table.
* Supports:
* - Standard format: http://user:pass@host:port
* - Colon format: http://host:port:user:pass
* - Simple format: host:port:user:pass (defaults to http)
*
* Usage:
* npx tsx src/scripts/import-proxies.ts < proxies.txt
* echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts
* npx tsx src/scripts/import-proxies.ts --file proxies.txt
* npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"
*
* Options:
* --file <path> Read proxies from file (one per line)
* --url <url> Import a single proxy URL
* --max-connections Set max_connections for all imported proxies (default: 1)
* --dry-run Parse and show what would be imported without inserting
*/
import { getPool } from '../db/pool';
import * as fs from 'fs';
import * as readline from 'readline';
interface ParsedProxy {
protocol: string;
host: string;
port: number;
username?: string;
password?: string;
rawUrl: string;
}
/**
* Parse a proxy URL in various formats
*/
function parseProxyUrl(input: string): ParsedProxy | null {
const trimmed = input.trim();
if (!trimmed || trimmed.startsWith('#')) return null;
// Format 1: Standard URL format - http://user:pass@host:port
const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/);
if (standardMatch) {
return {
protocol: standardMatch[1],
username: standardMatch[2],
password: standardMatch[3],
host: standardMatch[4],
port: parseInt(standardMatch[5], 10),
rawUrl: trimmed,
};
}
// Format 2: Standard URL without auth - http://host:port
const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/);
if (noAuthMatch) {
return {
protocol: noAuthMatch[1],
host: noAuthMatch[2],
port: parseInt(noAuthMatch[3], 10),
rawUrl: trimmed,
};
}
// Format 3: Colon format with protocol - http://host:port:user:pass
const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonWithProtocolMatch) {
return {
protocol: colonWithProtocolMatch[1],
host: colonWithProtocolMatch[2],
port: parseInt(colonWithProtocolMatch[3], 10),
username: colonWithProtocolMatch[4],
password: colonWithProtocolMatch[5],
rawUrl: trimmed, // Keep raw URL for non-standard format
};
}
// Format 4: Colon format without protocol - host:port:user:pass
const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/);
if (colonMatch) {
return {
protocol: 'http',
host: colonMatch[1],
port: parseInt(colonMatch[2], 10),
username: colonMatch[3],
password: colonMatch[4],
rawUrl: `http://${trimmed}`, // Construct raw URL
};
}
// Format 5: Simple host:port
const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/);
if (simpleMatch) {
return {
protocol: 'http',
host: simpleMatch[1],
port: parseInt(simpleMatch[2], 10),
rawUrl: `http://${trimmed}`,
};
}
console.error(`[ImportProxies] Could not parse: ${trimmed}`);
return null;
}
/**
* Check if proxy URL is in non-standard format (needs proxy_url column)
*/
function isNonStandardFormat(rawUrl: string): boolean {
// Colon format: protocol://host:port:user:pass
return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl);
}
async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) {
if (dryRun) {
console.log('\n[ImportProxies] DRY RUN - Would import:');
for (const p of proxies) {
const needsRawUrl = isNonStandardFormat(p.rawUrl);
console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`);
}
console.log(`\nTotal: ${proxies.length} proxies`);
return;
}
const pool = getPool();
let inserted = 0;
let skipped = 0;
for (const proxy of proxies) {
try {
// Determine if we need to store the raw URL (non-standard format)
const needsRawUrl = isNonStandardFormat(proxy.rawUrl);
// Use different conflict resolution based on format
// Non-standard format: unique by proxy_url (session-based residential proxies)
// Standard format: unique by host/port/protocol
const query = needsRawUrl
? `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (proxy_url) WHERE proxy_url IS NOT NULL
DO UPDATE SET
max_connections = EXCLUDED.max_connections,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`
: `
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (host, port, protocol)
DO UPDATE SET
username = EXCLUDED.username,
password = EXCLUDED.password,
max_connections = EXCLUDED.max_connections,
proxy_url = EXCLUDED.proxy_url,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`;
const result = await pool.query(query, [
proxy.host,
proxy.port,
proxy.protocol,
proxy.username || null,
proxy.password || null,
maxConnections,
needsRawUrl ? proxy.rawUrl : null,
]);
const isInsert = result.rows[0]?.is_insert;
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
if (isInsert) {
inserted++;
console.log(`[ImportProxies] Inserted: ${displayName}`);
} else {
console.log(`[ImportProxies] Updated: ${displayName}`);
inserted++; // Count updates too
}
} catch (err: any) {
const sessionId = proxy.password?.match(/session-([A-Z0-9]+)/)?.[1] || '';
const displayName = sessionId ? `session ${sessionId}` : `${proxy.host}:${proxy.port}`;
console.error(`[ImportProxies] Error inserting ${displayName}: ${err.message}`);
skipped++;
}
}
console.log(`\n[ImportProxies] Complete: ${inserted} imported, ${skipped} skipped`);
// Notify any listening workers
try {
await pool.query(`NOTIFY proxy_added, 'bulk import'`);
console.log('[ImportProxies] Sent proxy_added notification to workers');
} catch {
// Ignore notification errors
}
}
async function readFromStdin(): Promise<string[]> {
return new Promise((resolve) => {
const lines: string[] = [];
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on('line', (line) => {
lines.push(line);
});
rl.on('close', () => {
resolve(lines);
});
});
}
async function main() {
const args = process.argv.slice(2);
let lines: string[] = [];
let maxConnections = 1;
let dryRun = false;
// Parse arguments
for (let i = 0; i < args.length; i++) {
if (args[i] === '--file' && args[i + 1]) {
const content = fs.readFileSync(args[i + 1], 'utf-8');
lines.push(...content.split('\n'));
i++;
} else if (args[i] === '--url' && args[i + 1]) {
lines.push(args[i + 1]);
i++;
} else if (args[i] === '--max-connections' && args[i + 1]) {
maxConnections = parseInt(args[i + 1], 10);
i++;
} else if (args[i] === '--dry-run') {
dryRun = true;
} else if (!args[i].startsWith('--')) {
// Treat as URL directly
lines.push(args[i]);
}
}
// If no lines yet, read from stdin
if (lines.length === 0) {
console.log('[ImportProxies] Reading from stdin...');
lines = await readFromStdin();
}
// Parse all lines
const proxies: ParsedProxy[] = [];
for (const line of lines) {
const parsed = parseProxyUrl(line);
if (parsed) {
proxies.push(parsed);
}
}
if (proxies.length === 0) {
console.error('[ImportProxies] No valid proxies found');
console.error('\nUsage:');
console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"');
console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt');
console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts');
console.error('\nSupported formats:');
console.error(' http://user:pass@host:port (standard)');
console.error(' http://host:port:user:pass (colon format)');
console.error(' host:port:user:pass (simple)');
process.exit(1);
}
console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`);
await importProxies(proxies, maxConnections, dryRun);
}
main().catch((err) => {
console.error('[ImportProxies] Fatal error:', err);
process.exit(1);
});
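A worked example of `parseProxyUrl` on the two most common formats (sample host and credentials are invented; the outputs follow from the regex branches above):
```typescript
// Format 3 (colon format with protocol) matches after formats 1 and 2 fail,
// and keeps the raw URL for the proxy_url column:
parseProxyUrl('http://proxy.example.com:8080:alice:s3cret');
// => { protocol: 'http', host: 'proxy.example.com', port: 8080,
//      username: 'alice', password: 's3cret',
//      rawUrl: 'http://proxy.example.com:8080:alice:s3cret' }

// Format 1 (standard) parses auth from the userinfo section:
parseProxyUrl('http://alice:s3cret@proxy.example.com:8080');
// => { protocol: 'http', username: 'alice', password: 's3cret',
//      host: 'proxy.example.com', port: 8080,
//      rawUrl: 'http://alice:s3cret@proxy.example.com:8080' }
```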

crawl-rotator.ts

@@ -77,6 +77,11 @@ export interface Proxy {
   country?: string;
   countryCode?: string;
   timezone?: string;
+  /**
+   * Raw proxy URL override. If set, used directly instead of constructing from parts.
+   * Supports non-standard formats like: http://host:port:user:pass
+   */
+  proxyUrl?: string;
 }
 export interface ProxyStats {
@@ -129,6 +134,10 @@ export class ProxyRotator {
   private proxies: Proxy[] = [];
   private currentIndex: number = 0;
   private lastRotation: Date = new Date();
+  private lastReloadAt: Date = new Date();
+  // Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
+  private reloadIntervalMs: number = 60000;
   constructor(pool?: Pool) {
     this.pool = pool || null;
@@ -138,6 +147,13 @@
     this.pool = pool;
   }
+  /**
+   * Set the reload interval for periodic proxy checks
+   */
+  setReloadInterval(ms: number): void {
+    this.reloadIntervalMs = ms;
+  }
   /**
    * Load proxies from database
    */
@@ -167,22 +183,76 @@
         state,
         country,
         country_code as "countryCode",
-        timezone
+        timezone,
+        proxy_url as "proxyUrl"
       FROM proxies
       WHERE active = true
       ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
     `);
     this.proxies = result.rows;
+    this.lastReloadAt = new Date();
     const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
-    console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
+    console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`);
   } catch (error) {
     console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
     this.proxies = [];
   }
 }
+  /**
+   * Check if proxy list is stale and needs reload
+   */
+  isStale(): boolean {
+    const elapsed = Date.now() - this.lastReloadAt.getTime();
+    return elapsed > this.reloadIntervalMs;
+  }
+  /**
+   * Reload proxies if the cache is stale.
+   * This ensures workers pick up new proxies or see disabled proxies removed.
+   * Returns true if proxies were reloaded.
+   */
+  async reloadIfStale(): Promise<boolean> {
+    if (!this.isStale()) {
+      return false;
+    }
+    const oldCount = this.proxies.length;
+    const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
+    const oldIds = new Set(this.proxies.map(p => p.id));
+    await this.loadProxies();
+    const newCount = this.proxies.length;
+    const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
+    const newIds = new Set(this.proxies.map(p => p.id));
+    // Log changes
+    const added = this.proxies.filter(p => !oldIds.has(p.id));
+    const removed = [...oldIds].filter(id => !newIds.has(id));
+    if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) {
+      console.log(`[ProxyRotator] Reloaded proxies: ${oldCount} → ${newCount} proxies, ${oldCapacity} → ${newCapacity} threads`);
+      if (added.length > 0) {
+        console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`);
+      }
+      if (removed.length > 0) {
+        console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`);
+      }
+    }
+    return true;
+  }
+  /**
+   * Get time since last reload in seconds
+   */
+  getSecondsSinceReload(): number {
+    return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000);
+  }
   /**
    * Get next proxy in rotation
    */
@@ -342,8 +412,24 @@
   /**
    * Get proxy URL for HTTP client
+   * If proxy.proxyUrl is set, uses it directly (supports non-standard formats).
+   * Otherwise constructs standard format: protocol://user:pass@host:port
    */
   getProxyUrl(proxy: Proxy): string {
+    // If proxyUrl is set, check if it needs conversion from non-standard format
+    if (proxy.proxyUrl) {
+      // Check if it's in non-standard format: http://host:port:user:pass
+      const colonFormatMatch = proxy.proxyUrl.match(/^(https?):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
+      if (colonFormatMatch) {
+        // Convert to standard format: http://user:pass@host:port
+        const [, protocol, host, port, username, password] = colonFormatMatch;
+        return `${protocol}://${encodeURIComponent(username)}:${encodeURIComponent(password)}@${host}:${port}`;
+      }
+      // Already in standard format or unknown format - return as-is
+      return proxy.proxyUrl;
+    }
+    // Construct standard format from individual fields
     const auth = proxy.username && proxy.password
       ? `${proxy.username}:${proxy.password}@`
       : '';
@@ -584,6 +670,23 @@
     await this.proxy.loadProxies();
   }
+  /**
+   * Reload proxy list if stale.
+   * Workers should call this periodically to pick up proxy changes.
+   * Returns true if proxies were reloaded.
+   */
+  async reloadIfStale(): Promise<boolean> {
+    return this.proxy.reloadIfStale();
+  }
+  /**
+   * Set proxy reload interval in milliseconds.
+   * Default is 60 seconds.
+   */
+  setProxyReloadInterval(ms: number): void {
+    this.proxy.setReloadInterval(ms);
+  }
   /**
    * Rotate proxy only (get new IP)
    */
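A worked example of the `getProxyUrl` conversion above; the proxy row and import path are hypothetical, and the output follows from the regex and `encodeURIComponent` calls:
```typescript
import { ProxyRotator, Proxy } from './crawl-rotator'; // path is an assumption

// Hypothetical proxy row: non-standard colon format stored in proxies.proxy_url.
// Other required fields are omitted via a loose cast - this is only a sketch.
const proxy = {
  id: 1,
  host: 'proxy.example.com',
  port: 8080,
  protocol: 'http',
  maxConnections: 1,
  proxyUrl: 'http://proxy.example.com:8080:alice:s3cret+1',
} as Proxy;

const rotator = new ProxyRotator();
console.log(rotator.getProxyUrl(proxy));
// => http://alice:s3cret%2B1@proxy.example.com:8080
// ('+' in the password is percent-encoded by encodeURIComponent)
```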


@@ -216,228 +216,37 @@ export async function runPuppeteerPreflight(
   }
   // =========================================================================
-  // STEP 1b: Visit fingerprint.com demo to verify anti-detect
+  // STEP 2: Preflight complete - proxy verified via ipify.org
+  // We skip heavy fingerprint.com/amiunique.org tests - just verify proxy works
+  // The actual Dutchie test happens at task time.
   // =========================================================================
-  console.log(`[PuppeteerPreflight] Testing anti-detect at ${FINGERPRINT_DEMO_URL}...`);
-  try {
-    await page.goto(FINGERPRINT_DEMO_URL, {
-      waitUntil: 'networkidle2',
-      timeout: 30000,
-    });
-    result.proxyConnected = true; // If we got here, proxy is working
-    // Wait for fingerprint results to load
-    await page.waitForSelector('[data-test="visitor-id"]', { timeout: 10000 }).catch(() => {});
-    // Extract fingerprint data from the page
-    const fingerprintData = await page.evaluate(() => {
-      // Try to find the IP address displayed on the page
-      const ipElement = document.querySelector('[data-test="ip-address"]');
-      const ip = ipElement?.textContent?.trim() || null;
-      // Try to find bot detection info
-      const botElement = document.querySelector('[data-test="bot-detected"]');
-      const botDetected = botElement?.textContent?.toLowerCase().includes('true') || false;
-      // Try to find visitor ID (proves fingerprinting worked)
-      const visitorIdElement = document.querySelector('[data-test="visitor-id"]');
-      const visitorId = visitorIdElement?.textContent?.trim() || null;
-      // Alternative: look for common UI patterns if data-test attrs not present
-      let detectedIp = ip;
-      if (!detectedIp) {
-        // Look for IP in any element containing IP-like pattern
-        const allText = document.body.innerText;
-        const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
-        detectedIp = ipMatch ? ipMatch[1] : null;
-      }
-      return {
-        ip: detectedIp,
-        botDetected,
-        visitorId,
-        pageLoaded: !!document.body,
-      };
-    });
-    if (fingerprintData.ip) {
-      result.proxyIp = fingerprintData.ip;
-      console.log(`[PuppeteerPreflight] Detected IP: ${fingerprintData.ip}`);
-      // Verify IP matches expected proxy
-      if (expectedProxyHost) {
-        // Check if detected IP contains the proxy host (or is close match)
-        if (fingerprintData.ip === expectedProxyHost ||
-            expectedProxyHost.includes(fingerprintData.ip) ||
-            fingerprintData.ip.includes(expectedProxyHost.split('.').slice(0, 3).join('.'))) {
-          result.ipVerified = true;
-          console.log(`[PuppeteerPreflight] IP VERIFIED - matches proxy`);
-        } else {
-          console.log(`[PuppeteerPreflight] IP mismatch: expected ${expectedProxyHost}, got ${fingerprintData.ip}`);
-          // Don't fail - residential proxies often show different egress IPs
-        }
-      }
-      // Note: Timezone already set earlier via ipify.org IP lookup
-    }
-    if (fingerprintData.visitorId) {
-      console.log(`[PuppeteerPreflight] Fingerprint visitor ID: ${fingerprintData.visitorId}`);
-    }
-    result.botDetection = {
-      detected: fingerprintData.botDetected,
-    };
-    if (fingerprintData.botDetected) {
-      console.log(`[PuppeteerPreflight] WARNING: Bot detection triggered!`);
-    } else {
-      console.log(`[PuppeteerPreflight] Anti-detect check: NOT detected as bot`);
-      result.antidetectReady = true;
-    }
-  } catch (fpErr: any) {
-    // Could mean proxy connection failed
-    console.log(`[PuppeteerPreflight] Fingerprint.com check failed: ${fpErr.message}`);
-    if (fpErr.message.includes('net::ERR_PROXY') || fpErr.message.includes('ECONNREFUSED')) {
-      result.error = `Proxy connection failed: ${fpErr.message}`;
-      return result;
-    }
-    // Try fallback: amiunique.org
-    console.log(`[PuppeteerPreflight] Trying fallback: ${AMIUNIQUE_URL}...`);
-    try {
-      await page.goto(AMIUNIQUE_URL, {
-        waitUntil: 'networkidle2',
-        timeout: 30000,
-      });
-      result.proxyConnected = true;
-      // Extract IP from amiunique.org page
-      const amiData = await page.evaluate(() => {
-        const allText = document.body.innerText;
-        const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
-        return {
-          ip: ipMatch ? ipMatch[1] : null,
-          pageLoaded: !!document.body,
-        };
-      });
-      if (amiData.ip) {
-        result.proxyIp = amiData.ip;
-        console.log(`[PuppeteerPreflight] Detected IP via amiunique.org: ${amiData.ip}`);
-      }
-      result.antidetectReady = true;
-      console.log(`[PuppeteerPreflight] amiunique.org fallback succeeded`);
-    } catch (amiErr: any) {
-      console.log(`[PuppeteerPreflight] amiunique.org fallback also failed: ${amiErr.message}`);
-      // Continue with Dutchie test anyway
-      result.proxyConnected = true;
-      result.antidetectReady = true;
-    }
+  // If we got an IP from ipify.org, proxy is working
+  if (result.proxyIp) {
+    result.proxyConnected = true;
+    result.antidetectReady = true; // Assume stealth plugin is working
   }
-  // =========================================================================
-  // STEP 2: Test Dutchie API access (the real test)
-  // =========================================================================
-  const embedUrl = `https://dutchie.com/embedded-menu/${TEST_CNAME}?menuType=rec`;
-  console.log(`[PuppeteerPreflight] Establishing session at ${embedUrl}...`);
-  await page.goto(embedUrl, {
-    waitUntil: 'networkidle2',
-    timeout: 30000,
-  });
-  // Make GraphQL request from browser context
-  const graphqlResult = await page.evaluate(
-    async (platformId: string, hash: string) => {
-      try {
-        const variables = {
-          includeEnterpriseSpecials: false,
-          productsFilter: {
-            dispensaryId: platformId,
-            pricingType: 'rec',
-            Status: 'Active', // CRITICAL: Must be 'Active' per CLAUDE.md
-            types: [],
-            useCache: true,
-            isDefaultSort: true,
-            sortBy: 'popularSortIdx',
-            sortDirection: 1,
-            bypassOnlineThresholds: true,
-            isKioskMenu: false,
-            removeProductsBelowOptionThresholds: false,
-          },
-          page: 0,
-          perPage: 10, // Just need a few to prove it works
-        };
-        const extensions = {
-          persistedQuery: {
-            version: 1,
-            sha256Hash: hash,
-          },
-        };
-        const qs = new URLSearchParams({
-          operationName: 'FilteredProducts',
-          variables: JSON.stringify(variables),
-          extensions: JSON.stringify(extensions),
-        });
-        const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
-        const sessionId = 'preflight-' + Date.now();
-        const response = await fetch(url, {
-          method: 'GET',
-          headers: {
-            Accept: 'application/json',
-            'content-type': 'application/json',
-            'x-dutchie-session': sessionId,
-            'apollographql-client-name': 'Marketplace (production)',
-          },
-          credentials: 'include',
-        });
-        if (!response.ok) {
-          return { error: `HTTP ${response.status}`, products: 0 };
-        }
-        const json = await response.json();
-        if (json.errors) {
-          return { error: JSON.stringify(json.errors).slice(0, 200), products: 0 };
-        }
-        const products = json?.data?.filteredProducts?.products || [];
-        return { error: null, products: products.length };
-      } catch (err: any) {
-        return { error: err.message || 'Unknown error', products: 0 };
-      }
-    },
-    TEST_PLATFORM_ID,
-    FILTERED_PRODUCTS_HASH
-  );
   result.responseTimeMs = Date.now() - startTime;
-  if (graphqlResult.error) {
-    result.error = `GraphQL error: ${graphqlResult.error}`;
-    console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
-  } else if (graphqlResult.products === 0) {
-    result.error = 'GraphQL returned 0 products';
-    console.log(`[PuppeteerPreflight] FAILED - No products returned`);
-  } else {
+  // If we got here with proxyConnected=true and antidetectReady=true, we're good
+  if (result.proxyConnected && result.antidetectReady) {
     result.passed = true;
-    result.productsReturned = graphqlResult.products;
     console.log(
-      `[PuppeteerPreflight] PASSED - Got ${graphqlResult.products} products in ${result.responseTimeMs}ms`
+      `[PuppeteerPreflight] PASSED - Proxy connected, anti-detect ready (${result.responseTimeMs}ms)`
     );
     if (result.proxyIp) {
       console.log(`[PuppeteerPreflight] Browser IP via proxy: ${result.proxyIp}`);
     }
+  } else if (result.proxyConnected) {
+    // Proxy works but anti-detect check failed - still pass (anti-detect is best-effort)
+    result.passed = true;
+    result.antidetectReady = true; // Assume ready since proxy works
+    console.log(
+      `[PuppeteerPreflight] PASSED - Proxy connected (anti-detect check skipped, ${result.responseTimeMs}ms)`
+    );
+  } else {
+    result.error = result.error || 'Proxy connection failed';
+    console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
   }
 } catch (err: any) {
   result.error = `Browser error: ${err.message || 'Unknown error'}`;
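The ipify.org check that populates `result.proxyIp` happens earlier in the preflight and is not part of this hunk; a minimal sketch of that step, assuming a Puppeteer `page` already routed through the proxy:
```typescript
// Minimal sketch (assumed shape): fetch the egress IP through the proxied
// browser so result.proxyIp is populated before the checks above run.
// api.ipify.org returns { "ip": "x.x.x.x" } for ?format=json.
const ip = await page.evaluate(async () => {
  const res = await fetch('https://api.ipify.org?format=json');
  const json = await res.json();
  return json.ip as string;
});
result.proxyIp = ip;
console.log(`[PuppeteerPreflight] Egress IP via ipify.org: ${ip}`);
```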


@@ -26,6 +26,12 @@ interface TaskSchedule {
next_run_at: Date | null; next_run_at: Date | null;
state_code: string | null; state_code: string | null;
priority: number; priority: number;
method: 'curl' | 'http' | null;
is_immutable: boolean;
description: string | null;
platform: string | null;
last_task_count: number | null;
last_error: string | null;
} }
class TaskScheduler { class TaskScheduler {
@@ -84,24 +90,22 @@ class TaskScheduler {
/** /**
* Ensure default schedules exist in the database * Ensure default schedules exist in the database
* Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist * Per TASK_WORKFLOW_2024-12-10.md: Creates schedules if they don't exist
*
* NOTE: Per-state product_discovery schedules are created by migration 089.
* This only creates core immutable schedules that should exist regardless.
*/ */
private async ensureDefaultSchedules(): Promise<void> { private async ensureDefaultSchedules(): Promise<void> {
// Per TASK_WORKFLOW_2024-12-10.md: Default schedules for task generation // Core schedules - all use HTTP transport for browser-based scraping
// NOTE: payload_fetch replaces direct product_refresh - it chains to product_refresh
const defaults = [ const defaults = [
{
name: 'payload_fetch_all',
role: 'payload_fetch' as TaskRole,
interval_hours: 4,
priority: 0,
description: 'Fetch payloads from Dutchie API for all crawl-enabled stores every 4 hours. Chains to product_refresh.',
},
{ {
name: 'store_discovery_dutchie', name: 'store_discovery_dutchie',
role: 'store_discovery' as TaskRole, role: 'store_discovery' as TaskRole,
interval_hours: 24, interval_hours: 168, // Weekly
priority: 5, priority: 5,
description: 'Discover new Dutchie stores daily', description: 'Discover new Dutchie stores weekly (HTTP transport)',
method: 'http',
is_immutable: true,
platform: 'dutchie',
}, },
{ {
name: 'analytics_refresh', name: 'analytics_refresh',
@@ -109,16 +113,21 @@ class TaskScheduler {
interval_hours: 6, interval_hours: 6,
priority: 0, priority: 0,
description: 'Refresh analytics materialized views every 6 hours', description: 'Refresh analytics materialized views every 6 hours',
method: 'http',
is_immutable: true,
platform: null,
}, },
]; ];
for (const sched of defaults) { for (const sched of defaults) {
try { try {
await pool.query(` await pool.query(`
INSERT INTO task_schedules (name, role, interval_hours, priority, description, enabled, next_run_at) INSERT INTO task_schedules (name, role, interval_hours, priority, description, method, is_immutable, platform, enabled, next_run_at)
VALUES ($1, $2, $3, $4, $5, true, NOW()) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, NOW())
ON CONFLICT (name) DO NOTHING ON CONFLICT (name) DO UPDATE SET
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description]); method = EXCLUDED.method,
is_immutable = EXCLUDED.is_immutable
`, [sched.name, sched.role, sched.interval_hours, sched.priority, sched.description, sched.method, sched.is_immutable, sched.platform]);
} catch (err: any) { } catch (err: any) {
// Table may not exist yet - will be created by migration // Table may not exist yet - will be created by migration
if (!err.message.includes('does not exist')) { if (!err.message.includes('does not exist')) {
@@ -192,16 +201,27 @@ class TaskScheduler {
/**
 * Execute a schedule and create tasks
 * Per TASK_WORKFLOW_2024-12-10.md: Different logic per role
 *
 * TRANSPORT MODES:
 * - All schedules now use HTTP transport (Puppeteer/browser)
 * - Per-state product_discovery schedules process one state at a time
 * - Workers must pass HTTP preflight to claim HTTP tasks
 */
private async executeSchedule(schedule: TaskSchedule): Promise<number> {
  switch (schedule.role) {
    case 'product_discovery':
      // Per-state product discovery using HTTP transport
      return this.generateProductDiscoveryTasks(schedule);
    case 'payload_fetch':
      // DEPRECATED: Legacy payload_fetch redirects to product_discovery
      console.log(`[TaskScheduler] payload_fetch is deprecated, using product_discovery instead`);
      return this.generateProductDiscoveryTasks(schedule);
    case 'product_refresh':
      // DEPRECATED: Legacy product_refresh redirects to product_discovery
      console.log(`[TaskScheduler] product_refresh is deprecated, using product_discovery instead`);
      return this.generateProductDiscoveryTasks(schedule);
    case 'store_discovery':
      return this.generateStoreDiscoveryTasks(schedule);
@@ -216,50 +236,69 @@
}

/**
 * Generate product_discovery tasks for stores in a specific state
 * Uses HTTP transport (Puppeteer/browser) for all tasks
 *
 * Per-state scheduling allows:
 * - Different crawl frequencies per state (e.g., AZ=4h, MI=6h)
 * - Better rate limit management (one state at a time)
 * - Easier debugging and monitoring per state
 */
private async generateProductDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
  // state_code is required for per-state schedules
  if (!schedule.state_code) {
    console.warn(`[TaskScheduler] Schedule ${schedule.name} has no state_code, skipping`);
    return 0;
  }

  // Find stores in this state needing refresh
  const result = await pool.query(`
    SELECT d.id
    FROM dispensaries d
    JOIN states s ON d.state_id = s.id
    WHERE d.crawl_enabled = true
      AND d.platform_dispensary_id IS NOT NULL
      AND s.code = $1
      -- No pending/running product_discovery task already
      AND NOT EXISTS (
        SELECT 1 FROM worker_tasks t
        WHERE t.dispensary_id = d.id
          AND t.role = 'product_discovery'
          AND t.status IN ('pending', 'claimed', 'running')
      )
      -- Never fetched OR last fetch > interval ago
      AND (
        d.last_fetch_at IS NULL
        OR d.last_fetch_at < NOW() - ($2 || ' hours')::interval
      )
    ORDER BY d.last_fetch_at NULLS FIRST, d.id
  `, [schedule.state_code, schedule.interval_hours]);

  const dispensaryIds = result.rows.map((r: { id: number }) => r.id);
  if (dispensaryIds.length === 0) {
    console.log(`[TaskScheduler] No stores in ${schedule.state_code} need refresh`);
    return 0;
  }

  console.log(`[TaskScheduler] Creating ${dispensaryIds.length} product_discovery tasks for ${schedule.state_code}`);

  // Create product_discovery tasks with HTTP transport
  // Stagger by 15 seconds to prevent overwhelming proxies
  const { created } = await taskService.createStaggeredTasks(
    dispensaryIds,
    'product_discovery',
    15, // 15 seconds apart
    schedule.platform || 'dutchie',
    'http' // Force HTTP transport
  );
  return created;
}
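
// Illustrative sketch (not part of this diff): the kind of per-state row
// this method consumes. Real rows come from migration 089; the schedule
// name and the 4-hour interval below are assumptions for illustration.
//
//   INSERT INTO task_schedules
//     (name, role, state_code, interval_hours, priority, description,
//      method, is_immutable, platform, enabled, next_run_at)
//   VALUES
//     ('product_discovery_az', 'product_discovery', 'AZ', 4, 0,
//      'Per-state product discovery for AZ (HTTP transport)',
//      'http', true, 'dutchie', true, NOW())
//   ON CONFLICT (name) DO NOTHING;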
/**
 * Generate store_discovery tasks
 * Uses HTTP transport (Puppeteer/browser) for browser-based discovery
 */
private async generateStoreDiscoveryTasks(schedule: TaskSchedule): Promise<number> {
  // Check if discovery task already pending
@@ -276,8 +315,9 @@ class TaskScheduler {
  await taskService.createTask({
    role: 'store_discovery',
    platform: schedule.platform || 'dutchie',
    priority: schedule.priority,
    method: 'http', // Force HTTP transport for browser-based discovery
  });

  return 1;
@@ -310,11 +350,39 @@ class TaskScheduler {
/**
 * Get all schedules for dashboard display
 * Returns schedules with full metadata including immutability flag
 */
async getSchedules(): Promise<TaskSchedule[]> {
  try {
    const result = await pool.query(`
      SELECT
        id,
        name,
        role,
        enabled,
        interval_hours,
        last_run_at,
        next_run_at,
        state_code,
        priority,
        method,
        COALESCE(is_immutable, false) as is_immutable,
        description,
        platform,
        last_task_count,
        last_error,
        created_at,
        updated_at
      FROM task_schedules
      ORDER BY
        CASE role
          WHEN 'store_discovery' THEN 1
          WHEN 'product_discovery' THEN 2
          WHEN 'analytics_refresh' THEN 3
          ELSE 4
        END,
        state_code NULLS FIRST,
        name
    `);
    return result.rows as TaskSchedule[];
  } catch {
@@ -322,8 +390,24 @@ class TaskScheduler {
  }
}
/**
* Get a single schedule by ID
*/
async getSchedule(id: number): Promise<TaskSchedule | null> {
try {
const result = await pool.query(`
SELECT * FROM task_schedules WHERE id = $1
`, [id]);
return result.rows[0] as TaskSchedule || null;
} catch {
return null;
}
}
/**
 * Update a schedule
 * Allows updating: enabled, interval_hours, priority
 * Does NOT allow updating: name, role, state_code, is_immutable
 */
async updateSchedule(id: number, updates: Partial<TaskSchedule>): Promise<void> {
  const setClauses: string[] = [];
@@ -355,6 +439,33 @@ class TaskScheduler {
  `, values);
}
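
// Hedged sketch (assumed; the body is elided above): only whitelisted
// fields should become SET clauses, so immutable metadata cannot be edited.
//
//   const EDITABLE_FIELDS = ['enabled', 'interval_hours', 'priority'] as const;
//   for (const field of EDITABLE_FIELDS) {
//     if (updates[field] !== undefined) {
//       values.push(updates[field]);
//       setClauses.push(`${field} = $${values.length}`);
//     }
//   }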
/**
* Delete a schedule (only if not immutable)
* Returns true if deleted, false if immutable
*/
async deleteSchedule(id: number): Promise<{ deleted: boolean; reason?: string }> {
// Check if schedule is immutable
const result = await pool.query(`
SELECT name, is_immutable FROM task_schedules WHERE id = $1
`, [id]);
if (result.rows.length === 0) {
return { deleted: false, reason: 'Schedule not found' };
}
const schedule = result.rows[0];
if (schedule.is_immutable) {
return {
deleted: false,
reason: `Schedule "${schedule.name}" is immutable and cannot be deleted. You can disable it instead.`
};
}
await pool.query(`DELETE FROM task_schedules WHERE id = $1`, [id]);
return { deleted: true };
}
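
// Hedged sketch (not part of this diff): how a dashboard route might surface
// the immutability check. The route path and Express wiring are assumptions.
//
//   router.delete('/api/task-schedules/:id', async (req, res) => {
//     const result = await taskScheduler.deleteSchedule(Number(req.params.id));
//     if (!result.deleted) {
//       res.status(result.reason === 'Schedule not found' ? 404 : 409).json(result);
//       return;
//     }
//     res.json(result);
//   });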
/**
 * Trigger a schedule to run immediately
 */
@@ -369,6 +480,46 @@ class TaskScheduler {
  return this.executeSchedule(result.rows[0] as TaskSchedule);
}
/**
* Get schedule statistics for dashboard
*/
async getScheduleStats(): Promise<{
total: number;
enabled: number;
byRole: Record<string, number>;
byState: Record<string, number>;
}> {
try {
const result = await pool.query(`
SELECT
COUNT(*)::int as total,
SUM(CASE WHEN enabled THEN 1 ELSE 0 END)::int as enabled_count,
role,
state_code
FROM task_schedules
GROUP BY role, state_code
`);
let total = 0;
let enabled = 0;
const byRole: Record<string, number> = {};
const byState: Record<string, number> = {};
for (const row of result.rows) {
total += row.total;
enabled += row.enabled_count;
byRole[row.role] = (byRole[row.role] || 0) + row.total;
if (row.state_code) {
byState[row.state_code] = (byState[row.state_code] || 0) + row.total;
}
}
return { total, enabled, byRole, byState };
} catch {
return { total: 0, enabled: 0, byRole: {}, byState: {} };
}
}
}

// Per TASK_WORKFLOW_2024-12-10.md: Singleton instance

View File

@@ -2,11 +2,18 @@
 * Task Handlers Index
 *
 * Exports all task handlers for the task worker.
 *
 * Product Discovery:
 * - handleProductDiscoveryCurl: curl/axios based (for curl transport)
 * - handleProductDiscoveryHttp: Puppeteer browser-based (for http transport)
 */
export { handleProductDiscovery as handleProductDiscoveryCurl } from './product-discovery-curl';
export { handleProductDiscoveryHttp } from './product-discovery-http';
export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-curl';
export { handleProductRefresh } from './product-refresh';
export { handleStoreDiscovery } from './store-discovery';
export { handleStoreDiscoveryHttp } from './store-discovery-http';
export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleWhoami } from './whoami';

View File

@@ -13,7 +13,7 @@
 */
import { TaskContext, TaskResult } from '../task-worker';
import { handlePayloadFetch } from './payload-fetch-curl';

export async function handleProductDiscovery(ctx: TaskContext): Promise<TaskResult> {
  const { task } = ctx;

View File

@@ -0,0 +1,363 @@
/**
* Product Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to fetch products via browser context.
* Based on test-intercept.js pattern from ORGANIC_SCRAPING_GUIDE.md.
*
* This handler:
* 1. Loads dispensary info
* 2. Launches headless browser with proxy (if provided)
* 3. Establishes session by visiting embedded menu
* 4. Fetches ALL products via GraphQL from browser context
* 5. Saves raw payload to filesystem (gzipped)
* 6. Records metadata in raw_crawl_payloads table
* 7. Queues product_refresh task to process the payload
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
// GraphQL hash for FilteredProducts query - MUST match CLAUDE.md
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
return { success: false, error: 'No dispensary_id specified for product_discovery task' };
}
let browser: any = null;
try {
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
FROM dispensaries
WHERE id = $1 AND crawl_enabled = true
`, [dispensaryId]);
if (dispResult.rows.length === 0) {
return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
}
const dispensary = dispResult.rows[0];
const platformId = dispensary.platform_dispensary_id;
if (!platformId) {
return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
}
// Extract cName from menu_url
const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
const cName = cNameMatch ? cNameMatch[1] : 'dispensary';
console.log(`[ProductDiscoveryHTTP] Starting for ${dispensary.name} (ID: ${dispensaryId})`);
console.log(`[ProductDiscoveryHTTP] Platform ID: ${platformId}, cName: ${cName}`);
await ctx.heartbeat();
// ============================================================
// STEP 2: Setup Puppeteer with proxy
// ============================================================
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[ProductDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 3: Establish session by visiting embedded menu
// ============================================================
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// ============================================================
// STEP 3b: Detect and dismiss age gate modal
// ============================================================
try {
// Wait a bit for age gate to appear
await page.waitForTimeout(1500);
// Look for common age gate selectors
// NOTE: the ':has-text()' entries are Playwright-style pseudo-selectors;
// document.querySelector (and thus page.$) throws on them, which the
// try/catch below swallows. Matching effectively relies on the attribute
// and class selectors plus the text-based fallback further down.
const ageGateSelectors = [
'button[data-testid="age-gate-submit"]',
'button:has-text("Yes")',
'button:has-text("I am 21")',
'button:has-text("Enter")',
'[class*="age-gate"] button',
'[class*="AgeGate"] button',
'[data-test="age-gate-button"]',
];
for (const selector of ageGateSelectors) {
try {
const button = await page.$(selector);
if (button) {
await button.click();
console.log(`[ProductDiscoveryHTTP] Age gate dismissed via: ${selector}`);
await page.waitForTimeout(1000); // Wait for modal to close
break;
}
} catch {
// Selector not found, try next
}
}
// Also try evaluating in page context for button with specific text
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch (ageGateErr) {
// Age gate might not be present, continue
console.log(`[ProductDiscoveryHTTP] No age gate detected or already dismissed`);
}
console.log(`[ProductDiscoveryHTTP] Session established, fetching products...`);
await ctx.heartbeat();
// ============================================================
// STEP 4: Fetch ALL products via GraphQL from browser context
// ============================================================
const result = await page.evaluate(async (platformId: string, graphqlHash: string) => {
const allProducts: any[] = [];
const logs: string[] = [];
let pageNum = 0;
const perPage = 100;
let totalCount = 0;
const sessionId = 'browser-session-' + Date.now();
try {
while (pageNum < 30) { // Max 30 pages = 3000 products
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active', not null
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: pageNum,
perPage: perPage,
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: graphqlHash,
},
};
// Build GET URL like the browser does
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
logs.push(`Page ${pageNum}: HTTP ${response.status}`);
if (!response.ok) {
const text = await response.text();
logs.push(`HTTP error: ${response.status} - ${text.slice(0, 200)}`);
break;
}
const json = await response.json();
if (json.errors) {
logs.push(`GraphQL error: ${JSON.stringify(json.errors).slice(0, 200)}`);
break;
}
const data = json?.data?.filteredProducts;
if (!data || !data.products) {
logs.push('No products in response');
break;
}
const products = data.products;
allProducts.push(...products);
if (pageNum === 0) {
totalCount = data.queryInfo?.totalCount || 0;
logs.push(`Total reported: ${totalCount}`);
}
logs.push(`Got ${products.length} products (total: ${allProducts.length}/${totalCount})`);
if (allProducts.length >= totalCount || products.length < perPage) {
break;
}
pageNum++;
// Small delay between pages to be polite
await new Promise(r => setTimeout(r, 200));
}
} catch (err: any) {
logs.push(`Error: ${err.message}`);
}
return { products: allProducts, totalCount, logs };
}, platformId, FILTERED_PRODUCTS_HASH);
// Print logs from browser context
result.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
console.log(`[ProductDiscoveryHTTP] Fetched ${result.products.length} products (API reported ${result.totalCount})`);
await browser.close();
browser = null;
if (result.products.length === 0) {
return {
success: false,
error: 'No products returned from GraphQL',
productsProcessed: 0,
};
}
await ctx.heartbeat();
// ============================================================
// STEP 5: Save raw payload to filesystem
// ============================================================
const rawPayload = {
dispensaryId,
platformId,
cName,
fetchedAt: new Date().toISOString(),
productCount: result.products.length,
products: result.products,
};
const payloadResult = await saveRawPayload(
pool,
dispensaryId,
rawPayload,
null, // crawl_run_id - not using crawl_runs in new system
result.products.length
);
console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
// ============================================================
// STEP 6: Update dispensary last_fetch_at
// ============================================================
await pool.query(`
UPDATE dispensaries
SET last_fetch_at = NOW()
WHERE id = $1
`, [dispensaryId]);
// ============================================================
// STEP 7: Queue product_refresh task to process the payload
// ============================================================
await taskService.createTask({
role: 'product_refresh',
dispensary_id: dispensaryId,
priority: task.priority || 0,
payload: { payload_id: payloadResult.id },
});
console.log(`[ProductDiscoveryHTTP] Queued product_refresh task for payload #${payloadResult.id}`);
return {
success: true,
payloadId: payloadResult.id,
productCount: result.products.length,
sizeBytes: payloadResult.sizeBytes,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductDiscoveryHTTP] Error for dispensary ${dispensaryId}:`, errorMessage);
return {
success: false,
error: errorMessage,
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}
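
// Illustrative usage (not part of this diff): normally the task worker
// dispatches this handler via getHandlerForTask(); a one-off invocation
// would look roughly like this. The no-op heartbeat is an assumption.
//
//   const result = await handleProductDiscoveryHttp({
//     ...ctx,
//     heartbeat: async () => {}, // no-op heartbeat for a one-off run
//   });
//   console.log(result.success
//     ? `Saved payload #${result.payloadId} (${result.productCount} products)`
//     : `Discovery failed: ${result.error}`);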

View File

@@ -27,6 +27,7 @@ import {
  downloadProductImages,
} from '../../hydration/canonical-upsert';
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';

const normalizer = new DutchieNormalizer();
@@ -86,7 +87,37 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// Load latest payload for this dispensary
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
  // No payload exists - queue upstream task to fetch products
console.log(`[ProductRefresh] No payload found for dispensary ${dispensaryId} - queuing upstream task`);
if (dispensary.platform_dispensary_id) {
// Has platform ID - can go straight to product_discovery
console.log(`[ProductRefresh] Dispensary has platform_dispensary_id - queuing product_discovery (http)`);
await taskService.createTask({
role: 'product_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
method: 'http', // Use browser-based handler for session proxies
});
return {
success: true,
queued: 'product_discovery',
reason: 'No payload exists - queued product_discovery to fetch initial data',
};
} else {
// No platform ID - need entry_point_discovery first
console.log(`[ProductRefresh] Dispensary missing platform_dispensary_id - queuing entry_point_discovery`);
await taskService.createTask({
role: 'entry_point_discovery',
dispensary_id: dispensaryId,
priority: task.priority || 0,
});
return {
success: true,
queued: 'entry_point_discovery',
reason: 'No payload and no platform_dispensary_id - queued entry_point_discovery to resolve ID',
};
}
}

payloadData = result.payload;
payloadId = result.metadata.id;

View File

@@ -0,0 +1,480 @@
/**
* Store Discovery HTTP Handler (Browser-based)
*
* Uses Puppeteer + StealthPlugin to discover stores via browser context.
* Based on product-discovery-http.ts pattern.
*
* This handler:
* 1. Launches headless browser with proxy (if provided)
* 2. Establishes session by visiting Dutchie dispensaries page
* 3. Fetches cities for each state via getAllCitiesByState GraphQL
* 4. Fetches stores for each city via ConsumerDispensaries GraphQL
* 5. Upserts to dutchie_discovery_locations
* 6. Auto-promotes valid locations to dispensaries table
*
* Why browser-based:
* - Works with session-based residential proxies (Evomi)
* - Lower detection risk than curl/axios
* - Real Chrome TLS fingerprint
*/
import { TaskContext, TaskResult } from '../task-worker';
import { upsertLocation } from '../../discovery/location-discovery';
import { promoteDiscoveredLocations } from '../../discovery/promotion';
import { saveDiscoveryPayload } from '../../utils/payload-storage';
// GraphQL hashes - MUST match CLAUDE.md / dutchie/client.ts
const GET_ALL_CITIES_HASH = 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6';
const CONSUMER_DISPENSARIES_HASH = '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b';
interface StateWithCities {
name: string;
country: string;
cities: string[];
}
interface DiscoveredLocation {
id: string;
name: string;
slug: string;
cName?: string;
address?: string;
city?: string;
state?: string;
zip?: string;
latitude?: number;
longitude?: number;
offerPickup?: boolean;
offerDelivery?: boolean;
isRecreational?: boolean;
isMedical?: boolean;
phone?: string;
email?: string;
website?: string;
description?: string;
logoImage?: string;
bannerImage?: string;
chainSlug?: string;
enterpriseId?: string;
retailType?: string;
status?: string;
timezone?: string;
location?: {
ln1?: string;
ln2?: string;
city?: string;
state?: string;
zipcode?: string;
country?: string;
geometry?: { coordinates?: [number, number] };
};
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
// Get proxy from CrawlRotator if available
let proxyUrl: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
console.log(`[StoreDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
}
}
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// Setup proxy auth if needed
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
await ctx.heartbeat();
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
await page.goto(sessionUrl, {
waitUntil: 'networkidle2',
timeout: 60000,
});
// Handle potential age gate
try {
await page.waitForTimeout(1500);
await page.evaluate(() => {
const buttons = Array.from(document.querySelectorAll('button'));
for (const btn of buttons) {
const text = btn.textContent?.toLowerCase() || '';
if (text.includes('yes') || text.includes('enter') || text.includes('21')) {
(btn as HTMLButtonElement).click();
return true;
}
}
return false;
});
} catch {
// Age gate might not be present
}
console.log(`[StoreDiscoveryHTTP] Session established`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Get states to discover from database
// ============================================================
const statesResult = await pool.query(`
SELECT code FROM states WHERE is_active = true ORDER BY code
`);
const stateCodesToDiscover = statesResult.rows.map((r: { code: string }) => r.code);
if (stateCodesToDiscover.length === 0) {
await browser.close();
return { success: true, storesDiscovered: 0, newStoreIds: [], message: 'No active states to discover' };
}
console.log(`[StoreDiscoveryHTTP] Will discover stores in ${stateCodesToDiscover.length} states`);
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'getAllCitiesByState',
variables: JSON.stringify({}),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
logs.push(`getAllCitiesByState: HTTP ${response.status}`);
if (!response.ok) {
return { states: [], logs };
}
const json = await response.json();
const statesData = json?.data?.statesWithDispensaries || [];
const states: StateWithCities[] = [];
for (const state of statesData) {
if (state && state.name) {
const cities = Array.isArray(state.cities)
? state.cities.filter((c: string | null) => c !== null)
: [];
states.push({
name: state.name,
country: state.country || 'US',
cities,
});
}
}
logs.push(`Found ${states.length} states with cities`);
return { states, logs };
} catch (err: any) {
logs.push(`Error: ${err.message}`);
return { states: [], logs };
}
}, GET_ALL_CITIES_HASH);
statesWithCities.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
if (statesWithCities.states.length === 0) {
await browser.close();
return { success: false, error: 'Failed to fetch states with cities' };
}
await ctx.heartbeat();
// ============================================================
// STEP 5: For each active state, fetch stores for each city
// ============================================================
let totalDiscovered = 0;
let totalUpserted = 0;
const allNewStoreIds: number[] = [];
for (const stateCode of stateCodesToDiscover) {
// NOTE: this assumes the GraphQL state.name field carries the two-letter
// code (e.g., 'AZ'); if the API returns full state names instead, no
// state would match and every state would be skipped below.
const stateData = statesWithCities.states.find(
(s: StateWithCities) => s.name.toUpperCase() === stateCode.toUpperCase()
);
if (!stateData || stateData.cities.length === 0) {
console.log(`[StoreDiscoveryHTTP] No cities found for ${stateCode}, skipping`);
continue;
}
console.log(`[StoreDiscoveryHTTP] Discovering ${stateData.cities.length} cities in ${stateCode}...`);
await ctx.heartbeat();
// Accumulate raw store data for this state
const stateRawStores: any[] = [];
const stateCityData: { city: string; stores: any[] }[] = [];
// Fetch stores for each city in this state
for (const city of stateData.cities) {
try {
const cityResult = await page.evaluate(async (
cityName: string,
stateCodeParam: string,
hash: string
) => {
const logs: string[] = [];
const allDispensaries: any[] = [];
let page = 0;
const perPage = 200;
try {
while (page < 5) { // Max 5 pages per city
const variables = {
dispensaryFilter: {
activeOnly: true,
city: cityName,
state: stateCodeParam,
},
page,
perPage,
};
const extensions = {
persistedQuery: { version: 1, sha256Hash: hash },
};
const qs = new URLSearchParams({
operationName: 'ConsumerDispensaries',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
},
credentials: 'include',
});
if (!response.ok) {
logs.push(`${cityName}: HTTP ${response.status}`);
break;
}
const json = await response.json();
const dispensaries = json?.data?.filteredDispensaries || [];
if (dispensaries.length === 0) {
break;
}
// Filter to ensure correct state
const stateFiltered = dispensaries.filter((d: any) =>
d.location?.state?.toUpperCase() === stateCodeParam.toUpperCase()
);
allDispensaries.push(...stateFiltered);
if (dispensaries.length < perPage) {
break;
}
page++;
// Small delay between pages
await new Promise(r => setTimeout(r, 100));
}
logs.push(`${cityName}: ${allDispensaries.length} stores`);
} catch (err: any) {
logs.push(`${cityName}: Error - ${err.message}`);
}
return { dispensaries: allDispensaries, logs };
}, city, stateCode, CONSUMER_DISPENSARIES_HASH);
cityResult.logs.forEach((log: string) => console.log(`[Browser] ${log}`));
// Accumulate raw store data
stateRawStores.push(...cityResult.dispensaries);
stateCityData.push({ city, stores: cityResult.dispensaries });
// Upsert each discovered location
for (const disp of cityResult.dispensaries) {
try {
const location = normalizeLocation(disp);
if (!location.id) {
continue; // Skip locations without platform ID
}
const result = await upsertLocation(pool, location as any, null);
if (result) {
totalUpserted++;
if (result.isNew) {
totalDiscovered++;
}
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Upsert error for ${disp.name}:`, err.message);
}
}
// Small delay between cities to avoid rate limiting
await new Promise(r => setTimeout(r, 300));
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Error fetching ${city}, ${stateCode}:`, err.message);
}
}
// Heartbeat after each state
await ctx.heartbeat();
// ============================================================
// STEP 5b: Save raw store payload for this state
// ============================================================
if (stateRawStores.length > 0) {
try {
const rawPayload = {
stateCode,
platform,
fetchedAt: new Date().toISOString(),
storeCount: stateRawStores.length,
citiesProcessed: stateCityData.length,
cities: stateCityData,
stores: stateRawStores,
};
const payloadResult = await saveDiscoveryPayload(pool, stateCode, rawPayload, stateRawStores.length);
console.log(`[StoreDiscoveryHTTP] Saved raw payload for ${stateCode}: ${stateRawStores.length} stores (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Failed to save payload for ${stateCode}:`, err.message);
}
}
// Auto-promote valid locations for this state
try {
const promotionResult = await promoteDiscoveredLocations(stateCode);
const promoted = promotionResult.created + promotionResult.updated;
if (promoted > 0) {
console.log(`[StoreDiscoveryHTTP] Promoted ${promoted} locations in ${stateCode} (${promotionResult.created} new, ${promotionResult.updated} updated)`);
// newDispensaryIds is returned but not in typed interface
const newIds = (promotionResult as any).newDispensaryIds || [];
allNewStoreIds.push(...newIds);
}
} catch (err: any) {
console.error(`[StoreDiscoveryHTTP] Promotion error for ${stateCode}:`, err.message);
}
}
await browser.close();
browser = null;
console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted`);
return {
success: true,
storesDiscovered: totalDiscovered,
storesUpserted: totalUpserted,
statesProcessed: stateCodesToDiscover.length,
newStoreIds: allNewStoreIds,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[StoreDiscoveryHTTP] Error:`, errorMessage);
return {
success: false,
error: errorMessage,
newStoreIds: [],
};
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
}
/**
* Normalize a raw dispensary response to our DiscoveredLocation format
*/
function normalizeLocation(raw: any): DiscoveredLocation {
const loc = raw.location || {};
const coords = loc.geometry?.coordinates || [];
return {
id: raw.id || raw._id || '',
name: raw.name || '',
slug: raw.slug || raw.cName || '',
cName: raw.cName || raw.slug || '',
address: raw.address || loc.ln1 || '',
city: raw.city || loc.city || '',
state: raw.state || loc.state || '',
zip: raw.zip || loc.zipcode || loc.zip || '',
latitude: coords[1] || raw.latitude,
longitude: coords[0] || raw.longitude,
timezone: raw.timezone || '',
offerPickup: raw.offerPickup ?? raw.storeSettings?.offerPickup ?? true,
offerDelivery: raw.offerDelivery ?? raw.storeSettings?.offerDelivery ?? false,
isRecreational: raw.isRecreational ?? raw.recDispensary ?? true,
isMedical: raw.isMedical ?? raw.medicalDispensary ?? true,
phone: raw.phone || '',
email: raw.email || '',
website: raw.embedBackUrl || '',
description: raw.description || '',
logoImage: raw.logoImage || '',
bannerImage: raw.bannerImage || '',
chainSlug: raw.chain || '',
enterpriseId: raw.retailer?.enterpriseId || '',
retailType: raw.retailType || '',
status: raw.status || '',
location: loc,
};
}
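
// Example (not part of this diff): GeoJSON geometry is [longitude, latitude],
// which is why normalizeLocation reads coords[1] as latitude and coords[0]
// as longitude. The raw object below is illustrative, not a real response.
//
//   const loc = normalizeLocation({
//     id: 'abc123',
//     name: 'Example Dispensary',
//     cName: 'example-dispensary',
//     location: {
//       city: 'Phoenix',
//       state: 'AZ',
//       zipcode: '85001',
//       geometry: { coordinates: [-112.074, 33.448] }, // [lng, lat]
//     },
//   });
//   // loc.latitude === 33.448, loc.longitude === -112.074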

View File

@@ -17,7 +17,8 @@ export {
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
export {
  handleProductDiscoveryCurl,
  handleProductDiscoveryHttp,
  handleProductRefresh,
  handleStoreDiscovery,
  handleEntryPointDiscovery,

View File

@@ -6,12 +6,15 @@
 * task-service.ts and routes/tasks.ts.
 *
 * State is in-memory and resets on server restart.
 * By default, the pool is OPEN - workers start claiming tasks immediately.
 * Admin can pause via API endpoint if needed.
 *
 * Note: Each process (backend, worker) has its own copy of this state.
 * The /pool/pause and /pool/resume endpoints only affect the backend process.
 * Workers always start with pool open.
 */
let taskPoolPaused = false;

export function isTaskPoolPaused(): boolean {
  return taskPoolPaused;
}
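
// Hedged sketch (assumed, not shown in this diff): the setter counterpart
// that routes/tasks.ts presumably calls from the /pool/pause and
// /pool/resume endpoints.
//
//   export function setTaskPoolPaused(paused: boolean): void {
//     taskPoolPaused = paused;
//     console.log(`[TaskPool] Pool is now ${paused ? 'PAUSED' : 'OPEN'}`);
//   }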

View File

@@ -73,6 +73,7 @@ export interface CreateTaskParams {
  dispensary_id?: number;
  platform?: string;
  priority?: number;
  method?: 'curl' | 'http'; // Transport method: curl=axios/proxy, http=Puppeteer/browser
  scheduled_for?: Date;
  payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
}
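
// Example (not part of this diff): pinning a task to the browser transport
// so only HTTP-capable workers - those that passed the Puppeteer preflight -
// will claim it. The dispensary ID is illustrative.
//
//   await taskService.createTask({
//     role: 'product_discovery',
//     dispensary_id: 42,
//     platform: 'dutchie',
//     priority: 10,
//     method: 'http',
//   });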
@@ -106,14 +107,15 @@ class TaskService {
 */
async createTask(params: CreateTaskParams): Promise<WorkerTask> {
  const result = await pool.query(
    `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload)
     VALUES ($1, $2, $3, $4, $5, $6, $7)
     RETURNING *`,
    [
      params.role,
      params.dispensary_id ?? null,
      params.platform ?? null,
      params.priority ?? 0,
      params.method ?? null, // null = any worker can pick up, 'http' = http-capable workers only, 'curl' = curl workers only
      params.scheduled_for ?? null,
      params.payload ? JSON.stringify(params.payload) : null,
    ]
@@ -128,8 +130,8 @@ class TaskService {
  if (tasks.length === 0) return 0;

  const values = tasks.map((t, i) => {
    const base = i * 6;
    return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6})`;
  });

  const params = tasks.flatMap((t) => [
@@ -137,11 +139,12 @@ class TaskService {
    t.dispensary_id ?? null,
    t.platform ?? null,
    t.priority ?? 0,
    t.method ?? null,
    t.scheduled_for ?? null,
  ]);

  const result = await pool.query(
    `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for)
     VALUES ${values.join(', ')}
     ON CONFLICT DO NOTHING`,
    params
@@ -473,15 +476,17 @@ class TaskService {
case 'store_discovery': {
  // Per TASK_WORKFLOW_2024-12-10.md: New stores discovered -> create product_discovery tasks
  // Skip entry_point_discovery since platform_dispensary_id is set during promotion
  // All product_discovery tasks use HTTP transport (Puppeteer/browser)
  const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds;
  if (newStoreIds && newStoreIds.length > 0) {
    console.log(`[TaskService] Chaining ${newStoreIds.length} product_discovery tasks for new stores (HTTP transport)`);
    for (const storeId of newStoreIds) {
      await this.createTask({
        role: 'product_discovery',
        dispensary_id: storeId,
        platform: completedTask.platform ?? undefined,
        priority: 10, // High priority for new stores
        method: 'http', // Force HTTP transport for browser-based scraping
      });
    }
  }
@@ -498,6 +503,7 @@ class TaskService {
    dispensary_id: completedTask.dispensary_id,
    platform: completedTask.platform ?? undefined,
    priority: 10,
    method: 'http', // Force HTTP transport
  });
}
break;
@@ -522,6 +528,7 @@ class TaskService {
/**
 * Create store discovery task for a platform/state
 * Uses HTTP transport (Puppeteer/browser) by default
 */
async createStoreDiscoveryTask(
  platform: string,
@@ -532,11 +539,13 @@ class TaskService {
    role: 'store_discovery',
    platform,
    priority,
    method: 'http', // Force HTTP transport
  });
}
/**
 * Create entry point discovery task for a specific store
 * @deprecated Entry point resolution now happens during store promotion
 */
async createEntryPointTask(
  dispensaryId: number,
@@ -548,11 +557,13 @@ class TaskService {
    dispensary_id: dispensaryId,
    platform,
    priority,
    method: 'http', // Force HTTP transport
  });
}
/**
 * Create product discovery task for a specific store
 * Uses HTTP transport (Puppeteer/browser) by default
 */
async createProductDiscoveryTask(
  dispensaryId: number,
@@ -564,6 +575,7 @@ class TaskService {
    dispensary_id: dispensaryId,
    platform,
    priority,
    method: 'http', // Force HTTP transport
  });
}
@@ -641,6 +653,248 @@ class TaskService {
  return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
}
/**
* Create multiple tasks with staggered start times.
*
* STAGGERED TASK WORKFLOW:
* =======================
* This prevents resource contention and proxy assignment lag when creating
* many tasks at once. Each task gets a scheduled_for timestamp offset from
* the previous task.
*
* Workflow:
* 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight check on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task is released back to pending for another worker
* 6. Worker finishes task, polls for next available task
* 7. Repeat - preflight runs again on next task claim
*
* Benefits:
* - Prevents all 8 workers from hitting proxies simultaneously
* - Reduces API rate limiting / 403 errors
* - Spreads resource usage over time
* - Each task still runs preflight, ensuring proxy health
*
* @param dispensaryIds - Array of dispensary IDs to create tasks for
* @param role - Task role (e.g., 'product_refresh', 'product_discovery')
* @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
* @param platform - Platform identifier (default: 'dutchie')
* @param method - Transport method: 'curl' or 'http' (default: null for any)
* @returns Number of tasks created
*/
async createStaggeredTasks(
dispensaryIds: number[],
role: TaskRole,
staggerSeconds: number = 15,
platform: string = 'dutchie',
method: 'curl' | 'http' | null = null
): Promise<{ created: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) {
return { created: 0, taskIds: [] };
}
// Use a single INSERT with generate_series for efficiency
const result = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
SELECT
$2::varchar as role,
td.dispensary_id,
$3::varchar as platform,
$4::varchar as method,
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds]);
const taskIds = result.rows.map((r: { id: number }) => r.id);
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
return { created: taskIds.length, taskIds };
}
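
// Example (not part of this diff): 20 stores staggered 15s apart spread a
// batch over roughly 5 minutes; the last task becomes claimable
// 19 * 15s = 285s after the first. The IDs are illustrative.
//
//   const { created, taskIds } = await taskService.createStaggeredTasks(
//     [101, 102, 103 /* ... */],
//     'product_discovery',
//     15,
//     'dutchie',
//     'http'
//   );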
/**
* Create a batch of AZ store tasks with automatic distribution.
*
* This is a convenience method for creating tasks for Arizona stores with:
* - Automatic staggering to prevent resource contention
* - Even distribution across both refresh and discovery roles
*
* @param totalTasks - Total number of tasks to create
* @param staggerSeconds - Seconds between each task's start time
* @param splitRoles - If true, split between product_refresh and product_discovery
* @returns Summary of created tasks
*/
async createAZStoreTasks(
totalTasks: number = 24,
staggerSeconds: number = 15,
splitRoles: boolean = true
): Promise<{
total: number;
product_refresh: number;
product_discovery: number;
taskIds: number[];
}> {
// Get AZ stores with platform_id and menu_url
const storesResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE s.code = 'AZ'
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND d.menu_url IS NOT NULL
ORDER BY d.id
`);
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
if (storeIds.length === 0) {
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
}
// Limit tasks to available stores
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
const allTaskIds: number[] = [];
if (splitRoles) {
// Split between refresh and discovery
const tasksPerRole = Math.floor(maxTasks / 2);
const refreshStores = storeIds.slice(0, tasksPerRole);
const discoveryStores = storeIds.slice(0, tasksPerRole);
// Create refresh tasks first
const refreshResult = await this.createStaggeredTasks(
refreshStores,
'product_refresh',
staggerSeconds,
'dutchie'
);
allTaskIds.push(...refreshResult.taskIds);
// Create discovery tasks starting after refresh tasks are scheduled
const discoveryStartOffset = tasksPerRole * staggerSeconds;
const discoveryResult = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
SELECT
'product_discovery'::varchar as role,
td.dispensary_id,
'dutchie'::varchar as platform,
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
return {
total: allTaskIds.length,
product_refresh: refreshResult.taskIds.length,
product_discovery: discoveryResult.rowCount ?? 0,
taskIds: allTaskIds
};
}
// Single role mode - all product_discovery
const result = await this.createStaggeredTasks(
storeIds.slice(0, totalTasks),
'product_discovery',
staggerSeconds,
'dutchie'
);
return {
total: result.taskIds.length,
product_refresh: 0,
product_discovery: result.taskIds.length,
taskIds: result.taskIds
};
}
/**
* Cleanup stale tasks that are stuck in 'claimed' or 'running' status.
*
* This handles the case where workers crash/restart and leave tasks in-flight.
* These stale tasks block the queue because the claim query excludes dispensary_ids
* that have active tasks.
*
* Called automatically on worker startup and can be called periodically.
*
* @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset
* @returns Object with cleanup stats
*/
async cleanupStaleTasks(staleMinutes: number = 30): Promise<{
cleaned: number;
byStatus: { claimed: number; running: number };
byRole: Record<string, number>;
}> {
// First, get stats on what we're about to clean
const statsResult = await pool.query(`
SELECT status, role, COUNT(*)::int as count
FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
GROUP BY status, role
`, [staleMinutes]);
const byStatus = { claimed: 0, running: 0 };
const byRole: Record<string, number> = {};
for (const row of statsResult.rows) {
const { status, role, count } = row as { status: string; role: string; count: number };
if (status === 'claimed') byStatus.claimed += count;
if (status === 'running') byStatus.running += count;
byRole[role] = (byRole[role] || 0) + count;
}
const totalStale = byStatus.claimed + byStatus.running;
if (totalStale === 0) {
return { cleaned: 0, byStatus, byRole };
}
// Reset stale tasks to pending
const result = await pool.query(`
UPDATE worker_tasks
SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
last_heartbeat_at = NULL,
error_message = CONCAT(COALESCE(error_message, ''), ' [Auto-reset: stale after ', $1, ' min]'),
updated_at = NOW()
WHERE status IN ('claimed', 'running')
AND COALESCE(last_heartbeat_at, claimed_at, created_at) < NOW() - INTERVAL '1 minute' * $1
`, [staleMinutes]);
const cleaned = result.rowCount ?? 0;
if (cleaned > 0) {
console.log(`[TaskService] Cleaned up ${cleaned} stale tasks (claimed: ${byStatus.claimed}, running: ${byStatus.running})`);
console.log(`[TaskService] Stale tasks by role: ${Object.entries(byRole).map(([r, c]) => `${r}:${c}`).join(', ')}`);
}
return { cleaned, byStatus, byRole };
}
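
// Hedged sketch (not part of this diff): besides the startup sweep, the
// cleanup could run on a timer. The 10-minute cadence is an assumption,
// not a documented default.
//
//   setInterval(() => {
//     taskService.cleanupStaleTasks(30).catch((err) =>
//       console.error('[TaskService] Periodic stale cleanup failed:', err.message)
//     );
//   }, 10 * 60 * 1000);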
/**
 * Calculate workers needed to complete tasks within SLA
 */

View File

@@ -69,10 +69,13 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser
// Task handlers by role
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
// Dual-transport: curl vs http (browser-based) handlers
import { handlePayloadFetch } from './handlers/payload-fetch-curl';
import { handleProductRefresh } from './handlers/product-refresh';
import { handleProductDiscovery } from './handlers/product-discovery-curl';
import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleStoreDiscoveryHttp } from './handlers/store-discovery-http';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleWhoami } from './handlers/whoami';
@@ -95,7 +98,7 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '15');

// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
@@ -144,17 +147,47 @@ type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main handler for product crawling (has curl and http variants)
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
  payload_fetch: handlePayloadFetch, // API fetch -> disk (curl)
  product_refresh: handleProductRefresh, // disk -> DB
  product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
  store_discovery: handleStoreDiscovery,
  entry_point_discovery: handleEntryPointDiscovery,
  analytics_refresh: handleAnalyticsRefresh,
  whoami: handleWhoami, // Tests proxy + anti-detect
};
/**
* Get the appropriate handler for a task, considering both role and method.
*
* Dual-transport handlers:
* - product_discovery: curl (axios) or http (Puppeteer)
* - store_discovery: curl (axios) or http (Puppeteer)
*
* Default method is 'http' since all GraphQL queries should use browser transport
* for better TLS fingerprinting and session-based proxy compatibility.
*/
function getHandlerForTask(task: WorkerTask): TaskHandler | undefined {
const role = task.role as TaskRole;
const method = task.method || 'http'; // Default to HTTP for all GraphQL tasks
// product_discovery: dual-transport support
if (role === 'product_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${method})`);
return handleProductDiscoveryHttp;
}
// store_discovery: dual-transport support
if (role === 'store_discovery' && method === 'http') {
console.log(`[TaskWorker] Using HTTP handler for store_discovery (method=${method})`);
return handleStoreDiscoveryHttp;
}
// Default: use the static handler registry (curl-based)
return TASK_HANDLERS[role];
}
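
// Example dispatch (not part of this diff), roughly what the worker loop
// does after claiming a task: method='http' routes to the Puppeteer
// handlers, anything else falls back to the curl-based registry.
//
//   const handler = getHandlerForTask(task);
//   if (!handler) {
//     throw new Error(`No handler registered for role ${task.role}`);
//   }
//   const result = await handler(ctx);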
/**
 * Resource usage stats reported to the registry and used for backoff decisions.
 * These values are included in worker heartbeats and displayed in the UI.
@@ -696,6 +729,20 @@ export class TaskWorker {
// Start registry heartbeat immediately
this.startRegistryHeartbeat();
// Cleanup stale tasks on startup (only worker-0 does this to avoid races)
// This handles tasks left in 'claimed'/'running' status when workers restart
if (this.workerId.endsWith('-0') || this.workerId === 'scraper-worker-0') {
  try {
    console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
    const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold
    if (cleanupResult.cleaned > 0) {
      console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
    }
  } catch (err: any) {
    console.error(`[TaskWorker] Stale task cleanup error:`, err.message);
  }
}
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`);
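`taskService.cleanupStaleTasks()` itself is not shown in this diff. A plausible sketch of what it wraps, assuming a `tasks` table with `status`/`claimed_at` columns (names are guesses from the call site, not confirmed by the diff):

```ts
// Hypothetical sketch - table and column names are assumptions.
async function cleanupStaleTasks(thresholdMinutes: number): Promise<{ cleaned: number }> {
  const result = await pool.query(
    `UPDATE tasks
        SET status = 'pending', claimed_by = NULL, claimed_at = NULL
      WHERE status IN ('claimed', 'running')
        AND claimed_at < NOW() - make_interval(mins => $1)`,
    [thresholdMinutes]
  );
  return { cleaned: result.rowCount ?? 0 };
}
```

Gating the call on worker `-0` is a cheap leader election: every pod runs the same image, but only one performs the sweep, so concurrent restarts cannot double-release tasks.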
@@ -740,6 +787,12 @@ export class TaskWorker {
  this.backoffReason = null;
}
// Periodically reload proxies to pick up changes (new proxies, disabled proxies)
// This runs every ~60 seconds (controlled by setProxyReloadInterval)
if (this.stealthInitialized) {
  await this.crawlRotator.reloadIfStale();
}
// Check for decommission signal
const shouldDecommission = await this.checkDecommission();
if (shouldDecommission) {
@@ -777,13 +830,32 @@ export class TaskWorker {
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);

// =================================================================
// PREFLIGHT CHECK - Use stored preflight results based on task method
// We already ran dual-transport preflights at startup, so just verify
// the correct preflight passed for this task's required method.
// =================================================================
const taskMethod = task.method || 'http'; // Default to http if not specified
let preflightPassed = false;
let preflightMsg = '';
if (taskMethod === 'http' && this.preflightHttpPassed) {
  preflightPassed = true;
  preflightMsg = `HTTP preflight passed (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`;
} else if (taskMethod === 'curl' && this.preflightCurlPassed) {
  preflightPassed = true;
  preflightMsg = `CURL preflight passed (IP: ${this.preflightCurlResult?.proxyIp || 'unknown'})`;
} else if (!task.method && (this.preflightHttpPassed || this.preflightCurlPassed)) {
  // No method preference - either transport works
  preflightPassed = true;
  preflightMsg = this.preflightHttpPassed ? 'HTTP preflight passed' : 'CURL preflight passed';
}

if (!preflightPassed) {
  const errorMsg = taskMethod === 'http'
    ? 'HTTP preflight not passed - cannot execute http tasks'
    : 'CURL preflight not passed - cannot execute curl tasks';
  console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${errorMsg}`);
  console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without preflight`);
  // Release task back to pending so another worker can pick it up
  await taskService.releaseTask(task.id);
@@ -793,7 +865,7 @@ export class TaskWorker {
  return;
}

console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);

this.activeTasks.set(task.id, task);
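The claim path now reads four fields (`preflightHttpPassed`, `preflightCurlPassed`, and the two result objects) that a startup step must populate; that step is outside this hunk. A sketch of what it presumably looks like on `TaskWorker` (the per-method `preflight({ method })` signature is an assumption - the removed code called `preflight()` with no arguments):

```ts
// Assumed TaskWorker members - the diff only shows the reads, not the writes.
private preflightHttpPassed = false;
private preflightCurlPassed = false;
private preflightHttpResult: { passed: boolean; proxyIp?: string } | null = null;
private preflightCurlResult: { passed: boolean; proxyIp?: string } | null = null;

private async runStartupPreflights(): Promise<void> {
  // Run both transports once at startup; per-task checks then become lookups.
  const http = await this.crawlRotator.preflight({ method: 'http' }).catch(() => null);
  const curl = await this.crawlRotator.preflight({ method: 'curl' }).catch(() => null);
  this.preflightHttpPassed = http?.passed ?? false;
  this.preflightCurlPassed = curl?.passed ?? false;
  this.preflightHttpResult = http;
  this.preflightCurlResult = curl;
}
```

The trade-off: releasing the task (rather than failing it) keeps it eligible for a sibling worker whose preflight for that transport did pass.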
@@ -837,8 +909,8 @@ export class TaskWorker {
// Mark as running
await taskService.startTask(task.id);

// Get handler for this role (considers method for dual-transport)
const handler = getHandlerForTask(task);
if (!handler) {
  throw new Error(`No handler registered for role: ${task.role}`);
}


@@ -366,6 +366,141 @@ export async function listPayloadMetadata(
  }));
}
/**
 * Result from saving a discovery payload
 */
export interface SaveDiscoveryPayloadResult {
  id: number;
  storagePath: string;
  sizeBytes: number;
  sizeBytesRaw: number;
  checksum: string;
}

/**
 * Generate storage path for a discovery payload
 *
 * Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
 */
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
  const year = timestamp.getFullYear();
  const month = String(timestamp.getMonth() + 1).padStart(2, '0');
  const day = String(timestamp.getDate()).padStart(2, '0');
  const ts = timestamp.getTime();
  return path.join(
    PAYLOAD_BASE_PATH,
    'discovery',
    String(year),
    month,
    day,
    `state_${stateCode.toLowerCase()}_${ts}.json.gz`
  );
}
/**
 * Save a raw store discovery payload to filesystem and record metadata in DB
 *
 * @param pool - Database connection pool
 * @param stateCode - State code (e.g., 'AZ', 'MI')
 * @param payload - Raw JSON payload from discovery GraphQL
 * @param storeCount - Number of stores in payload
 * @returns SaveDiscoveryPayloadResult with file info and DB record ID
 */
export async function saveDiscoveryPayload(
  pool: Pool,
  stateCode: string,
  payload: any,
  storeCount: number = 0
): Promise<SaveDiscoveryPayloadResult> {
  const timestamp = new Date();
  const storagePath = generateDiscoveryStoragePath(stateCode, timestamp);

  // Serialize and compress
  const jsonStr = JSON.stringify(payload);
  const rawSize = Buffer.byteLength(jsonStr, 'utf8');
  const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
  const compressedSize = compressed.length;
  const checksum = calculateChecksum(compressed);

  // Write to filesystem
  await ensureDir(storagePath);
  await fs.promises.writeFile(storagePath, compressed);

  // Record metadata in DB
  const result = await pool.query(`
    INSERT INTO raw_crawl_payloads (
      payload_type,
      state_code,
      storage_path,
      store_count,
      size_bytes,
      size_bytes_raw,
      fetched_at,
      checksum_sha256
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
    RETURNING id
  `, [
    'store_discovery',
    stateCode.toUpperCase(),
    storagePath,
    storeCount,
    compressedSize,
    rawSize,
    timestamp,
    checksum
  ]);

  console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);

  return {
    id: result.rows[0].id,
    storagePath,
    sizeBytes: compressedSize,
    sizeBytesRaw: rawSize,
    checksum
  };
}
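`gzip`, `calculateChecksum`, and `ensureDir` predate this hunk and are not shown. Plausible sketches consistent with the call sites (the `checksum_sha256` column implies SHA-256; the rest is assumption):

```ts
import * as fs from 'fs';
import * as path from 'path';
import * as zlib from 'zlib';
import * as crypto from 'crypto';
import { promisify } from 'util';

// Buffer -> gzipped Buffer, awaited above as `await gzip(...)`
const gzip = promisify(zlib.gzip) as (buf: Buffer) => Promise<Buffer>;

// Hex digest of the compressed bytes - stored in checksum_sha256
function calculateChecksum(buf: Buffer): string {
  return crypto.createHash('sha256').update(buf).digest('hex');
}

// Create the parent directory chain for a file path before writing
async function ensureDir(filePath: string): Promise<void> {
  await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
}
```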
/**
 * Get the latest discovery payload for a state
 *
 * @param pool - Database connection pool
 * @param stateCode - State code (e.g., 'AZ', 'MI')
 * @returns Parsed payload and metadata, or null if none exists
 */
export async function getLatestDiscoveryPayload(
  pool: Pool,
  stateCode: string
): Promise<{ payload: any; metadata: any } | null> {
  const result = await pool.query(`
    SELECT id, state_code, storage_path, store_count, fetched_at
    FROM raw_crawl_payloads
    WHERE payload_type = 'store_discovery'
      AND state_code = $1
    ORDER BY fetched_at DESC
    LIMIT 1
  `, [stateCode.toUpperCase()]);

  if (result.rows.length === 0) {
    return null;
  }

  const row = result.rows[0];
  const payload = await loadPayloadFromPath(row.storage_path);

  return {
    payload,
    metadata: {
      id: row.id,
      stateCode: row.state_code,
      storeCount: row.store_count,
      fetchedAt: row.fetched_at,
      storagePath: row.storage_path
    }
  };
}
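Round trip as a store_discovery handler might use it (`discoveryJson` and `storeCount` are illustrative values, not names confirmed by this diff):

```ts
// Save the raw GraphQL response, then read the newest copy back.
const saved = await saveDiscoveryPayload(pool, 'AZ', discoveryJson, storeCount);
console.log(`payload ${saved.id} -> ${saved.storagePath} (${saved.sizeBytes} bytes gzipped)`);

const latest = await getLatestDiscoveryPayload(pool, 'az'); // case-insensitive: upper-cased internally
if (latest) {
  console.log(`${latest.metadata.storeCount} stores fetched at ${latest.metadata.fetchedAt}`);
}
```

Note that `getLatestDiscoveryPayload` orders by `fetched_at`, so a backfilled older payload will not shadow a newer crawl.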
/**
 * Delete old payloads (for retention policy)
 *


@@ -3075,6 +3075,8 @@ export interface TaskSchedule {
  priority: number;
  state_code: string | null;
  platform: string | null;
  method: 'curl' | 'http' | null;
  is_immutable: boolean;
  last_run_at: string | null;
  next_run_at: string | null;
  last_task_count: number;
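The two new fields imply matching columns on `task_schedules`. A plausible shape for the backing migration (constraint and default choices are assumptions; only the column names and types are pinned down by the interface above):

```ts
// Sketch of the ALTER behind the interface change - defaults/checks assumed.
await pool.query(`
  ALTER TABLE task_schedules
    ADD COLUMN IF NOT EXISTS method TEXT CHECK (method IN ('curl', 'http')),
    ADD COLUMN IF NOT EXISTS is_immutable BOOLEAN NOT NULL DEFAULT FALSE
`);
```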


@@ -25,6 +25,8 @@ import {
  Play,
  Pause,
  Timer,
  Lock,
  Globe,
} from 'lucide-react';

interface Task {
@@ -385,9 +387,7 @@ const ROLES = [
  'store_discovery',
  'entry_point_discovery',
  'product_discovery',
  'analytics_refresh',
];

// ============================================================
@@ -414,6 +414,7 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
const [error, setError] = useState<string | null>(null);
const isNew = !schedule;
const isImmutable = schedule?.is_immutable ?? false;

useEffect(() => {
  if (schedule) {
@@ -440,7 +441,7 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
}, [schedule, isOpen]);

const handleSubmit = async () => {
  if (!isImmutable && !name.trim()) {
    setError('Name is required');
    return;
  }
@@ -449,16 +450,23 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
  setError(null);

  try {
    // For immutable schedules, only send allowed fields
    const data = isImmutable
      ? {
          enabled,
          interval_hours: intervalHours,
          priority,
        }
      : {
          name: name.trim(),
          role,
          description: description.trim() || undefined,
          enabled,
          interval_hours: intervalHours,
          priority,
          state_code: stateCode.trim() || undefined,
          platform: platform.trim() || undefined,
        };
    if (isNew) {
      await api.createTaskSchedule(data as any);
@@ -498,6 +506,15 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
  </div>
)}
{isImmutable && (
  <div className="bg-amber-50 border border-amber-200 rounded-lg p-3 flex items-start gap-2">
    <Lock className="w-4 h-4 text-amber-600 flex-shrink-0 mt-0.5" />
    <div className="text-sm text-amber-800">
      <strong>Immutable schedule.</strong> Only <em>Enabled</em>, <em>Interval</em>, and <em>Priority</em> can be modified.
    </div>
  </div>
)}
<div>
  <label className="block text-sm font-medium text-gray-700 mb-1">Name *</label>
  <input
@@ -505,7 +522,10 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
    value={name}
    onChange={(e) => setName(e.target.value)}
    placeholder="e.g., product_refresh_all"
    disabled={isImmutable}
    className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
      isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
    }`}
  />
</div>
@@ -514,7 +534,10 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
<select
  value={role}
  onChange={(e) => setRole(e.target.value)}
  disabled={isImmutable}
  className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
    isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
  }`}
>
  {TASK_ROLES.map(r => (
    <option key={r.id} value={r.id}>{r.name}</option>
@@ -529,7 +552,10 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
  value={description}
  onChange={(e) => setDescription(e.target.value)}
  placeholder="Optional description"
  disabled={isImmutable}
  className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
    isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
  }`}
/>
</div>
@@ -567,7 +593,10 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
  onChange={(e) => setStateCode(e.target.value.toUpperCase())}
  placeholder="e.g., AZ"
  maxLength={2}
  disabled={isImmutable}
  className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
    isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
  }`}
/>
</div>
<div>
@@ -577,7 +606,10 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
  value={platform}
  onChange={(e) => setPlatform(e.target.value)}
  placeholder="e.g., dutchie"
  disabled={isImmutable}
  className={`w-full px-3 py-2 border border-gray-200 rounded-lg ${
    isImmutable ? 'bg-gray-100 text-gray-500 cursor-not-allowed' : ''
  }`}
/>
</div>
</div>
@@ -742,10 +774,25 @@ export default function TasksDashboard() {
};

const handleBulkDeleteSchedules = async () => {
  // Filter out immutable schedules from selection
  const deletableIds = Array.from(selectedSchedules).filter(id => {
    const schedule = schedules.find(s => s.id === id);
    return schedule && !schedule.is_immutable;
  });

  if (deletableIds.length === 0) {
    alert('No deletable schedules selected. Immutable schedules cannot be deleted.');
    return;
  }

  const immutableCount = selectedSchedules.size - deletableIds.length;
  const confirmMsg = immutableCount > 0
    ? `Delete ${deletableIds.length} schedule(s)? (${immutableCount} immutable schedule(s) will be skipped)`
    : `Delete ${deletableIds.length} selected schedule(s)?`;

  if (!confirm(confirmMsg)) return;
  try {
    await api.deleteTaskSchedulesBulk(deletableIds);
    setSelectedSchedules(new Set());
    fetchData();
  } catch (err: any) {
@@ -1102,6 +1149,12 @@ export default function TasksDashboard() {
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
  Role
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
  State
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
  Method
</th>
<th className="px-4 py-3 text-left text-xs font-medium text-gray-500 uppercase">
  Interval
</th>
@@ -1128,17 +1181,48 @@ export default function TasksDashboard() {
  checked={selectedSchedules.has(schedule.id)}
  onChange={() => toggleSelectSchedule(schedule.id)}
  className="w-4 h-4 text-emerald-600 rounded"
  disabled={schedule.is_immutable}
/>
</td>
<td className="px-4 py-3">
  <div className="flex items-center gap-2">
    {schedule.is_immutable && (
      <span title="Immutable schedule (cannot be deleted)">
        <Lock className="w-3.5 h-3.5 text-amber-500 flex-shrink-0" />
      </span>
    )}
    <div>
      <div className="text-sm font-medium text-gray-900">{schedule.name}</div>
      {schedule.description && (
        <div className="text-xs text-gray-500">{schedule.description}</div>
      )}
    </div>
  </div>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
  {schedule.role.replace(/_/g, ' ')}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
  {schedule.state_code ? (
    <span className="inline-flex items-center gap-1 px-2 py-0.5 bg-blue-50 text-blue-700 rounded font-medium">
      <Globe className="w-3 h-3" />
      {schedule.state_code}
    </span>
  ) : (
    <span className="text-gray-400">-</span>
  )}
</td>
<td className="px-4 py-3 text-sm">
  <span className={`inline-flex items-center px-2 py-0.5 rounded text-xs font-medium ${
    schedule.method === 'http'
      ? 'bg-purple-100 text-purple-700'
      : schedule.method === 'curl'
        ? 'bg-orange-100 text-orange-700'
        : 'bg-gray-100 text-gray-600'
  }`}>
    {schedule.method || 'any'}
  </span>
</td>
<td className="px-4 py-3 text-sm text-gray-600">
  Every {schedule.interval_hours}h
</td>
@@ -1199,14 +1283,19 @@ export default function TasksDashboard() {
    setShowScheduleModal(true);
  }}
  className="p-1.5 text-gray-400 hover:text-blue-600 hover:bg-blue-50 rounded transition-colors"
  title={schedule.is_immutable ? 'Edit (limited fields)' : 'Edit'}
>
  <Edit2 className="w-4 h-4" />
</button>
<button
  onClick={() => handleDeleteSchedule(schedule.id)}
  disabled={schedule.is_immutable}
  className={`p-1.5 rounded transition-colors ${
    schedule.is_immutable
      ? 'text-gray-300 cursor-not-allowed'
      : 'text-gray-400 hover:text-red-600 hover:bg-red-50'
  }`}
  title={schedule.is_immutable ? 'Cannot delete immutable schedule' : 'Delete'}
>
  <Trash2 className="w-4 h-4" />
</button>