- Remove cache_from/cache_to from CI (plugin bug splitting commas) - Add preflight() method to CrawlRotator - tests proxy + anti-detect - Add pre-task preflight check - workers MUST pass before executing - Add releaseTask() to release tasks back to pending on preflight fail - Rename proxy_test task to whoami for clarity 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
807 lines
28 KiB
TypeScript
807 lines
28 KiB
TypeScript
/**
|
|
* Task Worker
|
|
*
|
|
* A unified worker that pulls tasks from the worker_tasks queue.
|
|
* Workers register on startup, get a friendly name, and pull tasks.
|
|
*
|
|
* Architecture:
|
|
* - Tasks are generated on schedule (by scheduler or API)
|
|
* - Workers PULL tasks from the pool (not assigned to them)
|
|
* - Tasks are claimed in order of priority (DESC) then creation time (ASC)
|
|
* - Workers report heartbeats to worker_registry
|
|
* - Workers are ROLE-AGNOSTIC by default (can handle any task type)
|
|
*
|
|
* Stealth & Anti-Detection:
|
|
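* PROXIES ARE REQUIRED - on startup a worker waits up to 60 minutes for active proxies
* (waking early on a PostgreSQL NOTIFY 'proxy_added') and fails to start if none appear.
* Before each claimed task runs, a preflight check (proxy + anti-detect) must pass;
* otherwise the task is released back to pending.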
|
|
*
|
|
* On startup, workers initialize the CrawlRotator which provides:
|
|
* - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy
|
|
* - User-Agent rotation: Cycles through realistic browser fingerprints
|
|
* - Fingerprint rotation: Changes browser profile on blocks
|
|
* - Locale/timezone: Matches Accept-Language to target state
|
|
*
|
|
* The CrawlRotator is wired to the Dutchie client via setCrawlRotator().
|
|
* Task handlers call startSession() which picks a random fingerprint.
|
|
* On 403 errors, the client automatically:
|
|
* 1. Records failure on current proxy
|
|
* 2. Rotates to next proxy
|
|
* 3. Rotates fingerprint
|
|
* 4. Retries the request
|
|
*
|
|
* Usage:
|
|
* npx tsx src/tasks/task-worker.ts # Role-agnostic (any task)
|
|
* WORKER_ROLE=product_refresh npx tsx src/tasks/task-worker.ts # Role-specific
|
|
*
|
|
* Environment:
|
|
* WORKER_ROLE - Which task role to process (optional, null = any task)
|
|
* WORKER_ID - Optional custom worker ID (auto-generated if not provided)
|
|
* POD_NAME - Kubernetes pod name (optional)
|
|
* POLL_INTERVAL_MS - How often to check for tasks (default: 5000)
|
|
* HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000)
|
|
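* API_BASE_URL - Backend API URL for registration (default: http://localhost:3010)
* MAX_CONCURRENT_TASKS - Tasks one worker runs in parallel (default: 3)
* MEMORY_BACKOFF_THRESHOLD - Heap fraction (0.0-1.0) above which no new tasks are claimed (default: 0.85)
* CPU_BACKOFF_THRESHOLD - CPU fraction (0.0-1.0) above which no new tasks are claimed (default: 0.90)
* BACKOFF_DURATION_MS - Wait between resource re-checks while backing off (default: 10000)
* NODE_OPTIONS - --max-old-space-size=<MB> sets the heap ceiling used for memory backoff (default: 512)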
|
|
*/
|
|
|
|
import { Pool } from 'pg';
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
import { taskService, TaskRole, WorkerTask } from './task-service';
|
|
import { getPool } from '../db/pool';
|
|
import os from 'os';
|
|
|
|
// Stealth/rotation support
|
|
import { CrawlRotator } from '../services/crawl-rotator';
|
|
import { setCrawlRotator } from '../platforms/dutchie';
|
|
|
|
// Task handlers by role
|
|
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
|
|
import { handlePayloadFetch } from './handlers/payload-fetch';
|
|
import { handleProductRefresh } from './handlers/product-refresh';
|
|
import { handleProductDiscovery } from './handlers/product-discovery';
|
|
import { handleStoreDiscovery } from './handlers/store-discovery';
|
|
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
|
|
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
|
|
import { handleWhoami } from './handlers/whoami';
|
|
|
|
/**
 * Read an integer setting from the environment.
 *
 * An unset or empty variable yields `fallback` (same as the former `process.env.X || 'default'`).
 * A malformed value (e.g. "5s") or one below `min` also yields `fallback`, with a warning, instead
 * of silently turning into NaN: a NaN delay makes setTimeout fire after ~1ms, which would turn the
 * poll loop into a busy loop against the database.
 */
function readIntEnv(name: string, fallback: number, min: number = Number.NEGATIVE_INFINITY): number {
  const raw = process.env[name];
  if (raw === undefined || raw === '') {
    return fallback;
  }
  const value = Number.parseInt(raw, 10);
  if (!Number.isFinite(value) || value < min) {
    console.warn(`[TaskWorker] Ignoring invalid ${name}=${JSON.stringify(raw)}; using default ${fallback}`);
    return fallback;
  }
  return value;
}

/**
 * Read a floating-point setting from the environment.
 *
 * Unset/empty -> `fallback`. Malformed values also fall back (with a warning); a NaN threshold
 * would make every `>` comparison false and silently disable backoff.
 */
function readFloatEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw === '') {
    return fallback;
  }
  const value = Number.parseFloat(raw);
  if (!Number.isFinite(value)) {
    console.warn(`[TaskWorker] Ignoring invalid ${name}=${JSON.stringify(raw)}; using default ${fallback}`);
    return fallback;
  }
  return value;
}

const POLL_INTERVAL_MS = readIntEnv('POLL_INTERVAL_MS', 5000, 1);
const HEARTBEAT_INTERVAL_MS = readIntEnv('HEARTBEAT_INTERVAL_MS', 30000, 1);
const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';

// =============================================================================
// CONCURRENT TASK PROCESSING SETTINGS
// =============================================================================
// Workers can process multiple tasks simultaneously using async I/O.
// This improves throughput for I/O-bound tasks (network calls, DB queries).
//
// Resource thresholds trigger "backoff" - the worker stops claiming new tasks
// but continues processing existing ones until resources return to normal.
//
// See: docs/WORKER_TASK_ARCHITECTURE.md#concurrent-task-processing
// =============================================================================

// Maximum number of tasks this worker will run concurrently (at least 1; 0 would never claim).
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = readIntEnv('MAX_CONCURRENT_TASKS', 3, 1);

// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
const MEMORY_BACKOFF_THRESHOLD = readFloatEnv('MEMORY_BACKOFF_THRESHOLD', 0.85);

/**
 * Determine the V8 old-space ceiling (MB) configured via --max-old-space-size.
 *
 * Used as the denominator for the memory percentage: V8's heapTotal is dynamic and stays small
 * when idle, which would produce false high percentages. The flag may be supplied through
 * NODE_OPTIONS or directly on the node command line (process.execArgv), with "=" or a space,
 * and V8 accepts "-" or "_" as separators. Falls back to 512MB when absent or non-positive
 * (a 0 would otherwise divide by zero and force permanent backoff).
 */
function getMaxHeapSizeMb(): number {
  const flagSources = [process.env.NODE_OPTIONS || '', ...process.execArgv].join(' ');
  const match = flagSources.match(/--max[-_]old[-_]space[-_]size[=\s]+(\d+)/);
  if (match) {
    const sizeMb = Number.parseInt(match[1], 10);
    if (sizeMb > 0) {
      return sizeMb;
    }
  }
  // Fallback: use 512MB if not specified
  return 512;
}
const MAX_HEAP_SIZE_MB = getMaxHeapSizeMb();

// When CPU usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 90% - allows some burst capacity
const CPU_BACKOFF_THRESHOLD = readFloatEnv('CPU_BACKOFF_THRESHOLD', 0.9);

// How long to wait (ms) when in backoff state before rechecking resources
const BACKOFF_DURATION_MS = readIntEnv('BACKOFF_DURATION_MS', 10000, 1);
|
|
|
|
/**
 * Per-invocation context handed to every task handler.
 */
export interface TaskContext {
  /** The claimed task being executed. */
  task: WorkerTask;
  /** Identifier of the worker running the task. */
  workerId: string;
  /** Database pool shared by the worker. */
  pool: Pool;
  /**
   * Liveness callback for long-running handlers; TaskWorker wires it to
   * taskService.heartbeat(task.id).
   */
  heartbeat: () => Promise<void>;
  /** Proxy / fingerprint rotator; optional in the type, though TaskWorker always supplies one. */
  crawlRotator?: CrawlRotator;
}
|
|
|
|
/**
 * Outcome reported by a task handler.
 *
 * `success` decides whether the worker completes or fails the task, and `error` carries the
 * failure message. The counters are optional, and the index signature lets a handler attach
 * role-specific data that is passed along with the result.
 */
export interface TaskResult {
  success: boolean;
  error?: string;
  productsProcessed?: number;
  snapshotsCreated?: number;
  storesDiscovered?: number;
  [key: string]: unknown;
}
|
|
|
|
/** Signature every entry in the role -> handler registry must satisfy. */
type TaskHandler = (context: TaskContext) => Promise<TaskResult>;
|
|
|
|
/**
 * Dispatch table from task role to the handler that executes it.
 *
 * Per TASK_WORKFLOW_2024-12-10.md the product pipeline is split into two stages:
 *   - payload_fetch:   fetches from the Dutchie API, saves to disk, chains to product_refresh
 *   - product_refresh: reads the local payload, normalizes it, upserts to the DB
 * Typed as Record<TaskRole, TaskHandler>, so (assuming TaskRole is a string-literal union, as the
 * keys suggest) the compiler rejects a role that has no handler here.
 */
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
  // Pipeline stage 1 (NEW): API fetch -> disk
  payload_fetch: handlePayloadFetch,
  // Pipeline stage 2 (CHANGED): disk -> DB
  product_refresh: handleProductRefresh,
  product_discovery: handleProductDiscovery,
  store_discovery: handleStoreDiscovery,
  entry_point_discovery: handleEntryPointDiscovery,
  analytics_refresh: handleAnalyticsRefresh,
  // Diagnostic task: exercises the proxy + anti-detect stack
  whoami: handleWhoami,
};
|
|
|
|
/**
 * Snapshot of this worker's resource usage.
 *
 * Produced by TaskWorker.getResourceStats(). It drives the backoff decision (stop claiming new
 * tasks under memory/CPU pressure) and feeds the resource section of registry heartbeats.
 */
interface ResourceStats {
  /** Heap currently in use, in MB. */
  memoryMb: number;
  /** Heap ceiling in MB that memoryPercent is measured against. */
  memoryTotalMb: number;
  /** Heap usage as a fraction, nominally 0.0 to 1.0. */
  memoryPercent: number;
  /** CPU usage since the previous measurement, as a percentage from 0 to 100. */
  cpuPercent: number;
  /** Whether the worker is currently refusing new tasks. */
  isBackingOff: boolean;
  /** Human-readable cause of the backoff (e.g. "Memory at 87.3% (threshold: 85%)"), or null. */
  backoffReason: string | null;
}
|
|
|
|
/**
 * Queue-pulling worker: claims tasks from worker_tasks and runs up to `maxConcurrentTasks`
 * of them at once.
 *
 * Lifecycle: start() initializes stealth (waits for proxies), registers with the worker
 * registry, starts the registry heartbeat, then runs mainLoop() until stop() is called or a
 * decommission is requested. On exit it waits for in-flight tasks, clears heartbeat timers and
 * deregisters (once).
 */
export class TaskWorker {
  private pool: Pool;
  private workerId: string;
  private role: TaskRole | null; // null = role-agnostic (any task)
  private friendlyName: string = '';
  private isRunning: boolean = false;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private registryHeartbeatInterval: NodeJS.Timeout | null = null;
  private crawlRotator: CrawlRotator;
  // Guards deregistration so stop() and the tail of start() never sign off twice.
  private hasDeregistered: boolean = false;

  /** Delay after a failed preflight before the worker tries to claim again. */
  private static readonly PREFLIGHT_RETRY_DELAY_MS = 30000;

  // ==========================================================================
  // CONCURRENT TASK TRACKING
  // ==========================================================================
  // activeTasks: Map of task ID -> task object for all currently running tasks
  // taskPromises: Map of task ID -> tracking promise (settles when the task's run ends)
  // maxConcurrentTasks: How many tasks this worker will run in parallel
  // ==========================================================================
  private activeTasks: Map<number, WorkerTask> = new Map();
  private taskPromises: Map<number, Promise<void>> = new Map();
  private maxConcurrentTasks: number = MAX_CONCURRENT_TASKS;

  // ==========================================================================
  // RESOURCE MONITORING FOR BACKOFF
  // ==========================================================================
  // CPU tracking uses differential measurement - we track last values and
  // calculate percentage based on elapsed time since last check.
  // ==========================================================================
  private lastCpuUsage: { user: number; system: number } = { user: 0, system: 0 };
  private lastCpuCheck: number = Date.now();
  private isBackingOff: boolean = false;
  private backoffReason: string | null = null;

  constructor(role: TaskRole | null = null, workerId?: string) {
    this.pool = getPool();
    this.role = role;
    this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`;
    this.crawlRotator = new CrawlRotator(this.pool);

    // Seed the CPU baseline so the first measurement covers the time since construction
    const cpuUsage = process.cpuUsage();
    this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
    this.lastCpuCheck = Date.now();
  }

  /** Extract a printable message from an unknown thrown value. */
  private static errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /**
   * Format a 0.0-1.0 fraction as a percentage for log/UI text.
   * Plain `x * 100` is not enough: 0.85 * 100 === 85.00000000000001 in floating point.
   */
  private static formatPercent(fraction: number): string {
    return String(Number((fraction * 100).toFixed(1)));
  }

  /**
   * Get current resource usage.
   * Memory percentage is calculated against MAX_HEAP_SIZE_MB (from --max-old-space-size),
   * NOT against V8's dynamic heapTotal which stays small when idle.
   * Note: this also advances the CPU baseline, so each call measures CPU since the previous call.
   */
  private getResourceStats(): ResourceStats {
    const memUsage = process.memoryUsage();
    const heapUsedMb = memUsage.heapUsed / 1024 / 1024;
    // V8's heapTotal stays small when idle (e.g., 36MB) causing false 95%+ readings
    const memoryPercent = heapUsedMb / MAX_HEAP_SIZE_MB;

    // Calculate CPU usage since last check
    const cpuUsage = process.cpuUsage();
    const now = Date.now();
    const elapsed = now - this.lastCpuCheck;

    let cpuPercent = 0;
    if (elapsed > 0) {
      const userDiff = (cpuUsage.user - this.lastCpuUsage.user) / 1000; // microseconds to ms
      const systemDiff = (cpuUsage.system - this.lastCpuUsage.system) / 1000;
      cpuPercent = ((userDiff + systemDiff) / elapsed) * 100;
    }

    this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
    this.lastCpuCheck = now;

    return {
      memoryPercent,
      memoryMb: Math.round(heapUsedMb),
      memoryTotalMb: MAX_HEAP_SIZE_MB, // Use max-old-space-size, not dynamic heapTotal
      cpuPercent: Math.min(100, cpuPercent), // Cap at 100%
      isBackingOff: this.isBackingOff,
      backoffReason: this.backoffReason,
    };
  }

  /**
   * Check if we should back off from taking new tasks.
   */
  private shouldBackOff(): { backoff: boolean; reason: string | null } {
    const stats = this.getResourceStats();

    if (stats.memoryPercent > MEMORY_BACKOFF_THRESHOLD) {
      return {
        backoff: true,
        reason: `Memory at ${(stats.memoryPercent * 100).toFixed(1)}% (threshold: ${TaskWorker.formatPercent(MEMORY_BACKOFF_THRESHOLD)}%)`,
      };
    }

    if (stats.cpuPercent > CPU_BACKOFF_THRESHOLD * 100) {
      return {
        backoff: true,
        reason: `CPU at ${stats.cpuPercent.toFixed(1)}% (threshold: ${TaskWorker.formatPercent(CPU_BACKOFF_THRESHOLD)}%)`,
      };
    }

    return { backoff: false, reason: null };
  }

  /**
   * Get count of currently running tasks.
   */
  get activeTaskCount(): number {
    return this.activeTasks.size;
  }

  /**
   * Check if we can accept more tasks.
   */
  private canAcceptMoreTasks(): boolean {
    return this.activeTasks.size < this.maxConcurrentTasks;
  }

  /**
   * Initialize stealth systems (proxy rotation, fingerprints).
   * Called once on worker startup before processing any tasks.
   *
   * IMPORTANT: Proxies are REQUIRED. The worker waits (up to MAX_WAIT_MINUTES) until proxies
   * are available, listening for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when
   * proxies are added, and polling as a fallback.
   *
   * @throws Error if no active proxies appear within MAX_WAIT_MINUTES.
   */
  private async initializeStealth(): Promise<void> {
    const MAX_WAIT_MINUTES = 60;
    const PROXY_POLL_INTERVAL_MS = 30000; // 30 seconds fallback polling
    const maxAttempts = (MAX_WAIT_MINUTES * 60 * 1000) / PROXY_POLL_INTERVAL_MS;
    let attempts = 0;

    // Dedicated pool client used only for LISTEN; loosely typed because PoolClient is not imported here.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let notifyClient: any = null;

    // Resolver for the current wait; a proxy_added notification calls it to end the wait early.
    let wakeUp: (() => void) | null = null;
    const onNotification = (msg: { channel: string }): void => {
      if (msg.channel === 'proxy_added') {
        console.log(`[TaskWorker] Received proxy_added notification!`);
        if (wakeUp) wakeUp();
      }
    };

    // Set up PostgreSQL LISTEN for proxy notifications (best effort; polling works without it)
    try {
      notifyClient = await this.pool.connect();
      await notifyClient.query('LISTEN proxy_added');
      notifyClient.on('notification', onNotification);
      console.log(`[TaskWorker] Listening for proxy_added notifications...`);
    } catch (err: unknown) {
      console.log(`[TaskWorker] Could not set up LISTEN (will poll): ${TaskWorker.errorMessage(err)}`);
      // Don't hold a pool connection for up to an hour when it cannot LISTEN; destroy it instead
      if (notifyClient) {
        try {
          notifyClient.release(true);
        } catch {
          // Ignore cleanup errors
        }
        notifyClient = null;
      }
    }

    try {
      while (attempts < maxAttempts) {
        try {
          // Load proxies from database
          await this.crawlRotator.initialize();

          const stats = this.crawlRotator.proxy.getStats();
          if (stats.activeProxies > 0) {
            console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);

            // Wire rotator to Dutchie client - proxies will be used for ALL requests
            setCrawlRotator(this.crawlRotator);

            console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
            return;
          }

          attempts++;
          console.log(`[TaskWorker] No active proxies available (attempt ${attempts}). Waiting for proxies...`);

          // Wait for a notification or the fallback timeout, whichever comes first, and cancel
          // the other so no stray timer or stale resolver is left behind.
          await new Promise<void>((resolve) => {
            const timer = setTimeout(() => {
              wakeUp = null;
              resolve();
            }, PROXY_POLL_INTERVAL_MS);
            wakeUp = () => {
              clearTimeout(timer);
              wakeUp = null;
              resolve();
            };
          });
        } catch (error: unknown) {
          attempts++;
          console.log(`[TaskWorker] Error loading proxies (attempt ${attempts}): ${TaskWorker.errorMessage(error)}. Retrying...`);
          await this.sleep(PROXY_POLL_INTERVAL_MS);
        }
      }

      throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
    } finally {
      wakeUp = null;
      if (notifyClient) {
        // Detach our handler: the client goes back to the pool and must not keep this closure alive
        notifyClient.removeListener('notification', onNotification);
        let unlistened = false;
        try {
          await notifyClient.query('UNLISTEN proxy_added');
          unlistened = true;
        } catch {
          // Fall through: the connection is destroyed below instead of being reused
        }
        try {
          // release(true) destroys the connection, so a client that may still be LISTENing
          // is never handed to another pool user; previously a failed UNLISTEN leaked the client.
          if (unlistened) {
            notifyClient.release();
          } else {
            notifyClient.release(true);
          }
        } catch {
          // Ignore cleanup errors
        }
      }
    }
  }

  /**
   * Register worker with the registry (get friendly name).
   * Registration is optional: on any failure the worker falls back to a truncated worker ID.
   */
  private async register(): Promise<void> {
    try {
      const response = await fetch(`${API_BASE_URL}/api/worker-registry/register`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          role: this.role,
          worker_id: this.workerId,
          pod_name: process.env.POD_NAME || process.env.HOSTNAME,
          hostname: os.hostname(),
          metadata: {
            pid: process.pid,
            node_version: process.version,
            started_at: new Date().toISOString()
          }
        })
      });

      const data = await response.json();
      if (data.success) {
        // Fall back to the truncated worker ID if the API omits the friendly name
        this.friendlyName = data.friendly_name || this.workerId.slice(0, 12);
        console.log(`[TaskWorker] ${data.message}`);
      } else {
        console.warn(`[TaskWorker] Registration warning: ${data.error}`);
        this.friendlyName = this.workerId.slice(0, 12);
      }
    } catch (error: unknown) {
      // Registration is optional - worker can still function without it
      console.warn(`[TaskWorker] Could not register with API (will continue): ${TaskWorker.errorMessage(error)}`);
      this.friendlyName = this.workerId.slice(0, 12);
    }
  }

  /**
   * Deregister worker from the registry (best effort; errors are ignored).
   */
  private async deregister(): Promise<void> {
    try {
      await fetch(`${API_BASE_URL}/api/worker-registry/deregister`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ worker_id: this.workerId })
      });
      console.log(`[TaskWorker] ${this.friendlyName} signed off`);
    } catch {
      // Ignore deregistration errors
    }
  }

  /**
   * Send heartbeat to registry with resource usage and proxy location (best effort).
   */
  private async sendRegistryHeartbeat(): Promise<void> {
    try {
      const memUsage = process.memoryUsage();
      const cpuUsage = process.cpuUsage();
      const proxyLocation = this.crawlRotator.getProxyLocation();
      const resourceStats = this.getResourceStats();

      // Get array of active task IDs
      const activeTaskIds = Array.from(this.activeTasks.keys());

      await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          worker_id: this.workerId,
          current_task_id: activeTaskIds[0] || null, // Primary task for backwards compat
          current_task_ids: activeTaskIds, // All active tasks
          active_task_count: this.activeTasks.size,
          max_concurrent_tasks: this.maxConcurrentTasks,
          status: this.activeTasks.size > 0 ? 'active' : 'idle',
          resources: {
            memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
            // Same ceiling memory_percent is computed against, so memory_mb / memory_total_mb
            // agrees with memory_percent (heapTotal is dynamic and stays small when idle).
            memory_total_mb: resourceStats.memoryTotalMb,
            // V8's current dynamic heap size (what memory_total_mb used to report)
            memory_heap_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
            memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024),
            memory_percent: Math.round(resourceStats.memoryPercent * 100),
            cpu_user_ms: Math.round(cpuUsage.user / 1000),
            cpu_system_ms: Math.round(cpuUsage.system / 1000),
            cpu_percent: Math.round(resourceStats.cpuPercent),
            proxy_location: proxyLocation,
            is_backing_off: this.isBackingOff,
            backoff_reason: this.backoffReason,
          }
        })
      });
    } catch {
      // Ignore heartbeat errors
    }
  }

  /**
   * Report task completion to registry (best effort).
   */
  private async reportTaskCompletion(success: boolean): Promise<void> {
    try {
      await fetch(`${API_BASE_URL}/api/worker-registry/task-completed`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          worker_id: this.workerId,
          success
        })
      });
    } catch {
      // Ignore errors
    }
  }

  /**
   * Start registry heartbeat interval (replacing, never stacking, an existing one).
   */
  private startRegistryHeartbeat(): void {
    this.stopRegistryHeartbeat();
    this.registryHeartbeatInterval = setInterval(() => {
      // sendRegistryHeartbeat swallows its own errors
      void this.sendRegistryHeartbeat();
    }, HEARTBEAT_INTERVAL_MS);
  }

  /**
   * Stop registry heartbeat interval.
   */
  private stopRegistryHeartbeat(): void {
    if (this.registryHeartbeatInterval) {
      clearInterval(this.registryHeartbeatInterval);
      this.registryHeartbeatInterval = null;
    }
  }

  /**
   * Clear all heartbeat timers and sign off from the registry, at most once.
   * Clearing the registry interval matters: an active setInterval keeps the Node process alive.
   */
  private async releaseRegistration(): Promise<void> {
    this.stopHeartbeat();
    this.stopRegistryHeartbeat();
    if (this.hasDeregistered) {
      return;
    }
    this.hasDeregistered = true;
    await this.deregister();
  }

  /**
   * Start the worker loop. Resolves once the worker has stopped (via stop() or decommission)
   * and all in-flight tasks have settled.
   *
   * @throws Error from initializeStealth() if no proxies become available.
   */
  async start(): Promise<void> {
    this.isRunning = true;

    // Initialize stealth systems (proxy rotation, fingerprints)
    await this.initializeStealth();

    // Register with the API to get a friendly name
    await this.register();

    // Start registry heartbeat
    this.startRegistryHeartbeat();

    const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
    console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (max ${this.maxConcurrentTasks} concurrent tasks)`);

    while (this.isRunning) {
      try {
        await this.mainLoop();
      } catch (error: unknown) {
        console.error(`[TaskWorker] Loop error:`, TaskWorker.errorMessage(error));
        await this.sleep(POLL_INTERVAL_MS);
      }
    }

    // Wait for any remaining tasks to complete
    if (this.taskPromises.size > 0) {
      console.log(`[TaskWorker] Waiting for ${this.taskPromises.size} active tasks to complete...`);
      await Promise.allSettled(this.taskPromises.values());
    }

    // The decommission path leaves the loop without stop() ever being called; without this the
    // registry heartbeat interval would keep running (and the process alive) indefinitely.
    await this.releaseRegistration();

    console.log(`[TaskWorker] Worker ${this.workerId} stopped`);
  }

  /**
   * Main loop iteration - applies backoff, honours decommission, and claims at most one task
   * per call (returning immediately after a claim so the next call can fill remaining capacity).
   */
  private async mainLoop(): Promise<void> {
    // Check resource usage and backoff if needed
    const { backoff, reason } = this.shouldBackOff();
    if (backoff) {
      if (!this.isBackingOff) {
        console.log(`[TaskWorker] ${this.friendlyName} backing off: ${reason}`);
      }
      this.isBackingOff = true;
      this.backoffReason = reason;
      await this.sleep(BACKOFF_DURATION_MS);
      return;
    }

    // Clear backoff state
    if (this.isBackingOff) {
      console.log(`[TaskWorker] ${this.friendlyName} resuming normal operation`);
      this.isBackingOff = false;
      this.backoffReason = null;
    }

    // Check for decommission signal
    const shouldDecommission = await this.checkDecommission();
    if (shouldDecommission) {
      console.log(`[TaskWorker] ${this.friendlyName} received decommission signal - waiting for ${this.activeTasks.size} tasks to complete`);
      // Stop accepting new tasks, wait for current to finish
      this.isRunning = false;
      return;
    }

    // Try to claim more tasks if we have capacity
    if (this.canAcceptMoreTasks()) {
      const task = await taskService.claimTask(this.role, this.workerId);

      if (task) {
        console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);

        if (!(await this.passesPreflight(task.id))) {
          // Wait before trying again - give proxies time to recover
          await this.sleep(TaskWorker.PREFLIGHT_RETRY_DELAY_MS);
          return;
        }

        this.launchTask(task);

        // Immediately try to claim more tasks (don't wait for poll interval)
        return;
      }
    }

    // No task claimed or at capacity - wait before next poll
    await this.sleep(POLL_INTERVAL_MS);
  }

  /**
   * PREFLIGHT CHECK - CRITICAL: a worker MUST pass before executing a claimed task.
   * Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready.
   *
   * On failure - including preflight() throwing, which previously left the task claimed but
   * never executed - the task is released back to pending so another worker can pick it up.
   *
   * @returns true when the task may be executed.
   */
  private async passesPreflight(taskId: number): Promise<boolean> {
    try {
      const preflight = await this.crawlRotator.preflight();
      if (preflight.passed) {
        console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${taskId} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`);
        return true;
      }
      console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${taskId}: ${preflight.error}`);
    } catch (error: unknown) {
      console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${taskId}: ${TaskWorker.errorMessage(error)}`);
    }

    console.log(`[TaskWorker] Releasing task ${taskId} back to pending - worker cannot proceed without proxy/anti-detect`);
    try {
      // Release task back to pending so another worker can pick it up
      await taskService.releaseTask(taskId);
    } catch (error: unknown) {
      console.error(`[TaskWorker] ${this.friendlyName} could not release task ${taskId}:`, TaskWorker.errorMessage(error));
    }
    return false;
  }

  /**
   * Start a task in the background and track it until it settles.
   * The .catch() stops a rejection from executeTask() from surfacing through the derived
   * .finally() promise as an unhandled rejection (which terminates the process on Node 15+).
   */
  private launchTask(task: WorkerTask): void {
    this.activeTasks.set(task.id, task);

    const tracked = this.executeTask(task)
      .catch((error: unknown) => {
        console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} failed outside task error handling:`, TaskWorker.errorMessage(error));
      })
      .finally(() => {
        this.activeTasks.delete(task.id);
        this.taskPromises.delete(task.id);
      });

    this.taskPromises.set(task.id, tracked);
  }

  /**
   * Stop the worker: stop claiming, clear heartbeat timers and deregister immediately.
   * In-flight tasks are awaited by start() before it resolves.
   */
  async stop(): Promise<void> {
    this.isRunning = false;
    await this.releaseRegistration();
    console.log(`[TaskWorker] ${this.friendlyName} stopped`);
  }

  /**
   * Execute a single task (runs concurrently with other tasks).
   * Marks the task running, dispatches to its role handler, then completes or fails it.
   * Removal from activeTasks/taskPromises is done by the .finally() attached in launchTask().
   */
  private async executeTask(task: WorkerTask): Promise<void> {
    console.log(`[TaskWorker] ${this.friendlyName} starting task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);

    try {
      // Mark as running
      await taskService.startTask(task.id);

      // Get handler for this role
      const handler = TASK_HANDLERS[task.role];
      if (!handler) {
        throw new Error(`No handler registered for role: ${task.role}`);
      }

      const ctx: TaskContext = {
        pool: this.pool,
        workerId: this.workerId,
        task,
        heartbeat: async () => {
          await taskService.heartbeat(task.id);
        },
        crawlRotator: this.crawlRotator,
      };

      const result = await handler(ctx);

      if (result.success) {
        await taskService.completeTask(task.id, result);
        await this.reportTaskCompletion(true);
        console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);

        // Chaining handles its own errors: a chaining failure must not flip an
        // already-completed task to failed (nor report it a second time).
        await this.chainFollowUpTask(task, result);
      } else {
        await taskService.failTask(task.id, result.error || 'Unknown error');
        await this.reportTaskCompletion(false);
        console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`);
      }
    } catch (error: unknown) {
      const message = TaskWorker.errorMessage(error);
      try {
        await taskService.failTask(task.id, message);
      } catch (failError: unknown) {
        console.error(`[TaskWorker] ${this.friendlyName} could not mark task ${task.id} as failed:`, TaskWorker.errorMessage(failError));
      }
      await this.reportTaskCompletion(false);
      console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, message);
    }
  }

  /**
   * Chain the next task in the workflow (if any) after `task` completed successfully.
   * Errors are logged, never thrown.
   */
  private async chainFollowUpTask(task: WorkerTask, result: TaskResult): Promise<void> {
    try {
      const chainedTask = await taskService.chainNextTask({
        ...task,
        status: 'completed',
        result,
      });
      if (chainedTask) {
        console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`);
      }
    } catch (error: unknown) {
      console.error(`[TaskWorker] ${this.friendlyName} completed task ${task.id} but could not chain the next task:`, TaskWorker.errorMessage(error));
    }
  }

  /**
   * Check if this worker has been flagged for decommission.
   * Returns true if worker should stop after current tasks; false if not flagged or the
   * check itself fails (the worker keeps running).
   */
  private async checkDecommission(): Promise<boolean> {
    try {
      // Check worker_registry for decommission flag
      const result = await this.pool.query(
        `SELECT decommission_requested, decommission_reason
         FROM worker_registry
         WHERE worker_id = $1`,
        [this.workerId]
      );

      if (result.rows.length > 0 && result.rows[0].decommission_requested) {
        const reason = result.rows[0].decommission_reason || 'No reason provided';
        console.log(`[TaskWorker] Decommission requested: ${reason}`);
        return true;
      }

      return false;
    } catch (error: unknown) {
      // If we can't check, continue running
      console.warn(`[TaskWorker] Could not check decommission status: ${TaskWorker.errorMessage(error)}`);
      return false;
    }
  }

  /**
   * Start per-task heartbeat interval (replacing, never stacking, an existing one).
   */
  private startHeartbeat(taskId: number): void {
    this.stopHeartbeat();
    this.heartbeatInterval = setInterval(async () => {
      try {
        await taskService.heartbeat(taskId);
      } catch (error: unknown) {
        console.warn(`[TaskWorker] Heartbeat failed:`, TaskWorker.errorMessage(error));
      }
    }, HEARTBEAT_INTERVAL_MS);
  }

  /**
   * Stop heartbeat interval.
   */
  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  /**
   * Sleep helper.
   */
  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Get worker info.
   */
  getInfo(): {
    workerId: string;
    role: TaskRole | null;
    isRunning: boolean;
    activeTaskIds: number[];
    activeTaskCount: number;
    maxConcurrentTasks: number;
    isBackingOff: boolean;
    backoffReason: string | null;
  } {
    return {
      workerId: this.workerId,
      role: this.role,
      isRunning: this.isRunning,
      activeTaskIds: Array.from(this.activeTasks.keys()),
      activeTaskCount: this.activeTasks.size,
      maxConcurrentTasks: this.maxConcurrentTasks,
      isBackingOff: this.isBackingOff,
      backoffReason: this.backoffReason,
    };
  }
}
|
|
|
|
// ============================================================
|
|
// CLI ENTRY POINT
|
|
// ============================================================
|
|
|
|
/**
 * CLI entry point: validates WORKER_ROLE, builds a TaskWorker and runs it until shutdown.
 *
 * WORKER_ROLE (optional) restricts the worker to one task role; when unset the worker is
 * role-agnostic. WORKER_ID (optional) overrides the auto-generated worker ID.
 * SIGTERM/SIGINT trigger a graceful stop.
 */
async function main(): Promise<void> {
  const role = process.env.WORKER_ROLE as TaskRole | undefined;

  // Every role with a registered handler is a valid specialization. Deriving the list from
  // TASK_HANDLERS keeps the two in sync: the previous hand-written list had drifted and
  // rejected WORKER_ROLE=whoami even though a handler exists for it.
  const validRoles = Object.keys(TASK_HANDLERS) as TaskRole[];

  // If role specified, validate it
  if (role && !validRoles.includes(role)) {
    console.error(`Error: Invalid WORKER_ROLE: ${role}`);
    console.error(`Valid roles: ${validRoles.join(', ')}`);
    console.error('Or omit WORKER_ROLE for role-agnostic worker (any task)');
    process.exit(1);
  }

  const workerId = process.env.WORKER_ID;
  // Pass null for role-agnostic, or the specific role
  const worker = new TaskWorker(role || null, workerId);

  // Handle graceful shutdown. stop() returns a promise; handle its rejection explicitly so a
  // failure during shutdown is logged instead of becoming an unhandled rejection.
  const shutdown = (signal: 'SIGTERM' | 'SIGINT'): void => {
    console.log(`[TaskWorker] Received ${signal}, shutting down...`);
    worker.stop().catch((error: unknown) => {
      console.error('[TaskWorker] Error during shutdown:', error);
    });
  };

  process.on('SIGTERM', () => shutdown('SIGTERM'));
  process.on('SIGINT', () => shutdown('SIGINT'));

  await worker.start();
}
|
|
|
|
// Auto-start only when this file is executed directly (e.g. `npx tsx src/tasks/task-worker.ts`);
// merely importing the module must not spawn a worker.
if (require.main === module) {
  const failFatally = (error: unknown): void => {
    console.error('[TaskWorker] Fatal error:', error);
    process.exit(1);
  };
  main().catch(failFatally);
}

export { main };
|