From 072388ffb26b08f835b11ce12111aa5b048eca72 Mon Sep 17 00:00:00 2001 From: Kelly Date: Sat, 13 Dec 2025 20:17:52 -0700 Subject: [PATCH] fix(identity): Use unique session IDs for proxy rotation + add task pool gate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix buildEvomiProxyUrl to use passed session ID from identity pool instead of truncating to worker+region (causing same IP for all workers) - Add task pool gate feature with database-backed state - Add /tasks/pool/toggle endpoint and UI toggle button - Fix isTaskPoolPaused() missing await in claimTask 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/migrations/111_system_settings.sql | 35 +++++++ backend/src/routes/tasks.ts | 104 ++++++++++++++++----- backend/src/services/crawl-rotator.ts | 11 ++- backend/src/tasks/task-pool-state.ts | 102 +++++++++++++++----- backend/src/tasks/task-service.ts | 2 +- cannaiq/src/lib/api.ts | 13 ++- cannaiq/src/pages/TasksDashboard.tsx | 56 ++++++++--- 7 files changed, 254 insertions(+), 69 deletions(-) create mode 100644 backend/migrations/111_system_settings.sql diff --git a/backend/migrations/111_system_settings.sql b/backend/migrations/111_system_settings.sql new file mode 100644 index 00000000..69f588af --- /dev/null +++ b/backend/migrations/111_system_settings.sql @@ -0,0 +1,35 @@ +-- Migration: 111_system_settings.sql +-- Description: System settings table for runtime configuration +-- Created: 2024-12-14 + +CREATE TABLE IF NOT EXISTS system_settings ( + key VARCHAR(100) PRIMARY KEY, + value TEXT NOT NULL, + description TEXT, + updated_at TIMESTAMPTZ DEFAULT NOW(), + updated_by INTEGER REFERENCES users(id) +); + +-- Task pool gate - controls whether workers can claim tasks +INSERT INTO system_settings (key, value, description) VALUES + ('task_pool_open', 'true', 'When false, workers cannot claim new tasks from the pool') +ON CONFLICT (key) DO NOTHING; + +-- Updated at trigger +CREATE OR REPLACE FUNCTION update_system_settings_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS system_settings_updated_at ON system_settings; +CREATE TRIGGER system_settings_updated_at + BEFORE UPDATE ON system_settings + FOR EACH ROW + EXECUTE FUNCTION update_system_settings_updated_at(); + +COMMENT ON TABLE system_settings IS 'Runtime configuration settings'; +COMMENT ON COLUMN system_settings.key IS 'Setting name (e.g., task_pool_open)'; +COMMENT ON COLUMN system_settings.value IS 'Setting value as string'; diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index 7e401ceb..2e4c87bf 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -49,8 +49,11 @@ function getRequestMetadata(req: Request): Record { import { pool } from '../db/pool'; import { isTaskPoolPaused, + isTaskPoolOpen, pauseTaskPool, resumeTaskPool, + closeTaskPool, + openTaskPool, getTaskPoolStatus, } from '../tasks/task-pool-state'; @@ -1546,40 +1549,93 @@ router.get('/states', async (_req: Request, res: Response) => { /** * GET /api/tasks/pool/status - * Check if task pool is paused + * Check if task pool is open or closed */ router.get('/pool/status', async (_req: Request, res: Response) => { - const status = getTaskPoolStatus(); - res.json({ - success: true, - ...status, - }); + try { + const status = await getTaskPoolStatus(); + res.json({ + success: true, + ...status, + }); + } catch (err: any) { + res.status(500).json({ success: false, error: err.message }); + } }); /** - * POST /api/tasks/pool/pause - * Pause the task pool - workers won't pick up new tasks + * POST /api/tasks/pool/close + * Close the task pool - workers won't pick up new tasks */ +router.post('/pool/close', async (_req: Request, res: Response) => { + try { + await closeTaskPool(); + res.json({ + success: true, + open: false, + message: 'Pool is Closed - workers will not pick up new tasks', + }); + } catch (err: any) { + res.status(500).json({ success: false, error: err.message }); + } +}); + +/** + * POST /api/tasks/pool/open + * Open the task pool - workers will pick up tasks again + */ +router.post('/pool/open', async (_req: Request, res: Response) => { + try { + await openTaskPool(); + res.json({ + success: true, + open: true, + message: 'Pool is Open - workers are picking up tasks', + }); + } catch (err: any) { + res.status(500).json({ success: false, error: err.message }); + } +}); + +/** + * POST /api/tasks/pool/toggle + * Toggle the task pool state + */ +router.post('/pool/toggle', async (_req: Request, res: Response) => { + try { + const isOpen = await isTaskPoolOpen(); + if (isOpen) { + await closeTaskPool(); + } else { + await openTaskPool(); + } + const status = await getTaskPoolStatus(); + res.json({ + success: true, + ...status, + }); + } catch (err: any) { + res.status(500).json({ success: false, error: err.message }); + } +}); + +// Legacy endpoints for compatibility router.post('/pool/pause', async (_req: Request, res: Response) => { - pauseTaskPool(); - res.json({ - success: true, - paused: true, - message: 'Task pool paused - workers will not pick up new tasks', - }); + try { + await closeTaskPool(); + res.json({ success: true, paused: true, message: 'Task pool closed' }); + } catch (err: any) { + res.status(500).json({ success: false, error: err.message }); + } }); -/** - * POST /api/tasks/pool/resume - * Resume the task pool - workers will pick up tasks again - */ router.post('/pool/resume', async (_req: Request, res: Response) => { - resumeTaskPool(); - res.json({ - success: true, - paused: false, - message: 'Task pool resumed - workers will pick up new tasks', - }); + try { + await openTaskPool(); + res.json({ success: true, paused: false, message: 'Task pool opened' }); + } catch (err: any) { + res.status(500).json({ success: false, error: err.message }); + } }); export default router; diff --git a/backend/src/services/crawl-rotator.ts b/backend/src/services/crawl-rotator.ts index 91d6a00d..b78c2c68 100644 --- a/backend/src/services/crawl-rotator.ts +++ b/backend/src/services/crawl-rotator.ts @@ -968,7 +968,7 @@ export function getEvomiConfig(): EvomiConfig { */ export function buildEvomiProxyUrl( stateCode: string, - workerId: string, + sessionOrWorkerId: string, city?: string ): { url: string; geo: string; source: 'api' } | null { const config = getEvomiConfig(); @@ -983,8 +983,13 @@ export function buildEvomiProxyUrl( return null; } - // Generate session ID: workerId + region (sticky per worker per state) - const sessionId = `${workerId.replace(/[^a-zA-Z0-9]/g, '').slice(0, 6)}${region.slice(0, 4)}`; + // Use the passed session ID directly (from identity pool) or generate from worker ID + // Identity pool passes unique session IDs like "scrapc-1702...-abc123" + // Regular callers pass worker IDs - we add region suffix for stickiness + const isIdentitySession = sessionOrWorkerId.includes('-') && sessionOrWorkerId.length > 20; + const sessionId = isIdentitySession + ? sessionOrWorkerId.replace(/[^a-zA-Z0-9]/g, '').slice(0, 20) // Use identity session (max 20 chars) + : `${sessionOrWorkerId.replace(/[^a-zA-Z0-9]/g, '').slice(0, 6)}${region.slice(0, 4)}`; // Worker ID + region // Build geo target string let geoParams = `_country-US_region-${region}`; diff --git a/backend/src/tasks/task-pool-state.ts b/backend/src/tasks/task-pool-state.ts index a966da93..4aa611b0 100644 --- a/backend/src/tasks/task-pool-state.ts +++ b/backend/src/tasks/task-pool-state.ts @@ -1,40 +1,94 @@ /** * Task Pool State * - * Shared state for task pool pause/resume functionality. - * This is kept separate to avoid circular dependencies between - * task-service.ts and routes/tasks.ts. + * Database-backed state for task pool open/close functionality. + * Uses system_settings table so all workers across K8s pods see the same state. * - * State is in-memory and resets on server restart. - * By default, the pool is OPEN - workers start claiming tasks immediately. - * Admin can pause via API endpoint if needed. + * Settings key: 'task_pool_open' = 'true' | 'false' * - * Note: Each process (backend, worker) has its own copy of this state. - * The /pool/pause and /pool/resume endpoints only affect the backend process. - * Workers always start with pool open. + * Workers check this before claiming tasks. + * Admin can toggle via API endpoint or UI. */ -let taskPoolPaused = false; +import { pool } from '../db/pool'; -export function isTaskPoolPaused(): boolean { - return taskPoolPaused; +// Cache to avoid hitting DB on every check (5 second TTL) +let cachedState: { open: boolean; checkedAt: number } | null = null; +const CACHE_TTL_MS = 5000; + +/** + * Check if task pool is open (workers can claim tasks) + * Uses cache with 5 second TTL + */ +export async function isTaskPoolOpen(): Promise { + // Return cached value if fresh + if (cachedState && Date.now() - cachedState.checkedAt < CACHE_TTL_MS) { + return cachedState.open; + } + + try { + const result = await pool.query( + "SELECT value FROM system_settings WHERE key = 'task_pool_open'" + ); + const isOpen = result.rows[0]?.value !== 'false'; + cachedState = { open: isOpen, checkedAt: Date.now() }; + return isOpen; + } catch (err) { + // If table doesn't exist or error, default to open + console.warn('[TaskPool] Could not check pool state, defaulting to open:', err); + return true; + } } -export function pauseTaskPool(): void { - taskPoolPaused = true; - console.log('[TaskPool] Task pool PAUSED - workers will not pick up new tasks'); +/** + * Check if task pool is paused (inverse of isOpen for compatibility) + */ +export async function isTaskPoolPaused(): Promise { + return !(await isTaskPoolOpen()); } -export function resumeTaskPool(): void { - taskPoolPaused = false; - console.log('[TaskPool] Task pool RESUMED - workers can pick up tasks'); +/** + * Close the task pool - workers cannot claim new tasks + */ +export async function closeTaskPool(): Promise { + await pool.query(` + INSERT INTO system_settings (key, value, description) + VALUES ('task_pool_open', 'false', 'When false, workers cannot claim new tasks from the pool') + ON CONFLICT (key) DO UPDATE SET value = 'false' + `); + cachedState = { open: false, checkedAt: Date.now() }; + console.log('[TaskPool] Task pool CLOSED - workers will not pick up new tasks'); } -export function getTaskPoolStatus(): { paused: boolean; message: string } { +/** + * Open the task pool - workers can claim tasks + */ +export async function openTaskPool(): Promise { + await pool.query(` + INSERT INTO system_settings (key, value, description) + VALUES ('task_pool_open', 'true', 'When false, workers cannot claim new tasks from the pool') + ON CONFLICT (key) DO UPDATE SET value = 'true' + `); + cachedState = { open: true, checkedAt: Date.now() }; + console.log('[TaskPool] Task pool OPEN - workers can pick up tasks'); +} + +/** + * Legacy aliases for compatibility + */ +export const pauseTaskPool = closeTaskPool; +export const resumeTaskPool = openTaskPool; + +/** + * Get task pool status + */ +export async function getTaskPoolStatus(): Promise<{ open: boolean; message: string }> { + const isOpen = await isTaskPoolOpen(); return { - paused: taskPoolPaused, - message: taskPoolPaused - ? 'Task pool is paused - workers will not pick up new tasks' - : 'Task pool is open - workers are picking up tasks', - }; + open: isOpen, + paused: !isOpen, // Legacy compatibility + message: isOpen + ? 'Pool is Open - workers are picking up tasks' + : 'Pool is Closed - workers will not pick up new tasks', + } as any; } diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts index 1d3c0e91..f5b92f7c 100644 --- a/backend/src/tasks/task-service.ts +++ b/backend/src/tasks/task-service.ts @@ -187,7 +187,7 @@ class TaskService { httpPassed: boolean = false ): Promise { // Check if task pool is paused - don't claim any tasks - if (isTaskPoolPaused()) { + if (await isTaskPoolPaused()) { return null; } diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts index 2823be16..27ce6ffc 100755 --- a/cannaiq/src/lib/api.ts +++ b/cannaiq/src/lib/api.ts @@ -2991,25 +2991,32 @@ class ApiClient { // Task Pool Control async getTaskPoolStatus() { - return this.request<{ success: boolean; paused: boolean; message: string }>( + return this.request<{ success: boolean; open: boolean; paused: boolean; message: string }>( '/api/tasks/pool/status' ); } async pauseTaskPool() { - return this.request<{ success: boolean; paused: boolean; message: string }>( + return this.request<{ success: boolean; open: boolean; paused: boolean; message: string }>( '/api/tasks/pool/pause', { method: 'POST' } ); } async resumeTaskPool() { - return this.request<{ success: boolean; paused: boolean; message: string }>( + return this.request<{ success: boolean; open: boolean; paused: boolean; message: string }>( '/api/tasks/pool/resume', { method: 'POST' } ); } + async toggleTaskPool() { + return this.request<{ success: boolean; open: boolean; paused: boolean; message: string }>( + '/api/tasks/pool/toggle', + { method: 'POST' } + ); + } + // K8s Worker Control async getK8sWorkers() { return this.request<{ diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx index 810d02bd..16be2815 100644 --- a/cannaiq/src/pages/TasksDashboard.tsx +++ b/cannaiq/src/pages/TasksDashboard.tsx @@ -899,7 +899,8 @@ export default function TasksDashboard() { const [workers, setWorkers] = useState([]); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); - const [poolPaused, setPoolPaused] = useState(false); + const [poolOpen, setPoolOpen] = useState(true); + const [poolToggling, setPoolToggling] = useState(false); const [showCreateModal, setShowCreateModal] = useState(false); // Schedules state @@ -938,7 +939,7 @@ export default function TasksDashboard() { setTasks(tasksRes.tasks || []); setCounts(countsRes); setCapacity(capacityRes.metrics || []); - setPoolPaused(poolStatus.paused); + setPoolOpen(poolStatus.open ?? !poolStatus.paused); setSchedules(schedulesRes.schedules || []); setWorkers(workersRes.workers || []); setError(null); @@ -949,6 +950,19 @@ export default function TasksDashboard() { } }; + const handleTogglePool = async () => { + setPoolToggling(true); + try { + const result = await api.toggleTaskPool(); + setPoolOpen(result.open); + } catch (err: any) { + console.error('Toggle pool error:', err); + alert(err.response?.data?.error || 'Failed to toggle pool'); + } finally { + setPoolToggling(false); + } + }; + const handleDeleteSchedule = async (scheduleId: number) => { if (!confirm('Delete this schedule?')) return; try { @@ -1122,6 +1136,25 @@ export default function TasksDashboard() {
+ {/* Pool Toggle */} + {/* Create Task Button */} - {/* Pool status indicator */} - {poolPaused && ( - - - Pool Paused - - )} Auto-refreshes every 15s
@@ -1176,7 +1202,7 @@ export default function TasksDashboard() { >
- {getStatusIcon(status, poolPaused)} + {getStatusIcon(status, !poolOpen)} {status}
@@ -1421,9 +1447,9 @@ export default function TasksDashboard() { {schedule.state_code} ) : ( - + - ALL + All )} @@ -1431,9 +1457,11 @@ export default function TasksDashboard() { {schedule.platform || 'dutchie'} @@ -1628,7 +1656,7 @@ export default function TasksDashboard() { STATUS_COLORS[task.status] }`} > - {getStatusIcon(task.status, poolPaused)} + {getStatusIcon(task.status, !poolOpen)} {task.status}