Files
cannaiq/backend/migrations/033_job_queue_claiming.sql
Kelly 66e07b2009 fix(monitor): remove non-existent worker columns from job_run_logs query
The job_run_logs table tracks scheduled job orchestration, not individual
worker jobs. Worker info (worker_id, worker_hostname) belongs on
dispensary_crawl_jobs, not job_run_logs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 18:45:05 -07:00

68 lines
3.4 KiB
SQL

-- Migration: job queue claiming/locking fields for dispensary_crawl_jobs.
-- These columns let multiple workers claim and process jobs without
-- conflicts. Every column is added with IF NOT EXISTS, so columns that are
-- already present are skipped when the migration is re-run.
ALTER TABLE dispensary_crawl_jobs
    -- Claiming / locking fields
    ADD COLUMN IF NOT EXISTS claimed_by        VARCHAR(100),
    ADD COLUMN IF NOT EXISTS claimed_at        TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS max_retries       INTEGER DEFAULT 3,
    ADD COLUMN IF NOT EXISTS retry_count       INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS locked_until      TIMESTAMPTZ,
    -- Worker tracking
    ADD COLUMN IF NOT EXISTS worker_hostname   VARCHAR(255),
    -- Progress tracking for live monitoring
    ADD COLUMN IF NOT EXISTS products_upserted INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS snapshots_created INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS current_page      INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS total_pages       INTEGER,
    ADD COLUMN IF NOT EXISTS last_heartbeat_at TIMESTAMPTZ;
-- Queue polling: pending jobs, ordered by priority (descending) and then
-- created_at (ascending). Partial index: only rows with status = 'pending'
-- are indexed.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_queue
    ON dispensary_crawl_jobs (status, priority DESC, created_at ASC)
    WHERE status = 'pending';

-- Worker claiming: lookup of a dispensary's pending/running jobs, used when
-- checking for an existing active job before claiming. The index is not
-- UNIQUE, so it only speeds up that check; it does not enforce it.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_dispensary_active
    ON dispensary_crawl_jobs (dispensary_id, status)
    WHERE status IN ('pending', 'running');

-- Lookup of jobs by the worker that claimed them; unclaimed rows
-- (claimed_by IS NULL) are left out of the index.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_claimed_by
    ON dispensary_crawl_jobs (claimed_by)
    WHERE claimed_by IS NOT NULL;

-- Heartbeat monitoring (stale workers): running jobs by last heartbeat time.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_heartbeat
    ON dispensary_crawl_jobs (last_heartbeat_at)
    WHERE status = 'running';
-- Record which worker ran a scheduled job: worker identifier and hostname
-- columns on job_run_logs. Columns that already exist are skipped.
ALTER TABLE job_run_logs
    ADD COLUMN IF NOT EXISTS worker_id       VARCHAR(100),
    ADD COLUMN IF NOT EXISTS worker_hostname VARCHAR(255);
-- v_queue_stats: single-row summary of the dispensary_crawl_jobs queue.
--   pending_jobs / running_jobs : current row counts by status
--   completed_1h / failed_1h    : jobs whose completed_at falls in the last hour
--   active_workers              : distinct claimed_by values among running jobs
--   avg_duration_seconds        : mean (completed_at - started_at), in seconds,
--                                 over jobs completed in the last hour
CREATE OR REPLACE VIEW v_queue_stats AS
SELECT
    COUNT(*) FILTER (WHERE status = 'pending') AS pending_jobs,
    COUNT(*) FILTER (WHERE status = 'running') AS running_jobs,
    COUNT(*) FILTER (
        WHERE status = 'completed'
          AND completed_at > NOW() - INTERVAL '1 hour'
    ) AS completed_1h,
    COUNT(*) FILTER (
        WHERE status = 'failed'
          AND completed_at > NOW() - INTERVAL '1 hour'
    ) AS failed_1h,
    COUNT(DISTINCT claimed_by) FILTER (WHERE status = 'running') AS active_workers,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) FILTER (
        WHERE status = 'completed'
          AND completed_at > NOW() - INTERVAL '1 hour'
    ) AS avg_duration_seconds
FROM dispensary_crawl_jobs;
-- v_active_workers: one row per (claimed_by, worker_hostname) pair that
-- currently holds at least one running job, with per-worker job counts,
-- summed progress counters, earliest claim time and latest heartbeat.
CREATE OR REPLACE VIEW v_active_workers AS
SELECT
    claimed_by             AS worker_id,
    worker_hostname,
    COUNT(*)               AS current_jobs,
    SUM(products_found)    AS total_products_found,
    SUM(products_upserted) AS total_products_upserted,
    SUM(snapshots_created) AS total_snapshots,
    MIN(claimed_at)        AS first_claimed_at,
    MAX(last_heartbeat_at) AS last_heartbeat
FROM dispensary_crawl_jobs
WHERE claimed_by IS NOT NULL
  AND status = 'running'
GROUP BY
    claimed_by,
    worker_hostname;