-- Migration 059: Add missing columns to dispensary_crawl_jobs -- -- Required for worker job processing: -- - max_retries: Maximum retry attempts for a job -- - retry_count: Current retry count -- - worker_id: ID of worker processing the job -- - locked_at: When the job was locked by a worker -- - locked_by: Hostname of worker that locked the job -- ============================================================ -- 1. ADD JOB QUEUE COLUMNS -- ============================================================ ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS max_retries INTEGER DEFAULT 3, ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0, ADD COLUMN IF NOT EXISTS worker_id VARCHAR(100), ADD COLUMN IF NOT EXISTS locked_at TIMESTAMPTZ, ADD COLUMN IF NOT EXISTS locked_by VARCHAR(100), ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT NOW(); COMMENT ON COLUMN dispensary_crawl_jobs.max_retries IS 'Maximum number of retry attempts'; COMMENT ON COLUMN dispensary_crawl_jobs.retry_count IS 'Current retry count'; COMMENT ON COLUMN dispensary_crawl_jobs.worker_id IS 'ID of worker processing this job'; COMMENT ON COLUMN dispensary_crawl_jobs.locked_at IS 'When job was locked by worker'; COMMENT ON COLUMN dispensary_crawl_jobs.locked_by IS 'Hostname of worker that locked job'; -- ============================================================ -- 2. CREATE INDEXES FOR JOB QUEUE QUERIES -- ============================================================ CREATE INDEX IF NOT EXISTS idx_crawl_jobs_status_priority ON dispensary_crawl_jobs(status, priority DESC) WHERE status = 'pending'; CREATE INDEX IF NOT EXISTS idx_crawl_jobs_worker_id ON dispensary_crawl_jobs(worker_id) WHERE worker_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_crawl_jobs_locked_at ON dispensary_crawl_jobs(locked_at) WHERE locked_at IS NOT NULL; -- ============================================================ -- 3. CREATE QUEUE STATS VIEW -- ============================================================ CREATE OR REPLACE VIEW v_queue_stats AS SELECT COUNT(*) FILTER (WHERE status = 'pending') AS pending_jobs, COUNT(*) FILTER (WHERE status = 'running') AS running_jobs, COUNT(*) FILTER (WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour') AS completed_1h, COUNT(*) FILTER (WHERE status = 'failed' AND completed_at > NOW() - INTERVAL '1 hour') AS failed_1h, COUNT(DISTINCT worker_id) FILTER (WHERE status = 'running') AS active_workers, ROUND((AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) FILTER (WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour'))::numeric, 2) AS avg_duration_seconds FROM dispensary_crawl_jobs; COMMENT ON VIEW v_queue_stats IS 'Real-time queue statistics for monitoring dashboard'; -- ============================================================ -- 4. RECORD MIGRATION -- ============================================================ INSERT INTO schema_migrations (version, name, applied_at) VALUES (59, '059_job_queue_columns', NOW()) ON CONFLICT (version) DO NOTHING;