-- Migration: 108_worker_geo_sessions.sql
-- Description: Add geo session tracking to worker_registry for state-based task assignment
-- Created: 2025-12-13

-- Worker geo session columns
-- Worker qualifies with a geo (state/city), then only claims tasks matching that geo
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS current_state VARCHAR(2);
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS current_city VARCHAR(100);
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS geo_session_started_at TIMESTAMPTZ;
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS session_task_count INT DEFAULT 0;
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS session_max_tasks INT DEFAULT 7;
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS proxy_geo VARCHAR(100);

-- Comments
COMMENT ON COLUMN worker_registry.current_state IS 'Worker''s current geo assignment (US state code, e.g., AZ)';
COMMENT ON COLUMN worker_registry.current_city IS 'Worker''s current city assignment (optional, e.g., phoenix)';
COMMENT ON COLUMN worker_registry.geo_session_started_at IS 'When worker''s current geo session started';
COMMENT ON COLUMN worker_registry.session_task_count IS 'Number of tasks completed in current geo session';
COMMENT ON COLUMN worker_registry.session_max_tasks IS 'Max tasks per geo session before re-qualification (default 7)';
COMMENT ON COLUMN worker_registry.proxy_geo IS 'Geo target string used for proxy (e.g., "arizona" or "phoenix, arizona")';

-- Index for finding workers by state
CREATE INDEX IF NOT EXISTS idx_worker_registry_current_state
    ON worker_registry (current_state)
    WHERE current_state IS NOT NULL;

-- ============================================================
-- UPDATED claim_task FUNCTION
-- Now filters by worker's geo session state
-- ============================================================
-- Claims the highest-priority pending task for p_role whose dispensary is in
-- the worker's current geo state, then counts it against the worker's session.
--
-- Parameters:
--   p_role        - only tasks with worker_tasks.role = p_role are considered
--   p_worker_id   - worker_registry.worker_id of the claiming worker
--   p_curl_passed - worker passed the curl preflight (gates wt.method = 'curl')
--   p_http_passed - worker passed the http preflight (gates wt.method = 'http')
--
-- Returns the claimed worker_tasks row, or NULL when:
--   * the worker is unknown or has no current_state,
--   * its geo session started more than 60 minutes ago (or never),
--   * its session task budget (session_max_tasks, default 7) is used up, or
--   * no eligible task exists.
CREATE OR REPLACE FUNCTION claim_task(
    p_role VARCHAR(50),
    p_worker_id VARCHAR(100),
    p_curl_passed BOOLEAN DEFAULT TRUE,
    p_http_passed BOOLEAN DEFAULT FALSE
)
RETURNS worker_tasks AS $$
DECLARE
    claimed_task worker_tasks;
    worker_state VARCHAR(2);
    session_valid BOOLEAN;
    session_tasks INT;
    max_tasks INT;
BEGIN
    -- Get worker's current geo session info.
    -- If p_worker_id has no registry row, every variable stays NULL.
    SELECT
        current_state,
        session_task_count,
        session_max_tasks,
        (geo_session_started_at IS NOT NULL
         AND geo_session_started_at > NOW() - INTERVAL '60 minutes')
    INTO worker_state, session_tasks, max_tasks, session_valid
    FROM worker_registry
    WHERE worker_id = p_worker_id;

    -- If there is no valid geo session, or the session is exhausted, the worker
    -- can't claim tasks and must re-qualify first.
    -- COALESCE(session_tasks, 0): without it, a NULL count makes the comparison
    -- NULL. IF treats NULL as false, which would silently bypass the limit.
    IF worker_state IS NULL
       OR NOT session_valid
       OR COALESCE(session_tasks, 0) >= COALESCE(max_tasks, 7) THEN
        RETURN NULL;
    END IF;

    -- Claim task matching worker's state
    UPDATE worker_tasks
    SET status = 'claimed',
        worker_id = p_worker_id,
        claimed_at = NOW(),
        updated_at = NOW()
    WHERE id = (
        SELECT wt.id
        FROM worker_tasks wt
        INNER JOIN dispensaries d
            ON d.id = wt.dispensary_id  -- also drops tasks with no dispensary
        WHERE wt.role = p_role
          AND wt.status = 'pending'
          AND (wt.scheduled_for IS NULL OR wt.scheduled_for <= NOW())
          -- GEO FILTER: task's dispensary must match worker's state
          AND d.state = worker_state
          -- Method compatibility: worker must have passed the required preflight
          AND (
              wt.method IS NULL  -- No preference, any worker can claim
              OR (wt.method = 'curl' AND p_curl_passed = TRUE)
              OR (wt.method = 'http' AND p_http_passed = TRUE)
          )
          -- Exclude stores that already have an active task
          AND NOT EXISTS (
              SELECT 1
              FROM worker_tasks busy
              WHERE busy.dispensary_id = wt.dispensary_id
                AND busy.status IN ('claimed', 'running')
          )
        ORDER BY wt.priority DESC, wt.created_at ASC
        LIMIT 1
        -- There is deliberately no OF clause. FOR UPDATE therefore locks the
        -- joined dispensaries row as well as the task row. Combined with
        -- SKIP LOCKED, a concurrent claimer skips other tasks for the same
        -- dispensary until this transaction ends. The NOT EXISTS check above
        -- cannot see uncommitted claims, so this lock is what covers that gap.
        FOR UPDATE SKIP LOCKED
    )
    RETURNING * INTO claimed_task;

    -- If task claimed, increment session task count
    -- Note: Use claimed_task.id IS NOT NULL (not claimed_task IS NOT NULL)
    -- PostgreSQL composite type NULL check quirk
    IF claimed_task.id IS NOT NULL THEN
        UPDATE worker_registry
        SET session_task_count = COALESCE(session_task_count, 0) + 1
        WHERE worker_id = p_worker_id;
    END IF;

    RETURN claimed_task;
END;
$$ LANGUAGE plpgsql;

-- ============================================================
-- FUNCTION: assign_worker_geo
-- Assigns a geo session to a worker based on demand
-- Returns the assigned state, or NULL if no tasks are available
-- or p_worker_id is not in worker_registry.
-- ============================================================
-- State choice:
--   1. most pending tasks (only dispensaries with a platform_dispensary_id)
--   2. then fewest active workers holding a session started in the last 60 minutes
--   3. then state code, so ties resolve deterministically
-- Demand and coverage are aggregated separately. Joining workers onto the
-- per-task rows would multiply the pending count by the worker count.
-- NOTE(review): coverage here counts status = 'active' only, while the
-- worker_state_capacity view counts 'active' and 'idle' with a passed http
-- preflight. Confirm which definition is intended.
CREATE OR REPLACE FUNCTION assign_worker_geo(
    p_worker_id VARCHAR(100)
)
RETURNS VARCHAR(2) AS $$
DECLARE
    assigned_state VARCHAR(2);
BEGIN
    WITH demand AS (
        -- Pending tasks per state
        SELECT d.state, COUNT(*) AS pending_tasks
        FROM worker_tasks wt
        INNER JOIN dispensaries d ON d.id = wt.dispensary_id
        WHERE wt.status = 'pending'
          AND d.platform_dispensary_id IS NOT NULL
          AND d.state IS NOT NULL
        GROUP BY d.state
    ),
    coverage AS (
        -- Active workers currently holding a live session per state
        SELECT wr.current_state AS state, COUNT(DISTINCT wr.worker_id) AS active_workers
        FROM worker_registry wr
        WHERE wr.status = 'active'
          AND wr.current_state IS NOT NULL
          AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes'
        GROUP BY wr.current_state
    )
    SELECT dm.state
    INTO assigned_state
    FROM demand dm
    LEFT JOIN coverage cv ON cv.state = dm.state
    ORDER BY
        dm.pending_tasks DESC,                -- Most pending tasks first
        COALESCE(cv.active_workers, 0) ASC,   -- Fewest workers second
        dm.state ASC                          -- Deterministic tie-break
    LIMIT 1;

    -- If no pending tasks anywhere, return NULL
    IF assigned_state IS NULL THEN
        RETURN NULL;
    END IF;

    -- Assign the state to this worker
    UPDATE worker_registry
    SET current_state = assigned_state,
        current_city = NULL,  -- City assigned later if available
        geo_session_started_at = NOW(),
        session_task_count = 0
    WHERE worker_id = p_worker_id;

    -- Unknown worker: nothing was assigned, so don't report a state
    IF NOT FOUND THEN
        RETURN NULL;
    END IF;

    RETURN assigned_state;
END;
$$ LANGUAGE plpgsql;

-- ============================================================
-- FUNCTION: check_worker_geo_session
-- Returns info about worker's current geo session
-- (zero rows if p_worker_id is not in worker_registry)
-- ============================================================
-- session_tasks_remaining applies the same defaults as claim_task:
-- a NULL max counts as 7 and a NULL count counts as 0.
-- GREATEST ignores NULL arguments, so session_minutes_remaining is 0
-- when the worker has never started a session.
CREATE OR REPLACE FUNCTION check_worker_geo_session(
    p_worker_id VARCHAR(100)
)
RETURNS TABLE (
    current_state VARCHAR(2),
    current_city VARCHAR(100),
    session_valid BOOLEAN,
    session_tasks_remaining INT,
    session_minutes_remaining INT
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        wr.current_state,
        wr.current_city,
        (wr.geo_session_started_at IS NOT NULL
         AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes') AS session_valid,
        GREATEST(
            0,
            COALESCE(wr.session_max_tasks, 7) - COALESCE(wr.session_task_count, 0)
        ) AS session_tasks_remaining,
        GREATEST(
            0,
            EXTRACT(EPOCH FROM (wr.geo_session_started_at + INTERVAL '60 minutes' - NOW())) / 60
        )::INT AS session_minutes_remaining
    FROM worker_registry wr
    WHERE wr.worker_id = p_worker_id;
END;
$$ LANGUAGE plpgsql;

-- View for worker thinness per state
-- Derives states from dispensaries table - no external states table dependency
CREATE OR REPLACE VIEW worker_state_capacity AS
WITH active_states AS (
    -- Get unique states from dispensaries with valid platform IDs
    SELECT DISTINCT state AS code
    FROM dispensaries
    WHERE state IS NOT NULL
      AND platform_dispensary_id IS NOT NULL
),
pending_by_state AS (
    -- Pending tasks per dispensary state
    SELECT d.state, COUNT(*) AS count
    FROM worker_tasks t
    INNER JOIN dispensaries d ON t.dispensary_id = d.id
    WHERE t.status = 'pending'
      AND d.state IS NOT NULL
    GROUP BY d.state
),
workers_by_state AS (
    -- Workers with a live (< 60 min) geo session and their unused task budget
    SELECT
        current_state,
        COUNT(*) AS count,
        SUM(GREATEST(0, session_max_tasks - session_task_count)) AS remaining_capacity
    FROM worker_registry
    WHERE status IN ('active', 'idle')  -- Include both active and idle workers
      AND preflight_http_status = 'passed'
      AND current_state IS NOT NULL
      AND geo_session_started_at > NOW() - INTERVAL '60 minutes'
    GROUP BY current_state
)
SELECT
    s.code AS state,
    s.code AS state_name,  -- Use code as name since we don't have a states lookup table
    COALESCE(p.count, 0) AS pending_tasks,
    COALESCE(w.count, 0) AS workers_on_state,
    COALESCE(w.remaining_capacity, 0) AS remaining_capacity,
    CASE
        WHEN COALESCE(w.remaining_capacity, 0) = 0 AND COALESCE(p.count, 0) > 0 THEN 'no_coverage'
        WHEN COALESCE(w.remaining_capacity, 0) < COALESCE(p.count, 0) THEN 'thin'
        ELSE 'ok'
    END AS status
FROM active_states s
LEFT JOIN pending_by_state p ON p.state = s.code
LEFT JOIN workers_by_state w ON w.current_state = s.code
ORDER BY COALESCE(p.count, 0) DESC;