Files
cannaiq/backend/migrations/128_pool_config.sql
Kelly d1d58da0b2
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
feat: Add K8s worker restart and pool control APIs
K8s routes (/api/k8s):
- POST /workers/restart - Restart all worker pods

Pool routes (/api/pool):
- GET /status - Pool status, task counts, pending by role/state
- POST /open - Open pool (allow task claiming)
- POST /close - Close pool (stop task claiming)
- POST /clear - Clear pending tasks (by role/state)
- GET /tasks - List tasks in pool
- DELETE /tasks/:id - Remove specific task
- POST /release-stale - Release stuck tasks

Migration 128: pool_config table with pool_open check in claim_task

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2025-12-17 10:53:58 -07:00

110 lines
3.3 KiB
PL/PgSQL

-- Migration 128: Pool configuration table
-- Controls whether workers can claim tasks from the pool
CREATE TABLE IF NOT EXISTS pool_config (
id SERIAL PRIMARY KEY,
pool_open BOOLEAN NOT NULL DEFAULT true,
closed_reason TEXT,
closed_at TIMESTAMPTZ,
closed_by VARCHAR(100),
opened_at TIMESTAMPTZ DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Insert default config (pool open)
INSERT INTO pool_config (pool_open, opened_at)
VALUES (true, NOW())
ON CONFLICT DO NOTHING;
-- Update claim_task function to check pool status
CREATE OR REPLACE FUNCTION claim_task(
p_role VARCHAR(50),
p_worker_id VARCHAR(100),
p_curl_passed BOOLEAN DEFAULT TRUE,
p_http_passed BOOLEAN DEFAULT FALSE
) RETURNS worker_tasks AS $$
DECLARE
claimed_task worker_tasks;
worker_state VARCHAR(2);
session_valid BOOLEAN;
session_tasks INT;
max_tasks INT;
is_pool_open BOOLEAN;
BEGIN
-- Check if pool is open
SELECT pool_open INTO is_pool_open FROM pool_config LIMIT 1;
IF NOT COALESCE(is_pool_open, true) THEN
RETURN NULL; -- Pool is closed, no claiming allowed
END IF;
-- Get worker's current geo session info
SELECT
current_state,
session_task_count,
session_max_tasks,
(geo_session_started_at IS NOT NULL AND geo_session_started_at > NOW() - INTERVAL '60 minutes')
INTO worker_state, session_tasks, max_tasks, session_valid
FROM worker_registry
WHERE worker_id = p_worker_id;
-- Check if worker has reached max concurrent tasks (default 5)
IF session_tasks >= COALESCE(max_tasks, 5) THEN
RETURN NULL;
END IF;
-- If no valid geo session, or session expired, worker can't claim tasks
-- Worker must re-qualify first
IF worker_state IS NULL OR NOT session_valid THEN
RETURN NULL;
END IF;
-- Claim task matching worker's state
UPDATE worker_tasks
SET
status = 'claimed',
worker_id = p_worker_id,
claimed_at = NOW(),
updated_at = NOW()
WHERE id = (
SELECT wt.id FROM worker_tasks wt
JOIN dispensaries d ON wt.dispensary_id = d.id
WHERE wt.role = p_role
AND wt.status = 'pending'
AND (wt.scheduled_for IS NULL OR wt.scheduled_for <= NOW())
-- GEO FILTER: Task's dispensary must match worker's state
AND d.state = worker_state
-- Method compatibility: worker must have passed the required preflight
AND (
wt.method IS NULL -- No preference, any worker can claim
OR (wt.method = 'curl' AND p_curl_passed = TRUE)
OR (wt.method = 'http' AND p_http_passed = TRUE)
)
-- Exclude stores that already have an active task
AND (wt.dispensary_id IS NULL OR wt.dispensary_id NOT IN (
SELECT dispensary_id FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND dispensary_id IS NOT NULL
AND dispensary_id != wt.dispensary_id
))
ORDER BY wt.priority DESC, wt.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING * INTO claimed_task;
-- INCREMENT session_task_count if we claimed a task
IF claimed_task.id IS NOT NULL THEN
UPDATE worker_registry
SET session_task_count = session_task_count + 1
WHERE worker_id = p_worker_id;
END IF;
RETURN claimed_task;
END;
$$ LANGUAGE plpgsql;
-- Verify
SELECT 'pool_config table created' as status;
SELECT * FROM pool_config;