Replace fragmented job systems (job_schedules, dispensary_crawl_jobs, SyncOrchestrator) with a single unified task queue:

- Add worker_tasks table with atomic task claiming via SELECT ... FOR UPDATE SKIP LOCKED
- Add TaskService for CRUD, claiming, and capacity metrics
- Add TaskWorker with role-based handlers (resync, discovery, analytics)
- Add /api/tasks endpoints for management and for migration from the legacy systems
- Add TasksDashboard UI and integrate task counts into the main dashboard
- Add comprehensive documentation

Task roles: store_discovery, entry_point_discovery, product_discovery, product_resync, analytics_refresh

Run workers with: WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
323 lines
10 KiB
PL/PgSQL
323 lines
10 KiB
PL/PgSQL
-- Migration 074: Worker Task Queue System
|
|
-- Implements role-based task queue with per-store locking and capacity tracking
|
|
|
|
-- worker_tasks: the one queue every worker role pulls from.
-- One row per unit of work. claim_task moves rows from 'pending' to
-- 'claimed'; the remaining statuses allowed by valid_status are presumably
-- set by the worker code (TODO confirm against TaskService / TaskWorker).
CREATE TABLE IF NOT EXISTS worker_tasks (
    id                 SERIAL PRIMARY KEY,

    -- What to run, and against which store
    role               VARCHAR(50) NOT NULL,  -- store_discovery, entry_point_discovery, product_discovery, product_resync, analytics_refresh
    dispensary_id      INTEGER REFERENCES dispensaries (id) ON DELETE CASCADE,  -- NULL = not tied to one store; deleting the store deletes its tasks
    platform           VARCHAR(20),           -- menu platform, e.g. dutchie, jane, treez

    -- Where the task is in its lifecycle
    status             VARCHAR(20) NOT NULL DEFAULT 'pending'
                       CONSTRAINT valid_status
                       CHECK (status IN ('pending', 'claimed', 'running', 'completed', 'failed', 'stale')),
    priority           INTEGER DEFAULT 0,     -- bigger number = claimed sooner

    -- Earliest moment the task may be claimed; NULL = claimable immediately
    scheduled_for      TIMESTAMPTZ,

    -- Which worker holds the task, and when it progressed
    worker_id          VARCHAR(100),          -- pod name or other worker identifier
    claimed_at         TIMESTAMPTZ,
    started_at         TIMESTAMPTZ,
    completed_at       TIMESTAMPTZ,
    last_heartbeat_at  TIMESTAMPTZ,           -- recover_stale_tasks judges staleness from this

    -- Outcome
    result             JSONB,                 -- handler output
    error_message      TEXT,
    retry_count        INTEGER DEFAULT 0,
    max_retries        INTEGER DEFAULT 3,

    -- Bookkeeping
    created_at         TIMESTAMPTZ DEFAULT NOW(),
    updated_at         TIMESTAMPTZ DEFAULT NOW()  -- also refreshed by the worker_tasks_updated_at trigger
);
|
|
|
|
-- Supporting indexes for worker_tasks. Every one is partial, so it only
-- covers the rows its intended queries actually filter on.

-- Claiming: next pending task for a role, highest priority first, then oldest
CREATE INDEX IF NOT EXISTS idx_worker_tasks_pending
    ON worker_tasks USING btree (role, priority DESC, created_at)
    WHERE status = 'pending';

-- Claimed-but-not-started tasks, per worker
CREATE INDEX IF NOT EXISTS idx_worker_tasks_claimed
    ON worker_tasks USING btree (worker_id, claimed_at)
    WHERE status = 'claimed';

-- Running tasks per worker with their heartbeat (presumably for liveness checks)
CREATE INDEX IF NOT EXISTS idx_worker_tasks_running
    ON worker_tasks USING btree (worker_id, last_heartbeat_at)
    WHERE status = 'running';

-- Tasks belonging to a specific store
CREATE INDEX IF NOT EXISTS idx_worker_tasks_dispensary
    ON worker_tasks USING btree (dispensary_id)
    WHERE dispensary_id IS NOT NULL;

-- Pending tasks deferred to a future time
CREATE INDEX IF NOT EXISTS idx_worker_tasks_scheduled
    ON worker_tasks USING btree (scheduled_for)
    WHERE status = 'pending' AND scheduled_for IS NOT NULL;

-- Finished tasks by role, most recently finished first
CREATE INDEX IF NOT EXISTS idx_worker_tasks_history
    ON worker_tasks USING btree (role, completed_at DESC)
    WHERE status IN ('completed', 'failed');
|
|
|
|
-- Per-store lock enforced by the database: a dispensary may have at most one
-- task in 'claimed' or 'running' at any moment. Any write that would create a
-- second active task for the same store fails with unique_violation.
-- Store-less tasks (dispensary_id IS NULL) are not restricted.
CREATE UNIQUE INDEX IF NOT EXISTS idx_worker_tasks_unique_active_store
    ON worker_tasks USING btree (dispensary_id)
    WHERE dispensary_id IS NOT NULL
      AND status IN ('claimed', 'running');
|
|
|
|
-- worker_registry: one row per worker process (keyed by worker_id), holding
-- where it runs, its last heartbeat, and its completed/failed counters.
CREATE TABLE IF NOT EXISTS worker_registry (
    id                 SERIAL PRIMARY KEY,
    worker_id          VARCHAR(100) NOT NULL UNIQUE,
    role               VARCHAR(50) NOT NULL,
    pod_name           VARCHAR(100),
    hostname           VARCHAR(100),
    started_at         TIMESTAMPTZ DEFAULT NOW(),
    last_heartbeat_at  TIMESTAMPTZ DEFAULT NOW(),
    tasks_completed    INTEGER DEFAULT 0,
    tasks_failed       INTEGER DEFAULT 0,
    status             VARCHAR(20) DEFAULT 'active'
                       CONSTRAINT valid_worker_status
                       CHECK (status IN ('active', 'idle', 'offline'))
);

-- Workers of a given role in a given state
CREATE INDEX IF NOT EXISTS idx_worker_registry_role
    ON worker_registry (role, status);

-- Heartbeat lookups, limited to workers still marked active
CREATE INDEX IF NOT EXISTS idx_worker_registry_heartbeat
    ON worker_registry (last_heartbeat_at)
    WHERE status = 'active';
|
|
|
|
-- task_completion_log: hourly rollup of task counts and durations per role.
-- UNIQUE (role, date, hour) guarantees a single row per bucket.
CREATE TABLE IF NOT EXISTS task_completion_log (
    id                SERIAL PRIMARY KEY,
    role              VARCHAR(50) NOT NULL,

    -- Bucket key; the defaults take the current date / hour in the session time zone
    date              DATE    NOT NULL DEFAULT CURRENT_DATE,
    hour              INTEGER NOT NULL DEFAULT EXTRACT(HOUR FROM NOW()),

    -- Counters for the bucket
    tasks_created     INTEGER DEFAULT 0,
    tasks_completed   INTEGER DEFAULT 0,
    tasks_failed      INTEGER DEFAULT 0,

    -- Duration statistics for the bucket, in seconds
    avg_duration_sec  NUMERIC(10, 2),
    min_duration_sec  NUMERIC(10, 2),
    max_duration_sec  NUMERIC(10, 2),

    updated_at        TIMESTAMPTZ DEFAULT NOW(),

    UNIQUE (role, date, hour)
);
|
|
|
|
-- v_worker_capacity: live queue depth and throughput per role, plus two
-- derived capacity-planning estimates. Throughput numbers only look at tasks
-- that finished within the last hour; "active workers" are the distinct
-- worker_ids currently holding a claimed or running task.
CREATE OR REPLACE VIEW v_worker_capacity AS
WITH per_role AS (
    SELECT
        role,
        COUNT(*) FILTER (WHERE status = 'pending') AS pending_tasks,
        COUNT(*) FILTER (
            WHERE status = 'pending'
              AND (scheduled_for IS NULL OR scheduled_for <= NOW())
        ) AS ready_tasks,
        COUNT(*) FILTER (WHERE status = 'claimed') AS claimed_tasks,
        COUNT(*) FILTER (WHERE status = 'running') AS running_tasks,
        COUNT(*) FILTER (
            WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour'
        ) AS completed_last_hour,
        COUNT(*) FILTER (
            WHERE status = 'failed' AND completed_at > NOW() - INTERVAL '1 hour'
        ) AS failed_last_hour,
        COUNT(DISTINCT worker_id) FILTER (WHERE status IN ('claimed', 'running')) AS active_workers,
        AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) FILTER (
            WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour'
        ) AS avg_duration_sec
    FROM worker_tasks
    GROUP BY role
)
SELECT
    role,
    pending_tasks,
    ready_tasks,
    claimed_tasks,
    running_tasks,
    completed_last_hour,
    failed_last_hour,
    active_workers,
    avg_duration_sec,
    -- Hourly throughput of a single worker, from the recent mean duration
    CASE
        WHEN completed_last_hour > 0
            THEN 3600.0 / NULLIF(avg_duration_sec, 0)
        ELSE NULL
    END AS tasks_per_worker_hour,
    -- Hours for the currently active workers to clear every pending task
    -- (scheduled-for-later ones included) at that throughput
    CASE
        WHEN active_workers > 0 AND completed_last_hour > 0
            THEN pending_tasks / NULLIF(
                active_workers * (3600.0 / NULLIF(avg_duration_sec, 0)),
                0
            )
        ELSE NULL
    END AS estimated_hours_to_drain
FROM per_role;
|
|
|
|
-- v_task_history: every task with its dispensary name, newest first, for the
-- tasks UI. Store-less tasks keep a NULL dispensary_name (LEFT JOIN), and
-- duration_sec stays NULL until both started_at and completed_at are set.
CREATE OR REPLACE VIEW v_task_history AS
SELECT
    t.id,
    t.role,
    t.dispensary_id,
    d.name AS dispensary_name,
    t.platform,
    t.status,
    t.priority,
    t.worker_id,
    t.scheduled_for,
    t.claimed_at,
    t.started_at,
    t.completed_at,
    t.error_message,
    t.retry_count,
    t.created_at,
    EXTRACT(EPOCH FROM (t.completed_at - t.started_at)) AS duration_sec
FROM worker_tasks AS t
LEFT JOIN dispensaries AS d
    ON d.id = t.dispensary_id
-- created_at defaults to NOW() (transaction start), so every task inserted in
-- one transaction, e.g. a whole resync batch, shares the same value; id breaks
-- those ties so the UI sees a stable order.
ORDER BY t.created_at DESC, t.id DESC;
|
|
|
|
-- claim_task: atomically hand the next ready task for p_role to p_worker_id.
--
-- Picks the highest-priority, then oldest, pending task whose scheduled_for
-- has arrived (or is NULL) and whose dispensary has no other claimed/running
-- task. It marks that task 'claimed' for the worker and returns the updated
-- row. When nothing is claimable, every column of the result is NULL.
--
-- Concurrency: FOR UPDATE SKIP LOCKED lets concurrent callers grab different
-- rows without blocking on each other. Two callers can still pick two
-- different tasks for the same dispensary, because neither sees the other's
-- uncommitted claim. idx_worker_tasks_unique_active_store then rejects the
-- second one with unique_violation. That loser is reported as "nothing
-- claimable" instead of an error, so the worker just polls again.
-- (The EXCEPTION block costs one subtransaction per call.)
CREATE OR REPLACE FUNCTION claim_task(
    p_role VARCHAR(50),
    p_worker_id VARCHAR(100)
) RETURNS worker_tasks AS $$
DECLARE
    claimed_task worker_tasks;
BEGIN
    BEGIN
        UPDATE worker_tasks
        SET
            status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW(),
            -- Seed the heartbeat: a worker that dies before its first
            -- heartbeat must still be detectable as stale.
            last_heartbeat_at = NOW(),
            updated_at = NOW()
        WHERE id = (
            SELECT candidate.id
            FROM worker_tasks AS candidate
            WHERE candidate.role = p_role
              AND candidate.status = 'pending'
              AND (candidate.scheduled_for IS NULL OR candidate.scheduled_for <= NOW())
              -- Per-store lock: skip stores that already have an active task
              AND (
                  candidate.dispensary_id IS NULL
                  OR NOT EXISTS (
                      SELECT 1
                      FROM worker_tasks AS active
                      WHERE active.dispensary_id = candidate.dispensary_id
                        AND active.status IN ('claimed', 'running')
                  )
              )
            ORDER BY candidate.priority DESC, candidate.created_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING * INTO claimed_task;
    EXCEPTION
        WHEN unique_violation THEN
            -- Another worker claimed a task for this store first.
            claimed_task := NULL;
    END;

    RETURN claimed_task;
END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- recover_stale_tasks: release tasks held by workers that stopped responding.
--
-- A claimed/running task is stale when its latest sign of life is older than
-- stale_threshold_minutes. The sign of life is last_heartbeat_at, falling back
-- to started_at and then claimed_at. Without the fallback, a task whose
-- heartbeat was never written (NULL) would never be considered stale. It
-- would then stay stuck forever, and through
-- idx_worker_tasks_unique_active_store it would block its whole dispensary.
--
-- Stale tasks with retries left go back to 'pending'. retry_count is bumped,
-- and the owner and timing columns are cleared. Stale tasks that have used up
-- max_retries are marked 'failed'.
--
-- Returns the number of tasks put back to 'pending'; failed ones are not counted.
CREATE OR REPLACE FUNCTION recover_stale_tasks(
    stale_threshold_minutes INTEGER DEFAULT 10
) RETURNS INTEGER AS $$
DECLARE
    recovered_count INTEGER;
    -- Computed once so both statements use the same cutoff
    stale_cutoff TIMESTAMPTZ := NOW() - stale_threshold_minutes * INTERVAL '1 minute';
BEGIN
    WITH stale AS (
        UPDATE worker_tasks
        SET
            status = 'pending',
            worker_id = NULL,
            claimed_at = NULL,
            started_at = NULL,
            -- Drop the dead owner's heartbeat; otherwise the next claim would
            -- inherit an old timestamp and look stale immediately.
            last_heartbeat_at = NULL,
            retry_count = retry_count + 1,
            updated_at = NOW()
        WHERE status IN ('claimed', 'running')
          AND COALESCE(last_heartbeat_at, started_at, claimed_at) < stale_cutoff
          AND retry_count < max_retries
        RETURNING id
    )
    SELECT COUNT(*) INTO recovered_count FROM stale;

    -- Out of retries: give up on the task
    UPDATE worker_tasks
    SET
        status = 'failed',
        error_message = 'Exceeded max retries after worker failures',
        completed_at = NOW(),
        updated_at = NOW()
    WHERE status IN ('claimed', 'running')
      AND COALESCE(last_heartbeat_at, started_at, claimed_at) < stale_cutoff
      AND retry_count >= max_retries;

    RETURN recovered_count;
END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- generate_resync_tasks: create one day's worth of product_resync tasks.
--
-- Every crawl-enabled Dutchie store with a platform_dispensary_id gets one
-- task. Stores are ordered by id and split into p_batches_per_day
-- equally-sized consecutive batches (the last may be smaller). Batch k is
-- scheduled at p_date + k * (24h / p_batches_per_day); the default of 6
-- batches gives 00:00, 04:00, ..., 20:00.
--
-- Idempotent per day: a store that already has a product_resync task
-- scheduled within [p_date, p_date + 1 day) is skipped, so re-running for the
-- same date adds nothing. (ON CONFLICT alone cannot provide this, since
-- no unique index covers pending tasks.)
--
-- Returns the number of tasks inserted (0 when no store qualifies).
-- Raises if p_batches_per_day is NULL or < 1.
CREATE OR REPLACE FUNCTION generate_resync_tasks(
    p_batches_per_day INTEGER DEFAULT 6,  -- 6 batches = every 4 hours
    p_date DATE DEFAULT CURRENT_DATE
) RETURNS INTEGER AS $$
DECLARE
    batch_spacing INTERVAL;
    created_count INTEGER := 0;
BEGIN
    IF p_batches_per_day IS NULL OR p_batches_per_day < 1 THEN
        RAISE EXCEPTION 'p_batches_per_day must be >= 1 (got %)', p_batches_per_day;
    END IF;

    batch_spacing := INTERVAL '24 hours' / p_batches_per_day;

    -- Single set-based insert: counting and batching see the same snapshot,
    -- so no store can fall between a separate COUNT and the inserts.
    WITH ranked AS (
        SELECT
            d.id AS dispensary_id,
            ROW_NUMBER() OVER (ORDER BY d.id) AS rn,
            CEIL(COUNT(*) OVER ()::NUMERIC / p_batches_per_day)::BIGINT AS stores_per_batch
        FROM dispensaries AS d
        WHERE d.crawl_enabled = true
          AND d.menu_type = 'dutchie'
          AND d.platform_dispensary_id IS NOT NULL
    ),
    batched AS (
        SELECT
            r.dispensary_id,
            -- (rn - 1) / stores_per_batch is the 0-based batch number
            (p_date + ((r.rn - 1) / r.stores_per_batch)::INTEGER * batch_spacing)::TIMESTAMPTZ
                AS scheduled_for
        FROM ranked AS r
    )
    INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, priority)
    SELECT
        'product_resync',
        b.dispensary_id,
        'dutchie',
        b.scheduled_for,
        0
    FROM batched AS b
    WHERE NOT EXISTS (
        SELECT 1
        FROM worker_tasks AS existing
        WHERE existing.role = 'product_resync'
          AND existing.dispensary_id = b.dispensary_id
          AND existing.scheduled_for >= p_date
          AND existing.scheduled_for < p_date + 1
    )
    ON CONFLICT DO NOTHING;

    GET DIAGNOSTICS created_count = ROW_COUNT;

    RETURN created_count;
END;
$$ LANGUAGE plpgsql;
|
|
|
|
-- update_worker_tasks_timestamp: row-level BEFORE UPDATE trigger function
-- that stamps updated_at on every modified worker_tasks row, including
-- updates that do not set the column themselves.
CREATE OR REPLACE FUNCTION update_worker_tasks_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at := NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Drop before create so re-running the migration does not fail on an
-- existing trigger.
DROP TRIGGER IF EXISTS worker_tasks_updated_at ON worker_tasks;

CREATE TRIGGER worker_tasks_updated_at
    BEFORE UPDATE ON worker_tasks
    FOR EACH ROW
    EXECUTE FUNCTION update_worker_tasks_timestamp();
|
|
|
|
-- Catalog documentation for the objects created by this migration.
-- Functions are referenced with their argument types so each COMMENT stays
-- unambiguous if an overload is ever added.
COMMENT ON TABLE worker_tasks IS 'Central task queue for all worker roles';
COMMENT ON TABLE worker_registry IS 'Registry of active workers and their stats';
COMMENT ON TABLE task_completion_log IS 'Hourly aggregated task completion metrics per role';
COMMENT ON VIEW v_worker_capacity IS 'Real-time capacity planning metrics per role';
COMMENT ON VIEW v_task_history IS 'Task history with dispensary details for UI';
COMMENT ON FUNCTION claim_task(VARCHAR, VARCHAR) IS 'Atomically claim the next ready task for a role, respecting per-store locking';
COMMENT ON FUNCTION recover_stale_tasks(INTEGER) IS 'Release tasks from dead workers back to pending, or mark them failed once max_retries is exhausted; returns the number requeued';
COMMENT ON FUNCTION generate_resync_tasks(INTEGER, DATE) IS 'Generate one day of product_resync tasks for crawl-enabled Dutchie stores in scheduled batches; returns the number of tasks inserted';
|