Files
cannaiq/backend/migrations/108_worker_geo_sessions.sql
Kelly 88e590d026 feat: Worker geo sessions for state-based task assignment
Workers are now geo-locked to a specific state for their session:
- Session = 60 minutes OR 7 store visits (whichever comes first)
- Workers ONLY claim tasks matching their assigned state
- State assignment prioritizes: most pending tasks, fewest workers

Changes:
- Migration 108: geo session columns, claim_task with geo filter,
  assign_worker_geo(), check_worker_geo_session(), worker_state_capacity view
- task-worker.ts: ensureGeoSession() method before task claiming
- worker-registry.ts: /state-capacity and /geo-sessions API endpoints
- WorkersDashboard: Show qualified icon + geo state in Preflight column

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 16:00:09 -07:00

232 lines
8.1 KiB
PL/PgSQL

-- Migration: 108_worker_geo_sessions.sql
-- Description: Add geo session tracking to worker_registry for state-based task assignment
-- Created: 2025-12-13
-- Geo session state kept on worker_registry.
-- A worker qualifies for a geo (state, optionally city) and afterwards only
-- claims tasks matching that geo.
-- All columns are added in a single ALTER; IF NOT EXISTS keeps the migration
-- safe to re-run.
ALTER TABLE worker_registry
    ADD COLUMN IF NOT EXISTS current_state VARCHAR(2),
    ADD COLUMN IF NOT EXISTS current_city VARCHAR(100),
    ADD COLUMN IF NOT EXISTS geo_session_started_at TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS session_task_count INT DEFAULT 0,
    ADD COLUMN IF NOT EXISTS session_max_tasks INT DEFAULT 7,
    ADD COLUMN IF NOT EXISTS proxy_geo VARCHAR(100);

-- Catalog documentation for the new columns
COMMENT ON COLUMN worker_registry.current_state
    IS 'Worker''s current geo assignment (US state code, e.g., AZ)';
COMMENT ON COLUMN worker_registry.current_city
    IS 'Worker''s current city assignment (optional, e.g., phoenix)';
COMMENT ON COLUMN worker_registry.geo_session_started_at
    IS 'When worker''s current geo session started';
COMMENT ON COLUMN worker_registry.session_task_count
    IS 'Number of tasks completed in current geo session';
COMMENT ON COLUMN worker_registry.session_max_tasks
    IS 'Max tasks per geo session before re-qualification (default 7)';
COMMENT ON COLUMN worker_registry.proxy_geo
    IS 'Geo target string used for proxy (e.g., "arizona" or "phoenix, arizona")';

-- Partial index for looking up workers by assigned state; rows without a
-- geo assignment are left out of the index.
CREATE INDEX IF NOT EXISTS idx_worker_registry_current_state
    ON worker_registry (current_state)
    WHERE current_state IS NOT NULL;
-- ============================================================
-- FUNCTION: claim_task (geo-aware)
-- Claims the next pending task for a worker, restricted to tasks whose
-- dispensary is in the state of the worker's current geo session.
--
-- Parameters:
--   p_role         role the task must have (worker_tasks.role)
--   p_worker_id    worker_registry.worker_id of the claiming worker
--   p_curl_passed  TRUE if the worker passed the curl preflight (default TRUE)
--   p_http_passed  TRUE if the worker passed the http preflight (default FALSE)
--
-- Returns:
--   The claimed worker_tasks row. Returns NULL when the worker is unknown,
--   has no assigned state, its geo session is older than 60 minutes, or its
--   session task budget (session_max_tasks, 7 when NULL) is used up.
--   When the session is fine but no eligible task exists, the returned row
--   has all fields NULL -- callers should test the returned id.
--
-- Side effect: on a successful claim, worker_registry.session_task_count
-- is incremented for the worker.
--
-- NOTE(review): because of the inner join to dispensaries, tasks with a
-- NULL dispensary_id can never be claimed through this function -- confirm
-- that this is intended.
-- ============================================================
CREATE OR REPLACE FUNCTION claim_task(
    p_role VARCHAR(50),
    p_worker_id VARCHAR(100),
    p_curl_passed BOOLEAN DEFAULT TRUE,
    p_http_passed BOOLEAN DEFAULT FALSE
) RETURNS worker_tasks AS $$
DECLARE
    -- v_ prefix keeps PL/pgSQL variables from colliding with column names
    v_claimed worker_tasks;
    v_worker_state VARCHAR(2);
    v_session_valid BOOLEAN;
    v_session_tasks INT;
    v_max_tasks INT;
BEGIN
    -- Load the worker's geo session. NULL counters are normalised here so the
    -- budget check below cannot evaluate to NULL and silently pass.
    SELECT
        wr.current_state,
        COALESCE(wr.session_task_count, 0),
        COALESCE(wr.session_max_tasks, 7),
        (wr.geo_session_started_at IS NOT NULL
            AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes')
    INTO v_worker_state, v_session_tasks, v_max_tasks, v_session_valid
    FROM worker_registry AS wr
    WHERE wr.worker_id = p_worker_id;

    -- No row / no state / expired session / exhausted budget:
    -- the worker has to re-qualify before it can claim anything.
    IF v_worker_state IS NULL
       OR NOT v_session_valid
       OR v_session_tasks >= v_max_tasks THEN
        RETURN NULL;
    END IF;

    -- Claim the best matching task for the worker's state
    UPDATE worker_tasks
    SET
        status = 'claimed',
        worker_id = p_worker_id,
        claimed_at = NOW(),
        updated_at = NOW()
    WHERE id = (
        SELECT wt.id
        FROM worker_tasks AS wt
        INNER JOIN dispensaries AS d
            ON d.id = wt.dispensary_id
        WHERE wt.role = p_role
          AND wt.status = 'pending'
          AND (wt.scheduled_for IS NULL OR wt.scheduled_for <= NOW())
          -- GEO FILTER: task's dispensary must be in the worker's state
          AND d.state = v_worker_state
          -- Method compatibility: worker must have passed the required preflight
          AND (
              wt.method IS NULL -- no preference, any worker can claim
              OR (wt.method = 'curl' AND p_curl_passed = TRUE)
              OR (wt.method = 'http' AND p_http_passed = TRUE)
          )
          -- Skip stores that already have a claimed/running task
          AND NOT EXISTS (
              SELECT 1
              FROM worker_tasks AS busy
              WHERE busy.dispensary_id = wt.dispensary_id
                AND busy.status IN ('claimed', 'running')
          )
        ORDER BY wt.priority DESC, wt.created_at ASC
        LIMIT 1
        -- Lock only the task row. Without "OF wt" the joined dispensaries row
        -- would be locked too, and SKIP LOCKED would skip tasks whenever their
        -- dispensary row is locked by an unrelated transaction.
        FOR UPDATE OF wt SKIP LOCKED
    )
    RETURNING * INTO v_claimed;

    -- Test the id field rather than the whole composite: "row IS NOT NULL"
    -- is only true when every field of the row is non-NULL.
    IF v_claimed.id IS NOT NULL THEN
        UPDATE worker_registry
        SET session_task_count = COALESCE(session_task_count, 0) + 1
        WHERE worker_id = p_worker_id;
    END IF;

    RETURN v_claimed;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: assign_worker_geo
-- Starts a new geo session for a worker on the state that currently needs
-- it most: most pending tasks first, then fewest workers already on it,
-- then state code (deterministic tie-break).
--
-- Parameters:
--   p_worker_id  worker_registry.worker_id of the worker to assign
--
-- Returns:
--   The assigned state code, or NULL when no state has pending tasks or
--   when p_worker_id does not match any worker_registry row.
--
-- Side effect: sets current_state, clears current_city, stamps
-- geo_session_started_at = NOW() and resets session_task_count to 0.
--
-- NOTE(review): worker coverage counts only status = 'active' workers,
-- whereas the worker_state_capacity view counts 'active' and 'idle' --
-- confirm which is intended.
-- ============================================================
CREATE OR REPLACE FUNCTION assign_worker_geo(
    p_worker_id VARCHAR(100)
) RETURNS VARCHAR(2) AS $$
DECLARE
    v_assigned_state VARCHAR(2);
BEGIN
    -- Demand and coverage are aggregated separately and joined afterwards.
    -- Joining tasks directly to workers would repeat every task once per
    -- worker on the state and inflate the pending count of covered states.
    WITH pending_by_state AS (
        -- Demand: pending tasks per state (stores with a platform id only)
        SELECT
            d.state,
            COUNT(*) AS pending_count
        FROM worker_tasks AS wt
        INNER JOIN dispensaries AS d
            ON d.id = wt.dispensary_id
        WHERE wt.status = 'pending'
          AND d.platform_dispensary_id IS NOT NULL
          -- a NULL state cannot be assigned; keep it out of the ranking
          AND d.state IS NOT NULL
        GROUP BY d.state
    ),
    workers_by_state AS (
        -- Coverage: active workers with an unexpired session per state
        SELECT
            wr.current_state,
            COUNT(DISTINCT wr.worker_id) AS worker_count
        FROM worker_registry AS wr
        WHERE wr.status = 'active'
          AND wr.current_state IS NOT NULL
          AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes'
        GROUP BY wr.current_state
    )
    SELECT p.state
    INTO v_assigned_state
    FROM pending_by_state AS p
    LEFT JOIN workers_by_state AS w
        ON w.current_state = p.state
    ORDER BY
        p.pending_count DESC,              -- most pending tasks first
        COALESCE(w.worker_count, 0) ASC,   -- fewest workers second
        p.state ASC                        -- stable tie-break
    LIMIT 1;

    -- No pending tasks anywhere
    IF v_assigned_state IS NULL THEN
        RETURN NULL;
    END IF;

    -- Start a fresh session for this worker on the chosen state
    UPDATE worker_registry
    SET
        current_state = v_assigned_state,
        current_city = NULL, -- city assigned later if available
        geo_session_started_at = NOW(),
        session_task_count = 0
    WHERE worker_id = p_worker_id;

    -- Unknown worker: nothing was updated, so do not report an assignment
    IF NOT FOUND THEN
        RETURN NULL;
    END IF;

    RETURN v_assigned_state;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: check_worker_geo_session
-- Reports the state of a worker's current geo session.
--
-- Parameters:
--   p_worker_id  worker_registry.worker_id to inspect
--
-- Returns one row per matching worker (no rows if the worker is unknown):
--   current_state              assigned state code, NULL if none
--   current_city               assigned city, NULL if none
--   session_valid              TRUE while the session is younger than
--                              60 minutes (time only; the task budget is
--                              reported separately)
--   session_tasks_remaining    tasks left in the session budget, never < 0;
--                              a NULL session_max_tasks counts as 7 and a
--                              NULL session_task_count as 0, matching the
--                              fallback used by claim_task
--   session_minutes_remaining  minutes until the 60-minute window ends,
--                              rounded to a whole minute, never < 0;
--                              0 when no session has been started
-- ============================================================
CREATE OR REPLACE FUNCTION check_worker_geo_session(
    p_worker_id VARCHAR(100)
) RETURNS TABLE (
    current_state VARCHAR(2),
    current_city VARCHAR(100),
    session_valid BOOLEAN,
    session_tasks_remaining INT,
    session_minutes_remaining INT
) AS $$
BEGIN
    -- Columns are qualified with wr. because the RETURNS TABLE names are
    -- also visible as variables inside the function body.
    RETURN QUERY
    SELECT
        wr.current_state,
        wr.current_city,
        (wr.geo_session_started_at IS NOT NULL
            AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes') AS session_valid,
        GREATEST(
            0,
            COALESCE(wr.session_max_tasks, 7) - COALESCE(wr.session_task_count, 0)
        ) AS session_tasks_remaining,
        -- GREATEST ignores NULL arguments, so a NULL start time yields 0
        GREATEST(
            0,
            EXTRACT(EPOCH FROM (wr.geo_session_started_at + INTERVAL '60 minutes' - NOW())) / 60
        )::INT AS session_minutes_remaining
    FROM worker_registry AS wr
    WHERE wr.worker_id = p_worker_id;
END;
$$ LANGUAGE plpgsql;
-- VIEW: worker_state_capacity
-- Per-state comparison of pending work against worker capacity ("thinness").
-- The state list is derived from the dispensaries table, so no separate
-- states lookup table is required.
--
-- Columns:
--   state               state code
--   state_name          same as state (no lookup table to resolve a name)
--   pending_tasks       pending tasks for dispensaries in the state
--   workers_on_state    qualifying workers currently assigned to the state
--   remaining_capacity  sum of unused session task budget of those workers
--   status              'no_coverage' | 'thin' | 'ok'
CREATE OR REPLACE VIEW worker_state_capacity AS
WITH active_states AS (
    -- One row per state that has a dispensary with a platform id
    SELECT d.state AS code
    FROM dispensaries AS d
    WHERE d.state IS NOT NULL
      AND d.platform_dispensary_id IS NOT NULL
    GROUP BY d.state
),
pending_by_state AS (
    -- Demand: pending tasks per dispensary state
    SELECT
        d.state,
        COUNT(*) AS pending_count
    FROM worker_tasks AS t
    INNER JOIN dispensaries AS d
        ON d.id = t.dispensary_id
    WHERE t.status = 'pending'
      AND d.state IS NOT NULL
    GROUP BY d.state
),
workers_by_state AS (
    -- Supply: active or idle workers that passed the http preflight and
    -- hold a geo session started within the last 60 minutes
    SELECT
        wr.current_state,
        COUNT(*) AS worker_count,
        SUM(GREATEST(0, wr.session_max_tasks - wr.session_task_count)) AS remaining_capacity
    FROM worker_registry AS wr
    WHERE wr.status IN ('active', 'idle')
      AND wr.preflight_http_status = 'passed'
      AND wr.current_state IS NOT NULL
      AND wr.geo_session_started_at > NOW() - INTERVAL '60 minutes'
    GROUP BY wr.current_state
),
state_totals AS (
    -- Join demand and supply onto the state list; missing sides become 0
    SELECT
        s.code,
        COALESCE(p.pending_count, 0) AS pending_tasks,
        COALESCE(w.worker_count, 0) AS workers_on_state,
        COALESCE(w.remaining_capacity, 0) AS remaining_capacity
    FROM active_states AS s
    LEFT JOIN pending_by_state AS p
        ON p.state = s.code
    LEFT JOIN workers_by_state AS w
        ON w.current_state = s.code
)
SELECT
    st.code AS state,
    st.code AS state_name,
    st.pending_tasks,
    st.workers_on_state,
    st.remaining_capacity,
    CASE
        -- work is waiting but nobody has budget left for it
        WHEN st.remaining_capacity = 0 AND st.pending_tasks > 0 THEN 'no_coverage'
        -- some budget, but less than the backlog
        WHEN st.remaining_capacity < st.pending_tasks THEN 'thin'
        ELSE 'ok'
    END AS status
FROM state_totals AS st
ORDER BY st.pending_tasks DESC;