diff --git a/backend/src/routes/job-queue.ts b/backend/src/routes/job-queue.ts index fdb07cf2..e52a79a1 100644 --- a/backend/src/routes/job-queue.ts +++ b/backend/src/routes/job-queue.ts @@ -143,6 +143,152 @@ router.get('/', async (req: Request, res: Response) => { } }); +/** + * GET /api/job-queue/available - List dispensaries available for crawling + * Query: { state_code?: string, limit?: number } + * NOTE: Must be defined BEFORE /:id route to avoid conflict + */ +router.get('/available', async (req: Request, res: Response) => { + try { + const { state_code, limit = '100' } = req.query; + + let query = ` + SELECT + d.id, + d.name, + d.city, + s.code as state_code, + d.platform_dispensary_id, + d.crawl_enabled, + (SELECT MAX(created_at) FROM dispensary_crawl_jobs WHERE dispensary_id = d.id AND status = 'completed') as last_crawl, + EXISTS ( + SELECT 1 FROM dispensary_crawl_jobs + WHERE dispensary_id = d.id AND status IN ('pending', 'running') + ) as has_pending_job + FROM dispensaries d + LEFT JOIN states s ON s.id = d.state_id + WHERE d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + `; + const params: any[] = []; + let paramIndex = 1; + + if (state_code) { + params.push((state_code as string).toUpperCase()); + query += ` AND s.code = $${paramIndex++}`; + } + + query += ` ORDER BY d.name LIMIT $${paramIndex}`; + params.push(parseInt(limit as string)); + + const { rows } = await pool.query(query, params); + + // Get counts by state + const { rows: stateCounts } = await pool.query(` + SELECT s.code, COUNT(*) as count + FROM dispensaries d + JOIN states s ON s.id = d.state_id + WHERE d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + GROUP BY s.code + ORDER BY count DESC + `); + + res.json({ + success: true, + dispensaries: rows, + total: rows.length, + by_state: stateCounts + }); + } catch (error: any) { + console.error('[JobQueue] Error listing available:', error); + res.status(500).json({ success: false, error: error.message }); + } +}); + +/** + * GET /api/job-queue/history - Get recent job history with results + * Query: { state_code?: string, status?: string, limit?: number, hours?: number } + * NOTE: Must be defined BEFORE /:id route to avoid conflict + */ +router.get('/history', async (req: Request, res: Response) => { + try { + const { + state_code, + status, + limit = '50', + hours = '24' + } = req.query; + + let query = ` + SELECT + j.id, + j.dispensary_id, + d.name as dispensary_name, + s.code as state_code, + j.job_type, + j.status, + j.products_found, + j.error_message, + j.started_at, + j.completed_at, + j.duration_ms, + j.created_at + FROM dispensary_crawl_jobs j + LEFT JOIN dispensaries d ON d.id = j.dispensary_id + LEFT JOIN states s ON s.id = d.state_id + WHERE j.created_at > NOW() - INTERVAL '${parseInt(hours as string)} hours' + `; + const params: any[] = []; + let paramIndex = 1; + + if (status && status !== 'all') { + params.push(status); + query += ` AND j.status = $${paramIndex++}`; + } + + if (state_code) { + params.push((state_code as string).toUpperCase()); + query += ` AND s.code = $${paramIndex++}`; + } + + query += ` ORDER BY j.created_at DESC LIMIT $${paramIndex}`; + params.push(parseInt(limit as string)); + + const { rows } = await pool.query(query, params); + + // Get summary stats + const { rows: stats } = await pool.query(` + SELECT + COUNT(*) FILTER (WHERE status = 'completed') as completed, + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) FILTER (WHERE status = 'running') as running, + COUNT(*) FILTER (WHERE status = 'pending') as pending, + SUM(products_found) FILTER (WHERE status = 'completed') as total_products, + AVG(duration_ms) FILTER (WHERE status = 'completed') as avg_duration_ms + FROM dispensary_crawl_jobs + WHERE created_at > NOW() - INTERVAL '${parseInt(hours as string)} hours' + `); + + res.json({ + success: true, + jobs: rows, + summary: { + completed: parseInt(stats[0].completed) || 0, + failed: parseInt(stats[0].failed) || 0, + running: parseInt(stats[0].running) || 0, + pending: parseInt(stats[0].pending) || 0, + total_products: parseInt(stats[0].total_products) || 0, + avg_duration_ms: Math.round(parseFloat(stats[0].avg_duration_ms)) || null + }, + hours: parseInt(hours as string) + }); + } catch (error: any) { + console.error('[JobQueue] Error getting history:', error); + res.status(500).json({ success: false, error: error.message }); + } +}); + /** * GET /api/job-queue/stats - Queue statistics */ @@ -463,5 +609,165 @@ router.get('/paused', async (_req: Request, res: Response) => { res.json({ success: true, queue_paused: queuePaused }); }); +/** + * POST /api/job-queue/enqueue-batch - Queue multiple dispensaries at once + * Body: { dispensary_ids: number[], job_type?: string, priority?: number } + */ +router.post('/enqueue-batch', async (req: Request, res: Response) => { + try { + const { dispensary_ids, job_type = 'dutchie_product_crawl', priority = 0 } = req.body; + + if (!Array.isArray(dispensary_ids) || dispensary_ids.length === 0) { + return res.status(400).json({ success: false, error: 'dispensary_ids array is required' }); + } + + if (dispensary_ids.length > 500) { + return res.status(400).json({ success: false, error: 'Maximum 500 dispensaries per batch' }); + } + + // Insert jobs, skipping duplicates + const { rows } = await pool.query(` + INSERT INTO dispensary_crawl_jobs (dispensary_id, job_type, priority, trigger_type, status, created_at) + SELECT + d.id, + $2::text, + $3::integer, + 'api_batch', + 'pending', + NOW() + FROM dispensaries d + WHERE d.id = ANY($1::int[]) + AND d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + AND NOT EXISTS ( + SELECT 1 FROM dispensary_crawl_jobs cj + WHERE cj.dispensary_id = d.id + AND cj.job_type = $2::text + AND cj.status IN ('pending', 'running') + ) + RETURNING id, dispensary_id + `, [dispensary_ids, job_type, priority]); + + res.json({ + success: true, + queued: rows.length, + requested: dispensary_ids.length, + job_ids: rows.map(r => r.id), + message: `Queued ${rows.length} of ${dispensary_ids.length} dispensaries` + }); + } catch (error: any) { + console.error('[JobQueue] Error batch enqueuing:', error); + res.status(500).json({ success: false, error: error.message }); + } +}); + +/** + * POST /api/job-queue/enqueue-state - Queue all crawl-enabled dispensaries for a state + * Body: { state_code: string, job_type?: string, priority?: number, limit?: number } + */ +router.post('/enqueue-state', async (req: Request, res: Response) => { + try { + const { state_code, job_type = 'dutchie_product_crawl', priority = 0, limit = 200 } = req.body; + + if (!state_code) { + return res.status(400).json({ success: false, error: 'state_code is required (e.g., "AZ")' }); + } + + // Get state_id and queue jobs + const { rows } = await pool.query(` + WITH target_state AS ( + SELECT id FROM states WHERE code = $1 + ) + INSERT INTO dispensary_crawl_jobs (dispensary_id, job_type, priority, trigger_type, status, created_at) + SELECT + d.id, + $2::text, + $3::integer, + 'api_state', + 'pending', + NOW() + FROM dispensaries d, target_state + WHERE d.state_id = target_state.id + AND d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + AND NOT EXISTS ( + SELECT 1 FROM dispensary_crawl_jobs cj + WHERE cj.dispensary_id = d.id + AND cj.job_type = $2::text + AND cj.status IN ('pending', 'running') + ) + LIMIT $4::integer + RETURNING id, dispensary_id + `, [state_code.toUpperCase(), job_type, priority, limit]); + + // Get total available count + const countResult = await pool.query(` + WITH target_state AS ( + SELECT id FROM states WHERE code = $1 + ) + SELECT COUNT(*) as total + FROM dispensaries d, target_state + WHERE d.state_id = target_state.id + AND d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + `, [state_code.toUpperCase()]); + + res.json({ + success: true, + queued: rows.length, + total_available: parseInt(countResult.rows[0].total), + state: state_code.toUpperCase(), + job_type, + message: `Queued ${rows.length} dispensaries for ${state_code.toUpperCase()}` + }); + } catch (error: any) { + console.error('[JobQueue] Error enqueuing state:', error); + res.status(500).json({ success: false, error: error.message }); + } +}); + +/** + * POST /api/job-queue/clear-pending - Clear all pending jobs (optionally filtered) + * Body: { state_code?: string, job_type?: string } + */ +router.post('/clear-pending', async (req: Request, res: Response) => { + try { + const { state_code, job_type } = req.body; + + let query = ` + UPDATE dispensary_crawl_jobs + SET status = 'cancelled', completed_at = NOW(), updated_at = NOW() + WHERE status = 'pending' + `; + const params: any[] = []; + let paramIndex = 1; + + if (job_type) { + params.push(job_type); + query += ` AND job_type = $${paramIndex++}`; + } + + if (state_code) { + params.push((state_code as string).toUpperCase()); + query += ` AND dispensary_id IN ( + SELECT d.id FROM dispensaries d + JOIN states s ON s.id = d.state_id + WHERE s.code = $${paramIndex++} + )`; + } + + const result = await pool.query(query, params); + + res.json({ + success: true, + cleared: result.rowCount, + message: `Cancelled ${result.rowCount} pending jobs` + }); + } catch (error: any) { + console.error('[JobQueue] Error clearing pending:', error); + res.status(500).json({ success: false, error: error.message }); + } +}); + export default router; export { queuePaused }; diff --git a/cannaiq/src/pages/Users.tsx b/cannaiq/src/pages/Users.tsx index c2699d88..56832feb 100644 --- a/cannaiq/src/pages/Users.tsx +++ b/cannaiq/src/pages/Users.tsx @@ -141,13 +141,21 @@ export function Users() { }; const canModifyUser = (user: User) => { - // Can't modify yourself - if (currentUser?.id === user.id) return false; // Only superadmin can modify superadmin users if (user.role === 'superadmin' && currentUser?.role !== 'superadmin') return false; return true; }; + const canDeleteUser = (user: User) => { + // Can't delete yourself + if (currentUser?.id === user.id) return false; + // Only superadmin can delete superadmin users + if (user.role === 'superadmin' && currentUser?.role !== 'superadmin') return false; + return true; + }; + + const isEditingSelf = (user: User) => currentUser?.id === user.id; + return (
@@ -236,15 +244,17 @@ export function Users() { {new Date(user.created_at).toLocaleDateString()} - {canModifyUser(user) ? ( -
+
+ {canModifyUser(user) && ( + )} + {canDeleteUser(user) ? ( -
- ) : ( - - )} + ) : !canModifyUser(user) && ( + + )} +
))} @@ -349,11 +359,15 @@ export function Users() {