-- New worker flow (enabled via USE_SESSION_POOL=true):
--   1. Worker claims up to 6 tasks for the same geo (atomically marked claimed)
--   2. Gets an Evomi proxy for that geo
--   3. Checks IP availability (not in use, not in its 8-hour cooldown)
--   4. Locks the IP exclusively to this worker
--   5. Runs preflight with the locked IP
--   6. Executes tasks (3 concurrently)
--   7. After 6 tasks, retires the session (8-hour IP cooldown)
--   8. Repeats with a new IP
--
-- Key files:
--   - migrations/112_worker_session_pool.sql: session table + atomic claiming
--   - services/worker-session.ts: session lifecycle management
--   - tasks/task-worker.ts: sessionPoolMainLoop() with the new flow
--   - services/crawl-rotator.ts: setFixedProxy() for session locking
--
-- Failed tasks return to pending for retry by another worker.
-- No two workers can share the same IP simultaneously.
--
-- 🤖 Generated with [Claude Code](https://claude.com/claude-code)
-- Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
-- Migration 112: Worker Session Pool
-- Tracks IP/fingerprint sessions with exclusive locks and cooldowns.
-- Each worker claims up to 6 tasks, uses one IP/fingerprint for those tasks,
-- then retires the session (8-hour cooldown before the IP can be reused).

-- Remove the previous identity-pool schema; the simpler session model defined
-- below replaces it. IF EXISTS keeps this safe on databases that never had it,
-- and CASCADE drops any objects that still depend on these tables.
DROP TABLE IF EXISTS
    worker_identity_claims,
    worker_identities
CASCADE;

-- Worker sessions: one row per IP/fingerprint pairing, tracking whether it is
-- locked to a worker ('active'), waiting out its cooldown ('cooldown'), or
-- free to be locked again ('available').
CREATE TABLE IF NOT EXISTS worker_sessions (
    id SERIAL PRIMARY KEY,

    -- IP and fingerprint for this session
    ip_address VARCHAR(45) NOT NULL,
    fingerprint_hash VARCHAR(64) NOT NULL,
    fingerprint_data JSONB,

    -- Geo this session is locked to
    state_code VARCHAR(2) NOT NULL,
    city VARCHAR(100),

    -- Ownership: set when a worker locks the session, NULL once retired
    worker_id VARCHAR(255),

    -- Status: 'active' (locked to worker), 'cooldown' (8hr wait), 'available'
    status VARCHAR(20) NOT NULL DEFAULT 'available',

    -- Task tracking for the current lock period
    tasks_claimed INTEGER NOT NULL DEFAULT 0,
    tasks_completed INTEGER NOT NULL DEFAULT 0,
    tasks_failed INTEGER NOT NULL DEFAULT 0,
    max_tasks INTEGER NOT NULL DEFAULT 6,

    -- Timestamps
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    locked_at TIMESTAMPTZ,       -- When a worker locked this session
    retired_at TIMESTAMPTZ,      -- When the session was retired (cooldown starts)
    cooldown_until TIMESTAMPTZ,  -- When the session becomes available again

    -- Constraints
    CONSTRAINT valid_status CHECK (status IN ('active', 'cooldown', 'available')),
    -- An active session must name its owner. Sessions are retired and counted
    -- by worker_id, so an ownerless 'active' row could never be retired and
    -- would keep its IP's slot in the one-active-session-per-IP index indefinitely.
    CONSTRAINT worker_sessions_active_worker_check
        CHECK (status <> 'active' OR worker_id IS NOT NULL)
);

-- Lookup indexes on worker_sessions:
--   ip_address                       -> per-IP availability checks
--   status                           -> filtering by lifecycle state
--   worker_id (owned rows only)      -> finding a worker's session
--   (state_code, city)               -> geo lookups
--   cooldown_until (cooldown rows)   -> finding cooldowns that have expired
CREATE INDEX IF NOT EXISTS idx_worker_sessions_ip
    ON worker_sessions (ip_address);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_status
    ON worker_sessions (status);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_worker
    ON worker_sessions (worker_id)
    WHERE worker_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_worker_sessions_geo
    ON worker_sessions (state_code, city);
CREATE INDEX IF NOT EXISTS idx_worker_sessions_cooldown
    ON worker_sessions (cooldown_until)
    WHERE status = 'cooldown';

-- Exclusivity guarantee: at most one 'active' session row may exist per IP.
-- Rows in 'cooldown' or 'available' fall outside this partial index, so an IP
-- may still have any number of non-active rows.
CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_sessions_active_ip
    ON worker_sessions (ip_address)
    WHERE status = 'active';

-- Function: Check whether an IP can be locked right now.
-- Returns FALSE if any session for the IP is 'active', or is in 'cooldown'
-- with cooldown_until still in the future. Returns TRUE otherwise, including
-- for IPs never seen before. An expired cooldown counts as available even if
-- release_expired_sessions() has not flipped its status yet.
-- Declared STABLE because it only reads worker_sessions (the default,
-- VOLATILE, needlessly blocks planner optimizations).
CREATE OR REPLACE FUNCTION is_ip_available(check_ip VARCHAR(45))
RETURNS BOOLEAN AS $$
BEGIN
    RETURN NOT EXISTS (
        SELECT 1
        FROM worker_sessions
        WHERE ip_address = check_ip
          AND (
              status = 'active'
              OR (status = 'cooldown' AND cooldown_until > NOW())
          )
    );
END;
$$ LANGUAGE plpgsql STABLE;

-- Function: Lock an IP/fingerprint session exclusively to a worker.
-- Reuses an existing free row for the IP when one can be locked: status
-- 'available', or 'cooldown' whose cooldown_until has passed but which
-- release_expired_sessions() has not flipped yet. Otherwise inserts a new row.
-- A reused row gets fresh counters and timestamps. Its fingerprint is replaced
-- only when one is supplied.
-- Returns the locked session, or NULL if the IP is not available. NULL is also
-- returned when a concurrent caller wins the race for the same IP: the unique
-- one-active-session-per-IP index rejects the loser, and that is reported as
-- NULL instead of being raised as an error.
CREATE OR REPLACE FUNCTION lock_worker_session(
    p_worker_id VARCHAR(255),
    p_ip_address VARCHAR(45),
    p_state_code VARCHAR(2),
    p_city VARCHAR(100) DEFAULT NULL,
    p_fingerprint_hash VARCHAR(64) DEFAULT NULL,
    p_fingerprint_data JSONB DEFAULT NULL
) RETURNS worker_sessions AS $$
DECLARE
    v_session worker_sessions;
BEGIN
    -- Cheap early exit: IP is active elsewhere or still cooling down.
    IF NOT is_ip_available(p_ip_address) THEN
        RETURN NULL;
    END IF;

    BEGIN
        -- Prefer reusing a free row for this IP so rows do not pile up one per
        -- lock. Rows currently locked by a concurrent caller are skipped.
        SELECT * INTO v_session
        FROM worker_sessions
        WHERE ip_address = p_ip_address
          AND (
              status = 'available'
              OR (status = 'cooldown' AND cooldown_until <= NOW())
          )
        ORDER BY id
        LIMIT 1
        FOR UPDATE SKIP LOCKED;

        IF FOUND THEN
            -- Reuse existing session
            UPDATE worker_sessions SET
                worker_id = p_worker_id,
                status = 'active',
                state_code = p_state_code,
                city = p_city,
                fingerprint_hash = COALESCE(p_fingerprint_hash, fingerprint_hash),
                fingerprint_data = COALESCE(p_fingerprint_data, fingerprint_data),
                tasks_claimed = 0,
                tasks_completed = 0,
                tasks_failed = 0,
                locked_at = NOW(),
                retired_at = NULL,
                cooldown_until = NULL
            WHERE id = v_session.id
            RETURNING * INTO v_session;
        ELSE
            -- Create new session; invent a fingerprint hash if none was given
            -- because the column is NOT NULL.
            INSERT INTO worker_sessions (
                ip_address, fingerprint_hash, fingerprint_data,
                state_code, city, worker_id, status, locked_at
            ) VALUES (
                p_ip_address, COALESCE(p_fingerprint_hash, md5(random()::text)),
                p_fingerprint_data, p_state_code, p_city, p_worker_id, 'active', NOW()
            )
            RETURNING * INTO v_session;
        END IF;
    EXCEPTION
        WHEN unique_violation THEN
            -- Another worker activated a session for this IP between the
            -- availability check above and this write: the IP is taken.
            RETURN NULL;
    END;

    RETURN v_session;
END;
$$ LANGUAGE plpgsql;

-- Function: Retire every active session owned by a worker.
-- Each matching row moves to 'cooldown', loses its owner, records retired_at,
-- and gets cooldown_until set 8 hours ahead.
-- Returns TRUE if at least one session was retired.
CREATE OR REPLACE FUNCTION retire_worker_session(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions
       SET status         = 'cooldown',
           worker_id      = NULL,
           retired_at     = NOW(),
           cooldown_until = NOW() + INTERVAL '8 hours'
     WHERE status = 'active'
       AND worker_id = p_worker_id;

    -- FOUND is TRUE when the UPDATE touched at least one row.
    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Function: Make sessions whose cooldown has elapsed lockable again.
-- Flips 'cooldown' rows with cooldown_until <= NOW() to 'available' and
-- returns how many rows were flipped.
CREATE OR REPLACE FUNCTION release_expired_sessions()
RETURNS INTEGER AS $$
DECLARE
    v_count INTEGER;
BEGIN
    WITH released AS (
        UPDATE worker_sessions
           SET status = 'available'
         WHERE status = 'cooldown'
           AND cooldown_until <= NOW()
        RETURNING id
    )
    SELECT COUNT(*) INTO v_count
    FROM released;

    RETURN v_count;
END;
$$ LANGUAGE plpgsql;

-- Function: Fetch the worker's active session (a NULL row if it has none).
-- Nothing prevents a worker from holding several active sessions, so the most
-- recently locked one is chosen deterministically rather than an arbitrary
-- row. Declared STABLE because it only reads.
CREATE OR REPLACE FUNCTION get_worker_session(p_worker_id VARCHAR(255))
RETURNS worker_sessions AS $$
    SELECT *
    FROM worker_sessions
    WHERE worker_id = p_worker_id
      AND status = 'active'
    ORDER BY locked_at DESC NULLS LAST, id DESC
    LIMIT 1;
$$ LANGUAGE sql STABLE;

-- Session task counters: each helper bumps one counter on the worker's
-- 'active' session and returns FALSE if the worker has no active session.

-- Function: Count one completed task against the worker's active session.
CREATE OR REPLACE FUNCTION session_task_completed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions AS s
       SET tasks_completed = s.tasks_completed + 1
     WHERE s.worker_id = p_worker_id
       AND s.status = 'active';

    RETURN FOUND;  -- FALSE when no active session matched
END;
$$ LANGUAGE plpgsql;

-- Function: Count one failed task against the worker's active session.
-- Sessions in any other state are left unchanged.
CREATE OR REPLACE FUNCTION session_task_failed(p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions AS s
       SET tasks_failed = s.tasks_failed + 1
     WHERE s.worker_id = p_worker_id
       AND s.status = 'active';

    RETURN FOUND;  -- FALSE when no active session matched
END;
$$ LANGUAGE plpgsql;

-- Function: Add p_count (default 1) claimed tasks to the worker's active
-- session. Sessions in any other state are left unchanged.
CREATE OR REPLACE FUNCTION session_task_claimed(p_worker_id VARCHAR(255), p_count INTEGER DEFAULT 1)
RETURNS BOOLEAN AS $$
BEGIN
    UPDATE worker_sessions AS s
       SET tasks_claimed = s.tasks_claimed + p_count
     WHERE s.worker_id = p_worker_id
       AND s.status = 'active';

    RETURN FOUND;  -- FALSE when no active session matched
END;
$$ LANGUAGE plpgsql;

-- Maintenance hint, stored in the catalog so it is visible via \df+:
-- release_expired_sessions() is meant to be invoked on a schedule.
COMMENT ON FUNCTION release_expired_sessions()
    IS 'Run periodically to release sessions from cooldown. Suggest: every 5 minutes.';

-- =============================================================================
-- ATOMIC TASK CLAIMING
-- Each worker claims up to 6 tasks for the same geo in one transaction.
-- =============================================================================

-- Function: Claim up to p_max_tasks pending tasks that share one geo.
-- 1. Picks the (state, city) with the most pending tasks (optionally only for
--    role p_role). Dispensaries without a state are ignored.
-- 2. Atomically marks up to p_max_tasks of that geo's pending tasks as
--    'claimed' by p_worker_id: highest priority first, then oldest first.
-- Returns the claimed tasks with their dispensary's geo info. Returns no rows
-- when nothing is pending, or when concurrent claimers took every task first.
CREATE OR REPLACE FUNCTION claim_tasks_batch(
    p_worker_id VARCHAR(255),
    p_max_tasks INTEGER DEFAULT 6,
    p_role VARCHAR(50) DEFAULT NULL  -- Optional role filter
) RETURNS TABLE (
    task_id INTEGER,
    role VARCHAR(50),
    dispensary_id INTEGER,
    dispensary_name VARCHAR(255),
    city VARCHAR(100),
    state_code VARCHAR(2),
    platform VARCHAR(50),
    method VARCHAR(20)
) AS $$
DECLARE
    v_target_state VARCHAR(2);
    v_target_city VARCHAR(100);
BEGIN
    -- Pick the geo with the most pending tasks. This read takes no locks, so
    -- the claim below re-checks status and may get fewer tasks than counted.
    -- NULL-state dispensaries are excluded: such a group would be rejected
    -- below anyway, and letting it win here would starve every other geo.
    SELECT d.state, d.city
      INTO v_target_state, v_target_city
    FROM worker_tasks AS t
    INNER JOIN dispensaries AS d ON d.id = t.dispensary_id
    WHERE t.status = 'pending'
      AND d.state IS NOT NULL
      AND (p_role IS NULL OR t.role = p_role)
    GROUP BY d.state, d.city
    ORDER BY COUNT(*) DESC
    LIMIT 1;

    -- No pending tasks
    IF v_target_state IS NULL THEN
        RETURN;
    END IF;

    -- Claim up to p_max_tasks for this geo.
    -- Casts: RETURN QUERY requires each column's type to match the RETURNS
    -- TABLE declaration exactly. The source column types are not declared in
    -- this migration, so they are pinned explicitly (no-ops when already equal).
    RETURN QUERY
    WITH claimed AS (
        UPDATE worker_tasks AS t SET
            status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW()
        FROM (
            SELECT t2.id
            FROM worker_tasks AS t2
            INNER JOIN dispensaries AS d ON d.id = t2.dispensary_id
            WHERE t2.status = 'pending'
              AND d.state = v_target_state
              -- Match the chosen group exactly, including a NULL city; the
              -- old "city IS NULL OR ..." widened a NULL-city pick to the
              -- whole state.
              AND d.city IS NOT DISTINCT FROM v_target_city
              AND (p_role IS NULL OR t2.role = p_role)
            ORDER BY t2.priority DESC, t2.created_at ASC
            LIMIT p_max_tasks
            -- Lock only the task rows. A bare FOR UPDATE would also lock the
            -- joined dispensaries rows, blocking unrelated dispensary writes
            -- and making SKIP LOCKED pass over tasks whose dispensary row
            -- another claimer happens to hold.
            FOR UPDATE OF t2 SKIP LOCKED
        ) AS sub
        WHERE t.id = sub.id
        RETURNING t.id, t.role, t.dispensary_id, t.method
    )
    SELECT
        c.id::INTEGER            AS task_id,
        c.role::VARCHAR          AS role,
        c.dispensary_id::INTEGER AS dispensary_id,
        d.name::VARCHAR          AS dispensary_name,
        d.city::VARCHAR          AS city,
        d.state::VARCHAR         AS state_code,
        d.platform::VARCHAR      AS platform,
        c.method::VARCHAR        AS method
    FROM claimed AS c
    INNER JOIN dispensaries AS d ON d.id = c.dispensary_id;
END;
$$ LANGUAGE plpgsql;

-- Function: Release claimed tasks back to pending (for failed worker or cleanup).
-- Every task that p_worker_id holds in 'claimed' or 'running' goes back to
-- 'pending' with ownership and timing cleared, so the next claimer starts
-- fresh. Returns the number of tasks released.
CREATE OR REPLACE FUNCTION release_claimed_tasks(p_worker_id VARCHAR(255))
RETURNS INTEGER AS $$
DECLARE
    v_released INTEGER;
BEGIN
    UPDATE worker_tasks SET
        status = 'pending',
        worker_id = NULL,
        claimed_at = NULL,
        -- Reset started_at too, as fail_task() does on its retry path.
        -- Otherwise a released 'running' task returns to pending with a
        -- stale start time.
        started_at = NULL
    WHERE worker_id = p_worker_id
      AND status IN ('claimed', 'running');

    GET DIAGNOSTICS v_released = ROW_COUNT;
    RETURN v_released;
END;
$$ LANGUAGE plpgsql;

-- Function: Move a task from 'claimed' to 'running' and stamp started_at.
-- Only applies while the task is still 'claimed' by p_worker_id.
-- Returns whether the transition happened.
CREATE OR REPLACE FUNCTION start_task(p_task_id INTEGER, p_worker_id VARCHAR(255))
RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_tasks AS wt
       SET status     = 'running',
           started_at = NOW()
     WHERE wt.id = p_task_id
       AND wt.status = 'claimed'
       AND wt.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;

-- Function: Finish a running task (it leaves the pending pool for good).
-- Sets status 'completed', stamps completed_at, and stores p_result. Only
-- applies while the task is 'running' under p_worker_id.
-- Returns whether the transition happened.
CREATE OR REPLACE FUNCTION complete_task(
    p_task_id INTEGER,
    p_worker_id VARCHAR(255),
    p_result JSONB DEFAULT NULL
) RETURNS BOOLEAN AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    UPDATE worker_tasks AS wt
       SET status       = 'completed',
           completed_at = NOW(),
           result       = p_result
     WHERE wt.id = p_task_id
       AND wt.status = 'running'
       AND wt.worker_id = p_worker_id;

    GET DIAGNOSTICS v_rows = ROW_COUNT;
    RETURN v_rows > 0;
END;
$$ LANGUAGE plpgsql;

-- Function: Record a failure for a task the worker currently holds.
-- - If retry_count < p_max_retries, the task goes back to 'pending' with
--   ownership and timing cleared, so another worker can retry it.
-- - Once retry_count has reached p_max_retries, it is marked permanently
--   'failed' with completed_at stamped.
-- Either way, retry_count is incremented and error_message is set to p_error.
-- Only tasks currently 'claimed' or 'running' by p_worker_id are affected.
-- A late or duplicate call therefore cannot resurrect a task that was already
-- completed, already failed, or handed to another worker.
-- Returns TRUE if the task was updated.
CREATE OR REPLACE FUNCTION fail_task(
    p_task_id INTEGER,
    p_worker_id VARCHAR(255),
    p_error TEXT DEFAULT NULL,
    p_max_retries INTEGER DEFAULT 3
) RETURNS BOOLEAN AS $$
DECLARE
    v_retry_count INTEGER;
BEGIN
    -- Read and lock the worker's in-flight row in one step, so the retry
    -- decision and the update below see the same state.
    SELECT COALESCE(retry_count, 0)
      INTO v_retry_count
    FROM worker_tasks
    WHERE id = p_task_id
      AND worker_id = p_worker_id
      AND status IN ('claimed', 'running')
    FOR UPDATE;

    IF NOT FOUND THEN
        RETURN FALSE;
    END IF;

    IF v_retry_count >= p_max_retries THEN
        -- Max retries exceeded - mark as permanently failed
        UPDATE worker_tasks SET
            status = 'failed',
            completed_at = NOW(),
            error_message = p_error,
            retry_count = v_retry_count + 1
        WHERE id = p_task_id;
    ELSE
        -- Return to pending for retry
        UPDATE worker_tasks SET
            status = 'pending',
            worker_id = NULL,
            claimed_at = NULL,
            started_at = NULL,
            error_message = p_error,
            retry_count = v_retry_count + 1
        WHERE id = p_task_id;
    END IF;

    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Add retry_count and claimed_at to worker_tasks if they are missing.
-- These columns are used by fail_task (retry_count) and by claim_tasks_batch /
-- release_claimed_tasks (claimed_at). PL/pgSQL function bodies are not checked
-- against table columns until they run, so defining those functions before
-- this block is harmless.
-- The existence check looks up pg_attribute via 'worker_tasks'::regclass.
-- That resolves the table through search_path exactly as ALTER TABLE does.
-- An information_schema check on table_name alone would also match a
-- worker_tasks table in some other schema and wrongly skip the ALTER.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_attribute
        WHERE attrelid = 'worker_tasks'::regclass
          AND attname = 'retry_count'
          AND NOT attisdropped
    ) THEN
        ALTER TABLE worker_tasks ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0;
    END IF;

    IF NOT EXISTS (
        SELECT 1
        FROM pg_attribute
        WHERE attrelid = 'worker_tasks'::regclass
          AND attname = 'claimed_at'
          AND NOT attisdropped
    ) THEN
        ALTER TABLE worker_tasks ADD COLUMN claimed_at TIMESTAMPTZ;
    END IF;
END $$;