From 1861e18396b19d0a1a970146e8040029083a53f0 Mon Sep 17 00:00:00 2001
From: Kelly
Date: Sun, 14 Dec 2025 01:41:52 -0700
Subject: [PATCH] feat(workers): Implement geo-based task pools
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Workers now follow the correct flow:

1. Check which pools have pending tasks
2. Claim a pool (e.g., Phoenix AZ)
3. Get an Evomi proxy for that geo
4. Run preflight with the geo proxy
5. Pull tasks from the pool (up to 6 stores)
6. Execute tasks
7. Release the pool when exhausted (6 stores visited)

Task pools group dispensaries by metro area (100mi radius):
- Phoenix AZ, Tucson AZ
- Los Angeles CA, San Francisco CA, San Diego CA, Sacramento CA
- Denver CO, Chicago IL, Boston MA, Detroit MI
- Las Vegas NV, Reno NV, Newark NJ, New York NY
- Oklahoma City OK, Tulsa OK, Portland OR, Seattle WA

Benefits:
- Workers know their geo BEFORE getting a proxy (no more "No geo assigned")
- IP diversity within the metro area (a Phoenix worker can use a Tempe IP)
- Simpler worker logic - just match the pool's geo
- Tasks are pre-organized, not grouped at claim time

New files:
- migrations/113_task_pools.sql - schema, seed data, functions
- src/services/task-pool.ts - TypeScript service

Env vars:
- USE_TASK_POOLS=true (new system)
- USE_IDENTITY_POOL=false (disabled)

🤖 Generated with [Claude Code](https://claude.ai/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
---
 backend/migrations/113_task_pools.sql | 381 ++++++++++++++++++++++++++
 backend/src/services/task-pool.ts     | 190 +++++++++++++
 backend/src/tasks/task-worker.ts      | 281 ++++++++++++++++++-
 k8s/scraper-worker.yaml               |   7 +
 4 files changed, 858 insertions(+), 1 deletion(-)
 create mode 100644 backend/migrations/113_task_pools.sql
 create mode 100644 backend/src/services/task-pool.ts

diff --git a/backend/migrations/113_task_pools.sql b/backend/migrations/113_task_pools.sql
new file mode 100644
index 00000000..a8ab2e2b
--- /dev/null
+++ b/backend/migrations/113_task_pools.sql
@@ -0,0 +1,381 @@
+-- Task Pools: Group tasks by geo area for worker assignment
+-- Workers claim a pool, get a proxy for that geo, then pull tasks from the pool
+
+-- ============================================================================
+-- TASK POOLS TABLE
+-- ============================================================================
+-- Each pool represents a metro area (e.g., Phoenix AZ = 100mi radius)
+-- Dispensaries are assigned to pools based on location
+-- Workers claim a pool, not individual tasks
+
+CREATE TABLE IF NOT EXISTS task_pools (
+  id SERIAL PRIMARY KEY,
+  name VARCHAR(100) NOT NULL UNIQUE,    -- e.g., 'phoenix_az'
+  display_name VARCHAR(100) NOT NULL,   -- e.g., 'Phoenix, AZ'
+  state_code VARCHAR(2) NOT NULL,       -- e.g., 'AZ'
+  city VARCHAR(100) NOT NULL,           -- e.g., 'Phoenix'
+  latitude DECIMAL(10, 6) NOT NULL,     -- pool center lat
+  longitude DECIMAL(10, 6) NOT NULL,    -- pool center lng
+  radius_miles INTEGER DEFAULT 100,     -- pool radius (100mi default)
+  timezone VARCHAR(50) NOT NULL,        -- e.g., 'America/Phoenix'
+  is_active BOOLEAN DEFAULT true,
+  created_at TIMESTAMPTZ DEFAULT NOW(),
+  updated_at TIMESTAMPTZ DEFAULT NOW()
+);
+
+-- Index for active pools
+CREATE INDEX IF NOT EXISTS idx_task_pools_active ON task_pools(is_active) WHERE is_active = true;
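+
+-- Illustrative only (not executed by this migration): adding a new metro
+-- later follows the same shape as the seed rows further down. The
+-- Albuquerque values here are approximate and hypothetical.
+--
+--   INSERT INTO task_pools (name, display_name, state_code, city,
+--                           latitude, longitude, timezone, radius_miles)
+--   VALUES ('albuquerque_nm', 'Albuquerque, NM', 'NM', 'Albuquerque',
+--           35.0844, -106.6504, 'America/Denver', 75)
+--   ON CONFLICT (name) DO NOTHING;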
+
+-- ============================================================================
+-- LINK DISPENSARIES TO POOLS
+-- ============================================================================
+-- Add pool_id to dispensaries table
+
+ALTER TABLE dispensaries
+ADD COLUMN IF NOT EXISTS pool_id INTEGER REFERENCES task_pools(id);
+
+-- Index for pool membership
+CREATE INDEX IF NOT EXISTS idx_dispensaries_pool ON dispensaries(pool_id) WHERE pool_id IS NOT NULL;
+
+-- ============================================================================
+-- WORKER POOL ASSIGNMENT
+-- ============================================================================
+-- Track which pool a worker is currently assigned to
+
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS current_pool_id INTEGER REFERENCES task_pools(id),
+ADD COLUMN IF NOT EXISTS pool_claimed_at TIMESTAMPTZ,
+ADD COLUMN IF NOT EXISTS pool_stores_visited INTEGER DEFAULT 0,
+ADD COLUMN IF NOT EXISTS pool_max_stores INTEGER DEFAULT 6;
+
+-- ============================================================================
+-- SEED INITIAL POOLS
+-- ============================================================================
+-- Major cannabis markets with approximate center coordinates
+
+INSERT INTO task_pools (name, display_name, state_code, city, latitude, longitude, timezone, radius_miles) VALUES
+  -- Arizona
+  ('phoenix_az', 'Phoenix, AZ', 'AZ', 'Phoenix', 33.4484, -112.0740, 'America/Phoenix', 100),
+  ('tucson_az', 'Tucson, AZ', 'AZ', 'Tucson', 32.2226, -110.9747, 'America/Phoenix', 75),
+
+  -- California
+  ('los_angeles_ca', 'Los Angeles, CA', 'CA', 'Los Angeles', 34.0522, -118.2437, 'America/Los_Angeles', 100),
+  ('san_francisco_ca', 'San Francisco, CA', 'CA', 'San Francisco', 37.7749, -122.4194, 'America/Los_Angeles', 75),
+  ('san_diego_ca', 'San Diego, CA', 'CA', 'San Diego', 32.7157, -117.1611, 'America/Los_Angeles', 75),
+  ('sacramento_ca', 'Sacramento, CA', 'CA', 'Sacramento', 38.5816, -121.4944, 'America/Los_Angeles', 75),
+
+  -- Colorado
+  ('denver_co', 'Denver, CO', 'CO', 'Denver', 39.7392, -104.9903, 'America/Denver', 100),
+
+  -- Illinois
+  ('chicago_il', 'Chicago, IL', 'IL', 'Chicago', 41.8781, -87.6298, 'America/Chicago', 100),
+
+  -- Massachusetts
+  ('boston_ma', 'Boston, MA', 'MA', 'Boston', 42.3601, -71.0589, 'America/New_York', 75),
+
+  -- Michigan
+  ('detroit_mi', 'Detroit, MI', 'MI', 'Detroit', 42.3314, -83.0458, 'America/Detroit', 100),
+
+  -- Nevada
+  ('las_vegas_nv', 'Las Vegas, NV', 'NV', 'Las Vegas', 36.1699, -115.1398, 'America/Los_Angeles', 75),
+  ('reno_nv', 'Reno, NV', 'NV', 'Reno', 39.5296, -119.8138, 'America/Los_Angeles', 50),
+
+  -- New Jersey
+  ('newark_nj', 'Newark, NJ', 'NJ', 'Newark', 40.7357, -74.1724, 'America/New_York', 75),
+
+  -- New York
+  ('new_york_ny', 'New York, NY', 'NY', 'New York', 40.7128, -74.0060, 'America/New_York', 75),
+
+  -- Oklahoma
+  ('oklahoma_city_ok', 'Oklahoma City, OK', 'OK', 'Oklahoma City', 35.4676, -97.5164, 'America/Chicago', 100),
+  ('tulsa_ok', 'Tulsa, OK', 'OK', 'Tulsa', 36.1540, -95.9928, 'America/Chicago', 75),
+
+  -- Oregon
+  ('portland_or', 'Portland, OR', 'OR', 'Portland', 45.5152, -122.6784, 'America/Los_Angeles', 75),
+
+  -- Washington
+  ('seattle_wa', 'Seattle, WA', 'WA', 'Seattle', 47.6062, -122.3321, 'America/Los_Angeles', 100)
+
+ON CONFLICT (name) DO NOTHING;
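+
+-- Sanity check (illustrative, safe to run ad hoc): the same spherical
+-- law-of-cosines distance used by assign_dispensary_to_pool() below, applied
+-- to two seeded pool centers. Phoenix <-> Tucson comes out near 106 miles,
+-- which is why Tucson is its own pool rather than part of Phoenix's 100mi
+-- radius.
+--
+--   SELECT 3959 * acos(
+--            cos(radians(33.4484)) * cos(radians(32.2226)) *
+--            cos(radians(-110.9747) - radians(-112.0740)) +
+--            sin(radians(33.4484)) * sin(radians(32.2226))
+--          ) AS miles;  -- ~106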
+
+-- ============================================================================
+-- FUNCTION: Assign dispensary to nearest pool
+-- ============================================================================
+CREATE OR REPLACE FUNCTION assign_dispensary_to_pool(disp_id INTEGER)
+RETURNS INTEGER AS $$
+DECLARE
+  disp_lat DECIMAL(10,6);
+  disp_lng DECIMAL(10,6);
+  nearest_pool_id INTEGER;
+BEGIN
+  -- Get dispensary coordinates
+  SELECT latitude, longitude INTO disp_lat, disp_lng
+  FROM dispensaries WHERE id = disp_id;
+
+  IF disp_lat IS NULL OR disp_lng IS NULL THEN
+    RETURN NULL;
+  END IF;
+
+  -- Find nearest active pool within radius
+  -- Great-circle distance via the spherical law of cosines (accurate enough
+  -- at the 100mi scale)
+  SELECT id INTO nearest_pool_id
+  FROM task_pools
+  WHERE is_active = true
+    AND (
+      3959 * acos(
+        cos(radians(latitude)) * cos(radians(disp_lat)) *
+        cos(radians(disp_lng) - radians(longitude)) +
+        sin(radians(latitude)) * sin(radians(disp_lat))
+      )
+    ) <= radius_miles
+  ORDER BY (
+    3959 * acos(
+      cos(radians(latitude)) * cos(radians(disp_lat)) *
+      cos(radians(disp_lng) - radians(longitude)) +
+      sin(radians(latitude)) * sin(radians(disp_lat))
+    )
+  )
+  LIMIT 1;
+
+  -- Update dispensary
+  IF nearest_pool_id IS NOT NULL THEN
+    UPDATE dispensaries SET pool_id = nearest_pool_id WHERE id = disp_id;
+  END IF;
+
+  RETURN nearest_pool_id;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================================
+-- FUNCTION: Assign all dispensaries to pools (batch)
+-- ============================================================================
+CREATE OR REPLACE FUNCTION assign_all_dispensaries_to_pools()
+RETURNS TABLE(assigned INTEGER, unassigned INTEGER) AS $$
+DECLARE
+  assigned_count INTEGER := 0;
+  unassigned_count INTEGER := 0;
+  disp RECORD;
+  v_pool_id INTEGER;  -- prefixed so it cannot shadow dispensaries.pool_id below
+BEGIN
+  FOR disp IN SELECT id FROM dispensaries WHERE pool_id IS NULL AND latitude IS NOT NULL LOOP
+    v_pool_id := assign_dispensary_to_pool(disp.id);
+    IF v_pool_id IS NOT NULL THEN
+      assigned_count := assigned_count + 1;
+    ELSE
+      unassigned_count := unassigned_count + 1;
+    END IF;
+  END LOOP;
+
+  RETURN QUERY SELECT assigned_count, unassigned_count;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================================
+-- FUNCTION: Get pools with pending tasks
+-- ============================================================================
+CREATE OR REPLACE FUNCTION get_pools_with_pending_tasks()
+RETURNS TABLE(
+  pool_id INTEGER,
+  pool_name VARCHAR(100),
+  display_name VARCHAR(100),
+  state_code VARCHAR(2),
+  city VARCHAR(100),
+  timezone VARCHAR(50),
+  pending_count BIGINT,
+  store_count BIGINT
+) AS $$
+BEGIN
+  RETURN QUERY
+  SELECT
+    tp.id as pool_id,
+    tp.name as pool_name,
+    tp.display_name,
+    tp.state_code,
+    tp.city,
+    tp.timezone,
+    COUNT(DISTINCT t.id) as pending_count,
+    COUNT(DISTINCT d.id) as store_count
+  FROM task_pools tp
+  JOIN dispensaries d ON d.pool_id = tp.id
+  JOIN tasks t ON t.dispensary_id = d.id AND t.status = 'pending'
+  WHERE tp.is_active = true
+  GROUP BY tp.id, tp.name, tp.display_name, tp.state_code, tp.city, tp.timezone
+  HAVING COUNT(DISTINCT t.id) > 0
+  ORDER BY COUNT(DISTINCT t.id) DESC;
+END;
+$$ LANGUAGE plpgsql;
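+
+-- Illustrative usage (step 1 of the worker flow, from psql). Columns match
+-- the RETURNS TABLE above; the counts shown are made up.
+--
+--   SELECT pool_name, pending_count, store_count
+--   FROM get_pools_with_pending_tasks();
+--
+--     pool_name   | pending_count | store_count
+--   --------------+---------------+-------------
+--    phoenix_az   |           142 |          38
+--    denver_co    |            97 |          24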
+
+-- ============================================================================
+-- FUNCTION: Worker claims a pool
+-- ============================================================================
+CREATE OR REPLACE FUNCTION worker_claim_pool(
+  p_worker_id VARCHAR(100),
+  p_pool_id INTEGER DEFAULT NULL
+)
+RETURNS TABLE(
+  pool_id INTEGER,
+  pool_name VARCHAR(100),
+  display_name VARCHAR(100),
+  state_code VARCHAR(2),
+  city VARCHAR(100),
+  latitude DECIMAL(10,6),
+  longitude DECIMAL(10,6),
+  timezone VARCHAR(50)
+) AS $$
+DECLARE
+  claimed_pool_id INTEGER;
+BEGIN
+  -- If no pool specified, pick the one with the most pending tasks
+  IF p_pool_id IS NULL THEN
+    SELECT tp.id INTO claimed_pool_id
+    FROM task_pools tp
+    JOIN dispensaries d ON d.pool_id = tp.id
+    JOIN tasks t ON t.dispensary_id = d.id AND t.status = 'pending'
+    WHERE tp.is_active = true
+    GROUP BY tp.id
+    ORDER BY COUNT(DISTINCT t.id) DESC
+    LIMIT 1;
+  ELSE
+    claimed_pool_id := p_pool_id;
+  END IF;
+
+  IF claimed_pool_id IS NULL THEN
+    RETURN;
+  END IF;
+
+  -- Update worker registry with pool assignment
+  UPDATE worker_registry
+  SET
+    current_pool_id = claimed_pool_id,
+    pool_claimed_at = NOW(),
+    pool_stores_visited = 0,
+    pool_max_stores = 6,
+    updated_at = NOW()
+  WHERE worker_id = p_worker_id;
+
+  -- Return pool info
+  RETURN QUERY
+  SELECT
+    tp.id,
+    tp.name,
+    tp.display_name,
+    tp.state_code,
+    tp.city,
+    tp.latitude,
+    tp.longitude,
+    tp.timezone
+  FROM task_pools tp
+  WHERE tp.id = claimed_pool_id;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================================
+-- FUNCTION: Pull tasks from worker's pool (up to 6 stores)
+-- ============================================================================
+CREATE OR REPLACE FUNCTION pull_tasks_from_pool(
+  p_worker_id VARCHAR(100),
+  p_max_stores INTEGER DEFAULT 6
+)
+RETURNS TABLE(
+  task_id INTEGER,
+  dispensary_id INTEGER,
+  dispensary_name VARCHAR(255),
+  role VARCHAR(50),
+  platform VARCHAR(50),
+  method VARCHAR(20)
+) AS $$
+DECLARE
+  worker_pool_id INTEGER;
+  stores_visited INTEGER;
+  max_stores INTEGER;
+  stores_remaining INTEGER;
+BEGIN
+  -- Get worker's current pool and store count
+  SELECT current_pool_id, pool_stores_visited, pool_max_stores
+  INTO worker_pool_id, stores_visited, max_stores
+  FROM worker_registry
+  WHERE worker_id = p_worker_id;
+
+  IF worker_pool_id IS NULL THEN
+    RAISE EXCEPTION 'Worker % has no pool assigned', p_worker_id;
+  END IF;
+
+  -- Registry's pool_max_stores wins; p_max_stores is only a fallback
+  max_stores := COALESCE(max_stores, p_max_stores);
+
+  stores_remaining := max_stores - stores_visited;
+  IF stores_remaining <= 0 THEN
+    RETURN; -- Worker exhausted
+  END IF;
+
+  -- Claim tasks from pool (one task per store, up to remaining capacity)
+  RETURN QUERY
+  WITH available_stores AS (
+    SELECT DISTINCT ON (d.id)
+      t.id as task_id,
+      d.id as dispensary_id,
+      d.name as dispensary_name,
+      t.role,
+      t.platform,
+      t.method
+    FROM tasks t
+    JOIN dispensaries d ON d.id = t.dispensary_id
+    WHERE d.pool_id = worker_pool_id
+      AND t.status = 'pending'
+      AND t.scheduled_for <= NOW()
+    ORDER BY d.id, t.priority DESC, t.created_at ASC
+    LIMIT stores_remaining
+  ),
+  claimed AS (
+    UPDATE tasks
+    SET
+      status = 'claimed',
+      claimed_by = p_worker_id,
+      claimed_at = NOW()
+    -- qualified: a bare task_id is ambiguous with the OUT parameter
+    WHERE id IN (SELECT s.task_id FROM available_stores s)
+    RETURNING id
+  )
+  SELECT
+    av.task_id,
+    av.dispensary_id,
+    av.dispensary_name,
+    av.role,
+    av.platform,
+    av.method
+  FROM available_stores av
+  WHERE av.task_id IN (SELECT id FROM claimed);
+
+  -- Update worker store count. NOW() is transaction-stable, so filtering on
+  -- claimed_at = NOW() counts only the stores claimed by THIS call, not tasks
+  -- the worker is still holding from earlier pulls.
+  UPDATE worker_registry
+  SET
+    pool_stores_visited = pool_stores_visited + (
+      SELECT COUNT(DISTINCT t.dispensary_id)
+      FROM tasks t
+      WHERE t.claimed_by = p_worker_id
+        AND t.status = 'claimed'
+        AND t.claimed_at = NOW()
+    ),
+    updated_at = NOW()
+  WHERE worker_id = p_worker_id;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================================
+-- FUNCTION: Worker releases pool (exhausted or done)
+-- ============================================================================
+CREATE OR REPLACE FUNCTION worker_release_pool(p_worker_id VARCHAR(100))
+RETURNS BOOLEAN AS $$
+BEGIN
+  UPDATE worker_registry
+  SET
+    current_pool_id = NULL,
+    pool_claimed_at = NULL,
+    pool_stores_visited = 0,
+    current_state = NULL,
+    current_city = NULL,
+    updated_at = NOW()
+  WHERE worker_id = p_worker_id;
+
+  RETURN true;
+END;
+$$ LANGUAGE plpgsql;
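+
+-- Illustrative end-to-end cycle for one worker (the worker id is
+-- hypothetical); this is what task-pool.ts drives from the application side:
+--
+--   SELECT * FROM worker_claim_pool('worker-abc123');        -- claim busiest pool
+--   SELECT * FROM pull_tasks_from_pool('worker-abc123', 6);  -- one task per store
+--   -- ...execute tasks, pull again until empty or 6 stores visited...
+--   SELECT worker_release_pool('worker-abc123');             -- free the worker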
+
+-- ============================================================================
+-- RUN: Assign existing dispensaries to pools
+-- ============================================================================
+SELECT * FROM assign_all_dispensaries_to_pools();
diff --git a/backend/src/services/task-pool.ts b/backend/src/services/task-pool.ts
new file mode 100644
index 00000000..3c44dd92
--- /dev/null
+++ b/backend/src/services/task-pool.ts
@@ -0,0 +1,190 @@
+/**
+ * Task Pool Service
+ *
+ * Manages the pool-based worker flow:
+ * 1. Worker checks which pools have pending tasks
+ * 2. Worker claims a pool (gets geo info)
+ * 3. Worker gets a proxy for that geo
+ * 4. Worker runs preflight
+ * 5. Worker pulls tasks from the pool (up to 6 stores)
+ * 6. Worker executes tasks
+ * 7. Worker releases the pool when exhausted (6 stores visited)
+ */
+
+import { pool } from '../db/pool';
+import { buildEvomiProxyUrl, getEvomiConfig } from './crawl-rotator';
+
+export interface TaskPool {
+  pool_id: number;
+  pool_name: string;
+  display_name: string;
+  state_code: string;
+  city: string;
+  latitude: number;
+  longitude: number;
+  timezone: string;
+}
+
+export interface PoolWithPendingTasks extends TaskPool {
+  pending_count: number;
+  store_count: number;
+}
+
+export interface PoolTask {
+  task_id: number;
+  dispensary_id: number;
+  dispensary_name: string;
+  role: string;
+  platform: string;
+  method: string | null;
+}
+
+export interface ClaimedPoolResult {
+  pool: TaskPool;
+  proxyUrl: string;
+}
+
+/**
+ * Get all pools that have pending tasks
+ */
+export async function getPoolsWithPendingTasks(): Promise<PoolWithPendingTasks[]> {
+  const { rows } = await pool.query(
+    `SELECT * FROM get_pools_with_pending_tasks()`
+  );
+  return rows;
+}
+
+/**
+ * Worker claims a pool
+ * Returns pool info + proxy URL for that geo
+ */
+export async function claimPool(
+  workerId: string,
+  poolId?: number
+): Promise<ClaimedPoolResult | null> {
+  // Claim pool in database
+  const { rows } = await pool.query(
+    `SELECT * FROM worker_claim_pool($1, $2)`,
+    [workerId, poolId ?? null]
+  );
+
+  if (rows.length === 0) {
+    console.log(`[TaskPool] No pools available for ${workerId}`);
+    return null;
+  }
+
+  const claimedPool = rows[0];
+
+  // Build Evomi proxy URL for this pool's geo
+  const evomiConfig = getEvomiConfig();
+  if (!evomiConfig.enabled) {
+    throw new Error('Evomi proxy not configured');
+  }
+
+  const proxyResult = buildEvomiProxyUrl(
+    claimedPool.state_code,
+    workerId,
+    claimedPool.city.toLowerCase().replace(/\s+/g, '.')
+  );
+
+  if (!proxyResult) {
+    throw new Error(`Failed to build proxy URL for ${claimedPool.display_name}`);
+  }
+
+  console.log(`[TaskPool] ${workerId} claimed pool: ${claimedPool.display_name}`);
+
+  return {
+    pool: claimedPool,
+    proxyUrl: proxyResult.url,
+  };
+}
+
+/**
+ * Pull tasks from worker's assigned pool
+ * Returns up to 6 stores' worth of tasks
+ */
+export async function pullTasksFromPool(
+  workerId: string,
+  maxStores: number = 6
+): Promise<PoolTask[]> {
+  const { rows } = await pool.query(
+    `SELECT * FROM pull_tasks_from_pool($1, $2)`,
+    [workerId, maxStores]
+  );
+
+  if (rows.length > 0) {
+    const storeIds = [...new Set(rows.map(t => t.dispensary_id))];
+    console.log(`[TaskPool] ${workerId} pulled ${rows.length} tasks for ${storeIds.length} stores`);
+  }
+
+  return rows;
+}
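+
+/**
+ * Illustrative wiring (a sketch, not exported API): how a caller is expected
+ * to combine claimPool() and pullTasksFromPool(). The worker id is
+ * hypothetical and error handling is elided.
+ *
+ *   const claimed = await claimPool('worker-abc123');
+ *   if (claimed) {
+ *     // route traffic through claimed.proxyUrl, run preflight, then:
+ *     const tasks = await pullTasksFromPool('worker-abc123');
+ *     for (const t of tasks) {
+ *       // execute t.task_id against store t.dispensary_id
+ *     }
+ *   }
+ */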
+
+/**
+ * Worker releases their pool (exhausted or done)
+ */
+export async function releasePool(workerId: string): Promise<boolean> {
+  const { rows } = await pool.query(
+    `SELECT worker_release_pool($1) as success`,
+    [workerId]
+  );
+  console.log(`[TaskPool] ${workerId} released pool`);
+  return rows[0]?.success || false;
+}
+
+/**
+ * Check if worker is exhausted (visited 6 stores)
+ */
+export async function isWorkerExhausted(workerId: string): Promise<boolean> {
+  const { rows } = await pool.query(
+    `SELECT pool_stores_visited >= pool_max_stores as exhausted
+     FROM worker_registry WHERE worker_id = $1`,
+    [workerId]
+  );
+  return rows[0]?.exhausted || false;
}

+/**
+ * Get worker's current pool info
+ */
+export async function getWorkerPool(workerId: string): Promise<TaskPool | null> {
+  const { rows } = await pool.query(
+    `SELECT tp.id as pool_id, tp.name as pool_name, tp.display_name,
+            tp.state_code, tp.city, tp.latitude, tp.longitude, tp.timezone
+     FROM worker_registry wr
+     JOIN task_pools tp ON tp.id = wr.current_pool_id
+     WHERE wr.worker_id = $1`,
+    [workerId]
+  );
+  return rows[0] || null;
+}
+
+/**
+ * Update worker's store visit count
+ */
+export async function incrementStoreVisits(
+  workerId: string,
+  count: number = 1
+): Promise<void> {
+  await pool.query(
+    `UPDATE worker_registry
+     SET pool_stores_visited = pool_stores_visited + $2, updated_at = NOW()
+     WHERE worker_id = $1`,
+    [workerId, count]
+  );
+}
+
+/**
+ * Update worker_registry with pool geo info (for dashboard)
+ */
+export async function updateWorkerPoolGeo(
+  workerId: string,
+  poolInfo: TaskPool
+): Promise<void> {
+  await pool.query(
+    `UPDATE worker_registry
+     SET current_state = $2, current_city = $3, updated_at = NOW()
+     WHERE worker_id = $1`,
+    [workerId, poolInfo.state_code, poolInfo.city]
+  );
+}
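+
+/**
+ * Illustrative poll-loop fragment (assumes a surrounding worker loop, which
+ * task-worker.ts provides below): exhaustion gates the release, and the next
+ * iteration claims a fresh pool via claimPool().
+ *
+ *   if (await isWorkerExhausted(workerId)) {
+ *     await releasePool(workerId);  // clears pool + geo columns in registry
+ *   }
+ */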
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index 08caa94f..ad1cf861 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -76,6 +76,13 @@ import { IdentityPoolService, WorkerIdentity, getRandomTaskCount } from '../serv
 // NEW: Session-based worker pool (claim tasks first, then get IP)
 import * as WorkerSession from '../services/worker-session';
 
+// NEW: Task pool system (geo-based pools)
+import * as TaskPool from '../services/task-pool';
+
+// Feature flag: Use new task pool system (geo-based pools)
+// This is the correct flow: check pools → claim pool → get proxy → preflight → pull tasks
+const USE_TASK_POOLS = process.env.USE_TASK_POOLS === 'true';
+
 // Feature flag: Use new identity pool system (set via env var)
 const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true';
 
@@ -1490,7 +1497,16 @@ export class TaskWorker {
     // Try to claim more tasks if we have capacity
     if (this.canAcceptMoreTasks()) {
       // =================================================================
-      // NEW SESSION POOL FLOW (enabled via USE_SESSION_POOL)
+      // TASK POOL FLOW (enabled via USE_TASK_POOLS)
+      // Correct order: check pools → claim pool → get proxy → preflight → pull tasks
+      // =================================================================
+      if (USE_TASK_POOLS) {
+        await this.taskPoolMainLoop();
+        return;
+      }
+
+      // =================================================================
+      // SESSION POOL FLOW (enabled via USE_SESSION_POOL)
       // Correct order: claim tasks → get geo from tasks → get IP → preflight
       // =================================================================
       if (USE_SESSION_POOL) {
@@ -1825,6 +1841,269 @@ export class TaskWorker {
     });
   }
 
+  // ===========================================================================
+  // TASK POOL MAIN LOOP (new system - enabled via USE_TASK_POOLS)
+  // ===========================================================================
+  // Correct flow:
+  //   1. Check which pools have pending tasks
+  //   2. Claim a pool (geo-based)
+  //   3. Get Evomi proxy for that pool's geo
+  //   4. Run preflight with that proxy
+  //   5. Pull tasks from pool (up to 6 stores)
+  //   6. Execute tasks
+  //   7. When exhausted (6 stores), release pool and repeat
+  // ===========================================================================
+  private currentPoolInfo: TaskPool.TaskPool | null = null;
+  private poolProxyUrl: string | null = null;
+  private poolPreflightPassed: boolean = false;
+  private poolTasks: TaskPool.PoolTask[] = [];
+
+  private async taskPoolMainLoop(): Promise<void> {
+    // Check if worker is exhausted (hit its max store count)
+    if (this.currentPoolInfo) {
+      const exhausted = await TaskPool.isWorkerExhausted(this.workerId);
+      if (exhausted) {
+        console.log(`[TaskWorker] ${this.friendlyName} exhausted (max stores reached), releasing pool...`);
+        await TaskPool.releasePool(this.workerId);
+        this.currentPoolInfo = null;
+        this.poolProxyUrl = null;
+        this.poolPreflightPassed = false;
+        this.poolTasks = [];
+        this.geoState = null;
+        this.geoCity = null;
+        this.crawlRotator.clearFixedProxy();
+        // Continue to claim new pool
+      }
+    }
+
+    // If no pool claimed, check what's available and claim one
+    if (!this.currentPoolInfo) {
+      // Step 1: Check which pools have pending tasks
+      this.setPreflightStep('checking', 'Checking task pools');
+      const pools = await TaskPool.getPoolsWithPendingTasks();
+
+      if (pools.length === 0) {
+        this.setPreflightStep('waiting', 'No pools with pending tasks');
+        await this.sleep(30000);
+        return;
+      }
+
+      console.log(`[TaskWorker] ${this.friendlyName} found ${pools.length} pools with tasks`);
+      pools.slice(0, 3).forEach(p => {
+        console.log(`[TaskWorker] - ${p.display_name}: ${p.pending_count} tasks, ${p.store_count} stores`);
+      });
+
+      // Step 2: Claim a pool
+      this.setPreflightStep('claiming', 'Claiming task pool');
+      const result = await TaskPool.claimPool(this.workerId);
+
+      if (!result) {
+        this.setPreflightStep('waiting', 'No pools available');
+        await this.sleep(30000);
+        return;
+      }
+
+      this.currentPoolInfo = result.pool;
+      this.poolProxyUrl = result.proxyUrl;
+      this.geoState = result.pool.state_code;
+      this.geoCity = result.pool.city;
+
+      // Update dashboard with geo info
+      await TaskPool.updateWorkerPoolGeo(this.workerId, result.pool);
+
+      console.log(`[TaskWorker] ${this.friendlyName} claimed pool: ${result.pool.display_name}`);
+
+      // Step 3: Configure proxy for this pool's geo
+      this.setPreflightStep('proxy', `Setting proxy for ${result.pool.display_name}`);
+      this.crawlRotator.setFixedProxy(this.poolProxyUrl);
+
+      // Step 4: Initialize stealth if needed
+      if (!this.stealthInitialized) {
+        this.setPreflightStep('init', 'Initializing stealth plugins');
+        const initSuccess = await this.initStealthOnly();
+        if (!initSuccess) {
+          this.setPreflightStep('init_failed', 'Stealth init failed');
+          await TaskPool.releasePool(this.workerId);
+          this.currentPoolInfo = null;
+          this.poolProxyUrl = null;
+          await this.sleep(30000);
+          return;
+        }
+      }
+
+      // Step 5: Run preflight with pool's proxy
+      this.setPreflightStep('preflight', 'Running browser preflight');
+      console.log(`[TaskWorker] ${this.friendlyName} running preflight for ${result.pool.display_name}...`);
+
+      try {
+        await this.runDualPreflights();
+
+        if (this.preflightHttpPassed) {
+          this.poolPreflightPassed = true;
+          this.setPreflightStep('ready', `Qualified - ${result.pool.display_name}`);
+          console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`);
+        } else {
+          this.setPreflightStep('failed', this.preflightHttpResult?.error || 'Preflight failed');
+          console.error(`[TaskWorker] ${this.friendlyName} preflight FAILED, releasing pool...`);
+          await TaskPool.releasePool(this.workerId);
+          this.currentPoolInfo = null;
+          this.poolProxyUrl = null;
+          this.poolPreflightPassed = false;
+          this.crawlRotator.clearFixedProxy();
+          await this.sleep(30000);
+          return;
+        }
+      } catch (err: any) {
+        this.setPreflightStep('error', err.message);
+        console.error(`[TaskWorker] ${this.friendlyName} preflight error: ${err.message}`);
+        await TaskPool.releasePool(this.workerId);
+        this.currentPoolInfo = null;
+        this.poolProxyUrl = null;
+        await this.sleep(30000);
+        return;
+      }
+    }
+
+    // We have a pool and preflight passed - pull and execute tasks
+    if (!this.poolPreflightPassed) {
+      console.log(`[TaskWorker] ${this.friendlyName} pool preflight not passed, waiting...`);
+      await this.sleep(10000);
+      return;
+    }
+
+    // Pull tasks from pool if we don't have any
+    if (this.poolTasks.length === 0 || this.poolTasks.every(t => this.activeTasks.has(t.task_id))) {
+      const tasks = await TaskPool.pullTasksFromPool(this.workerId);
+      if (tasks.length === 0) {
+        // No more tasks in this pool - check if exhausted
+        const exhausted = await TaskPool.isWorkerExhausted(this.workerId);
+        if (exhausted) {
+          console.log(`[TaskWorker] ${this.friendlyName} pool exhausted, will release on next loop`);
+        } else {
+          console.log(`[TaskWorker] ${this.friendlyName} no tasks in pool, waiting...`);
+        }
+        await this.sleep(POLL_INTERVAL_MS);
+        return;
+      }
+      this.poolTasks = tasks;
+    }
+
+    // Find next task to execute
+    const nextTask = this.poolTasks.find(t => !this.activeTasks.has(t.task_id));
+    if (!nextTask) {
+      await this.sleep(POLL_INTERVAL_MS);
+      return;
+    }
+
+    // Convert pool task to worker task format
+    const workerTask: WorkerTask = {
+      id: nextTask.task_id,
+      role: nextTask.role as TaskRole,
+      dispensary_id: nextTask.dispensary_id,
+      dispensary_name: nextTask.dispensary_name,
+      method: (nextTask.method || 'http') as 'http' | 'curl' | null,
+      platform: nextTask.platform,
+      priority: 50,
+      status: 'claimed' as TaskStatus,
+      scheduled_for: null,
+      worker_id: this.workerId,
+      claimed_at: new Date(),
+      started_at: null,
+      completed_at: null,
+      last_heartbeat_at: null,
+      result: null,
+      error_message: null,
+      retry_count: 0,
+      max_retries: 3,
+      payload: null,
+      created_at: new Date(),
+      updated_at: new Date(),
+    };
+
+    console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);
+
+    this.activeTasks.set(nextTask.task_id, workerTask);
+
+    // Execute task in background
+    const taskPromise = this.executePoolTask(workerTask);
+    this.taskPromises.set(nextTask.task_id, taskPromise);
+
+    // Cleanup when done
+    taskPromise.finally(() => {
+      this.activeTasks.delete(nextTask.task_id);
+      this.taskPromises.delete(nextTask.task_id);
+      // Remove from poolTasks
+      this.poolTasks = this.poolTasks.filter(t => t.task_id !== nextTask.task_id);
+    });
+  }
+
+  /**
+   * Initialize stealth WITHOUT running preflight
+   * Used by the task pool flow, where preflight runs after the pool is claimed
+   */
+  private async initStealthOnly(): Promise<boolean> {
+    try {
+      await this.ensureStealthReady();
+      this.stealthInitialized = true;
+      return true;
+    } catch (err: any) {
+      console.error(`[TaskWorker] Stealth init failed: ${err.message}`);
+      return false;
+    }
+  }
+
+  /**
+   * Execute a task for the task pool flow (handles completion/failure tracking)
+   */
+  private async executePoolTask(task: WorkerTask): Promise<void> {
+    console.log(`[TaskWorker] ${this.friendlyName} executing pool task ${task.id} (${task.role})`);
+
+    try {
+      // Mark as running
+      await taskService.startTask(task.id);
+
+      // Get handler for this role
+      const handler = getHandlerForTask(task);
+      if (!handler) {
+        throw new Error(`No handler registered for role: ${task.role}`);
+      }
+
+      // Create context
+      const ctx: TaskContext = {
+        pool: this.pool,
+        workerId: this.workerId,
+        task,
+        heartbeat: async () => {
+          await taskService.heartbeat(task.id);
+        },
+        crawlRotator: this.crawlRotator,
+        updateStep: (step: string, detail?: string) => {
+          this.updateTaskStep(task.id, step, detail);
+        },
+        fingerprint: this.storedFingerprint || undefined,
+      };
+
+      this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);
+
+      // Execute
+      const result = await handler(ctx);
+
+      this.clearTaskStep(task.id);
+
+      if (result.success) {
+        await taskService.completeTask(task.id, { result: result.data });
+        console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
+      } else {
+        await taskService.failTask(task.id, result.error || 'Unknown error');
+        console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
+      }
+    } catch (err: any) {
+      this.clearTaskStep(task.id);
+      await taskService.failTask(task.id, err.message);
+      console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} ERROR: ${err.message}`);
+    }
+  }
+
   /**
    * Execute a task for session pool (handles completion/failure tracking)
    */
diff --git a/k8s/scraper-worker.yaml b/k8s/scraper-worker.yaml
index f3221a2b..df262438 100644
--- a/k8s/scraper-worker.yaml
+++ b/k8s/scraper-worker.yaml
@@ -51,6 +51,13 @@ spec:
         # 3 browsers × ~400MB = ~1.3GB (safe for 2GB pod limit)
         - name: MAX_CONCURRENT_TASKS
           value: "3"
+        # Task Pool System (geo-based pools)
+        # Correct flow: check pools → claim pool → get proxy → preflight → pull tasks
+        - name: USE_TASK_POOLS
+          value: "true"
+        # Disable legacy identity pool
+        - name: USE_IDENTITY_POOL
+          value: "false"
         resources:
           requests:
             memory: "1Gi"