Files
cannaiq/backend/migrations/112_worker_session_pool.sql
Kelly 4cb4e1c502 feat(workers): Session pool system - claim tasks first, then get IP
New worker flow (enabled via USE_SESSION_POOL=true):
1. Worker claims up to 6 tasks for same geo (atomically marked claimed)
2. Gets Evomi proxy for that geo
3. Checks IP availability (not in use, not in 8hr cooldown)
4. Locks IP exclusively to this worker
5. Runs preflight with locked IP
6. Executes tasks (3 concurrent)
7. After 6 tasks, retires session (8hr IP cooldown)
8. Repeats with new IP

Key files:
- migrations/112_worker_session_pool.sql: Session table + atomic claiming
- services/worker-session.ts: Session lifecycle management
- tasks/task-worker.ts: sessionPoolMainLoop() with new flow
- services/crawl-rotator.ts: setFixedProxy() for session locking

Failed tasks return to pending for retry by another worker.
No two workers can share same IP simultaneously.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 22:54:45 -07:00

391 lines
11 KiB
PL/PgSQL

-- Migration 112: Worker Session Pool
-- Tracks IP/fingerprint sessions with exclusive locks and cooldowns
-- Each worker claims up to 6 tasks, uses one IP/fingerprint for those tasks,
-- then retires the session (8hr cooldown before IP can be reused)
-- The old identity-pool tables are replaced by the simpler session model below.
-- Both are dropped in one statement; CASCADE also drops objects depending on them.
DROP TABLE IF EXISTS worker_identity_claims, worker_identities CASCADE;
-- worker_sessions: one row per IP/fingerprint pair, either locked to a worker
-- ('active'), waiting out its cooldown ('cooldown'), or free ('available').
CREATE TABLE IF NOT EXISTS worker_sessions (
    id                SERIAL PRIMARY KEY,

    -- Network identity used for this session
    ip_address        VARCHAR(45)  NOT NULL,
    fingerprint_hash  VARCHAR(64)  NOT NULL,
    fingerprint_data  JSONB,

    -- Geo the session is pinned to
    state_code        VARCHAR(2)   NOT NULL,
    city              VARCHAR(100),

    -- Owning worker; cleared (NULL) when the session is retired into cooldown
    worker_id         VARCHAR(255),

    -- Lifecycle state; restricted to the three values listed in the CHECK
    status            VARCHAR(20)  NOT NULL DEFAULT 'available'
        CONSTRAINT valid_status CHECK (status IN ('active', 'cooldown', 'available')),

    -- Per-session task counters
    tasks_claimed     INTEGER      NOT NULL DEFAULT 0,
    tasks_completed   INTEGER      NOT NULL DEFAULT 0,
    tasks_failed      INTEGER      NOT NULL DEFAULT 0,
    max_tasks         INTEGER      NOT NULL DEFAULT 6,

    -- Lifecycle timestamps
    created_at        TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    locked_at         TIMESTAMPTZ,  -- set when a worker locks the session
    retired_at        TIMESTAMPTZ,  -- set when the session is retired (cooldown starts)
    cooldown_until    TIMESTAMPTZ   -- moment the session may be handed out again
);
-- Lookup indexes on worker_sessions
CREATE INDEX IF NOT EXISTS idx_worker_sessions_ip
    ON worker_sessions (ip_address);

CREATE INDEX IF NOT EXISTS idx_worker_sessions_status
    ON worker_sessions (status);

-- Partial: only rows currently owned by a worker
CREATE INDEX IF NOT EXISTS idx_worker_sessions_worker
    ON worker_sessions (worker_id)
    WHERE worker_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_worker_sessions_geo
    ON worker_sessions (state_code, city);

-- Partial: only rows that are cooling down
CREATE INDEX IF NOT EXISTS idx_worker_sessions_cooldown
    ON worker_sessions (cooldown_until)
    WHERE status = 'cooldown';

-- Exclusivity guarantee: at most one 'active' session may exist per IP
CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_sessions_active_ip
    ON worker_sessions (ip_address)
    WHERE status = 'active';
-- Function: is_ip_available
-- Returns TRUE when no session row for check_ip is 'active' and none is in
-- 'cooldown' with a cooldown_until still in the future; FALSE otherwise.
CREATE OR REPLACE FUNCTION is_ip_available(check_ip VARCHAR(45))
RETURNS BOOLEAN AS $$
DECLARE
    v_blocked BOOLEAN;
BEGIN
    -- An IP is blocked while it is in use or still cooling down
    SELECT EXISTS (
        SELECT 1
        FROM worker_sessions AS ws
        WHERE ws.ip_address = check_ip
          AND (
              ws.status = 'active'
              OR (ws.status = 'cooldown' AND ws.cooldown_until > NOW())
          )
    ) INTO v_blocked;

    RETURN NOT v_blocked;
END;
$$ LANGUAGE plpgsql;
-- Function: lock_worker_session
-- Locks an IP exclusively to a worker by creating (or reusing) a session row
-- and marking it 'active'.
--
-- Parameters:
--   p_worker_id        worker taking ownership of the session
--   p_ip_address       IP to lock
--   p_state_code       geo state stored on the session
--   p_city             geo city stored on the session (optional)
--   p_fingerprint_hash fingerprint hash; on reuse the stored hash is kept when
--                      NULL, on insert a random md5 is generated when NULL
--   p_fingerprint_data fingerprint payload; on reuse the stored data is kept
--                      when NULL
--
-- Returns: the locked worker_sessions row, or NULL when the IP is not
-- available. This includes losing a race against a concurrent caller locking
-- the same IP: the unique_violation raised by idx_worker_sessions_active_ip is
-- trapped and reported as NULL instead of propagating as an error.
CREATE OR REPLACE FUNCTION lock_worker_session(
    p_worker_id VARCHAR(255),
    p_ip_address VARCHAR(45),
    p_state_code VARCHAR(2),
    p_city VARCHAR(100) DEFAULT NULL,
    p_fingerprint_hash VARCHAR(64) DEFAULT NULL,
    p_fingerprint_data JSONB DEFAULT NULL
) RETURNS worker_sessions AS $$
DECLARE
    v_session worker_sessions;
BEGIN
    -- Fast path: IP is in use or still cooling down
    IF NOT is_ip_available(p_ip_address) THEN
        RETURN NULL;
    END IF;

    -- Try to pick up an existing 'available' row for this IP; rows locked by
    -- another transaction are skipped rather than waited on
    SELECT * INTO v_session
    FROM worker_sessions
    WHERE ip_address = p_ip_address
      AND status = 'available'
    LIMIT 1
    FOR UPDATE SKIP LOCKED;

    IF v_session.id IS NOT NULL THEN
        -- Reuse the existing row: reset counters and lifecycle timestamps
        UPDATE worker_sessions SET
            worker_id = p_worker_id,
            status = 'active',
            state_code = p_state_code,
            city = p_city,
            fingerprint_hash = COALESCE(p_fingerprint_hash, fingerprint_hash),
            fingerprint_data = COALESCE(p_fingerprint_data, fingerprint_data),
            tasks_claimed = 0,
            tasks_completed = 0,
            tasks_failed = 0,
            locked_at = NOW(),
            retired_at = NULL,
            cooldown_until = NULL
        WHERE id = v_session.id
        RETURNING * INTO v_session;
    ELSE
        -- No reusable row: create a fresh session already marked 'active'
        INSERT INTO worker_sessions (
            ip_address, fingerprint_hash, fingerprint_data,
            state_code, city, worker_id, status, locked_at
        ) VALUES (
            p_ip_address, COALESCE(p_fingerprint_hash, md5(random()::text)),
            p_fingerprint_data, p_state_code, p_city, p_worker_id, 'active', NOW()
        )
        RETURNING * INTO v_session;
    END IF;

    RETURN v_session;
EXCEPTION
    WHEN unique_violation THEN
        -- Another transaction activated this IP between the availability check
        -- and our write. The block's changes are rolled back; honor the
        -- documented contract by reporting "not available".
        RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Function: retire_worker_session
-- Moves every 'active' session owned by p_worker_id into 'cooldown', clears
-- the owner, and sets cooldown_until to 8 hours from now.
-- Returns TRUE when at least one session was retired, FALSE otherwise.
CREATE OR REPLACE FUNCTION retire_worker_session(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions AS ws
    SET status         = 'cooldown',
        worker_id      = NULL,
        retired_at     = NOW(),
        cooldown_until = NOW() + INTERVAL '8 hours'
    WHERE ws.status = 'active'
      AND ws.worker_id = p_worker_id;

    -- FOUND is TRUE exactly when the UPDATE touched one or more rows
    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
-- Function: release_expired_sessions
-- Flips every 'cooldown' session whose cooldown_until has passed back to
-- 'available'. Returns the number of sessions released.
CREATE OR REPLACE FUNCTION release_expired_sessions()
RETURNS INTEGER AS $$
DECLARE
    v_count INTEGER;
BEGIN
    WITH released AS (
        UPDATE worker_sessions AS ws
        SET status = 'available'
        WHERE ws.status = 'cooldown'
          AND ws.cooldown_until <= NOW()
        RETURNING 1
    )
    SELECT COUNT(*)::INTEGER INTO v_count
    FROM released;

    RETURN v_count;
END;
$$ LANGUAGE plpgsql;
-- Function: get_worker_session
-- Returns one 'active' session row owned by p_worker_id (NULL row if none).
CREATE OR REPLACE FUNCTION get_worker_session(p_worker_id VARCHAR(255))
RETURNS worker_sessions AS $$
    SELECT ws.*
    FROM worker_sessions AS ws
    WHERE ws.status = 'active'
      AND ws.worker_id = p_worker_id
    LIMIT 1;
$$ LANGUAGE sql;
-- Function: session_task_completed
-- Adds 1 to tasks_completed on the worker's 'active' session(s).
-- Returns TRUE when a session row was updated, FALSE otherwise.
CREATE OR REPLACE FUNCTION session_task_completed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_sessions AS ws
    SET tasks_completed = ws.tasks_completed + 1
    WHERE ws.status = 'active'
      AND ws.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;
-- Function: session_task_failed
-- Adds 1 to tasks_failed on the worker's 'active' session(s).
-- Returns TRUE when a session row was updated, FALSE otherwise.
CREATE OR REPLACE FUNCTION session_task_failed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_sessions AS ws
    SET tasks_failed = ws.tasks_failed + 1
    WHERE ws.status = 'active'
      AND ws.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;
-- Function: session_task_claimed
-- Adds p_count (default 1) to tasks_claimed on the worker's 'active' session(s).
-- Returns TRUE when a session row was updated, FALSE otherwise.
CREATE OR REPLACE FUNCTION session_task_claimed(p_worker_id VARCHAR(255), p_count INTEGER DEFAULT 1)
RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_sessions AS ws
    SET tasks_claimed = ws.tasks_claimed + p_count
    WHERE ws.status = 'active'
      AND ws.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;
-- Scheduled job hint: nothing in this migration schedules release_expired_sessions();
-- the suggested cadence (every 5 minutes) is recorded on the function as a catalog comment.
COMMENT ON FUNCTION release_expired_sessions() IS
'Run periodically to release sessions from cooldown. Suggest: every 5 minutes.';
-- =============================================================================
-- ATOMIC TASK CLAIMING
-- Worker claims up to 6 tasks for same geo in one transaction
-- =============================================================================
-- Function: claim_tasks_batch
-- Atomically claims up to p_max_tasks pending tasks that all belong to a
-- single geo (state/city) and returns them with their dispensary geo info.
--
-- Parameters:
--   p_worker_id  worker the tasks are assigned to
--   p_max_tasks  upper bound on the number of tasks claimed (default 6)
--   p_role       optional role filter; NULL means any role
--
-- Returns: one row per claimed task (empty set when nothing is pending).
--
-- The geo chosen is the (state, city) pair with the most pending tasks.
-- Candidate rows are locked with FOR UPDATE OF t2 SKIP LOCKED so that:
--   * concurrent claimers never block on or double-claim the same task, and
--   * only worker_tasks rows are locked. Without "OF t2" the joined
--     dispensaries rows would be locked too, making concurrent claimers skip
--     tasks of a dispensary another worker is claiming from and blocking
--     unrelated writers of dispensaries.
CREATE OR REPLACE FUNCTION claim_tasks_batch(
    p_worker_id VARCHAR(255),
    p_max_tasks INTEGER DEFAULT 6,
    p_role VARCHAR(50) DEFAULT NULL -- Optional role filter
) RETURNS TABLE (
    task_id INTEGER,
    role VARCHAR(50),
    dispensary_id INTEGER,
    dispensary_name VARCHAR(255),
    city VARCHAR(100),
    state_code VARCHAR(2),
    platform VARCHAR(50),
    method VARCHAR(20)
) AS $$
DECLARE
    v_target_state VARCHAR(2);
    v_target_city VARCHAR(100);
BEGIN
    -- Pick the geo with the most pending tasks
    SELECT d.state, d.city INTO v_target_state, v_target_city
    FROM worker_tasks t
    JOIN dispensaries d ON t.dispensary_id = d.id
    WHERE t.status = 'pending'
      AND (p_role IS NULL OR t.role = p_role)
    GROUP BY d.state, d.city
    ORDER BY COUNT(*) DESC
    LIMIT 1;

    -- No pending tasks (or none with a known state): return an empty set
    IF v_target_state IS NULL THEN
        RETURN;
    END IF;

    -- Claim up to p_max_tasks for that geo, highest priority and oldest first.
    -- A NULL target city matches every city in the target state.
    RETURN QUERY
    WITH claimed AS (
        UPDATE worker_tasks t SET
            status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW()
        FROM (
            SELECT t2.id
            FROM worker_tasks t2
            JOIN dispensaries d ON t2.dispensary_id = d.id
            WHERE t2.status = 'pending'
              AND d.state = v_target_state
              AND (v_target_city IS NULL OR d.city = v_target_city)
              AND (p_role IS NULL OR t2.role = p_role)
            ORDER BY t2.priority DESC, t2.created_at ASC
            LIMIT p_max_tasks
            FOR UPDATE OF t2 SKIP LOCKED
        ) sub
        WHERE t.id = sub.id
        RETURNING t.id, t.role, t.dispensary_id, t.method
    )
    SELECT
        c.id AS task_id,
        c.role,
        c.dispensary_id,
        d.name AS dispensary_name,
        d.city,
        d.state AS state_code,
        d.platform,
        c.method
    FROM claimed c
    JOIN dispensaries d ON c.dispensary_id = d.id;
END;
$$ LANGUAGE plpgsql;
-- Function: release_claimed_tasks
-- Returns every task that p_worker_id holds in 'claimed' or 'running' back to
-- 'pending' (for a failed worker or cleanup) and clears its ownership fields.
-- started_at is cleared as well so a released 'running' task does not carry a
-- stale start time into its next attempt, matching what fail_task does when it
-- re-queues a task.
-- Returns the number of tasks released.
CREATE OR REPLACE FUNCTION release_claimed_tasks(p_worker_id VARCHAR(255))
RETURNS INTEGER AS $$
DECLARE
    v_released INTEGER;
BEGIN
    UPDATE worker_tasks SET
        status = 'pending',
        worker_id = NULL,
        claimed_at = NULL,
        started_at = NULL
    WHERE worker_id = p_worker_id
      AND status IN ('claimed', 'running');

    GET DIAGNOSTICS v_released = ROW_COUNT;
    RETURN v_released;
END;
$$ LANGUAGE plpgsql;
-- Function: start_task
-- Transitions a task from 'claimed' to 'running' and stamps started_at, but
-- only when the task is owned by p_worker_id.
-- Returns TRUE when the task was transitioned, FALSE otherwise.
CREATE OR REPLACE FUNCTION start_task(p_task_id INTEGER, p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_tasks AS wt
    SET status     = 'running',
        started_at = NOW()
    WHERE wt.id = p_task_id
      AND wt.status = 'claimed'
      AND wt.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;
-- Function: complete_task
-- Transitions a task from 'running' to 'completed', stamps completed_at and
-- stores p_result, but only when the task is owned by p_worker_id.
-- Returns TRUE when the task was transitioned, FALSE otherwise.
CREATE OR REPLACE FUNCTION complete_task(
    p_task_id INTEGER,
    p_worker_id VARCHAR(255),
    p_result JSONB DEFAULT NULL
) RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_tasks AS wt
    SET status       = 'completed',
        completed_at = NOW(),
        result       = p_result
    WHERE wt.id = p_task_id
      AND wt.status = 'running'
      AND wt.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;
-- Function: fail_task
-- Records a failure for a task owned by p_worker_id.
--
-- Parameters:
--   p_task_id      task that failed
--   p_worker_id    worker reporting the failure; must own the task
--   p_error        error text stored in error_message (optional)
--   p_max_retries  once retry_count has reached this value the task is marked
--                  permanently 'failed' instead of being re-queued (default 3)
--
-- Behavior:
--   * Only tasks currently 'claimed' or 'running' for this worker are touched,
--     so a task that already reached 'completed' or 'failed' (and still
--     carries the worker_id) cannot be pushed back to 'pending' by mistake.
--   * The task row is locked (FOR UPDATE) before retry_count is read, so the
--     decision and the write are based on the same value.
--   * retry_count is incremented in both outcomes.
--
-- Returns TRUE when the task was updated, FALSE when no matching task exists.
CREATE OR REPLACE FUNCTION fail_task(
    p_task_id INTEGER,
    p_worker_id VARCHAR(255),
    p_error TEXT DEFAULT NULL,
    p_max_retries INTEGER DEFAULT 3
) RETURNS BOOLEAN AS $$
DECLARE
    v_retry_count INTEGER;
BEGIN
    -- Lock the owned, in-flight task row and read its current retry count
    SELECT COALESCE(retry_count, 0) INTO v_retry_count
    FROM worker_tasks
    WHERE id = p_task_id
      AND worker_id = p_worker_id
      AND status IN ('claimed', 'running')
    FOR UPDATE;

    IF NOT FOUND THEN
        RETURN FALSE;
    END IF;

    IF v_retry_count >= p_max_retries THEN
        -- Max retries exceeded - mark as permanently failed
        UPDATE worker_tasks SET
            status = 'failed',
            completed_at = NOW(),
            error_message = p_error,
            retry_count = v_retry_count + 1
        WHERE id = p_task_id
          AND worker_id = p_worker_id;
    ELSE
        -- Return to pending so another worker can retry it
        UPDATE worker_tasks SET
            status = 'pending',
            worker_id = NULL,
            claimed_at = NULL,
            started_at = NULL,
            error_message = p_error,
            retry_count = v_retry_count + 1
        WHERE id = p_task_id
          AND worker_id = p_worker_id;
    END IF;

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
-- Add the retry_count and claimed_at columns used by the task functions above,
-- if they are not already present.
-- ADD COLUMN IF NOT EXISTS checks the very table being altered (resolved via
-- search_path). A lookup in information_schema.columns filtered only by table
-- name could match a worker_tasks table in a different schema and wrongly skip
-- the ALTER.
ALTER TABLE worker_tasks
    ADD COLUMN IF NOT EXISTS retry_count INTEGER NOT NULL DEFAULT 0,
    ADD COLUMN IF NOT EXISTS claimed_at TIMESTAMPTZ;