feat: Worker geo sessions for state-based task assignment

Workers are now geo-locked to a specific state for their session:
- Session = 60 minutes OR 7 store visits (whichever comes first)
- Workers ONLY claim tasks matching their assigned state
- State assignment prioritizes: most pending tasks, fewest workers

Changes:
- Migration 108: geo session columns, claim_task with geo filter,
  assign_worker_geo(), check_worker_geo_session(), worker_state_capacity view
- task-worker.ts: ensureGeoSession() method before task claiming
- worker-registry.ts: /state-capacity and /geo-sessions API endpoints
- WorkersDashboard: Show qualified icon + geo state in Preflight column

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 16:00:09 -07:00
parent c215d11a84
commit 88e590d026
4 changed files with 529 additions and 44 deletions

View File

@@ -161,6 +161,9 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
current_step_detail,
current_step_started_at,
task_steps,
// Geo session fields
geo_state,
geo_city,
} = req.body;
if (!worker_id) {
@@ -178,8 +181,13 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
if (current_step_detail) metadata.current_step_detail = current_step_detail;
if (current_step_started_at) metadata.current_step_started_at = current_step_started_at;
if (task_steps) metadata.task_steps = task_steps;
// Geo session (also store in metadata for dashboard visibility)
if (geo_state) metadata.geo_state = geo_state;
if (geo_city) metadata.geo_city = geo_city;
// Store resources in metadata jsonb column
// Note: current_state and current_city columns are managed by assign_worker_geo() SQL function
// The metadata.geo_state/geo_city are for dashboard display backup
const { rows } = await pool.query(`
UPDATE worker_registry
SET last_heartbeat_at = NOW(),
@@ -188,7 +196,7 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
metadata = COALESCE(metadata, '{}'::jsonb) || COALESCE($4::jsonb, '{}'::jsonb),
updated_at = NOW()
WHERE worker_id = $3
RETURNING id, friendly_name, status
RETURNING id, friendly_name, status, current_state, current_city
`, [current_task_id || null, status, worker_id, Object.keys(metadata).length > 0 ? JSON.stringify(metadata) : null]);
if (rows.length === 0) {
@@ -381,6 +389,13 @@ router.get('/workers', async (req: Request, res: Response) => {
WHEN preflight_http_status = 'passed' THEN true
ELSE false
END as is_qualified,
-- Geo session fields
current_state,
current_city,
geo_session_started_at,
session_task_count,
session_max_tasks,
proxy_geo,
-- Full metadata for resources
metadata,
EXTRACT(EPOCH FROM (NOW() - last_heartbeat_at)) as seconds_since_heartbeat,
@@ -888,6 +903,118 @@ router.get('/pods', async (_req: Request, res: Response) => {
// PREFLIGHT SMOKE TEST
// ============================================================
// ============================================================
// GEO SESSION & STATE CAPACITY
// ============================================================
/**
* GET /api/worker-registry/state-capacity
* Get worker capacity (thinness) by state
* Shows pending tasks vs worker coverage per state
*/
router.get('/state-capacity', async (_req: Request, res: Response) => {
try {
// Check if the view exists first
const viewExists = await pool.query(`
SELECT EXISTS (
SELECT FROM pg_views WHERE viewname = 'worker_state_capacity'
) as exists
`);
if (!viewExists.rows[0].exists) {
// View doesn't exist yet - return empty array
return res.json({
success: true,
states: [],
summary: {
total_pending: 0,
total_workers: 0,
states_with_no_coverage: 0,
states_thin: 0
}
});
}
const { rows } = await pool.query(`
SELECT * FROM worker_state_capacity
ORDER BY pending_tasks DESC NULLS LAST
`);
// Calculate summary
const summary = rows.reduce((acc, row) => {
acc.total_pending += parseInt(row.pending_tasks) || 0;
acc.total_workers += parseInt(row.workers_on_state) || 0;
if (row.status === 'no_coverage') acc.states_with_no_coverage++;
if (row.status === 'thin') acc.states_thin++;
return acc;
}, {
total_pending: 0,
total_workers: 0,
states_with_no_coverage: 0,
states_thin: 0
});
res.json({
success: true,
states: rows,
summary
});
} catch (error: any) {
console.error('[WorkerRegistry] State capacity error:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* GET /api/worker-registry/geo-sessions
* Get all active geo sessions
*/
router.get('/geo-sessions', async (_req: Request, res: Response) => {
try {
const { rows } = await pool.query(`
SELECT
worker_id,
friendly_name,
current_state,
current_city,
geo_session_started_at,
session_task_count,
session_max_tasks,
proxy_geo,
GREATEST(0, session_max_tasks - session_task_count) as tasks_remaining,
EXTRACT(EPOCH FROM (geo_session_started_at + INTERVAL '60 minutes' - NOW())) / 60 as minutes_remaining,
status
FROM worker_registry
WHERE current_state IS NOT NULL
AND geo_session_started_at > NOW() - INTERVAL '60 minutes'
AND status IN ('active', 'idle')
ORDER BY current_state, session_task_count DESC
`);
// Group by state
const byState: Record<string, typeof rows> = {};
for (const row of rows) {
const state = row.current_state;
if (!byState[state]) byState[state] = [];
byState[state].push(row);
}
res.json({
success: true,
sessions: rows,
by_state: byState,
total_active_sessions: rows.length
});
} catch (error: any) {
console.error('[WorkerRegistry] Geo sessions error:', error);
res.status(500).json({ success: false, error: error.message });
}
});
// ============================================================
// PREFLIGHT SMOKE TEST
// ============================================================
/**
* POST /api/worker-registry/preflight-test
* Run an HTTP (Puppeteer) preflight test and return results

View File

@@ -67,6 +67,9 @@ import { setCrawlRotator } from '../platforms/dutchie';
import { runCurlPreflight, CurlPreflightResult } from '../services/curl-preflight';
import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../services/puppeteer-preflight';
// Geo-targeted proxy support
import { buildEvomiProxyUrl, getEvomiConfig } from '../services/crawl-rotator';
// Task handlers by role
// Platform-based handlers: {task}-{platform}.ts convention
import { handleProductRefresh } from './handlers/product-refresh';
@@ -316,6 +319,18 @@ export class TaskWorker {
/** Map of task ID -> step info for concurrent tasks */
private taskSteps: Map<number, { step: string; detail: string | null; startedAt: Date }> = new Map();
// ==========================================================================
// GEO SESSION TRACKING
// ==========================================================================
// Workers are geo-locked to a specific state for their session.
// Session = 60 minutes OR 7 stores, whichever comes first.
// Worker only claims tasks matching their assigned state.
// ==========================================================================
private geoState: string | null = null;
private geoCity: string | null = null;
private geoProxyUrl: string | null = null;
private geoSessionStartedAt: Date | null = null;
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -717,6 +732,103 @@ export class TaskWorker {
this.isRetryingPreflight = false;
}
/**
* Ensure worker has a valid geo session before claiming tasks.
*
* GEO SESSION FLOW:
* 1. Check if worker has a valid session (state assigned, not expired, tasks remaining)
* 2. If valid, return true - worker can claim tasks for this state
* 3. If not valid (expired, exhausted, or never assigned):
* a. Call assign_worker_geo() to get new state based on demand
* b. Build Evomi proxy URL for assigned state
* c. Configure proxy in crawl rotator
* d. Return true if assignment successful
* 4. Return false if no tasks available in any state
*
* Session Rules (per migration 108):
* - Session lasts 60 minutes OR 7 store visits, whichever comes first
* - Worker can ONLY claim tasks matching their assigned state
* - State assignment prioritizes: most pending tasks, fewest workers
*/
private async ensureGeoSession(): Promise<boolean> {
try {
// Check current geo session status
const sessionResult = await this.pool.query(`
SELECT *
FROM check_worker_geo_session($1)
`, [this.workerId]);
const session = sessionResult.rows[0];
// If session is valid and has tasks remaining, we're good
if (session?.session_valid && session.session_tasks_remaining > 0) {
// Update local tracking
if (this.geoState !== session.current_state) {
console.log(`[TaskWorker] ${this.friendlyName} has valid geo session: ${session.current_state} (${session.session_tasks_remaining} tasks remaining, ${session.session_minutes_remaining} min left)`);
this.geoState = session.current_state;
this.geoCity = session.current_city;
}
return true;
}
// Session invalid, expired, or exhausted - need new assignment
const reason = !session?.current_state ? 'no state assigned' :
!session?.session_valid ? 'session expired' :
'session exhausted';
console.log(`[TaskWorker] ${this.friendlyName} needs new geo session (${reason})`);
// Request new geo assignment based on demand
const assignResult = await this.pool.query(`
SELECT assign_worker_geo($1) as assigned_state
`, [this.workerId]);
const assignedState = assignResult.rows[0]?.assigned_state;
if (!assignedState) {
// No pending tasks in any state
console.log(`[TaskWorker] ${this.friendlyName} no pending tasks available in any state`);
this.geoState = null;
this.geoCity = null;
this.geoProxyUrl = null;
return false;
}
console.log(`[TaskWorker] ${this.friendlyName} assigned to state: ${assignedState}`);
// Build Evomi proxy URL for this state
const evomiConfig = getEvomiConfig();
if (evomiConfig.enabled) {
const proxyResult = buildEvomiProxyUrl(assignedState, this.workerId);
if (proxyResult) {
this.geoProxyUrl = proxyResult.url;
console.log(`[TaskWorker] ${this.friendlyName} Evomi proxy configured for ${proxyResult.geo}`);
// Update worker_registry with proxy info
await this.pool.query(`
UPDATE worker_registry
SET proxy_geo = $2
WHERE worker_id = $1
`, [this.workerId, proxyResult.geo]);
}
} else {
console.log(`[TaskWorker] ${this.friendlyName} Evomi not configured, using default proxy`);
}
// Update local tracking
this.geoState = assignedState;
this.geoCity = null; // City assigned later if available
this.geoSessionStartedAt = new Date();
console.log(`[TaskWorker] ${this.friendlyName} geo session ready: ${assignedState} (max 7 tasks, 60 min)`);
return true;
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} geo session error: ${err.message}`);
// On error, allow worker to continue (don't block indefinitely)
// The claim_task function will still filter by geo if set
return this.geoState !== null;
}
}
/**
* Get the effective max concurrent tasks based on working hours.
* Uses the worker's timezone (from preflight IP geolocation) to determine
@@ -896,6 +1008,9 @@ export class TaskWorker {
current_step_detail: this.currentStepDetail,
current_step_started_at: this.currentStepStartedAt?.toISOString() || null,
task_steps: taskSteps, // Per-task step info for concurrent workers
// Geo session tracking for dashboard
geo_state: this.geoState,
geo_city: this.geoCity,
resources: {
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
@@ -1102,6 +1217,20 @@ export class TaskWorker {
return; // Return to main loop, will re-check on next iteration
}
// =================================================================
// GEO SESSION GATE - Ensure worker has valid geo assignment
// Worker must have a state assignment to claim tasks.
// Session = 60 min OR 7 stores, whichever comes first.
// If no valid session, assign one based on demand.
// =================================================================
const geoValid = await this.ensureGeoSession();
if (!geoValid) {
// No tasks available in any state, or assignment failed
console.log(`[TaskWorker] ${this.friendlyName} no geo session available, waiting...`);
await this.sleep(30000);
return;
}
// =================================================================
// WORKING HOURS GATE - Simulate natural traffic patterns
// Workers scale their concurrent task limit based on the current
@@ -1122,6 +1251,7 @@ export class TaskWorker {
}
// Pass preflight capabilities to only claim compatible tasks
// claim_task SQL function will filter by worker's current_state
const task = await taskService.claimTask(
this.role,
this.workerId,