-- Migration 112: Worker Session Pool
-- Tracks IP/fingerprint sessions with exclusive locks and cooldowns.
-- Each worker claims up to 6 tasks, uses one IP/fingerprint for those tasks,
-- then retires the session (8hr cooldown before the IP can be reused).

-- Drop old identity pool tables if they exist (replaced by the simpler session model)
DROP TABLE IF EXISTS worker_identity_claims CASCADE;
DROP TABLE IF EXISTS worker_identities CASCADE;

-- Worker sessions: tracks active and cooling-down IP/fingerprint pairs
CREATE TABLE IF NOT EXISTS worker_sessions (
    id SERIAL PRIMARY KEY,

    -- IP and fingerprint for this session
    ip_address VARCHAR(45) NOT NULL,
    fingerprint_hash VARCHAR(64) NOT NULL,
    fingerprint_data JSONB,

    -- Geo this session is locked to
    state_code VARCHAR(2) NOT NULL,
    city VARCHAR(100),

    -- Ownership
    worker_id VARCHAR(255),  -- NULL if in cooldown

    -- Status: 'active' (locked to worker), 'cooldown' (8hr wait), 'available'
    status VARCHAR(20) NOT NULL DEFAULT 'available',

    -- Task tracking
    tasks_claimed INTEGER NOT NULL DEFAULT 0,
    tasks_completed INTEGER NOT NULL DEFAULT 0,
    tasks_failed INTEGER NOT NULL DEFAULT 0,
    max_tasks INTEGER NOT NULL DEFAULT 6,

    -- Timestamps
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    locked_at TIMESTAMPTZ,       -- When worker locked this session
    retired_at TIMESTAMPTZ,      -- When session was retired (cooldown starts)
    cooldown_until TIMESTAMPTZ,  -- When session becomes available again

    -- Constraints
    CONSTRAINT valid_status CHECK (status IN ('active', 'cooldown', 'available'))
);

-- Indexes for fast lookups
CREATE INDEX IF NOT EXISTS idx_worker_sessions_ip
    ON worker_sessions(ip_address);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_status
    ON worker_sessions(status);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_worker
    ON worker_sessions(worker_id)
    WHERE worker_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_worker_sessions_geo
    ON worker_sessions(state_code, city);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_cooldown
    ON worker_sessions(cooldown_until)
    WHERE status = 'cooldown';

-- Unique constraint: only one active session per IP
CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_sessions_active_ip
    ON worker_sessions(ip_address)
    WHERE status = 'active';

-- Function: Check if IP is available (not active, not in cooldown)
-- Returns TRUE when no session row for check_ip is 'active' or is in a
-- 'cooldown' whose cooldown_until is still in the future.
CREATE OR REPLACE FUNCTION is_ip_available(check_ip VARCHAR(45))
RETURNS BOOLEAN AS $$
BEGIN
    -- Check if any session has this IP and is either active or in cooldown
    RETURN NOT EXISTS (
        SELECT 1
        FROM worker_sessions
        WHERE ip_address = check_ip
          AND (status = 'active'
               OR (status = 'cooldown' AND cooldown_until > NOW()))
    );
END;
$$ LANGUAGE plpgsql;

-- Function: Lock a session to a worker
-- Returns the session if successful, NULL if IP not available.
-- An existing row for the IP is reused when it is 'available' or when its
-- cooldown has already expired; otherwise a new row is inserted. When no
-- fingerprint hash is supplied for a new row, a random md5 value is stored.
CREATE OR REPLACE FUNCTION lock_worker_session(
    p_worker_id VARCHAR(255),
    p_ip_address VARCHAR(45),
    p_state_code VARCHAR(2),
    p_city VARCHAR(100) DEFAULT NULL,
    p_fingerprint_hash VARCHAR(64) DEFAULT NULL,
    p_fingerprint_data JSONB DEFAULT NULL
) RETURNS worker_sessions AS $$
DECLARE
    v_session worker_sessions;
BEGIN
    -- First check if IP is available
    IF NOT is_ip_available(p_ip_address) THEN
        RETURN NULL;
    END IF;

    -- Try to find an existing reusable session for this IP. Rows whose
    -- cooldown has expired but which release_expired_sessions() has not yet
    -- flipped to 'available' are reusable too; otherwise every lock taken in
    -- that window would insert a duplicate row for the same IP.
    SELECT * INTO v_session
    FROM worker_sessions
    WHERE ip_address = p_ip_address
      AND (status = 'available'
           OR (status = 'cooldown' AND cooldown_until <= NOW()))
    LIMIT 1
    FOR UPDATE SKIP LOCKED;

    IF v_session.id IS NOT NULL THEN
        -- Reuse existing session
        UPDATE worker_sessions
        SET worker_id = p_worker_id,
            status = 'active',
            state_code = p_state_code,
            city = p_city,
            fingerprint_hash = COALESCE(p_fingerprint_hash, fingerprint_hash),
            fingerprint_data = COALESCE(p_fingerprint_data, fingerprint_data),
            tasks_claimed = 0,
            tasks_completed = 0,
            tasks_failed = 0,
            locked_at = NOW(),
            retired_at = NULL,
            cooldown_until = NULL
        WHERE id = v_session.id
        RETURNING * INTO v_session;
    ELSE
        -- Create new session
        INSERT INTO worker_sessions (
            ip_address,
            fingerprint_hash,
            fingerprint_data,
            state_code,
            city,
            worker_id,
            status,
            locked_at
        ) VALUES (
            p_ip_address,
            COALESCE(p_fingerprint_hash, md5(random()::text)),
            p_fingerprint_data,
            p_state_code,
            p_city,
            p_worker_id,
            'active',
            NOW()
        )
        RETURNING * INTO v_session;
    END IF;

    RETURN v_session;
EXCEPTION
    -- Two workers can both pass the availability check for the same IP; the
    -- loser then violates idx_worker_sessions_active_ip. Report that as
    -- "IP not available" (NULL) instead of raising to the caller.
    WHEN unique_violation THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Function: Retire a session (start 8hr cooldown)
-- Moves every active session owned by the worker into cooldown and clears
-- its owner. Returns TRUE if at least one session was retired.
CREATE OR REPLACE FUNCTION retire_worker_session(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
DECLARE
    v_updated INTEGER;
BEGIN
    UPDATE worker_sessions
    SET status = 'cooldown',
        worker_id = NULL,
        retired_at = NOW(),
        cooldown_until = NOW() + INTERVAL '8 hours'
    WHERE worker_id = p_worker_id
      AND status = 'active';

    GET DIAGNOSTICS v_updated = ROW_COUNT;
    RETURN v_updated > 0;
END;
$$ LANGUAGE plpgsql;

-- Function: Release expired cooldowns
-- Marks every cooldown session whose cooldown_until has passed as
-- 'available'. Returns the number of sessions released.
CREATE OR REPLACE FUNCTION release_expired_sessions()
RETURNS INTEGER AS $$
DECLARE
    v_released INTEGER;
BEGIN
    UPDATE worker_sessions
    SET status = 'available'
    WHERE status = 'cooldown'
      AND cooldown_until <= NOW();

    GET DIAGNOSTICS v_released = ROW_COUNT;
    RETURN v_released;
END;
$$ LANGUAGE plpgsql;

-- Function: Get session for worker
-- Returns the worker's active session. Nothing prevents a worker from holding
-- several active sessions, so the most recently locked one is chosen to keep
-- the result deterministic.
CREATE OR REPLACE FUNCTION get_worker_session(p_worker_id VARCHAR(255))
RETURNS worker_sessions AS $$
    SELECT *
    FROM worker_sessions
    WHERE worker_id = p_worker_id
      AND status = 'active'
    ORDER BY locked_at DESC NULLS LAST, id DESC
    LIMIT 1;
$$ LANGUAGE sql;

-- Functions: Increment task counters
-- Each returns TRUE when an active session for the worker was updated.
CREATE OR REPLACE FUNCTION session_task_completed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions
    SET tasks_completed = tasks_completed + 1
    WHERE worker_id = p_worker_id
      AND status = 'active';

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION session_task_failed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions
    SET tasks_failed = tasks_failed + 1
    WHERE worker_id = p_worker_id
      AND status = 'active';

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION session_task_claimed(
    p_worker_id VARCHAR(255),
    p_count INTEGER DEFAULT 1
) RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions
    SET tasks_claimed = tasks_claimed + p_count
    WHERE worker_id = p_worker_id
      AND status = 'active';

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Scheduled job hint: Run release_expired_sessions() every 5 minutes
COMMENT ON FUNCTION release_expired_sessions() IS 'Run periodically to release sessions from cooldown. Suggest: every 5 minutes.';

-- =============================================================================
-- ATOMIC TASK CLAIMING
-- Worker claims up to 6 tasks for same geo in one transaction
-- =============================================================================

-- Function: Claim up to N tasks for same geo
-- Picks the (state, city) with the most pending tasks, then claims up to
-- p_max_tasks of them for p_worker_id, highest priority and oldest first.
-- Returns claimed tasks with dispensary geo info; returns no rows when there
-- is nothing to claim.
CREATE OR REPLACE FUNCTION claim_tasks_batch(
    p_worker_id VARCHAR(255),
    p_max_tasks INTEGER DEFAULT 6,
    p_role VARCHAR(50) DEFAULT NULL  -- Optional role filter
) RETURNS TABLE (
    task_id INTEGER,
    role VARCHAR(50),
    dispensary_id INTEGER,
    dispensary_name VARCHAR(255),
    city VARCHAR(100),
    state_code VARCHAR(2),
    platform VARCHAR(50),
    method VARCHAR(20)
) AS $$
DECLARE
    v_target_state VARCHAR(2);
    v_target_city VARCHAR(100);
BEGIN
    -- First, find the geo with most pending tasks to target.
    -- Groups with a NULL state are excluded: they can never match the
    -- d.state = v_target_state filter below, and picking one would make this
    -- function return nothing even though other geos have pending work.
    SELECT d.state, d.city
    INTO v_target_state, v_target_city
    FROM worker_tasks t
    JOIN dispensaries d ON t.dispensary_id = d.id
    WHERE t.status = 'pending'
      AND d.state IS NOT NULL
      AND (p_role IS NULL OR t.role = p_role)
    GROUP BY d.state, d.city
    ORDER BY COUNT(*) DESC
    LIMIT 1;

    -- No pending tasks
    IF v_target_state IS NULL THEN
        RETURN;
    END IF;

    -- Claim up to p_max_tasks for this geo
    RETURN QUERY
    WITH claimed AS (
        UPDATE worker_tasks t
        SET status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW()
        FROM (
            SELECT t2.id
            FROM worker_tasks t2
            JOIN dispensaries d ON t2.dispensary_id = d.id
            WHERE t2.status = 'pending'
              AND d.state = v_target_state
              AND (v_target_city IS NULL OR d.city = v_target_city)
              AND (p_role IS NULL OR t2.role = p_role)
            ORDER BY t2.priority DESC, t2.created_at ASC
            LIMIT p_max_tasks
            -- Lock only the task rows. A bare FOR UPDATE would also lock the
            -- joined dispensaries rows, and SKIP LOCKED would then make
            -- concurrent workers skip every task of a dispensary that
            -- another claim is currently touching.
            FOR UPDATE OF t2 SKIP LOCKED
        ) sub
        WHERE t.id = sub.id
        RETURNING t.id, t.role, t.dispensary_id, t.method
    )
    SELECT
        c.id AS task_id,
        c.role,
        c.dispensary_id,
        d.name AS dispensary_name,
        d.city,
        d.state AS state_code,
        d.platform,
        c.method
    FROM claimed c
    JOIN dispensaries d ON c.dispensary_id = d.id;
END;
$$ LANGUAGE plpgsql;

-- Function: Release claimed tasks back to pending (for failed worker or cleanup)
-- Resets every 'claimed' or 'running' task owned by the worker, including
-- started_at so a released running task does not keep a stale start time.
-- Returns the number of tasks released.
CREATE OR REPLACE FUNCTION release_claimed_tasks(p_worker_id VARCHAR(255))
RETURNS INTEGER AS $$
DECLARE
    v_released INTEGER;
BEGIN
    UPDATE worker_tasks
    SET status = 'pending',
        worker_id = NULL,
        claimed_at = NULL,
        started_at = NULL
    WHERE worker_id = p_worker_id
      AND status IN ('claimed', 'running');

    GET DIAGNOSTICS v_released = ROW_COUNT;
    RETURN v_released;
END;
$$ LANGUAGE plpgsql;

-- Function: Mark task as running
-- Only succeeds for a task that is 'claimed' by the given worker.
CREATE OR REPLACE FUNCTION start_task(p_task_id INTEGER, p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_tasks
    SET status = 'running',
        started_at = NOW()
    WHERE id = p_task_id
      AND worker_id = p_worker_id
      AND status = 'claimed';

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Function: Mark task as completed (leaves pool)
-- Only succeeds for a task that is 'running' under the given worker.
CREATE OR REPLACE FUNCTION complete_task(
    p_task_id INTEGER,
    p_worker_id VARCHAR(255),
    p_result JSONB DEFAULT NULL
) RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_tasks
    SET status = 'completed',
        completed_at = NOW(),
        result = p_result
    WHERE id = p_task_id
      AND worker_id = p_worker_id
      AND status = 'running';

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Function: Mark task as failed (returns to pending for retry)
-- A task that has already been retried p_max_retries times is marked
-- permanently 'failed'; otherwise it goes back to 'pending' with its
-- retry_count incremented. Returns FALSE when the task does not exist, is not
-- owned by the worker, or is not currently 'claimed' or 'running'.
CREATE OR REPLACE FUNCTION fail_task(
    p_task_id INTEGER,
    p_worker_id VARCHAR(255),
    p_error TEXT DEFAULT NULL,
    p_max_retries INTEGER DEFAULT 3
) RETURNS BOOLEAN AS $$
DECLARE
    v_retry_count INTEGER;
BEGIN
    -- Get current retry count. The row is locked so the count cannot change
    -- between this read and the update below, and the ownership/status guard
    -- keeps a task that was already completed or released from being
    -- pushed back into the queue.
    SELECT COALESCE(retry_count, 0)
    INTO v_retry_count
    FROM worker_tasks
    WHERE id = p_task_id
      AND worker_id = p_worker_id
      AND status IN ('claimed', 'running')
    FOR UPDATE;

    IF NOT FOUND THEN
        RETURN FALSE;
    END IF;

    IF v_retry_count >= p_max_retries THEN
        -- Max retries exceeded - mark as permanently failed
        UPDATE worker_tasks
        SET status = 'failed',
            completed_at = NOW(),
            error_message = p_error,
            retry_count = v_retry_count + 1
        WHERE id = p_task_id
          AND worker_id = p_worker_id;
    ELSE
        -- Return to pending for retry
        UPDATE worker_tasks
        SET status = 'pending',
            worker_id = NULL,
            claimed_at = NULL,
            started_at = NULL,
            error_message = p_error,
            retry_count = v_retry_count + 1
        WHERE id = p_task_id
          AND worker_id = p_worker_id;
    END IF;

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Add retry_count and claimed_at columns if they do not exist.
-- ADD COLUMN IF NOT EXISTS checks the exact table being altered, whereas an
-- information_schema lookup by table name alone can be satisfied by a
-- same-named table in a different schema.
ALTER TABLE worker_tasks
    ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0,
    ADD COLUMN IF NOT EXISTS claimed_at TIMESTAMPTZ;