## Worker System

- Role-agnostic workers that can handle any task type
- Pod-based architecture with StatefulSet (5-15 pods, 5 workers each)
- Custom pod names (Aethelgard, Xylos, Kryll, etc.)
- Worker registry with friendly names and resource monitoring
- Hub-and-spoke visualization on JobQueue page

## Stealth & Anti-Detection (REQUIRED)

- Proxies are MANDATORY - workers fail to start without active proxies
- CrawlRotator initializes on worker startup
- Loads proxies from `proxies` table
- Auto-rotates proxy + fingerprint on 403 errors
- 12 browser fingerprints (Chrome, Firefox, Safari, Edge)
- Locale/timezone matching for geographic consistency

## Task System

- Renamed product_resync → product_refresh
- Task chaining: store_discovery → entry_point → product_discovery
- Priority-based claiming with FOR UPDATE SKIP LOCKED
- Heartbeat and stale task recovery

## UI Updates

- JobQueue: Pod visualization, resource monitoring on hover
- WorkersDashboard: Simplified worker list
- Removed unused filters from task list

## Other

- IP2Location service for visitor analytics
- Findagram consumer features scaffolding
- Documentation updates

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
142 lines
4.8 KiB
PL/PgSQL
142 lines
4.8 KiB
PL/PgSQL
-- Migration 076: Worker Registry for Dynamic Workers
|
|
-- Workers register on startup, receive a friendly name, and report heartbeats
|
|
|
|
-- Name pool for workers (expandable, no hardcoding)
|
|
-- Pool of friendly names that can be handed out to workers.
-- A row with in_use = FALSE is available; when a name is handed out,
-- assigned_to / assigned_at record which worker holds it and since when.
CREATE TABLE IF NOT EXISTS worker_name_pool (
    id          SERIAL,
    name        VARCHAR(50)  NOT NULL,
    in_use      BOOLEAN      DEFAULT FALSE,
    assigned_to VARCHAR(100),               -- worker_id of the current holder
    assigned_at TIMESTAMPTZ,
    created_at  TIMESTAMPTZ  DEFAULT NOW(),
    -- Constraint names below are the ones PostgreSQL generates for
    -- column-level PRIMARY KEY / UNIQUE, spelled out so they are greppable.
    CONSTRAINT worker_name_pool_pkey     PRIMARY KEY (id),
    CONSTRAINT worker_name_pool_name_key UNIQUE (name)
);
|
|
|
|
-- Seed with initial names (can add more via API)
|
|
-- Initial name seed: two A-to-Z runs (52 names). Re-running the migration is
-- safe because names that already exist are skipped by ON CONFLICT.
INSERT INTO worker_name_pool (name)
VALUES
    -- first A-Z run
    ('Alice'), ('Bella'), ('Clara'), ('Diana'), ('Elena'), ('Fiona'), ('Grace'),
    ('Hazel'), ('Iris'), ('Julia'), ('Katie'), ('Luna'), ('Mia'), ('Nora'),
    ('Olive'), ('Pearl'), ('Quinn'), ('Rosa'), ('Sara'), ('Tara'), ('Uma'),
    ('Vera'), ('Wendy'), ('Xena'), ('Yuki'), ('Zara'),
    -- second A-Z run
    ('Amber'), ('Blake'), ('Coral'), ('Dawn'), ('Echo'), ('Fleur'), ('Gem'),
    ('Haven'), ('Ivy'), ('Jade'), ('Kira'), ('Lotus'), ('Maple'), ('Nova'),
    ('Onyx'), ('Pixel'), ('Quest'), ('Raven'), ('Sage'), ('Terra'), ('Unity'),
    ('Violet'), ('Willow'), ('Xylo'), ('Yara'), ('Zen')
ON CONFLICT (name) DO NOTHING;
|
|
|
|
-- Worker registry - tracks active workers
|
|
-- One row per registered worker, keyed by the worker's own identifier.
-- Holds identity (names, pod, host, IP), lifecycle status, heartbeat and
-- task timestamps, and running task counters.
CREATE TABLE IF NOT EXISTS worker_registry (
    id                SERIAL,

    -- identity
    worker_id         VARCHAR(100) NOT NULL,        -- e.g., "pod-abc123" or uuid
    friendly_name     VARCHAR(50),                  -- assigned from pool
    role              VARCHAR(50)  NOT NULL,        -- task role
    pod_name          VARCHAR(100),                 -- k8s pod name
    hostname          VARCHAR(100),                 -- machine hostname
    ip_address        VARCHAR(50),                  -- worker IP

    -- lifecycle
    status            VARCHAR(20)  DEFAULT 'starting',  -- starting, active, idle, offline, terminated
    started_at        TIMESTAMPTZ  DEFAULT NOW(),
    last_heartbeat_at TIMESTAMPTZ  DEFAULT NOW(),

    -- task accounting
    last_task_at      TIMESTAMPTZ,
    tasks_completed   INTEGER      DEFAULT 0,
    tasks_failed      INTEGER      DEFAULT 0,
    current_task_id   INTEGER,

    -- free-form extras and bookkeeping
    metadata          JSONB        DEFAULT '{}',
    created_at        TIMESTAMPTZ  DEFAULT NOW(),
    updated_at        TIMESTAMPTZ  DEFAULT NOW(),

    -- Constraint names below are the ones PostgreSQL generates for
    -- column-level PRIMARY KEY / UNIQUE, spelled out so they are greppable.
    CONSTRAINT worker_registry_pkey          PRIMARY KEY (id),
    CONSTRAINT worker_registry_worker_id_key UNIQUE (worker_id)
);
|
|
|
|
-- Indexes for worker registry
|
|
-- Secondary indexes on the columns this migration filters or sorts by:
-- status and last_heartbeat_at (used by mark_stale_workers and
-- v_active_workers), plus role.
CREATE INDEX IF NOT EXISTS idx_worker_registry_status
    ON worker_registry USING btree (status);

CREATE INDEX IF NOT EXISTS idx_worker_registry_role
    ON worker_registry USING btree (role);

CREATE INDEX IF NOT EXISTS idx_worker_registry_heartbeat
    ON worker_registry USING btree (last_heartbeat_at);
|
|
|
|
-- Function to assign a name to a new worker
|
|
-- assign_worker_name(p_worker_id)
--   Claims one random unused name from worker_name_pool for the given worker
--   and returns it. Rows locked by a concurrent caller are skipped rather
--   than waited on (SKIP LOCKED).
--   When no free name can be claimed, returns a generated name built from
--   'Worker-' plus the first 8 characters of p_worker_id; the pool is not
--   modified in that case.
CREATE OR REPLACE FUNCTION assign_worker_name(p_worker_id VARCHAR(100))
RETURNS VARCHAR(50) AS $$
DECLARE
    picked_name VARCHAR(50);
BEGIN
    -- Pick and lock a single free row, then mark that row as taken.
    WITH candidate AS (
        SELECT pool.id
        FROM worker_name_pool AS pool
        WHERE pool.in_use = FALSE
        ORDER BY RANDOM()
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    UPDATE worker_name_pool AS target
    SET in_use      = TRUE,
        assigned_to = p_worker_id,
        assigned_at = NOW()
    FROM candidate
    WHERE target.id = candidate.id
    RETURNING target.name INTO picked_name;

    -- picked_name stays NULL when the UPDATE touched no row (pool exhausted).
    RETURN COALESCE(
        picked_name,
        'Worker-' || SUBSTRING(p_worker_id FROM 1 FOR 8)
    );
END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- Function to release a worker's name back to the pool
|
|
-- release_worker_name(p_worker_id)
--   Returns every pool name currently assigned to the given worker to the
--   free state (in_use = FALSE, assignment columns cleared).
--   Does nothing when the worker holds no pool name.
CREATE OR REPLACE FUNCTION release_worker_name(p_worker_id VARCHAR(100))
RETURNS VOID AS $$
BEGIN
    UPDATE worker_name_pool AS pool
    SET (in_use, assigned_to, assigned_at) = (FALSE, NULL, NULL)
    WHERE pool.assigned_to = p_worker_id;
END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- Function to mark stale workers as offline
|
|
-- mark_stale_workers(stale_threshold_minutes DEFAULT 5)
--   1. Flips every worker whose status is 'active', 'idle' or 'starting' and
--      whose last heartbeat is older than the threshold to 'offline'.
--   2. Calls release_worker_name() for every 'offline' worker whose last
--      heartbeat is more than 30 minutes old, so their friendly names return
--      to the pool.
--   Returns the number of workers newly marked offline in step 1
--   (0 when none matched).
--
--   Fix: the previous body used "RETURNING COUNT(*) INTO v_count"; PostgreSQL
--   does not allow aggregate functions in RETURNING, so every call raised an
--   error at runtime. The affected-row count is now read with
--   GET DIAGNOSTICS ... ROW_COUNT.
CREATE OR REPLACE FUNCTION mark_stale_workers(stale_threshold_minutes INTEGER DEFAULT 5)
RETURNS INTEGER AS $$
DECLARE
    v_count INTEGER;
BEGIN
    UPDATE worker_registry
    SET status = 'offline',
        updated_at = NOW()
    WHERE status IN ('active', 'idle', 'starting')
      -- integer * interval avoids building the interval via string concat + cast
      AND last_heartbeat_at < NOW() - (stale_threshold_minutes * INTERVAL '1 minute');

    -- Number of rows changed by the UPDATE directly above.
    GET DIAGNOSTICS v_count = ROW_COUNT;

    -- Release names from workers that have been silent for over 30 minutes.
    -- This window is fixed and independent of stale_threshold_minutes.
    PERFORM release_worker_name(worker_id)
    FROM worker_registry
    WHERE status = 'offline'
      AND last_heartbeat_at < NOW() - INTERVAL '30 minutes';

    RETURN COALESCE(v_count, 0);
END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- View for dashboard
|
|
-- Dashboard view over worker_registry: every worker that is not 'terminated',
-- with two derived columns:
--   seconds_since_heartbeat  seconds elapsed since last_heartbeat_at
--   health_status            first matching rule wins:
--                              'offline' - status is 'offline'
--                              'stale'   - no heartbeat for over 2 minutes
--                              'busy'    - a current task is set
--                              'ready'   - otherwise
-- Rows with status 'active' sort first, then by most recent heartbeat.
CREATE OR REPLACE VIEW v_active_workers AS
SELECT
    w.id,
    w.worker_id,
    w.friendly_name,
    w.role,
    w.status,
    w.pod_name,
    w.hostname,
    w.started_at,
    w.last_heartbeat_at,
    w.last_task_at,
    w.tasks_completed,
    w.tasks_failed,
    w.current_task_id,
    EXTRACT(EPOCH FROM (NOW() - w.last_heartbeat_at)) AS seconds_since_heartbeat,
    CASE
        WHEN w.status = 'offline'                                THEN 'offline'
        WHEN w.last_heartbeat_at < NOW() - INTERVAL '2 minutes'  THEN 'stale'
        WHEN w.current_task_id IS NOT NULL                       THEN 'busy'
        ELSE 'ready'
    END AS health_status
FROM worker_registry AS w
WHERE w.status <> 'terminated'
ORDER BY
    (w.status = 'active') DESC,
    w.last_heartbeat_at DESC;
|
|
|
|
-- Catalog descriptions for the two tables created by this migration.
COMMENT ON TABLE worker_registry
    IS 'Tracks all workers that have registered with the system';

COMMENT ON TABLE worker_name_pool
    IS 'Pool of friendly names for workers - expandable via API';
|