Files
cannaiq/backend/src/routes/workers.ts
Kelly 2f483b3084 feat: SEO template library, discovery pipeline, and orchestrator enhancements
## SEO Template Library
- Add complete template library with 7 page types (state, city, category, brand, product, search, regeneration)
- Add Template Library tab in SEO Orchestrator with accordion-based editors
- Add template preview, validation, and variable injection engine
- Add API endpoints: /api/seo/templates, preview, validate, generate, regenerate

## Discovery Pipeline
- Add promotion.ts for discovery location validation and promotion
- Add discover-all-states.ts script for multi-state discovery
- Add promotion log migration (067)
- Enhance discovery routes and types

## Orchestrator & Admin
- Add crawl_enabled filter to stores page
- Add API permissions page
- Add job queue management
- Add price analytics routes
- Add markets and intelligence routes
- Enhance dashboard and worker monitoring

## Infrastructure
- Add migrations for worker definitions, SEO settings, field alignment
- Add canonical pipeline for scraper v2
- Update hydration and sync orchestrator
- Enhance multi-state query service

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 00:05:34 -07:00

1041 lines
34 KiB
TypeScript

/**
* Workers API Routes
*
* Provider-agnostic worker management and job monitoring.
* Replaces legacy /api/dutchie-az/admin/schedules and /api/dutchie-az/monitor/* routes.
*
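* Endpoints:
* GET /api/workers - List all workers/schedules
* GET /api/workers/roles - List available worker roles
* GET /api/workers/states - List available states for assignment
* GET /api/workers/dispensaries - List dispensaries for assignment
* GET /api/workers/chains - List chains for assignment
* GET /api/workers/active - List currently active workers
* GET /api/workers/schedule - Get all job schedules
* GET /api/workers/:workerIdOrName - Get specific worker details
* GET /api/workers/:workerIdOrName/scope - Get worker's scope (states, etc.)
* GET /api/workers/:workerIdOrName/stats - Get worker statistics
* GET /api/workers/:workerIdOrName/logs - Get worker's recent logs
* POST /api/workers/:workerIdOrName/trigger - Trigger worker manually
* PUT /api/workers/:id/schedule - Update a job_schedules entry
*
* GET /api/workers/definitions - List worker definitions
* POST /api/workers/definitions - Create a worker definition
* PUT /api/workers/definitions/:id - Update a worker definition
* DELETE /api/workers/definitions/:id - Delete a worker definition
* POST /api/workers/definitions/:id/toggle - Enable/disable a worker
* POST /api/workers/definitions/:id/assign-dispensary - Assign a dispensary
* DELETE /api/workers/definitions/:id/assign-dispensary/:dispensaryId - Unassign a dispensary
*
* GET /api/monitor/jobs - Get recent job history
* GET /api/monitor/active-jobs - Get currently running jobs
* GET /api/monitor/summary - Get monitoring summary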
*/
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
const router = Router();
// ============================================================
// STATIC ROUTES (must come before parameterized routes)
// ============================================================
/**
* GET /api/workers/roles - List available worker roles
*/
router.get('/roles', async (_req: Request, res: Response) => {
  // Each entry is [id, display name, description]; expanded into response objects below.
  const roleTable: Array<[string, string, string]> = [
    ['product_sync', 'Product Sync', 'Crawls products from dispensary menus'],
    ['store_discovery', 'Store Discovery', 'Discovers new dispensary locations'],
    ['entry_point_finder', 'Entry Point Finder', 'Detects menu providers and resolves platform IDs'],
    ['analytics_refresh', 'Analytics Refresh', 'Refreshes materialized views and analytics'],
    ['price_monitor', 'Price Monitor', 'Monitors price changes and triggers alerts'],
    ['inventory_sync', 'Inventory Sync', 'Syncs inventory levels'],
    ['image_processor', 'Image Processor', 'Downloads and processes product images'],
    ['data_validator', 'Data Validator', 'Validates data integrity'],
    ['custom', 'Custom', 'Custom worker role'],
  ];
  const roles = roleTable.map(([id, name, description]) => ({ id, name, description }));

  res.json({ success: true, roles });
});
/**
* GET /api/workers/states - List available states for assignment
*/
router.get('/states', async (_req: Request, res: Response) => {
  try {
    const { rows } = await pool.query(`
      SELECT state_code, state_name, dispensary_count
      FROM states
      WHERE active = true
      ORDER BY state_name ASC
    `);
    res.json({ success: true, states: rows });
  } catch (error: unknown) {
    // The fallback is deliberate best-effort (e.g. the states table is missing),
    // but every failure lands here, so log it: otherwise a real database outage
    // would be indistinguishable from the expected "no table yet" case.
    console.warn(
      '[Workers] Error fetching states, serving fallback list:',
      error instanceof Error ? error.message : error
    );
    res.json({ success: true, states: [
      { state_code: 'AZ', state_name: 'Arizona', dispensary_count: 0 },
      { state_code: 'CA', state_name: 'California', dispensary_count: 0 },
      { state_code: 'CO', state_name: 'Colorado', dispensary_count: 0 },
      { state_code: 'MI', state_name: 'Michigan', dispensary_count: 0 },
      { state_code: 'NV', state_name: 'Nevada', dispensary_count: 0 },
    ]});
  }
});
/**
* GET /api/workers/dispensaries - List dispensaries for assignment (paginated search)
*/
router.get('/dispensaries', async (req: Request, res: Response) => {
  try {
    // Optional case-insensitive substring filter on the dispensary name.
    const search = (req.query.search as string) || '';

    // Only a positive integer is a usable LIMIT. Missing, non-numeric, zero and
    // negative values all fall back to the default; a negative value would
    // otherwise be rejected by the database and surface as a 500.
    const requestedLimit = parseInt(req.query.limit as string, 10);
    const limit = requestedLimit > 0 ? requestedLimit : 50;

    const { rows } = await pool.query(`
      SELECT id, name, city, state_code
      FROM dispensaries
      WHERE ($1 = '' OR name ILIKE $2)
      ORDER BY name ASC
      LIMIT $3
    `, [search, `%${search}%`, limit]);
    res.json({ success: true, dispensaries: rows });
  } catch (error: any) {
    console.error('[Workers] Error fetching dispensaries:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/chains - List chains for assignment
*/
router.get('/chains', async (_req: Request, res: Response) => {
  try {
    const { rows } = await pool.query(`
      SELECT DISTINCT chain_id as id, chain_name as name, COUNT(*) as dispensary_count
      FROM dispensaries
      WHERE chain_id IS NOT NULL AND chain_name IS NOT NULL
      GROUP BY chain_id, chain_name
      ORDER BY chain_name ASC
    `);
    res.json({ success: true, chains: rows });
  } catch (error: unknown) {
    // The empty-list fallback is deliberate best-effort (e.g. the chain columns
    // are missing), but any query failure ends up here, so record it rather
    // than hiding it behind a successful-looking response.
    console.warn(
      '[Workers] Error fetching chains, serving empty list:',
      error instanceof Error ? error.message : error
    );
    res.json({ success: true, chains: [] });
  }
});
// ============================================================
// WORKER TYPES
// ============================================================
/**
 * API representation of a scheduled worker as returned by the worker routes.
 * Timestamp fields are strings (handlers fill them via `toISOString()`) or null.
 */
interface Worker {
  // --- identity ---
  id: number;
  worker_name: string;
  run_role: string;
  job_name?: string;
  description: string;
  /** Scope labels taken from the schedule's job_config (`scope` or `states`). */
  scope: Array<string>;

  // --- scheduling ---
  enabled: boolean;
  base_interval_minutes: number;
  jitter_minutes: number;
  next_run_at: string | null;

  // --- outcome of the most recent run ---
  last_run_at: string | null;
  last_status: string | null;
  last_error_message?: string | null;
  last_duration_ms?: number | null;

  // --- liveness / visibility ---
  /** Handlers in this file populate this from the schedule's last_run_at. */
  last_seen: string | null;
  /** Handlers in this file always report 0 for the two visibility counters. */
  visibility_lost: number;
  visibility_restored: number;
}
/**
 * API representation of a single job run as returned by the log/history routes.
 */
interface JobLog {
  // --- identity ---
  id: number;
  job_name: string;
  worker_name: string;
  run_role: string;
  /** Scope labels taken from the owning schedule's job_config. */
  scope: Array<string>;

  // --- timing ---
  status: string;
  started_at: string;
  completed_at: string | null;
  /** Handlers derive this from completed_at - started_at and round it. */
  duration_seconds: number | null;

  // --- item counters ---
  items_processed: number;
  items_succeeded: number;
  items_failed: number;

  // --- failure detail ---
  error_message: string | null;
}
// ============================================================
// HELPERS
// ============================================================
/**
 * Reads the scope list out of a schedule's job_config.
 *
 * `scope` takes precedence over `states`; whichever is truthy first is used.
 * A non-array value is wrapped in a one-element array.
 *
 * @param jobConfig - The job_config value of a schedule row (may be null/undefined).
 * @returns The scope entries, or an empty array when none are configured.
 */
function parseScope(jobConfig: any): string[] {
  if (!jobConfig) {
    return [];
  }

  const configured = jobConfig.scope ? jobConfig.scope : jobConfig.states;
  if (!configured) {
    return [];
  }

  return Array.isArray(configured) ? configured : [configured];
}
/**
 * Derives a display name for a worker.
 *
 * Priority: `jobConfig.worker_name` if set, otherwise a PascalCase name derived
 * from the job name, e.g. "dutchie_az_product_crawl" -> "ProductCrawl".
 *
 * @param jobName - Schedule/job identifier in snake_case. Non-string values
 *   (e.g. a NULL column from an untyped row) are treated as an empty name.
 * @param jobConfig - The job_config value of the schedule (may be null/undefined).
 * @returns The worker display name; an empty string if nothing can be derived.
 */
function extractWorkerName(jobName: string, jobConfig: any): string {
  if (jobConfig?.worker_name) return jobConfig.worker_name;

  // Callers pass values from untyped DB rows, so guard instead of throwing.
  const base = typeof jobName === 'string' ? jobName : '';

  // Strip a leading "az" / "dutchie_az" segment only when it is a whole
  // segment (followed by "_" or end of string). Without that boundary a name
  // such as "azure_sync" would lose its first two letters.
  const withoutPrefix = base.replace(/^(?:dutchie_)?az(?:_|$)/i, '');

  return withoutPrefix
    .split('_')
    .map(segment => segment.charAt(0).toUpperCase() + segment.slice(1).toLowerCase())
    .join('');
}
/**
 * Ordered (job-name fragment, role) pairs used by extractRunRole.
 * The first fragment contained in the lower-cased job name wins.
 */
const RUN_ROLE_BY_JOB_FRAGMENT: ReadonlyArray<readonly [string, string]> = [
  ['menu_detection', 'StoreDiscovery'],
  ['menu_detection_single', 'StoreDiscovery'],
  ['dutchie_product_crawl', 'ProductSync'],
  ['product_crawl', 'ProductSync'],
  ['analytics_refresh', 'Analytics'],
  ['id_resolution', 'IdResolution'],
];

/**
 * Determines the role label for a job.
 *
 * Priority: `jobConfig.run_role` if set, otherwise the first entry of
 * RUN_ROLE_BY_JOB_FRAGMENT whose fragment occurs in the job name
 * (case-insensitive), otherwise "General".
 *
 * @param jobName - Schedule/job identifier. Non-string values (e.g. a NULL
 *   column from an untyped row) yield "General" instead of throwing.
 * @param jobConfig - The job_config value of the schedule (may be null/undefined).
 * @returns The role label.
 */
function extractRunRole(jobName: string, jobConfig: any): string {
  if (jobConfig?.run_role) return jobConfig.run_role;

  // Callers pass values from untyped DB rows, so guard instead of throwing.
  if (typeof jobName !== 'string') return 'General';

  // Lower-case once instead of once per table entry.
  const haystack = jobName.toLowerCase();
  const match = RUN_ROLE_BY_JOB_FRAGMENT.find(([fragment]) => haystack.includes(fragment));
  return match ? match[1] : 'General';
}
// ============================================================
// WORKERS ROUTES
// ============================================================
/**
* GET /api/workers - List all workers/schedules
*/
router.get('/', async (_req: Request, res: Response) => {
  // Timestamp columns are serialised through toISOString(); absent values become null.
  const toIso = (value: any): string | null => value?.toISOString() || null;

  try {
    const result = await pool.query(`
      SELECT
        id,
        job_name,
        description,
        enabled,
        base_interval_minutes,
        jitter_minutes,
        next_run_at,
        last_run_at,
        last_status,
        last_error_message,
        last_duration_ms,
        job_config,
        worker_name,
        worker_role
      FROM job_schedules
      ORDER BY enabled DESC, last_run_at DESC NULLS LAST
    `);

    const workers: Worker[] = [];
    for (const schedule of result.rows) {
      workers.push({
        id: schedule.id,
        // Stored columns win; otherwise fall back to values derived from the job name/config.
        worker_name: schedule.worker_name || extractWorkerName(schedule.job_name, schedule.job_config),
        run_role: schedule.worker_role || extractRunRole(schedule.job_name, schedule.job_config),
        job_name: schedule.job_name,
        scope: parseScope(schedule.job_config),
        description: schedule.description || schedule.job_name,
        enabled: schedule.enabled,
        base_interval_minutes: schedule.base_interval_minutes,
        jitter_minutes: schedule.jitter_minutes,
        next_run_at: toIso(schedule.next_run_at),
        last_run_at: toIso(schedule.last_run_at),
        last_status: schedule.last_status,
        last_error_message: schedule.last_error_message,
        last_duration_ms: schedule.last_duration_ms,
        // last_seen mirrors the last run time; the visibility counters are reported as 0.
        last_seen: toIso(schedule.last_run_at),
        visibility_lost: 0,
        visibility_restored: 0,
      });
    }

    res.json({ success: true, workers });
  } catch (error: any) {
    console.error('[Workers] Error listing workers:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/active - List currently active workers
*/
router.get('/active', async (_req: Request, res: Response) => {
  try {
    // One row per claiming worker: its most recently started running job.
    const result = await pool.query(`
      SELECT DISTINCT ON (claimed_by)
        claimed_by as worker_id,
        worker_hostname,
        job_type,
        started_at,
        last_heartbeat_at
      FROM dispensary_crawl_jobs
      WHERE status = 'running'
        AND claimed_by IS NOT NULL
      ORDER BY claimed_by, started_at DESC
    `);

    const active_workers = result.rows.map((job: any) => {
      const startedAt = job.started_at?.toISOString();
      const lastHeartbeat = job.last_heartbeat_at?.toISOString();
      return {
        worker_id: job.worker_id,
        hostname: job.worker_hostname,
        current_job_type: job.job_type,
        started_at: startedAt,
        last_heartbeat: lastHeartbeat,
        run_role: extractRunRole(job.job_type, null),
      };
    });

    res.json({ success: true, active_workers, count: active_workers.length });
  } catch (error: any) {
    console.error('[Workers] Error getting active workers:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/schedule - Get all job schedules (alias for /)
*/
router.get('/schedule', async (_req: Request, res: Response) => {
  // Returns the raw job_schedules rows, soonest next run first.
  try {
    const result = await pool.query(`
      SELECT
        id,
        job_name,
        description,
        enabled,
        base_interval_minutes,
        jitter_minutes,
        next_run_at,
        last_run_at,
        last_status,
        job_config
      FROM job_schedules
      ORDER BY next_run_at ASC NULLS LAST
    `);
    const schedules = result.rows;
    res.json({ success: true, schedules });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/:workerIdOrName - Get specific worker details
*/
/**
 * Single-segment GET paths whose handlers are registered further down in this
 * file. Express matches routes in registration order, so without this list the
 * parameterized route below would capture them and answer "Worker not found".
 */
const STATIC_GET_ROUTES_REGISTERED_LATER: ReadonlySet<string> = new Set([
  'jobs',
  'active-jobs',
  'summary',
  'definitions',
]);

// `next` is typed structurally so this block needs no additional express import.
router.get('/:workerIdOrName', async (req: Request, res: Response, next: (err?: unknown) => void) => {
  const { workerIdOrName } = req.params;

  // Hand reserved names over to their dedicated handlers registered later.
  if (STATIC_GET_ROUTES_REGISTERED_LATER.has(workerIdOrName)) {
    return next();
  }

  try {
    // Match by numeric id or by case-insensitive substring of job_name.
    const { rows } = await pool.query(`
      SELECT
        id,
        job_name,
        description,
        enabled,
        base_interval_minutes,
        jitter_minutes,
        next_run_at,
        last_run_at,
        last_status,
        job_config
      FROM job_schedules
      WHERE id = $1::int OR job_name ILIKE $2
      LIMIT 1
    `, [parseInt(workerIdOrName) || 0, `%${workerIdOrName}%`]);

    if (rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }

    const row = rows[0];
    const worker: Worker = {
      id: row.id,
      worker_name: extractWorkerName(row.job_name, row.job_config),
      run_role: extractRunRole(row.job_name, row.job_config),
      scope: parseScope(row.job_config),
      description: row.description || row.job_name,
      enabled: row.enabled,
      base_interval_minutes: row.base_interval_minutes,
      jitter_minutes: row.jitter_minutes,
      next_run_at: row.next_run_at?.toISOString() || null,
      last_run_at: row.last_run_at?.toISOString() || null,
      last_status: row.last_status,
      // last_seen mirrors the last run time; the visibility counters are reported as 0.
      last_seen: row.last_run_at?.toISOString() || null,
      visibility_lost: 0,
      visibility_restored: 0,
    };
    res.json({ success: true, worker });
  } catch (error: any) {
    console.error('[Workers] Error getting worker:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/:workerIdOrName/scope - Get worker's scope
*/
router.get('/:workerIdOrName/scope', async (req: Request, res: Response) => {
  try {
    const workerIdOrName = req.params.workerIdOrName;

    // Match by numeric id or by case-insensitive substring of job_name.
    const lookupParams = [parseInt(workerIdOrName) || 0, `%${workerIdOrName}%`];
    const result = await pool.query(`
      SELECT job_config
      FROM job_schedules
      WHERE id = $1::int OR job_name ILIKE $2
      LIMIT 1
    `, lookupParams);

    const [schedule] = result.rows;
    if (result.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }

    res.json({ success: true, scope: parseScope(schedule.job_config) });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/:workerIdOrName/stats - Get worker statistics
*/
router.get('/:workerIdOrName/stats', async (req: Request, res: Response) => {
  // Aggregate columns are parsed to numbers; unparsable/absent values become 0.
  const asInt = (value: any): number => parseInt(value) || 0;
  const asFloat = (value: any): number => parseFloat(value) || 0;

  try {
    const workerIdOrName = req.params.workerIdOrName;

    // Resolve the schedule by numeric id or case-insensitive substring of job_name.
    const lookup = await pool.query(`
      SELECT id, job_name FROM job_schedules
      WHERE id = $1::int OR job_name ILIKE $2
      LIMIT 1
    `, [parseInt(workerIdOrName) || 0, `%${workerIdOrName}%`]);

    if (lookup.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }
    const scheduleId = lookup.rows[0].id;

    // Aggregate this schedule's runs started within the last 7 days.
    const aggregate = await pool.query(`
      SELECT
        COUNT(*) FILTER (WHERE status = 'success') as success_count,
        COUNT(*) FILTER (WHERE status IN ('error', 'partial')) as failure_count,
        COUNT(*) as total_runs,
        AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration_seconds,
        SUM(items_processed) as total_items_processed,
        MAX(completed_at) as last_completed
      FROM job_run_logs
      WHERE schedule_id = $1
        AND started_at > NOW() - INTERVAL '7 days'
    `, [scheduleId]);
    const totals = aggregate.rows[0];

    const stats = {
      success_count: asInt(totals.success_count),
      failure_count: asInt(totals.failure_count),
      total_runs: asInt(totals.total_runs),
      avg_duration_seconds: asFloat(totals.avg_duration_seconds),
      total_items_processed: asInt(totals.total_items_processed),
      last_completed: totals.last_completed?.toISOString() || null,
    };
    res.json({ success: true, stats });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/workers/:workerIdOrName/logs - Get worker's recent logs
*/
router.get('/:workerIdOrName/logs', async (req: Request, res: Response) => {
  try {
    const { workerIdOrName } = req.params;

    // Only a positive integer is a usable LIMIT. Missing, non-numeric, zero and
    // negative values all fall back to the default; a negative value would
    // otherwise be rejected by the database and surface as a 500.
    const requestedLimit = parseInt(req.query.limit as string, 10);
    const limit = requestedLimit > 0 ? requestedLimit : 20;

    // Resolve the schedule by numeric id or case-insensitive substring of job_name.
    const scheduleResult = await pool.query(`
      SELECT id, job_name, job_config FROM job_schedules
      WHERE id = $1::int OR job_name ILIKE $2
      LIMIT 1
    `, [parseInt(workerIdOrName) || 0, `%${workerIdOrName}%`]);
    if (scheduleResult.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }
    const schedule = scheduleResult.rows[0];

    const { rows } = await pool.query(`
      SELECT
        id,
        job_name,
        status,
        started_at,
        completed_at,
        EXTRACT(EPOCH FROM (completed_at - started_at)) as duration_seconds,
        items_processed,
        items_succeeded,
        items_failed,
        error_message,
        metadata
      FROM job_run_logs
      WHERE schedule_id = $1
      ORDER BY started_at DESC
      LIMIT $2
    `, [schedule.id, limit]);

    // Name, role and scope come from the schedule, so compute them once.
    const workerName = extractWorkerName(schedule.job_name, schedule.job_config);
    const runRole = extractRunRole(schedule.job_name, schedule.job_config);
    const scope = parseScope(schedule.job_config);

    const logs: JobLog[] = rows.map((row: any) => ({
      id: row.id,
      worker_name: workerName,
      run_role: runRole,
      job_name: row.job_name,
      status: row.status,
      started_at: row.started_at?.toISOString(),
      completed_at: row.completed_at?.toISOString() || null,
      // Compare against null explicitly: a truthiness test would turn a
      // legitimate 0-second duration into null.
      duration_seconds: row.duration_seconds != null ? Math.round(Number(row.duration_seconds)) : null,
      items_processed: row.items_processed || 0,
      items_succeeded: row.items_succeeded || 0,
      items_failed: row.items_failed || 0,
      error_message: row.error_message,
      scope,
    }));
    res.json({ success: true, logs });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* POST /api/workers/:workerIdOrName/trigger - Trigger worker manually
*/
router.post('/:workerIdOrName/trigger', async (req: Request, res: Response) => {
  try {
    const { workerIdOrName } = req.params;

    // The lookup accepts a numeric id or a case-insensitive substring of
    // job_name, so several schedules can match (e.g. "5" matches id 5 and any
    // job name containing "5"). Because this endpoint mutates the chosen row,
    // pick deterministically: id match first, then an exact name match, then
    // the lowest id among the remaining substring matches.
    const scheduleResult = await pool.query(`
      SELECT id, job_name FROM job_schedules
      WHERE id = $1::int OR job_name ILIKE $2
      ORDER BY (id = $1::int) DESC, (LOWER(job_name) = LOWER($3)) DESC, id ASC
      LIMIT 1
    `, [parseInt(workerIdOrName) || 0, `%${workerIdOrName}%`, workerIdOrName]);
    if (scheduleResult.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }
    const scheduleId = scheduleResult.rows[0].id;

    // Set next_run_at to now; presumably the scheduler picks up due schedules
    // on its next pass (the scheduler itself is not part of this file).
    await pool.query(`
      UPDATE job_schedules
      SET next_run_at = NOW()
      WHERE id = $1
    `, [scheduleId]);

    res.json({ success: true, message: 'Worker triggered', schedule_id: scheduleId });
  } catch (error: any) {
    console.error('[Workers] Error triggering worker:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
// ============================================================
// MONITOR ROUTES (for /api/monitor prefix)
// ============================================================
/**
* GET /api/monitor/jobs - Get recent job history
*/
router.get('/jobs', async (req: Request, res: Response) => {
  try {
    // Only a positive integer is a usable LIMIT. Missing, non-numeric, zero and
    // negative values all fall back to the default; a negative value would
    // otherwise be rejected by the database and surface as a 500.
    const requestedLimit = parseInt(req.query.limit as string, 10);
    const limit = requestedLimit > 0 ? requestedLimit : 50;
    // Optional exact-match filter on the run status.
    const status = req.query.status as string | undefined;

    let query = `
      SELECT
        j.id,
        j.job_name,
        j.status,
        j.started_at,
        j.completed_at,
        EXTRACT(EPOCH FROM (j.completed_at - j.started_at)) as duration_seconds,
        j.items_processed,
        j.items_succeeded,
        j.items_failed,
        j.error_message,
        j.metadata,
        s.job_config
      FROM job_run_logs j
      LEFT JOIN job_schedules s ON j.schedule_id = s.id
      WHERE 1=1
    `;

    // Placeholders are numbered from the current length of `params`, so the
    // values stay aligned whether or not the status filter is present.
    const params: any[] = [];
    if (status) {
      params.push(status);
      query += ` AND j.status = $${params.length}`;
    }
    params.push(limit);
    query += ` ORDER BY j.started_at DESC LIMIT $${params.length}`;

    const { rows } = await pool.query(query, params);
    const jobs: JobLog[] = rows.map((row: any) => ({
      id: row.id,
      worker_name: extractWorkerName(row.job_name, row.job_config),
      run_role: extractRunRole(row.job_name, row.job_config),
      job_name: row.job_name,
      status: row.status,
      started_at: row.started_at?.toISOString(),
      completed_at: row.completed_at?.toISOString() || null,
      // Compare against null explicitly: a truthiness test would turn a
      // legitimate 0-second duration into null.
      duration_seconds: row.duration_seconds != null ? Math.round(Number(row.duration_seconds)) : null,
      items_processed: row.items_processed || 0,
      items_succeeded: row.items_succeeded || 0,
      items_failed: row.items_failed || 0,
      error_message: row.error_message,
      scope: parseScope(row.job_config),
    }));
    res.json({ success: true, jobs, count: jobs.length });
  } catch (error: any) {
    console.error('[Monitor] Error getting jobs:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/monitor/active-jobs - Get currently running jobs
*/
router.get('/active-jobs', async (_req: Request, res: Response) => {
  try {
    // All crawl jobs currently in the 'running' state, newest first.
    const result = await pool.query(`
      SELECT
        id,
        dispensary_id,
        job_type,
        status,
        worker_hostname,
        started_at,
        last_heartbeat_at,
        products_found,
        error_message,
        metadata
      FROM dispensary_crawl_jobs
      WHERE status = 'running'
      ORDER BY started_at DESC
    `);

    const active_jobs = result.rows.map((job: any) => {
      const jobType = job.job_type;
      return {
        id: job.id,
        dispensary_id: job.dispensary_id,
        job_type: jobType,
        // Crawl jobs carry no job_config here, so name and role are derived from the job type alone.
        worker_name: extractWorkerName(jobType, null),
        run_role: extractRunRole(jobType, null),
        status: job.status,
        hostname: job.worker_hostname,
        started_at: job.started_at?.toISOString(),
        last_heartbeat: job.last_heartbeat_at?.toISOString(),
        products_found: job.products_found || 0,
        error_message: job.error_message,
      };
    });

    res.json({ success: true, active_jobs, count: active_jobs.length });
  } catch (error: any) {
    console.error('[Monitor] Error getting active jobs:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* GET /api/monitor/summary - Get monitoring summary
*/
router.get('/summary', async (_req: Request, res: Response) => {
  // Count columns are parsed to integers; unparsable/absent values become 0.
  const asCount = (value: any): number => parseInt(value) || 0;

  try {
    // The three aggregates are independent, so they are issued together.
    const scheduleQuery = pool.query(`
      SELECT
        COUNT(*) as total_schedules,
        COUNT(*) FILTER (WHERE enabled = true) as enabled_schedules,
        COUNT(*) FILTER (WHERE last_status = 'success') as last_success,
        COUNT(*) FILTER (WHERE last_status IN ('error', 'partial')) as last_failed
      FROM job_schedules
    `);
    const recentRunQuery = pool.query(`
      SELECT
        COUNT(*) as total_runs,
        COUNT(*) FILTER (WHERE status = 'success') as success_count,
        COUNT(*) FILTER (WHERE status IN ('error', 'partial')) as failure_count,
        COUNT(*) FILTER (WHERE status = 'running') as running_count
      FROM job_run_logs
      WHERE started_at > NOW() - INTERVAL '24 hours'
    `);
    const activeCrawlQuery = pool.query(`
      SELECT COUNT(*) as active_count
      FROM dispensary_crawl_jobs
      WHERE status = 'running'
    `);

    const [scheduleResult, recentRunResult, activeCrawlResult] = await Promise.all([
      scheduleQuery,
      recentRunQuery,
      activeCrawlQuery,
    ]);
    const scheduleRow = scheduleResult.rows[0];
    const runRow = recentRunResult.rows[0];
    const crawlRow = activeCrawlResult.rows[0];

    const summary = {
      schedules: {
        total: asCount(scheduleRow.total_schedules),
        enabled: asCount(scheduleRow.enabled_schedules),
        last_success: asCount(scheduleRow.last_success),
        last_failed: asCount(scheduleRow.last_failed),
      },
      jobs_24h: {
        total: asCount(runRow.total_runs),
        success: asCount(runRow.success_count),
        failed: asCount(runRow.failure_count),
        running: asCount(runRow.running_count),
      },
      active_crawl_jobs: asCount(crawlRow.active_count),
    };
    res.json({ success: true, summary });
  } catch (error: any) {
    console.error('[Monitor] Error getting summary:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
// ============================================================
// WORKER CRUD ROUTES (using new workers table)
// ============================================================
/**
* GET /api/workers/definitions - List all worker definitions from workers table
*/
router.get('/definitions', async (_req: Request, res: Response) => {
  try {
    // Every worker row plus counts of its assigned crawl jobs that are pending / running.
    const result = await pool.query(`
      SELECT
        w.*,
        (SELECT COUNT(*) FROM dispensary_crawl_jobs j WHERE j.assigned_worker_id = w.id AND j.status = 'pending') as pending_jobs,
        (SELECT COUNT(*) FROM dispensary_crawl_jobs j WHERE j.assigned_worker_id = w.id AND j.status = 'running') as running_jobs
      FROM workers w
      ORDER BY w.enabled DESC, w.name ASC
    `);
    const workers = result.rows;
    res.json({ success: true, workers });
  } catch (error: any) {
    console.error('[Workers] Error listing worker definitions:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* POST /api/workers/definitions - Create a new worker definition
*/
router.post('/definitions', async (req: Request, res: Response) => {
  try {
    const {
      name,
      role,
      description,
      enabled = true,
      schedule_type = 'interval',
      interval_minutes = 240,
      cron_expression,
      jitter_minutes = 30,
      assignment_type = 'all',
      assigned_state_codes,
      assigned_dispensary_ids,
      assigned_chain_ids,
      job_type = 'dutchie_product_crawl',
      job_config = {},
      priority = 0,
      max_concurrent = 1
    } = req.body;

    // `name` is used with string methods below, so a non-string value must be
    // rejected up front rather than failing after a row has been written.
    if (!name || !role || typeof name !== 'string') {
      return res.status(400).json({ success: false, error: 'name and role are required' });
    }

    // Legacy schedule key derived from the worker name, e.g. "AZ Crawl" -> "worker_az_crawl".
    const legacyJobName = `worker_${name.toLowerCase().replace(/\s+/g, '_')}`;

    // Both writes run in one statement (data-modifying CTE) so they succeed or
    // fail together: a failure of the backwards-compatibility upsert can no
    // longer leave a created worker behind an error response.
    // The schedule insert has its own placeholders ($17-$24) instead of reusing
    // $1-$16, so each parameter's type is inferred from one column only.
    const { rows } = await pool.query(`
      WITH new_worker AS (
        INSERT INTO workers (
          name, role, description, enabled,
          schedule_type, interval_minutes, cron_expression, jitter_minutes,
          assignment_type, assigned_state_codes, assigned_dispensary_ids, assigned_chain_ids,
          job_type, job_config, priority, max_concurrent
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
        RETURNING *
      ), legacy_schedule AS (
        INSERT INTO job_schedules (job_name, description, enabled, base_interval_minutes, jitter_minutes, worker_name, worker_role, job_config)
        VALUES ($17, $18, $19, $20, $21, $22, $23, $24)
        ON CONFLICT (job_name) DO UPDATE SET
          description = EXCLUDED.description,
          enabled = EXCLUDED.enabled,
          base_interval_minutes = EXCLUDED.base_interval_minutes,
          jitter_minutes = EXCLUDED.jitter_minutes,
          worker_name = EXCLUDED.worker_name,
          worker_role = EXCLUDED.worker_role,
          updated_at = NOW()
      )
      SELECT * FROM new_worker
    `, [
      // $1-$16: workers row
      name, role, description, enabled,
      schedule_type, interval_minutes, cron_expression, jitter_minutes,
      assignment_type, assigned_state_codes, assigned_dispensary_ids, assigned_chain_ids,
      job_type, job_config, priority, max_concurrent,
      // $17-$24: job_schedules row kept for backwards compatibility
      legacyJobName,
      description || `Worker: ${name}`,
      enabled,
      interval_minutes,
      jitter_minutes,
      name,
      role,
      job_config
    ]);

    res.json({ success: true, worker: rows[0], message: 'Worker created' });
  } catch (error: any) {
    console.error('[Workers] Error creating worker:', error);
    if (error.code === '23505') { // unique violation
      return res.status(400).json({ success: false, error: 'Worker name already exists' });
    }
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* PUT /api/workers/definitions/:id - Update a worker definition
*/
router.put('/definitions/:id', async (req: Request, res: Response) => {
  try {
    const workerId = req.params.id;
    const {
      name,
      role,
      description,
      enabled,
      schedule_type,
      interval_minutes,
      cron_expression,
      jitter_minutes,
      assignment_type,
      assigned_state_codes,
      assigned_dispensary_ids,
      assigned_chain_ids,
      job_type,
      job_config,
      priority,
      max_concurrent
    } = req.body;

    // Positional values for $1..$16, followed by the row id as $17.
    const values = [
      name, role, description, enabled,
      schedule_type, interval_minutes, cron_expression, jitter_minutes,
      assignment_type, assigned_state_codes, assigned_dispensary_ids, assigned_chain_ids,
      job_type, job_config, priority, max_concurrent,
      workerId
    ];

    // Partial update: COALESCE keeps the stored value for every field sent as null/undefined.
    const result = await pool.query(`
      UPDATE workers SET
        name = COALESCE($1, name),
        role = COALESCE($2, role),
        description = COALESCE($3, description),
        enabled = COALESCE($4, enabled),
        schedule_type = COALESCE($5, schedule_type),
        interval_minutes = COALESCE($6, interval_minutes),
        cron_expression = COALESCE($7, cron_expression),
        jitter_minutes = COALESCE($8, jitter_minutes),
        assignment_type = COALESCE($9, assignment_type),
        assigned_state_codes = COALESCE($10, assigned_state_codes),
        assigned_dispensary_ids = COALESCE($11, assigned_dispensary_ids),
        assigned_chain_ids = COALESCE($12, assigned_chain_ids),
        job_type = COALESCE($13, job_type),
        job_config = COALESCE($14, job_config),
        priority = COALESCE($15, priority),
        max_concurrent = COALESCE($16, max_concurrent),
        updated_at = NOW()
      WHERE id = $17
      RETURNING *
    `, values);

    const updated = result.rows;
    if (updated.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }
    res.json({ success: true, worker: updated[0], message: 'Worker updated' });
  } catch (error: any) {
    console.error('[Workers] Error updating worker:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* DELETE /api/workers/definitions/:id - Delete a worker definition
*/
router.delete('/definitions/:id', async (req: Request, res: Response) => {
  try {
    const workerId = req.params.id;

    // RETURNING tells us whether a row was removed and gives its name for the message.
    const result = await pool.query(`
      DELETE FROM workers WHERE id = $1 RETURNING name
    `, [workerId]);

    const [removed] = result.rows;
    if (result.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }
    res.json({ success: true, message: `Worker "${removed.name}" deleted` });
  } catch (error: any) {
    console.error('[Workers] Error deleting worker:', error);
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* POST /api/workers/definitions/:id/toggle - Enable/disable worker
*/
router.post('/definitions/:id/toggle', async (req: Request, res: Response) => {
  try {
    const workerId = req.params.id;

    // Flip the flag in a single UPDATE and read back the new state.
    const result = await pool.query(`
      UPDATE workers SET enabled = NOT enabled, updated_at = NOW()
      WHERE id = $1
      RETURNING id, name, enabled
    `, [workerId]);

    if (result.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }

    const worker = result.rows[0];
    const stateLabel = worker.enabled ? 'enabled' : 'disabled';
    res.json({ success: true, worker, message: `Worker ${stateLabel}` });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* POST /api/workers/definitions/:id/assign-dispensary - Assign dispensary to worker
*/
router.post('/definitions/:id/assign-dispensary', async (req: Request, res: Response) => {
  try {
    const workerId = req.params.id;
    const { dispensary_id } = req.body;
    if (!dispensary_id) {
      return res.status(400).json({ success: false, error: 'dispensary_id is required' });
    }

    // Append the id only when it is not already in the array; the same
    // statement also switches the worker to dispensary-based assignment.
    const appended = await pool.query(`
      UPDATE workers SET
        assigned_dispensary_ids = array_append(
          COALESCE(assigned_dispensary_ids, ARRAY[]::integer[]),
          $1::integer
        ),
        assignment_type = 'dispensary',
        updated_at = NOW()
      WHERE id = $2 AND NOT ($1 = ANY(COALESCE(assigned_dispensary_ids, ARRAY[]::integer[])))
      RETURNING id, name, assigned_dispensary_ids
    `, [dispensary_id, workerId]);

    if (appended.rows.length > 0) {
      return res.json({ success: true, worker: appended.rows[0], message: 'Dispensary assigned to worker' });
    }

    // Nothing was updated: either the worker does not exist or the
    // dispensary is already in its list. A second lookup tells them apart.
    const current = await pool.query(`
      SELECT assigned_dispensary_ids FROM workers WHERE id = $1
    `, [workerId]);
    if (current.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }
    return res.json({ success: true, message: 'Dispensary already assigned', worker: current.rows[0] });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* DELETE /api/workers/definitions/:id/assign-dispensary/:dispensaryId - Remove dispensary from worker
*/
router.delete('/definitions/:id/assign-dispensary/:dispensaryId', async (req: Request, res: Response) => {
  try {
    const workerId = req.params.id;
    const dispensaryId = req.params.dispensaryId;

    // array_remove drops every occurrence of the id from the worker's list.
    const result = await pool.query(`
      UPDATE workers SET
        assigned_dispensary_ids = array_remove(assigned_dispensary_ids, $1::integer),
        updated_at = NOW()
      WHERE id = $2
      RETURNING id, name, assigned_dispensary_ids
    `, [dispensaryId, workerId]);

    if (result.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Worker not found' });
    }

    const worker = result.rows[0];
    res.json({ success: true, worker, message: 'Dispensary removed from worker' });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
/**
* PUT /api/workers/:id/schedule - Update worker schedule (for job_schedules table)
*/
router.put('/:id/schedule', async (req: Request, res: Response) => {
  try {
    const scheduleId = req.params.id;
    const {
      worker_name,
      worker_role,
      description,
      enabled,
      base_interval_minutes,
      jitter_minutes,
      job_config
    } = req.body;

    // Positional values for $1..$7, followed by the row id as $8.
    const values = [
      worker_name,
      worker_role,
      description,
      enabled,
      base_interval_minutes,
      jitter_minutes,
      job_config,
      scheduleId
    ];

    // Partial update: COALESCE keeps the stored value for every field sent as null/undefined.
    const result = await pool.query(`
      UPDATE job_schedules SET
        worker_name = COALESCE($1, worker_name),
        worker_role = COALESCE($2, worker_role),
        description = COALESCE($3, description),
        enabled = COALESCE($4, enabled),
        base_interval_minutes = COALESCE($5, base_interval_minutes),
        jitter_minutes = COALESCE($6, jitter_minutes),
        job_config = COALESCE($7, job_config),
        updated_at = NOW()
      WHERE id = $8
      RETURNING *
    `, values);

    if (result.rows.length === 0) {
      return res.status(404).json({ success: false, error: 'Schedule not found' });
    }

    const schedule = result.rows[0];
    res.json({ success: true, schedule, message: 'Schedule updated' });
  } catch (error: any) {
    res.status(500).json({ success: false, error: error.message });
  }
});
export default router;