feat(workers): Implement geo-based task pools

Workers now follow the correct flow:
1. Check what pools have pending tasks
2. Claim a pool (e.g., Phoenix AZ)
3. Get Evomi proxy for that geo
4. Run preflight with geo proxy
5. Pull tasks from pool (up to 6 stores)
6. Execute tasks
7. Release pool when exhausted (6 stores visited)

Task pools group dispensaries by metro area (100mi radius):
- Phoenix AZ, Tucson AZ
- Los Angeles CA, San Francisco CA, San Diego CA, Sacramento CA
- Denver CO, Chicago IL, Boston MA, Detroit MI
- Las Vegas NV, Reno NV, Newark NJ, New York NY
- Oklahoma City OK, Tulsa OK, Portland OR, Seattle WA

Benefits:
- Workers know geo BEFORE getting proxy (no more "No geo assigned")
- IP diversity within metro area (Phoenix worker can use Tempe IP)
- Simpler worker logic - just match pool geo
- Pre-organized tasks, not grouped at claim time

New files:
- migrations/113_task_pools.sql - schema, seed data, functions
- src/services/task-pool.ts - TypeScript service

Env vars:
- USE_TASK_POOLS=true (new system)
- USE_IDENTITY_POOL=false (disabled)

🤖 Generated with [Claude Code](https://claude.ai/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-14 01:41:52 -07:00
parent eedc027ff6
commit 1861e18396
4 changed files with 858 additions and 1 deletions

View File

@@ -0,0 +1,190 @@
/**
* Task Pool Service
*
* Manages the pool-based worker flow:
* 1. Worker checks what pools have pending tasks
* 2. Worker claims a pool (gets geo info)
* 3. Worker gets proxy for that geo
* 4. Worker runs preflight
* 5. Worker pulls tasks from pool (up to 6 stores)
* 6. Worker executes tasks
* 7. Worker releases pool when exhausted (6 stores visited)
*/
import { pool } from '../db/pool';
import { buildEvomiProxyUrl, getEvomiConfig } from './crawl-rotator';
/**
 * A geo-based task pool row as returned by the pool queries in this module
 * (`worker_claim_pool` and the `task_pools` join in `getWorkerPool`).
 */
export interface TaskPool {
/** Pool identifier; `getWorkerPool` selects it from `task_pools.id`. */
pool_id: number;
/** Internal pool name; `getWorkerPool` selects it from `task_pools.name`. */
pool_name: string;
/** Human-readable label used in log and error messages. */
display_name: string;
/** State code; passed to the Evomi proxy URL builder and written to `worker_registry.current_state`. */
state_code: string;
/** City name; lower-cased with whitespace replaced by `.` before being passed to the proxy URL builder. */
city: string;
// NOTE(review): if latitude/longitude are NUMERIC columns, node-postgres returns
// them as strings by default — confirm the column types or casts.
latitude: number;
longitude: number;
/** Timezone of the pool; not read anywhere in this module. */
timezone: string;
}
/**
 * A pool row returned by `get_pools_with_pending_tasks()`: the pool's fields
 * plus two counters.
 */
export interface PoolWithPendingTasks extends TaskPool {
// NOTE(review): if the SQL function computes these with COUNT(*), node-postgres
// delivers bigint values as strings by default — confirm the SQL casts to int.
/** Presumably the number of pending tasks in the pool (the worker logs it as "tasks"). */
pending_count: number;
/** Presumably the number of stores with pending tasks (the worker logs it as "stores"). */
store_count: number;
}
/**
 * A task row returned by `pull_tasks_from_pool()`.
 */
export interface PoolTask {
/** Task identifier. */
task_id: number;
/** Dispensary (store) the task targets; used to count distinct stores per pull. */
dispensary_id: number;
/** Dispensary name, used in log output. */
dispensary_name: string;
/** Task role name. */
role: string;
/** Platform name for the dispensary. */
platform: string;
/** Fetch method; may be null when the row carries no explicit method. */
method: string | null;
}
/**
 * Result of a successful `claimPool` call.
 */
export interface ClaimedPoolResult {
/** The pool row returned by `worker_claim_pool`. */
pool: TaskPool;
/** Evomi proxy URL built for the claimed pool's state and city. */
proxyUrl: string;
}
/**
 * List the pools reported by the `get_pools_with_pending_tasks()` database
 * function.
 *
 * @returns The rows exactly as returned by the query.
 */
export async function getPoolsWithPendingTasks(): Promise<PoolWithPendingTasks[]> {
  const queryResult = await pool.query<PoolWithPendingTasks>(
    `SELECT * FROM get_pools_with_pending_tasks()`
  );
  const pendingPools = queryResult.rows;
  return pendingPools;
}
/**
 * Claim a task pool for a worker and build the Evomi proxy URL for its geo.
 *
 * The claim is made first via the `worker_claim_pool` database function. If
 * the proxy cannot be set up afterwards, the claim is released (best effort)
 * before the error is rethrown, so a pool is never left claimed by a worker
 * that does not know it holds it.
 *
 * @param workerId - Worker claiming the pool; also passed to the proxy URL builder.
 * @param poolId - Optional pool id to request; `null` is sent when omitted.
 * @returns The claimed pool and its proxy URL, or `null` when the database
 *   returned no row.
 * @throws Error when Evomi is not enabled or the proxy URL cannot be built.
 */
export async function claimPool(
  workerId: string,
  poolId?: number
): Promise<ClaimedPoolResult | null> {
  // Claim pool in database.
  const { rows } = await pool.query<TaskPool>(
    `SELECT * FROM worker_claim_pool($1, $2)`,
    // `??` rather than `||` so an explicit pool id of 0 is not turned into null.
    [workerId, poolId ?? null]
  );
  const claimedPool = rows[0];
  if (!claimedPool) {
    console.log(`[TaskPool] No pools available for ${workerId}`);
    return null;
  }

  try {
    // Build Evomi proxy URL for this pool's geo.
    const evomiConfig = getEvomiConfig();
    if (!evomiConfig.enabled) {
      throw new Error('Evomi proxy not configured');
    }
    const proxyResult = buildEvomiProxyUrl(
      claimedPool.state_code,
      workerId,
      claimedPool.city.toLowerCase().replace(/\s+/g, '.')
    );
    if (!proxyResult) {
      throw new Error(`Failed to build proxy URL for ${claimedPool.display_name}`);
    }
    console.log(`[TaskPool] ${workerId} claimed pool: ${claimedPool.display_name}`);
    return {
      pool: claimedPool,
      proxyUrl: proxyResult.url,
    };
  } catch (err: unknown) {
    // The pool is already claimed in the database at this point; give it back
    // so it is not orphaned. A failure here must not mask the original error.
    try {
      await pool.query(`SELECT worker_release_pool($1) as success`, [workerId]);
    } catch (releaseErr: unknown) {
      const detail = releaseErr instanceof Error ? releaseErr.message : String(releaseErr);
      console.error(`[TaskPool] ${workerId} failed to release pool after claim error: ${detail}`);
    }
    throw err;
  }
}
/**
 * Pull the next batch of tasks from the pool assigned to a worker by calling
 * the `pull_tasks_from_pool` database function.
 *
 * @param workerId - Worker whose pool is queried.
 * @param maxStores - Store limit forwarded to the database function (default 6).
 * @returns The task rows returned by the query; an empty array when none.
 */
export async function pullTasksFromPool(
  workerId: string,
  maxStores: number = 6
): Promise<PoolTask[]> {
  const queryResult = await pool.query<PoolTask>(
    `SELECT * FROM pull_tasks_from_pool($1, $2)`,
    [workerId, maxStores]
  );
  const pulled = queryResult.rows;
  if (pulled.length === 0) {
    return pulled;
  }

  // Count distinct stores for the log line only.
  const distinctStores = new Set<number>();
  for (const task of pulled) {
    distinctStores.add(task.dispensary_id);
  }
  console.log(`[TaskPool] ${workerId} pulled ${pulled.length} tasks for ${distinctStores.size} stores`);
  return pulled;
}
/**
 * Release the pool held by a worker by calling the `worker_release_pool`
 * database function.
 *
 * @param workerId - Worker whose pool should be released.
 * @returns `true` when the database function reported success, otherwise `false`
 *   (including when the query returned no row).
 */
export async function releasePool(workerId: string): Promise<boolean> {
  const { rows } = await pool.query(
    `SELECT worker_release_pool($1) as success`,
    [workerId]
  );
  const released = Boolean(rows[0]?.success);
  // Only claim a release in the log when the database actually reported one.
  if (released) {
    console.log(`[TaskPool] ${workerId} released pool`);
  } else {
    console.warn(`[TaskPool] ${workerId} release requested but no pool was released`);
  }
  return released;
}
/**
 * Report whether a worker has reached its store quota, i.e. whether
 * `pool_stores_visited >= pool_max_stores` in its `worker_registry` row.
 *
 * @param workerId - Worker to look up.
 * @returns The comparison result, or `false` when no row matches.
 */
export async function isWorkerExhausted(workerId: string): Promise<boolean> {
  const exhaustedSql = `SELECT pool_stores_visited >= pool_max_stores as exhausted
FROM worker_registry WHERE worker_id = $1`;
  const queryResult = await pool.query(exhaustedSql, [workerId]);
  const [registryRow] = queryResult.rows;
  return registryRow?.exhausted || false;
}
/**
 * Look up the pool currently assigned to a worker by joining
 * `worker_registry.current_pool_id` to `task_pools`.
 *
 * @param workerId - Worker to look up.
 * @returns The pool row, or `null` when the join yields nothing.
 */
export async function getWorkerPool(workerId: string): Promise<TaskPool | null> {
  const currentPoolSql = `SELECT tp.id as pool_id, tp.name as pool_name, tp.display_name,
tp.state_code, tp.city, tp.latitude, tp.longitude, tp.timezone
FROM worker_registry wr
JOIN task_pools tp ON tp.id = wr.current_pool_id
WHERE wr.worker_id = $1`;
  const queryResult = await pool.query<TaskPool>(currentPoolSql, [workerId]);
  const [currentPool] = queryResult.rows;
  return currentPool ? currentPool : null;
}
/**
 * Add to a worker's `pool_stores_visited` counter and refresh `updated_at`.
 *
 * @param workerId - Worker whose registry row is updated.
 * @param count - Amount added to the counter (default 1).
 */
export async function incrementStoreVisits(
  workerId: string,
  count: number = 1
): Promise<void> {
  const incrementSql = `UPDATE worker_registry
SET pool_stores_visited = pool_stores_visited + $2, updated_at = NOW()
WHERE worker_id = $1`;
  const params = [workerId, count];
  await pool.query(incrementSql, params);
}
/**
 * Record the pool's state and city on the worker's `worker_registry` row
 * (`current_state`, `current_city`) and refresh `updated_at`. Used for the
 * dashboard.
 *
 * @param workerId - Worker whose registry row is updated.
 * @param poolInfo - Pool supplying `state_code` and `city`.
 */
export async function updateWorkerPoolGeo(
  workerId: string,
  poolInfo: TaskPool
): Promise<void> {
  const { state_code: stateCode, city } = poolInfo;
  const geoSql = `UPDATE worker_registry
SET current_state = $2, current_city = $3, updated_at = NOW()
WHERE worker_id = $1`;
  await pool.query(geoSql, [workerId, stateCode, city]);
}

View File

@@ -76,6 +76,13 @@ import { IdentityPoolService, WorkerIdentity, getRandomTaskCount } from '../serv
// NEW: Session-based worker pool (claim tasks first, then get IP)
import * as WorkerSession from '../services/worker-session';
// NEW: Task pool system (geo-based pools)
import * as TaskPool from '../services/task-pool';
// Feature flag: use the geo-based task pool system.
// Enabled only when the USE_TASK_POOLS env var is exactly the string 'true'.
// Flow when enabled: check pools → claim pool → get proxy → preflight → pull tasks
const USE_TASK_POOLS = process.env.USE_TASK_POOLS === 'true';
// Feature flag: use the identity pool system.
// Enabled only when the USE_IDENTITY_POOL env var is exactly the string 'true'.
const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true';
@@ -1490,7 +1497,16 @@ export class TaskWorker {
// Try to claim more tasks if we have capacity
if (this.canAcceptMoreTasks()) {
// =================================================================
// NEW SESSION POOL FLOW (enabled via USE_SESSION_POOL)
// TASK POOL FLOW (enabled via USE_TASK_POOLS)
// Correct order: check pools → claim pool → get proxy → preflight → pull tasks
// =================================================================
if (USE_TASK_POOLS) {
await this.taskPoolMainLoop();
return;
}
// =================================================================
// SESSION POOL FLOW (enabled via USE_SESSION_POOL)
// Correct order: claim tasks → get geo from tasks → get IP → preflight
// =================================================================
if (USE_SESSION_POOL) {
@@ -1825,6 +1841,269 @@ export class TaskWorker {
});
}
// ===========================================================================
// TASK POOL MAIN LOOP (new system - enabled via USE_TASK_POOLS)
// ===========================================================================
// Correct flow:
// 1. Check what pools have pending tasks
// 2. Claim a pool (geo-based)
// 3. Get Evomi proxy for that pool's geo
// 4. Run preflight with that proxy
// 5. Pull tasks from pool (up to 6 stores)
// 6. Execute tasks
// 7. When exhausted (6 stores), release pool and repeat
// ===========================================================================
// Pool currently claimed by this worker; null when no pool is held.
private currentPoolInfo: TaskPool.TaskPool | null = null;
// Proxy URL returned by claimPool for the current pool; null when no pool is held.
private poolProxyUrl: string | null = null;
// Set to true once the HTTP preflight has passed for the current pool.
private poolPreflightPassed: boolean = false;
// Tasks pulled from the current pool; a task is removed when its promise settles.
private poolTasks: TaskPool.PoolTask[] = [];
/**
 * Run one iteration of the task-pool flow for this worker.
 *
 * Per call, in order:
 *  - releases the current pool if the registry reports the worker exhausted;
 *  - if no pool is held: checks for pools with pending tasks, claims one,
 *    pins the pool's proxy, initializes stealth if needed and runs preflight;
 *  - pulls tasks from the pool when the local batch is empty or fully active;
 *  - starts at most one not-yet-active task in the background.
 *
 * Every failure path after a claim releases the pool and clears all local
 * pool state (proxy, geo, preflight flag), so the next call starts clean.
 */
private async taskPoolMainLoop(): Promise<void> {
  // Release the current pool once the registry reports the store quota reached.
  if (this.currentPoolInfo) {
    const exhausted = await TaskPool.isWorkerExhausted(this.workerId);
    if (exhausted) {
      console.log(`[TaskWorker] ${this.friendlyName} exhausted (6 stores), releasing pool...`);
      await this.releaseAndResetTaskPool();
      // Fall through to claim a new pool.
    }
  }

  // If no pool claimed, check what's available and claim one.
  if (!this.currentPoolInfo) {
    // Step 1: Check what pools have pending tasks
    this.setPreflightStep('checking', 'Checking task pools');
    const pools = await TaskPool.getPoolsWithPendingTasks();
    if (pools.length === 0) {
      this.setPreflightStep('waiting', 'No pools with pending tasks');
      await this.sleep(30000);
      return;
    }
    console.log(`[TaskWorker] ${this.friendlyName} found ${pools.length} pools with tasks`);
    // Only the first three pools are logged.
    for (const p of pools.slice(0, 3)) {
      console.log(`[TaskWorker] - ${p.display_name}: ${p.pending_count} tasks, ${p.store_count} stores`);
    }

    // Step 2: Claim a pool
    this.setPreflightStep('claiming', 'Claiming task pool');
    const result = await TaskPool.claimPool(this.workerId);
    if (!result) {
      this.setPreflightStep('waiting', 'No pools available');
      await this.sleep(30000);
      return;
    }
    this.currentPoolInfo = result.pool;
    this.poolProxyUrl = result.proxyUrl;
    this.geoState = result.pool.state_code;
    this.geoCity = result.pool.city;

    // Update dashboard with geo info
    await TaskPool.updateWorkerPoolGeo(this.workerId, result.pool);
    console.log(`[TaskWorker] ${this.friendlyName} claimed pool: ${result.pool.display_name}`);

    // Step 3: Configure proxy for this pool's geo
    this.setPreflightStep('proxy', `Setting proxy for ${result.pool.display_name}`);
    this.crawlRotator.setFixedProxy(result.proxyUrl);

    // Step 4: Initialize stealth if needed
    if (!this.stealthInitialized) {
      this.setPreflightStep('init', 'Initializing stealth plugins');
      const initSuccess = await this.initStealthOnly();
      if (!initSuccess) {
        this.setPreflightStep('init_failed', 'Stealth init failed');
        await this.releaseAndResetTaskPool();
        await this.sleep(30000);
        return;
      }
    }

    // Step 5: Run preflight with pool's proxy
    this.setPreflightStep('preflight', 'Running browser preflight');
    console.log(`[TaskWorker] ${this.friendlyName} running preflight for ${result.pool.display_name}...`);
    try {
      await this.runDualPreflights();
      if (this.preflightHttpPassed) {
        this.poolPreflightPassed = true;
        this.setPreflightStep('ready', `Qualified - ${result.pool.display_name}`);
        console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`);
      } else {
        this.setPreflightStep('failed', this.preflightHttpResult?.error || 'Preflight failed');
        console.error(`[TaskWorker] ${this.friendlyName} preflight FAILED, releasing pool...`);
        await this.releaseAndResetTaskPool();
        await this.sleep(30000);
        return;
      }
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      this.setPreflightStep('error', message);
      console.error(`[TaskWorker] ${this.friendlyName} preflight error: ${message}`);
      await this.releaseAndResetTaskPool();
      await this.sleep(30000);
      return;
    }
  }

  // A pool is held but preflight never passed. This only happens when an
  // earlier iteration threw between claiming the pool and finishing preflight.
  // Nothing outside the claim branch can set the flag, so waiting would never
  // recover: release the pool and start over on the next iteration.
  if (!this.poolPreflightPassed) {
    console.log(`[TaskWorker] ${this.friendlyName} pool preflight not passed, releasing pool...`);
    await this.releaseAndResetTaskPool();
    await this.sleep(10000);
    return;
  }

  // Pull tasks from pool if we have none, or all held tasks are already running.
  if (this.poolTasks.length === 0 || this.poolTasks.every(t => this.activeTasks.has(t.task_id))) {
    const tasks = await TaskPool.pullTasksFromPool(this.workerId);
    if (tasks.length === 0) {
      // No more tasks in this pool - check if exhausted
      const exhausted = await TaskPool.isWorkerExhausted(this.workerId);
      if (exhausted) {
        console.log(`[TaskWorker] ${this.friendlyName} pool exhausted, will release on next loop`);
      } else {
        console.log(`[TaskWorker] ${this.friendlyName} no tasks in pool, waiting...`);
      }
      await this.sleep(POLL_INTERVAL_MS);
      return;
    }
    this.poolTasks = tasks;
  }

  // Find next task to execute
  const nextTask = this.poolTasks.find(t => !this.activeTasks.has(t.task_id));
  if (!nextTask) {
    await this.sleep(POLL_INTERVAL_MS);
    return;
  }

  // Convert pool task to worker task format. Fields the pool row does not
  // carry are filled with fixed placeholder values.
  const workerTask: WorkerTask = {
    id: nextTask.task_id,
    role: nextTask.role as TaskRole,
    dispensary_id: nextTask.dispensary_id,
    dispensary_name: nextTask.dispensary_name,
    method: (nextTask.method || 'http') as 'http' | 'curl' | null,
    platform: nextTask.platform,
    priority: 50,
    status: 'claimed' as TaskStatus,
    scheduled_for: null,
    worker_id: this.workerId,
    claimed_at: new Date(),
    started_at: null,
    completed_at: null,
    last_heartbeat_at: null,
    result: null,
    error_message: null,
    retry_count: 0,
    max_retries: 3,
    payload: null,
    created_at: new Date(),
    updated_at: new Date(),
  };
  console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);
  this.activeTasks.set(nextTask.task_id, workerTask);

  // Execute task in background
  const taskPromise = this.executePoolTask(workerTask);
  this.taskPromises.set(nextTask.task_id, taskPromise);

  // Cleanup when done. The promise returned by finally() rejects if the task
  // promise rejects, so it needs its own handler to avoid an unhandled rejection.
  taskPromise
    .finally(() => {
      this.activeTasks.delete(nextTask.task_id);
      this.taskPromises.delete(nextTask.task_id);
      // Remove from poolTasks
      this.poolTasks = this.poolTasks.filter(t => t.task_id !== nextTask.task_id);
    })
    .catch((err: unknown) => {
      const message = err instanceof Error ? err.message : String(err);
      console.error(`[TaskWorker] ${this.friendlyName} pool task ${nextTask.task_id} rejected: ${message}`);
    });
}

/**
 * Release this worker's pool in the database, then clear all local pool
 * state: pool info, proxy URL, preflight flag, pulled tasks, geo, and the
 * rotator's fixed proxy. If the release call throws, local state is left
 * untouched so a later iteration can retry the release.
 */
private async releaseAndResetTaskPool(): Promise<void> {
  await TaskPool.releasePool(this.workerId);
  this.currentPoolInfo = null;
  this.poolProxyUrl = null;
  this.poolPreflightPassed = false;
  this.poolTasks = [];
  this.geoState = null;
  this.geoCity = null;
  this.crawlRotator.clearFixedProxy();
}
/**
 * Initialize stealth WITHOUT running preflight.
 * Used by the task pool flow, where preflight runs after a pool is claimed.
 *
 * @returns `true` when `ensureStealthReady()` completed and the
 *   `stealthInitialized` flag was set; `false` when it threw (the error is
 *   logged, not rethrown).
 */
private async initStealthOnly(): Promise<boolean> {
  try {
    await this.ensureStealthReady();
    this.stealthInitialized = true;
    return true;
  } catch (err: unknown) {
    // Narrow from unknown so a non-Error throwable still yields a useful message.
    const message = err instanceof Error ? err.message : String(err);
    console.error(`[TaskWorker] Stealth init failed: ${message}`);
    return false;
  }
}
/**
 * Execute a task for the task pool flow and record its outcome.
 *
 * Marks the task as started, runs the handler registered for its role, then
 * marks it completed or failed through `taskService`. Any error (including a
 * missing handler) is logged and recorded as a failure. This method does not
 * reject: if recording the failure itself fails, that is logged too.
 *
 * @param task - The worker task to run.
 */
private async executePoolTask(task: WorkerTask): Promise<void> {
  console.log(`[TaskWorker] ${this.friendlyName} executing pool task ${task.id} (${task.role})`);
  try {
    // Mark as running
    await taskService.startTask(task.id);

    // Get handler for this role
    const handler = getHandlerForTask(task);
    if (!handler) {
      throw new Error(`No handler registered for role: ${task.role}`);
    }

    // Create context
    const ctx: TaskContext = {
      pool: this.pool,
      workerId: this.workerId,
      task,
      heartbeat: async () => {
        await taskService.heartbeat(task.id);
      },
      crawlRotator: this.crawlRotator,
      updateStep: (step: string, detail?: string) => {
        this.updateTaskStep(task.id, step, detail);
      },
      fingerprint: this.storedFingerprint || undefined,
    };
    this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);

    // Execute
    const result = await handler(ctx);
    this.clearTaskStep(task.id);
    if (result.success) {
      await taskService.completeTask(task.id, { result: result.data });
      console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
    } else {
      await taskService.failTask(task.id, result.error || 'Unknown error');
      console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
    }
  } catch (err: unknown) {
    const message = err instanceof Error ? err.message : String(err);
    this.clearTaskStep(task.id);
    // Log first so the original error is not lost if recording the failure throws.
    console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} ERROR: ${message}`);
    try {
      await taskService.failTask(task.id, message);
    } catch (failErr: unknown) {
      const failMessage = failErr instanceof Error ? failErr.message : String(failErr);
      console.error(`[TaskWorker] ${this.friendlyName} could not mark task ${task.id} as failed: ${failMessage}`);
    }
  }
}
/**
* Execute a task for session pool (handles completion/failure tracking)
*/