feat(api): Add job queue management endpoints and fix SQL type errors

- Add GET /api/job-queue/available - list dispensaries available for crawling
- Add GET /api/job-queue/history - get recent job history with results
- Add POST /api/job-queue/enqueue-batch - queue multiple dispensaries at once
- Add POST /api/job-queue/enqueue-state - queue all crawl-enabled dispensaries for a state
- Add POST /api/job-queue/clear-pending - clear pending jobs with optional filters
- Fix SQL parameter type errors by adding explicit casts ($2::text, $3::integer)
- Fix route ordering to prevent /:id from matching /available and /history

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-09 14:10:55 -07:00
parent 4e5c09a2a5
commit bbe039c868
2 changed files with 330 additions and 10 deletions

View File

@@ -143,6 +143,152 @@ router.get('/', async (req: Request, res: Response) => {
}
});
/**
* GET /api/job-queue/available - List dispensaries available for crawling
* Query: { state_code?: string, limit?: number }
* NOTE: Must be defined BEFORE /:id route to avoid conflict
*/
router.get('/available', async (req: Request, res: Response) => {
try {
const { state_code, limit = '100' } = req.query;
let query = `
SELECT
d.id,
d.name,
d.city,
s.code as state_code,
d.platform_dispensary_id,
d.crawl_enabled,
(SELECT MAX(created_at) FROM dispensary_crawl_jobs WHERE dispensary_id = d.id AND status = 'completed') as last_crawl,
EXISTS (
SELECT 1 FROM dispensary_crawl_jobs
WHERE dispensary_id = d.id AND status IN ('pending', 'running')
) as has_pending_job
FROM dispensaries d
LEFT JOIN states s ON s.id = d.state_id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
`;
const params: any[] = [];
let paramIndex = 1;
if (state_code) {
params.push((state_code as string).toUpperCase());
query += ` AND s.code = $${paramIndex++}`;
}
query += ` ORDER BY d.name LIMIT $${paramIndex}`;
params.push(parseInt(limit as string));
const { rows } = await pool.query(query, params);
// Get counts by state
const { rows: stateCounts } = await pool.query(`
SELECT s.code, COUNT(*) as count
FROM dispensaries d
JOIN states s ON s.id = d.state_id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
GROUP BY s.code
ORDER BY count DESC
`);
res.json({
success: true,
dispensaries: rows,
total: rows.length,
by_state: stateCounts
});
} catch (error: any) {
console.error('[JobQueue] Error listing available:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* GET /api/job-queue/history - Get recent job history with results
* Query: { state_code?: string, status?: string, limit?: number, hours?: number }
* NOTE: Must be defined BEFORE /:id route to avoid conflict
*/
router.get('/history', async (req: Request, res: Response) => {
try {
const {
state_code,
status,
limit = '50',
hours = '24'
} = req.query;
let query = `
SELECT
j.id,
j.dispensary_id,
d.name as dispensary_name,
s.code as state_code,
j.job_type,
j.status,
j.products_found,
j.error_message,
j.started_at,
j.completed_at,
j.duration_ms,
j.created_at
FROM dispensary_crawl_jobs j
LEFT JOIN dispensaries d ON d.id = j.dispensary_id
LEFT JOIN states s ON s.id = d.state_id
WHERE j.created_at > NOW() - INTERVAL '${parseInt(hours as string)} hours'
`;
const params: any[] = [];
let paramIndex = 1;
if (status && status !== 'all') {
params.push(status);
query += ` AND j.status = $${paramIndex++}`;
}
if (state_code) {
params.push((state_code as string).toUpperCase());
query += ` AND s.code = $${paramIndex++}`;
}
query += ` ORDER BY j.created_at DESC LIMIT $${paramIndex}`;
params.push(parseInt(limit as string));
const { rows } = await pool.query(query, params);
// Get summary stats
const { rows: stats } = await pool.query(`
SELECT
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed,
COUNT(*) FILTER (WHERE status = 'running') as running,
COUNT(*) FILTER (WHERE status = 'pending') as pending,
SUM(products_found) FILTER (WHERE status = 'completed') as total_products,
AVG(duration_ms) FILTER (WHERE status = 'completed') as avg_duration_ms
FROM dispensary_crawl_jobs
WHERE created_at > NOW() - INTERVAL '${parseInt(hours as string)} hours'
`);
res.json({
success: true,
jobs: rows,
summary: {
completed: parseInt(stats[0].completed) || 0,
failed: parseInt(stats[0].failed) || 0,
running: parseInt(stats[0].running) || 0,
pending: parseInt(stats[0].pending) || 0,
total_products: parseInt(stats[0].total_products) || 0,
avg_duration_ms: Math.round(parseFloat(stats[0].avg_duration_ms)) || null
},
hours: parseInt(hours as string)
});
} catch (error: any) {
console.error('[JobQueue] Error getting history:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* GET /api/job-queue/stats - Queue statistics
*/
@@ -463,5 +609,165 @@ router.get('/paused', async (_req: Request, res: Response) => {
res.json({ success: true, queue_paused: queuePaused });
});
/**
* POST /api/job-queue/enqueue-batch - Queue multiple dispensaries at once
* Body: { dispensary_ids: number[], job_type?: string, priority?: number }
*/
router.post('/enqueue-batch', async (req: Request, res: Response) => {
try {
const { dispensary_ids, job_type = 'dutchie_product_crawl', priority = 0 } = req.body;
if (!Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
return res.status(400).json({ success: false, error: 'dispensary_ids array is required' });
}
if (dispensary_ids.length > 500) {
return res.status(400).json({ success: false, error: 'Maximum 500 dispensaries per batch' });
}
// Insert jobs, skipping duplicates
const { rows } = await pool.query(`
INSERT INTO dispensary_crawl_jobs (dispensary_id, job_type, priority, trigger_type, status, created_at)
SELECT
d.id,
$2::text,
$3::integer,
'api_batch',
'pending',
NOW()
FROM dispensaries d
WHERE d.id = ANY($1::int[])
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM dispensary_crawl_jobs cj
WHERE cj.dispensary_id = d.id
AND cj.job_type = $2::text
AND cj.status IN ('pending', 'running')
)
RETURNING id, dispensary_id
`, [dispensary_ids, job_type, priority]);
res.json({
success: true,
queued: rows.length,
requested: dispensary_ids.length,
job_ids: rows.map(r => r.id),
message: `Queued ${rows.length} of ${dispensary_ids.length} dispensaries`
});
} catch (error: any) {
console.error('[JobQueue] Error batch enqueuing:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/job-queue/enqueue-state - Queue all crawl-enabled dispensaries for a state
* Body: { state_code: string, job_type?: string, priority?: number, limit?: number }
*/
router.post('/enqueue-state', async (req: Request, res: Response) => {
try {
const { state_code, job_type = 'dutchie_product_crawl', priority = 0, limit = 200 } = req.body;
if (!state_code) {
return res.status(400).json({ success: false, error: 'state_code is required (e.g., "AZ")' });
}
// Get state_id and queue jobs
const { rows } = await pool.query(`
WITH target_state AS (
SELECT id FROM states WHERE code = $1
)
INSERT INTO dispensary_crawl_jobs (dispensary_id, job_type, priority, trigger_type, status, created_at)
SELECT
d.id,
$2::text,
$3::integer,
'api_state',
'pending',
NOW()
FROM dispensaries d, target_state
WHERE d.state_id = target_state.id
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM dispensary_crawl_jobs cj
WHERE cj.dispensary_id = d.id
AND cj.job_type = $2::text
AND cj.status IN ('pending', 'running')
)
LIMIT $4::integer
RETURNING id, dispensary_id
`, [state_code.toUpperCase(), job_type, priority, limit]);
// Get total available count
const countResult = await pool.query(`
WITH target_state AS (
SELECT id FROM states WHERE code = $1
)
SELECT COUNT(*) as total
FROM dispensaries d, target_state
WHERE d.state_id = target_state.id
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
`, [state_code.toUpperCase()]);
res.json({
success: true,
queued: rows.length,
total_available: parseInt(countResult.rows[0].total),
state: state_code.toUpperCase(),
job_type,
message: `Queued ${rows.length} dispensaries for ${state_code.toUpperCase()}`
});
} catch (error: any) {
console.error('[JobQueue] Error enqueuing state:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/job-queue/clear-pending - Clear all pending jobs (optionally filtered)
* Body: { state_code?: string, job_type?: string }
*/
router.post('/clear-pending', async (req: Request, res: Response) => {
try {
const { state_code, job_type } = req.body;
let query = `
UPDATE dispensary_crawl_jobs
SET status = 'cancelled', completed_at = NOW(), updated_at = NOW()
WHERE status = 'pending'
`;
const params: any[] = [];
let paramIndex = 1;
if (job_type) {
params.push(job_type);
query += ` AND job_type = $${paramIndex++}`;
}
if (state_code) {
params.push((state_code as string).toUpperCase());
query += ` AND dispensary_id IN (
SELECT d.id FROM dispensaries d
JOIN states s ON s.id = d.state_id
WHERE s.code = $${paramIndex++}
)`;
}
const result = await pool.query(query, params);
res.json({
success: true,
cleared: result.rowCount,
message: `Cancelled ${result.rowCount} pending jobs`
});
} catch (error: any) {
console.error('[JobQueue] Error clearing pending:', error);
res.status(500).json({ success: false, error: error.message });
}
});
export default router;
export { queuePaused };