-- Migration: add job-queue claiming/locking fields to dispensary_crawl_jobs.
-- Lets multiple workers claim and process crawl jobs concurrently.
--
-- Every statement is guarded with IF NOT EXISTS / OR REPLACE so the file can be
-- re-run. Because of those guards, re-running it never alters a column or index
-- that already exists; changing an existing definition needs a new migration.

-- Claiming / locking fields
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS claimed_by VARCHAR(100);
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS claimed_at TIMESTAMPTZ;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS max_retries INTEGER DEFAULT 3;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS locked_until TIMESTAMPTZ;

-- Worker tracking
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS worker_hostname VARCHAR(255);

-- Progress tracking for live monitoring
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS products_upserted INTEGER DEFAULT 0;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS snapshots_created INTEGER DEFAULT 0;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS current_page INTEGER DEFAULT 0;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS total_pages INTEGER;
ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS last_heartbeat_at TIMESTAMPTZ;

-- Queue polling: pending jobs ordered by priority (highest first), then oldest first.
-- The partial-index predicate already restricts entries to status = 'pending',
-- so status is not repeated as a key column (it would hold the same value in
-- every entry and only enlarge the index).
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_queue
    ON dispensary_crawl_jobs (priority DESC, created_at ASC)
    WHERE status = 'pending';

-- Fast lookup of a dispensary's pending/running jobs (presumably used by the
-- claim path to skip dispensaries that already have an active job; verify
-- against the worker code). This index is NOT unique, so it does not by itself
-- prevent double-claims; that guarantee must come from the claim query.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_dispensary_active
    ON dispensary_crawl_jobs (dispensary_id, status)
    WHERE status IN ('pending', 'running');

-- Look up the jobs held by a given worker.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_claimed_by
    ON dispensary_crawl_jobs (claimed_by)
    WHERE claimed_by IS NOT NULL;

-- Heartbeat monitoring: find running jobs whose worker has gone quiet.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_heartbeat
    ON dispensary_crawl_jobs (last_heartbeat_at)
    WHERE status = 'running';

-- Record which worker ran each scheduled job.
ALTER TABLE job_run_logs ADD COLUMN IF NOT EXISTS worker_id VARCHAR(100);
ALTER TABLE job_run_logs ADD COLUMN IF NOT EXISTS worker_hostname VARCHAR(255);

-- Single-row queue health snapshot.
-- completed_1h / failed_1h / avg_duration_seconds only consider jobs whose
-- completed_at is within the last hour; avg_duration_seconds is NULL when no
-- job completed in that window (AVG over zero rows).
CREATE OR REPLACE VIEW v_queue_stats AS
SELECT
    COUNT(*) FILTER (WHERE status = 'pending') AS pending_jobs,
    COUNT(*) FILTER (WHERE status = 'running') AS running_jobs,
    COUNT(*) FILTER (
        WHERE status = 'completed'
          AND completed_at > NOW() - INTERVAL '1 hour'
    ) AS completed_1h,
    COUNT(*) FILTER (
        WHERE status = 'failed'
          AND completed_at > NOW() - INTERVAL '1 hour'
    ) AS failed_1h,
    COUNT(DISTINCT claimed_by) FILTER (WHERE status = 'running') AS active_workers,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) FILTER (
        WHERE status = 'completed'
          AND completed_at > NOW() - INTERVAL '1 hour'
    ) AS avg_duration_seconds
FROM dispensary_crawl_jobs;

-- One row per (worker, hostname) currently holding running jobs.
-- products_found is not added by this migration; it is assumed to exist from an
-- earlier one (confirm). Sums are COALESCEd so a worker whose jobs have only
-- NULL counters reports 0 instead of NULL; the result type is unchanged.
CREATE OR REPLACE VIEW v_active_workers AS
SELECT
    claimed_by AS worker_id,
    worker_hostname,
    COUNT(*) AS current_jobs,
    COALESCE(SUM(products_found), 0) AS total_products_found,
    COALESCE(SUM(products_upserted), 0) AS total_products_upserted,
    COALESCE(SUM(snapshots_created), 0) AS total_snapshots,
    MIN(claimed_at) AS first_claimed_at,
    MAX(last_heartbeat_at) AS last_heartbeat
FROM dispensary_crawl_jobs
WHERE status = 'running'
  AND claimed_by IS NOT NULL
GROUP BY claimed_by, worker_hostname;