The job_run_logs table tracks scheduled job orchestration, not individual worker jobs. Worker info (worker_id, worker_hostname) belongs on dispensary_crawl_jobs, not job_run_logs. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
68 lines
3.4 KiB
SQL
68 lines
3.4 KiB
SQL
-- Migration: Add job queue claiming/locking fields to dispensary_crawl_jobs
-- This enables multiple workers to claim and process jobs without conflicts
|
|
|
|
-- Claiming/locking columns on dispensary_crawl_jobs.
-- All five columns are added by a single statement; IF NOT EXISTS skips any
-- column that is already present, so re-running the migration does not error.
-- max_retries defaults to 3 and retry_count to 0; the other columns have no
-- default and are NULL until a value is written.
ALTER TABLE dispensary_crawl_jobs
    ADD COLUMN IF NOT EXISTS claimed_by   VARCHAR(100),
    ADD COLUMN IF NOT EXISTS claimed_at   TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS max_retries  INTEGER DEFAULT 3,
    ADD COLUMN IF NOT EXISTS retry_count  INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS locked_until TIMESTAMPTZ;
|
|
|
|
-- Worker tracking column (nullable, no default).
-- IF NOT EXISTS makes the statement a no-op when the column is already there.
ALTER TABLE dispensary_crawl_jobs
    ADD COLUMN IF NOT EXISTS worker_hostname VARCHAR(255);
|
|
|
|
-- Progress columns for live monitoring of a job.
-- products_upserted, snapshots_created and current_page default to 0;
-- total_pages and last_heartbeat_at have no default and are NULL until set.
-- Added in one statement; IF NOT EXISTS skips columns that already exist.
ALTER TABLE dispensary_crawl_jobs
    ADD COLUMN IF NOT EXISTS products_upserted INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS snapshots_created INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS current_page      INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS total_pages       INTEGER,
    ADD COLUMN IF NOT EXISTS last_heartbeat_at TIMESTAMPTZ;
|
|
|
|
-- Queue polling index: a partial index that only contains pending jobs,
-- keyed by status, then priority (descending), then created_at (ascending).
-- NOTE(review): every indexed row has status = 'pending', so the leading
-- status key column looks redundant; consider dropping it in a follow-up
-- migration rather than editing this one.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_queue
    ON dispensary_crawl_jobs (status, priority DESC, created_at ASC)
    WHERE status = 'pending';
|
|
|
|
-- Active-job lookup index: a partial index over pending and running jobs,
-- keyed by dispensary_id and status, for checking whether a dispensary
-- already has an active job.
-- This index is not UNIQUE, so it only speeds up that check; it does not by
-- itself stop two active jobs from existing for the same dispensary.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_dispensary_active
    ON dispensary_crawl_jobs (dispensary_id, status)
    WHERE status IN ('pending', 'running');
|
|
|
|
-- claimed_by lookup index: partial, so rows that have never been claimed
-- (claimed_by IS NULL) take no space in it.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_claimed_by
    ON dispensary_crawl_jobs (claimed_by)
    WHERE claimed_by IS NOT NULL;
|
|
|
|
-- Heartbeat monitoring index (for spotting stale workers): partial index on
-- last_heartbeat_at that only covers jobs whose status is 'running'.
CREATE INDEX IF NOT EXISTS idx_dispensary_crawl_jobs_heartbeat
    ON dispensary_crawl_jobs (last_heartbeat_at)
    WHERE status = 'running';
|
|
|
|
-- Worker identification columns on job_run_logs, for tracking which worker
-- ran a scheduled job. Both are nullable with no default.
-- NOTE(review): the commit note attached to this file states that worker info
-- (worker_id, worker_hostname) belongs on dispensary_crawl_jobs rather than
-- job_run_logs; confirm whether these two columns are still wanted.
ALTER TABLE job_run_logs
    ADD COLUMN IF NOT EXISTS worker_id       VARCHAR(100),
    ADD COLUMN IF NOT EXISTS worker_hostname VARCHAR(255);
|
|
|
|
-- Queue statistics view: a single row of aggregates over dispensary_crawl_jobs.
--   pending_jobs / running_jobs : number of jobs currently in that status
--   completed_1h / failed_1h    : jobs in that status whose completed_at is
--                                 within the last hour
--   active_workers              : distinct non-NULL claimed_by values among
--                                 running jobs
--   avg_duration_seconds        : mean of (completed_at - started_at), in
--                                 seconds, over jobs completed in the last hour
-- NOTE(review): failed_1h relies on completed_at being set for failed jobs;
-- confirm against the code that marks a job as failed.
CREATE OR REPLACE VIEW v_queue_stats AS
WITH jobs AS (
    -- Evaluate the one-hour recency test once so the aggregates can share it.
    SELECT
        status,
        claimed_by,
        started_at,
        completed_at,
        completed_at > NOW() - INTERVAL '1 hour' AS finished_last_hour
    FROM dispensary_crawl_jobs
)
SELECT
    COUNT(*) FILTER (WHERE status = 'pending') AS pending_jobs,
    COUNT(*) FILTER (WHERE status = 'running') AS running_jobs,
    COUNT(*) FILTER (WHERE status = 'completed' AND finished_last_hour) AS completed_1h,
    COUNT(*) FILTER (WHERE status = 'failed' AND finished_last_hour) AS failed_1h,
    COUNT(DISTINCT claimed_by) FILTER (WHERE status = 'running') AS active_workers,
    AVG(EXTRACT(EPOCH FROM (completed_at - started_at)))
        FILTER (WHERE status = 'completed' AND finished_last_hour) AS avg_duration_seconds
FROM jobs;
|
|
|
|
-- Active workers view: one row per (claimed_by, worker_hostname) pair among
-- jobs that are running and have a non-NULL claimed_by.
--   worker_id               : the claimed_by value
--   current_jobs            : number of running jobs in the group
--   total_products_found, total_products_upserted, total_snapshots
--                           : sums of the per-job counters across the group
--   first_claimed_at        : earliest claimed_at in the group
--   last_heartbeat          : most recent last_heartbeat_at in the group
-- NOTE(review): products_found is not added by this migration; presumably it
-- already exists on dispensary_crawl_jobs (confirm).
CREATE OR REPLACE VIEW v_active_workers AS
SELECT
    claimed_by             AS worker_id,
    worker_hostname,
    COUNT(*)               AS current_jobs,
    SUM(products_found)    AS total_products_found,
    SUM(products_upserted) AS total_products_upserted,
    SUM(snapshots_created) AS total_snapshots,
    MIN(claimed_at)        AS first_claimed_at,
    MAX(last_heartbeat_at) AS last_heartbeat
FROM dispensary_crawl_jobs
WHERE status = 'running'
  AND claimed_by IS NOT NULL
GROUP BY
    claimed_by,
    worker_hostname;
|