fix(identity): Use unique session IDs for proxy rotation + add task pool gate

- Fix buildEvomiProxyUrl to use passed session ID from identity pool
  instead of truncating to worker+region (causing same IP for all workers)
- Add task pool gate feature with database-backed state
- Add /tasks/pool/toggle endpoint and UI toggle button
- Fix isTaskPoolPaused() missing await in claimTask

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 20:17:52 -07:00
parent b456fe5097
commit 072388ffb2
7 changed files with 254 additions and 69 deletions

View File

@@ -49,8 +49,11 @@ function getRequestMetadata(req: Request): Record<string, unknown> {
import { pool } from '../db/pool';
import {
isTaskPoolPaused,
isTaskPoolOpen,
pauseTaskPool,
resumeTaskPool,
closeTaskPool,
openTaskPool,
getTaskPoolStatus,
} from '../tasks/task-pool-state';
@@ -1546,40 +1549,93 @@ router.get('/states', async (_req: Request, res: Response) => {
/**
* GET /api/tasks/pool/status
* Check if task pool is paused
* Check if task pool is open or closed
*/
router.get('/pool/status', async (_req: Request, res: Response) => {
const status = getTaskPoolStatus();
res.json({
success: true,
...status,
});
try {
const status = await getTaskPoolStatus();
res.json({
success: true,
...status,
});
} catch (err: any) {
res.status(500).json({ success: false, error: err.message });
}
});
/**
* POST /api/tasks/pool/pause
* Pause the task pool - workers won't pick up new tasks
* POST /api/tasks/pool/close
* Close the task pool - workers won't pick up new tasks
*/
router.post('/pool/close', async (_req: Request, res: Response) => {
try {
await closeTaskPool();
res.json({
success: true,
open: false,
message: 'Pool is Closed - workers will not pick up new tasks',
});
} catch (err: any) {
res.status(500).json({ success: false, error: err.message });
}
});
/**
* POST /api/tasks/pool/open
* Open the task pool - workers will pick up tasks again
*/
router.post('/pool/open', async (_req: Request, res: Response) => {
try {
await openTaskPool();
res.json({
success: true,
open: true,
message: 'Pool is Open - workers are picking up tasks',
});
} catch (err: any) {
res.status(500).json({ success: false, error: err.message });
}
});
/**
* POST /api/tasks/pool/toggle
* Toggle the task pool state
*/
router.post('/pool/toggle', async (_req: Request, res: Response) => {
try {
const isOpen = await isTaskPoolOpen();
if (isOpen) {
await closeTaskPool();
} else {
await openTaskPool();
}
const status = await getTaskPoolStatus();
res.json({
success: true,
...status,
});
} catch (err: any) {
res.status(500).json({ success: false, error: err.message });
}
});
// Legacy endpoints for compatibility
router.post('/pool/pause', async (_req: Request, res: Response) => {
pauseTaskPool();
res.json({
success: true,
paused: true,
message: 'Task pool paused - workers will not pick up new tasks',
});
try {
await closeTaskPool();
res.json({ success: true, paused: true, message: 'Task pool closed' });
} catch (err: any) {
res.status(500).json({ success: false, error: err.message });
}
});
/**
* POST /api/tasks/pool/resume
* Resume the task pool - workers will pick up tasks again
*/
router.post('/pool/resume', async (_req: Request, res: Response) => {
resumeTaskPool();
res.json({
success: true,
paused: false,
message: 'Task pool resumed - workers will pick up new tasks',
});
try {
await openTaskPool();
res.json({ success: true, paused: false, message: 'Task pool opened' });
} catch (err: any) {
res.status(500).json({ success: false, error: err.message });
}
});
export default router;

View File

@@ -968,7 +968,7 @@ export function getEvomiConfig(): EvomiConfig {
*/
export function buildEvomiProxyUrl(
stateCode: string,
workerId: string,
sessionOrWorkerId: string,
city?: string
): { url: string; geo: string; source: 'api' } | null {
const config = getEvomiConfig();
@@ -983,8 +983,13 @@ export function buildEvomiProxyUrl(
return null;
}
// Generate session ID: workerId + region (sticky per worker per state)
const sessionId = `${workerId.replace(/[^a-zA-Z0-9]/g, '').slice(0, 6)}${region.slice(0, 4)}`;
// Use the passed session ID directly (from identity pool) or generate from worker ID
// Identity pool passes unique session IDs like "scrapc-1702...-abc123"
// Regular callers pass worker IDs - we add region suffix for stickiness
const isIdentitySession = sessionOrWorkerId.includes('-') && sessionOrWorkerId.length > 20;
const sessionId = isIdentitySession
? sessionOrWorkerId.replace(/[^a-zA-Z0-9]/g, '').slice(0, 20) // Use identity session (max 20 chars)
: `${sessionOrWorkerId.replace(/[^a-zA-Z0-9]/g, '').slice(0, 6)}${region.slice(0, 4)}`; // Worker ID + region
// Build geo target string
let geoParams = `_country-US_region-${region}`;

View File

@@ -1,40 +1,94 @@
/**
* Task Pool State
*
* Shared state for task pool pause/resume functionality.
* This is kept separate to avoid circular dependencies between
* task-service.ts and routes/tasks.ts.
* Database-backed state for task pool open/close functionality.
* Uses system_settings table so all workers across K8s pods see the same state.
*
* State is in-memory and resets on server restart.
* By default, the pool is OPEN - workers start claiming tasks immediately.
* Admin can pause via API endpoint if needed.
* Settings key: 'task_pool_open' = 'true' | 'false'
*
* Note: Each process (backend, worker) has its own copy of this state.
* The /pool/pause and /pool/resume endpoints only affect the backend process.
* Workers always start with pool open.
* Workers check this before claiming tasks.
* Admin can toggle via API endpoint or UI.
*/
let taskPoolPaused = false;
import { pool } from '../db/pool';
export function isTaskPoolPaused(): boolean {
return taskPoolPaused;
// Cache to avoid hitting DB on every check (5 second TTL)
let cachedState: { open: boolean; checkedAt: number } | null = null;
const CACHE_TTL_MS = 5000;
/**
* Check if task pool is open (workers can claim tasks)
* Uses cache with 5 second TTL
*/
export async function isTaskPoolOpen(): Promise<boolean> {
// Return cached value if fresh
if (cachedState && Date.now() - cachedState.checkedAt < CACHE_TTL_MS) {
return cachedState.open;
}
try {
const result = await pool.query(
"SELECT value FROM system_settings WHERE key = 'task_pool_open'"
);
const isOpen = result.rows[0]?.value !== 'false';
cachedState = { open: isOpen, checkedAt: Date.now() };
return isOpen;
} catch (err) {
// If table doesn't exist or error, default to open
console.warn('[TaskPool] Could not check pool state, defaulting to open:', err);
return true;
}
}
export function pauseTaskPool(): void {
taskPoolPaused = true;
console.log('[TaskPool] Task pool PAUSED - workers will not pick up new tasks');
/**
* Check if task pool is paused (inverse of isOpen for compatibility)
*/
export async function isTaskPoolPaused(): Promise<boolean> {
return !(await isTaskPoolOpen());
}
export function resumeTaskPool(): void {
taskPoolPaused = false;
console.log('[TaskPool] Task pool RESUMED - workers can pick up tasks');
/**
* Close the task pool - workers cannot claim new tasks
*/
export async function closeTaskPool(): Promise<void> {
await pool.query(`
INSERT INTO system_settings (key, value, description)
VALUES ('task_pool_open', 'false', 'When false, workers cannot claim new tasks from the pool')
ON CONFLICT (key) DO UPDATE SET value = 'false'
`);
cachedState = { open: false, checkedAt: Date.now() };
console.log('[TaskPool] Task pool CLOSED - workers will not pick up new tasks');
}
export function getTaskPoolStatus(): { paused: boolean; message: string } {
/**
* Open the task pool - workers can claim tasks
*/
export async function openTaskPool(): Promise<void> {
await pool.query(`
INSERT INTO system_settings (key, value, description)
VALUES ('task_pool_open', 'true', 'When false, workers cannot claim new tasks from the pool')
ON CONFLICT (key) DO UPDATE SET value = 'true'
`);
cachedState = { open: true, checkedAt: Date.now() };
console.log('[TaskPool] Task pool OPEN - workers can pick up tasks');
}
/**
* Legacy aliases for compatibility
*/
export const pauseTaskPool = closeTaskPool;
export const resumeTaskPool = openTaskPool;
/**
* Get task pool status
*/
export async function getTaskPoolStatus(): Promise<{ open: boolean; message: string }> {
const isOpen = await isTaskPoolOpen();
return {
paused: taskPoolPaused,
message: taskPoolPaused
? 'Task pool is paused - workers will not pick up new tasks'
: 'Task pool is open - workers are picking up tasks',
};
open: isOpen,
paused: !isOpen, // Legacy compatibility
message: isOpen
? 'Pool is Open - workers are picking up tasks'
: 'Pool is Closed - workers will not pick up new tasks',
} as any;
}

View File

@@ -187,7 +187,7 @@ class TaskService {
httpPassed: boolean = false
): Promise<WorkerTask | null> {
// Check if task pool is paused - don't claim any tasks
if (isTaskPoolPaused()) {
if (await isTaskPoolPaused()) {
return null;
}