Files
cannaiq/backend/migrations/059_job_queue_columns.sql
Kelly 8ac64ba077 feat(cannaiq): Add Workers Dashboard and visibility tracking
Workers Dashboard:
- New /workers route with two-pane layout
- Workers table showing Alice, Henry, Bella, Oscar with role badges
- Run history with visibility stats (lost/restored counts)
- "Run Now" action to trigger workers immediately

Migrations:
- 057: Add visibility tracking columns (visibility_lost, visibility_lost_at, visibility_restored_at)
- 058: Add ID resolution columns for Henry worker
- 059: Add job queue columns (max_retries, retry_count, worker_id, locked_at, locked_by)

Backend fixes:
- Add httpStatus to CrawlResult interface for error classification
- Fix pool.ts typing for event listener
- Update completeJob to accept visibility stats in metadata

Frontend fixes:
- Fix NationalDashboard crash with safe formatMoney helper
- Fix OrchestratorDashboard/Stores StoreInfo type mismatches
- Add workerName/workerRole to getDutchieAZSchedules API type

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-07 11:04:12 -07:00

68 lines
3.0 KiB
SQL

-- Migration 059: Add missing columns to dispensary_crawl_jobs
--
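-- Required for worker job processing:
-- - max_retries: Maximum retry attempts for a job
-- - retry_count: Current retry count
-- - worker_id: ID of worker processing the job
-- - locked_at: When the job was locked by a worker
-- - locked_by: Hostname of worker that locked the job
-- - updated_at: Timestamp column defaulting to NOW() (also added here)
--
-- Also creates the partial indexes used by job queue queries and the
-- v_queue_stats view, then records this migration in schema_migrations.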
-- ============================================================
-- 1. ADD JOB QUEUE COLUMNS
-- ============================================================
-- Add the job-queue bookkeeping columns in one ALTER TABLE statement.
-- IF NOT EXISTS turns each ADD COLUMN into a no-op when the column is already
-- present (an existing column keeps its current type and default), so this
-- statement can be re-run without error.
ALTER TABLE dispensary_crawl_jobs
    ADD COLUMN IF NOT EXISTS max_retries INTEGER DEFAULT 3,
    ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0,
    ADD COLUMN IF NOT EXISTS worker_id VARCHAR(100),
    ADD COLUMN IF NOT EXISTS locked_at TIMESTAMPTZ,
    ADD COLUMN IF NOT EXISTS locked_by VARCHAR(100),
    -- NOW() is a non-volatile default, so PostgreSQL evaluates it once when
    -- this ALTER runs: rows that already exist all receive the migration time.
    ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT NOW();

-- Catalog descriptions for every column added above.
COMMENT ON COLUMN dispensary_crawl_jobs.max_retries IS 'Maximum number of retry attempts';
COMMENT ON COLUMN dispensary_crawl_jobs.retry_count IS 'Current retry count';
COMMENT ON COLUMN dispensary_crawl_jobs.worker_id IS 'ID of worker processing this job';
COMMENT ON COLUMN dispensary_crawl_jobs.locked_at IS 'When job was locked by worker';
COMMENT ON COLUMN dispensary_crawl_jobs.locked_by IS 'Hostname of worker that locked job';
-- NOTE(review): wording inferred from the column name; this migration adds no
-- trigger that maintains updated_at, so confirm the application sets it on
-- every update.
COMMENT ON COLUMN dispensary_crawl_jobs.updated_at IS 'When job row was last updated';
-- ============================================================
-- 2. CREATE INDEXES FOR JOB QUEUE QUERIES
-- ============================================================
-- Partial index containing only rows whose status is 'pending', keyed by
-- status and then priority in descending order.
-- NOTE(review): status is constant inside this partial index, so it adds no
-- selectivity as a key column; the definition is kept unchanged here.
CREATE INDEX IF NOT EXISTS idx_crawl_jobs_status_priority
    ON dispensary_crawl_jobs (status, priority DESC)
    WHERE status = 'pending';

-- Partial index on worker_id; rows with a NULL worker_id are left out.
CREATE INDEX IF NOT EXISTS idx_crawl_jobs_worker_id
    ON dispensary_crawl_jobs (worker_id)
    WHERE worker_id IS NOT NULL;

-- Partial index on locked_at; rows with a NULL locked_at are left out.
CREATE INDEX IF NOT EXISTS idx_crawl_jobs_locked_at
    ON dispensary_crawl_jobs (locked_at)
    WHERE locked_at IS NOT NULL;
-- ============================================================
-- 3. CREATE QUEUE STATS VIEW
-- ============================================================
-- v_queue_stats: aggregates all of dispensary_crawl_jobs into exactly one row
-- (there is no GROUP BY). The "_1h" columns only consider rows whose
-- completed_at falls within the last hour relative to NOW().
-- avg_duration_seconds is NULL when no completed job matches that window.
CREATE OR REPLACE VIEW v_queue_stats AS
SELECT
    COUNT(*) FILTER (WHERE j.status = 'pending') AS pending_jobs,
    COUNT(*) FILTER (WHERE j.status = 'running') AS running_jobs,
    COUNT(*) FILTER (
        WHERE j.status = 'completed'
          AND j.completed_at > NOW() - INTERVAL '1 hour'
    ) AS completed_1h,
    COUNT(*) FILTER (
        WHERE j.status = 'failed'
          AND j.completed_at > NOW() - INTERVAL '1 hour'
    ) AS failed_1h,
    -- Distinct non-NULL worker_id values among running jobs.
    COUNT(DISTINCT j.worker_id) FILTER (WHERE j.status = 'running') AS active_workers,
    -- Mean of (completed_at - started_at) in seconds, rounded to 2 decimals.
    ROUND(
        CAST(
            AVG(EXTRACT(EPOCH FROM (j.completed_at - j.started_at))) FILTER (
                WHERE j.status = 'completed'
                  AND j.completed_at > NOW() - INTERVAL '1 hour'
            ) AS numeric
        ),
        2
    ) AS avg_duration_seconds
FROM dispensary_crawl_jobs AS j;

COMMENT ON VIEW v_queue_stats IS 'Real-time queue statistics for monitoring dashboard';
-- ============================================================
-- 4. RECORD MIGRATION
-- ============================================================
-- Register migration 59 in schema_migrations. ON CONFLICT on version makes the
-- insert a no-op when a row for this version is already present.
INSERT INTO schema_migrations
    (version, name, applied_at)
VALUES
    (59, '059_job_queue_columns', NOW())
ON CONFLICT (version)
    DO NOTHING;