feat(tasks): Add unified task-based worker architecture
Replace fragmented job systems (job_schedules, dispensary_crawl_jobs, SyncOrchestrator) with a single unified task queue: - Add worker_tasks table with atomic task claiming via SELECT FOR UPDATE SKIP LOCKED - Add TaskService for CRUD, claiming, and capacity metrics - Add TaskWorker with role-based handlers (resync, discovery, analytics) - Add /api/tasks endpoints for management and migration from legacy systems - Add TasksDashboard UI and integrate task counts into main dashboard - Add comprehensive documentation Task roles: store_discovery, entry_point_discovery, product_discovery, product_resync, analytics_refresh Run workers with: WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -139,6 +139,7 @@ import eventsRoutes from './routes/events';
|
||||
import clickAnalyticsRoutes from './routes/click-analytics';
|
||||
import seoRoutes from './routes/seo';
|
||||
import priceAnalyticsRoutes from './routes/price-analytics';
|
||||
import tasksRoutes from './routes/tasks';
|
||||
|
||||
// Mark requests from trusted domains (cannaiq.co, findagram.co, findadispo.com)
|
||||
// These domains can access the API without authentication
|
||||
@@ -211,6 +212,10 @@ app.use('/api/monitor', workersRoutes);
|
||||
app.use('/api/job-queue', jobQueueRoutes);
|
||||
console.log('[Workers] Routes registered at /api/workers, /api/monitor, and /api/job-queue');
|
||||
|
||||
// Task queue management - worker tasks with capacity planning
|
||||
app.use('/api/tasks', tasksRoutes);
|
||||
console.log('[Tasks] Routes registered at /api/tasks');
|
||||
|
||||
// Phase 3: Analytics V2 - Enhanced analytics with rec/med state segmentation
|
||||
try {
|
||||
const analyticsV2Router = createAnalyticsV2Router(getPool());
|
||||
|
||||
565
backend/src/routes/tasks.ts
Normal file
565
backend/src/routes/tasks.ts
Normal file
@@ -0,0 +1,565 @@
|
||||
/**
|
||||
* Task Queue API Routes
|
||||
*
|
||||
* Endpoints for managing worker tasks, viewing capacity metrics,
|
||||
* and generating batch tasks.
|
||||
*/
|
||||
|
||||
import { Router, Request, Response } from 'express';
|
||||
import {
|
||||
taskService,
|
||||
TaskRole,
|
||||
TaskStatus,
|
||||
TaskFilter,
|
||||
} from '../tasks/task-service';
|
||||
import { pool } from '../db/pool';
|
||||
|
||||
const router = Router();
|
||||
|
||||
/**
|
||||
* GET /api/tasks
|
||||
* List tasks with optional filters
|
||||
*
|
||||
* Query params:
|
||||
* - role: Filter by role
|
||||
* - status: Filter by status (comma-separated for multiple)
|
||||
* - dispensary_id: Filter by dispensary
|
||||
* - worker_id: Filter by worker
|
||||
* - limit: Max results (default 100)
|
||||
* - offset: Pagination offset
|
||||
*/
|
||||
router.get('/', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const filter: TaskFilter = {};
|
||||
|
||||
if (req.query.role) {
|
||||
filter.role = req.query.role as TaskRole;
|
||||
}
|
||||
|
||||
if (req.query.status) {
|
||||
const statuses = (req.query.status as string).split(',') as TaskStatus[];
|
||||
filter.status = statuses.length === 1 ? statuses[0] : statuses;
|
||||
}
|
||||
|
||||
if (req.query.dispensary_id) {
|
||||
filter.dispensary_id = parseInt(req.query.dispensary_id as string, 10);
|
||||
}
|
||||
|
||||
if (req.query.worker_id) {
|
||||
filter.worker_id = req.query.worker_id as string;
|
||||
}
|
||||
|
||||
if (req.query.limit) {
|
||||
filter.limit = parseInt(req.query.limit as string, 10);
|
||||
}
|
||||
|
||||
if (req.query.offset) {
|
||||
filter.offset = parseInt(req.query.offset as string, 10);
|
||||
}
|
||||
|
||||
const tasks = await taskService.listTasks(filter);
|
||||
res.json({ tasks, count: tasks.length });
|
||||
} catch (error: unknown) {
|
||||
console.error('Error listing tasks:', error);
|
||||
res.status(500).json({ error: 'Failed to list tasks' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/counts
|
||||
* Get task counts by status
|
||||
*/
|
||||
router.get('/counts', async (_req: Request, res: Response) => {
|
||||
try {
|
||||
const counts = await taskService.getTaskCounts();
|
||||
res.json(counts);
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting task counts:', error);
|
||||
res.status(500).json({ error: 'Failed to get task counts' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/capacity
|
||||
* Get capacity metrics for all roles
|
||||
*/
|
||||
router.get('/capacity', async (_req: Request, res: Response) => {
|
||||
try {
|
||||
const metrics = await taskService.getCapacityMetrics();
|
||||
res.json({ metrics });
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting capacity metrics:', error);
|
||||
res.status(500).json({ error: 'Failed to get capacity metrics' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/capacity/:role
|
||||
* Get capacity metrics for a specific role
|
||||
*/
|
||||
router.get('/capacity/:role', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const role = req.params.role as TaskRole;
|
||||
const capacity = await taskService.getRoleCapacity(role);
|
||||
|
||||
if (!capacity) {
|
||||
return res.status(404).json({ error: 'Role not found or no data' });
|
||||
}
|
||||
|
||||
// Calculate workers needed for different SLAs
|
||||
const workersFor1Hour = await taskService.calculateWorkersNeeded(role, 1);
|
||||
const workersFor4Hours = await taskService.calculateWorkersNeeded(role, 4);
|
||||
const workersFor8Hours = await taskService.calculateWorkersNeeded(role, 8);
|
||||
|
||||
res.json({
|
||||
...capacity,
|
||||
workers_needed: {
|
||||
for_1_hour: workersFor1Hour,
|
||||
for_4_hours: workersFor4Hours,
|
||||
for_8_hours: workersFor8Hours,
|
||||
},
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting role capacity:', error);
|
||||
res.status(500).json({ error: 'Failed to get role capacity' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/:id
|
||||
* Get a specific task by ID
|
||||
*/
|
||||
router.get('/:id', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const taskId = parseInt(req.params.id, 10);
|
||||
const task = await taskService.getTask(taskId);
|
||||
|
||||
if (!task) {
|
||||
return res.status(404).json({ error: 'Task not found' });
|
||||
}
|
||||
|
||||
res.json(task);
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting task:', error);
|
||||
res.status(500).json({ error: 'Failed to get task' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks
|
||||
* Create a new task
|
||||
*
|
||||
* Body:
|
||||
* - role: TaskRole (required)
|
||||
* - dispensary_id: number (optional)
|
||||
* - platform: string (optional)
|
||||
* - priority: number (optional, default 0)
|
||||
* - scheduled_for: ISO date string (optional)
|
||||
*/
|
||||
router.post('/', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { role, dispensary_id, platform, priority, scheduled_for } = req.body;
|
||||
|
||||
if (!role) {
|
||||
return res.status(400).json({ error: 'Role is required' });
|
||||
}
|
||||
|
||||
// Check if store already has an active task
|
||||
if (dispensary_id) {
|
||||
const hasActive = await taskService.hasActiveTask(dispensary_id);
|
||||
if (hasActive) {
|
||||
return res.status(409).json({
|
||||
error: 'Store already has an active task',
|
||||
dispensary_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const task = await taskService.createTask({
|
||||
role,
|
||||
dispensary_id,
|
||||
platform,
|
||||
priority,
|
||||
scheduled_for: scheduled_for ? new Date(scheduled_for) : undefined,
|
||||
});
|
||||
|
||||
res.status(201).json(task);
|
||||
} catch (error: unknown) {
|
||||
console.error('Error creating task:', error);
|
||||
res.status(500).json({ error: 'Failed to create task' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/generate/resync
|
||||
* Generate daily resync tasks for all active stores
|
||||
*
|
||||
* Body:
|
||||
* - batches_per_day: number (optional, default 6 = every 4 hours)
|
||||
* - date: ISO date string (optional, default today)
|
||||
*/
|
||||
router.post('/generate/resync', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { batches_per_day, date } = req.body;
|
||||
const batchesPerDay = batches_per_day ?? 6;
|
||||
const targetDate = date ? new Date(date) : new Date();
|
||||
|
||||
const createdCount = await taskService.generateDailyResyncTasks(
|
||||
batchesPerDay,
|
||||
targetDate
|
||||
);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
tasks_created: createdCount,
|
||||
batches_per_day: batchesPerDay,
|
||||
date: targetDate.toISOString().split('T')[0],
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error generating resync tasks:', error);
|
||||
res.status(500).json({ error: 'Failed to generate resync tasks' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/generate/discovery
|
||||
* Generate store discovery tasks for a platform
|
||||
*
|
||||
* Body:
|
||||
* - platform: string (required, e.g., 'dutchie')
|
||||
* - state_code: string (optional, e.g., 'AZ')
|
||||
* - priority: number (optional)
|
||||
*/
|
||||
router.post('/generate/discovery', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { platform, state_code, priority } = req.body;
|
||||
|
||||
if (!platform) {
|
||||
return res.status(400).json({ error: 'Platform is required' });
|
||||
}
|
||||
|
||||
const task = await taskService.createStoreDiscoveryTask(
|
||||
platform,
|
||||
state_code,
|
||||
priority ?? 0
|
||||
);
|
||||
|
||||
res.status(201).json(task);
|
||||
} catch (error: unknown) {
|
||||
console.error('Error creating discovery task:', error);
|
||||
res.status(500).json({ error: 'Failed to create discovery task' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/recover-stale
|
||||
* Recover stale tasks from dead workers
|
||||
*
|
||||
* Body:
|
||||
* - threshold_minutes: number (optional, default 10)
|
||||
*/
|
||||
router.post('/recover-stale', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { threshold_minutes } = req.body;
|
||||
const recovered = await taskService.recoverStaleTasks(threshold_minutes ?? 10);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
tasks_recovered: recovered,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error recovering stale tasks:', error);
|
||||
res.status(500).json({ error: 'Failed to recover stale tasks' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/role/:role/last-completion
|
||||
* Get the last completion time for a role
|
||||
*/
|
||||
router.get('/role/:role/last-completion', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const role = req.params.role as TaskRole;
|
||||
const lastCompletion = await taskService.getLastCompletion(role);
|
||||
|
||||
res.json({
|
||||
role,
|
||||
last_completion: lastCompletion?.toISOString() ?? null,
|
||||
time_since: lastCompletion
|
||||
? Math.floor((Date.now() - lastCompletion.getTime()) / 1000)
|
||||
: null,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting last completion:', error);
|
||||
res.status(500).json({ error: 'Failed to get last completion' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/role/:role/recent
|
||||
* Get recent completions for a role
|
||||
*/
|
||||
router.get('/role/:role/recent', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const role = req.params.role as TaskRole;
|
||||
const limit = parseInt(req.query.limit as string, 10) || 10;
|
||||
|
||||
const tasks = await taskService.getRecentCompletions(role, limit);
|
||||
res.json({ tasks });
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting recent completions:', error);
|
||||
res.status(500).json({ error: 'Failed to get recent completions' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/store/:dispensaryId/active
|
||||
* Check if a store has an active task
|
||||
*/
|
||||
router.get('/store/:dispensaryId/active', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const dispensaryId = parseInt(req.params.dispensaryId, 10);
|
||||
const hasActive = await taskService.hasActiveTask(dispensaryId);
|
||||
|
||||
res.json({
|
||||
dispensary_id: dispensaryId,
|
||||
has_active_task: hasActive,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error checking active task:', error);
|
||||
res.status(500).json({ error: 'Failed to check active task' });
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// MIGRATION ROUTES - Disable old job systems
|
||||
// ============================================================
|
||||
|
||||
/**
|
||||
* GET /api/tasks/migration/status
|
||||
* Get status of old job systems vs new task queue
|
||||
*/
|
||||
router.get('/migration/status', async (_req: Request, res: Response) => {
|
||||
try {
|
||||
// Get old job system counts
|
||||
const [schedules, crawlJobs, rawPayloads, taskCounts] = await Promise.all([
|
||||
pool.query(`
|
||||
SELECT
|
||||
COUNT(*) as total,
|
||||
COUNT(*) FILTER (WHERE enabled = true) as enabled
|
||||
FROM job_schedules
|
||||
`),
|
||||
pool.query(`
|
||||
SELECT
|
||||
COUNT(*) as total,
|
||||
COUNT(*) FILTER (WHERE status = 'pending') as pending,
|
||||
COUNT(*) FILTER (WHERE status = 'running') as running
|
||||
FROM dispensary_crawl_jobs
|
||||
`),
|
||||
pool.query(`
|
||||
SELECT
|
||||
COUNT(*) as total,
|
||||
COUNT(*) FILTER (WHERE processed = false) as unprocessed
|
||||
FROM raw_payloads
|
||||
`),
|
||||
taskService.getTaskCounts(),
|
||||
]);
|
||||
|
||||
res.json({
|
||||
old_systems: {
|
||||
job_schedules: {
|
||||
total: parseInt(schedules.rows[0].total) || 0,
|
||||
enabled: parseInt(schedules.rows[0].enabled) || 0,
|
||||
},
|
||||
dispensary_crawl_jobs: {
|
||||
total: parseInt(crawlJobs.rows[0].total) || 0,
|
||||
pending: parseInt(crawlJobs.rows[0].pending) || 0,
|
||||
running: parseInt(crawlJobs.rows[0].running) || 0,
|
||||
},
|
||||
raw_payloads: {
|
||||
total: parseInt(rawPayloads.rows[0].total) || 0,
|
||||
unprocessed: parseInt(rawPayloads.rows[0].unprocessed) || 0,
|
||||
},
|
||||
},
|
||||
new_task_queue: taskCounts,
|
||||
recommendation: schedules.rows[0].enabled > 0
|
||||
? 'Disable old job schedules before switching to new task queue'
|
||||
: 'Ready to use new task queue',
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error getting migration status:', error);
|
||||
res.status(500).json({ error: 'Failed to get migration status' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/migration/disable-old-schedules
|
||||
* Disable all old job schedules to prepare for new task queue
|
||||
*/
|
||||
router.post('/migration/disable-old-schedules', async (_req: Request, res: Response) => {
|
||||
try {
|
||||
const result = await pool.query(`
|
||||
UPDATE job_schedules
|
||||
SET enabled = false,
|
||||
updated_at = NOW()
|
||||
WHERE enabled = true
|
||||
RETURNING id, job_name
|
||||
`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
disabled_count: result.rowCount,
|
||||
disabled_schedules: result.rows.map(r => ({ id: r.id, job_name: r.job_name })),
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error disabling old schedules:', error);
|
||||
res.status(500).json({ error: 'Failed to disable old schedules' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/migration/cancel-pending-crawl-jobs
|
||||
* Cancel all pending crawl jobs from the old system
|
||||
*/
|
||||
router.post('/migration/cancel-pending-crawl-jobs', async (_req: Request, res: Response) => {
|
||||
try {
|
||||
const result = await pool.query(`
|
||||
UPDATE dispensary_crawl_jobs
|
||||
SET status = 'cancelled',
|
||||
completed_at = NOW(),
|
||||
updated_at = NOW()
|
||||
WHERE status = 'pending'
|
||||
RETURNING id
|
||||
`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
cancelled_count: result.rowCount,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error cancelling pending crawl jobs:', error);
|
||||
res.status(500).json({ error: 'Failed to cancel pending crawl jobs' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/migration/create-resync-tasks
|
||||
* Create product_resync tasks for all crawl-enabled dispensaries
|
||||
*/
|
||||
router.post('/migration/create-resync-tasks', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { priority = 0, state_code } = req.body;
|
||||
|
||||
let query = `
|
||||
SELECT id, name FROM dispensaries
|
||||
WHERE crawl_enabled = true
|
||||
AND platform_dispensary_id IS NOT NULL
|
||||
`;
|
||||
const params: any[] = [];
|
||||
|
||||
if (state_code) {
|
||||
query += `
|
||||
AND state_id = (SELECT id FROM states WHERE code = $1)
|
||||
`;
|
||||
params.push(state_code.toUpperCase());
|
||||
}
|
||||
|
||||
query += ` ORDER BY id`;
|
||||
|
||||
const dispensaries = await pool.query(query, params);
|
||||
let created = 0;
|
||||
|
||||
for (const disp of dispensaries.rows) {
|
||||
// Check if already has pending/running task
|
||||
const hasActive = await taskService.hasActiveTask(disp.id);
|
||||
if (!hasActive) {
|
||||
await taskService.createTask({
|
||||
role: 'product_resync',
|
||||
dispensary_id: disp.id,
|
||||
platform: 'dutchie',
|
||||
priority,
|
||||
});
|
||||
created++;
|
||||
}
|
||||
}
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
tasks_created: created,
|
||||
dispensaries_checked: dispensaries.rows.length,
|
||||
state_filter: state_code || 'all',
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error creating resync tasks:', error);
|
||||
res.status(500).json({ error: 'Failed to create resync tasks' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/migration/full-migrate
|
||||
* One-click migration: disable old systems, create new tasks
|
||||
*/
|
||||
router.post('/migration/full-migrate', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const results: any = {
|
||||
success: true,
|
||||
steps: [],
|
||||
};
|
||||
|
||||
// Step 1: Disable old job schedules
|
||||
const disableResult = await pool.query(`
|
||||
UPDATE job_schedules
|
||||
SET enabled = false, updated_at = NOW()
|
||||
WHERE enabled = true
|
||||
RETURNING id
|
||||
`);
|
||||
results.steps.push({
|
||||
step: 'disable_job_schedules',
|
||||
count: disableResult.rowCount,
|
||||
});
|
||||
|
||||
// Step 2: Cancel pending crawl jobs
|
||||
const cancelResult = await pool.query(`
|
||||
UPDATE dispensary_crawl_jobs
|
||||
SET status = 'cancelled', completed_at = NOW(), updated_at = NOW()
|
||||
WHERE status = 'pending'
|
||||
RETURNING id
|
||||
`);
|
||||
results.steps.push({
|
||||
step: 'cancel_pending_crawl_jobs',
|
||||
count: cancelResult.rowCount,
|
||||
});
|
||||
|
||||
// Step 3: Generate initial resync tasks
|
||||
const resyncCount = await taskService.generateDailyResyncTasks(6);
|
||||
results.steps.push({
|
||||
step: 'generate_resync_tasks',
|
||||
count: resyncCount,
|
||||
});
|
||||
|
||||
// Step 4: Create store discovery task
|
||||
const discoveryTask = await taskService.createStoreDiscoveryTask('dutchie', undefined, 0);
|
||||
results.steps.push({
|
||||
step: 'create_discovery_task',
|
||||
task_id: discoveryTask.id,
|
||||
});
|
||||
|
||||
// Step 5: Create analytics refresh task
|
||||
const analyticsTask = await taskService.createTask({
|
||||
role: 'analytics_refresh',
|
||||
priority: 0,
|
||||
});
|
||||
results.steps.push({
|
||||
step: 'create_analytics_task',
|
||||
task_id: analyticsTask.id,
|
||||
});
|
||||
|
||||
results.message = 'Migration complete. New task workers will pick up tasks.';
|
||||
res.json(results);
|
||||
} catch (error: unknown) {
|
||||
console.error('Error during full migration:', error);
|
||||
res.status(500).json({ error: 'Failed to complete migration' });
|
||||
}
|
||||
});
|
||||
|
||||
export default router;
|
||||
92
backend/src/tasks/handlers/analytics-refresh.ts
Normal file
92
backend/src/tasks/handlers/analytics-refresh.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
/**
|
||||
* Analytics Refresh Handler
|
||||
*
|
||||
* Refreshes materialized views and pre-computed analytics tables.
|
||||
* Should run daily or on-demand after major data changes.
|
||||
*/
|
||||
|
||||
import { TaskContext, TaskResult } from '../task-worker';
|
||||
|
||||
export async function handleAnalyticsRefresh(ctx: TaskContext): Promise<TaskResult> {
|
||||
const { pool } = ctx;
|
||||
|
||||
console.log(`[AnalyticsRefresh] Starting analytics refresh...`);
|
||||
|
||||
const refreshed: string[] = [];
|
||||
const failed: string[] = [];
|
||||
|
||||
// List of materialized views to refresh
|
||||
const materializedViews = [
|
||||
'mv_state_metrics',
|
||||
'mv_brand_metrics',
|
||||
'mv_category_metrics',
|
||||
'v_brand_summary',
|
||||
'v_dashboard_stats',
|
||||
];
|
||||
|
||||
for (const viewName of materializedViews) {
|
||||
try {
|
||||
// Heartbeat before each refresh
|
||||
await ctx.heartbeat();
|
||||
|
||||
// Check if view exists
|
||||
const existsResult = await pool.query(`
|
||||
SELECT EXISTS (
|
||||
SELECT 1 FROM pg_matviews WHERE matviewname = $1
|
||||
UNION
|
||||
SELECT 1 FROM pg_views WHERE viewname = $1
|
||||
) as exists
|
||||
`, [viewName]);
|
||||
|
||||
if (!existsResult.rows[0].exists) {
|
||||
console.log(`[AnalyticsRefresh] View ${viewName} does not exist, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Try to refresh (only works for materialized views)
|
||||
try {
|
||||
await pool.query(`REFRESH MATERIALIZED VIEW CONCURRENTLY ${viewName}`);
|
||||
refreshed.push(viewName);
|
||||
console.log(`[AnalyticsRefresh] Refreshed ${viewName}`);
|
||||
} catch (refreshError: any) {
|
||||
// Try non-concurrent refresh
|
||||
try {
|
||||
await pool.query(`REFRESH MATERIALIZED VIEW ${viewName}`);
|
||||
refreshed.push(viewName);
|
||||
console.log(`[AnalyticsRefresh] Refreshed ${viewName} (non-concurrent)`);
|
||||
} catch (nonConcurrentError: any) {
|
||||
// Not a materialized view or other error
|
||||
console.log(`[AnalyticsRefresh] ${viewName} is not a materialized view or refresh failed`);
|
||||
}
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`[AnalyticsRefresh] Error refreshing ${viewName}:`, error.message);
|
||||
failed.push(viewName);
|
||||
}
|
||||
}
|
||||
|
||||
// Run analytics capture functions if they exist
|
||||
const captureFunctions = [
|
||||
'capture_brand_snapshots',
|
||||
'capture_category_snapshots',
|
||||
];
|
||||
|
||||
for (const funcName of captureFunctions) {
|
||||
try {
|
||||
await pool.query(`SELECT ${funcName}()`);
|
||||
console.log(`[AnalyticsRefresh] Executed ${funcName}()`);
|
||||
} catch (error: any) {
|
||||
// Function might not exist
|
||||
console.log(`[AnalyticsRefresh] ${funcName}() not available`);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[AnalyticsRefresh] Complete: ${refreshed.length} refreshed, ${failed.length} failed`);
|
||||
|
||||
return {
|
||||
success: failed.length === 0,
|
||||
refreshed,
|
||||
failed,
|
||||
error: failed.length > 0 ? `Failed to refresh: ${failed.join(', ')}` : undefined,
|
||||
};
|
||||
}
|
||||
105
backend/src/tasks/handlers/entry-point-discovery.ts
Normal file
105
backend/src/tasks/handlers/entry-point-discovery.ts
Normal file
@@ -0,0 +1,105 @@
|
||||
/**
|
||||
* Entry Point Discovery Handler
|
||||
*
|
||||
* Detects menu type and resolves platform IDs for a discovered store.
|
||||
* This is the step between store_discovery and product_discovery.
|
||||
*/
|
||||
|
||||
import { TaskContext, TaskResult } from '../task-worker';
|
||||
import { DutchieClient } from '../../platforms/dutchie/client';
|
||||
|
||||
export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskResult> {
|
||||
const { pool, task } = ctx;
|
||||
const dispensaryId = task.dispensary_id;
|
||||
|
||||
if (!dispensaryId) {
|
||||
return { success: false, error: 'No dispensary_id specified for entry_point_discovery task' };
|
||||
}
|
||||
|
||||
try {
|
||||
// Get dispensary info
|
||||
const dispResult = await pool.query(`
|
||||
SELECT id, name, menu_url, platform_dispensary_id, menu_type
|
||||
FROM dispensaries
|
||||
WHERE id = $1
|
||||
`, [dispensaryId]);
|
||||
|
||||
if (dispResult.rows.length === 0) {
|
||||
return { success: false, error: `Dispensary ${dispensaryId} not found` };
|
||||
}
|
||||
|
||||
const dispensary = dispResult.rows[0];
|
||||
|
||||
// If already has platform_dispensary_id, we're done
|
||||
if (dispensary.platform_dispensary_id) {
|
||||
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID`);
|
||||
return {
|
||||
success: true,
|
||||
alreadyResolved: true,
|
||||
platformId: dispensary.platform_dispensary_id,
|
||||
};
|
||||
}
|
||||
|
||||
const menuUrl = dispensary.menu_url;
|
||||
if (!menuUrl) {
|
||||
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
|
||||
}
|
||||
|
||||
console.log(`[EntryPointDiscovery] Resolving platform ID for ${dispensary.name} from ${menuUrl}`);
|
||||
|
||||
// Extract cName from menu URL
|
||||
// Format: https://dutchie.com/embedded-menu/<cName> or https://dutchie.com/dispensary/<slug>
|
||||
let cName: string | null = null;
|
||||
|
||||
const embeddedMatch = menuUrl.match(/\/embedded-menu\/([^/?]+)/);
|
||||
const dispensaryMatch = menuUrl.match(/\/dispensary\/([^/?]+)/);
|
||||
|
||||
if (embeddedMatch) {
|
||||
cName = embeddedMatch[1];
|
||||
} else if (dispensaryMatch) {
|
||||
cName = dispensaryMatch[1];
|
||||
}
|
||||
|
||||
if (!cName) {
|
||||
return {
|
||||
success: false,
|
||||
error: `Could not extract cName from menu_url: ${menuUrl}`,
|
||||
};
|
||||
}
|
||||
|
||||
// Resolve platform ID using Dutchie API
|
||||
const client = new DutchieClient();
|
||||
const platformId = await client.resolveDispensaryId(cName);
|
||||
|
||||
if (!platformId) {
|
||||
return {
|
||||
success: false,
|
||||
error: `Could not resolve platform ID for cName: ${cName}`,
|
||||
};
|
||||
}
|
||||
|
||||
// Update dispensary with platform ID and enable crawling
|
||||
await pool.query(`
|
||||
UPDATE dispensaries
|
||||
SET platform_dispensary_id = $2,
|
||||
menu_type = 'dutchie',
|
||||
crawl_enabled = true,
|
||||
updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`, [dispensaryId, platformId]);
|
||||
|
||||
console.log(`[EntryPointDiscovery] Resolved ${dispensary.name}: platformId=${platformId}`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
platformId,
|
||||
cName,
|
||||
};
|
||||
} catch (error: any) {
|
||||
console.error(`[EntryPointDiscovery] Error for dispensary ${dispensaryId}:`, error.message);
|
||||
return {
|
||||
success: false,
|
||||
error: error.message,
|
||||
};
|
||||
}
|
||||
}
|
||||
11
backend/src/tasks/handlers/index.ts
Normal file
11
backend/src/tasks/handlers/index.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
/**
|
||||
* Task Handlers Index
|
||||
*
|
||||
* Exports all task handlers for the task worker.
|
||||
*/
|
||||
|
||||
export { handleProductResync } from './product-resync';
|
||||
export { handleProductDiscovery } from './product-discovery';
|
||||
export { handleStoreDiscovery } from './store-discovery';
|
||||
export { handleEntryPointDiscovery } from './entry-point-discovery';
|
||||
export { handleAnalyticsRefresh } from './analytics-refresh';
|
||||
16
backend/src/tasks/handlers/product-discovery.ts
Normal file
16
backend/src/tasks/handlers/product-discovery.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
/**
|
||||
* Product Discovery Handler
|
||||
*
|
||||
* Initial product fetch for stores that have 0 products.
|
||||
* Same logic as product_resync, but for initial discovery.
|
||||
*/
|
||||
|
||||
import { TaskContext, TaskResult } from '../task-worker';
|
||||
import { handleProductResync } from './product-resync';
|
||||
|
||||
export async function handleProductDiscovery(ctx: TaskContext): Promise<TaskResult> {
|
||||
// Product discovery is essentially the same as resync for the first time
|
||||
// The main difference is in when this task is triggered (new store vs scheduled)
|
||||
console.log(`[ProductDiscovery] Starting initial product fetch for dispensary ${ctx.task.dispensary_id}`);
|
||||
return handleProductResync(ctx);
|
||||
}
|
||||
131
backend/src/tasks/handlers/product-resync.ts
Normal file
131
backend/src/tasks/handlers/product-resync.ts
Normal file
@@ -0,0 +1,131 @@
|
||||
/**
|
||||
* Product Resync Handler
|
||||
*
|
||||
* Re-crawls a store that already has products to capture price/stock changes.
|
||||
* Creates new snapshots for any changed products.
|
||||
*/
|
||||
|
||||
import { TaskContext, TaskResult } from '../task-worker';
|
||||
import { DutchieClient } from '../../platforms/dutchie/client';
|
||||
import { hydrateToCanonical } from '../../hydration/canonical-upsert';
|
||||
import { DutchieNormalizer } from '../../hydration/normalizers/dutchie';
|
||||
|
||||
export async function handleProductResync(ctx: TaskContext): Promise<TaskResult> {
|
||||
const { pool, task } = ctx;
|
||||
const dispensaryId = task.dispensary_id;
|
||||
|
||||
if (!dispensaryId) {
|
||||
return { success: false, error: 'No dispensary_id specified for product_resync task' };
|
||||
}
|
||||
|
||||
try {
|
||||
// Get dispensary info
|
||||
const dispResult = await pool.query(`
|
||||
SELECT id, name, platform_dispensary_id, menu_url, state
|
||||
FROM dispensaries
|
||||
WHERE id = $1 AND crawl_enabled = true
|
||||
`, [dispensaryId]);
|
||||
|
||||
if (dispResult.rows.length === 0) {
|
||||
return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
|
||||
}
|
||||
|
||||
const dispensary = dispResult.rows[0];
|
||||
const platformId = dispensary.platform_dispensary_id;
|
||||
|
||||
if (!platformId) {
|
||||
return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
|
||||
}
|
||||
|
||||
console.log(`[ProductResync] Crawling ${dispensary.name} (${dispensaryId})`);
|
||||
|
||||
// Send heartbeat before long operation
|
||||
await ctx.heartbeat();
|
||||
|
||||
// Fetch products from Dutchie
|
||||
const client = new DutchieClient();
|
||||
const products = await client.fetchProducts(platformId);
|
||||
|
||||
if (!products || products.length === 0) {
|
||||
// No products returned - could be a problem or could be empty menu
|
||||
console.log(`[ProductResync] No products returned for ${dispensary.name}`);
|
||||
return {
|
||||
success: true,
|
||||
productsProcessed: 0,
|
||||
snapshotsCreated: 0,
|
||||
message: 'No products returned from API',
|
||||
};
|
||||
}
|
||||
|
||||
console.log(`[ProductResync] Fetched ${products.length} products for ${dispensary.name}`);
|
||||
|
||||
// Heartbeat again
|
||||
await ctx.heartbeat();
|
||||
|
||||
// Normalize products
|
||||
const normalizer = new DutchieNormalizer();
|
||||
const normResult = normalizer.normalize({
|
||||
products,
|
||||
dispensary_id: dispensaryId,
|
||||
platform: 'dutchie',
|
||||
});
|
||||
|
||||
// Create crawl run record
|
||||
const crawlRunResult = await pool.query(`
|
||||
INSERT INTO crawl_runs (dispensary_id, provider, started_at, status, trigger_type)
|
||||
VALUES ($1, 'dutchie', NOW(), 'running', 'task')
|
||||
RETURNING id
|
||||
`, [dispensaryId]);
|
||||
const crawlRunId = crawlRunResult.rows[0].id;
|
||||
|
||||
// Hydrate to canonical tables
|
||||
const hydrateResult = await hydrateToCanonical(
|
||||
pool,
|
||||
dispensaryId,
|
||||
normResult,
|
||||
crawlRunId
|
||||
);
|
||||
|
||||
// Update crawl run
|
||||
await pool.query(`
|
||||
UPDATE crawl_runs
|
||||
SET status = 'completed',
|
||||
completed_at = NOW(),
|
||||
products_found = $2,
|
||||
products_new = $3,
|
||||
products_updated = $4,
|
||||
snapshots_created = $5
|
||||
WHERE id = $1
|
||||
`, [
|
||||
crawlRunId,
|
||||
hydrateResult.productsUpserted,
|
||||
hydrateResult.productsNew,
|
||||
hydrateResult.productsUpdated,
|
||||
hydrateResult.snapshotsCreated,
|
||||
]);
|
||||
|
||||
// Update dispensary last_crawled_at
|
||||
await pool.query(`
|
||||
UPDATE dispensaries
|
||||
SET last_crawled_at = NOW()
|
||||
WHERE id = $1
|
||||
`, [dispensaryId]);
|
||||
|
||||
console.log(`[ProductResync] Completed ${dispensary.name}: ${hydrateResult.productsUpserted} products, ${hydrateResult.snapshotsCreated} snapshots`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
productsProcessed: hydrateResult.productsUpserted,
|
||||
productsNew: hydrateResult.productsNew,
|
||||
productsUpdated: hydrateResult.productsUpdated,
|
||||
snapshotsCreated: hydrateResult.snapshotsCreated,
|
||||
brandsCreated: hydrateResult.brandsCreated,
|
||||
};
|
||||
} catch (error: any) {
|
||||
console.error(`[ProductResync] Error for dispensary ${dispensaryId}:`, error.message);
|
||||
return {
|
||||
success: false,
|
||||
error: error.message,
|
||||
};
|
||||
}
|
||||
}
|
||||
67
backend/src/tasks/handlers/store-discovery.ts
Normal file
67
backend/src/tasks/handlers/store-discovery.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
/**
|
||||
* Store Discovery Handler
|
||||
*
|
||||
* Discovers new stores on a platform (e.g., Dutchie) by crawling
|
||||
* location APIs and adding them to dutchie_discovery_locations.
|
||||
*/
|
||||
|
||||
import { TaskContext, TaskResult } from '../task-worker';
|
||||
import { DiscoveryCrawler } from '../../discovery/discovery-crawler';
|
||||
|
||||
export async function handleStoreDiscovery(ctx: TaskContext): Promise<TaskResult> {
|
||||
const { pool, task } = ctx;
|
||||
const platform = task.platform || 'dutchie';
|
||||
|
||||
console.log(`[StoreDiscovery] Starting discovery for platform: ${platform}`);
|
||||
|
||||
try {
|
||||
// Get states to discover
|
||||
const statesResult = await pool.query(`
|
||||
SELECT code FROM states WHERE active = true ORDER BY code
|
||||
`);
|
||||
const stateCodes = statesResult.rows.map(r => r.code);
|
||||
|
||||
if (stateCodes.length === 0) {
|
||||
return { success: true, storesDiscovered: 0, message: 'No active states to discover' };
|
||||
}
|
||||
|
||||
let totalDiscovered = 0;
|
||||
let totalPromoted = 0;
|
||||
|
||||
// Run discovery for each state
|
||||
const crawler = new DiscoveryCrawler(pool);
|
||||
|
||||
for (const stateCode of stateCodes) {
|
||||
// Heartbeat before each state
|
||||
await ctx.heartbeat();
|
||||
|
||||
console.log(`[StoreDiscovery] Discovering stores in ${stateCode}...`);
|
||||
|
||||
try {
|
||||
const result = await crawler.discoverState(stateCode);
|
||||
totalDiscovered += result.locationsDiscovered || 0;
|
||||
totalPromoted += result.locationsPromoted || 0;
|
||||
console.log(`[StoreDiscovery] ${stateCode}: discovered ${result.locationsDiscovered}, promoted ${result.locationsPromoted}`);
|
||||
} catch (error: any) {
|
||||
console.error(`[StoreDiscovery] Error discovering ${stateCode}:`, error.message);
|
||||
// Continue with other states
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[StoreDiscovery] Complete: ${totalDiscovered} discovered, ${totalPromoted} promoted`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
storesDiscovered: totalDiscovered,
|
||||
storesPromoted: totalPromoted,
|
||||
statesProcessed: stateCodes.length,
|
||||
newStoreIds: [], // Would be populated with actual new store IDs for chaining
|
||||
};
|
||||
} catch (error: any) {
|
||||
console.error(`[StoreDiscovery] Error:`, error.message);
|
||||
return {
|
||||
success: false,
|
||||
error: error.message,
|
||||
};
|
||||
}
|
||||
}
|
||||
25
backend/src/tasks/index.ts
Normal file
25
backend/src/tasks/index.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Task Queue Module
|
||||
*
|
||||
* Exports task service, worker, and types for use throughout the application.
|
||||
*/
|
||||
|
||||
export {
|
||||
taskService,
|
||||
TaskRole,
|
||||
TaskStatus,
|
||||
WorkerTask,
|
||||
CreateTaskParams,
|
||||
CapacityMetrics,
|
||||
TaskFilter,
|
||||
} from './task-service';
|
||||
|
||||
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
|
||||
|
||||
export {
|
||||
handleProductResync,
|
||||
handleProductDiscovery,
|
||||
handleStoreDiscovery,
|
||||
handleEntryPointDiscovery,
|
||||
handleAnalyticsRefresh,
|
||||
} from './handlers';
|
||||
474
backend/src/tasks/task-service.ts
Normal file
474
backend/src/tasks/task-service.ts
Normal file
@@ -0,0 +1,474 @@
|
||||
/**
|
||||
* Task Service
|
||||
*
|
||||
* Central service for managing worker tasks with:
|
||||
* - Atomic task claiming (per-store locking)
|
||||
* - Task lifecycle management
|
||||
* - Auto-chaining of related tasks
|
||||
* - Capacity planning metrics
|
||||
*/
|
||||
|
||||
import { pool } from '../db/pool';
|
||||
|
||||
/** Task categories a worker process can be dedicated to (WORKER_ROLE). */
export type TaskRole =
  | 'store_discovery'
  | 'entry_point_discovery'
  | 'product_discovery'
  | 'product_resync'
  | 'analytics_refresh';

/** Lifecycle states of a row in worker_tasks. */
export type TaskStatus =
  | 'pending'    // created, waiting to be claimed
  | 'claimed'    // atomically claimed by a worker, not yet started
  | 'running'    // handler executing; kept alive via heartbeats
  | 'completed'
  | 'failed'
  | 'stale';     // heartbeat lapsed; eligible for recover_stale_tasks()

/** A row from the worker_tasks table. */
export interface WorkerTask {
  id: number;
  role: TaskRole;
  // null for platform-wide tasks (e.g. store_discovery has no single store)
  dispensary_id: number | null;
  // e.g. 'dutchie'; null when not platform-specific
  platform: string | null;
  status: TaskStatus;
  // NOTE(review): presumably higher priority is claimed first — confirm
  // against the claim_task() SQL function's ORDER BY.
  priority: number;
  // earliest time the task may be claimed; null means immediately eligible
  scheduled_for: Date | null;
  worker_id: string | null;
  claimed_at: Date | null;
  started_at: Date | null;
  completed_at: Date | null;
  last_heartbeat_at: Date | null;
  // the handler's TaskResult, stored as JSON by completeTask()
  result: Record<string, unknown> | null;
  error_message: string | null;
  retry_count: number;
  max_retries: number;
  created_at: Date;
  updated_at: Date;
}

/** Parameters for creating a task; omitted fields become NULL (priority 0). */
export interface CreateTaskParams {
  role: TaskRole;
  dispensary_id?: number;
  platform?: string;
  priority?: number;
  scheduled_for?: Date;
}

/** One row of the v_worker_capacity view: per-role queue/throughput metrics. */
export interface CapacityMetrics {
  role: string;
  pending_tasks: number;
  ready_tasks: number;
  claimed_tasks: number;
  running_tasks: number;
  completed_last_hour: number;
  failed_last_hour: number;
  active_workers: number;
  avg_duration_sec: number | null;
  tasks_per_worker_hour: number | null;
  estimated_hours_to_drain: number | null;
}

/** Filters accepted by listTasks(); limit defaults to 100, offset to 0. */
export interface TaskFilter {
  role?: TaskRole;
  // a single status or a set of statuses (matched with status = ANY(...))
  status?: TaskStatus | TaskStatus[];
  dispensary_id?: number;
  worker_id?: string;
  limit?: number;
  offset?: number;
}
|
||||
|
||||
class TaskService {
|
||||
/**
|
||||
* Create a new task
|
||||
*/
|
||||
async createTask(params: CreateTaskParams): Promise<WorkerTask> {
|
||||
const result = await pool.query(
|
||||
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
RETURNING *`,
|
||||
[
|
||||
params.role,
|
||||
params.dispensary_id ?? null,
|
||||
params.platform ?? null,
|
||||
params.priority ?? 0,
|
||||
params.scheduled_for ?? null,
|
||||
]
|
||||
);
|
||||
return result.rows[0] as WorkerTask;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create multiple tasks in a batch
|
||||
*/
|
||||
async createTasks(tasks: CreateTaskParams[]): Promise<number> {
|
||||
if (tasks.length === 0) return 0;
|
||||
|
||||
const values = tasks.map((t, i) => {
|
||||
const base = i * 5;
|
||||
return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`;
|
||||
});
|
||||
|
||||
const params = tasks.flatMap((t) => [
|
||||
t.role,
|
||||
t.dispensary_id ?? null,
|
||||
t.platform ?? null,
|
||||
t.priority ?? 0,
|
||||
t.scheduled_for ?? null,
|
||||
]);
|
||||
|
||||
const result = await pool.query(
|
||||
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for)
|
||||
VALUES ${values.join(', ')}
|
||||
ON CONFLICT DO NOTHING`,
|
||||
params
|
||||
);
|
||||
|
||||
return result.rowCount ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Claim a task atomically for a worker
|
||||
* Uses the SQL function for proper locking
|
||||
*/
|
||||
async claimTask(role: TaskRole, workerId: string): Promise<WorkerTask | null> {
|
||||
const result = await pool.query(
|
||||
`SELECT * FROM claim_task($1, $2)`,
|
||||
[role, workerId]
|
||||
);
|
||||
return (result.rows[0] as WorkerTask) || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a task as running (worker started processing)
|
||||
*/
|
||||
async startTask(taskId: number): Promise<void> {
|
||||
await pool.query(
|
||||
`UPDATE worker_tasks
|
||||
SET status = 'running', started_at = NOW(), last_heartbeat_at = NOW()
|
||||
WHERE id = $1`,
|
||||
[taskId]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update heartbeat to prevent stale detection
|
||||
*/
|
||||
async heartbeat(taskId: number): Promise<void> {
|
||||
await pool.query(
|
||||
`UPDATE worker_tasks
|
||||
SET last_heartbeat_at = NOW()
|
||||
WHERE id = $1 AND status = 'running'`,
|
||||
[taskId]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a task as completed
|
||||
*/
|
||||
async completeTask(taskId: number, result?: Record<string, unknown>): Promise<void> {
|
||||
await pool.query(
|
||||
`UPDATE worker_tasks
|
||||
SET status = 'completed', completed_at = NOW(), result = $2
|
||||
WHERE id = $1`,
|
||||
[taskId, result ? JSON.stringify(result) : null]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a task as failed
|
||||
*/
|
||||
async failTask(taskId: number, errorMessage: string): Promise<void> {
|
||||
await pool.query(
|
||||
`UPDATE worker_tasks
|
||||
SET status = 'failed', completed_at = NOW(), error_message = $2
|
||||
WHERE id = $1`,
|
||||
[taskId, errorMessage]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a task by ID
|
||||
*/
|
||||
async getTask(taskId: number): Promise<WorkerTask | null> {
|
||||
const result = await pool.query(
|
||||
`SELECT * FROM worker_tasks WHERE id = $1`,
|
||||
[taskId]
|
||||
);
|
||||
return (result.rows[0] as WorkerTask) || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* List tasks with filters
|
||||
*/
|
||||
async listTasks(filter: TaskFilter = {}): Promise<WorkerTask[]> {
|
||||
const conditions: string[] = [];
|
||||
const params: (string | number | string[])[] = [];
|
||||
let paramIndex = 1;
|
||||
|
||||
if (filter.role) {
|
||||
conditions.push(`role = $${paramIndex++}`);
|
||||
params.push(filter.role);
|
||||
}
|
||||
|
||||
if (filter.status) {
|
||||
if (Array.isArray(filter.status)) {
|
||||
conditions.push(`status = ANY($${paramIndex++})`);
|
||||
params.push(filter.status);
|
||||
} else {
|
||||
conditions.push(`status = $${paramIndex++}`);
|
||||
params.push(filter.status);
|
||||
}
|
||||
}
|
||||
|
||||
if (filter.dispensary_id) {
|
||||
conditions.push(`dispensary_id = $${paramIndex++}`);
|
||||
params.push(filter.dispensary_id);
|
||||
}
|
||||
|
||||
if (filter.worker_id) {
|
||||
conditions.push(`worker_id = $${paramIndex++}`);
|
||||
params.push(filter.worker_id);
|
||||
}
|
||||
|
||||
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
|
||||
const limit = filter.limit ?? 100;
|
||||
const offset = filter.offset ?? 0;
|
||||
|
||||
const result = await pool.query(
|
||||
`SELECT * FROM worker_tasks
|
||||
${whereClause}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ${limit} OFFSET ${offset}`,
|
||||
params
|
||||
);
|
||||
|
||||
return result.rows as WorkerTask[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get capacity metrics for all roles
|
||||
*/
|
||||
async getCapacityMetrics(): Promise<CapacityMetrics[]> {
|
||||
const result = await pool.query(
|
||||
`SELECT * FROM v_worker_capacity`
|
||||
);
|
||||
return result.rows as CapacityMetrics[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get capacity metrics for a specific role
|
||||
*/
|
||||
async getRoleCapacity(role: TaskRole): Promise<CapacityMetrics | null> {
|
||||
const result = await pool.query(
|
||||
`SELECT * FROM v_worker_capacity WHERE role = $1`,
|
||||
[role]
|
||||
);
|
||||
return (result.rows[0] as CapacityMetrics) || null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover stale tasks from dead workers
|
||||
*/
|
||||
async recoverStaleTasks(staleThresholdMinutes = 10): Promise<number> {
|
||||
const result = await pool.query(
|
||||
`SELECT recover_stale_tasks($1)`,
|
||||
[staleThresholdMinutes]
|
||||
);
|
||||
return (result.rows[0] as { recover_stale_tasks: number })?.recover_stale_tasks ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate daily resync tasks for all active stores
|
||||
*/
|
||||
async generateDailyResyncTasks(batchesPerDay = 6, date?: Date): Promise<number> {
|
||||
const result = await pool.query(
|
||||
`SELECT generate_resync_tasks($1, $2)`,
|
||||
[batchesPerDay, date ?? new Date()]
|
||||
);
|
||||
return (result.rows[0] as { generate_resync_tasks: number })?.generate_resync_tasks ?? 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Chain next task after completion
|
||||
* Called automatically when a task completes successfully
|
||||
*/
|
||||
async chainNextTask(completedTask: WorkerTask): Promise<WorkerTask | null> {
|
||||
if (completedTask.status !== 'completed') {
|
||||
return null;
|
||||
}
|
||||
|
||||
switch (completedTask.role) {
|
||||
case 'store_discovery': {
|
||||
// New stores discovered -> create entry_point_discovery tasks
|
||||
const newStoreIds = (completedTask.result as { newStoreIds?: number[] })?.newStoreIds;
|
||||
if (newStoreIds && newStoreIds.length > 0) {
|
||||
for (const storeId of newStoreIds) {
|
||||
await this.createTask({
|
||||
role: 'entry_point_discovery',
|
||||
dispensary_id: storeId,
|
||||
platform: completedTask.platform ?? undefined,
|
||||
priority: 10, // High priority for new stores
|
||||
});
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'entry_point_discovery': {
|
||||
// Entry point resolved -> create product_discovery task
|
||||
const success = (completedTask.result as { success?: boolean })?.success;
|
||||
if (success && completedTask.dispensary_id) {
|
||||
return this.createTask({
|
||||
role: 'product_discovery',
|
||||
dispensary_id: completedTask.dispensary_id,
|
||||
platform: completedTask.platform ?? undefined,
|
||||
priority: 10,
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case 'product_discovery': {
|
||||
// Product discovery done -> store is now ready for regular resync
|
||||
// No immediate chaining needed; will be picked up by daily batch generation
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create store discovery task for a platform/state
|
||||
*/
|
||||
async createStoreDiscoveryTask(
|
||||
platform: string,
|
||||
stateCode?: string,
|
||||
priority = 0
|
||||
): Promise<WorkerTask> {
|
||||
return this.createTask({
|
||||
role: 'store_discovery',
|
||||
platform,
|
||||
priority,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create entry point discovery task for a specific store
|
||||
*/
|
||||
async createEntryPointTask(
|
||||
dispensaryId: number,
|
||||
platform: string,
|
||||
priority = 10
|
||||
): Promise<WorkerTask> {
|
||||
return this.createTask({
|
||||
role: 'entry_point_discovery',
|
||||
dispensary_id: dispensaryId,
|
||||
platform,
|
||||
priority,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create product discovery task for a specific store
|
||||
*/
|
||||
async createProductDiscoveryTask(
|
||||
dispensaryId: number,
|
||||
platform: string,
|
||||
priority = 10
|
||||
): Promise<WorkerTask> {
|
||||
return this.createTask({
|
||||
role: 'product_discovery',
|
||||
dispensary_id: dispensaryId,
|
||||
platform,
|
||||
priority,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get task counts by status for dashboard
|
||||
*/
|
||||
async getTaskCounts(): Promise<Record<TaskStatus, number>> {
|
||||
const result = await pool.query(
|
||||
`SELECT status, COUNT(*) as count
|
||||
FROM worker_tasks
|
||||
GROUP BY status`
|
||||
);
|
||||
|
||||
const counts: Record<TaskStatus, number> = {
|
||||
pending: 0,
|
||||
claimed: 0,
|
||||
running: 0,
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
stale: 0,
|
||||
};
|
||||
|
||||
for (const row of result.rows) {
|
||||
const typedRow = row as { status: TaskStatus; count: string };
|
||||
counts[typedRow.status] = parseInt(typedRow.count, 10);
|
||||
}
|
||||
|
||||
return counts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get recent task completions for a role
|
||||
*/
|
||||
async getRecentCompletions(role: TaskRole, limit = 10): Promise<WorkerTask[]> {
|
||||
const result = await pool.query(
|
||||
`SELECT * FROM worker_tasks
|
||||
WHERE role = $1 AND status = 'completed'
|
||||
ORDER BY completed_at DESC
|
||||
LIMIT $2`,
|
||||
[role, limit]
|
||||
);
|
||||
return result.rows as WorkerTask[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a store has any active tasks
|
||||
*/
|
||||
async hasActiveTask(dispensaryId: number): Promise<boolean> {
|
||||
const result = await pool.query(
|
||||
`SELECT EXISTS(
|
||||
SELECT 1 FROM worker_tasks
|
||||
WHERE dispensary_id = $1
|
||||
AND status IN ('claimed', 'running')
|
||||
) as exists`,
|
||||
[dispensaryId]
|
||||
);
|
||||
return (result.rows[0] as { exists: boolean })?.exists ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the last completion time for a role
|
||||
*/
|
||||
async getLastCompletion(role: TaskRole): Promise<Date | null> {
|
||||
const result = await pool.query(
|
||||
`SELECT MAX(completed_at) as completed_at
|
||||
FROM worker_tasks
|
||||
WHERE role = $1 AND status = 'completed'`,
|
||||
[role]
|
||||
);
|
||||
return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate workers needed to complete tasks within SLA
|
||||
*/
|
||||
async calculateWorkersNeeded(role: TaskRole, slaHours: number): Promise<number> {
|
||||
const capacity = await this.getRoleCapacity(role);
|
||||
if (!capacity || !capacity.tasks_per_worker_hour) {
|
||||
return 1; // Default to 1 worker if no data
|
||||
}
|
||||
|
||||
const pendingTasks = capacity.pending_tasks;
|
||||
const tasksPerWorkerHour = capacity.tasks_per_worker_hour;
|
||||
const totalTaskCapacityNeeded = pendingTasks / slaHours;
|
||||
|
||||
return Math.ceil(totalTaskCapacityNeeded / tasksPerWorkerHour);
|
||||
}
|
||||
}
|
||||
|
||||
export const taskService = new TaskService();
|
||||
266
backend/src/tasks/task-worker.ts
Normal file
266
backend/src/tasks/task-worker.ts
Normal file
@@ -0,0 +1,266 @@
|
||||
/**
|
||||
* Task Worker
|
||||
*
|
||||
* A unified worker that processes tasks from the worker_tasks queue.
|
||||
* Replaces the fragmented job systems (job_schedules, dispensary_crawl_jobs, etc.)
|
||||
*
|
||||
* Usage:
|
||||
* WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts
|
||||
*
|
||||
* Environment:
|
||||
* WORKER_ROLE - Which task role to process (required)
|
||||
* WORKER_ID - Optional custom worker ID
|
||||
* POLL_INTERVAL_MS - How often to check for tasks (default: 5000)
|
||||
* HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000)
|
||||
*/
|
||||
|
||||
import { Pool } from 'pg';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import { taskService, TaskRole, WorkerTask } from './task-service';
|
||||
import { pool } from '../db/pool';
|
||||
|
||||
// Task handlers by role
|
||||
import { handleProductResync } from './handlers/product-resync';
|
||||
import { handleProductDiscovery } from './handlers/product-discovery';
|
||||
import { handleStoreDiscovery } from './handlers/store-discovery';
|
||||
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
|
||||
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
|
||||
|
||||
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000');
|
||||
const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000');
|
||||
|
||||
export interface TaskContext {
|
||||
pool: Pool;
|
||||
workerId: string;
|
||||
task: WorkerTask;
|
||||
heartbeat: () => Promise<void>;
|
||||
}
|
||||
|
||||
export interface TaskResult {
|
||||
success: boolean;
|
||||
productsProcessed?: number;
|
||||
snapshotsCreated?: number;
|
||||
storesDiscovered?: number;
|
||||
error?: string;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
|
||||
|
||||
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
|
||||
product_resync: handleProductResync,
|
||||
product_discovery: handleProductDiscovery,
|
||||
store_discovery: handleStoreDiscovery,
|
||||
entry_point_discovery: handleEntryPointDiscovery,
|
||||
analytics_refresh: handleAnalyticsRefresh,
|
||||
};
|
||||
|
||||
export class TaskWorker {
|
||||
private pool: Pool;
|
||||
private workerId: string;
|
||||
private role: TaskRole;
|
||||
private isRunning: boolean = false;
|
||||
private heartbeatInterval: NodeJS.Timeout | null = null;
|
||||
private currentTask: WorkerTask | null = null;
|
||||
|
||||
constructor(role: TaskRole, workerId?: string) {
|
||||
this.pool = pool;
|
||||
this.role = role;
|
||||
this.workerId = workerId || `worker-${role}-${uuidv4().slice(0, 8)}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the worker loop
|
||||
*/
|
||||
async start(): Promise<void> {
|
||||
this.isRunning = true;
|
||||
console.log(`[TaskWorker] Starting worker ${this.workerId} for role: ${this.role}`);
|
||||
|
||||
while (this.isRunning) {
|
||||
try {
|
||||
await this.processNextTask();
|
||||
} catch (error: any) {
|
||||
console.error(`[TaskWorker] Loop error:`, error.message);
|
||||
await this.sleep(POLL_INTERVAL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[TaskWorker] Worker ${this.workerId} stopped`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the worker
|
||||
*/
|
||||
stop(): void {
|
||||
this.isRunning = false;
|
||||
this.stopHeartbeat();
|
||||
console.log(`[TaskWorker] Stopping worker ${this.workerId}...`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the next available task
|
||||
*/
|
||||
private async processNextTask(): Promise<void> {
|
||||
// Try to claim a task
|
||||
const task = await taskService.claimTask(this.role, this.workerId);
|
||||
|
||||
if (!task) {
|
||||
// No tasks available, wait and retry
|
||||
await this.sleep(POLL_INTERVAL_MS);
|
||||
return;
|
||||
}
|
||||
|
||||
this.currentTask = task;
|
||||
console.log(`[TaskWorker] Claimed task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
|
||||
|
||||
// Start heartbeat
|
||||
this.startHeartbeat(task.id);
|
||||
|
||||
try {
|
||||
// Mark as running
|
||||
await taskService.startTask(task.id);
|
||||
|
||||
// Get handler for this role
|
||||
const handler = TASK_HANDLERS[task.role];
|
||||
if (!handler) {
|
||||
throw new Error(`No handler registered for role: ${task.role}`);
|
||||
}
|
||||
|
||||
// Create context
|
||||
const ctx: TaskContext = {
|
||||
pool: this.pool,
|
||||
workerId: this.workerId,
|
||||
task,
|
||||
heartbeat: async () => {
|
||||
await taskService.heartbeat(task.id);
|
||||
},
|
||||
};
|
||||
|
||||
// Execute the task
|
||||
const result = await handler(ctx);
|
||||
|
||||
if (result.success) {
|
||||
// Mark as completed
|
||||
await taskService.completeTask(task.id, result);
|
||||
console.log(`[TaskWorker] Task ${task.id} completed successfully`);
|
||||
|
||||
// Chain next task if applicable
|
||||
const chainedTask = await taskService.chainNextTask({
|
||||
...task,
|
||||
status: 'completed',
|
||||
result,
|
||||
});
|
||||
if (chainedTask) {
|
||||
console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`);
|
||||
}
|
||||
} else {
|
||||
// Mark as failed
|
||||
await taskService.failTask(task.id, result.error || 'Unknown error');
|
||||
console.log(`[TaskWorker] Task ${task.id} failed: ${result.error}`);
|
||||
}
|
||||
} catch (error: any) {
|
||||
// Mark as failed
|
||||
await taskService.failTask(task.id, error.message);
|
||||
console.error(`[TaskWorker] Task ${task.id} threw error:`, error.message);
|
||||
} finally {
|
||||
this.stopHeartbeat();
|
||||
this.currentTask = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start heartbeat interval
|
||||
*/
|
||||
private startHeartbeat(taskId: number): void {
|
||||
this.heartbeatInterval = setInterval(async () => {
|
||||
try {
|
||||
await taskService.heartbeat(taskId);
|
||||
} catch (error: any) {
|
||||
console.warn(`[TaskWorker] Heartbeat failed:`, error.message);
|
||||
}
|
||||
}, HEARTBEAT_INTERVAL_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop heartbeat interval
|
||||
*/
|
||||
private stopHeartbeat(): void {
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep helper
|
||||
*/
|
||||
private sleep(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get worker info
|
||||
*/
|
||||
getInfo(): { workerId: string; role: TaskRole; isRunning: boolean; currentTaskId: number | null } {
|
||||
return {
|
||||
workerId: this.workerId,
|
||||
role: this.role,
|
||||
isRunning: this.isRunning,
|
||||
currentTaskId: this.currentTask?.id || null,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// CLI ENTRY POINT
|
||||
// ============================================================
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const role = process.env.WORKER_ROLE as TaskRole;
|
||||
|
||||
if (!role) {
|
||||
console.error('Error: WORKER_ROLE environment variable is required');
|
||||
console.error('Valid roles: store_discovery, entry_point_discovery, product_discovery, product_resync, analytics_refresh');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const validRoles: TaskRole[] = [
|
||||
'store_discovery',
|
||||
'entry_point_discovery',
|
||||
'product_discovery',
|
||||
'product_resync',
|
||||
'analytics_refresh',
|
||||
];
|
||||
|
||||
if (!validRoles.includes(role)) {
|
||||
console.error(`Error: Invalid WORKER_ROLE: ${role}`);
|
||||
console.error(`Valid roles: ${validRoles.join(', ')}`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const workerId = process.env.WORKER_ID;
|
||||
const worker = new TaskWorker(role, workerId);
|
||||
|
||||
// Handle graceful shutdown
|
||||
process.on('SIGTERM', () => {
|
||||
console.log('[TaskWorker] Received SIGTERM, shutting down...');
|
||||
worker.stop();
|
||||
});
|
||||
|
||||
process.on('SIGINT', () => {
|
||||
console.log('[TaskWorker] Received SIGINT, shutting down...');
|
||||
worker.stop();
|
||||
});
|
||||
|
||||
await worker.start();
|
||||
}
|
||||
|
||||
// Run if this is the main module
|
||||
if (require.main === module) {
|
||||
main().catch((error) => {
|
||||
console.error('[TaskWorker] Fatal error:', error);
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
export { main };
|
||||
Reference in New Issue
Block a user