/**
 * Task Worker
 *
 * A unified worker that pulls tasks from the worker_tasks queue.
 * Workers register on startup, get a friendly name, and pull tasks.
 *
 * Architecture:
 * - Tasks are generated on schedule (by scheduler or API)
 * - Workers PULL tasks from the pool (not assigned to them)
 * - Tasks are claimed in order of priority (DESC) then creation time (ASC)
 * - Workers report heartbeats to worker_registry
 * - Workers are ROLE-AGNOSTIC by default (can handle any task type)
 *
 * Stealth & Anti-Detection:
 * PROXIES ARE REQUIRED - on startup the worker waits (up to 60 minutes) for
 * at least one active proxy and fails to start if none becomes available.
 *
 * On startup, workers initialize the CrawlRotator which provides:
 * - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy
 * - User-Agent rotation: Cycles through realistic browser fingerprints
 * - Fingerprint rotation: Changes browser profile on blocks
 * - Locale/timezone: Matches Accept-Language to target state
 *
 * The CrawlRotator is wired to the Dutchie client via setCrawlRotator().
 * Task handlers call startSession() which picks a random fingerprint.
 * On 403 errors, the client automatically:
 * 1. Records failure on current proxy
 * 2. Rotates to next proxy
 * 3. Rotates fingerprint
 * 4. Retries the request
 *
 * Usage:
 *   npx tsx src/tasks/task-worker.ts                             # Role-agnostic (any task)
 *   WORKER_ROLE=product_refresh npx tsx src/tasks/task-worker.ts # Role-specific
 *
 * Environment:
 *   WORKER_ROLE              - Which task role to process (optional, null = any task)
 *   WORKER_ID                - Optional custom worker ID (auto-generated if not provided)
 *   POD_NAME                 - Kubernetes pod name (optional)
 *   POLL_INTERVAL_MS         - How often to check for tasks (default: 5000)
 *   HEARTBEAT_INTERVAL_MS    - How often to update heartbeat (default: 30000)
 *   API_BASE_URL             - Backend API URL for registration (default: http://localhost:3010)
 *   MAX_CONCURRENT_TASKS     - Tasks processed in parallel by one worker (default: 3)
 *   MEMORY_BACKOFF_THRESHOLD - Heap usage ratio that pauses claiming (default: 0.85)
 *   CPU_BACKOFF_THRESHOLD    - CPU usage ratio that pauses claiming (default: 0.90)
 *   BACKOFF_DURATION_MS      - Pause length while backing off (default: 10000)
 */

import { Pool } from 'pg';
import type { PoolClient } from 'pg';
import { v4 as uuidv4 } from 'uuid';
import { taskService, TaskRole, WorkerTask } from './task-service';
import { getPool } from '../db/pool';
import os from 'os';
import { getHeapStatistics } from 'v8';

// Stealth/rotation support
import { CrawlRotator } from '../services/crawl-rotator';
import { setCrawlRotator } from '../platforms/dutchie';

// Task handlers by role
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
import { handlePayloadFetch } from './handlers/payload-fetch';
import { handleProductRefresh } from './handlers/product-refresh';
import { handleProductDiscovery } from './handlers/product-discovery';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';

/** Extracts a printable message from whatever was thrown. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Reads a positive integer setting from the environment.
 * Unset, empty, non-numeric or < 1 values fall back to `fallback`, because a
 * NaN/zero interval would turn the polling loops into busy loops and a NaN
 * concurrency limit would stop the worker from ever claiming a task.
 */
function readIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === '') {
    return fallback;
  }
  const parsed = Number.parseInt(raw, 10);
  if (!Number.isFinite(parsed) || parsed < 1) {
    console.warn(`[TaskWorker] Ignoring invalid ${name}="${raw}", using ${fallback}`);
    return fallback;
  }
  return parsed;
}

/**
 * Reads a positive ratio setting (normally 0.0-1.0) from the environment.
 * Unset, empty, non-numeric or <= 0 values fall back to `fallback`; a NaN
 * threshold would silently disable the corresponding backoff check.
 */
function readRatioEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === '') {
    return fallback;
  }
  const parsed = Number.parseFloat(raw);
  if (!Number.isFinite(parsed) || parsed <= 0) {
    console.warn(`[TaskWorker] Ignoring invalid ${name}="${raw}", using ${fallback}`);
    return fallback;
  }
  return parsed;
}

const POLL_INTERVAL_MS = readIntEnv('POLL_INTERVAL_MS', 5000);
const HEARTBEAT_INTERVAL_MS = readIntEnv('HEARTBEAT_INTERVAL_MS', 30000);
const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';

// =============================================================================
// CONCURRENT TASK PROCESSING SETTINGS
// =============================================================================
// Workers can process multiple tasks simultaneously using async I/O.
// This improves throughput for I/O-bound tasks (network calls, DB queries).
//
// Resource thresholds trigger "backoff" - the worker stops claiming new tasks
// but continues processing existing ones until resources return to normal.
//
// See: docs/WORKER_TASK_ARCHITECTURE.md#concurrent-task-processing
// =============================================================================

// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = readIntEnv('MAX_CONCURRENT_TASKS', 3);

// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
const MEMORY_BACKOFF_THRESHOLD = readRatioEnv('MEMORY_BACKOFF_THRESHOLD', 0.85);

// When CPU usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 90% - allows some burst capacity
const CPU_BACKOFF_THRESHOLD = readRatioEnv('CPU_BACKOFF_THRESHOLD', 0.9);

// How long to wait (ms) when in backoff state before rechecking resources
const BACKOFF_DURATION_MS = readIntEnv('BACKOFF_DURATION_MS', 10000);

// CPU usage is only re-measured once at least this much wall time has passed
// since the previous sample. The main loop can iterate within milliseconds
// (e.g. right after claiming a task); a differential over such a tiny window
// is noise and used to trigger spurious backoffs.
const MIN_CPU_SAMPLE_WINDOW_MS = 1000;

const BYTES_PER_MB = 1024 * 1024;

/** Everything a task handler receives when it is invoked. */
export interface TaskContext {
  pool: Pool;
  workerId: string;
  task: WorkerTask;
  /** Refreshes the task's heartbeat; long-running handlers call this periodically. */
  heartbeat: () => Promise<void>;
}

/** Outcome reported by a task handler. */
export interface TaskResult {
  success: boolean;
  productsProcessed?: number;
  snapshotsCreated?: number;
  storesDiscovered?: number;
  error?: string;
  [key: string]: unknown;
}

type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;

// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk, chains to product_refresh
// product_refresh: Reads local payload, normalizes, upserts to DB
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
  payload_fetch: handlePayloadFetch, // NEW: API fetch -> disk
  product_refresh: handleProductRefresh, // CHANGED: disk -> DB
  product_discovery: handleProductDiscovery,
  store_discovery: handleStoreDiscovery,
  entry_point_discovery: handleEntryPointDiscovery,
  analytics_refresh: handleAnalyticsRefresh,
};

/**
 * Resource usage stats reported to the registry and used for backoff decisions.
 * These values are included in worker heartbeats and displayed in the UI.
 */
interface ResourceStats {
  /** Heap in use relative to the V8 heap size limit, as decimal (0.0 to 1.0) */
  memoryPercent: number;
  /** Current heap used in MB */
  memoryMb: number;
  /** Heap currently allocated by V8 (process.memoryUsage().heapTotal) in MB */
  memoryTotalMb: number;
  /** CPU usage percentage over the last sample window (0 to 100) */
  cpuPercent: number;
  /** True if worker is currently in backoff state */
  isBackingOff: boolean;
  /** Reason for backoff (e.g., "Memory at 87.3% (threshold: 85%)") */
  backoffReason: string | null;
}

/**
 * Pull-based task worker.
 *
 * `start()` initializes the proxy/fingerprint rotator, registers with the
 * worker registry, then loops: check resources, check for a decommission
 * request, and claim tasks until `maxConcurrentTasks` are in flight. Claimed
 * tasks run in the background; `start()` resolves once the loop has been
 * stopped and all in-flight tasks have settled.
 */
export class TaskWorker {
  private pool: Pool;
  private workerId: string;
  private role: TaskRole | null; // null = role-agnostic (any task)
  private friendlyName: string = '';
  private isRunning: boolean = false;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private registryHeartbeatInterval: NodeJS.Timeout | null = null;
  private crawlRotator: CrawlRotator;

  // ==========================================================================
  // CONCURRENT TASK TRACKING
  // ==========================================================================
  // activeTasks: Map of task ID -> task object for all currently running tasks
  // taskPromises: Map of task ID -> Promise for cleanup when task completes
  // maxConcurrentTasks: How many tasks this worker will run in parallel
  // ==========================================================================
  private activeTasks: Map<number, WorkerTask> = new Map();
  private taskPromises: Map<number, Promise<void>> = new Map();
  private maxConcurrentTasks: number = MAX_CONCURRENT_TASKS;

  // ==========================================================================
  // RESOURCE MONITORING FOR BACKOFF
  // ==========================================================================
  // CPU tracking uses differential measurement - we track last values and
  // calculate percentage based on elapsed time since last check. The last
  // computed percentage is cached so that callers arriving inside the minimum
  // sample window get a stable value instead of resetting the baseline.
  // ==========================================================================
  private lastCpuUsage: { user: number; system: number } = { user: 0, system: 0 };
  private lastCpuCheck: number = Date.now();
  private lastCpuPercent: number = 0;
  private isBackingOff: boolean = false;
  private backoffReason: string | null = null;

  /**
   * @param role     Task role to process, or null to accept any task.
   * @param workerId Custom worker ID; a `worker-xxxxxxxx` ID is generated when omitted.
   */
  constructor(role: TaskRole | null = null, workerId?: string) {
    this.pool = getPool();
    this.role = role;
    this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`;
    this.crawlRotator = new CrawlRotator(this.pool);

    // Initialize CPU tracking
    const cpuUsage = process.cpuUsage();
    this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
    this.lastCpuCheck = Date.now();
  }

  /**
   * Returns the CPU usage percentage (0-100) for the most recent sample window.
   * A new sample is taken only when MIN_CPU_SAMPLE_WINDOW_MS has elapsed since
   * the previous one; otherwise the cached value is returned unchanged.
   */
  private sampleCpuPercent(): number {
    const now = Date.now();
    const elapsedMs = now - this.lastCpuCheck;

    if (elapsedMs < 0) {
      // Wall clock moved backwards: restart the window, keep the cached value.
      const usage = process.cpuUsage();
      this.lastCpuUsage = { user: usage.user, system: usage.system };
      this.lastCpuCheck = now;
      return this.lastCpuPercent;
    }

    if (elapsedMs >= MIN_CPU_SAMPLE_WINDOW_MS) {
      const usage = process.cpuUsage();
      // process.cpuUsage() reports microseconds; convert the delta to ms.
      const busyMs =
        (usage.user - this.lastCpuUsage.user + (usage.system - this.lastCpuUsage.system)) / 1000;
      this.lastCpuPercent = Math.min(100, (busyMs / elapsedMs) * 100); // Cap at 100%
      this.lastCpuUsage = { user: usage.user, system: usage.system };
      this.lastCpuCheck = now;
    }

    return this.lastCpuPercent;
  }

  /**
   * Get current resource usage.
   *
   * memoryPercent is measured against V8's heap size limit rather than
   * heapTotal: heapTotal is only the heap V8 has allocated so far and grows
   * on demand, so heapUsed / heapTotal sits near 1.0 in a healthy process and
   * says nothing about the distance to an out-of-memory crash.
   */
  private getResourceStats(): ResourceStats {
    const memUsage = process.memoryUsage();
    const heapLimitBytes = getHeapStatistics().heap_size_limit;
    const memoryPercent = heapLimitBytes > 0 ? memUsage.heapUsed / heapLimitBytes : 0;

    return {
      memoryPercent,
      memoryMb: Math.round(memUsage.heapUsed / BYTES_PER_MB),
      memoryTotalMb: Math.round(memUsage.heapTotal / BYTES_PER_MB),
      cpuPercent: this.sampleCpuPercent(),
      isBackingOff: this.isBackingOff,
      backoffReason: this.backoffReason,
    };
  }

  /**
   * Check if we should back off from taking new tasks
   */
  private shouldBackOff(): { backoff: boolean; reason: string | null } {
    const stats = this.getResourceStats();

    if (stats.memoryPercent > MEMORY_BACKOFF_THRESHOLD) {
      return {
        backoff: true,
        reason: `Memory at ${(stats.memoryPercent * 100).toFixed(1)}% (threshold: ${MEMORY_BACKOFF_THRESHOLD * 100}%)`,
      };
    }

    if (stats.cpuPercent > CPU_BACKOFF_THRESHOLD * 100) {
      return {
        backoff: true,
        reason: `CPU at ${stats.cpuPercent.toFixed(1)}% (threshold: ${CPU_BACKOFF_THRESHOLD * 100}%)`,
      };
    }

    return { backoff: false, reason: null };
  }

  /**
   * Get count of currently running tasks
   */
  get activeTaskCount(): number {
    return this.activeTasks.size;
  }

  /**
   * Check if we can accept more tasks
   */
  private canAcceptMoreTasks(): boolean {
    return this.activeTasks.size < this.maxConcurrentTasks;
  }

  /**
   * Initialize stealth systems (proxy rotation, fingerprints)
   * Called once on worker startup before processing any tasks.
   *
   * IMPORTANT: Proxies are REQUIRED. The worker re-checks for proxies every
   * 30 seconds for up to 60 minutes, and wakes up immediately when PostgreSQL
   * delivers a NOTIFY on the 'proxy_added' channel.
   *
   * @throws Error when no active proxy became available within the wait limit.
   */
  private async initializeStealth(): Promise<void> {
    const MAX_WAIT_MINUTES = 60;
    const PROXY_POLL_INTERVAL_MS = 30000; // 30 seconds fallback polling
    const maxAttempts = (MAX_WAIT_MINUTES * 60 * 1000) / PROXY_POLL_INTERVAL_MS;

    // `pending` remembers a notification that arrived while nobody was
    // waiting (e.g. during a proxy reload) so the next wait returns at once.
    const signal: { pending: boolean; wake: (() => void) | null } = { pending: false, wake: null };

    const onNotification = (msg: { channel: string }): void => {
      if (msg.channel !== 'proxy_added') {
        return;
      }
      console.log(`[TaskWorker] Received proxy_added notification!`);
      signal.pending = true;
      if (signal.wake) {
        signal.wake();
      }
    };

    // Resolves on the next proxy_added notification or after the poll interval.
    const waitForProxySignal = (): Promise<void> =>
      new Promise<void>((resolve) => {
        if (signal.pending) {
          signal.pending = false;
          resolve();
          return;
        }
        const finish = (): void => {
          clearTimeout(timer); // don't leave a dangling timer after an early wake-up
          signal.wake = null;
          signal.pending = false;
          resolve();
        };
        const timer = setTimeout(finish, PROXY_POLL_INTERVAL_MS);
        signal.wake = finish;
      });

    // Set up PostgreSQL LISTEN for proxy notifications
    let notifyClient: PoolClient | null = null;
    try {
      notifyClient = await this.pool.connect();
      notifyClient.on('notification', onNotification);
      await notifyClient.query('LISTEN proxy_added');
      console.log(`[TaskWorker] Listening for proxy_added notifications...`);
    } catch (err: unknown) {
      console.log(`[TaskWorker] Could not set up LISTEN (will poll): ${errorMessage(err)}`);
    }

    try {
      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          // Load proxies from database
          await this.crawlRotator.initialize();

          const stats = this.crawlRotator.proxy.getStats();
          if (stats.activeProxies > 0) {
            console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);

            // Wire rotator to Dutchie client - proxies will be used for ALL requests
            setCrawlRotator(this.crawlRotator);

            console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
            return;
          }

          console.log(`[TaskWorker] No active proxies available (attempt ${attempt}). Waiting for proxies...`);
        } catch (error: unknown) {
          console.log(`[TaskWorker] Error loading proxies (attempt ${attempt}): ${errorMessage(error)}. Retrying...`);
        }

        // Wait for either notification or timeout
        await waitForProxySignal();
      }

      throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
    } finally {
      // Clean up LISTEN connection. The client goes back to a shared pool, so
      // the listener must be detached and the client must be released even
      // when UNLISTEN fails.
      if (notifyClient) {
        try {
          notifyClient.removeListener('notification', onNotification);
          let unlistenError: Error | undefined;
          try {
            await notifyClient.query('UNLISTEN proxy_added');
          } catch (err: unknown) {
            unlistenError = err instanceof Error ? err : new Error(String(err));
          }
          // Passing an error makes the pool discard the connection instead of
          // reusing one that may still be subscribed to the channel.
          notifyClient.release(unlistenError);
        } catch {
          // Ignore cleanup errors
        }
      }
    }
  }

  /** Name used when the registry did not provide a friendly name. */
  private fallbackName(): string {
    return this.workerId.slice(0, 12);
  }

  /**
   * Register worker with the registry (get friendly name).
   * Registration is best-effort: on any failure the worker keeps running and
   * uses a prefix of its worker ID as its name.
   */
  private async register(): Promise<void> {
    try {
      const response = await fetch(`${API_BASE_URL}/api/worker-registry/register`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          role: this.role,
          worker_id: this.workerId,
          pod_name: process.env.POD_NAME || process.env.HOSTNAME,
          hostname: os.hostname(),
          metadata: {
            pid: process.pid,
            node_version: process.version,
            started_at: new Date().toISOString(),
          },
        }),
      });

      const data = (await response.json()) as {
        success?: boolean;
        friendly_name?: string;
        message?: string;
        error?: string;
      };

      if (data.success) {
        this.friendlyName = data.friendly_name ?? this.fallbackName();
        console.log(`[TaskWorker] ${data.message}`);
      } else {
        console.warn(`[TaskWorker] Registration warning: ${data.error}`);
        this.friendlyName = this.fallbackName();
      }
    } catch (error: unknown) {
      // Registration is optional - worker can still function without it
      console.warn(`[TaskWorker] Could not register with API (will continue): ${errorMessage(error)}`);
      this.friendlyName = this.fallbackName();
    }
  }

  /**
   * Deregister worker from the registry
   */
  private async deregister(): Promise<void> {
    try {
      await fetch(`${API_BASE_URL}/api/worker-registry/deregister`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ worker_id: this.workerId }),
      });
      console.log(`[TaskWorker] ${this.friendlyName} signed off`);
    } catch {
      // Ignore deregistration errors
    }
  }

  /**
   * Send heartbeat to registry with resource usage and proxy location
   */
  private async sendRegistryHeartbeat(): Promise<void> {
    try {
      const memUsage = process.memoryUsage();
      const cpuUsage = process.cpuUsage();
      const proxyLocation = this.crawlRotator.getProxyLocation();
      const resourceStats = this.getResourceStats();

      // Get array of active task IDs
      const activeTaskIds = Array.from(this.activeTasks.keys());

      await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          worker_id: this.workerId,
          current_task_id: activeTaskIds[0] ?? null, // Primary task for backwards compat
          current_task_ids: activeTaskIds, // All active tasks
          active_task_count: this.activeTasks.size,
          max_concurrent_tasks: this.maxConcurrentTasks,
          status: this.activeTasks.size > 0 ? 'active' : 'idle',
          resources: {
            memory_mb: Math.round(memUsage.heapUsed / BYTES_PER_MB),
            memory_total_mb: Math.round(memUsage.heapTotal / BYTES_PER_MB),
            memory_rss_mb: Math.round(memUsage.rss / BYTES_PER_MB),
            memory_percent: Math.round(resourceStats.memoryPercent * 100),
            cpu_user_ms: Math.round(cpuUsage.user / 1000),
            cpu_system_ms: Math.round(cpuUsage.system / 1000),
            cpu_percent: Math.round(resourceStats.cpuPercent),
            proxy_location: proxyLocation,
            is_backing_off: this.isBackingOff,
            backoff_reason: this.backoffReason,
          },
        }),
      });
    } catch {
      // Ignore heartbeat errors
    }
  }

  /**
   * Report task completion to registry
   */
  private async reportTaskCompletion(success: boolean): Promise<void> {
    try {
      await fetch(`${API_BASE_URL}/api/worker-registry/task-completed`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          worker_id: this.workerId,
          success,
        }),
      });
    } catch {
      // Ignore errors
    }
  }

  /**
   * Start registry heartbeat interval
   */
  private startRegistryHeartbeat(): void {
    this.registryHeartbeatInterval = setInterval(() => {
      // sendRegistryHeartbeat swallows its own errors, so this cannot reject.
      void this.sendRegistryHeartbeat();
    }, HEARTBEAT_INTERVAL_MS);
  }

  /**
   * Stop registry heartbeat interval
   */
  private stopRegistryHeartbeat(): void {
    if (this.registryHeartbeatInterval) {
      clearInterval(this.registryHeartbeatInterval);
      this.registryHeartbeatInterval = null;
    }
  }

  /**
   * Start the worker loop.
   *
   * Resolves after the loop has been stopped (via stop() or a decommission
   * request) and every in-flight task has settled.
   *
   * @throws Error when stealth initialization cannot find any active proxy.
   */
  async start(): Promise<void> {
    this.isRunning = true;

    try {
      // Initialize stealth systems (proxy rotation, fingerprints)
      await this.initializeStealth();

      // Register with the API to get a friendly name
      await this.register();

      // Start registry heartbeat
      this.startRegistryHeartbeat();

      const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
      console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (max ${this.maxConcurrentTasks} concurrent tasks)`);

      while (this.isRunning) {
        try {
          await this.mainLoop();
        } catch (error: unknown) {
          console.error(`[TaskWorker] Loop error:`, errorMessage(error));
          await this.sleep(POLL_INTERVAL_MS);
        }
      }

      // Wait for any remaining tasks to complete
      if (this.taskPromises.size > 0) {
        console.log(`[TaskWorker] Waiting for ${this.taskPromises.size} active tasks to complete...`);
        await Promise.allSettled(this.taskPromises.values());
      }

      console.log(`[TaskWorker] Worker ${this.workerId} stopped`);
    } finally {
      // The decommission path and a failed startup never go through stop();
      // without this the heartbeat interval would keep firing (and keep the
      // process alive) and getInfo() would keep reporting a running worker.
      this.isRunning = false;
      this.stopRegistryHeartbeat();
    }
  }

  /**
   * Main loop - tries to fill up to maxConcurrentTasks
   */
  private async mainLoop(): Promise<void> {
    // Check resource usage and backoff if needed
    const { backoff, reason } = this.shouldBackOff();
    if (backoff) {
      if (!this.isBackingOff) {
        console.log(`[TaskWorker] ${this.friendlyName} backing off: ${reason}`);
      }
      this.isBackingOff = true;
      this.backoffReason = reason;
      await this.sleep(BACKOFF_DURATION_MS);
      return;
    }

    // Clear backoff state
    if (this.isBackingOff) {
      console.log(`[TaskWorker] ${this.friendlyName} resuming normal operation`);
      this.isBackingOff = false;
      this.backoffReason = null;
    }

    // Check for decommission signal
    const shouldDecommission = await this.checkDecommission();
    if (shouldDecommission) {
      console.log(`[TaskWorker] ${this.friendlyName} received decommission signal - waiting for ${this.activeTasks.size} tasks to complete`);
      // Stop accepting new tasks, wait for current to finish
      this.isRunning = false;
      return;
    }

    // Try to claim more tasks if we have capacity
    if (this.canAcceptMoreTasks()) {
      const task = await taskService.claimTask(this.role, this.workerId);

      if (task) {
        console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
        this.activeTasks.set(task.id, task);

        // Start task in background (don't await). The stored promise never
        // rejects: a stray rejection here has no awaiter and would surface as
        // an unhandled rejection.
        const taskPromise = this.executeTask(task)
          .catch((error: unknown) => {
            console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, errorMessage(error));
          })
          .finally(() => {
            // Clean up when done
            this.activeTasks.delete(task.id);
            this.taskPromises.delete(task.id);
          });
        this.taskPromises.set(task.id, taskPromise);

        // Immediately try to claim more tasks (don't wait for poll interval)
        return;
      }
    }

    // No task claimed or at capacity - wait before next poll
    await this.sleep(POLL_INTERVAL_MS);
  }

  /**
   * Stop the worker: ends the claim loop, clears heartbeat timers and
   * deregisters. In-flight tasks are awaited by start(), not here.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    this.stopHeartbeat();
    this.stopRegistryHeartbeat();
    await this.deregister();
    console.log(`[TaskWorker] ${this.friendlyName} stopped`);
  }

  /**
   * Marks a task as failed without letting a failure of the bookkeeping call
   * itself escape (the caller is already on an error path).
   */
  private async markTaskFailed(taskId: number, message: string): Promise<void> {
    try {
      await taskService.failTask(taskId, message);
    } catch (error: unknown) {
      console.error(`[TaskWorker] ${this.friendlyName} could not mark task ${taskId} as failed:`, errorMessage(error));
    }
  }

  /**
   * Execute a single task (runs concurrently with other tasks).
   *
   * Marks the task running, invokes the handler registered for its role and
   * records the outcome (completed/failed) with the task service and the
   * registry. A successfully completed task may chain a follow-up task.
   * Removal from activeTasks/taskPromises is done by the caller (mainLoop).
   */
  private async executeTask(task: WorkerTask): Promise<void> {
    console.log(`[TaskWorker] ${this.friendlyName} starting task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);

    let result: TaskResult;
    try {
      // Mark as running
      await taskService.startTask(task.id);

      // Get handler for this role
      const handler = TASK_HANDLERS[task.role];
      if (!handler) {
        throw new Error(`No handler registered for role: ${task.role}`);
      }

      // Create context
      const ctx: TaskContext = {
        pool: this.pool,
        workerId: this.workerId,
        task,
        heartbeat: async () => {
          await taskService.heartbeat(task.id);
        },
      };

      // Execute the task
      result = await handler(ctx);

      if (result.success) {
        // Mark as completed
        await taskService.completeTask(task.id, result);
      }
    } catch (error: unknown) {
      // Mark as failed
      const message = errorMessage(error);
      await this.markTaskFailed(task.id, message);
      await this.reportTaskCompletion(false);
      console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, message);
      return;
    }

    if (!result.success) {
      // Mark as failed
      await this.markTaskFailed(task.id, result.error || 'Unknown error');
      await this.reportTaskCompletion(false);
      console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`);
      return;
    }

    await this.reportTaskCompletion(true);
    console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);

    // Chain next task if applicable. The task itself is already recorded as
    // completed, so a chaining failure is logged rather than turned into a
    // task failure (which would flip the status and double-report it).
    try {
      const chainedTask = await taskService.chainNextTask({
        ...task,
        status: 'completed',
        result,
      });
      if (chainedTask) {
        console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`);
      }
    } catch (error: unknown) {
      console.error(`[TaskWorker] ${this.friendlyName} could not chain follow-up for task ${task.id}:`, errorMessage(error));
    }
  }

  /**
   * Check if this worker has been flagged for decommission.
   * Returns true if the worker should stop claiming tasks; returns false
   * (keep running) when the flag cannot be read.
   */
  private async checkDecommission(): Promise<boolean> {
    try {
      // Check worker_registry for decommission flag
      const result = await this.pool.query(
        `SELECT decommission_requested, decommission_reason FROM worker_registry WHERE worker_id = $1`,
        [this.workerId]
      );

      if (result.rows.length > 0 && result.rows[0].decommission_requested) {
        const reason = result.rows[0].decommission_reason || 'No reason provided';
        console.log(`[TaskWorker] Decommission requested: ${reason}`);
        return true;
      }

      return false;
    } catch (error: unknown) {
      // If we can't check, continue running
      console.warn(`[TaskWorker] Could not check decommission status: ${errorMessage(error)}`);
      return false;
    }
  }

  /**
   * Start heartbeat interval for a single task.
   * NOTE(review): nothing in this file calls this; concurrent tasks send
   * heartbeats through TaskContext.heartbeat instead.
   */
  private startHeartbeat(taskId: number): void {
    this.heartbeatInterval = setInterval(async () => {
      try {
        await taskService.heartbeat(taskId);
      } catch (error: unknown) {
        console.warn(`[TaskWorker] Heartbeat failed:`, errorMessage(error));
      }
    }, HEARTBEAT_INTERVAL_MS);
  }

  /**
   * Stop heartbeat interval
   */
  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  /**
   * Sleep helper
   */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Get worker info
   */
  getInfo(): {
    workerId: string;
    role: TaskRole | null;
    isRunning: boolean;
    activeTaskIds: number[];
    activeTaskCount: number;
    maxConcurrentTasks: number;
    isBackingOff: boolean;
    backoffReason: string | null;
  } {
    return {
      workerId: this.workerId,
      role: this.role,
      isRunning: this.isRunning,
      activeTaskIds: Array.from(this.activeTasks.keys()),
      activeTaskCount: this.activeTasks.size,
      maxConcurrentTasks: this.maxConcurrentTasks,
      isBackingOff: this.isBackingOff,
      backoffReason: this.backoffReason,
    };
  }
}

// ============================================================
// CLI ENTRY POINT
// ============================================================

/**
 * CLI entry point: validates WORKER_ROLE, creates the worker, installs
 * SIGTERM/SIGINT handlers for graceful shutdown and runs the worker until it
 * stops. Exits the process with code 1 when WORKER_ROLE is invalid.
 */
async function main(): Promise<void> {
  const role = process.env.WORKER_ROLE as TaskRole | undefined;

  // Per TASK_WORKFLOW_2024-12-10.md: Valid task roles
  const validRoles: TaskRole[] = [
    'store_discovery',
    'entry_point_discovery',
    'product_discovery',
    'payload_fetch', // NEW: Fetches from API, saves to disk
    'product_refresh', // CHANGED: Reads from disk, processes to DB
    'analytics_refresh',
  ];

  // If role specified, validate it
  if (role && !validRoles.includes(role)) {
    console.error(`Error: Invalid WORKER_ROLE: ${role}`);
    console.error(`Valid roles: ${validRoles.join(', ')}`);
    console.error('Or omit WORKER_ROLE for role-agnostic worker (any task)');
    process.exit(1);
  }

  const workerId = process.env.WORKER_ID;
  // Pass null for role-agnostic, or the specific role
  const worker = new TaskWorker(role || null, workerId);

  // Handle graceful shutdown. Repeated signals must not run stop() twice, and
  // the returned promise must not be left floating.
  let shuttingDown = false;
  const shutdown = (signalName: 'SIGTERM' | 'SIGINT'): void => {
    console.log(`[TaskWorker] Received ${signalName}, shutting down...`);
    if (shuttingDown) {
      return;
    }
    shuttingDown = true;
    worker.stop().catch((error: unknown) => {
      console.error('[TaskWorker] Error during shutdown:', errorMessage(error));
    });
  };

  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));

  await worker.start();
}

// Run if this is the main module
if (require.main === module) {
  main().catch((error) => {
    console.error('[TaskWorker] Fatal error:', error);
    process.exit(1);
  });
}

export { main };