Files
cannaiq/backend/migrations/109_worker_identity_pool.sql
Kelly 83f629fec4 feat: Add identity pool for diverse IP/fingerprint rotation
- Add worker_identities table and metro_areas for city groupings
- Create IdentityPoolService for claiming/releasing identities
- Each identity used for 3-5 tasks, then 2-3 hour cooldown
- Integrate with task-worker via USE_IDENTITY_POOL feature flag
- Update puppeteer-preflight to accept custom proxy URLs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 18:46:58 -07:00

355 lines
12 KiB
PL/PgSQL

-- Migration: 109_worker_identity_pool.sql
-- Description: Identity pool for diverse IP/fingerprint rotation
-- Created: 2025-12-14
--
-- Workers claim identities (IP + fingerprint) from pool.
-- Each identity used for 3-5 tasks, then cools down 2-3 hours.
-- This creates natural browsing patterns - same person doesn't hit 20 stores.
-- ============================================================
-- TABLE: worker_identities (the identity pool)
-- One row per reusable identity: a proxy session (which fixes
-- the IP) paired with a browser fingerprint, together with its
-- claim, cooldown, usage and health state.
-- ============================================================
CREATE TABLE IF NOT EXISTS worker_identities (
    id                    SERIAL PRIMARY KEY,

    -- Evomi session identifier; the session controls the IP
    session_id            VARCHAR(100) NOT NULL UNIQUE,
    -- IP detected for this session (nullable: may not be known yet)
    ip_address            INET,

    -- Geo targeting: state is mandatory, city is optional and
    -- gives city-level diversity
    state_code            VARCHAR(2)   NOT NULL,
    city                  VARCHAR(100),

    -- Fingerprint payload (UA, timezone, locale, device, etc.)
    fingerprint           JSONB        NOT NULL,

    -- Lifecycle timestamps
    created_at            TIMESTAMPTZ  DEFAULT NOW(),
    last_used_at          TIMESTAMPTZ,
    cooldown_until        TIMESTAMPTZ,                 -- not reusable before this instant

    -- Usage counters
    total_tasks_completed INTEGER      DEFAULT 0,
    total_sessions        INTEGER      DEFAULT 1,      -- how many times this identity has been used

    -- Claim state
    is_active             BOOLEAN      DEFAULT FALSE,  -- TRUE while a worker holds the identity
    active_worker_id      VARCHAR(100),                -- id of the worker holding it

    -- Health tracking
    consecutive_failures  INTEGER      DEFAULT 0,
    is_healthy            BOOLEAN      DEFAULT TRUE    -- set FALSE if the IP gets blocked
);
-- Lookup indexes for worker_identities.

-- Geo lookup by state, then city.
CREATE INDEX IF NOT EXISTS idx_worker_identities_state_city
    ON worker_identities USING btree (state_code, city);

-- Availability scan within a state; only healthy identities are indexed.
CREATE INDEX IF NOT EXISTS idx_worker_identities_available
    ON worker_identities USING btree (state_code, is_active, cooldown_until)
    WHERE is_healthy = TRUE;

-- Cooldown expiry among healthy identities that are not currently claimed.
CREATE INDEX IF NOT EXISTS idx_worker_identities_cooldown
    ON worker_identities USING btree (cooldown_until)
    WHERE is_healthy = TRUE AND is_active = FALSE;
-- ============================================================
-- TABLE: metro_areas
-- Groups cities into named metro areas so that lookups can fall
-- back to a neighbouring city when the exact city has no match.
-- ============================================================
CREATE TABLE IF NOT EXISTS metro_areas (
    id         SERIAL PRIMARY KEY,
    metro_name VARCHAR(100) NOT NULL,
    state_code VARCHAR(2)   NOT NULL,
    city       VARCHAR(100) NOT NULL,
    is_primary BOOLEAN      DEFAULT FALSE,  -- TRUE for the primary city of the metro
    UNIQUE (state_code, city)               -- a city appears at most once per state
);
-- Seed data: Arizona metro areas.
-- Loaded in a single statement; rows whose (state_code, city) already
-- exists are skipped, so the seed can be re-run safely.
INSERT INTO metro_areas (metro_name, state_code, city, is_primary)
VALUES
    -- Phoenix Metro Area
    ('Phoenix Metro',  'AZ', 'Phoenix',         TRUE),
    ('Phoenix Metro',  'AZ', 'Mesa',            FALSE),
    ('Phoenix Metro',  'AZ', 'Glendale',        FALSE),
    ('Phoenix Metro',  'AZ', 'Tempe',           FALSE),
    ('Phoenix Metro',  'AZ', 'Scottsdale',      FALSE),
    ('Phoenix Metro',  'AZ', 'Chandler',        FALSE),
    ('Phoenix Metro',  'AZ', 'Peoria',          FALSE),
    ('Phoenix Metro',  'AZ', 'El Mirage',       FALSE),
    ('Phoenix Metro',  'AZ', 'Tolleson',        FALSE),
    ('Phoenix Metro',  'AZ', 'Sun City',        FALSE),
    ('Phoenix Metro',  'AZ', 'Apache Junction', FALSE),
    ('Phoenix Metro',  'AZ', 'Cave Creek',      FALSE),
    ('Phoenix Metro',  'AZ', 'Gilbert',         FALSE),
    ('Phoenix Metro',  'AZ', 'Surprise',        FALSE),
    ('Phoenix Metro',  'AZ', 'Avondale',        FALSE),
    ('Phoenix Metro',  'AZ', 'Goodyear',        FALSE),
    ('Phoenix Metro',  'AZ', 'Buckeye',         FALSE),
    ('Phoenix Metro',  'AZ', 'Queen Creek',     FALSE),
    -- Tucson Metro Area
    ('Tucson Metro',   'AZ', 'Tucson',          TRUE),
    ('Tucson Metro',   'AZ', 'Oro Valley',      FALSE),
    ('Tucson Metro',   'AZ', 'Marana',          FALSE),
    ('Tucson Metro',   'AZ', 'Sahuarita',       FALSE),
    ('Tucson Metro',   'AZ', 'South Tucson',    FALSE),
    -- Flagstaff Area
    ('Flagstaff Area', 'AZ', 'Flagstaff',       TRUE),
    ('Flagstaff Area', 'AZ', 'Sedona',          FALSE),
    -- Prescott Area
    ('Prescott Area',  'AZ', 'Prescott',        TRUE),
    ('Prescott Area',  'AZ', 'Prescott Valley', FALSE)
ON CONFLICT (state_code, city) DO NOTHING;
-- ============================================================
-- FUNCTION: claim_identity
-- Claims one available identity for a worker and marks it active.
--
-- Lookup order (first hit wins):
--   1. exact (state, city) match        -- only when p_city is given
--   2. any city in the same metro area  -- only when p_city maps to a metro
--   3. any identity in the state
--
-- An identity is "available" when it is not active, is healthy, and has
-- no cooldown or its cooldown has passed. Within a tier the least
-- recently used identity is taken first; identities with a NULL
-- last_used_at are preferred over all others.
--
-- Parameters:
--   p_worker_id  - written to active_worker_id on the claimed row
--   p_state_code - state to search in
--   p_city       - optional city; NULL skips tiers 1 and 2
--
-- Returns: the claimed worker_identities row after the update. When
--   nothing is available the result has no id (check "id IS NULL");
--   this function never inserts, so the caller is expected to create
--   a new identity in that case.
--
-- Concurrency: the candidate row is locked with FOR UPDATE SKIP LOCKED,
--   so concurrent callers skip each other's picks instead of blocking.
-- ============================================================
CREATE OR REPLACE FUNCTION claim_identity(
    p_worker_id VARCHAR(100),
    p_state_code VARCHAR(2),
    p_city VARCHAR(100) DEFAULT NULL
) RETURNS worker_identities AS $$
DECLARE
    claimed_identity worker_identities;
    metro_name_val VARCHAR(100);
BEGIN
    IF p_city IS NOT NULL THEN
        -- 1. Exact city match
        UPDATE worker_identities
        SET is_active = TRUE,
            active_worker_id = p_worker_id,
            last_used_at = NOW()
        WHERE id = (
            SELECT id FROM worker_identities
            WHERE state_code = p_state_code
              AND city = p_city
              AND is_active = FALSE
              AND is_healthy = TRUE
              AND (cooldown_until IS NULL OR cooldown_until < NOW())
            ORDER BY last_used_at ASC NULLS FIRST
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING * INTO claimed_identity;

        IF claimed_identity.id IS NOT NULL THEN
            RETURN claimed_identity;
        END IF;

        -- 2. Metro area fallback: resolve the metro this city belongs to
        SELECT ma.metro_name INTO metro_name_val
        FROM metro_areas ma
        WHERE ma.state_code = p_state_code AND ma.city = p_city;

        IF metro_name_val IS NOT NULL THEN
            UPDATE worker_identities wi
            SET is_active = TRUE,
                active_worker_id = p_worker_id,
                last_used_at = NOW()
            WHERE wi.id = (
                SELECT wi2.id FROM worker_identities wi2
                JOIN metro_areas ma
                  ON wi2.city = ma.city AND wi2.state_code = ma.state_code
                WHERE ma.metro_name = metro_name_val
                  -- metro names are only unique within a state, so keep the
                  -- fallback inside the requested state
                  AND ma.state_code = p_state_code
                  AND wi2.is_active = FALSE
                  AND wi2.is_healthy = TRUE
                  AND (wi2.cooldown_until IS NULL OR wi2.cooldown_until < NOW())
                ORDER BY wi2.last_used_at ASC NULLS FIRST
                LIMIT 1
                -- Lock only the identity row. Without "OF wi2" the joined
                -- metro_areas row is locked too, and SKIP LOCKED would make
                -- concurrent callers skip every candidate from that city.
                FOR UPDATE OF wi2 SKIP LOCKED
            )
            RETURNING * INTO claimed_identity;

            IF claimed_identity.id IS NOT NULL THEN
                RETURN claimed_identity;
            END IF;
        END IF;
    END IF;

    -- 3. Any available identity in the state
    UPDATE worker_identities
    SET is_active = TRUE,
        active_worker_id = p_worker_id,
        last_used_at = NOW()
    WHERE id = (
        SELECT id FROM worker_identities
        WHERE state_code = p_state_code
          AND is_active = FALSE
          AND is_healthy = TRUE
          AND (cooldown_until IS NULL OR cooldown_until < NOW())
        ORDER BY last_used_at ASC NULLS FIRST
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING * INTO claimed_identity;

    -- Return whatever we got (no id if nothing was available)
    RETURN claimed_identity;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: release_identity
-- Releases an identity back to the pool and starts its cooldown.
--
-- Parameters:
--   p_identity_id     - id of the worker_identities row to release
--   p_tasks_completed - tasks finished in this session; added to
--                       total_tasks_completed (NULL counts as 0)
--   p_failed          - TRUE when the session ended in failure
--
-- Effects on the row:
--   * is_active / active_worker_id are cleared
--   * total_sessions is incremented
--   * cooldown_until is set to a random point at least 2 and less
--     than 3 hours from now
--   * consecutive_failures is incremented on failure, reset to 0 otherwise
--   * is_healthy becomes FALSE once the failure streak, including this
--     release, reaches 3
--
-- Does nothing (and raises nothing) if no row has the given id.
-- ============================================================
CREATE OR REPLACE FUNCTION release_identity(
    p_identity_id INT,
    p_tasks_completed INT DEFAULT 0,
    p_failed BOOLEAN DEFAULT FALSE
) RETURNS VOID AS $$
DECLARE
    cooldown_hours FLOAT;
    failure_limit CONSTANT INT := 3;  -- streak length that marks an identity unhealthy
BEGIN
    -- random() is in [0, 1), so the cooldown is in [2, 3) hours
    cooldown_hours := 2 + random();

    UPDATE worker_identities
    SET is_active = FALSE,
        active_worker_id = NULL,
        total_tasks_completed = total_tasks_completed + COALESCE(p_tasks_completed, 0),
        total_sessions = total_sessions + 1,
        cooldown_until = NOW() + cooldown_hours * INTERVAL '1 hour',
        consecutive_failures = CASE
            WHEN p_failed THEN COALESCE(consecutive_failures, 0) + 1
            ELSE 0
        END,
        -- Expressions in SET see the row's OLD values, so the new streak
        -- length has to be recomputed here rather than read back from
        -- consecutive_failures.
        -- NOTE(review): as before, any release below the limit sets
        -- is_healthy back to TRUE, even if it was set FALSE elsewhere
        -- while the identity was claimed - confirm that is intended.
        is_healthy = CASE
            WHEN p_failed AND COALESCE(consecutive_failures, 0) + 1 >= failure_limit THEN FALSE
            ELSE TRUE
        END
    WHERE id = p_identity_id;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: get_pending_tasks_by_geo
-- Summarises pending work per (state, city) of the task's
-- dispensary, busiest locations first, for identity assignment.
--
-- Parameters:
--   p_limit - maximum number of (state, city) groups returned
--
-- Returns one row per group:
--   state_code, city     - dispensary state / city (state is never NULL)
--   pending_count        - number of tasks with status 'pending'
--   available_identities - identities in that state that are unclaimed,
--                          healthy and not cooling down, and whose city
--                          equals the group's city or is NULL
-- ============================================================
CREATE OR REPLACE FUNCTION get_pending_tasks_by_geo(
    p_limit INT DEFAULT 10
) RETURNS TABLE (
    state_code VARCHAR(2),
    city VARCHAR(100),
    pending_count BIGINT,
    available_identities BIGINT
) AS $$
BEGIN
    RETURN QUERY
    -- Step 1: count pending tasks per dispensary location
    WITH pending_by_geo AS (
        SELECT
            disp.state AS geo_state,
            disp.city AS geo_city,
            COUNT(task.id) AS task_total
        FROM worker_tasks task
        JOIN dispensaries disp ON task.dispensary_id = disp.id
        WHERE task.status = 'pending'
          AND disp.state IS NOT NULL
        GROUP BY disp.state, disp.city
    )
    -- Step 2: attach the number of identities usable for each location
    SELECT
        geo.geo_state,
        geo.geo_city,
        geo.task_total,
        (
            SELECT COUNT(*)
            FROM worker_identities wi
            WHERE wi.state_code = geo.geo_state
              AND (wi.city = geo.geo_city OR wi.city IS NULL)
              AND wi.is_active = FALSE
              AND wi.is_healthy = TRUE
              AND (wi.cooldown_until IS NULL OR wi.cooldown_until < NOW())
        )
    FROM pending_by_geo geo
    ORDER BY geo.task_total DESC
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: get_tasks_for_identity
-- Lists pending tasks whose dispensary matches an identity's geo.
--
-- Matching, always restricted to dispensaries in p_state_code:
--   * dispensary city equals p_city, or
--   * dispensary city belongs to the same metro area as p_city, or
--   * p_city has no metro_areas entry for that state (this includes
--     p_city being NULL): any dispensary in the state qualifies.
--
-- Ordering: exact-city matches first, then higher priority, then
-- older tasks; task id breaks remaining ties so the result is stable.
--
-- Parameters:
--   p_state_code - identity's state
--   p_city       - identity's city (may be NULL)
--   p_limit      - maximum number of tasks returned
--
-- Returns: task_id, dispensary_id, dispensary_name, dispensary_city, role.
-- This is a plain read: tasks are not locked or marked as taken here.
-- ============================================================
CREATE OR REPLACE FUNCTION get_tasks_for_identity(
    p_state_code VARCHAR(2),
    p_city VARCHAR(100),
    p_limit INT DEFAULT 5
) RETURNS TABLE (
    task_id INT,
    dispensary_id INT,
    dispensary_name VARCHAR(255),
    dispensary_city VARCHAR(100),
    role VARCHAR(50)
) AS $$
DECLARE
    metro_name_val VARCHAR(100);
BEGIN
    -- Find metro area for this city (stays NULL when there is none)
    SELECT ma.metro_name INTO metro_name_val
    FROM metro_areas ma
    WHERE ma.state_code = p_state_code AND ma.city = p_city;

    RETURN QUERY
    SELECT
        t.id AS task_id,
        d.id AS dispensary_id,
        d.name AS dispensary_name,
        d.city AS dispensary_city,
        t.role
    FROM worker_tasks t
    JOIN dispensaries d ON t.dispensary_id = d.id
    WHERE t.status = 'pending'
      AND d.state = p_state_code
      AND (
          -- Exact city match
          d.city = p_city
          -- Or any in state if no metro
          OR metro_name_val IS NULL
          -- Or same metro area. The metro was resolved by (state, city),
          -- so membership is checked within the same state; a metro with
          -- the same name in another state must not contribute cities.
          OR EXISTS (
              SELECT 1
              FROM metro_areas ma
              WHERE ma.metro_name = metro_name_val
                AND ma.state_code = p_state_code
                AND ma.city = d.city
          )
      )
    ORDER BY
        CASE WHEN d.city = p_city THEN 0 ELSE 1 END,  -- Prefer exact city
        t.priority DESC,
        t.created_at ASC,
        t.id ASC                                      -- deterministic tie-break
    LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- VIEW: identity_pool_status
-- Per (state_code, city) summary of the identity pool.
--
--   total_identities       - all identities in the group
--   active                 - currently claimed
--   available              - unclaimed, healthy, no pending cooldown
--   cooling_down           - unclaimed with a cooldown still in the future
--                            (health is not considered, so this can
--                            overlap with "unhealthy")
--   unhealthy              - flagged not healthy
--   total_tasks            - sum of total_tasks_completed
--   avg_tasks_per_identity - average of total_tasks_completed, cast to INT
-- ============================================================
CREATE OR REPLACE VIEW identity_pool_status AS
SELECT
    wi.state_code,
    wi.city,
    COUNT(*) AS total_identities,
    COUNT(*) FILTER (WHERE wi.is_active) AS active,
    COUNT(*) FILTER (
        WHERE NOT wi.is_active
          AND wi.is_healthy
          AND (wi.cooldown_until IS NULL OR wi.cooldown_until < NOW())
    ) AS available,
    COUNT(*) FILTER (
        WHERE NOT wi.is_active
          AND wi.cooldown_until > NOW()
    ) AS cooling_down,
    COUNT(*) FILTER (WHERE NOT wi.is_healthy) AS unhealthy,
    SUM(wi.total_tasks_completed) AS total_tasks,
    CAST(AVG(wi.total_tasks_completed) AS INT) AS avg_tasks_per_identity
FROM worker_identities AS wi
GROUP BY wi.state_code, wi.city
ORDER BY wi.state_code, wi.city;
-- ============================================================
-- Catalog comments
-- Functions are identified by their full argument list so these
-- statements keep working if a function name is ever overloaded
-- (COMMENT ON FUNCTION with a bare name fails when it is ambiguous).
-- ============================================================
COMMENT ON TABLE worker_identities
    IS 'Pool of IP/fingerprint identities for worker rotation';
COMMENT ON TABLE metro_areas
    IS 'City groupings for geographic fallback matching';
COMMENT ON FUNCTION claim_identity(VARCHAR, VARCHAR, VARCHAR)
    IS 'Claim an available identity: exact city -> metro -> state -> NULL (create new)';
COMMENT ON FUNCTION release_identity(INT, INT, BOOLEAN)
    IS 'Release identity with 2-3 hour random cooldown';
COMMENT ON FUNCTION get_pending_tasks_by_geo(INT)
    IS 'Get pending task counts by state/city';
COMMENT ON FUNCTION get_tasks_for_identity(VARCHAR, VARCHAR, INT)
    IS 'Get tasks matching identity geo (city or metro area)';