diff --git a/backend/migrations/112_worker_session_pool.sql b/backend/migrations/112_worker_session_pool.sql new file mode 100644 index 00000000..f4a126ba --- /dev/null +++ b/backend/migrations/112_worker_session_pool.sql @@ -0,0 +1,390 @@ +-- Migration 112: Worker Session Pool +-- Tracks IP/fingerprint sessions with exclusive locks and cooldowns +-- Each worker claims up to 6 tasks, uses one IP/fingerprint for those tasks, +-- then retires the session (8hr cooldown before IP can be reused) + +-- Drop old identity pool tables if they exist (replacing with simpler session model) +DROP TABLE IF EXISTS worker_identity_claims CASCADE; +DROP TABLE IF EXISTS worker_identities CASCADE; + +-- Worker sessions: tracks active and cooling down IP/fingerprint pairs +CREATE TABLE IF NOT EXISTS worker_sessions ( + id SERIAL PRIMARY KEY, + + -- IP and fingerprint for this session + ip_address VARCHAR(45) NOT NULL, + fingerprint_hash VARCHAR(64) NOT NULL, + fingerprint_data JSONB, + + -- Geo this session is locked to + state_code VARCHAR(2) NOT NULL, + city VARCHAR(100), + + -- Ownership + worker_id VARCHAR(255), -- NULL if in cooldown + + -- Status: 'active' (locked to worker), 'cooldown' (8hr wait), 'available' + status VARCHAR(20) NOT NULL DEFAULT 'available', + + -- Task tracking + tasks_claimed INTEGER NOT NULL DEFAULT 0, + tasks_completed INTEGER NOT NULL DEFAULT 0, + tasks_failed INTEGER NOT NULL DEFAULT 0, + max_tasks INTEGER NOT NULL DEFAULT 6, + + -- Timestamps + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + locked_at TIMESTAMPTZ, -- When worker locked this session + retired_at TIMESTAMPTZ, -- When session was retired (cooldown starts) + cooldown_until TIMESTAMPTZ, -- When session becomes available again + + -- Constraints + CONSTRAINT valid_status CHECK (status IN ('active', 'cooldown', 'available')) +); + +-- Indexes for fast lookups +CREATE INDEX IF NOT EXISTS idx_worker_sessions_ip ON worker_sessions(ip_address); +CREATE INDEX IF NOT EXISTS idx_worker_sessions_status ON worker_sessions(status); +CREATE INDEX IF NOT EXISTS idx_worker_sessions_worker ON worker_sessions(worker_id) WHERE worker_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_worker_sessions_geo ON worker_sessions(state_code, city); +CREATE INDEX IF NOT EXISTS idx_worker_sessions_cooldown ON worker_sessions(cooldown_until) WHERE status = 'cooldown'; + +-- Unique constraint: only one active session per IP +CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_sessions_active_ip + ON worker_sessions(ip_address) + WHERE status = 'active'; + +-- Function: Check if IP is available (not active, not in cooldown) +CREATE OR REPLACE FUNCTION is_ip_available(check_ip VARCHAR(45)) +RETURNS BOOLEAN AS $$ +BEGIN + -- Check if any session has this IP and is either active or in cooldown + RETURN NOT EXISTS ( + SELECT 1 FROM worker_sessions + WHERE ip_address = check_ip + AND (status = 'active' OR (status = 'cooldown' AND cooldown_until > NOW())) + ); +END; +$$ LANGUAGE plpgsql; + +-- Function: Lock a session to a worker +-- Returns the session if successful, NULL if IP not available +CREATE OR REPLACE FUNCTION lock_worker_session( + p_worker_id VARCHAR(255), + p_ip_address VARCHAR(45), + p_state_code VARCHAR(2), + p_city VARCHAR(100) DEFAULT NULL, + p_fingerprint_hash VARCHAR(64) DEFAULT NULL, + p_fingerprint_data JSONB DEFAULT NULL +) RETURNS worker_sessions AS $$ +DECLARE + v_session worker_sessions; +BEGIN + -- First check if IP is available + IF NOT is_ip_available(p_ip_address) THEN + RETURN NULL; + END IF; + + -- Try to find an existing 
available session for this IP + SELECT * INTO v_session + FROM worker_sessions + WHERE ip_address = p_ip_address + AND status = 'available' + FOR UPDATE SKIP LOCKED + LIMIT 1; + + IF v_session.id IS NOT NULL THEN + -- Reuse existing session + UPDATE worker_sessions SET + worker_id = p_worker_id, + status = 'active', + state_code = p_state_code, + city = p_city, + fingerprint_hash = COALESCE(p_fingerprint_hash, fingerprint_hash), + fingerprint_data = COALESCE(p_fingerprint_data, fingerprint_data), + tasks_claimed = 0, + tasks_completed = 0, + tasks_failed = 0, + locked_at = NOW(), + retired_at = NULL, + cooldown_until = NULL + WHERE id = v_session.id + RETURNING * INTO v_session; + ELSE + -- Create new session + INSERT INTO worker_sessions ( + ip_address, fingerprint_hash, fingerprint_data, + state_code, city, worker_id, status, locked_at + ) VALUES ( + p_ip_address, COALESCE(p_fingerprint_hash, md5(random()::text)), + p_fingerprint_data, p_state_code, p_city, p_worker_id, 'active', NOW() + ) + RETURNING * INTO v_session; + END IF; + + RETURN v_session; +END; +$$ LANGUAGE plpgsql; + +-- Function: Retire a session (start 8hr cooldown) +CREATE OR REPLACE FUNCTION retire_worker_session(p_worker_id VARCHAR(255)) +RETURNS BOOLEAN AS $$ +DECLARE + v_updated INTEGER; +BEGIN + UPDATE worker_sessions SET + status = 'cooldown', + worker_id = NULL, + retired_at = NOW(), + cooldown_until = NOW() + INTERVAL '8 hours' + WHERE worker_id = p_worker_id + AND status = 'active'; + + GET DIAGNOSTICS v_updated = ROW_COUNT; + RETURN v_updated > 0; +END; +$$ LANGUAGE plpgsql; + +-- Function: Release expired cooldowns +CREATE OR REPLACE FUNCTION release_expired_sessions() +RETURNS INTEGER AS $$ +DECLARE + v_released INTEGER; +BEGIN + UPDATE worker_sessions SET + status = 'available' + WHERE status = 'cooldown' + AND cooldown_until <= NOW(); + + GET DIAGNOSTICS v_released = ROW_COUNT; + RETURN v_released; +END; +$$ LANGUAGE plpgsql; + +-- Function: Get session for worker +CREATE OR REPLACE FUNCTION get_worker_session(p_worker_id VARCHAR(255)) +RETURNS worker_sessions AS $$ + SELECT * FROM worker_sessions + WHERE worker_id = p_worker_id AND status = 'active' + LIMIT 1; +$$ LANGUAGE sql; + +-- Function: Increment task counters +CREATE OR REPLACE FUNCTION session_task_completed(p_worker_id VARCHAR(255)) +RETURNS BOOLEAN AS $$ +BEGIN + UPDATE worker_sessions SET + tasks_completed = tasks_completed + 1 + WHERE worker_id = p_worker_id AND status = 'active'; + RETURN FOUND; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION session_task_failed(p_worker_id VARCHAR(255)) +RETURNS BOOLEAN AS $$ +BEGIN + UPDATE worker_sessions SET + tasks_failed = tasks_failed + 1 + WHERE worker_id = p_worker_id AND status = 'active'; + RETURN FOUND; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION session_task_claimed(p_worker_id VARCHAR(255), p_count INTEGER DEFAULT 1) +RETURNS BOOLEAN AS $$ +BEGIN + UPDATE worker_sessions SET + tasks_claimed = tasks_claimed + p_count + WHERE worker_id = p_worker_id AND status = 'active'; + RETURN FOUND; +END; +$$ LANGUAGE plpgsql; + +-- Scheduled job hint: Run release_expired_sessions() every 5 minutes +COMMENT ON FUNCTION release_expired_sessions() IS + 'Run periodically to release sessions from cooldown. 
Suggest: every 5 minutes.'; + +-- ============================================================================= +-- ATOMIC TASK CLAIMING +-- Worker claims up to 6 tasks for same geo in one transaction +-- ============================================================================= + +-- Function: Claim up to N tasks for same geo +-- Returns claimed tasks with dispensary geo info +CREATE OR REPLACE FUNCTION claim_tasks_batch( + p_worker_id VARCHAR(255), + p_max_tasks INTEGER DEFAULT 6, + p_role VARCHAR(50) DEFAULT NULL -- Optional role filter +) RETURNS TABLE ( + task_id INTEGER, + role VARCHAR(50), + dispensary_id INTEGER, + dispensary_name VARCHAR(255), + city VARCHAR(100), + state_code VARCHAR(2), + platform VARCHAR(50), + method VARCHAR(20) +) AS $$ +DECLARE + v_target_state VARCHAR(2); + v_target_city VARCHAR(100); + v_claimed_count INTEGER := 0; +BEGIN + -- First, find the geo with most pending tasks to target + SELECT d.state, d.city INTO v_target_state, v_target_city + FROM worker_tasks t + JOIN dispensaries d ON t.dispensary_id = d.id + WHERE t.status = 'pending' + AND (p_role IS NULL OR t.role = p_role) + GROUP BY d.state, d.city + ORDER BY COUNT(*) DESC + LIMIT 1; + + -- No pending tasks + IF v_target_state IS NULL THEN + RETURN; + END IF; + + -- Claim up to p_max_tasks for this geo + RETURN QUERY + WITH claimed AS ( + UPDATE worker_tasks t SET + status = 'claimed', + worker_id = p_worker_id, + claimed_at = NOW() + FROM ( + SELECT t2.id + FROM worker_tasks t2 + JOIN dispensaries d ON t2.dispensary_id = d.id + WHERE t2.status = 'pending' + AND d.state = v_target_state + AND (v_target_city IS NULL OR d.city = v_target_city) + AND (p_role IS NULL OR t2.role = p_role) + ORDER BY t2.priority DESC, t2.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT p_max_tasks + ) sub + WHERE t.id = sub.id + RETURNING t.id, t.role, t.dispensary_id, t.method + ) + SELECT + c.id as task_id, + c.role, + c.dispensary_id, + d.name as dispensary_name, + d.city, + d.state as state_code, + d.platform, + c.method + FROM claimed c + JOIN dispensaries d ON c.dispensary_id = d.id; +END; +$$ LANGUAGE plpgsql; + +-- Function: Release claimed tasks back to pending (for failed worker or cleanup) +CREATE OR REPLACE FUNCTION release_claimed_tasks(p_worker_id VARCHAR(255)) +RETURNS INTEGER AS $$ +DECLARE + v_released INTEGER; +BEGIN + UPDATE worker_tasks SET + status = 'pending', + worker_id = NULL, + claimed_at = NULL + WHERE worker_id = p_worker_id + AND status IN ('claimed', 'running'); + + GET DIAGNOSTICS v_released = ROW_COUNT; + RETURN v_released; +END; +$$ LANGUAGE plpgsql; + +-- Function: Mark task as running +CREATE OR REPLACE FUNCTION start_task(p_task_id INTEGER, p_worker_id VARCHAR(255)) +RETURNS BOOLEAN AS $$ +BEGIN + UPDATE worker_tasks SET + status = 'running', + started_at = NOW() + WHERE id = p_task_id + AND worker_id = p_worker_id + AND status = 'claimed'; + RETURN FOUND; +END; +$$ LANGUAGE plpgsql; + +-- Function: Mark task as completed (leaves pool) +CREATE OR REPLACE FUNCTION complete_task( + p_task_id INTEGER, + p_worker_id VARCHAR(255), + p_result JSONB DEFAULT NULL +) RETURNS BOOLEAN AS $$ +BEGIN + UPDATE worker_tasks SET + status = 'completed', + completed_at = NOW(), + result = p_result + WHERE id = p_task_id + AND worker_id = p_worker_id + AND status = 'running'; + RETURN FOUND; +END; +$$ LANGUAGE plpgsql; + +-- Function: Mark task as failed (returns to pending for retry) +CREATE OR REPLACE FUNCTION fail_task( + p_task_id INTEGER, + p_worker_id VARCHAR(255), + p_error TEXT DEFAULT NULL, + 
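+    -- Once retry_count reaches this value the task is marked 'failed' permanently;
+    -- below it, the task is returned to 'pending' for another attempt.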
p_max_retries INTEGER DEFAULT 3 +) RETURNS BOOLEAN AS $$ +DECLARE + v_retry_count INTEGER; +BEGIN + -- Get current retry count + SELECT COALESCE(retry_count, 0) INTO v_retry_count + FROM worker_tasks WHERE id = p_task_id; + + IF v_retry_count >= p_max_retries THEN + -- Max retries exceeded - mark as permanently failed + UPDATE worker_tasks SET + status = 'failed', + completed_at = NOW(), + error_message = p_error, + retry_count = v_retry_count + 1 + WHERE id = p_task_id + AND worker_id = p_worker_id; + ELSE + -- Return to pending for retry + UPDATE worker_tasks SET + status = 'pending', + worker_id = NULL, + claimed_at = NULL, + started_at = NULL, + error_message = p_error, + retry_count = v_retry_count + 1 + WHERE id = p_task_id + AND worker_id = p_worker_id; + END IF; + + RETURN FOUND; +END; +$$ LANGUAGE plpgsql; + +-- Add retry_count column if not exists +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'worker_tasks' AND column_name = 'retry_count' + ) THEN + ALTER TABLE worker_tasks ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0; + END IF; + + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_name = 'worker_tasks' AND column_name = 'claimed_at' + ) THEN + ALTER TABLE worker_tasks ADD COLUMN claimed_at TIMESTAMPTZ; + END IF; +END $$; diff --git a/backend/src/services/crawl-rotator.ts b/backend/src/services/crawl-rotator.ts index b78c2c68..6cf0a553 100644 --- a/backend/src/services/crawl-rotator.ts +++ b/backend/src/services/crawl-rotator.ts @@ -139,6 +139,10 @@ export class ProxyRotator { // Proxy reload interval - how often to check for proxy changes (default: 60 seconds) private reloadIntervalMs: number = 60000; + // Fixed proxy URL (when set, bypasses normal rotation) + private fixedProxyUrl: string | null = null; + private fixedProxy: Proxy | null = null; + constructor(pool?: Pool) { this.pool = pool || null; } @@ -147,6 +151,55 @@ export class ProxyRotator { this.pool = pool; } + /** + * Set a fixed proxy URL (bypasses rotation) + * Used by session pool to lock worker to specific proxy + */ + setFixedProxy(proxyUrl: string): void { + this.fixedProxyUrl = proxyUrl; + // Parse URL into Proxy format + try { + const url = new URL(proxyUrl); + this.fixedProxy = { + id: -1, + host: url.hostname, + port: parseInt(url.port) || 80, + username: url.username || undefined, + password: url.password || undefined, + protocol: url.protocol.replace(':', '') as 'http' | 'https' | 'socks5', + isActive: true, + proxyUrl: proxyUrl, + lastUsedAt: null, + failureCount: 0, + successCount: 0, + avgResponseTimeMs: null, + maxConnections: 1, + consecutive403Count: 0, + }; + console.log(`[ProxyRotator] Fixed proxy set: ${url.hostname}:${url.port}`); + } catch (err: any) { + console.error(`[ProxyRotator] Invalid proxy URL: ${err.message}`); + this.fixedProxyUrl = null; + this.fixedProxy = null; + } + } + + /** + * Clear fixed proxy (resume normal rotation) + */ + clearFixedProxy(): void { + this.fixedProxyUrl = null; + this.fixedProxy = null; + console.log('[ProxyRotator] Fixed proxy cleared, resuming rotation'); + } + + /** + * Check if using fixed proxy + */ + isUsingFixedProxy(): boolean { + return this.fixedProxyUrl !== null; + } + /** * Set the reload interval for periodic proxy checks */ @@ -267,8 +320,13 @@ export class ProxyRotator { /** * Get current proxy without rotating + * Returns fixed proxy if set, otherwise current rotating proxy */ getCurrent(): Proxy | null { + // Return fixed proxy if set + if (this.fixedProxy) { + 
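+      // A session-locked worker pins a single exit IP: the fixed proxy takes
+      // precedence over the rotation list until clearFixedProxy() is called.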
return this.fixedProxy; + } if (this.proxies.length === 0) return null; return this.proxies[this.currentIndex]; } @@ -687,6 +745,21 @@ export class CrawlRotator { this.proxy.setReloadInterval(ms); } + /** + * Set a fixed proxy URL (bypasses rotation) + * Used by session pool to lock worker to specific proxy + */ + setFixedProxy(proxyUrl: string): void { + this.proxy.setFixedProxy(proxyUrl); + } + + /** + * Clear fixed proxy (resume normal rotation) + */ + clearFixedProxy(): void { + this.proxy.clearFixedProxy(); + } + /** * Rotate proxy only (get new IP) */ diff --git a/backend/src/services/worker-session.ts b/backend/src/services/worker-session.ts new file mode 100644 index 00000000..9031acf2 --- /dev/null +++ b/backend/src/services/worker-session.ts @@ -0,0 +1,347 @@ +/** + * Worker Session Service + * + * Manages the worker session lifecycle: + * 1. Claim up to 6 tasks for same geo + * 2. Get Evomi proxy for that geo + * 3. Check IP availability (not in use, not in cooldown) + * 4. Lock IP/fingerprint to worker + * 5. Track task completion + * 6. Retire session after 6 tasks (8hr cooldown) + */ + +import { pool } from '../db/pool'; +import { buildEvomiProxyUrl, getEvomiConfig } from './crawl-rotator'; + +export interface ClaimedTask { + task_id: number; + role: string; + dispensary_id: number; + dispensary_name: string; + city: string | null; + state_code: string; + platform: string; + method: string | null; +} + +export interface WorkerSession { + id: number; + ip_address: string; + fingerprint_hash: string; + fingerprint_data: Record | null; + state_code: string; + city: string | null; + worker_id: string; + status: 'active' | 'cooldown' | 'available'; + tasks_claimed: number; + tasks_completed: number; + tasks_failed: number; + max_tasks: number; + locked_at: Date; +} + +export interface SessionWithTasks { + session: WorkerSession; + tasks: ClaimedTask[]; + proxyUrl: string; +} + +const MAX_TASKS_PER_SESSION = 6; +const MAX_IP_ATTEMPTS = 10; // How many IPs to try before giving up +const COOLDOWN_HOURS = 8; + +/** + * Claim tasks and establish a session for a worker. + * This is the main entry point for the new worker flow. + * + * Flow: + * 1. Claim up to 6 tasks for same geo + * 2. Get Evomi proxy for that geo + * 3. Try IPs until we find one that's available + * 4. Lock IP to this worker + * 5. 
Return session + tasks + proxy URL + */ +export async function claimSessionWithTasks( + workerId: string, + role?: string +): Promise { + const client = await pool.connect(); + + try { + await client.query('BEGIN'); + + // Step 1: Claim up to 6 tasks for same geo + const { rows: tasks } = await client.query( + `SELECT * FROM claim_tasks_batch($1, $2, $3)`, + [workerId, MAX_TASKS_PER_SESSION, role || null] + ); + + if (tasks.length === 0) { + await client.query('ROLLBACK'); + console.log(`[WorkerSession] No pending tasks available for ${workerId}`); + return null; + } + + // Get geo from first claimed task (all same geo) + const { state_code, city } = tasks[0]; + console.log(`[WorkerSession] ${workerId} claimed ${tasks.length} tasks for ${city || 'any'}, ${state_code}`); + + // Step 2: Get Evomi proxy for this geo + const evomiConfig = getEvomiConfig(); + if (!evomiConfig.enabled) { + await client.query('ROLLBACK'); + throw new Error('Evomi proxy not configured'); + } + + // Step 3: Try to get an available IP + let session: WorkerSession | null = null; + let proxyUrl: string | null = null; + + for (let attempt = 0; attempt < MAX_IP_ATTEMPTS; attempt++) { + // Build proxy URL with unique session ID for each attempt + const sessionId = `${workerId}-${Date.now()}-${attempt}`; + const proxyResult = buildEvomiProxyUrl(state_code, sessionId, city || undefined); + + if (!proxyResult) { + console.warn(`[WorkerSession] Failed to build proxy URL for ${state_code}`); + continue; + } + + // TODO: Actually make a request through the proxy to get the real IP + // For now, we'll use a placeholder - in production, run a quick IP check + const testIp = await getProxyIp(proxyResult.url); + + if (!testIp) { + console.warn(`[WorkerSession] Failed to get IP from proxy attempt ${attempt + 1}`); + continue; + } + + // Step 4: Try to lock this IP + const { rows } = await client.query( + `SELECT * FROM lock_worker_session($1, $2, $3, $4)`, + [workerId, testIp, state_code, city] + ); + + if (rows[0]?.id) { + session = rows[0]; + proxyUrl = proxyResult.url; + console.log(`[WorkerSession] ${workerId} locked IP ${testIp} for ${city || 'any'}, ${state_code}`); + break; + } + + console.log(`[WorkerSession] IP ${testIp} not available (in use or cooldown), trying next...`); + } + + if (!session || !proxyUrl) { + // Release claimed tasks back to pool + await client.query(`SELECT release_claimed_tasks($1)`, [workerId]); + await client.query('ROLLBACK'); + console.error(`[WorkerSession] ${workerId} failed to get available IP after ${MAX_IP_ATTEMPTS} attempts`); + return null; + } + + // Update session with task count + await client.query( + `SELECT session_task_claimed($1, $2)`, + [workerId, tasks.length] + ); + + await client.query('COMMIT'); + + return { + session, + tasks, + proxyUrl, + }; + } catch (err) { + await client.query('ROLLBACK'); + throw err; + } finally { + client.release(); + } +} + +/** + * Get the real IP address from a proxy by making a test request + */ +async function getProxyIp(proxyUrl: string): Promise { + try { + // Use a simple IP check service + const { default: axios } = await import('axios'); + const { HttpsProxyAgent } = await import('https-proxy-agent'); + + const agent = new HttpsProxyAgent(proxyUrl); + const response = await axios.get('https://api.ipify.org?format=json', { + httpAgent: agent, + httpsAgent: agent, + timeout: 10000, + }); + + return response.data?.ip || null; + } catch (err: any) { + console.warn(`[WorkerSession] IP check failed: ${err.message}`); + return null; + } +} + +/** + 
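+ * A hedged sketch of how a worker is expected to drive the lifecycle helpers
+ * below (the runHandler call is hypothetical; everything else is defined in this file):
+ *
+ *   const claimed = await claimSessionWithTasks(workerId, role);
+ *   if (claimed) {
+ *     for (const task of claimed.tasks) {
+ *       await startTask(task.task_id, workerId);
+ *       try {
+ *         const result = await runHandler(task, claimed.proxyUrl); // hypothetical handler
+ *         await completeTask(task.task_id, workerId, { result });
+ *       } catch (err) {
+ *         await failTask(task.task_id, workerId, String(err));
+ *       }
+ *     }
+ *     await retireSession(workerId); // IP enters the 8hr cooldown
+ *   }
+ *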
* Mark a task as started (running) + */ +export async function startTask(taskId: number, workerId: string): Promise { + const { rows } = await pool.query( + `SELECT start_task($1, $2) as success`, + [taskId, workerId] + ); + return rows[0]?.success || false; +} + +/** + * Mark a task as completed + */ +export async function completeTask( + taskId: number, + workerId: string, + result?: Record +): Promise { + const client = await pool.connect(); + try { + await client.query('BEGIN'); + + // Complete the task + const { rows } = await client.query( + `SELECT complete_task($1, $2, $3) as success`, + [taskId, workerId, result ? JSON.stringify(result) : null] + ); + + if (rows[0]?.success) { + // Update session counter + await client.query(`SELECT session_task_completed($1)`, [workerId]); + } + + await client.query('COMMIT'); + return rows[0]?.success || false; + } catch (err) { + await client.query('ROLLBACK'); + throw err; + } finally { + client.release(); + } +} + +/** + * Mark a task as failed (returns to pending for retry) + */ +export async function failTask( + taskId: number, + workerId: string, + error?: string +): Promise { + const client = await pool.connect(); + try { + await client.query('BEGIN'); + + // Fail the task (may return to pending or mark as permanently failed) + const { rows } = await client.query( + `SELECT fail_task($1, $2, $3) as success`, + [taskId, workerId, error || null] + ); + + if (rows[0]?.success) { + // Update session counter + await client.query(`SELECT session_task_failed($1)`, [workerId]); + } + + await client.query('COMMIT'); + return rows[0]?.success || false; + } catch (err) { + await client.query('ROLLBACK'); + throw err; + } finally { + client.release(); + } +} + +/** + * Get current session for a worker + */ +export async function getWorkerSession(workerId: string): Promise { + const { rows } = await pool.query( + `SELECT * FROM get_worker_session($1)`, + [workerId] + ); + return rows[0] as WorkerSession || null; +} + +/** + * Check if worker session is complete (all 6 tasks done) + */ +export async function isSessionComplete(workerId: string): Promise { + const session = await getWorkerSession(workerId); + if (!session) return true; // No session = complete + + const totalDone = session.tasks_completed + session.tasks_failed; + return totalDone >= session.tasks_claimed; +} + +/** + * Retire a worker's session (start 8hr cooldown) + */ +export async function retireSession(workerId: string): Promise { + const { rows } = await pool.query( + `SELECT retire_worker_session($1) as success`, + [workerId] + ); + console.log(`[WorkerSession] ${workerId} session retired, IP in ${COOLDOWN_HOURS}hr cooldown`); + return rows[0]?.success || false; +} + +/** + * Release any claimed tasks back to pool (for worker shutdown) + */ +export async function releaseClaimedTasks(workerId: string): Promise { + const { rows } = await pool.query( + `SELECT release_claimed_tasks($1) as count`, + [workerId] + ); + const count = rows[0]?.count || 0; + if (count > 0) { + console.log(`[WorkerSession] Released ${count} claimed tasks for ${workerId}`); + } + return count; +} + +/** + * Cleanup: release expired sessions from cooldown + */ +export async function releaseExpiredSessions(): Promise { + const { rows } = await pool.query( + `SELECT release_expired_sessions() as count` + ); + return rows[0]?.count || 0; +} + +/** + * Get session stats for monitoring + */ +export async function getSessionStats(): Promise<{ + active: number; + cooldown: number; + available: number; + uniqueIps: 
number; +}> { + const { rows } = await pool.query(` + SELECT + COUNT(*) FILTER (WHERE status = 'active') as active, + COUNT(*) FILTER (WHERE status = 'cooldown') as cooldown, + COUNT(*) FILTER (WHERE status = 'available') as available, + COUNT(DISTINCT ip_address) as unique_ips + FROM worker_sessions + `); + + return { + active: parseInt(rows[0]?.active || '0'), + cooldown: parseInt(rows[0]?.cooldown || '0'), + available: parseInt(rows[0]?.available || '0'), + uniqueIps: parseInt(rows[0]?.unique_ips || '0'), + }; +} diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 52222a76..73c10569 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -55,7 +55,7 @@ import { Pool } from 'pg'; import { v4 as uuidv4 } from 'uuid'; -import { taskService, TaskRole, WorkerTask } from './task-service'; +import { taskService, TaskRole, TaskStatus, WorkerTask } from './task-service'; import { getPool } from '../db/pool'; import os from 'os'; @@ -73,9 +73,16 @@ import { buildEvomiProxyUrl, getEvomiConfig } from '../services/crawl-rotator'; // Identity pool for diverse IP/fingerprint rotation import { IdentityPoolService, WorkerIdentity, getRandomTaskCount } from '../services/identity-pool'; +// NEW: Session-based worker pool (claim tasks first, then get IP) +import * as WorkerSession from '../services/worker-session'; + // Feature flag: Use new identity pool system (set via env var) const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true'; +// Feature flag: Use new session pool system (claim 6 tasks, then get IP) +// This is the new correct flow: claim tasks → get geo from tasks → get IP → preflight +const USE_SESSION_POOL = process.env.USE_SESSION_POOL === 'true'; + // Task handlers by role // Platform-based handlers: {task}-{platform}.ts convention import { handleProductRefresh } from './handlers/product-refresh'; @@ -376,6 +383,18 @@ export class TaskWorker { private identityMaxTasks: number = 5; // Random 3-5, set when identity claimed private identityProxyUrl: string | null = null; + // ========================================================================== + // SESSION POOL TRACKING (new system - enabled via USE_SESSION_POOL) + // ========================================================================== + // Correct flow: claim tasks FIRST, then get IP based on task geo. + // Worker claims up to 6 tasks for same geo, gets IP, preflights, executes. + // After 6 tasks, IP goes into 8hr cooldown. 
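+  // Enable with USE_SESSION_POOL=true; the identity-pool and default geo flows
+  // below remain the fallback. Example invocation, assuming the usual compiled
+  // entry point (path is an assumption): USE_SESSION_POOL=true node dist/tasks/task-worker.js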
+ // ========================================================================== + private sessionTasks: WorkerSession.ClaimedTask[] = []; + private currentSession: WorkerSession.WorkerSession | null = null; + private sessionProxyUrl: string | null = null; + private sessionPreflightPassed: boolean = false; + constructor(role: TaskRole | null = null, workerId?: string) { this.pool = getPool(); this.role = role; @@ -1459,6 +1478,19 @@ export class TaskWorker { // Try to claim more tasks if we have capacity if (this.canAcceptMoreTasks()) { + // ================================================================= + // NEW SESSION POOL FLOW (enabled via USE_SESSION_POOL) + // Correct order: claim tasks → get geo from tasks → get IP → preflight + // ================================================================= + if (USE_SESSION_POOL) { + await this.sessionPoolMainLoop(); + return; + } + + // ================================================================= + // LEGACY FLOWS BELOW (USE_IDENTITY_POOL or default geo session) + // ================================================================= + // ================================================================= // LAZY INITIALIZATION - Initialize stealth on first task claim // Workers start immediately and init proxies only when needed @@ -1611,6 +1643,209 @@ export class TaskWorker { await this.sleep(POLL_INTERVAL_MS); } + // =========================================================================== + // SESSION POOL MAIN LOOP (new system - enabled via USE_SESSION_POOL) + // =========================================================================== + // Correct flow: claim tasks FIRST, then get IP based on task geo. + // 1. Check if we have an active session with tasks + // 2. If not, claim up to 6 tasks for same geo + // 3. Get Evomi proxy for that geo, check IP availability + // 4. Run preflight with that IP + // 5. Execute tasks (3 concurrent) + // 6. 
When all 6 done, retire session (8hr cooldown), repeat + // =========================================================================== + private async sessionPoolMainLoop(): Promise { + // Check if session is complete (all claimed tasks done) + if (this.currentSession) { + const isComplete = await WorkerSession.isSessionComplete(this.workerId); + if (isComplete) { + console.log(`[TaskWorker] ${this.friendlyName} session complete, retiring...`); + await WorkerSession.retireSession(this.workerId); + this.currentSession = null; + this.sessionTasks = []; + this.sessionProxyUrl = null; + this.sessionPreflightPassed = false; + this.geoState = null; + this.geoCity = null; + // Continue to claim new session + } + } + + // If no active session, claim new batch of tasks + if (!this.currentSession) { + console.log(`[TaskWorker] ${this.friendlyName} claiming new session...`); + + // Initialize stealth if needed (for fingerprint generation) + if (!this.stealthInitialized) { + const initSuccess = await this.ensureStealthInitialized(); + if (!initSuccess) { + console.log(`[TaskWorker] ${this.friendlyName} stealth init failed, waiting...`); + await this.sleep(30000); + return; + } + } + + // Claim tasks and establish session + const result = await WorkerSession.claimSessionWithTasks(this.workerId, this.role || undefined); + + if (!result) { + // No tasks available or couldn't get IP + console.log(`[TaskWorker] ${this.friendlyName} no session available, waiting...`); + await this.sleep(30000); + return; + } + + this.currentSession = result.session; + this.sessionTasks = result.tasks; + this.sessionProxyUrl = result.proxyUrl; + this.geoState = result.session.state_code; + this.geoCity = result.session.city || null; + + console.log(`[TaskWorker] ${this.friendlyName} new session: ${result.tasks.length} tasks for ${this.geoCity || 'any'}, ${this.geoState} (IP: ${result.session.ip_address})`); + + // Configure proxy in crawl rotator + if (this.sessionProxyUrl) { + this.crawlRotator.setFixedProxy(this.sessionProxyUrl); + } + + // Run preflight with this session's proxy + console.log(`[TaskWorker] ${this.friendlyName} running preflight for session...`); + try { + await this.runDualPreflights(); + + if (this.preflightHttpPassed) { + this.sessionPreflightPassed = true; + console.log(`[TaskWorker] ${this.friendlyName} session preflight PASSED (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`); + } else { + // Preflight failed - release tasks and session + console.error(`[TaskWorker] ${this.friendlyName} session preflight FAILED, releasing tasks...`); + await WorkerSession.releaseClaimedTasks(this.workerId); + await WorkerSession.retireSession(this.workerId); + this.currentSession = null; + this.sessionTasks = []; + this.sessionProxyUrl = null; + this.sessionPreflightPassed = false; + await this.sleep(30000); + return; + } + } catch (err: any) { + console.error(`[TaskWorker] ${this.friendlyName} preflight error: ${err.message}`); + await WorkerSession.releaseClaimedTasks(this.workerId); + await WorkerSession.retireSession(this.workerId); + this.currentSession = null; + this.sessionTasks = []; + await this.sleep(30000); + return; + } + } + + // We have an active session with tasks - execute next pending task + if (!this.sessionPreflightPassed) { + console.log(`[TaskWorker] ${this.friendlyName} session preflight not passed, waiting...`); + await this.sleep(10000); + return; + } + + // Find next task to execute from our claimed batch + const nextTask = this.sessionTasks.find(t => 
!this.activeTasks.has(t.task_id)); + if (!nextTask) { + // All claimed tasks are either running or done + await this.sleep(POLL_INTERVAL_MS); + return; + } + + // Convert session task to worker task format + const workerTask: WorkerTask = { + id: nextTask.task_id, + role: nextTask.role as TaskRole, + dispensary_id: nextTask.dispensary_id, + dispensary_name: nextTask.dispensary_name, + method: (nextTask.method || 'http') as 'http' | 'curl' | null, + platform: nextTask.platform, + priority: 50, + status: 'claimed' as TaskStatus, + scheduled_for: null, + worker_id: this.workerId, + claimed_at: new Date(), + started_at: null, + completed_at: null, + last_heartbeat_at: null, + result: null, + error_message: null, + retry_count: 0, + max_retries: 3, + payload: null, + created_at: new Date(), + updated_at: new Date(), + }; + + // Mark task as running + await WorkerSession.startTask(nextTask.task_id, this.workerId); + + console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`); + + this.activeTasks.set(nextTask.task_id, workerTask); + + // Execute task in background + const taskPromise = this.executeSessionTask(workerTask); + this.taskPromises.set(nextTask.task_id, taskPromise); + + // Cleanup when done + taskPromise.finally(() => { + this.activeTasks.delete(nextTask.task_id); + this.taskPromises.delete(nextTask.task_id); + }); + } + + /** + * Execute a task for session pool (handles completion/failure tracking) + */ + private async executeSessionTask(task: WorkerTask): Promise { + console.log(`[TaskWorker] ${this.friendlyName} executing session task ${task.id} (${task.role})`); + + try { + // Get handler for this role + const handler = getHandlerForTask(task); + if (!handler) { + throw new Error(`No handler registered for role: ${task.role}`); + } + + // Create context + const ctx: TaskContext = { + pool: this.pool, + workerId: this.workerId, + task, + heartbeat: async () => { + // Session tasks use different heartbeat + }, + crawlRotator: this.crawlRotator, + updateStep: (step: string, detail?: string) => { + this.updateTaskStep(task.id, step, detail); + }, + fingerprint: this.storedFingerprint || undefined, + }; + + this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`); + + // Execute + const result = await handler(ctx); + + this.clearTaskStep(task.id); + + if (result.success) { + await WorkerSession.completeTask(task.id, this.workerId, { result: result.data }); + console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`); + } else { + await WorkerSession.failTask(task.id, this.workerId, result.error); + console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`); + } + } catch (err: any) { + this.clearTaskStep(task.id); + await WorkerSession.failTask(task.id, this.workerId, err.message); + console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} ERROR: ${err.message}`); + } + } + /** * Stop the worker */ @@ -1619,6 +1854,16 @@ export class TaskWorker { this.stopHeartbeat(); this.stopRegistryHeartbeat(); this.stopPeriodicStaleCleanup(); + + // Clean up session pool state if using new system + if (USE_SESSION_POOL && this.currentSession) { + console.log(`[TaskWorker] ${this.friendlyName} releasing session pool resources...`); + await WorkerSession.releaseClaimedTasks(this.workerId); + await WorkerSession.retireSession(this.workerId); + this.currentSession = null; + this.sessionTasks = []; + } + await this.deregister(); console.log(`[TaskWorker] 
${this.friendlyName} stopped`); }