feat: Add identity pool for diverse IP/fingerprint rotation

- Add worker_identities table and metro_areas for city groupings
- Create IdentityPoolService for claiming/releasing identities
- Each identity used for 3-5 tasks, then 2-3 hour cooldown
- Integrate with task-worker via USE_IDENTITY_POOL feature flag
- Update puppeteer-preflight to accept custom proxy URLs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 18:46:58 -07:00
parent d810592bf2
commit 83f629fec4
4 changed files with 1119 additions and 27 deletions

View File

@@ -0,0 +1,354 @@
-- Migration: 109_worker_identity_pool.sql
-- Description: Identity pool for diverse IP/fingerprint rotation
-- Created: 2025-12-14
--
-- Workers claim identities (IP + fingerprint) from pool.
-- Each identity used for 3-5 tasks, then cools down 2-3 hours.
-- This creates natural browsing patterns - same person doesn't hit 20 stores.
-- ============================================================
-- IDENTITY POOL TABLE
-- ============================================================
CREATE TABLE IF NOT EXISTS worker_identities (
id SERIAL PRIMARY KEY,
-- Evomi session controls the IP
session_id VARCHAR(100) UNIQUE NOT NULL,
-- Detected IP from this session
ip_address INET,
-- Geo targeting
state_code VARCHAR(2) NOT NULL,
city VARCHAR(100), -- City-level targeting for diversity
-- Fingerprint data (UA, timezone, locale, device, etc.)
fingerprint JSONB NOT NULL,
-- Timestamps
created_at TIMESTAMPTZ DEFAULT NOW(),
last_used_at TIMESTAMPTZ,
cooldown_until TIMESTAMPTZ, -- Can't reuse until this time
-- Usage stats
total_tasks_completed INT DEFAULT 0,
total_sessions INT DEFAULT 1, -- How many times this identity has been used
-- Current state
is_active BOOLEAN DEFAULT FALSE, -- Currently claimed by a worker
active_worker_id VARCHAR(100), -- Which worker has it
-- Health tracking
consecutive_failures INT DEFAULT 0,
is_healthy BOOLEAN DEFAULT TRUE -- Set false if IP gets blocked
);
-- Indexes for efficient lookups
CREATE INDEX IF NOT EXISTS idx_worker_identities_state_city
ON worker_identities(state_code, city);
CREATE INDEX IF NOT EXISTS idx_worker_identities_available
ON worker_identities(state_code, is_active, cooldown_until)
WHERE is_healthy = TRUE;
CREATE INDEX IF NOT EXISTS idx_worker_identities_cooldown
ON worker_identities(cooldown_until)
WHERE is_healthy = TRUE AND is_active = FALSE;
-- ============================================================
-- METRO AREA MAPPING
-- For fallback when exact city not available
-- ============================================================
CREATE TABLE IF NOT EXISTS metro_areas (
id SERIAL PRIMARY KEY,
metro_name VARCHAR(100) NOT NULL,
state_code VARCHAR(2) NOT NULL,
city VARCHAR(100) NOT NULL,
is_primary BOOLEAN DEFAULT FALSE, -- Primary city of the metro
UNIQUE(state_code, city)
);
-- Phoenix Metro Area
INSERT INTO metro_areas (metro_name, state_code, city, is_primary) VALUES
('Phoenix Metro', 'AZ', 'Phoenix', TRUE),
('Phoenix Metro', 'AZ', 'Mesa', FALSE),
('Phoenix Metro', 'AZ', 'Glendale', FALSE),
('Phoenix Metro', 'AZ', 'Tempe', FALSE),
('Phoenix Metro', 'AZ', 'Scottsdale', FALSE),
('Phoenix Metro', 'AZ', 'Chandler', FALSE),
('Phoenix Metro', 'AZ', 'Peoria', FALSE),
('Phoenix Metro', 'AZ', 'El Mirage', FALSE),
('Phoenix Metro', 'AZ', 'Tolleson', FALSE),
('Phoenix Metro', 'AZ', 'Sun City', FALSE),
('Phoenix Metro', 'AZ', 'Apache Junction', FALSE),
('Phoenix Metro', 'AZ', 'Cave Creek', FALSE),
('Phoenix Metro', 'AZ', 'Gilbert', FALSE),
('Phoenix Metro', 'AZ', 'Surprise', FALSE),
('Phoenix Metro', 'AZ', 'Avondale', FALSE),
('Phoenix Metro', 'AZ', 'Goodyear', FALSE),
('Phoenix Metro', 'AZ', 'Buckeye', FALSE),
('Phoenix Metro', 'AZ', 'Queen Creek', FALSE)
ON CONFLICT (state_code, city) DO NOTHING;
-- Tucson Metro Area
INSERT INTO metro_areas (metro_name, state_code, city, is_primary) VALUES
('Tucson Metro', 'AZ', 'Tucson', TRUE),
('Tucson Metro', 'AZ', 'Oro Valley', FALSE),
('Tucson Metro', 'AZ', 'Marana', FALSE),
('Tucson Metro', 'AZ', 'Sahuarita', FALSE),
('Tucson Metro', 'AZ', 'South Tucson', FALSE)
ON CONFLICT (state_code, city) DO NOTHING;
-- Flagstaff Area
INSERT INTO metro_areas (metro_name, state_code, city, is_primary) VALUES
('Flagstaff Area', 'AZ', 'Flagstaff', TRUE),
('Flagstaff Area', 'AZ', 'Sedona', FALSE)
ON CONFLICT (state_code, city) DO NOTHING;
-- Prescott Area
INSERT INTO metro_areas (metro_name, state_code, city, is_primary) VALUES
('Prescott Area', 'AZ', 'Prescott', TRUE),
('Prescott Area', 'AZ', 'Prescott Valley', FALSE)
ON CONFLICT (state_code, city) DO NOTHING;
-- ============================================================
-- FUNCTION: claim_identity
-- Claims an available identity for a worker
-- Tries: exact city -> metro area -> any in state -> create new
-- ============================================================
CREATE OR REPLACE FUNCTION claim_identity(
p_worker_id VARCHAR(100),
p_state_code VARCHAR(2),
p_city VARCHAR(100) DEFAULT NULL
) RETURNS worker_identities AS $$
DECLARE
claimed_identity worker_identities;
metro_name_val VARCHAR(100);
primary_city VARCHAR(100);
BEGIN
-- 1. Try exact city match (if city provided)
IF p_city IS NOT NULL THEN
UPDATE worker_identities
SET is_active = TRUE,
active_worker_id = p_worker_id,
last_used_at = NOW()
WHERE id = (
SELECT id FROM worker_identities
WHERE state_code = p_state_code
AND city = p_city
AND is_active = FALSE
AND is_healthy = TRUE
AND (cooldown_until IS NULL OR cooldown_until < NOW())
ORDER BY last_used_at ASC NULLS FIRST
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING * INTO claimed_identity;
IF claimed_identity.id IS NOT NULL THEN
RETURN claimed_identity;
END IF;
END IF;
-- 2. Try metro area fallback
IF p_city IS NOT NULL THEN
-- Find the metro area for this city
SELECT ma.metro_name INTO metro_name_val
FROM metro_areas ma
WHERE ma.state_code = p_state_code AND ma.city = p_city;
IF metro_name_val IS NOT NULL THEN
-- Get primary city of metro
SELECT ma.city INTO primary_city
FROM metro_areas ma
WHERE ma.metro_name = metro_name_val AND ma.is_primary = TRUE;
-- Try any city in same metro
UPDATE worker_identities wi
SET is_active = TRUE,
active_worker_id = p_worker_id,
last_used_at = NOW()
WHERE wi.id = (
SELECT wi2.id FROM worker_identities wi2
JOIN metro_areas ma ON wi2.city = ma.city AND wi2.state_code = ma.state_code
WHERE ma.metro_name = metro_name_val
AND wi2.is_active = FALSE
AND wi2.is_healthy = TRUE
AND (wi2.cooldown_until IS NULL OR wi2.cooldown_until < NOW())
ORDER BY wi2.last_used_at ASC NULLS FIRST
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING * INTO claimed_identity;
IF claimed_identity.id IS NOT NULL THEN
RETURN claimed_identity;
END IF;
END IF;
END IF;
-- 3. Try any identity in state
UPDATE worker_identities
SET is_active = TRUE,
active_worker_id = p_worker_id,
last_used_at = NOW()
WHERE id = (
SELECT id FROM worker_identities
WHERE state_code = p_state_code
AND is_active = FALSE
AND is_healthy = TRUE
AND (cooldown_until IS NULL OR cooldown_until < NOW())
ORDER BY last_used_at ASC NULLS FIRST
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING * INTO claimed_identity;
-- Return whatever we got (NULL if nothing available - caller should create new)
RETURN claimed_identity;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: release_identity
-- Releases an identity back to pool with cooldown
-- ============================================================
CREATE OR REPLACE FUNCTION release_identity(
p_identity_id INT,
p_tasks_completed INT DEFAULT 0,
p_failed BOOLEAN DEFAULT FALSE
) RETURNS VOID AS $$
DECLARE
cooldown_hours FLOAT;
BEGIN
-- Random cooldown between 2-3 hours for diversity
cooldown_hours := 2 + random(); -- 2.0 to 3.0 hours
UPDATE worker_identities
SET is_active = FALSE,
active_worker_id = NULL,
total_tasks_completed = total_tasks_completed + p_tasks_completed,
total_sessions = total_sessions + 1,
cooldown_until = NOW() + (cooldown_hours || ' hours')::INTERVAL,
consecutive_failures = CASE WHEN p_failed THEN consecutive_failures + 1 ELSE 0 END,
is_healthy = CASE WHEN consecutive_failures >= 3 THEN FALSE ELSE TRUE END
WHERE id = p_identity_id;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: get_pending_tasks_by_geo
-- Gets pending tasks grouped by state/city for identity assignment
-- ============================================================
CREATE OR REPLACE FUNCTION get_pending_tasks_by_geo(
p_limit INT DEFAULT 10
) RETURNS TABLE (
state_code VARCHAR(2),
city VARCHAR(100),
pending_count BIGINT,
available_identities BIGINT
) AS $$
BEGIN
RETURN QUERY
SELECT
d.state as state_code,
d.city,
COUNT(t.id) as pending_count,
(
SELECT COUNT(*) FROM worker_identities wi
WHERE wi.state_code = d.state
AND (wi.city = d.city OR wi.city IS NULL)
AND wi.is_active = FALSE
AND wi.is_healthy = TRUE
AND (wi.cooldown_until IS NULL OR wi.cooldown_until < NOW())
) as available_identities
FROM worker_tasks t
JOIN dispensaries d ON t.dispensary_id = d.id
WHERE t.status = 'pending'
AND d.state IS NOT NULL
GROUP BY d.state, d.city
ORDER BY COUNT(t.id) DESC
LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- FUNCTION: get_tasks_for_identity
-- Gets tasks matching an identity's geo (same city or metro)
-- ============================================================
CREATE OR REPLACE FUNCTION get_tasks_for_identity(
p_state_code VARCHAR(2),
p_city VARCHAR(100),
p_limit INT DEFAULT 5
) RETURNS TABLE (
task_id INT,
dispensary_id INT,
dispensary_name VARCHAR(255),
dispensary_city VARCHAR(100),
role VARCHAR(50)
) AS $$
DECLARE
metro_name_val VARCHAR(100);
BEGIN
-- Find metro area for this city
SELECT ma.metro_name INTO metro_name_val
FROM metro_areas ma
WHERE ma.state_code = p_state_code AND ma.city = p_city;
RETURN QUERY
SELECT
t.id as task_id,
d.id as dispensary_id,
d.name as dispensary_name,
d.city as dispensary_city,
t.role
FROM worker_tasks t
JOIN dispensaries d ON t.dispensary_id = d.id
WHERE t.status = 'pending'
AND d.state = p_state_code
AND (
-- Exact city match
d.city = p_city
-- Or same metro area
OR (metro_name_val IS NOT NULL AND d.city IN (
SELECT ma.city FROM metro_areas ma WHERE ma.metro_name = metro_name_val
))
-- Or any in state if no metro
OR (metro_name_val IS NULL)
)
ORDER BY
CASE WHEN d.city = p_city THEN 0 ELSE 1 END, -- Prefer exact city
t.priority DESC,
t.created_at ASC
LIMIT p_limit;
END;
$$ LANGUAGE plpgsql;
-- ============================================================
-- VIEW: identity_pool_status
-- Overview of identity pool health and availability
-- ============================================================
CREATE OR REPLACE VIEW identity_pool_status AS
SELECT
state_code,
city,
COUNT(*) as total_identities,
COUNT(*) FILTER (WHERE is_active) as active,
COUNT(*) FILTER (WHERE NOT is_active AND is_healthy AND (cooldown_until IS NULL OR cooldown_until < NOW())) as available,
COUNT(*) FILTER (WHERE NOT is_active AND cooldown_until > NOW()) as cooling_down,
COUNT(*) FILTER (WHERE NOT is_healthy) as unhealthy,
SUM(total_tasks_completed) as total_tasks,
AVG(total_tasks_completed)::INT as avg_tasks_per_identity
FROM worker_identities
GROUP BY state_code, city
ORDER BY state_code, city;
-- ============================================================
-- Comments
-- ============================================================
COMMENT ON TABLE worker_identities IS 'Pool of IP/fingerprint identities for worker rotation';
COMMENT ON TABLE metro_areas IS 'City groupings for geographic fallback matching';
COMMENT ON FUNCTION claim_identity IS 'Claim an available identity: exact city -> metro -> state -> NULL (create new)';
COMMENT ON FUNCTION release_identity IS 'Release identity with 2-3 hour random cooldown';
COMMENT ON FUNCTION get_pending_tasks_by_geo IS 'Get pending task counts by state/city';
COMMENT ON FUNCTION get_tasks_for_identity IS 'Get tasks matching identity geo (city or metro area)';

View File

@@ -0,0 +1,493 @@
/**
* Identity Pool Service
*
* Manages IP/fingerprint identities for diverse worker rotation.
* Each identity is used for 3-5 tasks, then cools down 2-3 hours.
*
* Flow:
* 1. Worker queries pending tasks by geo
* 2. Claims identity for target city/state (or creates new)
* 3. Runs preflight with identity
* 4. Completes 3-5 tasks
* 5. Releases identity (goes on cooldown)
* 6. Immediately claims new identity, repeats
*/
import { Pool } from 'pg';
import { buildEvomiProxyUrl, getEvomiConfig } from './crawl-rotator';
// ============================================================
// TYPES
// ============================================================
export interface WorkerIdentity {
id: number;
session_id: string;
ip_address: string | null;
state_code: string;
city: string | null;
fingerprint: IdentityFingerprint;
created_at: Date;
last_used_at: Date | null;
cooldown_until: Date | null;
total_tasks_completed: number;
total_sessions: number;
is_active: boolean;
active_worker_id: string | null;
is_healthy: boolean;
}
export interface IdentityFingerprint {
userAgent: string;
browser: string;
browserVersion: string;
os: string;
osVersion: string;
device: 'desktop' | 'mobile' | 'tablet';
screenWidth: number;
screenHeight: number;
timezone: string;
locale: string;
// Additional anti-detect properties
webglVendor?: string;
webglRenderer?: string;
languages?: string[];
}
export interface PendingTaskGeo {
state_code: string;
city: string | null;
pending_count: number;
available_identities: number;
}
export interface TaskForIdentity {
task_id: number;
dispensary_id: number;
dispensary_name: string;
dispensary_city: string | null;
role: string;
}
// ============================================================
// FINGERPRINT GENERATION
// Following market share distributions from crawl-rotator.ts
// ============================================================
// Device weights (must match crawl-rotator.ts)
const DEVICE_WEIGHTS = {
mobile: 62,
desktop: 36,
tablet: 2,
};
// Browser weights (must match crawl-rotator.ts)
const BROWSER_WEIGHTS = {
chrome: 67,
safari: 20,
edge: 6,
firefox: 3,
};
// Common screen resolutions by device
const SCREEN_RESOLUTIONS = {
desktop: [
{ width: 1920, height: 1080 },
{ width: 1366, height: 768 },
{ width: 1536, height: 864 },
{ width: 1440, height: 900 },
{ width: 1280, height: 720 },
{ width: 2560, height: 1440 },
],
mobile: [
{ width: 390, height: 844 }, // iPhone 12/13/14
{ width: 393, height: 873 }, // iPhone 14 Pro
{ width: 430, height: 932 }, // iPhone 14 Pro Max
{ width: 360, height: 800 }, // Android common
{ width: 412, height: 915 }, // Pixel 6
{ width: 384, height: 854 }, // Android common
],
tablet: [
{ width: 768, height: 1024 }, // iPad
{ width: 810, height: 1080 }, // iPad 10th gen
{ width: 820, height: 1180 }, // iPad Air
{ width: 800, height: 1280 }, // Android tablet
],
};
// OS versions by device/browser combo
const OS_VERSIONS = {
windows: ['10', '11'],
macos: ['13.0', '13.5', '14.0', '14.1', '14.2'],
ios: ['17.0', '17.1', '17.2', '17.3', '17.4', '18.0', '18.1'],
android: ['13', '14'],
};
// Browser versions
const BROWSER_VERSIONS = {
chrome: ['120', '121', '122', '123', '124', '125'],
safari: ['17.0', '17.1', '17.2', '17.3', '17.4'],
firefox: ['121', '122', '123', '124', '125'],
edge: ['120', '121', '122', '123', '124'],
};
// Timezone mapping by state (IANA format)
const STATE_TIMEZONES: Record<string, string[]> = {
AZ: ['America/Phoenix'], // Arizona doesn't do DST
CA: ['America/Los_Angeles'],
CO: ['America/Denver'],
FL: ['America/New_York', 'America/Chicago'], // FL spans two zones
IL: ['America/Chicago'],
MA: ['America/New_York'],
MI: ['America/Detroit', 'America/New_York'],
NV: ['America/Los_Angeles'],
NY: ['America/New_York'],
OH: ['America/New_York'],
OR: ['America/Los_Angeles'],
PA: ['America/New_York'],
WA: ['America/Los_Angeles'],
// Add more as needed
};
/**
* Pick random item based on weights
*/
function weightedRandom<T extends string>(weights: Record<T, number>): T {
const entries = Object.entries(weights) as [T, number][];
const total = entries.reduce((sum, [, w]) => sum + w, 0);
let random = Math.random() * total;
for (const [key, weight] of entries) {
random -= weight;
if (random <= 0) return key;
}
return entries[0][0];
}
/**
* Pick random item from array
*/
function randomFrom<T>(arr: T[]): T {
return arr[Math.floor(Math.random() * arr.length)];
}
/**
* Generate a diverse fingerprint for a given state
*/
export function generateFingerprint(stateCode: string): IdentityFingerprint {
// Pick device type based on market share
const device = weightedRandom(DEVICE_WEIGHTS) as 'desktop' | 'mobile' | 'tablet';
// Pick browser based on market share
const browser = weightedRandom(BROWSER_WEIGHTS) as 'chrome' | 'safari' | 'edge' | 'firefox';
// Browser version
const browserVersion = randomFrom(BROWSER_VERSIONS[browser]);
// OS based on device and browser
let os: string;
let osVersion: string;
if (device === 'mobile') {
if (browser === 'safari') {
os = 'iOS';
osVersion = randomFrom(OS_VERSIONS.ios);
} else {
// Chrome/Firefox on mobile - mostly Android
os = Math.random() < 0.7 ? 'Android' : 'iOS';
osVersion = os === 'Android' ? randomFrom(OS_VERSIONS.android) : randomFrom(OS_VERSIONS.ios);
}
} else if (device === 'tablet') {
os = Math.random() < 0.6 ? 'iOS' : 'Android';
osVersion = os === 'iOS' ? randomFrom(OS_VERSIONS.ios) : randomFrom(OS_VERSIONS.android);
} else {
// Desktop
if (browser === 'safari') {
os = 'macOS';
osVersion = randomFrom(OS_VERSIONS.macos);
} else if (browser === 'edge') {
os = 'Windows';
osVersion = randomFrom(OS_VERSIONS.windows);
} else {
// Chrome/Firefox - mix of Windows and macOS
os = Math.random() < 0.75 ? 'Windows' : 'macOS';
osVersion = os === 'Windows' ? randomFrom(OS_VERSIONS.windows) : randomFrom(OS_VERSIONS.macos);
}
}
// Screen resolution
const resolution = randomFrom(SCREEN_RESOLUTIONS[device]);
// Timezone for state
const timezones = STATE_TIMEZONES[stateCode] || ['America/New_York'];
const timezone = randomFrom(timezones);
// Build user agent
const userAgent = buildUserAgent(browser, browserVersion, os, osVersion, device);
return {
userAgent,
browser: `${browser.charAt(0).toUpperCase()}${browser.slice(1)}`,
browserVersion,
os,
osVersion,
device,
screenWidth: resolution.width,
screenHeight: resolution.height,
timezone,
locale: 'en-US',
languages: ['en-US', 'en'],
};
}
/**
* Build realistic user agent string
*/
function buildUserAgent(
browser: string,
version: string,
os: string,
osVersion: string,
device: string
): string {
// Base Mozilla/5.0 prefix
let ua = 'Mozilla/5.0 ';
// Platform token
if (os === 'Windows') {
ua += `(Windows NT ${osVersion === '11' ? '10.0' : '10.0'}; Win64; x64) `;
} else if (os === 'macOS') {
const macVer = osVersion.replace('.', '_');
ua += `(Macintosh; Intel Mac OS X ${macVer}) `;
} else if (os === 'iOS') {
const iosVer = osVersion.replace('.', '_');
if (device === 'tablet') {
ua += `(iPad; CPU OS ${iosVer} like Mac OS X) `;
} else {
ua += `(iPhone; CPU iPhone OS ${iosVer} like Mac OS X) `;
}
} else if (os === 'Android') {
ua += `(Linux; Android ${osVersion}; ${device === 'tablet' ? 'Tablet' : 'Mobile'}) `;
}
// Browser engine and version
if (browser === 'chrome' || browser === 'edge') {
ua += `AppleWebKit/537.36 (KHTML, like Gecko) `;
if (browser === 'chrome') {
ua += `Chrome/${version}.0.0.0 `;
} else {
ua += `Chrome/${version}.0.0.0 Edg/${version}.0.0.0 `;
}
ua += device === 'mobile' ? 'Mobile Safari/537.36' : 'Safari/537.36';
} else if (browser === 'safari') {
ua += `AppleWebKit/605.1.15 (KHTML, like Gecko) `;
ua += `Version/${version} `;
ua += device === 'mobile' ? 'Mobile/15E148 Safari/604.1' : 'Safari/605.1.15';
} else if (browser === 'firefox') {
ua += `Gecko/20100101 Firefox/${version}.0`;
}
return ua;
}
// ============================================================
// IDENTITY POOL SERVICE
// ============================================================
export class IdentityPoolService {
constructor(private pool: Pool) {}
/**
* Get pending tasks grouped by geo
*/
async getPendingTasksByGeo(limit: number = 10): Promise<PendingTaskGeo[]> {
const result = await this.pool.query(
`SELECT * FROM get_pending_tasks_by_geo($1)`,
[limit]
);
return result.rows;
}
/**
* Claim an identity for a worker
* Returns existing identity or null if none available (caller should create new)
*/
async claimIdentity(
workerId: string,
stateCode: string,
city?: string
): Promise<WorkerIdentity | null> {
const result = await this.pool.query(
`SELECT * FROM claim_identity($1, $2, $3)`,
[workerId, stateCode, city || null]
);
if (result.rows[0]?.id) {
return this.rowToIdentity(result.rows[0]);
}
return null;
}
/**
* Create a new identity with Evomi proxy
* Generates session ID, gets IP, creates fingerprint
*/
async createIdentity(
workerId: string,
stateCode: string,
city?: string
): Promise<WorkerIdentity | null> {
const evomiConfig = getEvomiConfig();
if (!evomiConfig.enabled) {
console.error('[IdentityPool] Evomi not configured - cannot create identity');
return null;
}
// Generate unique session ID
const sessionId = `${workerId.slice(0, 8)}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
// Build proxy URL
const proxyResult = buildEvomiProxyUrl(stateCode, sessionId, city?.toLowerCase().replace(/\s+/g, '.'));
if (!proxyResult) {
console.error(`[IdentityPool] Failed to build proxy URL for ${stateCode}/${city}`);
return null;
}
// Test proxy and get IP
let ipAddress: string | null = null;
try {
const axios = require('axios');
const HttpsProxyAgent = require('https-proxy-agent').HttpsProxyAgent;
const agent = new HttpsProxyAgent(proxyResult.url);
const response = await axios.get('https://api.ipify.org?format=json', {
httpsAgent: agent,
timeout: 15000,
});
ipAddress = response.data?.ip || null;
console.log(`[IdentityPool] New identity IP: ${ipAddress} (${proxyResult.geo})`);
} catch (err: any) {
console.error(`[IdentityPool] Failed to get IP for new identity: ${err.message}`);
// Still create identity - IP will be detected during preflight
}
// Generate fingerprint
const fingerprint = generateFingerprint(stateCode);
// Insert into database
const insertResult = await this.pool.query(`
INSERT INTO worker_identities (
session_id, ip_address, state_code, city, fingerprint,
is_active, active_worker_id, last_used_at
) VALUES ($1, $2, $3, $4, $5, TRUE, $6, NOW())
RETURNING *
`, [
sessionId,
ipAddress,
stateCode,
city || null,
JSON.stringify(fingerprint),
workerId,
]);
if (insertResult.rows[0]) {
console.log(`[IdentityPool] Created new identity #${insertResult.rows[0].id} for ${stateCode}/${city || 'any'}`);
return this.rowToIdentity(insertResult.rows[0]);
}
return null;
}
/**
* Release an identity back to pool with cooldown
*/
async releaseIdentity(
identityId: number,
tasksCompleted: number = 0,
failed: boolean = false
): Promise<void> {
await this.pool.query(
`SELECT release_identity($1, $2, $3)`,
[identityId, tasksCompleted, failed]
);
console.log(`[IdentityPool] Released identity #${identityId} (${tasksCompleted} tasks, failed=${failed})`);
}
/**
* Get tasks matching an identity's geo
*/
async getTasksForIdentity(
stateCode: string,
city: string | null,
limit: number = 5
): Promise<TaskForIdentity[]> {
const result = await this.pool.query(
`SELECT * FROM get_tasks_for_identity($1, $2, $3)`,
[stateCode, city, limit]
);
return result.rows;
}
/**
* Get proxy URL for an identity
*/
getProxyUrl(identity: WorkerIdentity): string | null {
const proxyResult = buildEvomiProxyUrl(
identity.state_code,
identity.session_id,
identity.city?.toLowerCase().replace(/\s+/g, '.') || undefined
);
return proxyResult?.url || null;
}
/**
* Get identity pool status
*/
async getPoolStatus(): Promise<any[]> {
const result = await this.pool.query(`SELECT * FROM identity_pool_status`);
return result.rows;
}
/**
* Mark identity as unhealthy (e.g., IP got blocked)
*/
async markUnhealthy(identityId: number): Promise<void> {
await this.pool.query(`
UPDATE worker_identities
SET is_healthy = FALSE, is_active = FALSE, active_worker_id = NULL
WHERE id = $1
`, [identityId]);
console.log(`[IdentityPool] Marked identity #${identityId} as unhealthy`);
}
/**
* Convert DB row to WorkerIdentity
*/
private rowToIdentity(row: any): WorkerIdentity {
return {
id: row.id,
session_id: row.session_id,
ip_address: row.ip_address,
state_code: row.state_code,
city: row.city,
fingerprint: typeof row.fingerprint === 'string'
? JSON.parse(row.fingerprint)
: row.fingerprint,
created_at: row.created_at,
last_used_at: row.last_used_at,
cooldown_until: row.cooldown_until,
total_tasks_completed: row.total_tasks_completed,
total_sessions: row.total_sessions,
is_active: row.is_active,
active_worker_id: row.active_worker_id,
is_healthy: row.is_healthy,
};
}
}
/**
* Get random task count for a session (3-5 for diversity)
*/
export function getRandomTaskCount(): number {
return 3 + Math.floor(Math.random() * 3); // 3, 4, or 5
}

View File

@@ -84,9 +84,11 @@ export interface PuppeteerPreflightResult extends PreflightResult {
* Tests browser-based access with anti-detect verification via fingerprint.com
*
* @param crawlRotator - CrawlRotator instance to get proxy from pool
* @param customProxyUrl - Optional custom proxy URL (for identity pool system)
*/
export async function runPuppeteerPreflight(
crawlRotator?: CrawlRotator
crawlRotator?: CrawlRotator,
customProxyUrl?: string
): Promise<PuppeteerPreflightResult> {
const result: PuppeteerPreflightResult = {
method: 'http',
@@ -105,11 +107,26 @@ export async function runPuppeteerPreflight(
let browser: any = null;
try {
// Step 0: Get a proxy - prefer Evomi API, fall back to DB pool
// Step 0: Get a proxy - custom URL, Evomi API, or DB pool
let proxyUrl: string | null = null;
let expectedProxyHost: string | null = null;
// Try Evomi first (dynamic residential proxies)
// Use custom proxy URL if provided (for identity pool system)
if (customProxyUrl) {
result.proxyAvailable = true;
proxyUrl = customProxyUrl;
try {
const parsedUrl = new URL(customProxyUrl);
expectedProxyHost = parsedUrl.hostname;
} catch {
expectedProxyHost = 'custom';
}
result.expectedProxyIp = expectedProxyHost;
console.log(`[PuppeteerPreflight] Using custom proxy URL (identity pool)`);
}
// Try Evomi if no custom proxy (dynamic residential proxies)
if (!proxyUrl) {
const evomiConfig = getEvomiConfig();
if (evomiConfig.enabled) {
// Use AZ as default state for preflight testing
@@ -122,6 +139,7 @@ export async function runPuppeteerPreflight(
console.log(`[PuppeteerPreflight] Using Evomi proxy: ${evomiProxy.geo}`);
}
}
}
// Fall back to DB pool if Evomi not available
if (!proxyUrl && crawlRotator) {
@@ -135,9 +153,9 @@ export async function runPuppeteerPreflight(
}
}
// No proxy available from either source
// No proxy available from any source
if (!proxyUrl) {
result.error = 'No proxy available (Evomi not configured, DB pool empty)';
result.error = 'No proxy available (no custom URL, Evomi not configured, DB pool empty)';
console.log(`[PuppeteerPreflight] FAILED - No proxy available`);
return result;
}
@@ -318,10 +336,12 @@ export async function runPuppeteerPreflight(
*
* @param crawlRotator - CrawlRotator instance to get proxy from pool
* @param maxRetries - Number of retry attempts (default 1)
* @param customProxyUrl - Optional custom proxy URL (for identity pool system)
*/
export async function runPuppeteerPreflightWithRetry(
crawlRotator?: CrawlRotator,
maxRetries: number = 1
maxRetries: number = 1,
customProxyUrl?: string
): Promise<PuppeteerPreflightResult> {
let lastResult: PuppeteerPreflightResult | null = null;
@@ -331,7 +351,7 @@ export async function runPuppeteerPreflightWithRetry(
await new Promise((r) => setTimeout(r, 5000)); // Wait 5s between retries
}
lastResult = await runPuppeteerPreflight(crawlRotator);
lastResult = await runPuppeteerPreflight(crawlRotator, customProxyUrl);
if (lastResult.passed) {
return lastResult;

View File

@@ -70,6 +70,12 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser
// Geo-targeted proxy support
import { buildEvomiProxyUrl, getEvomiConfig } from '../services/crawl-rotator';
// Identity pool for diverse IP/fingerprint rotation
import { IdentityPoolService, WorkerIdentity, getRandomTaskCount } from '../services/identity-pool';
// Feature flag: Use new identity pool system (set via env var)
const USE_IDENTITY_POOL = process.env.USE_IDENTITY_POOL === 'true';
// Task handlers by role
// Platform-based handlers: {task}-{platform}.ts convention
import { handleProductRefresh } from './handlers/product-refresh';
@@ -357,12 +363,31 @@ export class TaskWorker {
private storedTimezone: string | null = null;
private storedFingerprint: WorkerFingerprint | null = null;
// ==========================================================================
// IDENTITY POOL TRACKING (new system - enabled via USE_IDENTITY_POOL)
// ==========================================================================
// Workers claim identities (IP + fingerprint) from pool.
// Each identity used for 3-5 tasks, then cools down 2-3 hours.
// This creates diverse, natural browsing patterns.
// ==========================================================================
private identityPool: IdentityPoolService | null = null;
private currentIdentity: WorkerIdentity | null = null;
private identityTasksCompleted: number = 0;
private identityMaxTasks: number = 5; // Random 3-5, set when identity claimed
private identityProxyUrl: string | null = null;
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`;
this.crawlRotator = new CrawlRotator(this.pool);
// Initialize identity pool if feature enabled
if (USE_IDENTITY_POOL) {
this.identityPool = new IdentityPoolService(this.pool);
console.log(`[TaskWorker] Identity pool system ENABLED`);
}
// Initialize CPU tracking
const cpuUsage = process.cpuUsage();
this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
@@ -899,6 +924,179 @@ export class TaskWorker {
}
}
// ==========================================================================
// IDENTITY POOL METHODS (new system - enabled via USE_IDENTITY_POOL)
// ==========================================================================
/**
* Ensure worker has a valid identity from the pool.
* If no identity or identity exhausted, claim/create new one.
*
* Flow:
* 1. Check if current identity is valid and has tasks remaining
* 2. If not, release current identity (if any)
* 3. Query pending tasks to find target city/state
* 4. Claim existing identity or create new one
* 5. Run preflight with the identity's proxy
* 6. Return true if ready to claim tasks
*/
private async ensureIdentity(): Promise<boolean> {
if (!this.identityPool) {
console.error(`[TaskWorker] Identity pool not initialized`);
return false;
}
try {
// Check if current identity is still valid
if (this.currentIdentity && this.identityTasksCompleted < this.identityMaxTasks) {
return true; // Still have tasks remaining with current identity
}
// Release current identity if exhausted
if (this.currentIdentity) {
await this.releaseCurrentIdentity();
}
// Find target city/state based on pending tasks
const pendingGeo = await this.identityPool.getPendingTasksByGeo(5);
if (pendingGeo.length === 0) {
console.log(`[TaskWorker] ${this.friendlyName} no pending tasks available`);
return false;
}
// Pick city with most pending tasks
const target = pendingGeo[0];
console.log(`[TaskWorker] ${this.friendlyName} targeting ${target.city || 'any'}, ${target.state_code} (${target.pending_count} pending tasks)`);
// Try to claim existing identity
let identity = await this.identityPool.claimIdentity(
this.workerId,
target.state_code,
target.city || undefined
);
// Create new identity if none available
if (!identity) {
console.log(`[TaskWorker] ${this.friendlyName} no available identity, creating new one...`);
identity = await this.identityPool.createIdentity(
this.workerId,
target.state_code,
target.city || undefined
);
}
if (!identity) {
console.error(`[TaskWorker] ${this.friendlyName} failed to claim/create identity`);
return false;
}
// Set up identity
this.currentIdentity = identity;
this.identityTasksCompleted = 0;
this.identityMaxTasks = getRandomTaskCount(); // 3-5 random
this.identityProxyUrl = this.identityPool.getProxyUrl(identity);
// Update stored fingerprint from identity
this.storedFingerprint = {
timezone: identity.fingerprint.timezone,
city: identity.city || undefined,
state: identity.state_code,
ip: identity.ip_address || undefined,
locale: identity.fingerprint.locale,
};
this.storedTimezone = identity.fingerprint.timezone;
console.log(`[TaskWorker] ${this.friendlyName} claimed identity #${identity.id}: ${identity.city || 'any'}, ${identity.state_code} (max ${this.identityMaxTasks} tasks)`);
console.log(`[TaskWorker] ${this.friendlyName} fingerprint: ${identity.fingerprint.browser} on ${identity.fingerprint.os}, ${identity.fingerprint.device}`);
// Run preflight with this identity's proxy
const preflightPassed = await this.runIdentityPreflight();
if (!preflightPassed) {
console.error(`[TaskWorker] ${this.friendlyName} preflight failed for identity #${identity.id}`);
await this.identityPool.releaseIdentity(identity.id, 0, true); // Release as failed
this.currentIdentity = null;
this.identityProxyUrl = null;
return false;
}
return true;
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} ensureIdentity error: ${err.message}`);
return false;
}
}
/**
* Run preflight with current identity's proxy
*/
private async runIdentityPreflight(): Promise<boolean> {
if (!this.currentIdentity || !this.identityProxyUrl) {
return false;
}
console.log(`[TaskWorker] ${this.friendlyName} running preflight for identity #${this.currentIdentity.id}...`);
console.log(`[TaskWorker] ${this.friendlyName} using identity proxy URL for ${this.currentIdentity.state_code}/${this.currentIdentity.city || 'any'}`);
try {
// Use puppeteer preflight with identity's specific proxy URL
const result = await runPuppeteerPreflightWithRetry(this.crawlRotator, 1, this.identityProxyUrl);
if (result.passed) {
console.log(`[TaskWorker] ${this.friendlyName} identity preflight PASSED (IP: ${result.proxyIp})`);
// Update identity IP if we got it
if (result.proxyIp && this.currentIdentity) {
await this.pool.query(`
UPDATE worker_identities SET ip_address = $2 WHERE id = $1
`, [this.currentIdentity.id, result.proxyIp]);
}
this.preflightHttpPassed = true;
this.preflightHttpResult = result;
return true;
} else {
console.error(`[TaskWorker] ${this.friendlyName} identity preflight FAILED: ${result.error}`);
return false;
}
} catch (err: any) {
console.error(`[TaskWorker] ${this.friendlyName} identity preflight error: ${err.message}`);
return false;
}
}
/**
* Release current identity back to pool with cooldown
*/
private async releaseCurrentIdentity(): Promise<void> {
if (!this.currentIdentity || !this.identityPool) {
return;
}
console.log(`[TaskWorker] ${this.friendlyName} releasing identity #${this.currentIdentity.id} (${this.identityTasksCompleted} tasks completed)`);
await this.identityPool.releaseIdentity(
this.currentIdentity.id,
this.identityTasksCompleted,
false // Not failed
);
this.currentIdentity = null;
this.identityTasksCompleted = 0;
this.identityProxyUrl = null;
this.preflightHttpPassed = false; // Need new preflight for next identity
}
/**
* Increment task count for current identity.
* Call this after each successful task completion.
*/
private incrementIdentityTaskCount(): void {
if (this.currentIdentity) {
this.identityTasksCompleted++;
console.log(`[TaskWorker] ${this.friendlyName} identity #${this.currentIdentity.id} task ${this.identityTasksCompleted}/${this.identityMaxTasks}`);
}
}
/**
* Get the effective max concurrent tasks based on working hours.
* Uses the worker's timezone (from preflight IP geolocation) to determine
@@ -1288,18 +1486,40 @@ export class TaskWorker {
}
// =================================================================
// GEO SESSION GATE - Ensure worker has valid geo assignment
// Worker must have a state assignment to claim tasks.
// Session = 60 min OR 7 stores, whichever comes first.
// If no valid session, assign one based on demand.
// GEO SESSION / IDENTITY GATE
// Worker must have a geo assignment or identity to claim tasks.
//
// USE_IDENTITY_POOL=true (new system):
// - Worker claims identity (IP + fingerprint) from pool
// - Each identity used for 3-5 tasks, then 2-3 hour cooldown
// - Creates diverse, natural browsing patterns
//
// USE_IDENTITY_POOL=false (legacy system):
// - Session = 60 min OR 7 stores, whichever comes first
// - State assigned based on demand
// =================================================================
const geoValid = await this.ensureGeoSession();
let geoValid: boolean;
if (USE_IDENTITY_POOL) {
geoValid = await this.ensureIdentity();
if (!geoValid) {
console.log(`[TaskWorker] ${this.friendlyName} no identity available, waiting...`);
await this.sleep(30000);
return;
}
// Update geoState for task claiming filter
if (this.currentIdentity) {
this.geoState = this.currentIdentity.state_code;
this.geoCity = this.currentIdentity.city;
}
} else {
geoValid = await this.ensureGeoSession();
if (!geoValid) {
// No tasks available in any state, or assignment failed
console.log(`[TaskWorker] ${this.friendlyName} no geo session available, waiting...`);
await this.sleep(30000);
return;
}
}
// =================================================================
// WORKING HOURS GATE - Simulate natural traffic patterns
@@ -1450,6 +1670,11 @@ export class TaskWorker {
await this.reportTaskCompletion(true);
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);
// Track identity task count (for identity pool rotation)
if (USE_IDENTITY_POOL) {
this.incrementIdentityTaskCount();
}
// Chain next task if applicable
const chainedTask = await taskService.chainNextTask({
...task,