diff --git a/backend/migrations/108_worker_geo_sessions.sql b/backend/migrations/108_worker_geo_sessions.sql
new file mode 100644
index 00000000..f33fd0f2
--- /dev/null
+++ b/backend/migrations/108_worker_geo_sessions.sql
@@ -0,0 +1,301 @@
+-- Migration: 108_worker_geo_sessions.sql
+-- Description: Add geo session tracking to worker_registry for state-based task assignment
+-- Created: 2025-12-13
+--
+-- A geo session pins a worker to one state. claim_task() refuses to hand out tasks once
+-- the session is older than 60 minutes or session_task_count has reached
+-- session_max_tasks (7 when unset); assign_worker_geo() starts a fresh session.
+
+-- Worker geo session columns
+-- Worker qualifies with a geo (state/city), then only claims tasks matching that geo
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS current_state VARCHAR(2);
+
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS current_city VARCHAR(100);
+
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS geo_session_started_at TIMESTAMPTZ;
+
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS session_task_count INT DEFAULT 0;
+
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS session_max_tasks INT DEFAULT 7;
+
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS proxy_geo VARCHAR(100);
+
+-- Comments
+COMMENT ON COLUMN worker_registry.current_state IS 'Worker''s current geo assignment (US state code, e.g., AZ)';
+COMMENT ON COLUMN worker_registry.current_city IS 'Worker''s current city assignment (optional, e.g., phoenix)';
+COMMENT ON COLUMN worker_registry.geo_session_started_at IS 'When worker''s current geo session started';
+COMMENT ON COLUMN worker_registry.session_task_count IS 'Number of tasks claimed in current geo session';
+COMMENT ON COLUMN worker_registry.session_max_tasks IS 'Max tasks per geo session before re-qualification (default 7)';
+COMMENT ON COLUMN worker_registry.proxy_geo IS 'Geo target string used for proxy (e.g., "arizona" or "phoenix, arizona")';
+
+-- Index for finding workers by state
+CREATE INDEX IF NOT EXISTS idx_worker_registry_current_state
+    ON worker_registry(current_state)
+    WHERE current_state IS NOT NULL;
+
+-- ============================================================
+-- UPDATED claim_task FUNCTION
+-- Now filters by worker's geo session state
+--
+-- Claims the highest-priority ready pending task of role p_role whose dispensary is in
+-- the worker's current_state, marks it 'claimed' for p_worker_id and increments the
+-- worker's session_task_count.
+--
+-- Parameters:
+--   p_role         role of the task to claim
+--   p_worker_id    worker_registry.worker_id of the caller
+--   p_curl_passed  worker passed the curl preflight (needed for method = 'curl' tasks)
+--   p_http_passed  worker passed the http preflight (needed for method = 'http' tasks)
+-- Returns: the claimed worker_tasks row. The result is NULL when the worker has no
+--   usable geo session and has a NULL id when no matching task was available.
+-- ============================================================
+CREATE OR REPLACE FUNCTION claim_task(
+    p_role VARCHAR(50),
+    p_worker_id VARCHAR(100),
+    p_curl_passed BOOLEAN DEFAULT TRUE,
+    p_http_passed BOOLEAN DEFAULT FALSE
+) RETURNS worker_tasks AS $$
+DECLARE
+    claimed_task worker_tasks;
+    worker_state VARCHAR(2);
+    session_valid BOOLEAN;
+    session_tasks INT;
+    max_tasks INT;
+BEGIN
+    -- Get worker's current geo session info.
+    -- The counter columns are nullable: read NULL as "0 used" / "7 allowed". A NULL
+    -- counter would otherwise turn the exhaustion check below into NULL, which IF
+    -- treats as false, so the per-session cap would never apply.
+    SELECT
+        wr.current_state,
+        COALESCE(wr.session_task_count, 0),
+        COALESCE(wr.session_max_tasks, 7),
+        (wr.geo_session_started_at IS NOT NULL AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes')
+    INTO worker_state, session_tasks, max_tasks, session_valid
+    FROM worker_registry wr
+    WHERE wr.worker_id = p_worker_id;
+
+    -- If no valid geo session, or session exhausted, worker can't claim tasks
+    -- Worker must re-qualify first. An unknown worker_id leaves worker_state NULL.
+    IF worker_state IS NULL OR session_valid IS NOT TRUE OR session_tasks >= max_tasks THEN
+        RETURN NULL;
+    END IF;
+
+    -- Claim task matching worker's state
+    UPDATE worker_tasks
+    SET
+        status = 'claimed',
+        worker_id = p_worker_id,
+        claimed_at = NOW(),
+        updated_at = NOW()
+    WHERE id = (
+        SELECT wt.id
+        FROM worker_tasks wt
+        -- Inner join: a task without a dispensary has no state and is never claimed here
+        INNER JOIN dispensaries d ON d.id = wt.dispensary_id
+        WHERE wt.role = p_role
+          AND wt.status = 'pending'
+          AND (wt.scheduled_for IS NULL OR wt.scheduled_for <= NOW())
+          -- GEO FILTER: Task's dispensary must match worker's state
+          AND d.state = worker_state
+          -- Method compatibility: worker must have passed the required preflight
+          AND (
+              wt.method IS NULL  -- No preference, any worker can claim
+              OR (wt.method = 'curl' AND p_curl_passed = TRUE)
+              OR (wt.method = 'http' AND p_http_passed = TRUE)
+          )
+          -- Exclude stores that already have an active task
+          AND NOT EXISTS (
+              SELECT 1
+              FROM worker_tasks active_task
+              WHERE active_task.dispensary_id = wt.dispensary_id
+                AND active_task.status IN ('claimed', 'running')
+          )
+        ORDER BY wt.priority DESC, wt.created_at ASC
+        LIMIT 1
+        FOR UPDATE SKIP LOCKED
+    )
+    RETURNING * INTO claimed_task;
+
+    -- If task claimed, increment session task count
+    -- Note: Use claimed_task.id IS NOT NULL (not claimed_task IS NOT NULL)
+    -- PostgreSQL composite type NULL check quirk
+    IF claimed_task.id IS NOT NULL THEN
+        UPDATE worker_registry
+        SET session_task_count = COALESCE(session_task_count, 0) + 1
+        WHERE worker_id = p_worker_id;
+    END IF;
+
+    RETURN claimed_task;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================
+-- FUNCTION: assign_worker_geo
+-- Assigns a geo session to a worker based on demand
+--
+-- Picks the state with the most ready pending tasks, preferring the one with the
+-- fewest other workers on a live session when counts tie, then starts a fresh session
+-- for p_worker_id (task counter reset, city and proxy_geo cleared).
+--
+-- Returns the assigned state, or NULL if no tasks are available or p_worker_id is
+-- not present in worker_registry.
+-- ============================================================
+CREATE OR REPLACE FUNCTION assign_worker_geo(
+    p_worker_id VARCHAR(100)
+) RETURNS VARCHAR(2) AS $$
+DECLARE
+    assigned_state VARCHAR(2);
+BEGIN
+    -- Find state with highest demand (pending tasks) and lowest coverage (workers).
+    -- Demand and coverage are aggregated separately and joined afterwards: joining
+    -- worker_tasks and worker_registry under one GROUP BY repeats every pending task
+    -- once per worker on that state, inflating demand for already-covered states.
+    WITH pending_by_state AS (
+        SELECT d.state, COUNT(*) AS pending_count
+        FROM worker_tasks wt
+        INNER JOIN dispensaries d ON d.id = wt.dispensary_id
+        WHERE wt.status = 'pending'
+          -- Same readiness rule as claim_task: future-scheduled tasks are not demand yet
+          AND (wt.scheduled_for IS NULL OR wt.scheduled_for <= NOW())
+          -- A NULL state can never match a worker, so it must not win the ranking
+          AND d.state IS NOT NULL
+          AND d.platform_dispensary_id IS NOT NULL
+        GROUP BY d.state
+    ),
+    workers_by_state AS (
+        SELECT wr.current_state AS state, COUNT(DISTINCT wr.worker_id) AS worker_count
+        FROM worker_registry wr
+        WHERE wr.status IN ('active', 'idle')  -- Same worker set as worker_state_capacity
+          AND wr.worker_id <> p_worker_id      -- The caller's own session is being replaced
+          AND wr.current_state IS NOT NULL
+          AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes'
+        GROUP BY wr.current_state
+    )
+    SELECT p.state
+    INTO assigned_state
+    FROM pending_by_state p
+    LEFT JOIN workers_by_state w ON w.state = p.state
+    ORDER BY
+        p.pending_count DESC,             -- Most pending tasks first
+        COALESCE(w.worker_count, 0) ASC,  -- Fewest workers second
+        p.state ASC                       -- Deterministic tie-break
+    LIMIT 1;
+
+    -- If no pending tasks anywhere, return NULL
+    IF assigned_state IS NULL THEN
+        RETURN NULL;
+    END IF;
+
+    -- Assign the state to this worker
+    UPDATE worker_registry
+    SET
+        current_state = assigned_state,
+        current_city = NULL,  -- City assigned later if available
+        proxy_geo = NULL,     -- Target of the previous session no longer applies
+        geo_session_started_at = NOW(),
+        session_task_count = 0
+    WHERE worker_id = p_worker_id;
+
+    -- No row updated means the worker is not registered: report no session
+    IF NOT FOUND THEN
+        RETURN NULL;
+    END IF;
+
+    RETURN assigned_state;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================
+-- FUNCTION: check_worker_geo_session
+-- Returns info about worker's current geo session
+--
+-- Yields a row for the matching worker_registry entry and no rows for an unknown
+-- worker_id. Follows the same rules as claim_task: a session needs a state and must be
+-- younger than 60 minutes, and NULL counters are read as 0 used / 7 allowed.
+-- ============================================================
+CREATE OR REPLACE FUNCTION check_worker_geo_session(
+    p_worker_id VARCHAR(100)
+) RETURNS TABLE (
+    current_state VARCHAR(2),
+    current_city VARCHAR(100),
+    session_valid BOOLEAN,
+    session_tasks_remaining INT,
+    session_minutes_remaining INT
+) AS $$
+BEGIN
+    -- Every column is qualified with wr because the output column names above are
+    -- also variables inside this body.
+    RETURN QUERY
+    SELECT
+        wr.current_state,
+        wr.current_city,
+        COALESCE(
+            wr.current_state IS NOT NULL
+            AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes',
+            FALSE
+        ) AS session_valid,
+        GREATEST(0, COALESCE(wr.session_max_tasks, 7) - COALESCE(wr.session_task_count, 0)) AS session_tasks_remaining,
+        -- GREATEST ignores NULL arguments, so a missing session start reports 0 minutes
+        GREATEST(0, EXTRACT(EPOCH FROM (wr.geo_session_started_at + INTERVAL '60 minutes' - NOW())) / 60)::INT AS session_minutes_remaining
+    FROM worker_registry wr
+    WHERE wr.worker_id = p_worker_id;
+END;
+$$ LANGUAGE plpgsql STABLE;
+
+-- View for worker thinness per state
+-- Derives states from dispensaries table - no external states table dependency
+--
+-- status: 'no_coverage' = tasks pending but no remaining capacity, 'thin' = remaining
+-- capacity below the pending count, 'ok' otherwise.
+CREATE OR REPLACE VIEW worker_state_capacity AS
+WITH active_states AS (
+    -- Get unique states from dispensaries with valid platform IDs
+    SELECT DISTINCT d.state AS code
+    FROM dispensaries d
+    WHERE d.state IS NOT NULL
+      AND d.platform_dispensary_id IS NOT NULL
+),
+pending_by_state AS (
+    -- Pending task count per dispensary state
+    SELECT d.state, COUNT(*) AS pending_count
+    FROM worker_tasks t
+    INNER JOIN dispensaries d ON d.id = t.dispensary_id
+    WHERE t.status = 'pending'
+      AND d.state IS NOT NULL
+    GROUP BY d.state
+),
+workers_by_state AS (
+    -- Preflight-passed workers on a live (< 60 min) session and their unused task budget
+    SELECT
+        wr.current_state,
+        COUNT(*) AS worker_count,
+        SUM(GREATEST(0, COALESCE(wr.session_max_tasks, 7) - COALESCE(wr.session_task_count, 0))) AS remaining_capacity
+    FROM worker_registry wr
+    WHERE wr.status IN ('active', 'idle')  -- Include both active and idle workers
+      AND wr.preflight_http_status = 'passed'
+      AND wr.current_state IS NOT NULL
+      AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes'
+    GROUP BY wr.current_state
+)
+SELECT
+    s.code AS state,
+    s.code AS state_name,  -- Use code as name since we don't have a states lookup table
+    COALESCE(p.pending_count, 0) AS pending_tasks,
+    COALESCE(w.worker_count, 0) AS workers_on_state,
+    COALESCE(w.remaining_capacity, 0) AS remaining_capacity,
+    CASE
+        WHEN COALESCE(w.remaining_capacity, 0) = 0 AND COALESCE(p.pending_count, 0) > 0 THEN 'no_coverage'
+        WHEN COALESCE(w.remaining_capacity, 0) < COALESCE(p.pending_count, 0) THEN 'thin'
+        ELSE 'ok'
+    END AS status
+FROM active_states s
+LEFT JOIN pending_by_state p ON p.state = s.code
+LEFT JOIN workers_by_state w ON w.current_state = s.code
+ORDER BY COALESCE(p.pending_count, 0) DESC;
diff --git a/backend/src/routes/worker-registry.ts b/backend/src/routes/worker-registry.ts index 42a4f391..cf4b8ac2 100644 --- a/backend/src/routes/worker-registry.ts +++ b/backend/src/routes/worker-registry.ts @@ -161,6 +161,9 @@ router.post('/heartbeat', async (req: Request, res: Response) => { current_step_detail, current_step_started_at, task_steps, + // Geo session fields + geo_state, + geo_city, } = req.body; if (!worker_id) { @@ -178,8 +181,13 @@ router.post('/heartbeat', async (req: Request, res: Response) => { if (current_step_detail) metadata.current_step_detail = current_step_detail; if (current_step_started_at) metadata.current_step_started_at = current_step_started_at; if (task_steps) metadata.task_steps = task_steps; + // Geo session (also store in metadata for dashboard visibility) + if (geo_state) metadata.geo_state = geo_state; + if (geo_city) metadata.geo_city = geo_city; // Store resources in metadata jsonb column + // Note: current_state and current_city columns are managed by assign_worker_geo() SQL function + // The metadata.geo_state/geo_city are for dashboard display backup const { rows } = await pool.query(` UPDATE worker_registry SET last_heartbeat_at = NOW(), @@ -188,7 +196,7 @@ router.post('/heartbeat', async (req: Request, res: Response) => { metadata = COALESCE(metadata, '{}'::jsonb) || COALESCE($4::jsonb, '{}'::jsonb), updated_at = NOW() WHERE worker_id = $3 - RETURNING id, friendly_name, status + RETURNING id, friendly_name, 
status, current_state, current_city `, [current_task_id || null, status, worker_id, Object.keys(metadata).length > 0 ? JSON.stringify(metadata) : null]); if (rows.length === 0) { @@ -381,6 +389,13 @@ router.get('/workers', async (req: Request, res: Response) => { WHEN preflight_http_status = 'passed' THEN true ELSE false END as is_qualified, + -- Geo session fields + current_state, + current_city, + geo_session_started_at, + session_task_count, + session_max_tasks, + proxy_geo, -- Full metadata for resources metadata, EXTRACT(EPOCH FROM (NOW() - last_heartbeat_at)) as seconds_since_heartbeat,
@@ -888,6 +903,136 @@ router.get('/pods', async (_req: Request, res: Response) => {
 // PREFLIGHT SMOKE TEST
 // ============================================================
 
+// ============================================================
+// GEO SESSION & STATE CAPACITY
+// ============================================================
+
+/**
+ * GET /api/worker-registry/state-capacity
+ * Get worker capacity (thinness) by state
+ * Shows pending tasks vs worker coverage per state
+ *
+ * Responds with { success, states, summary }: states are the rows of the
+ * worker_state_capacity view, summary totals them. When the view cannot be
+ * resolved an empty result is returned instead of an error; query failures
+ * respond with HTTP 500 and { success: false, error }.
+ */
+router.get('/state-capacity', async (_req: Request, res: Response) => {
+  try {
+    // Check if the view exists first.
+    // to_regclass resolves the name through the search_path, the same way the SELECT
+    // below does, and yields NULL rather than raising when nothing matches.
+    const viewExists = await pool.query(`
+      SELECT to_regclass('worker_state_capacity') IS NOT NULL as exists
+    `);
+
+    if (!viewExists.rows[0].exists) {
+      // View doesn't exist yet - return empty array
+      return res.json({
+        success: true,
+        states: [],
+        summary: {
+          total_pending: 0,
+          total_workers: 0,
+          states_with_no_coverage: 0,
+          states_thin: 0
+        }
+      });
+    }
+
+    const { rows } = await pool.query(`
+      SELECT
+        state,
+        state_name,
+        pending_tasks,
+        workers_on_state,
+        remaining_capacity,
+        status
+      FROM worker_state_capacity
+      ORDER BY pending_tasks DESC NULLS LAST
+    `);
+
+    // Calculate summary
+    // The counts are bigint aggregates in the view; parse them in base 10 in case
+    // the driver hands them over as strings.
+    const summary = rows.reduce((acc, row) => {
+      acc.total_pending += parseInt(row.pending_tasks, 10) || 0;
+      acc.total_workers += parseInt(row.workers_on_state, 10) || 0;
+      if (row.status === 'no_coverage') acc.states_with_no_coverage++;
+      if (row.status === 'thin') acc.states_thin++;
+      return acc;
+    }, {
+      total_pending: 0,
+      total_workers: 0,
+      states_with_no_coverage: 0,
+      states_thin: 0
+    });
+
+    res.json({
+      success: true,
+      states: rows,
+      summary
+    });
+  } catch (error: any) {
+    console.error('[WorkerRegistry] State capacity error:', error);
+    res.status(500).json({ success: false, error: error.message });
+  }
+});
+
+/**
+ * GET /api/worker-registry/geo-sessions
+ * Get all active geo sessions
+ *
+ * A session is listed when the worker has a state, the session started less than
+ * 60 minutes ago and the worker status is 'active' or 'idle'. The response carries
+ * the flat list (sessions) and the same rows grouped by state (by_state).
+ */
+router.get('/geo-sessions', async (_req: Request, res: Response) => {
+  try {
+    const { rows } = await pool.query(`
+      SELECT
+        worker_id,
+        friendly_name,
+        current_state,
+        current_city,
+        geo_session_started_at,
+        session_task_count,
+        session_max_tasks,
+        proxy_geo,
+        GREATEST(0, COALESCE(session_max_tasks, 7) - COALESCE(session_task_count, 0)) as tasks_remaining,
+        EXTRACT(EPOCH FROM (geo_session_started_at + INTERVAL '60 minutes' - NOW())) / 60 as minutes_remaining,
+        status
+      FROM worker_registry
+      WHERE current_state IS NOT NULL
+        AND geo_session_started_at > NOW() - INTERVAL '60 minutes'
+        AND status IN ('active', 'idle')
+      ORDER BY current_state, session_task_count DESC
+    `);
+
+    // Group by state
+    const byState: Record<string, any[]> = {};
+    for (const row of rows) {
+      const state = row.current_state;
+      if (!byState[state]) byState[state] = [];
+      byState[state].push(row);
+    }
+
+    res.json({
+      success: true,
+      sessions: rows,
+      by_state: byState,
+      total_active_sessions: rows.length
+    });
+  } catch (error: any) {
+    console.error('[WorkerRegistry] Geo sessions error:', error);
+    res.status(500).json({ success: false, error: error.message });
+  }
+});
+
+// ============================================================
+// PREFLIGHT SMOKE TEST
+// ============================================================
+
 /**
  * POST /api/worker-registry/preflight-test
  * Run an HTTP (Puppeteer) preflight test and return results
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 8011f5e7..475ac9b8 100644 --- 
a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -67,6 +67,9 @@ import { setCrawlRotator } from '../platforms/dutchie'; import { runCurlPreflight, CurlPreflightResult } from '../services/curl-preflight'; import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../services/puppeteer-preflight'; +// Geo-targeted proxy support +import { buildEvomiProxyUrl, getEvomiConfig } from '../services/crawl-rotator'; + // Task handlers by role // Platform-based handlers: {task}-{platform}.ts convention import { handleProductRefresh } from './handlers/product-refresh'; @@ -316,6 +319,18 @@ export class TaskWorker { /** Map of task ID -> step info for concurrent tasks */ private taskSteps: Map = new Map(); + // ========================================================================== + // GEO SESSION TRACKING + // ========================================================================== + // Workers are geo-locked to a specific state for their session. + // Session = 60 minutes OR 7 stores, whichever comes first. + // Worker only claims tasks matching their assigned state. + // ========================================================================== + private geoState: string | null = null; + private geoCity: string | null = null; + private geoProxyUrl: string | null = null; + private geoSessionStartedAt: Date | null = null; + constructor(role: TaskRole | null = null, workerId?: string) { this.pool = getPool(); this.role = role;
@@ -717,6 +732,136 @@ export class TaskWorker {
     this.isRetryingPreflight = false;
   }
 
+  /**
+   * Ensure worker has a valid geo session before claiming tasks.
+   *
+   * GEO SESSION FLOW:
+   * 1. Check if worker has a valid session (state assigned, not expired, tasks remaining)
+   * 2. If valid, return true - worker can claim tasks for this state. If this process
+   *    is not tracking that state yet (e.g. after a restart), adopt it and rebuild
+   *    the geo proxy for it.
+   * 3. If not valid (expired, exhausted, or never assigned):
+   *    a. Call assign_worker_geo() to get new state based on demand
+   *    b. Build Evomi proxy URL for assigned state and record its geo in worker_registry
+   *    c. Return true if assignment successful
+   * 4. Return false if no tasks available in any state
+   *
+   * Session Rules (per migration 108):
+   * - Session lasts 60 minutes OR 7 store visits, whichever comes first
+   * - Worker can ONLY claim tasks matching their assigned state
+   * - State assignment prioritizes: most pending tasks, fewest workers
+   *
+   * @returns true when the worker may go on to claim tasks, false when no state could
+   *          be assigned. If a query throws, the error is logged and the result is
+   *          whether a state is already tracked locally.
+   */
+  private async ensureGeoSession(): Promise<boolean> {
+    try {
+      // Check current geo session status
+      const sessionResult = await this.pool.query(`
+        SELECT
+          current_state,
+          current_city,
+          session_valid,
+          session_tasks_remaining,
+          session_minutes_remaining
+        FROM check_worker_geo_session($1)
+      `, [this.workerId]);
+
+      const session = sessionResult.rows[0];
+
+      // If session is valid and has tasks remaining, we're good
+      if (session?.session_valid && session.session_tasks_remaining > 0) {
+        // Update local tracking
+        if (this.geoState !== session.current_state) {
+          console.log(`[TaskWorker] ${this.friendlyName} has valid geo session: ${session.current_state} (${session.session_tasks_remaining} tasks remaining, ${session.session_minutes_remaining} min left)`);
+          this.geoState = session.current_state;
+          this.geoCity = session.current_city;
+          // geoProxyUrl only lives in this process, so a session picked up from the
+          // database needs its proxy built here as well.
+          await this.configureGeoProxy(session.current_state);
+        }
+        return true;
+      }
+
+      // Session invalid, expired, or exhausted - need new assignment
+      const reason = !session?.current_state ? 'no state assigned' :
+                     !session?.session_valid ? 'session expired' :
+                     'session exhausted';
+      console.log(`[TaskWorker] ${this.friendlyName} needs new geo session (${reason})`);
+
+      // Request new geo assignment based on demand
+      const assignResult = await this.pool.query(`
+        SELECT assign_worker_geo($1) as assigned_state
+      `, [this.workerId]);
+
+      const assignedState = assignResult.rows[0]?.assigned_state;
+
+      if (!assignedState) {
+        // No pending tasks in any state
+        console.log(`[TaskWorker] ${this.friendlyName} no pending tasks available in any state`);
+        this.geoState = null;
+        this.geoCity = null;
+        this.geoProxyUrl = null;
+        this.geoSessionStartedAt = null;
+        return false;
+      }
+
+      console.log(`[TaskWorker] ${this.friendlyName} assigned to state: ${assignedState}`);
+
+      // Build Evomi proxy URL for this state
+      await this.configureGeoProxy(assignedState);
+
+      // Update local tracking
+      this.geoState = assignedState;
+      this.geoCity = null; // City assigned later if available
+      this.geoSessionStartedAt = new Date();
+
+      console.log(`[TaskWorker] ${this.friendlyName} geo session ready: ${assignedState} (max 7 tasks, 60 min)`);
+      return true;
+    } catch (err: any) {
+      console.error(`[TaskWorker] ${this.friendlyName} geo session error: ${err.message}`);
+      // On error, allow worker to continue (don't block indefinitely)
+      // The claim_task function will still filter by geo if set
+      return this.geoState !== null;
+    }
+  }
+
+  /**
+   * Point the in-memory geo proxy at `state`.
+   *
+   * Always clears geoProxyUrl first so a URL built for a previous state is never kept.
+   * When Evomi is enabled and a URL can be built, stores it and writes the resolved geo
+   * string to worker_registry.proxy_geo; otherwise geoProxyUrl stays null.
+   *
+   * @param state State code the proxy should target
+   */
+  private async configureGeoProxy(state: string): Promise<void> {
+    this.geoProxyUrl = null;
+
+    const evomiConfig = getEvomiConfig();
+    if (!evomiConfig.enabled) {
+      console.log(`[TaskWorker] ${this.friendlyName} Evomi not configured, using default proxy`);
+      return;
+    }
+
+    const proxyResult = buildEvomiProxyUrl(state, this.workerId);
+    if (!proxyResult) {
+      console.warn(`[TaskWorker] ${this.friendlyName} could not build Evomi proxy for ${state}`);
+      return;
+    }
+
+    this.geoProxyUrl = proxyResult.url;
+    console.log(`[TaskWorker] ${this.friendlyName} Evomi proxy configured for ${proxyResult.geo}`);
+
+    // Update worker_registry with proxy info
+    await this.pool.query(`
+      UPDATE worker_registry
+      SET proxy_geo = $2
+      WHERE worker_id = $1
+    `, [this.workerId, proxyResult.geo]);
+  }
+
   /**
    * Get the effective max concurrent tasks based on working hours.
* Uses the worker's timezone (from preflight IP geolocation) to determine @@ -896,6 +1008,9 @@ export class TaskWorker { current_step_detail: this.currentStepDetail, current_step_started_at: this.currentStepStartedAt?.toISOString() || null, task_steps: taskSteps, // Per-task step info for concurrent workers + // Geo session tracking for dashboard + geo_state: this.geoState, + geo_city: this.geoCity, resources: { memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024), memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024), @@ -1102,6 +1217,20 @@ export class TaskWorker { return; // Return to main loop, will re-check on next iteration } + // ================================================================= + // GEO SESSION GATE - Ensure worker has valid geo assignment + // Worker must have a state assignment to claim tasks. + // Session = 60 min OR 7 stores, whichever comes first. + // If no valid session, assign one based on demand. + // ================================================================= + const geoValid = await this.ensureGeoSession(); + if (!geoValid) { + // No tasks available in any state, or assignment failed + console.log(`[TaskWorker] ${this.friendlyName} no geo session available, waiting...`); + await this.sleep(30000); + return; + } + // ================================================================= // WORKING HOURS GATE - Simulate natural traffic patterns // Workers scale their concurrent task limit based on the current @@ -1122,6 +1251,7 @@ export class TaskWorker { } // Pass preflight capabilities to only claim compatible tasks + // claim_task SQL function will filter by worker's current_state const task = await taskService.claimTask( this.role, this.workerId, diff --git a/cannaiq/src/pages/WorkersDashboard.tsx b/cannaiq/src/pages/WorkersDashboard.tsx index d674485c..9d100147 100644 --- a/cannaiq/src/pages/WorkersDashboard.tsx +++ b/cannaiq/src/pages/WorkersDashboard.tsx @@ -75,6 +75,13 @@ interface Worker { productsReturned?: 
number; }; is_qualified?: boolean; + // Geo session fields + current_state?: string; + current_city?: string; + geo_session_started_at?: string; + session_task_count?: number; + session_max_tasks?: number; + proxy_geo?: string; metadata: { cpu?: number; memory?: number; @@ -327,77 +334,67 @@ function ResourceBadge({ worker }: { worker: Worker }) { ); } -// Preflight Summary - shows IP, fingerprint, antidetect status, and qualification +// Preflight Summary - shows qualification status with geo region function PreflightSummary({ worker }: { worker: Worker }) { const httpStatus = worker.preflight_http_status || 'pending'; const isQualified = worker.is_qualified || httpStatus === 'passed'; const httpIp = worker.http_ip; - const fingerprint = worker.fingerprint_data; const httpError = worker.preflight_http_error; - const httpMs = worker.preflight_http_ms; + const geoState = worker.current_state; + const geoCity = worker.current_city; - // Build detailed tooltip + // Build tooltip const tooltipLines: string[] = []; tooltipLines.push(`HTTP Preflight: ${httpStatus.toUpperCase()}`); if (httpIp) tooltipLines.push(`IP: ${httpIp}`); - if (httpMs) tooltipLines.push(`Response: ${httpMs}ms`); - if (fingerprint?.browser) tooltipLines.push(`Browser: ${fingerprint.browser}`); - if (fingerprint?.timezone) tooltipLines.push(`Timezone: ${fingerprint.timezone}`); - if (fingerprint?.productsReturned !== undefined) tooltipLines.push(`Products returned: ${fingerprint.productsReturned}`); - if (fingerprint?.botDetection) { - const bd = fingerprint.botDetection; - tooltipLines.push(`Bot detection - webdriver: ${bd.webdriver ? 'detected' : 'hidden'}`); + if (geoState) tooltipLines.push(`Geo: ${geoCity ? 
`${geoCity}, ` : ''}${geoState}`); + if (worker.session_task_count !== undefined) { + tooltipLines.push(`Session: ${worker.session_task_count}/${worker.session_max_tasks || 7} tasks`); } if (httpError) tooltipLines.push(`Error: ${httpError}`); - // Qualification styling - compact with icon badge and geo + // Qualified - show icon + geo if (isQualified) { return ( -
- {/* Qualified icon + IP on same line */} -
-
- +
+
+ +
+ {geoState ? ( +
+ + + {geoCity ? `${geoCity}, ` : ''}{geoState} +
- {httpIp && ( - {httpIp} - )} -
- {/* Antidetect status with response time */} -
- Antidetect - OK - {httpMs && ({httpMs}ms)} -
+ ) : ( + No geo assigned + )}
); } - // Not qualified - show failure state + // Failed if (httpStatus === 'failed') { return ( -
-
- - NOT QUALIFIED -
-
- {httpError || 'Preflight failed'} +
+
+
+ + {httpError || 'Failed'} +
); } - // Pending state + // Pending return ( -
-
- - QUALIFYING... -
-
- Running preflight check +
+
+
+ Qualifying...
); }