From f82eed4dc3826d0ebb4ea3cd27056a92e2d72690 Mon Sep 17 00:00:00 2001 From: Kelly Date: Fri, 12 Dec 2025 01:53:15 -0700 Subject: [PATCH] feat(workers): Add proxy reload, staggered tasks, and bulk proxy import MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Periodic proxy reload: Workers now reload proxies every 60s to pick up changes - Staggered task scheduling: New API endpoints for creating tasks with delays - Bulk proxy import: Script supports multiple URL formats including host:port:user:pass - Proxy URL column: Migration 086 adds proxy_url for non-standard formats Key changes: - crawl-rotator.ts: Added reloadIfStale(), isStale(), setReloadInterval() - task-worker.ts: Calls reloadIfStale() in main loop - task-service.ts: Added createStaggeredTasks() and createAZStoreTasks() - tasks.ts: Added POST /batch/staggered and /batch/az-stores endpoints - import-proxies.ts: New script for bulk proxy import - CLAUDE.md: Documented staggered task workflow 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CLAUDE.md | 72 ++++++ backend/migrations/086_proxy_url_column.sql | 10 + backend/src/routes/tasks.ts | 117 +++++++++ backend/src/routes/worker-registry.ts | 7 +- backend/src/scripts/import-proxies.ts | 263 ++++++++++++++++++++ backend/src/services/crawl-rotator.ts | 99 +++++++- backend/src/tasks/task-service.ts | 175 +++++++++++++ backend/src/tasks/task-worker.ts | 6 + 8 files changed, 742 insertions(+), 7 deletions(-) create mode 100644 backend/migrations/086_proxy_url_column.sql create mode 100644 backend/src/scripts/import-proxies.ts diff --git a/CLAUDE.md b/CLAUDE.md index 8c98d998..8a94047a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -205,6 +205,78 @@ These binaries mimic real browser TLS fingerprints to avoid detection. --- +## Staggered Task Workflow (Added 2025-12-12) + +### Overview +When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting. + +### How It Works +``` +1. Task created with scheduled_for = NOW() + (index * stagger_seconds) +2. Worker claims task only when scheduled_for <= NOW() +3. Worker runs preflight on EVERY task claim (proxy health check) +4. If preflight passes, worker executes task +5. If preflight fails, task released back to pending for another worker +6. Worker finishes task, polls for next available task +7. 
Repeat - preflight runs on each new task claim +``` + +### Key Points +- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check +- **Stagger prevents thundering herd**: 15 seconds between tasks is default +- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed + +### API Endpoints +```bash +# Create staggered tasks for specific dispensary IDs +POST /api/tasks/batch/staggered +{ + "dispensary_ids": [1, 2, 3, 4], + "role": "product_refresh", # or "product_discovery" + "stagger_seconds": 15, # default: 15 + "platform": "dutchie", # default: "dutchie" + "method": null # "curl" | "http" | null +} + +# Create staggered tasks for AZ stores (convenience endpoint) +POST /api/tasks/batch/az-stores +{ + "total_tasks": 24, # default: 24 + "stagger_seconds": 15, # default: 15 + "split_roles": true # default: true (12 refresh, 12 discovery) +} +``` + +### Example: 24 Tasks for AZ Stores +```bash +curl -X POST http://localhost:3010/api/tasks/batch/az-stores \ + -H "Content-Type: application/json" \ + -d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}' +``` + +Response: +```json +{ + "success": true, + "total": 24, + "product_refresh": 12, + "product_discovery": 12, + "stagger_seconds": 15, + "total_duration_seconds": 345, + "estimated_completion": "2025-12-12T08:40:00.000Z", + "message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)" +} +``` + +### Related Files +| File | Purpose | +|------|---------| +| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods | +| `src/routes/tasks.ts` | API endpoints for batch task creation | +| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic | + +--- + ## Documentation | Doc | Purpose | diff --git a/backend/migrations/086_proxy_url_column.sql b/backend/migrations/086_proxy_url_column.sql new file mode 100644 index 00000000..129d135a --- /dev/null +++ b/backend/migrations/086_proxy_url_column.sql @@ -0,0 +1,10 @@ +-- Migration 086: Add proxy_url column for alternative URL formats +-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass) +-- This column allows storing the raw URL directly + +-- Add proxy_url column - if set, used directly instead of constructing from parts +ALTER TABLE proxies +ADD COLUMN IF NOT EXISTS proxy_url TEXT; + +-- Add comment +COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.'; diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index bcf1bbe7..b1cbbc4f 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -976,6 +976,123 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => { } }); +// ============================================================ +// STAGGERED BATCH TASK CREATION +// ============================================================ + +/** + * POST /api/tasks/batch/staggered + * Create multiple tasks with staggered start times + * + * This endpoint prevents resource contention when creating many tasks by + * staggering their scheduled_for timestamps. Each task becomes eligible + * for claiming only after its scheduled time. + * + * WORKFLOW: + * 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds) + * 2. Worker claims task only when scheduled_for <= NOW() + * 3. Worker runs preflight on EVERY task claim + * 4. 
If preflight passes, worker executes task + * 5. If preflight fails, task released back to pending for another worker + * + * Body: + * - dispensary_ids: number[] (required) - Array of dispensary IDs + * - role: TaskRole (required) - 'product_refresh' | 'product_discovery' + * - stagger_seconds: number (default: 15) - Seconds between each task start + * - platform: string (default: 'dutchie') + * - method: 'curl' | 'http' | null (default: null) + */ +router.post('/batch/staggered', async (req: Request, res: Response) => { + try { + const { + dispensary_ids, + role, + stagger_seconds = 15, + platform = 'dutchie', + method = null, + } = req.body; + + if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) { + return res.status(400).json({ error: 'dispensary_ids array is required' }); + } + + if (!role) { + return res.status(400).json({ error: 'role is required' }); + } + + const result = await taskService.createStaggeredTasks( + dispensary_ids, + role as TaskRole, + stagger_seconds, + platform, + method + ); + + const totalDuration = (dispensary_ids.length - 1) * stagger_seconds; + const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); + + res.status(201).json({ + success: true, + created: result.created, + task_ids: result.taskIds, + stagger_seconds, + total_duration_seconds: totalDuration, + estimated_completion: estimatedEndTime.toISOString(), + message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, + }); + } catch (error: unknown) { + console.error('Error creating staggered tasks:', error); + res.status(500).json({ error: 'Failed to create staggered tasks' }); + } +}); + +/** + * POST /api/tasks/batch/az-stores + * Convenience endpoint to create staggered tasks for Arizona stores + * + * Body: + * - total_tasks: number (default: 24) - Total tasks to create + * - stagger_seconds: number (default: 15) - Seconds between each task + * - split_roles: boolean (default: true) - Split between product_refresh and product_discovery + */ +router.post('/batch/az-stores', async (req: Request, res: Response) => { + try { + const { + total_tasks = 24, + stagger_seconds = 15, + split_roles = true, + } = req.body; + + const result = await taskService.createAZStoreTasks( + total_tasks, + stagger_seconds, + split_roles + ); + + const totalDuration = (result.total - 1) * stagger_seconds; + const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); + + res.status(201).json({ + success: true, + total: result.total, + product_refresh: result.product_refresh, + product_discovery: result.product_discovery, + task_ids: result.taskIds, + stagger_seconds, + total_duration_seconds: totalDuration, + estimated_completion: estimatedEndTime.toISOString(), + message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`, + }); + } catch (error: unknown) { + console.error('Error creating AZ store tasks:', error); + res.status(500).json({ error: 'Failed to create AZ store tasks' }); + } +}); + +// ============================================================ +// TASK POOL MANAGEMENT +// ============================================================ + /** * GET /api/tasks/pool/status * Check if task pool is paused diff --git a/backend/src/routes/worker-registry.ts b/backend/src/routes/worker-registry.ts index 2b949cad..3cab0333 100644 --- a/backend/src/routes/worker-registry.ts +++ 
b/backend/src/routes/worker-registry.ts @@ -252,12 +252,9 @@ router.post('/deregister', async (req: Request, res: Response) => { // Release the name back to the pool await pool.query('SELECT release_worker_name($1)', [worker_id]); - // Mark as terminated + // Delete the worker entry (clean shutdown) const { rows } = await pool.query(` - UPDATE worker_registry - SET status = 'terminated', - current_task_id = NULL, - updated_at = NOW() + DELETE FROM worker_registry WHERE worker_id = $1 RETURNING id, friendly_name `, [worker_id]); diff --git a/backend/src/scripts/import-proxies.ts b/backend/src/scripts/import-proxies.ts new file mode 100644 index 00000000..acdc5d75 --- /dev/null +++ b/backend/src/scripts/import-proxies.ts @@ -0,0 +1,263 @@ +/** + * Bulk Proxy Import Script + * + * Imports proxies from various formats into the proxies table. + * Supports: + * - Standard format: http://user:pass@host:port + * - Colon format: http://host:port:user:pass + * - Simple format: host:port:user:pass (defaults to http) + * + * Usage: + * npx tsx src/scripts/import-proxies.ts < proxies.txt + * echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts + * npx tsx src/scripts/import-proxies.ts --file proxies.txt + * npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass" + * + * Options: + * --file Read proxies from file (one per line) + * --url Import a single proxy URL + * --max-connections Set max_connections for all imported proxies (default: 1) + * --dry-run Parse and show what would be imported without inserting + */ + +import { getPool } from '../db/pool'; +import * as fs from 'fs'; +import * as readline from 'readline'; + +interface ParsedProxy { + protocol: string; + host: string; + port: number; + username?: string; + password?: string; + rawUrl: string; +} + +/** + * Parse a proxy URL in various formats + */ +function parseProxyUrl(input: string): ParsedProxy | null { + const trimmed = input.trim(); + if (!trimmed || trimmed.startsWith('#')) return null; + + // Format 1: Standard URL format - http://user:pass@host:port + const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/); + if (standardMatch) { + return { + protocol: standardMatch[1], + username: standardMatch[2], + password: standardMatch[3], + host: standardMatch[4], + port: parseInt(standardMatch[5], 10), + rawUrl: trimmed, + }; + } + + // Format 2: Standard URL without auth - http://host:port + const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/); + if (noAuthMatch) { + return { + protocol: noAuthMatch[1], + host: noAuthMatch[2], + port: parseInt(noAuthMatch[3], 10), + rawUrl: trimmed, + }; + } + + // Format 3: Colon format with protocol - http://host:port:user:pass + const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/); + if (colonWithProtocolMatch) { + return { + protocol: colonWithProtocolMatch[1], + host: colonWithProtocolMatch[2], + port: parseInt(colonWithProtocolMatch[3], 10), + username: colonWithProtocolMatch[4], + password: colonWithProtocolMatch[5], + rawUrl: trimmed, // Keep raw URL for non-standard format + }; + } + + // Format 4: Colon format without protocol - host:port:user:pass + const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/); + if (colonMatch) { + return { + protocol: 'http', + host: colonMatch[1], + port: parseInt(colonMatch[2], 10), + username: colonMatch[3], + password: colonMatch[4], + rawUrl: `http://${trimmed}`, // Construct raw URL + }; + } + + // Format 5: Simple 
host:port + const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/); + if (simpleMatch) { + return { + protocol: 'http', + host: simpleMatch[1], + port: parseInt(simpleMatch[2], 10), + rawUrl: `http://${trimmed}`, + }; + } + + console.error(`[ImportProxies] Could not parse: ${trimmed}`); + return null; +} + +/** + * Check if proxy URL is in non-standard format (needs proxy_url column) + */ +function isNonStandardFormat(rawUrl: string): boolean { + // Colon format: protocol://host:port:user:pass + return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl); +} + +async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) { + if (dryRun) { + console.log('\n[ImportProxies] DRY RUN - Would import:'); + for (const p of proxies) { + const needsRawUrl = isNonStandardFormat(p.rawUrl); + console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`); + } + console.log(`\nTotal: ${proxies.length} proxies`); + return; + } + + const pool = getPool(); + let inserted = 0; + let skipped = 0; + + for (const proxy of proxies) { + try { + // Determine if we need to store the raw URL (non-standard format) + const needsRawUrl = isNonStandardFormat(proxy.rawUrl); + + const result = await pool.query(` + INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active) + VALUES ($1, $2, $3, $4, $5, $6, $7, true) + ON CONFLICT (host, port, protocol) + DO UPDATE SET + username = EXCLUDED.username, + password = EXCLUDED.password, + max_connections = EXCLUDED.max_connections, + proxy_url = EXCLUDED.proxy_url, + active = true, + updated_at = NOW() + RETURNING id, (xmax = 0) as is_insert + `, [ + proxy.host, + proxy.port, + proxy.protocol, + proxy.username || null, + proxy.password || null, + maxConnections, + needsRawUrl ? 
proxy.rawUrl : null, + ]); + + const isInsert = result.rows[0]?.is_insert; + if (isInsert) { + inserted++; + console.log(`[ImportProxies] Inserted: ${proxy.host}:${proxy.port}`); + } else { + console.log(`[ImportProxies] Updated: ${proxy.host}:${proxy.port}`); + inserted++; // Count updates too + } + } catch (err: any) { + console.error(`[ImportProxies] Error inserting ${proxy.host}:${proxy.port}: ${err.message}`); + skipped++; + } + } + + console.log(`\n[ImportProxies] Complete: ${inserted} imported, ${skipped} skipped`); + + // Notify any listening workers + try { + await pool.query(`NOTIFY proxy_added, 'bulk import'`); + console.log('[ImportProxies] Sent proxy_added notification to workers'); + } catch { + // Ignore notification errors + } +} + +async function readFromStdin(): Promise { + return new Promise((resolve) => { + const lines: string[] = []; + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false, + }); + + rl.on('line', (line) => { + lines.push(line); + }); + + rl.on('close', () => { + resolve(lines); + }); + }); +} + +async function main() { + const args = process.argv.slice(2); + let lines: string[] = []; + let maxConnections = 1; + let dryRun = false; + + // Parse arguments + for (let i = 0; i < args.length; i++) { + if (args[i] === '--file' && args[i + 1]) { + const content = fs.readFileSync(args[i + 1], 'utf-8'); + lines.push(...content.split('\n')); + i++; + } else if (args[i] === '--url' && args[i + 1]) { + lines.push(args[i + 1]); + i++; + } else if (args[i] === '--max-connections' && args[i + 1]) { + maxConnections = parseInt(args[i + 1], 10); + i++; + } else if (args[i] === '--dry-run') { + dryRun = true; + } else if (!args[i].startsWith('--')) { + // Treat as URL directly + lines.push(args[i]); + } + } + + // If no lines yet, read from stdin + if (lines.length === 0) { + console.log('[ImportProxies] Reading from stdin...'); + lines = await readFromStdin(); + } + + // Parse all lines + const proxies: ParsedProxy[] = []; + for (const line of lines) { + const parsed = parseProxyUrl(line); + if (parsed) { + proxies.push(parsed); + } + } + + if (proxies.length === 0) { + console.error('[ImportProxies] No valid proxies found'); + console.error('\nUsage:'); + console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"'); + console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt'); + console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts'); + console.error('\nSupported formats:'); + console.error(' http://user:pass@host:port (standard)'); + console.error(' http://host:port:user:pass (colon format)'); + console.error(' host:port:user:pass (simple)'); + process.exit(1); + } + + console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`); + await importProxies(proxies, maxConnections, dryRun); +} + +main().catch((err) => { + console.error('[ImportProxies] Fatal error:', err); + process.exit(1); +}); diff --git a/backend/src/services/crawl-rotator.ts b/backend/src/services/crawl-rotator.ts index 0a04dcc8..cea63625 100644 --- a/backend/src/services/crawl-rotator.ts +++ b/backend/src/services/crawl-rotator.ts @@ -77,6 +77,11 @@ export interface Proxy { country?: string; countryCode?: string; timezone?: string; + /** + * Raw proxy URL override. If set, used directly instead of constructing from parts. 
+ * Supports non-standard formats like: http://host:port:user:pass + */ + proxyUrl?: string; } export interface ProxyStats { @@ -129,6 +134,10 @@ export class ProxyRotator { private proxies: Proxy[] = []; private currentIndex: number = 0; private lastRotation: Date = new Date(); + private lastReloadAt: Date = new Date(); + + // Proxy reload interval - how often to check for proxy changes (default: 60 seconds) + private reloadIntervalMs: number = 60000; constructor(pool?: Pool) { this.pool = pool || null; @@ -138,6 +147,13 @@ export class ProxyRotator { this.pool = pool; } + /** + * Set the reload interval for periodic proxy checks + */ + setReloadInterval(ms: number): void { + this.reloadIntervalMs = ms; + } + /** * Load proxies from database */ @@ -167,22 +183,76 @@ export class ProxyRotator { state, country, country_code as "countryCode", - timezone + timezone, + proxy_url as "proxyUrl" FROM proxies WHERE active = true ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST `); this.proxies = result.rows; + this.lastReloadAt = new Date(); const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0); - console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`); + console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`); } catch (error) { console.warn(`[ProxyRotator] Could not load proxies: ${error}`); this.proxies = []; } } + /** + * Check if proxy list is stale and needs reload + */ + isStale(): boolean { + const elapsed = Date.now() - this.lastReloadAt.getTime(); + return elapsed > this.reloadIntervalMs; + } + + /** + * Reload proxies if the cache is stale. + * This ensures workers pick up new proxies or see disabled proxies removed. + * Returns true if proxies were reloaded. + */ + async reloadIfStale(): Promise { + if (!this.isStale()) { + return false; + } + + const oldCount = this.proxies.length; + const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0); + const oldIds = new Set(this.proxies.map(p => p.id)); + + await this.loadProxies(); + + const newCount = this.proxies.length; + const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0); + const newIds = new Set(this.proxies.map(p => p.id)); + + // Log changes + const added = this.proxies.filter(p => !oldIds.has(p.id)); + const removed = [...oldIds].filter(id => !newIds.has(id)); + + if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) { + console.log(`[ProxyRotator] Reloaded proxies: ${oldCount}→${newCount} proxies, ${oldCapacity}→${newCapacity} threads`); + if (added.length > 0) { + console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`); + } + if (removed.length > 0) { + console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`); + } + } + + return true; + } + + /** + * Get time since last reload in seconds + */ + getSecondsSinceReload(): number { + return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000); + } + /** * Get next proxy in rotation */ @@ -342,8 +412,16 @@ export class ProxyRotator { /** * Get proxy URL for HTTP client + * If proxy.proxyUrl is set, uses it directly (supports non-standard formats). 
+ * Otherwise constructs standard format: protocol://user:pass@host:port */ getProxyUrl(proxy: Proxy): string { + // Use raw proxyUrl if set (supports non-standard formats like host:port:user:pass) + if (proxy.proxyUrl) { + return proxy.proxyUrl; + } + + // Construct standard format const auth = proxy.username && proxy.password ? `${proxy.username}:${proxy.password}@` : ''; @@ -584,6 +662,23 @@ export class CrawlRotator { await this.proxy.loadProxies(); } + /** + * Reload proxy list if stale. + * Workers should call this periodically to pick up proxy changes. + * Returns true if proxies were reloaded. + */ + async reloadIfStale(): Promise { + return this.proxy.reloadIfStale(); + } + + /** + * Set proxy reload interval in milliseconds. + * Default is 60 seconds. + */ + setProxyReloadInterval(ms: number): void { + this.proxy.setReloadInterval(ms); + } + /** * Rotate proxy only (get new IP) */ diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts index 2e9f1f5b..e7b4c8cf 100644 --- a/backend/src/tasks/task-service.ts +++ b/backend/src/tasks/task-service.ts @@ -641,6 +641,181 @@ class TaskService { return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null; } + /** + * Create multiple tasks with staggered start times. + * + * STAGGERED TASK WORKFLOW: + * ======================= + * This prevents resource contention and proxy assignment lag when creating + * many tasks at once. Each task gets a scheduled_for timestamp offset from + * the previous task. + * + * Workflow: + * 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds) + * 2. Worker claims task only when scheduled_for <= NOW() + * 3. Worker runs preflight check on EVERY task claim + * 4. If preflight passes, worker executes task + * 5. If preflight fails, task is released back to pending for another worker + * 6. Worker finishes task, polls for next available task + * 7. 
Repeat - preflight runs again on next task claim + * + * Benefits: + * - Prevents all 8 workers from hitting proxies simultaneously + * - Reduces API rate limiting / 403 errors + * - Spreads resource usage over time + * - Each task still runs preflight, ensuring proxy health + * + * @param dispensaryIds - Array of dispensary IDs to create tasks for + * @param role - Task role (e.g., 'product_refresh', 'product_discovery') + * @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15) + * @param platform - Platform identifier (default: 'dutchie') + * @param method - Transport method: 'curl' or 'http' (default: null for any) + * @returns Number of tasks created + */ + async createStaggeredTasks( + dispensaryIds: number[], + role: TaskRole, + staggerSeconds: number = 15, + platform: string = 'dutchie', + method: 'curl' | 'http' | null = null + ): Promise<{ created: number; taskIds: number[] }> { + if (dispensaryIds.length === 0) { + return { created: 0, taskIds: [] }; + } + + // Use a single INSERT with generate_series for efficiency + const result = await pool.query(` + WITH task_data AS ( + SELECT + unnest($1::int[]) as dispensary_id, + generate_series(0, array_length($1::int[], 1) - 1) as idx + ) + INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status) + SELECT + $2::varchar as role, + td.dispensary_id, + $3::varchar as platform, + $4::varchar as method, + NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for, + 'pending' as status + FROM task_data td + ON CONFLICT DO NOTHING + RETURNING id + `, [dispensaryIds, role, platform, method, staggerSeconds]); + + const taskIds = result.rows.map((r: { id: number }) => r.id); + + console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`); + + return { created: taskIds.length, taskIds }; + } + + /** + * Create a batch of AZ store tasks with automatic distribution. 
+ * + * This is a convenience method for creating tasks for Arizona stores with: + * - Automatic staggering to prevent resource contention + * - Even distribution across both refresh and discovery roles + * + * @param totalTasks - Total number of tasks to create + * @param staggerSeconds - Seconds between each task's start time + * @param splitRoles - If true, split between product_refresh and product_discovery + * @returns Summary of created tasks + */ + async createAZStoreTasks( + totalTasks: number = 24, + staggerSeconds: number = 15, + splitRoles: boolean = true + ): Promise<{ + total: number; + product_refresh: number; + product_discovery: number; + taskIds: number[]; + }> { + // Get AZ stores with platform_id and menu_url + const storesResult = await pool.query(` + SELECT d.id + FROM dispensaries d + JOIN states s ON d.state_id = s.id + WHERE s.code = 'AZ' + AND d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + AND d.menu_url IS NOT NULL + ORDER BY d.id + `); + + const storeIds = storesResult.rows.map((r: { id: number }) => r.id); + + if (storeIds.length === 0) { + console.log('[TaskService] No AZ stores found with platform_id and menu_url'); + return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] }; + } + + // Limit tasks to available stores + const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles + const allTaskIds: number[] = []; + + if (splitRoles) { + // Split between refresh and discovery + const tasksPerRole = Math.floor(maxTasks / 2); + const refreshStores = storeIds.slice(0, tasksPerRole); + const discoveryStores = storeIds.slice(0, tasksPerRole); + + // Create refresh tasks first + const refreshResult = await this.createStaggeredTasks( + refreshStores, + 'product_refresh', + staggerSeconds, + 'dutchie' + ); + allTaskIds.push(...refreshResult.taskIds); + + // Create discovery tasks starting after refresh tasks are scheduled + const discoveryStartOffset = tasksPerRole * staggerSeconds; + const discoveryResult = await pool.query(` + WITH task_data AS ( + SELECT + unnest($1::int[]) as dispensary_id, + generate_series(0, array_length($1::int[], 1) - 1) as idx + ) + INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status) + SELECT + 'product_discovery'::varchar as role, + td.dispensary_id, + 'dutchie'::varchar as platform, + NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for, + 'pending' as status + FROM task_data td + ON CONFLICT DO NOTHING + RETURNING id + `, [discoveryStores, discoveryStartOffset, staggerSeconds]); + + allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id)); + + return { + total: allTaskIds.length, + product_refresh: refreshResult.taskIds.length, + product_discovery: discoveryResult.rowCount ?? 
0, + taskIds: allTaskIds + }; + } + + // Single role mode - all product_discovery + const result = await this.createStaggeredTasks( + storeIds.slice(0, totalTasks), + 'product_discovery', + staggerSeconds, + 'dutchie' + ); + + return { + total: result.taskIds.length, + product_refresh: 0, + product_discovery: result.taskIds.length, + taskIds: result.taskIds + }; + } + /** * Calculate workers needed to complete tasks within SLA */ diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 7cb99bd3..7ad4b883 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -740,6 +740,12 @@ export class TaskWorker { this.backoffReason = null; } + // Periodically reload proxies to pick up changes (new proxies, disabled proxies) + // This runs every ~60 seconds (controlled by setProxyReloadInterval) + if (this.stealthInitialized) { + await this.crawlRotator.reloadIfStale(); + } + // Check for decommission signal const shouldDecommission = await this.checkDecommission(); if (shouldDecommission) {
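
The periodic reload added to `crawl-rotator.ts` is driven entirely by `reloadIfStale()` calls from the worker loop; nothing reloads on its own timer. Below is a minimal sketch of that behaviour in isolation, assuming only the `ProxyRotator` methods and the `getPool()` helper visible in this diff; the 30-second interval, the 5-second polling cadence, and the file location (alongside the other scripts under `src/scripts`) are illustrative values, not ones used by the patch.

```typescript
// Illustrative sketch only - not part of the patch. Assumes the ProxyRotator
// export shown in crawl-rotator.ts and the getPool() helper; the interval and
// polling cadence are made-up values.
import { getPool } from '../db/pool';
import { ProxyRotator } from '../services/crawl-rotator';

async function demoProxyReload(): Promise<void> {
  const rotator = new ProxyRotator(getPool());

  // Check for proxy changes every 30s instead of the 60s default.
  rotator.setReloadInterval(30_000);

  // Initial load; this also resets the staleness timer.
  await rotator.loadProxies();

  // Stand-in for the worker's main loop: reloadIfStale() is a cheap no-op
  // until the interval has elapsed, then re-queries the proxies table and
  // logs any added/removed proxies or capacity changes.
  setInterval(async () => {
    const reloaded = await rotator.reloadIfStale();
    if (reloaded) {
      console.log('[Demo] proxy list was stale and has been reloaded');
    }
  }, 5_000);
}

demoProxyReload().catch(console.error);
```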
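For the new batch endpoints, CLAUDE.md documents a curl call against `/batch/az-stores`; a hedged TypeScript equivalent for `/batch/staggered` is sketched below. The base URL (taken from the curl example above), the dispensary IDs, and the use of Node 18+ global `fetch` are assumptions; the request and response fields follow the handler added in `src/routes/tasks.ts`.

```typescript
// Illustrative client call - base URL and dispensary IDs are placeholders.
// Requires Node 18+ for the built-in fetch.
const BASE_URL = process.env.API_URL ?? 'http://localhost:3010';

async function queueStaggeredRefresh(dispensaryIds: number[]): Promise<void> {
  const res = await fetch(`${BASE_URL}/api/tasks/batch/staggered`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      dispensary_ids: dispensaryIds,
      role: 'product_refresh',
      stagger_seconds: 15, // matches the route's default
    }),
  });

  if (!res.ok) {
    throw new Error(`Batch create failed: ${res.status} ${await res.text()}`);
  }

  const body = await res.json();
  // e.g. "Created 4 staggered product_refresh tasks (15s apart, ~1 min total)"
  console.log(body.message, '-> estimated completion:', body.estimated_completion);
}

queueStaggeredRefresh([1, 2, 3, 4]).catch(console.error);
```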