-- Migration 074: Worker Task Queue System
-- Implements role-based task queue with per-store locking and capacity tracking
--
-- Objects created by this migration:
--   tables    : worker_tasks, worker_registry, task_completion_log
--   views     : v_worker_capacity, v_task_history
--   functions : claim_task, recover_stale_tasks, generate_resync_tasks,
--               update_worker_tasks_timestamp (trigger function)
-- Every statement is written to be re-runnable (IF NOT EXISTS / OR REPLACE /
-- DROP ... IF EXISTS).

-- ---------------------------------------------------------------------------
-- Task queue table
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS worker_tasks (
    id SERIAL PRIMARY KEY,

    -- Task identification
    role VARCHAR(50) NOT NULL,  -- store_discovery, entry_point_discovery, product_discovery, product_resync, analytics_refresh
    dispensary_id INTEGER REFERENCES dispensaries(id) ON DELETE CASCADE,
    platform VARCHAR(20),  -- dutchie, jane, treez, etc.

    -- Task state
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    priority INTEGER DEFAULT 0,  -- Higher = more urgent

    -- Scheduling
    scheduled_for TIMESTAMPTZ,  -- For batch scheduling (e.g., every 4 hours)

    -- Ownership
    worker_id VARCHAR(100),  -- Pod name or worker ID
    claimed_at TIMESTAMPTZ,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    last_heartbeat_at TIMESTAMPTZ,

    -- Results
    result JSONB,  -- Task output data
    error_message TEXT,
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,

    -- Metadata
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    -- Constraints
    CONSTRAINT valid_status CHECK (status IN ('pending', 'claimed', 'running', 'completed', 'failed', 'stale'))
);

-- Indexes for efficient task claiming
CREATE INDEX IF NOT EXISTS idx_worker_tasks_pending
    ON worker_tasks(role, priority DESC, created_at ASC)
    WHERE status = 'pending';

CREATE INDEX IF NOT EXISTS idx_worker_tasks_claimed
    ON worker_tasks(worker_id, claimed_at)
    WHERE status = 'claimed';

CREATE INDEX IF NOT EXISTS idx_worker_tasks_running
    ON worker_tasks(worker_id, last_heartbeat_at)
    WHERE status = 'running';

CREATE INDEX IF NOT EXISTS idx_worker_tasks_dispensary
    ON worker_tasks(dispensary_id)
    WHERE dispensary_id IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_worker_tasks_scheduled
    ON worker_tasks(scheduled_for)
    WHERE status = 'pending' AND scheduled_for IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_worker_tasks_history
    ON worker_tasks(role, completed_at DESC)
    WHERE status IN ('completed', 'failed');

-- Partial unique index to prevent duplicate active tasks per store
-- Only one task can be claimed/running for a given dispensary at a time
CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_tasks_unique_active_store
    ON worker_tasks(dispensary_id)
    WHERE status IN ('claimed', 'running') AND dispensary_id IS NOT NULL;

-- ---------------------------------------------------------------------------
-- Worker registration table (tracks active workers)
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS worker_registry (
    id SERIAL PRIMARY KEY,
    worker_id VARCHAR(100) UNIQUE NOT NULL,
    role VARCHAR(50) NOT NULL,
    pod_name VARCHAR(100),
    hostname VARCHAR(100),
    started_at TIMESTAMPTZ DEFAULT NOW(),
    last_heartbeat_at TIMESTAMPTZ DEFAULT NOW(),
    tasks_completed INTEGER DEFAULT 0,
    tasks_failed INTEGER DEFAULT 0,
    status VARCHAR(20) DEFAULT 'active',

    CONSTRAINT valid_worker_status CHECK (status IN ('active', 'idle', 'offline'))
);

CREATE INDEX IF NOT EXISTS idx_worker_registry_role
    ON worker_registry(role, status);

CREATE INDEX IF NOT EXISTS idx_worker_registry_heartbeat
    ON worker_registry(last_heartbeat_at)
    WHERE status = 'active';

-- ---------------------------------------------------------------------------
-- Task completion tracking (summarized history)
-- One row per (role, date, hour) bucket.
-- ---------------------------------------------------------------------------
CREATE TABLE IF NOT EXISTS task_completion_log (
    id SERIAL PRIMARY KEY,
    role VARCHAR(50) NOT NULL,
    date DATE NOT NULL DEFAULT CURRENT_DATE,
    -- EXTRACT does not return an integer; cast explicitly instead of relying
    -- on the implicit assignment cast of the column default.
    hour INTEGER NOT NULL DEFAULT (EXTRACT(HOUR FROM NOW())::INTEGER),
    tasks_created INTEGER DEFAULT 0,
    tasks_completed INTEGER DEFAULT 0,
    tasks_failed INTEGER DEFAULT 0,
    avg_duration_sec NUMERIC(10,2),
    min_duration_sec NUMERIC(10,2),
    max_duration_sec NUMERIC(10,2),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    UNIQUE(role, date, hour)
);

-- ---------------------------------------------------------------------------
-- Capacity planning view
-- One row per role. The per-role aggregates are computed once in role_stats
-- and the derived throughput / drain-time metrics are built from them.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE VIEW v_worker_capacity AS
WITH role_stats AS (
    SELECT
        role,
        COUNT(*) FILTER (WHERE status = 'pending') AS pending_tasks,
        -- Pending tasks whose schedule (if any) has already come due
        COUNT(*) FILTER (
            WHERE status = 'pending'
              AND (scheduled_for IS NULL OR scheduled_for <= NOW())
        ) AS ready_tasks,
        COUNT(*) FILTER (WHERE status = 'claimed') AS claimed_tasks,
        COUNT(*) FILTER (WHERE status = 'running') AS running_tasks,
        COUNT(*) FILTER (
            WHERE status = 'completed'
              AND completed_at > NOW() - INTERVAL '1 hour'
        ) AS completed_last_hour,
        COUNT(*) FILTER (
            WHERE status = 'failed'
              AND completed_at > NOW() - INTERVAL '1 hour'
        ) AS failed_last_hour,
        -- Workers currently holding at least one claimed/running task
        COUNT(DISTINCT worker_id) FILTER (
            WHERE status IN ('claimed', 'running')
        ) AS active_workers,
        -- Mean run time (seconds) of tasks completed in the last hour
        AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) FILTER (
            WHERE status = 'completed'
              AND completed_at > NOW() - INTERVAL '1 hour'
        ) AS avg_duration_sec
    FROM worker_tasks
    GROUP BY role
)
SELECT
    role,
    pending_tasks,
    ready_tasks,
    claimed_tasks,
    running_tasks,
    completed_last_hour,
    failed_last_hour,
    active_workers,
    avg_duration_sec,

    -- Capacity planning metrics: tasks one worker finishes per hour
    -- (3600 seconds / average task duration); NULL without recent completions
    CASE
        WHEN completed_last_hour > 0
            THEN 3600.0 / NULLIF(avg_duration_sec, 0)
        ELSE NULL
    END AS tasks_per_worker_hour,

    -- Estimated time to drain queue: pending / (workers * tasks per worker-hour);
    -- NULL when there are no active workers or no recent completions
    CASE
        WHEN active_workers > 0 AND completed_last_hour > 0
            THEN pending_tasks / NULLIF(
                active_workers * (3600.0 / NULLIF(avg_duration_sec, 0)),
                0
            )
        ELSE NULL
    END AS estimated_hours_to_drain
FROM role_stats;

-- ---------------------------------------------------------------------------
-- Task history view (for UI)
-- duration_sec is NULL until both started_at and completed_at are set.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE VIEW v_task_history AS
SELECT
    t.id,
    t.role,
    t.dispensary_id,
    d.name AS dispensary_name,
    t.platform,
    t.status,
    t.priority,
    t.worker_id,
    t.scheduled_for,
    t.claimed_at,
    t.started_at,
    t.completed_at,
    t.error_message,
    t.retry_count,
    t.created_at,
    EXTRACT(EPOCH FROM (t.completed_at - t.started_at)) AS duration_sec
FROM worker_tasks t
LEFT JOIN dispensaries d ON d.id = t.dispensary_id
ORDER BY t.created_at DESC;

-- ---------------------------------------------------------------------------
-- Function to claim a task atomically
--
-- Picks the highest-priority, oldest pending task for p_role that is due
-- (scheduled_for is NULL or in the past) and whose dispensary has no task
-- already claimed/running, marks it 'claimed' for p_worker_id and returns it.
--
-- Parameters:
--   p_role      - role whose queue is being polled
--   p_worker_id - identifier stored in worker_tasks.worker_id
-- Returns:
--   the claimed worker_tasks row, or a NULL row when nothing could be claimed.
--
-- Rows locked by other sessions are skipped (FOR UPDATE SKIP LOCKED), so
-- concurrent callers never wait on each other for the same task.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION claim_task(
    p_role VARCHAR(50),
    p_worker_id VARCHAR(100)
) RETURNS worker_tasks AS $$
DECLARE
    claimed_task worker_tasks;
BEGIN
    UPDATE worker_tasks
    SET
        status = 'claimed',
        worker_id = p_worker_id,
        claimed_at = NOW(),
        updated_at = NOW()
    WHERE id = (
        SELECT candidate.id
        FROM worker_tasks AS candidate
        WHERE candidate.role = p_role
          AND candidate.status = 'pending'
          AND (candidate.scheduled_for IS NULL OR candidate.scheduled_for <= NOW())
          -- Exclude stores that already have an active task
          AND (
              candidate.dispensary_id IS NULL
              OR NOT EXISTS (
                  SELECT 1
                  FROM worker_tasks AS active
                  WHERE active.dispensary_id = candidate.dispensary_id
                    AND active.status IN ('claimed', 'running')
              )
          )
        ORDER BY candidate.priority DESC, candidate.created_at ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING * INTO claimed_task;

    RETURN claimed_task;
EXCEPTION
    -- Two sessions can pick *different* pending tasks of the same dispensary
    -- at the same moment: neither sees the other's uncommitted claim, so the
    -- NOT EXISTS check passes for both. The loser then trips
    -- idx_worker_tasks_unique_active_store. Report "nothing claimed" instead
    -- of raising; the task stays pending and is retried on the next poll.
    WHEN unique_violation THEN
        claimed_task := NULL;
        RETURN claimed_task;
END;
$$ LANGUAGE plpgsql;

-- ---------------------------------------------------------------------------
-- Function to mark stale tasks (workers that died)
--
-- A claimed/running task is stale when its most recent sign of life is older
-- than stale_threshold_minutes. "Sign of life" is the latest of
-- last_heartbeat_at, started_at and claimed_at (GREATEST ignores NULLs), so:
--   * a task whose worker died before sending any heartbeat is still
--     recovered (a bare last_heartbeat_at comparison is never true for NULL);
--   * a freshly re-claimed task is not judged by a heartbeat left over from
--     an earlier attempt.
--
-- Stale tasks with retries left go back to 'pending' with retry_count + 1;
-- stale tasks that already used max_retries are marked 'failed'.
--
-- Parameters:
--   stale_threshold_minutes - silence, in minutes, after which a task is stale
-- Returns:
--   number of tasks put back to 'pending' (tasks marked failed are not counted).
-- ---------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION recover_stale_tasks(
    stale_threshold_minutes INTEGER DEFAULT 10
) RETURNS INTEGER AS $$
DECLARE
    recovered_count INTEGER;
    -- NOW() is fixed for the whole transaction, so one cutoff serves both updates
    stale_cutoff TIMESTAMPTZ := NOW() - (INTERVAL '1 minute' * stale_threshold_minutes);
BEGIN
    WITH stale AS (
        UPDATE worker_tasks
        SET
            status = 'pending',
            worker_id = NULL,
            claimed_at = NULL,
            started_at = NULL,
            last_heartbeat_at = NULL,  -- ownership is fully released
            retry_count = retry_count + 1,
            updated_at = NOW()
        WHERE status IN ('claimed', 'running')
          AND GREATEST(last_heartbeat_at, started_at, claimed_at) < stale_cutoff
          AND retry_count < max_retries
        RETURNING id
    )
    SELECT COUNT(*) INTO recovered_count FROM stale;

    -- Mark tasks that exceeded retries as failed.
    -- Rows requeued above are already 'pending' and therefore not matched here.
    UPDATE worker_tasks
    SET
        status = 'failed',
        error_message = 'Exceeded max retries after worker failures',
        completed_at = NOW(),
        updated_at = NOW()
    WHERE status IN ('claimed', 'running')
      AND GREATEST(last_heartbeat_at, started_at, claimed_at) < stale_cutoff
      AND retry_count >= max_retries;

    RETURN recovered_count;
END;
$$ LANGUAGE plpgsql;

-- ---------------------------------------------------------------------------
-- Function to generate daily resync tasks
--
-- Creates one 'product_resync' task per crawl-enabled dutchie dispensary that
-- has a platform_dispensary_id. Stores are ordered by id and split into
-- p_batches_per_day consecutive batches; batch k (0-based) is scheduled at
-- p_date + k * (24 hours / p_batches_per_day).
--
-- Parameters:
--   p_batches_per_day - number of batches the day is divided into (>= 1)
--   p_date            - calendar day the tasks are scheduled on
-- Returns:
--   number of worker_tasks rows inserted.
-- Raises:
--   invalid_parameter_value when p_batches_per_day is NULL or < 1.
--
-- NOTE(review): no unique constraint defined in this migration covers pending
-- rows, so ON CONFLICT DO NOTHING does not de-duplicate repeated runs for the
-- same date -- confirm whether callers guard against re-runs.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_resync_tasks(
    p_batches_per_day INTEGER DEFAULT 6,  -- 6 batches = every 4 hours
    p_date DATE DEFAULT CURRENT_DATE
) RETURNS INTEGER AS $$
DECLARE
    store_count INTEGER;
    stores_per_batch INTEGER;
    batch_interval INTERVAL;
    created_count INTEGER := 0;
BEGIN
    -- Fail with a clear message instead of a bare division-by-zero further down
    IF p_batches_per_day IS NULL OR p_batches_per_day < 1 THEN
        RAISE EXCEPTION 'p_batches_per_day must be a positive integer, got %', p_batches_per_day
            USING ERRCODE = 'invalid_parameter_value';
    END IF;

    -- Count active stores that need resync
    SELECT COUNT(*) INTO store_count
    FROM dispensaries
    WHERE crawl_enabled = true
      AND menu_type = 'dutchie'
      AND platform_dispensary_id IS NOT NULL;

    IF store_count = 0 THEN
        RETURN 0;
    END IF;

    stores_per_batch := CEIL(store_count::NUMERIC / p_batches_per_day);
    -- Spread the batches evenly over the day (4 hours apart for the default of 6)
    batch_interval := INTERVAL '24 hours' / p_batches_per_day;

    -- Single set-based insert: a store's batch index is derived from its row
    -- number, (rn - 1) / stores_per_batch using integer division. LEAST keeps
    -- stores that appear after the count above inside the last batch of the day.
    INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, priority)
    SELECT
        'product_resync',
        d.id,
        'dutchie',
        p_date + batch_interval
            * LEAST((d.rn - 1) / stores_per_batch, p_batches_per_day - 1)::DOUBLE PRECISION,
        0
    FROM (
        SELECT
            id,
            ROW_NUMBER() OVER (ORDER BY id) AS rn
        FROM dispensaries
        WHERE crawl_enabled = true
          AND menu_type = 'dutchie'
          AND platform_dispensary_id IS NOT NULL
    ) d
    ON CONFLICT DO NOTHING;

    -- GET DIAGNOSTICS only accepts "variable = item"; no expressions allowed
    GET DIAGNOSTICS created_count = ROW_COUNT;

    RETURN created_count;
END;
$$ LANGUAGE plpgsql;

-- ---------------------------------------------------------------------------
-- Trigger to update timestamp
-- Stamps updated_at with NOW() on every UPDATE of worker_tasks, overriding
-- whatever value the statement supplied.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION update_worker_tasks_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Drop first so the migration can be re-run without a "trigger already exists" error
DROP TRIGGER IF EXISTS worker_tasks_updated_at ON worker_tasks;

CREATE TRIGGER worker_tasks_updated_at
    BEFORE UPDATE ON worker_tasks
    FOR EACH ROW
    EXECUTE FUNCTION update_worker_tasks_timestamp();

-- ---------------------------------------------------------------------------
-- Comments
-- ---------------------------------------------------------------------------
COMMENT ON TABLE worker_tasks IS 'Central task queue for all worker roles';
COMMENT ON TABLE worker_registry IS 'Registry of active workers and their stats';
COMMENT ON TABLE task_completion_log IS 'Hourly aggregated task completion metrics';
COMMENT ON VIEW v_worker_capacity IS 'Real-time capacity planning metrics per role';
COMMENT ON VIEW v_task_history IS 'Task history with dispensary details for UI';
COMMENT ON FUNCTION claim_task IS 'Atomically claim a task for a worker, respecting per-store locking';
COMMENT ON FUNCTION recover_stale_tasks IS 'Release tasks from dead workers back to pending';
COMMENT ON FUNCTION generate_resync_tasks IS 'Generate daily product resync tasks in batches';