feat(workers): Implement geo-based task pools
Workers now follow the correct flow: 1. Check what pools have pending tasks 2. Claim a pool (e.g., Phoenix AZ) 3. Get Evomi proxy for that geo 4. Run preflight with geo proxy 5. Pull tasks from pool (up to 6 stores) 6. Execute tasks 7. Release pool when exhausted (6 stores visited) Task pools group dispensaries by metro area (100mi radius): - Phoenix AZ, Tucson AZ - Los Angeles CA, San Francisco CA, San Diego CA, Sacramento CA - Denver CO, Chicago IL, Boston MA, Detroit MI - Las Vegas NV, Reno NV, Newark NJ, New York NY - Oklahoma City OK, Tulsa OK, Portland OR, Seattle WA Benefits: - Workers know geo BEFORE getting proxy (no more "No geo assigned") - IP diversity within metro area (Phoenix worker can use Tempe IP) - Simpler worker logic - just match pool geo - Pre-organized tasks, not grouped at claim time New files: - migrations/113_task_pools.sql - schema, seed data, functions - src/services/task-pool.ts - TypeScript service Env vars: - USE_TASK_POOLS=true (new system) - USE_IDENTITY_POOL=false (disabled) 🤖 Generated with [Claude Code](https://claude.ai/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
381
backend/migrations/113_task_pools.sql
Normal file
381
backend/migrations/113_task_pools.sql
Normal file
@@ -0,0 +1,381 @@
|
|||||||
|
-- Task Pools: Group tasks by geo area for worker assignment
|
||||||
|
-- Workers claim a pool, get proxy for that geo, then pull tasks from pool
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- TASK POOLS TABLE
|
||||||
|
-- ============================================================================
|
||||||
|
-- Each pool represents a metro area (e.g., Phoenix AZ = 100mi radius)
|
||||||
|
-- Dispensaries are assigned to pools based on location
|
||||||
|
-- Workers claim a pool, not individual tasks
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS task_pools (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
name VARCHAR(100) NOT NULL UNIQUE, -- e.g., 'phoenix_az'
|
||||||
|
display_name VARCHAR(100) NOT NULL, -- e.g., 'Phoenix, AZ'
|
||||||
|
state_code VARCHAR(2) NOT NULL, -- e.g., 'AZ'
|
||||||
|
city VARCHAR(100) NOT NULL, -- e.g., 'Phoenix'
|
||||||
|
latitude DECIMAL(10, 6) NOT NULL, -- pool center lat
|
||||||
|
longitude DECIMAL(10, 6) NOT NULL, -- pool center lng
|
||||||
|
radius_miles INTEGER DEFAULT 100, -- pool radius (100mi default)
|
||||||
|
timezone VARCHAR(50) NOT NULL, -- e.g., 'America/Phoenix'
|
||||||
|
is_active BOOLEAN DEFAULT true,
|
||||||
|
created_at TIMESTAMPTZ DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Index for active pools
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_task_pools_active ON task_pools(is_active) WHERE is_active = true;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- LINK DISPENSARIES TO POOLS
|
||||||
|
-- ============================================================================
|
||||||
|
-- Add pool_id to dispensaries table
|
||||||
|
|
||||||
|
ALTER TABLE dispensaries
|
||||||
|
ADD COLUMN IF NOT EXISTS pool_id INTEGER REFERENCES task_pools(id);
|
||||||
|
|
||||||
|
-- Index for pool membership
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_dispensaries_pool ON dispensaries(pool_id) WHERE pool_id IS NOT NULL;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- WORKER POOL ASSIGNMENT
|
||||||
|
-- ============================================================================
|
||||||
|
-- Track which pool a worker is currently assigned to
|
||||||
|
|
||||||
|
ALTER TABLE worker_registry
|
||||||
|
ADD COLUMN IF NOT EXISTS current_pool_id INTEGER REFERENCES task_pools(id),
|
||||||
|
ADD COLUMN IF NOT EXISTS pool_claimed_at TIMESTAMPTZ,
|
||||||
|
ADD COLUMN IF NOT EXISTS pool_stores_visited INTEGER DEFAULT 0,
|
||||||
|
ADD COLUMN IF NOT EXISTS pool_max_stores INTEGER DEFAULT 6;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- SEED INITIAL POOLS
|
||||||
|
-- ============================================================================
|
||||||
|
-- Major cannabis markets with approximate center coordinates
|
||||||
|
|
||||||
|
INSERT INTO task_pools (name, display_name, state_code, city, latitude, longitude, timezone, radius_miles) VALUES
|
||||||
|
-- Arizona
|
||||||
|
('phoenix_az', 'Phoenix, AZ', 'AZ', 'Phoenix', 33.4484, -112.0740, 'America/Phoenix', 100),
|
||||||
|
('tucson_az', 'Tucson, AZ', 'AZ', 'Tucson', 32.2226, -110.9747, 'America/Phoenix', 75),
|
||||||
|
|
||||||
|
-- California
|
||||||
|
('los_angeles_ca', 'Los Angeles, CA', 'CA', 'Los Angeles', 34.0522, -118.2437, 'America/Los_Angeles', 100),
|
||||||
|
('san_francisco_ca', 'San Francisco, CA', 'CA', 'San Francisco', 37.7749, -122.4194, 'America/Los_Angeles', 75),
|
||||||
|
('san_diego_ca', 'San Diego, CA', 'CA', 'San Diego', 32.7157, -117.1611, 'America/Los_Angeles', 75),
|
||||||
|
('sacramento_ca', 'Sacramento, CA', 'CA', 'Sacramento', 38.5816, -121.4944, 'America/Los_Angeles', 75),
|
||||||
|
|
||||||
|
-- Colorado
|
||||||
|
('denver_co', 'Denver, CO', 'CO', 'Denver', 39.7392, -104.9903, 'America/Denver', 100),
|
||||||
|
|
||||||
|
-- Illinois
|
||||||
|
('chicago_il', 'Chicago, IL', 'IL', 'Chicago', 41.8781, -87.6298, 'America/Chicago', 100),
|
||||||
|
|
||||||
|
-- Massachusetts
|
||||||
|
('boston_ma', 'Boston, MA', 'MA', 'Boston', 42.3601, -71.0589, 'America/New_York', 75),
|
||||||
|
|
||||||
|
-- Michigan
|
||||||
|
('detroit_mi', 'Detroit, MI', 'MI', 'Detroit', 42.3314, -83.0458, 'America/Detroit', 100),
|
||||||
|
|
||||||
|
-- Nevada
|
||||||
|
('las_vegas_nv', 'Las Vegas, NV', 'NV', 'Las Vegas', 36.1699, -115.1398, 'America/Los_Angeles', 75),
|
||||||
|
('reno_nv', 'Reno, NV', 'NV', 'Reno', 39.5296, -119.8138, 'America/Los_Angeles', 50),
|
||||||
|
|
||||||
|
-- New Jersey
|
||||||
|
('newark_nj', 'Newark, NJ', 'NJ', 'Newark', 40.7357, -74.1724, 'America/New_York', 75),
|
||||||
|
|
||||||
|
-- New York
|
||||||
|
('new_york_ny', 'New York, NY', 'NY', 'New York', 40.7128, -74.0060, 'America/New_York', 75),
|
||||||
|
|
||||||
|
-- Oklahoma
|
||||||
|
('oklahoma_city_ok', 'Oklahoma City, OK', 'OK', 'Oklahoma City', 35.4676, -97.5164, 'America/Chicago', 100),
|
||||||
|
('tulsa_ok', 'Tulsa, OK', 'OK', 'Tulsa', 36.1540, -95.9928, 'America/Chicago', 75),
|
||||||
|
|
||||||
|
-- Oregon
|
||||||
|
('portland_or', 'Portland, OR', 'OR', 'Portland', 45.5152, -122.6784, 'America/Los_Angeles', 75),
|
||||||
|
|
||||||
|
-- Washington
|
||||||
|
('seattle_wa', 'Seattle, WA', 'WA', 'Seattle', 47.6062, -122.3321, 'America/Los_Angeles', 100)
|
||||||
|
|
||||||
|
ON CONFLICT (name) DO NOTHING;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- FUNCTION: Assign dispensary to nearest pool
|
||||||
|
-- ============================================================================
|
||||||
|
CREATE OR REPLACE FUNCTION assign_dispensary_to_pool(disp_id INTEGER)
|
||||||
|
RETURNS INTEGER AS $$
|
||||||
|
DECLARE
|
||||||
|
disp_lat DECIMAL(10,6);
|
||||||
|
disp_lng DECIMAL(10,6);
|
||||||
|
nearest_pool_id INTEGER;
|
||||||
|
BEGIN
|
||||||
|
-- Get dispensary coordinates
|
||||||
|
SELECT latitude, longitude INTO disp_lat, disp_lng
|
||||||
|
FROM dispensaries WHERE id = disp_id;
|
||||||
|
|
||||||
|
IF disp_lat IS NULL OR disp_lng IS NULL THEN
|
||||||
|
RETURN NULL;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Find nearest active pool within radius
|
||||||
|
-- Using Haversine approximation (accurate enough for 100mi)
|
||||||
|
SELECT id INTO nearest_pool_id
|
||||||
|
FROM task_pools
|
||||||
|
WHERE is_active = true
|
||||||
|
AND (
|
||||||
|
3959 * acos(
|
||||||
|
cos(radians(latitude)) * cos(radians(disp_lat)) *
|
||||||
|
cos(radians(disp_lng) - radians(longitude)) +
|
||||||
|
sin(radians(latitude)) * sin(radians(disp_lat))
|
||||||
|
)
|
||||||
|
) <= radius_miles
|
||||||
|
ORDER BY (
|
||||||
|
3959 * acos(
|
||||||
|
cos(radians(latitude)) * cos(radians(disp_lat)) *
|
||||||
|
cos(radians(disp_lng) - radians(longitude)) +
|
||||||
|
sin(radians(latitude)) * sin(radians(disp_lat))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
LIMIT 1;
|
||||||
|
|
||||||
|
-- Update dispensary
|
||||||
|
IF nearest_pool_id IS NOT NULL THEN
|
||||||
|
UPDATE dispensaries SET pool_id = nearest_pool_id WHERE id = disp_id;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
RETURN nearest_pool_id;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- FUNCTION: Assign all dispensaries to pools (batch)
|
||||||
|
-- ============================================================================
|
||||||
|
CREATE OR REPLACE FUNCTION assign_all_dispensaries_to_pools()
|
||||||
|
RETURNS TABLE(assigned INTEGER, unassigned INTEGER) AS $$
|
||||||
|
DECLARE
|
||||||
|
assigned_count INTEGER := 0;
|
||||||
|
unassigned_count INTEGER := 0;
|
||||||
|
disp RECORD;
|
||||||
|
pool_id INTEGER;
|
||||||
|
BEGIN
|
||||||
|
FOR disp IN SELECT id FROM dispensaries WHERE pool_id IS NULL AND latitude IS NOT NULL LOOP
|
||||||
|
pool_id := assign_dispensary_to_pool(disp.id);
|
||||||
|
IF pool_id IS NOT NULL THEN
|
||||||
|
assigned_count := assigned_count + 1;
|
||||||
|
ELSE
|
||||||
|
unassigned_count := unassigned_count + 1;
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
RETURN QUERY SELECT assigned_count, unassigned_count;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- FUNCTION: Get pools with pending tasks
|
||||||
|
-- ============================================================================
|
||||||
|
CREATE OR REPLACE FUNCTION get_pools_with_pending_tasks()
|
||||||
|
RETURNS TABLE(
|
||||||
|
pool_id INTEGER,
|
||||||
|
pool_name VARCHAR(100),
|
||||||
|
display_name VARCHAR(100),
|
||||||
|
state_code VARCHAR(2),
|
||||||
|
city VARCHAR(100),
|
||||||
|
timezone VARCHAR(50),
|
||||||
|
pending_count BIGINT,
|
||||||
|
store_count BIGINT
|
||||||
|
) AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN QUERY
|
||||||
|
SELECT
|
||||||
|
tp.id as pool_id,
|
||||||
|
tp.name as pool_name,
|
||||||
|
tp.display_name,
|
||||||
|
tp.state_code,
|
||||||
|
tp.city,
|
||||||
|
tp.timezone,
|
||||||
|
COUNT(DISTINCT t.id) as pending_count,
|
||||||
|
COUNT(DISTINCT d.id) as store_count
|
||||||
|
FROM task_pools tp
|
||||||
|
JOIN dispensaries d ON d.pool_id = tp.id
|
||||||
|
JOIN tasks t ON t.dispensary_id = d.id AND t.status = 'pending'
|
||||||
|
WHERE tp.is_active = true
|
||||||
|
GROUP BY tp.id, tp.name, tp.display_name, tp.state_code, tp.city, tp.timezone
|
||||||
|
HAVING COUNT(DISTINCT t.id) > 0
|
||||||
|
ORDER BY COUNT(DISTINCT t.id) DESC;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- FUNCTION: Worker claims a pool
|
||||||
|
-- ============================================================================
|
||||||
|
CREATE OR REPLACE FUNCTION worker_claim_pool(
|
||||||
|
p_worker_id VARCHAR(100),
|
||||||
|
p_pool_id INTEGER DEFAULT NULL
|
||||||
|
)
|
||||||
|
RETURNS TABLE(
|
||||||
|
pool_id INTEGER,
|
||||||
|
pool_name VARCHAR(100),
|
||||||
|
display_name VARCHAR(100),
|
||||||
|
state_code VARCHAR(2),
|
||||||
|
city VARCHAR(100),
|
||||||
|
latitude DECIMAL(10,6),
|
||||||
|
longitude DECIMAL(10,6),
|
||||||
|
timezone VARCHAR(50)
|
||||||
|
) AS $$
|
||||||
|
DECLARE
|
||||||
|
claimed_pool_id INTEGER;
|
||||||
|
BEGIN
|
||||||
|
-- If no pool specified, pick the one with most pending tasks
|
||||||
|
IF p_pool_id IS NULL THEN
|
||||||
|
SELECT tp.id INTO claimed_pool_id
|
||||||
|
FROM task_pools tp
|
||||||
|
JOIN dispensaries d ON d.pool_id = tp.id
|
||||||
|
JOIN tasks t ON t.dispensary_id = d.id AND t.status = 'pending'
|
||||||
|
WHERE tp.is_active = true
|
||||||
|
GROUP BY tp.id
|
||||||
|
ORDER BY COUNT(DISTINCT t.id) DESC
|
||||||
|
LIMIT 1;
|
||||||
|
ELSE
|
||||||
|
claimed_pool_id := p_pool_id;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
IF claimed_pool_id IS NULL THEN
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Update worker registry with pool assignment
|
||||||
|
UPDATE worker_registry
|
||||||
|
SET
|
||||||
|
current_pool_id = claimed_pool_id,
|
||||||
|
pool_claimed_at = NOW(),
|
||||||
|
pool_stores_visited = 0,
|
||||||
|
pool_max_stores = 6,
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE worker_id = p_worker_id;
|
||||||
|
|
||||||
|
-- Return pool info
|
||||||
|
RETURN QUERY
|
||||||
|
SELECT
|
||||||
|
tp.id,
|
||||||
|
tp.name,
|
||||||
|
tp.display_name,
|
||||||
|
tp.state_code,
|
||||||
|
tp.city,
|
||||||
|
tp.latitude,
|
||||||
|
tp.longitude,
|
||||||
|
tp.timezone
|
||||||
|
FROM task_pools tp
|
||||||
|
WHERE tp.id = claimed_pool_id;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- FUNCTION: Pull tasks from worker's pool (up to 6 stores)
|
||||||
|
-- ============================================================================
|
||||||
|
CREATE OR REPLACE FUNCTION pull_tasks_from_pool(
|
||||||
|
p_worker_id VARCHAR(100),
|
||||||
|
p_max_stores INTEGER DEFAULT 6
|
||||||
|
)
|
||||||
|
RETURNS TABLE(
|
||||||
|
task_id INTEGER,
|
||||||
|
dispensary_id INTEGER,
|
||||||
|
dispensary_name VARCHAR(255),
|
||||||
|
role VARCHAR(50),
|
||||||
|
platform VARCHAR(50),
|
||||||
|
method VARCHAR(20)
|
||||||
|
) AS $$
|
||||||
|
DECLARE
|
||||||
|
worker_pool_id INTEGER;
|
||||||
|
stores_visited INTEGER;
|
||||||
|
max_stores INTEGER;
|
||||||
|
stores_remaining INTEGER;
|
||||||
|
BEGIN
|
||||||
|
-- Get worker's current pool and store count
|
||||||
|
SELECT current_pool_id, pool_stores_visited, pool_max_stores
|
||||||
|
INTO worker_pool_id, stores_visited, max_stores
|
||||||
|
FROM worker_registry
|
||||||
|
WHERE worker_id = p_worker_id;
|
||||||
|
|
||||||
|
IF worker_pool_id IS NULL THEN
|
||||||
|
RAISE EXCEPTION 'Worker % has no pool assigned', p_worker_id;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
stores_remaining := max_stores - stores_visited;
|
||||||
|
IF stores_remaining <= 0 THEN
|
||||||
|
RETURN; -- Worker exhausted
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Claim tasks from pool (one task per store, up to remaining capacity)
|
||||||
|
RETURN QUERY
|
||||||
|
WITH available_stores AS (
|
||||||
|
SELECT DISTINCT ON (d.id)
|
||||||
|
t.id as task_id,
|
||||||
|
d.id as dispensary_id,
|
||||||
|
d.name as dispensary_name,
|
||||||
|
t.role,
|
||||||
|
t.platform,
|
||||||
|
t.method
|
||||||
|
FROM tasks t
|
||||||
|
JOIN dispensaries d ON d.id = t.dispensary_id
|
||||||
|
WHERE d.pool_id = worker_pool_id
|
||||||
|
AND t.status = 'pending'
|
||||||
|
AND t.scheduled_for <= NOW()
|
||||||
|
ORDER BY d.id, t.priority DESC, t.created_at ASC
|
||||||
|
LIMIT stores_remaining
|
||||||
|
),
|
||||||
|
claimed AS (
|
||||||
|
UPDATE tasks
|
||||||
|
SET
|
||||||
|
status = 'claimed',
|
||||||
|
claimed_by = p_worker_id,
|
||||||
|
claimed_at = NOW()
|
||||||
|
WHERE id IN (SELECT task_id FROM available_stores)
|
||||||
|
RETURNING id
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
av.task_id,
|
||||||
|
av.dispensary_id,
|
||||||
|
av.dispensary_name,
|
||||||
|
av.role,
|
||||||
|
av.platform,
|
||||||
|
av.method
|
||||||
|
FROM available_stores av
|
||||||
|
WHERE av.task_id IN (SELECT id FROM claimed);
|
||||||
|
|
||||||
|
-- Update worker store count
|
||||||
|
UPDATE worker_registry
|
||||||
|
SET
|
||||||
|
pool_stores_visited = pool_stores_visited + (
|
||||||
|
SELECT COUNT(DISTINCT dispensary_id)
|
||||||
|
FROM tasks
|
||||||
|
WHERE claimed_by = p_worker_id AND status = 'claimed'
|
||||||
|
),
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE worker_id = p_worker_id;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- FUNCTION: Worker releases pool (exhausted or done)
|
||||||
|
-- ============================================================================
|
||||||
|
CREATE OR REPLACE FUNCTION worker_release_pool(p_worker_id VARCHAR(100))
|
||||||
|
RETURNS BOOLEAN AS $$
|
||||||
|
BEGIN
|
||||||
|
UPDATE worker_registry
|
||||||
|
SET
|
||||||
|
current_pool_id = NULL,
|
||||||
|
pool_claimed_at = NULL,
|
||||||
|
pool_stores_visited = 0,
|
||||||
|
current_state = NULL,
|
||||||
|
current_city = NULL,
|
||||||
|
updated_at = NOW()
|
||||||
|
WHERE worker_id = p_worker_id;
|
||||||
|
|
||||||
|
RETURN true;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- ============================================================================
|
||||||
|
-- RUN: Assign existing dispensaries to pools
|
||||||
|
-- ============================================================================
|
||||||
|
SELECT * FROM assign_all_dispensaries_to_pools();
|
||||||
190
backend/src/services/task-pool.ts
Normal file
190
backend/src/services/task-pool.ts
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
/**
|
||||||
|
* Task Pool Service
|
||||||
|
*
|
||||||
|
* Manages the pool-based worker flow:
|
||||||
|
* 1. Worker checks what pools have pending tasks
|
||||||
|
* 2. Worker claims a pool (gets geo info)
|
||||||
|
* 3. Worker gets proxy for that geo
|
||||||
|
* 4. Worker runs preflight
|
||||||
|
* 5. Worker pulls tasks from pool (up to 6 stores)
|
||||||
|
* 6. Worker executes tasks
|
||||||
|
* 7. Worker releases pool when exhausted (6 stores visited)
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { pool } from '../db/pool';
|
||||||
|
import { buildEvomiProxyUrl, getEvomiConfig } from './crawl-rotator';
|
||||||
|
|
||||||
|
export interface TaskPool {
|
||||||
|
pool_id: number;
|
||||||
|
pool_name: string;
|
||||||
|
display_name: string;
|
||||||
|
state_code: string;
|
||||||
|
city: string;
|
||||||
|
latitude: number;
|
||||||
|
longitude: number;
|
||||||
|
timezone: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface PoolWithPendingTasks extends TaskPool {
|
||||||
|
pending_count: number;
|
||||||
|
store_count: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface PoolTask {
|
||||||
|
task_id: number;
|
||||||
|
dispensary_id: number;
|
||||||
|
dispensary_name: string;
|
||||||
|
role: string;
|
||||||
|
platform: string;
|
||||||
|
method: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ClaimedPoolResult {
|
||||||
|
pool: TaskPool;
|
||||||
|
proxyUrl: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all pools that have pending tasks
|
||||||
|
*/
|
||||||
|
export async function getPoolsWithPendingTasks(): Promise<PoolWithPendingTasks[]> {
|
||||||
|
const { rows } = await pool.query<PoolWithPendingTasks>(
|
||||||
|
`SELECT * FROM get_pools_with_pending_tasks()`
|
||||||
|
);
|
||||||
|
return rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker claims a pool
|
||||||
|
* Returns pool info + proxy URL for that geo
|
||||||
|
*/
|
||||||
|
export async function claimPool(
|
||||||
|
workerId: string,
|
||||||
|
poolId?: number
|
||||||
|
): Promise<ClaimedPoolResult | null> {
|
||||||
|
// Claim pool in database
|
||||||
|
const { rows } = await pool.query<TaskPool>(
|
||||||
|
`SELECT * FROM worker_claim_pool($1, $2)`,
|
||||||
|
[workerId, poolId || null]
|
||||||
|
);
|
||||||
|
|
||||||
|
if (rows.length === 0) {
|
||||||
|
console.log(`[TaskPool] No pools available for ${workerId}`);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const claimedPool = rows[0];
|
||||||
|
|
||||||
|
// Build Evomi proxy URL for this pool's geo
|
||||||
|
const evomiConfig = getEvomiConfig();
|
||||||
|
if (!evomiConfig.enabled) {
|
||||||
|
throw new Error('Evomi proxy not configured');
|
||||||
|
}
|
||||||
|
|
||||||
|
const proxyResult = buildEvomiProxyUrl(
|
||||||
|
claimedPool.state_code,
|
||||||
|
workerId,
|
||||||
|
claimedPool.city.toLowerCase().replace(/\s+/g, '.')
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!proxyResult) {
|
||||||
|
throw new Error(`Failed to build proxy URL for ${claimedPool.display_name}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[TaskPool] ${workerId} claimed pool: ${claimedPool.display_name}`);
|
||||||
|
|
||||||
|
return {
|
||||||
|
pool: claimedPool,
|
||||||
|
proxyUrl: proxyResult.url,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pull tasks from worker's assigned pool
|
||||||
|
* Returns up to 6 stores worth of tasks
|
||||||
|
*/
|
||||||
|
export async function pullTasksFromPool(
|
||||||
|
workerId: string,
|
||||||
|
maxStores: number = 6
|
||||||
|
): Promise<PoolTask[]> {
|
||||||
|
const { rows } = await pool.query<PoolTask>(
|
||||||
|
`SELECT * FROM pull_tasks_from_pool($1, $2)`,
|
||||||
|
[workerId, maxStores]
|
||||||
|
);
|
||||||
|
|
||||||
|
if (rows.length > 0) {
|
||||||
|
const storeIds = [...new Set(rows.map(t => t.dispensary_id))];
|
||||||
|
console.log(`[TaskPool] ${workerId} pulled ${rows.length} tasks for ${storeIds.length} stores`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker releases their pool (exhausted or done)
|
||||||
|
*/
|
||||||
|
export async function releasePool(workerId: string): Promise<boolean> {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`SELECT worker_release_pool($1) as success`,
|
||||||
|
[workerId]
|
||||||
|
);
|
||||||
|
console.log(`[TaskPool] ${workerId} released pool`);
|
||||||
|
return rows[0]?.success || false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if worker is exhausted (visited 6 stores)
|
||||||
|
*/
|
||||||
|
export async function isWorkerExhausted(workerId: string): Promise<boolean> {
|
||||||
|
const { rows } = await pool.query(
|
||||||
|
`SELECT pool_stores_visited >= pool_max_stores as exhausted
|
||||||
|
FROM worker_registry WHERE worker_id = $1`,
|
||||||
|
[workerId]
|
||||||
|
);
|
||||||
|
return rows[0]?.exhausted || false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get worker's current pool info
|
||||||
|
*/
|
||||||
|
export async function getWorkerPool(workerId: string): Promise<TaskPool | null> {
|
||||||
|
const { rows } = await pool.query<TaskPool>(
|
||||||
|
`SELECT tp.id as pool_id, tp.name as pool_name, tp.display_name,
|
||||||
|
tp.state_code, tp.city, tp.latitude, tp.longitude, tp.timezone
|
||||||
|
FROM worker_registry wr
|
||||||
|
JOIN task_pools tp ON tp.id = wr.current_pool_id
|
||||||
|
WHERE wr.worker_id = $1`,
|
||||||
|
[workerId]
|
||||||
|
);
|
||||||
|
return rows[0] || null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update worker's store visit count
|
||||||
|
*/
|
||||||
|
export async function incrementStoreVisits(
|
||||||
|
workerId: string,
|
||||||
|
count: number = 1
|
||||||
|
): Promise<void> {
|
||||||
|
await pool.query(
|
||||||
|
`UPDATE worker_registry
|
||||||
|
SET pool_stores_visited = pool_stores_visited + $2, updated_at = NOW()
|
||||||
|
WHERE worker_id = $1`,
|
||||||
|
[workerId, count]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update worker_registry with pool geo info (for dashboard)
|
||||||
|
*/
|
||||||
|
export async function updateWorkerPoolGeo(
|
||||||
|
workerId: string,
|
||||||
|
poolInfo: TaskPool
|
||||||
|
): Promise<void> {
|
||||||
|
await pool.query(
|
||||||
|
`UPDATE worker_registry
|
||||||
|
SET current_state = $2, current_city = $3, updated_at = NOW()
|
||||||
|
WHERE worker_id = $1`,
|
||||||
|
[workerId, poolInfo.state_code, poolInfo.city]
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -76,6 +76,13 @@ import { IdentityPoolService, WorkerIdentity, getRandomTaskCount } from '../serv
|
|||||||
// NEW: Session-based worker pool (claim tasks first, then get IP)
|
// NEW: Session-based worker pool (claim tasks first, then get IP)
|
||||||
import * as WorkerSession from '../services/worker-session';
|
import * as WorkerSession from '../services/worker-session';
|
||||||
|
|
||||||
|
// NEW: Task pool system (geo-based pools)
|
||||||
|
import * as TaskPool from '../services/task-pool';
|
||||||
|
|
||||||
|
// Feature flag: Use new task pool system (geo-based pools)
|
||||||
|
// This is the correct flow: check pools → claim pool → get proxy → preflight → pull tasks
|
||||||
|
const USE_TASK_POOLS = process.env.USE_TASK_POOLS === 'true';
|
||||||
|
|
||||||
// Feature flag: Use new identity pool system (set via env var)
|
// Feature flag: Use new identity pool system (set via env var)
|
||||||
const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true';
|
const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true';
|
||||||
|
|
||||||
@@ -1490,7 +1497,16 @@ export class TaskWorker {
|
|||||||
// Try to claim more tasks if we have capacity
|
// Try to claim more tasks if we have capacity
|
||||||
if (this.canAcceptMoreTasks()) {
|
if (this.canAcceptMoreTasks()) {
|
||||||
// =================================================================
|
// =================================================================
|
||||||
// NEW SESSION POOL FLOW (enabled via USE_SESSION_POOL)
|
// TASK POOL FLOW (enabled via USE_TASK_POOLS)
|
||||||
|
// Correct order: check pools → claim pool → get proxy → preflight → pull tasks
|
||||||
|
// =================================================================
|
||||||
|
if (USE_TASK_POOLS) {
|
||||||
|
await this.taskPoolMainLoop();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// =================================================================
|
||||||
|
// SESSION POOL FLOW (enabled via USE_SESSION_POOL)
|
||||||
// Correct order: claim tasks → get geo from tasks → get IP → preflight
|
// Correct order: claim tasks → get geo from tasks → get IP → preflight
|
||||||
// =================================================================
|
// =================================================================
|
||||||
if (USE_SESSION_POOL) {
|
if (USE_SESSION_POOL) {
|
||||||
@@ -1825,6 +1841,269 @@ export class TaskWorker {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ===========================================================================
|
||||||
|
// TASK POOL MAIN LOOP (new system - enabled via USE_TASK_POOLS)
|
||||||
|
// ===========================================================================
|
||||||
|
// Correct flow:
|
||||||
|
// 1. Check what pools have pending tasks
|
||||||
|
// 2. Claim a pool (geo-based)
|
||||||
|
// 3. Get Evomi proxy for that pool's geo
|
||||||
|
// 4. Run preflight with that proxy
|
||||||
|
// 5. Pull tasks from pool (up to 6 stores)
|
||||||
|
// 6. Execute tasks
|
||||||
|
// 7. When exhausted (6 stores), release pool and repeat
|
||||||
|
// ===========================================================================
|
||||||
|
private currentPoolInfo: TaskPool.TaskPool | null = null;
|
||||||
|
private poolProxyUrl: string | null = null;
|
||||||
|
private poolPreflightPassed: boolean = false;
|
||||||
|
private poolTasks: TaskPool.PoolTask[] = [];
|
||||||
|
|
||||||
|
private async taskPoolMainLoop(): Promise<void> {
|
||||||
|
// Check if worker is exhausted (visited 6 stores)
|
||||||
|
if (this.currentPoolInfo) {
|
||||||
|
const exhausted = await TaskPool.isWorkerExhausted(this.workerId);
|
||||||
|
if (exhausted) {
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} exhausted (6 stores), releasing pool...`);
|
||||||
|
await TaskPool.releasePool(this.workerId);
|
||||||
|
this.currentPoolInfo = null;
|
||||||
|
this.poolProxyUrl = null;
|
||||||
|
this.poolPreflightPassed = false;
|
||||||
|
this.poolTasks = [];
|
||||||
|
this.geoState = null;
|
||||||
|
this.geoCity = null;
|
||||||
|
this.crawlRotator.clearFixedProxy();
|
||||||
|
// Continue to claim new pool
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no pool claimed, check what's available and claim one
|
||||||
|
if (!this.currentPoolInfo) {
|
||||||
|
// Step 1: Check what pools have pending tasks
|
||||||
|
this.setPreflightStep('checking', 'Checking task pools');
|
||||||
|
const pools = await TaskPool.getPoolsWithPendingTasks();
|
||||||
|
|
||||||
|
if (pools.length === 0) {
|
||||||
|
this.setPreflightStep('waiting', 'No pools with pending tasks');
|
||||||
|
await this.sleep(30000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} found ${pools.length} pools with tasks`);
|
||||||
|
pools.slice(0, 3).forEach(p => {
|
||||||
|
console.log(`[TaskWorker] - ${p.display_name}: ${p.pending_count} tasks, ${p.store_count} stores`);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Step 2: Claim a pool
|
||||||
|
this.setPreflightStep('claiming', 'Claiming task pool');
|
||||||
|
const result = await TaskPool.claimPool(this.workerId);
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
this.setPreflightStep('waiting', 'No pools available');
|
||||||
|
await this.sleep(30000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.currentPoolInfo = result.pool;
|
||||||
|
this.poolProxyUrl = result.proxyUrl;
|
||||||
|
this.geoState = result.pool.state_code;
|
||||||
|
this.geoCity = result.pool.city;
|
||||||
|
|
||||||
|
// Update dashboard with geo info
|
||||||
|
await TaskPool.updateWorkerPoolGeo(this.workerId, result.pool);
|
||||||
|
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} claimed pool: ${result.pool.display_name}`);
|
||||||
|
|
||||||
|
// Step 3: Configure proxy for this pool's geo
|
||||||
|
this.setPreflightStep('proxy', `Setting proxy for ${result.pool.display_name}`);
|
||||||
|
this.crawlRotator.setFixedProxy(this.poolProxyUrl);
|
||||||
|
|
||||||
|
// Step 4: Initialize stealth if needed
|
||||||
|
if (!this.stealthInitialized) {
|
||||||
|
this.setPreflightStep('init', 'Initializing stealth plugins');
|
||||||
|
const initSuccess = await this.initStealthOnly();
|
||||||
|
if (!initSuccess) {
|
||||||
|
this.setPreflightStep('init_failed', 'Stealth init failed');
|
||||||
|
await TaskPool.releasePool(this.workerId);
|
||||||
|
this.currentPoolInfo = null;
|
||||||
|
this.poolProxyUrl = null;
|
||||||
|
await this.sleep(30000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 5: Run preflight with pool's proxy
|
||||||
|
this.setPreflightStep('preflight', 'Running browser preflight');
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} running preflight for ${result.pool.display_name}...`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.runDualPreflights();
|
||||||
|
|
||||||
|
if (this.preflightHttpPassed) {
|
||||||
|
this.poolPreflightPassed = true;
|
||||||
|
this.setPreflightStep('ready', `Qualified - ${result.pool.display_name}`);
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`);
|
||||||
|
} else {
|
||||||
|
this.setPreflightStep('failed', this.preflightHttpResult?.error || 'Preflight failed');
|
||||||
|
console.error(`[TaskWorker] ${this.friendlyName} preflight FAILED, releasing pool...`);
|
||||||
|
await TaskPool.releasePool(this.workerId);
|
||||||
|
this.currentPoolInfo = null;
|
||||||
|
this.poolProxyUrl = null;
|
||||||
|
this.poolPreflightPassed = false;
|
||||||
|
this.crawlRotator.clearFixedProxy();
|
||||||
|
await this.sleep(30000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (err: any) {
|
||||||
|
this.setPreflightStep('error', err.message);
|
||||||
|
console.error(`[TaskWorker] ${this.friendlyName} preflight error: ${err.message}`);
|
||||||
|
await TaskPool.releasePool(this.workerId);
|
||||||
|
this.currentPoolInfo = null;
|
||||||
|
this.poolProxyUrl = null;
|
||||||
|
await this.sleep(30000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We have a pool and preflight passed - pull and execute tasks
|
||||||
|
if (!this.poolPreflightPassed) {
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} pool preflight not passed, waiting...`);
|
||||||
|
await this.sleep(10000);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull tasks from pool if we don't have any
|
||||||
|
if (this.poolTasks.length === 0 || this.poolTasks.every(t => this.activeTasks.has(t.task_id))) {
|
||||||
|
const tasks = await TaskPool.pullTasksFromPool(this.workerId);
|
||||||
|
if (tasks.length === 0) {
|
||||||
|
// No more tasks in this pool - check if exhausted
|
||||||
|
const exhausted = await TaskPool.isWorkerExhausted(this.workerId);
|
||||||
|
if (exhausted) {
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} pool exhausted, will release on next loop`);
|
||||||
|
} else {
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} no tasks in pool, waiting...`);
|
||||||
|
}
|
||||||
|
await this.sleep(POLL_INTERVAL_MS);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.poolTasks = tasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find next task to execute
|
||||||
|
const nextTask = this.poolTasks.find(t => !this.activeTasks.has(t.task_id));
|
||||||
|
if (!nextTask) {
|
||||||
|
await this.sleep(POLL_INTERVAL_MS);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert pool task to worker task format
|
||||||
|
const workerTask: WorkerTask = {
|
||||||
|
id: nextTask.task_id,
|
||||||
|
role: nextTask.role as TaskRole,
|
||||||
|
dispensary_id: nextTask.dispensary_id,
|
||||||
|
dispensary_name: nextTask.dispensary_name,
|
||||||
|
method: (nextTask.method || 'http') as 'http' | 'curl' | null,
|
||||||
|
platform: nextTask.platform,
|
||||||
|
priority: 50,
|
||||||
|
status: 'claimed' as TaskStatus,
|
||||||
|
scheduled_for: null,
|
||||||
|
worker_id: this.workerId,
|
||||||
|
claimed_at: new Date(),
|
||||||
|
started_at: null,
|
||||||
|
completed_at: null,
|
||||||
|
last_heartbeat_at: null,
|
||||||
|
result: null,
|
||||||
|
error_message: null,
|
||||||
|
retry_count: 0,
|
||||||
|
max_retries: 3,
|
||||||
|
payload: null,
|
||||||
|
created_at: new Date(),
|
||||||
|
updated_at: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);
|
||||||
|
|
||||||
|
this.activeTasks.set(nextTask.task_id, workerTask);
|
||||||
|
|
||||||
|
// Execute task in background
|
||||||
|
const taskPromise = this.executePoolTask(workerTask);
|
||||||
|
this.taskPromises.set(nextTask.task_id, taskPromise);
|
||||||
|
|
||||||
|
// Cleanup when done
|
||||||
|
taskPromise.finally(() => {
|
||||||
|
this.activeTasks.delete(nextTask.task_id);
|
||||||
|
this.taskPromises.delete(nextTask.task_id);
|
||||||
|
// Remove from poolTasks
|
||||||
|
this.poolTasks = this.poolTasks.filter(t => t.task_id !== nextTask.task_id);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize stealth WITHOUT running preflight
|
||||||
|
* Used by task pool flow where preflight runs after pool is claimed
|
||||||
|
*/
|
||||||
|
private async initStealthOnly(): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
await this.ensureStealthReady();
|
||||||
|
this.stealthInitialized = true;
|
||||||
|
return true;
|
||||||
|
} catch (err: any) {
|
||||||
|
console.error(`[TaskWorker] Stealth init failed: ${err.message}`);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a task for task pool (handles completion/failure tracking)
|
||||||
|
*/
|
||||||
|
private async executePoolTask(task: WorkerTask): Promise<void> {
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} executing pool task ${task.id} (${task.role})`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Mark as running
|
||||||
|
await taskService.startTask(task.id);
|
||||||
|
|
||||||
|
// Get handler for this role
|
||||||
|
const handler = getHandlerForTask(task);
|
||||||
|
if (!handler) {
|
||||||
|
throw new Error(`No handler registered for role: ${task.role}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create context
|
||||||
|
const ctx: TaskContext = {
|
||||||
|
pool: this.pool,
|
||||||
|
workerId: this.workerId,
|
||||||
|
task,
|
||||||
|
heartbeat: async () => {
|
||||||
|
await taskService.heartbeat(task.id);
|
||||||
|
},
|
||||||
|
crawlRotator: this.crawlRotator,
|
||||||
|
updateStep: (step: string, detail?: string) => {
|
||||||
|
this.updateTaskStep(task.id, step, detail);
|
||||||
|
},
|
||||||
|
fingerprint: this.storedFingerprint || undefined,
|
||||||
|
};
|
||||||
|
|
||||||
|
this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);
|
||||||
|
|
||||||
|
// Execute
|
||||||
|
const result = await handler(ctx);
|
||||||
|
|
||||||
|
this.clearTaskStep(task.id);
|
||||||
|
|
||||||
|
if (result.success) {
|
||||||
|
await taskService.completeTask(task.id, { result: result.data });
|
||||||
|
console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
|
||||||
|
} else {
|
||||||
|
await taskService.failTask(task.id, result.error || 'Unknown error');
|
||||||
|
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
|
||||||
|
}
|
||||||
|
} catch (err: any) {
|
||||||
|
this.clearTaskStep(task.id);
|
||||||
|
await taskService.failTask(task.id, err.message);
|
||||||
|
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} ERROR: ${err.message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute a task for session pool (handles completion/failure tracking)
|
* Execute a task for session pool (handles completion/failure tracking)
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -51,6 +51,13 @@ spec:
|
|||||||
# 3 browsers × ~400MB = ~1.3GB (safe for 2GB pod limit)
|
# 3 browsers × ~400MB = ~1.3GB (safe for 2GB pod limit)
|
||||||
- name: MAX_CONCURRENT_TASKS
|
- name: MAX_CONCURRENT_TASKS
|
||||||
value: "3"
|
value: "3"
|
||||||
|
# Task Pool System (geo-based pools)
|
||||||
|
# Correct flow: check pools → claim pool → get proxy → preflight → pull tasks
|
||||||
|
- name: USE_TASK_POOLS
|
||||||
|
value: "true"
|
||||||
|
# Disable legacy identity pool
|
||||||
|
- name: USE_IDENTITY_POOL
|
||||||
|
value: "false"
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
memory: "1Gi"
|
memory: "1Gi"
|
||||||
|
|||||||
Reference in New Issue
Block a user