feat(workers): Session pool system - claim tasks first, then get IP

New worker flow (enabled via USE_SESSION_POOL=true; condensed sketch after the list):
1. Worker claims up to 6 tasks for the same geo (atomically marked claimed)
2. Gets Evomi proxy for that geo
3. Checks IP availability (not in use, not in 8hr cooldown)
4. Locks IP exclusively to this worker
5. Runs preflight with locked IP
6. Executes tasks (3 concurrent)
7. After 6 tasks, retires session (8hr IP cooldown)
8. Repeats with new IP
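
A condensed sketch of the loop (illustrative only: runPreflight, runTasks and sleep
stand in for the worker's own preflight/execution paths; the real implementation is
sessionPoolMainLoop() in tasks/task-worker.ts):

    while (running) {
      const s = await claimSessionWithTasks(workerId); // steps 1-4: tasks, geo, IP lock
      if (!s) { await sleep(30_000); continue; }       // nothing pending or no free IP
      crawlRotator.setFixedProxy(s.proxyUrl);          // pin all requests to the locked IP
      if (await runPreflight()) {                      // step 5
        await runTasks(s.tasks, { concurrency: 3 });   // step 6
      }
      await retireSession(workerId);                   // steps 7-8: 8hr cooldown, then new IP
    }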

Key files:
- migrations/112_worker_session_pool.sql: Session table + atomic claiming
- services/worker-session.ts: Session lifecycle management
- tasks/task-worker.ts: sessionPoolMainLoop() with new flow
- services/crawl-rotator.ts: setFixedProxy() for session locking

Failed tasks return to pending for retry by another worker.
No two workers can share the same IP simultaneously.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Kelly
2025-12-13 22:54:45 -07:00
parent f0bb454ca2
commit 4cb4e1c502
4 changed files with 1056 additions and 1 deletion

migrations/112_worker_session_pool.sql

@@ -0,0 +1,390 @@
-- Migration 112: Worker Session Pool
-- Tracks IP/fingerprint sessions with exclusive locks and cooldowns
-- Each worker claims up to 6 tasks, uses one IP/fingerprint for those tasks,
-- then retires the session (8hr cooldown before IP can be reused)
-- Drop old identity pool tables if they exist (replacing with simpler session model)
DROP TABLE IF EXISTS worker_identity_claims CASCADE;
DROP TABLE IF EXISTS worker_identities CASCADE;
-- Worker sessions: tracks active and cooling down IP/fingerprint pairs
CREATE TABLE IF NOT EXISTS worker_sessions (
id SERIAL PRIMARY KEY,
-- IP and fingerprint for this session
ip_address VARCHAR(45) NOT NULL,
fingerprint_hash VARCHAR(64) NOT NULL,
fingerprint_data JSONB,
-- Geo this session is locked to
state_code VARCHAR(2) NOT NULL,
city VARCHAR(100),
-- Ownership
worker_id VARCHAR(255), -- NULL when in cooldown or available
-- Status: 'active' (locked to worker), 'cooldown' (8hr wait), 'available'
status VARCHAR(20) NOT NULL DEFAULT 'available',
-- Task tracking
tasks_claimed INTEGER NOT NULL DEFAULT 0,
tasks_completed INTEGER NOT NULL DEFAULT 0,
tasks_failed INTEGER NOT NULL DEFAULT 0,
max_tasks INTEGER NOT NULL DEFAULT 6,
-- Timestamps
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
locked_at TIMESTAMPTZ, -- When worker locked this session
retired_at TIMESTAMPTZ, -- When session was retired (cooldown starts)
cooldown_until TIMESTAMPTZ, -- When session becomes available again
-- Constraints
CONSTRAINT valid_status CHECK (status IN ('active', 'cooldown', 'available'))
);
-- Indexes for fast lookups
CREATE INDEX IF NOT EXISTS idx_worker_sessions_ip ON worker_sessions(ip_address);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_status ON worker_sessions(status);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_worker ON worker_sessions(worker_id) WHERE worker_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_worker_sessions_geo ON worker_sessions(state_code, city);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_cooldown ON worker_sessions(cooldown_until) WHERE status = 'cooldown';
-- Unique constraint: only one active session per IP
CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_sessions_active_ip
ON worker_sessions(ip_address)
WHERE status = 'active';
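-- Example (hypothetical values, commented out so the migration stays side-effect free):
-- the second INSERT below would raise unique_violation (SQLSTATE 23505) while the
-- first row is still 'active':
--   INSERT INTO worker_sessions (ip_address, fingerprint_hash, state_code, status)
--   VALUES ('203.0.113.42', 'fp-1', 'CO', 'active');
--   INSERT INTO worker_sessions (ip_address, fingerprint_hash, state_code, status)
--   VALUES ('203.0.113.42', 'fp-2', 'CO', 'active');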
-- Function: Check if IP is available (not active, not in cooldown)
CREATE OR REPLACE FUNCTION is_ip_available(check_ip VARCHAR(45))
RETURNS BOOLEAN AS $$
BEGIN
-- Check if any session has this IP and is either active or in cooldown
RETURN NOT EXISTS (
SELECT 1 FROM worker_sessions
WHERE ip_address = check_ip
AND (status = 'active' OR (status = 'cooldown' AND cooldown_until > NOW()))
);
END;
$$ LANGUAGE plpgsql;
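-- Example (hypothetical IP, commented out): FALSE while any session holds the IP as
-- 'active' or its cooldown has not yet expired, TRUE otherwise:
--   SELECT is_ip_available('203.0.113.42');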
-- Function: Lock a session to a worker
-- Returns the session if successful, NULL if IP not available
CREATE OR REPLACE FUNCTION lock_worker_session(
p_worker_id VARCHAR(255),
p_ip_address VARCHAR(45),
p_state_code VARCHAR(2),
p_city VARCHAR(100) DEFAULT NULL,
p_fingerprint_hash VARCHAR(64) DEFAULT NULL,
p_fingerprint_data JSONB DEFAULT NULL
) RETURNS worker_sessions AS $$
DECLARE
v_session worker_sessions;
BEGIN
-- First check if IP is available
IF NOT is_ip_available(p_ip_address) THEN
RETURN NULL;
END IF;
-- Try to find an existing available session for this IP
SELECT * INTO v_session
FROM worker_sessions
WHERE ip_address = p_ip_address
AND status = 'available'
FOR UPDATE SKIP LOCKED
LIMIT 1;
IF v_session.id IS NOT NULL THEN
-- Reuse existing session
UPDATE worker_sessions SET
worker_id = p_worker_id,
status = 'active',
state_code = p_state_code,
city = p_city,
fingerprint_hash = COALESCE(p_fingerprint_hash, fingerprint_hash),
fingerprint_data = COALESCE(p_fingerprint_data, fingerprint_data),
tasks_claimed = 0,
tasks_completed = 0,
tasks_failed = 0,
locked_at = NOW(),
retired_at = NULL,
cooldown_until = NULL
WHERE id = v_session.id
RETURNING * INTO v_session;
ELSE
-- Create new session
INSERT INTO worker_sessions (
ip_address, fingerprint_hash, fingerprint_data,
state_code, city, worker_id, status, locked_at
) VALUES (
p_ip_address, COALESCE(p_fingerprint_hash, md5(random()::text)),
p_fingerprint_data, p_state_code, p_city, p_worker_id, 'active', NOW()
)
RETURNING * INTO v_session;
END IF;
RETURN v_session;
END;
$$ LANGUAGE plpgsql;
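-- Example (hypothetical worker/IP, commented out): on success the locked row is
-- returned; if the IP is active or cooling down the result is a NULL composite,
-- so callers should check that the returned id IS NOT NULL:
--   SELECT * FROM lock_worker_session('worker-abc', '203.0.113.42', 'CO', 'Denver');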
-- Function: Retire a session (start 8hr cooldown)
CREATE OR REPLACE FUNCTION retire_worker_session(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
DECLARE
v_updated INTEGER;
BEGIN
UPDATE worker_sessions SET
status = 'cooldown',
worker_id = NULL,
retired_at = NOW(),
cooldown_until = NOW() + INTERVAL '8 hours'
WHERE worker_id = p_worker_id
AND status = 'active';
GET DIAGNOSTICS v_updated = ROW_COUNT;
RETURN v_updated > 0;
END;
$$ LANGUAGE plpgsql;
-- Function: Release expired cooldowns
CREATE OR REPLACE FUNCTION release_expired_sessions()
RETURNS INTEGER AS $$
DECLARE
v_released INTEGER;
BEGIN
UPDATE worker_sessions SET
status = 'available'
WHERE status = 'cooldown'
AND cooldown_until <= NOW();
GET DIAGNOSTICS v_released = ROW_COUNT;
RETURN v_released;
END;
$$ LANGUAGE plpgsql;
-- Function: Get session for worker
CREATE OR REPLACE FUNCTION get_worker_session(p_worker_id VARCHAR(255))
RETURNS worker_sessions AS $$
SELECT * FROM worker_sessions
WHERE worker_id = p_worker_id AND status = 'active'
LIMIT 1;
$$ LANGUAGE sql;
-- Function: Increment task counters
CREATE OR REPLACE FUNCTION session_task_completed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
UPDATE worker_sessions SET
tasks_completed = tasks_completed + 1
WHERE worker_id = p_worker_id AND status = 'active';
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION session_task_failed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
UPDATE worker_sessions SET
tasks_failed = tasks_failed + 1
WHERE worker_id = p_worker_id AND status = 'active';
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION session_task_claimed(p_worker_id VARCHAR(255), p_count INTEGER DEFAULT 1)
RETURNS BOOLEAN AS $$
BEGIN
UPDATE worker_sessions SET
tasks_claimed = tasks_claimed + p_count
WHERE worker_id = p_worker_id AND status = 'active';
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
-- Scheduled job hint: Run release_expired_sessions() every 5 minutes
COMMENT ON FUNCTION release_expired_sessions() IS
'Run periodically to release sessions from cooldown. Suggest: every 5 minutes.';
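-- One way to schedule this (assumes the pg_cron extension; any external cron works too):
--   SELECT cron.schedule('release-worker-sessions', '*/5 * * * *',
--                        'SELECT release_expired_sessions()');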
-- =============================================================================
-- ATOMIC TASK CLAIMING
-- Worker claims up to 6 tasks for the same geo in one transaction
-- =============================================================================
-- Function: Claim up to N tasks for same geo
-- Returns claimed tasks with dispensary geo info
CREATE OR REPLACE FUNCTION claim_tasks_batch(
p_worker_id VARCHAR(255),
p_max_tasks INTEGER DEFAULT 6,
p_role VARCHAR(50) DEFAULT NULL -- Optional role filter
) RETURNS TABLE (
task_id INTEGER,
role VARCHAR(50),
dispensary_id INTEGER,
dispensary_name VARCHAR(255),
city VARCHAR(100),
state_code VARCHAR(2),
platform VARCHAR(50),
method VARCHAR(20)
) AS $$
DECLARE
v_target_state VARCHAR(2);
v_target_city VARCHAR(100);
BEGIN
-- First, find the geo with the most pending tasks to target
SELECT d.state, d.city INTO v_target_state, v_target_city
FROM worker_tasks t
JOIN dispensaries d ON t.dispensary_id = d.id
WHERE t.status = 'pending'
AND (p_role IS NULL OR t.role = p_role)
GROUP BY d.state, d.city
ORDER BY COUNT(*) DESC
LIMIT 1;
-- No pending tasks
IF v_target_state IS NULL THEN
RETURN;
END IF;
-- Claim up to p_max_tasks for this geo
RETURN QUERY
WITH claimed AS (
UPDATE worker_tasks t SET
status = 'claimed',
worker_id = p_worker_id,
claimed_at = NOW()
FROM (
SELECT t2.id
FROM worker_tasks t2
JOIN dispensaries d ON t2.dispensary_id = d.id
WHERE t2.status = 'pending'
AND d.state = v_target_state
AND (v_target_city IS NULL OR d.city = v_target_city)
AND (p_role IS NULL OR t2.role = p_role)
ORDER BY t2.priority DESC, t2.created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT p_max_tasks
) sub
WHERE t.id = sub.id
RETURNING t.id, t.role, t.dispensary_id, t.method
)
SELECT
c.id as task_id,
c.role,
c.dispensary_id,
d.name as dispensary_name,
d.city,
d.state as state_code,
d.platform,
c.method
FROM claimed c
JOIN dispensaries d ON c.dispensary_id = d.id;
END;
$$ LANGUAGE plpgsql;
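-- Example (hypothetical worker id, commented out): claim up to 6 pending tasks in
-- the busiest geo; returns zero rows when nothing is pending, and all returned
-- tasks share one state/city:
--   SELECT * FROM claim_tasks_batch('worker-abc', 6, NULL);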
-- Function: Release claimed tasks back to pending (for failed worker or cleanup)
CREATE OR REPLACE FUNCTION release_claimed_tasks(p_worker_id VARCHAR(255))
RETURNS INTEGER AS $$
DECLARE
v_released INTEGER;
BEGIN
UPDATE worker_tasks SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL
WHERE worker_id = p_worker_id
AND status IN ('claimed', 'running');
GET DIAGNOSTICS v_released = ROW_COUNT;
RETURN v_released;
END;
$$ LANGUAGE plpgsql;
-- Function: Mark task as running
CREATE OR REPLACE FUNCTION start_task(p_task_id INTEGER, p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
UPDATE worker_tasks SET
status = 'running',
started_at = NOW()
WHERE id = p_task_id
AND worker_id = p_worker_id
AND status = 'claimed';
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
-- Function: Mark task as completed (leaves pool)
CREATE OR REPLACE FUNCTION complete_task(
p_task_id INTEGER,
p_worker_id VARCHAR(255),
p_result JSONB DEFAULT NULL
) RETURNS BOOLEAN AS $$
BEGIN
UPDATE worker_tasks SET
status = 'completed',
completed_at = NOW(),
result = p_result
WHERE id = p_task_id
AND worker_id = p_worker_id
AND status = 'running';
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
-- Function: Mark task as failed (returns to pending for retry)
CREATE OR REPLACE FUNCTION fail_task(
p_task_id INTEGER,
p_worker_id VARCHAR(255),
p_error TEXT DEFAULT NULL,
p_max_retries INTEGER DEFAULT 3
) RETURNS BOOLEAN AS $$
DECLARE
v_retry_count INTEGER;
BEGIN
-- Get current retry count
SELECT COALESCE(retry_count, 0) INTO v_retry_count
FROM worker_tasks WHERE id = p_task_id;
IF v_retry_count >= p_max_retries THEN
-- Max retries exceeded - mark as permanently failed
UPDATE worker_tasks SET
status = 'failed',
completed_at = NOW(),
error_message = p_error,
retry_count = v_retry_count + 1
WHERE id = p_task_id
AND worker_id = p_worker_id;
ELSE
-- Return to pending for retry
UPDATE worker_tasks SET
status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
error_message = p_error,
retry_count = v_retry_count + 1
WHERE id = p_task_id
AND worker_id = p_worker_id;
END IF;
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
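-- Typical lifecycle against these functions (hypothetical ids, commented out):
--   SELECT start_task(42, 'worker-abc');                     -- claimed -> running
--   SELECT complete_task(42, 'worker-abc', '{"items":120}'); -- running -> completed
--   SELECT fail_task(42, 'worker-abc', 'timeout');           -- pending again, or
--                                                            -- 'failed' after 3 retries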
-- Add retry_count column if not exists
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'worker_tasks' AND column_name = 'retry_count'
) THEN
ALTER TABLE worker_tasks ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0;
END IF;
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_name = 'worker_tasks' AND column_name = 'claimed_at'
) THEN
ALTER TABLE worker_tasks ADD COLUMN claimed_at TIMESTAMPTZ;
END IF;
END $$;

services/crawl-rotator.ts

@@ -139,6 +139,10 @@ export class ProxyRotator {
// Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
private reloadIntervalMs: number = 60000;
// Fixed proxy URL (when set, bypasses normal rotation)
private fixedProxyUrl: string | null = null;
private fixedProxy: Proxy | null = null;
constructor(pool?: Pool) {
this.pool = pool || null;
}
@@ -147,6 +151,55 @@ export class ProxyRotator {
this.pool = pool;
}
/**
* Set a fixed proxy URL (bypasses rotation)
* Used by session pool to lock worker to specific proxy
*/
setFixedProxy(proxyUrl: string): void {
this.fixedProxyUrl = proxyUrl;
// Parse URL into Proxy format
try {
const url = new URL(proxyUrl);
this.fixedProxy = {
id: -1,
host: url.hostname,
port: parseInt(url.port, 10) || 80,
username: url.username || undefined,
password: url.password || undefined,
protocol: url.protocol.replace(':', '') as 'http' | 'https' | 'socks5',
isActive: true,
proxyUrl: proxyUrl,
lastUsedAt: null,
failureCount: 0,
successCount: 0,
avgResponseTimeMs: null,
maxConnections: 1,
consecutive403Count: 0,
};
console.log(`[ProxyRotator] Fixed proxy set: ${url.hostname}:${url.port}`);
} catch (err: any) {
console.error(`[ProxyRotator] Invalid proxy URL: ${err.message}`);
this.fixedProxyUrl = null;
this.fixedProxy = null;
}
}
/**
* Clear fixed proxy (resume normal rotation)
*/
clearFixedProxy(): void {
this.fixedProxyUrl = null;
this.fixedProxy = null;
console.log('[ProxyRotator] Fixed proxy cleared, resuming rotation');
}
/**
* Check if using fixed proxy
*/
isUsingFixedProxy(): boolean {
return this.fixedProxyUrl !== null;
}
/**
* Set the reload interval for periodic proxy checks
*/
@@ -267,8 +320,13 @@ export class ProxyRotator {
/**
* Get current proxy without rotating
* Returns fixed proxy if set, otherwise current rotating proxy
*/
getCurrent(): Proxy | null {
// Return fixed proxy if set
if (this.fixedProxy) {
return this.fixedProxy;
}
if (this.proxies.length === 0) return null;
return this.proxies[this.currentIndex];
}
@@ -687,6 +745,21 @@ export class CrawlRotator {
this.proxy.setReloadInterval(ms);
}
/**
* Set a fixed proxy URL (bypasses rotation)
* Used by session pool to lock worker to specific proxy
*/
setFixedProxy(proxyUrl: string): void {
this.proxy.setFixedProxy(proxyUrl);
}
/**
* Clear fixed proxy (resume normal rotation)
*/
clearFixedProxy(): void {
this.proxy.clearFixedProxy();
}
/**
* Rotate proxy only (get new IP)
*/

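A minimal usage sketch of the fixed-proxy API above (instance construction and proxy
URL are illustrative):

    const rotator = new ProxyRotator(pool);
    rotator.setFixedProxy('http://user:pass@proxy.example.com:8080');
    rotator.isUsingFixedProxy(); // true
    rotator.getCurrent();        // returns the parsed fixed Proxy, bypassing rotation
    rotator.clearFixedProxy();   // resume normal rotation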
services/worker-session.ts

@@ -0,0 +1,347 @@
/**
* Worker Session Service
*
* Manages the worker session lifecycle:
* 1. Claim up to 6 tasks for same geo
* 2. Get Evomi proxy for that geo
* 3. Check IP availability (not in use, not in cooldown)
* 4. Lock IP/fingerprint to worker
* 5. Track task completion
* 6. Retire session after 6 tasks (8hr cooldown)
*/
import { pool } from '../db/pool';
import { buildEvomiProxyUrl, getEvomiConfig } from './crawl-rotator';
export interface ClaimedTask {
task_id: number;
role: string;
dispensary_id: number;
dispensary_name: string;
city: string | null;
state_code: string;
platform: string;
method: string | null;
}
export interface WorkerSession {
id: number;
ip_address: string;
fingerprint_hash: string;
fingerprint_data: Record<string, unknown> | null;
state_code: string;
city: string | null;
worker_id: string;
status: 'active' | 'cooldown' | 'available';
tasks_claimed: number;
tasks_completed: number;
tasks_failed: number;
max_tasks: number;
locked_at: Date;
}
export interface SessionWithTasks {
session: WorkerSession;
tasks: ClaimedTask[];
proxyUrl: string;
}
const MAX_TASKS_PER_SESSION = 6;
const MAX_IP_ATTEMPTS = 10; // How many IPs to try before giving up
const COOLDOWN_HOURS = 8;
/**
* Claim tasks and establish a session for a worker.
* This is the main entry point for the new worker flow.
*
* Flow:
* 1. Claim up to 6 tasks for same geo
* 2. Get Evomi proxy for that geo
* 3. Try IPs until we find one that's available
* 4. Lock IP to this worker
* 5. Return session + tasks + proxy URL
*/
export async function claimSessionWithTasks(
workerId: string,
role?: string
): Promise<SessionWithTasks | null> {
const client = await pool.connect();
try {
await client.query('BEGIN');
// Step 1: Claim up to 6 tasks for same geo
const { rows: tasks } = await client.query<ClaimedTask>(
`SELECT * FROM claim_tasks_batch($1, $2, $3)`,
[workerId, MAX_TASKS_PER_SESSION, role || null]
);
if (tasks.length === 0) {
await client.query('ROLLBACK');
console.log(`[WorkerSession] No pending tasks available for ${workerId}`);
return null;
}
// Get geo from first claimed task (all same geo)
const { state_code, city } = tasks[0];
console.log(`[WorkerSession] ${workerId} claimed ${tasks.length} tasks for ${city || 'any'}, ${state_code}`);
// Step 2: Get Evomi proxy for this geo
const evomiConfig = getEvomiConfig();
if (!evomiConfig.enabled) {
await client.query('ROLLBACK');
throw new Error('Evomi proxy not configured');
}
// Step 3: Try to get an available IP
let session: WorkerSession | null = null;
let proxyUrl: string | null = null;
for (let attempt = 0; attempt < MAX_IP_ATTEMPTS; attempt++) {
// Build proxy URL with unique session ID for each attempt
const sessionId = `${workerId}-${Date.now()}-${attempt}`;
const proxyResult = buildEvomiProxyUrl(state_code, sessionId, city || undefined);
if (!proxyResult) {
console.warn(`[WorkerSession] Failed to build proxy URL for ${state_code}`);
continue;
}
// Resolve the proxy's real exit IP with a quick test request (getProxyIp below)
const testIp = await getProxyIp(proxyResult.url);
if (!testIp) {
console.warn(`[WorkerSession] Failed to get IP from proxy attempt ${attempt + 1}`);
continue;
}
// Step 4: Try to lock this IP
const { rows } = await client.query<WorkerSession>(
`SELECT * FROM lock_worker_session($1, $2, $3, $4)`,
[workerId, testIp, state_code, city]
);
if (rows[0]?.id) {
session = rows[0];
proxyUrl = proxyResult.url;
console.log(`[WorkerSession] ${workerId} locked IP ${testIp} for ${city || 'any'}, ${state_code}`);
break;
}
console.log(`[WorkerSession] IP ${testIp} not available (in use or cooldown), trying next...`);
}
if (!session || !proxyUrl) {
// Release claimed tasks back to pool
await client.query(`SELECT release_claimed_tasks($1)`, [workerId]);
await client.query('ROLLBACK');
console.error(`[WorkerSession] ${workerId} failed to get available IP after ${MAX_IP_ATTEMPTS} attempts`);
return null;
}
// Update session with task count
await client.query(
`SELECT session_task_claimed($1, $2)`,
[workerId, tasks.length]
);
await client.query('COMMIT');
return {
session,
tasks,
proxyUrl,
};
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
/**
* Get the real IP address from a proxy by making a test request
*/
async function getProxyIp(proxyUrl: string): Promise<string | null> {
try {
// Use a simple IP check service
const { default: axios } = await import('axios');
const { HttpsProxyAgent } = await import('https-proxy-agent');
const agent = new HttpsProxyAgent(proxyUrl);
const response = await axios.get('https://api.ipify.org?format=json', {
httpAgent: agent,
httpsAgent: agent,
timeout: 10000,
});
return response.data?.ip || null;
} catch (err: any) {
console.warn(`[WorkerSession] IP check failed: ${err.message}`);
return null;
}
}
/**
* Mark a task as started (running)
*/
export async function startTask(taskId: number, workerId: string): Promise<boolean> {
const { rows } = await pool.query(
`SELECT start_task($1, $2) as success`,
[taskId, workerId]
);
return rows[0]?.success || false;
}
/**
* Mark a task as completed
*/
export async function completeTask(
taskId: number,
workerId: string,
result?: Record<string, unknown>
): Promise<boolean> {
const client = await pool.connect();
try {
await client.query('BEGIN');
// Complete the task
const { rows } = await client.query(
`SELECT complete_task($1, $2, $3) as success`,
[taskId, workerId, result ? JSON.stringify(result) : null]
);
if (rows[0]?.success) {
// Update session counter
await client.query(`SELECT session_task_completed($1)`, [workerId]);
}
await client.query('COMMIT');
return rows[0]?.success || false;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
/**
* Mark a task as failed (returns to pending for retry)
*/
export async function failTask(
taskId: number,
workerId: string,
error?: string
): Promise<boolean> {
const client = await pool.connect();
try {
await client.query('BEGIN');
// Fail the task (may return to pending or mark as permanently failed)
const { rows } = await client.query(
`SELECT fail_task($1, $2, $3) as success`,
[taskId, workerId, error || null]
);
if (rows[0]?.success) {
// Update session counter
await client.query(`SELECT session_task_failed($1)`, [workerId]);
}
await client.query('COMMIT');
return rows[0]?.success || false;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
/**
* Get current session for a worker
*/
export async function getWorkerSession(workerId: string): Promise<WorkerSession | null> {
const { rows } = await pool.query(
`SELECT * FROM get_worker_session($1)`,
[workerId]
);
// get_worker_session returns a composite, so a miss comes back as a row of NULLs;
// treat a NULL id as "no session"
const session = rows[0] as WorkerSession | undefined;
return session?.id ? session : null;
}
/**
* Check if worker session is complete (all 6 tasks done)
*/
export async function isSessionComplete(workerId: string): Promise<boolean> {
const session = await getWorkerSession(workerId);
if (!session) return true; // No session = complete
const totalDone = session.tasks_completed + session.tasks_failed;
return totalDone >= session.tasks_claimed;
}
/**
* Retire a worker's session (start 8hr cooldown)
*/
export async function retireSession(workerId: string): Promise<boolean> {
const { rows } = await pool.query(
`SELECT retire_worker_session($1) as success`,
[workerId]
);
const success = rows[0]?.success || false;
if (success) {
console.log(`[WorkerSession] ${workerId} session retired, IP in ${COOLDOWN_HOURS}hr cooldown`);
}
return success;
}
/**
* Release any claimed tasks back to pool (for worker shutdown)
*/
export async function releaseClaimedTasks(workerId: string): Promise<number> {
const { rows } = await pool.query(
`SELECT release_claimed_tasks($1) as count`,
[workerId]
);
const count = rows[0]?.count || 0;
if (count > 0) {
console.log(`[WorkerSession] Released ${count} claimed tasks for ${workerId}`);
}
return count;
}
/**
* Cleanup: release expired sessions from cooldown
*/
export async function releaseExpiredSessions(): Promise<number> {
const { rows } = await pool.query(
`SELECT release_expired_sessions() as count`
);
return rows[0]?.count || 0;
}
/**
* Get session stats for monitoring
*/
export async function getSessionStats(): Promise<{
active: number;
cooldown: number;
available: number;
uniqueIps: number;
}> {
const { rows } = await pool.query(`
SELECT
COUNT(*) FILTER (WHERE status = 'active') as active,
COUNT(*) FILTER (WHERE status = 'cooldown') as cooldown,
COUNT(*) FILTER (WHERE status = 'available') as available,
COUNT(DISTINCT ip_address) as unique_ips
FROM worker_sessions
`);
return {
active: parseInt(rows[0]?.active || '0'),
cooldown: parseInt(rows[0]?.cooldown || '0'),
available: parseInt(rows[0]?.available || '0'),
uniqueIps: parseInt(rows[0]?.unique_ips || '0'),
};
}
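// A condensed consumer of this module (illustrative sketch: real preflight, fingerprints
// and 3-way concurrency live in tasks/task-worker.ts; the handler call is a placeholder):
export async function runOneSessionExample(workerId: string): Promise<void> {
const claimed = await claimSessionWithTasks(workerId);
if (!claimed) return; // no pending tasks, or no available IP
for (const t of claimed.tasks) {
await startTask(t.task_id, workerId);
try {
// ... run the real task handler through claimed.proxyUrl here ...
await completeTask(t.task_id, workerId, { ok: true });
} catch (err: any) {
await failTask(t.task_id, workerId, err.message);
}
}
await retireSession(workerId); // starts the 8hr IP cooldown
}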

tasks/task-worker.ts

@@ -55,7 +55,7 @@
import { Pool } from 'pg';
import { v4 as uuidv4 } from 'uuid';
-import { taskService, TaskRole, WorkerTask } from './task-service';
+import { taskService, TaskRole, TaskStatus, WorkerTask } from './task-service';
import { getPool } from '../db/pool';
import os from 'os';
@@ -73,9 +73,16 @@ import { buildEvomiProxyUrl, getEvomiConfig } from '../services/crawl-rotator';
// Identity pool for diverse IP/fingerprint rotation
import { IdentityPoolService, WorkerIdentity, getRandomTaskCount } from '../services/identity-pool';
// NEW: Session-based worker pool (claim tasks first, then get IP)
import * as WorkerSession from '../services/worker-session';
// Feature flag: Use new identity pool system (set via env var)
const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true';
// Feature flag: Use new session pool system (claim 6 tasks, then get IP)
// This is the new correct flow: claim tasks → get geo from tasks → get IP → preflight
const USE_SESSION_POOL = process.env.USE_SESSION_POOL === 'true';
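// e.g. (entry point path illustrative): USE_SESSION_POOL=true node dist/tasks/task-worker.js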
// Task handlers by role
// Platform-based handlers: {task}-{platform}.ts convention
import { handleProductRefresh } from './handlers/product-refresh';
@@ -376,6 +383,18 @@ export class TaskWorker {
private identityMaxTasks: number = 5; // Random 3-5, set when identity claimed
private identityProxyUrl: string | null = null;
// ==========================================================================
// SESSION POOL TRACKING (new system - enabled via USE_SESSION_POOL)
// ==========================================================================
// Correct flow: claim tasks FIRST, then get IP based on task geo.
// Worker claims up to 6 tasks for same geo, gets IP, preflights, executes.
// After 6 tasks, IP goes into 8hr cooldown.
// ==========================================================================
private sessionTasks: WorkerSession.ClaimedTask[] = [];
private currentSession: WorkerSession.WorkerSession | null = null;
private sessionProxyUrl: string | null = null;
private sessionPreflightPassed: boolean = false;
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -1459,6 +1478,19 @@ export class TaskWorker {
// Try to claim more tasks if we have capacity
if (this.canAcceptMoreTasks()) {
// =================================================================
// NEW SESSION POOL FLOW (enabled via USE_SESSION_POOL)
// Correct order: claim tasks → get geo from tasks → get IP → preflight
// =================================================================
if (USE_SESSION_POOL) {
await this.sessionPoolMainLoop();
return;
}
// =================================================================
// LEGACY FLOWS BELOW (USE_IDENTITY_POOL or default geo session)
// =================================================================
// =================================================================
// LAZY INITIALIZATION - Initialize stealth on first task claim
// Workers start immediately and init proxies only when needed
@@ -1611,6 +1643,209 @@ export class TaskWorker {
await this.sleep(POLL_INTERVAL_MS);
}
// ===========================================================================
// SESSION POOL MAIN LOOP (new system - enabled via USE_SESSION_POOL)
// ===========================================================================
// Correct flow: claim tasks FIRST, then get IP based on task geo.
// 1. Check if we have an active session with tasks
// 2. If not, claim up to 6 tasks for same geo
// 3. Get Evomi proxy for that geo, check IP availability
// 4. Run preflight with that IP
// 5. Execute tasks (3 concurrent)
// 6. When all 6 done, retire session (8hr cooldown), repeat
// ===========================================================================
private async sessionPoolMainLoop(): Promise<void> {
// Check if session is complete (all claimed tasks done)
if (this.currentSession) {
const isComplete = await WorkerSession.isSessionComplete(this.workerId);
if (isComplete) {
console.log(`[TaskWorker] ${this.friendlyName} session complete, retiring...`);
await WorkerSession.retireSession(this.workerId);
this.currentSession = null;
this.sessionTasks = [];
this.sessionProxyUrl = null;
this.sessionPreflightPassed = false;
this.geoState = null;
this.geoCity = null;
// Continue to claim new session
}
}
// If no active session, claim new batch of tasks
if (!this.currentSession) {
console.log(`[TaskWorker] ${this.friendlyName} claiming new session...`);
// Initialize stealth if needed (for fingerprint generation)
if (!this.stealthInitialized) {
const initSuccess = await this.ensureStealthInitialized();
if (!initSuccess) {
console.log(`[TaskWorker] ${this.friendlyName} stealth init failed, waiting...`);
await this.sleep(30000);
return;
}
}
// Claim tasks and establish session
const result = await WorkerSession.claimSessionWithTasks(this.workerId, this.role || undefined);
if (!result) {
// No tasks available or couldn't get IP
console.log(`[TaskWorker] ${this.friendlyName} no session available, waiting...`);
await this.sleep(30000);
return;
}
this.currentSession = result.session;
this.sessionTasks = result.tasks;
this.sessionProxyUrl = result.proxyUrl;
this.geoState = result.session.state_code;
this.geoCity = result.session.city || null;
console.log(`[TaskWorker] ${this.friendlyName} new session: ${result.tasks.length} tasks for ${this.geoCity || 'any'}, ${this.geoState} (IP: ${result.session.ip_address})`);
// Configure proxy in crawl rotator
if (this.sessionProxyUrl) {
this.crawlRotator.setFixedProxy(this.sessionProxyUrl);
}
// Run preflight with this session's proxy
console.log(`[TaskWorker] ${this.friendlyName} running preflight for session...`);
try {
await this.runDualPreflights();
if (this.preflightHttpPassed) {
this.sessionPreflightPassed = true;
console.log(`[TaskWorker] ${this.friendlyName} session preflight PASSED (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`);
} else {
// Preflight failed - release tasks and session
console.error(`[TaskWorker] ${this.friendlyName} session preflight FAILED, releasing tasks...`);
await WorkerSession.releaseClaimedTasks(this.workerId);
await WorkerSession.retireSession(this.workerId);
this.currentSession = null;
this.sessionTasks = [];
this.sessionProxyUrl = null;
this.sessionPreflightPassed = false;
await this.sleep(30000);
return;
}
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} preflight error: ${err.message}`);
await WorkerSession.releaseClaimedTasks(this.workerId);
await WorkerSession.retireSession(this.workerId);
this.currentSession = null;
this.sessionTasks = [];
await this.sleep(30000);
return;
}
}
// We have an active session with tasks - execute next pending task
if (!this.sessionPreflightPassed) {
console.log(`[TaskWorker] ${this.friendlyName} session preflight not passed, waiting...`);
await this.sleep(10000);
return;
}
// Find next task to execute from our claimed batch
const nextTask = this.sessionTasks.find(t => !this.activeTasks.has(t.task_id));
if (!nextTask) {
// All claimed tasks are either running or done
await this.sleep(POLL_INTERVAL_MS);
return;
}
// Convert session task to worker task format
const workerTask: WorkerTask = {
id: nextTask.task_id,
role: nextTask.role as TaskRole,
dispensary_id: nextTask.dispensary_id,
dispensary_name: nextTask.dispensary_name,
method: (nextTask.method || 'http') as 'http' | 'curl' | null,
platform: nextTask.platform,
priority: 50,
status: 'claimed' as TaskStatus,
scheduled_for: null,
worker_id: this.workerId,
claimed_at: new Date(),
started_at: null,
completed_at: null,
last_heartbeat_at: null,
result: null,
error_message: null,
retry_count: 0,
max_retries: 3,
payload: null,
created_at: new Date(),
updated_at: new Date(),
};
// Mark task as running
await WorkerSession.startTask(nextTask.task_id, this.workerId);
console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);
this.activeTasks.set(nextTask.task_id, workerTask);
// Execute task in background
const taskPromise = this.executeSessionTask(workerTask);
this.taskPromises.set(nextTask.task_id, taskPromise);
// Cleanup when done
taskPromise.finally(() => {
this.activeTasks.delete(nextTask.task_id);
this.taskPromises.delete(nextTask.task_id);
// Drop the task from the claimed batch so the find() above cannot pick it up again
this.sessionTasks = this.sessionTasks.filter(t => t.task_id !== nextTask.task_id);
});
}
/**
* Execute a task for session pool (handles completion/failure tracking)
*/
private async executeSessionTask(task: WorkerTask): Promise<void> {
console.log(`[TaskWorker] ${this.friendlyName} executing session task ${task.id} (${task.role})`);
try {
// Get handler for this role
const handler = getHandlerForTask(task);
if (!handler) {
throw new Error(`No handler registered for role: ${task.role}`);
}
// Create context
const ctx: TaskContext = {
pool: this.pool,
workerId: this.workerId,
task,
heartbeat: async () => {
// Session tasks use different heartbeat
},
crawlRotator: this.crawlRotator,
updateStep: (step: string, detail?: string) => {
this.updateTaskStep(task.id, step, detail);
},
fingerprint: this.storedFingerprint || undefined,
};
this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);
// Execute
const result = await handler(ctx);
this.clearTaskStep(task.id);
if (result.success) {
await WorkerSession.completeTask(task.id, this.workerId, { result: result.data });
console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
} else {
await WorkerSession.failTask(task.id, this.workerId, result.error);
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
}
} catch (err: any) {
this.clearTaskStep(task.id);
await WorkerSession.failTask(task.id, this.workerId, err.message);
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} ERROR: ${err.message}`);
}
}
/**
* Stop the worker
*/
@@ -1619,6 +1854,16 @@ export class TaskWorker {
this.stopHeartbeat();
this.stopRegistryHeartbeat();
this.stopPeriodicStaleCleanup();
// Clean up session pool state if using new system
if (USE_SESSION_POOL && this.currentSession) {
console.log(`[TaskWorker] ${this.friendlyName} releasing session pool resources...`);
await WorkerSession.releaseClaimedTasks(this.workerId);
await WorkerSession.retireSession(this.workerId);
this.currentSession = null;
this.sessionTasks = [];
}
await this.deregister();
console.log(`[TaskWorker] ${this.friendlyName} stopped`);
}