/**
 * Pipeline Stage Transition Routes
 *
 * Explicit API endpoints for moving stores through the 6-stage pipeline:
 * 1. discovered → validated (POST /validate)
 * 2. validated → promoted (POST /promote)
 * 3. promoted → sandbox (POST /crawl)
 * 4. sandbox → production (POST /approve)
 * 5. production → failing (POST /fail, usually called by the crawler on error)
 * 6. failing → sandbox (POST /retry)
 *
 * Each endpoint:
 * - Does the work for that stage
 * - Validates success
 * - Updates status only after completion
 * - Logs to stage_transitions table
 */

import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';

const router = Router();

// Valid stages
const STAGES = ['discovered', 'validated', 'promoted', 'sandbox', 'production', 'failing'] as const;
type Stage = typeof STAGES[number];

// Rows come back from pool.query untyped; this alias keeps that looseness in one place.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DbRow = Record<string, any>;

// ============================================================
// HELPER FUNCTIONS
// ============================================================

/**
 * Extract a printable message from whatever was thrown.
 */
function errorMessageOf(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Log an unexpected handler failure and answer with HTTP 500.
 *
 * @param res - Express response to write to
 * @param label - Short operation name used in the log line (e.g. "Validate")
 * @param error - The caught value
 */
function sendServerError(res: Response, label: string, error: unknown): void {
  const message = errorMessageOf(error);
  console.error(`[Pipeline] ${label} error:`, message);
  res.status(500).json({ error: message });
}

/**
 * Parse a numeric `:id` route parameter.
 *
 * @returns the parsed integer, or null when the parameter is not numeric
 */
function parseId(raw: string): number | null {
  const parsed = parseInt(raw, 10);
  return Number.isNaN(parsed) ? null : parsed;
}

/**
 * Coerce a caller-supplied count (body or query value) to a non-negative
 * integer so it is always safe to bind as a SQL LIMIT or to compare numerically.
 *
 * @param raw - Untrusted value (number, numeric string, or anything else)
 * @param fallback - Value used when `raw` is missing, non-numeric or negative
 */
function parseCount(raw: unknown, fallback: number): number {
  const parsed = typeof raw === 'number' ? raw : parseInt(String(raw), 10);
  if (!Number.isFinite(parsed) || parsed < 0) return fallback;
  return Math.floor(parsed);
}

/**
 * Lower-case `text`, collapse every run of non-alphanumerics into a single
 * dash, strip a leading/trailing dash and truncate to `maxLength` characters.
 */
function slugify(text: string, maxLength: number): string {
  return text
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-|-$/g, '')
    .substring(0, maxLength);
}

/**
 * True when `value` is not a string or contains only whitespace.
 */
function isBlank(value: unknown): boolean {
  return typeof value !== 'string' || value.trim() === '';
}

/**
 * Check the fields a discovery location needs before it can be validated.
 *
 * @param loc - Row from dutchie_discovery_locations
 * @returns one message per missing field; an empty array means the row is valid
 */
function collectValidationErrors(loc: DbRow): string[] {
  const errors: string[] = [];
  if (!loc.platform_location_id) errors.push('Missing platform_location_id');
  if (isBlank(loc.name)) errors.push('Missing name');
  if (isBlank(loc.city)) errors.push('Missing city');
  if (isBlank(loc.state_code)) errors.push('Missing state_code');
  if (!loc.platform_menu_url) errors.push('Missing platform_menu_url');
  return errors;
}

/**
 * Turn `SELECT stage, COUNT(*) as count` rows into a `{ stage: count }` map.
 * Rows with a NULL stage are reported under the key "unknown".
 */
function countsByStage(rows: DbRow[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const row of rows) {
    counts[row.stage || 'unknown'] = parseInt(row.count, 10);
  }
  return counts;
}

/**
 * Log a stage transition to the audit table
 *
 * @returns the id of the inserted stage_transitions row
 */
async function logTransition(
  entityType: 'discovery_location' | 'dispensary',
  entityId: number,
  fromStage: string | null,
  toStage: string,
  triggerType: 'api' | 'scheduler' | 'manual' | 'auto',
  triggerEndpoint: string,
  success: boolean,
  errorMessage?: string,
  metadata?: Record<string, unknown>,
  durationMs?: number
): Promise<number> {
  const result = await pool.query(`
    INSERT INTO stage_transitions
      (entity_type, entity_id, from_stage, to_stage, trigger_type, trigger_endpoint,
       success, error_message, metadata, duration_ms)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    RETURNING id
  `, [
    entityType,
    entityId,
    fromStage,
    toStage,
    triggerType,
    triggerEndpoint,
    success,
    errorMessage || null,
    metadata ? JSON.stringify(metadata) : null,
    // `??` rather than `||`: a measured duration of 0 ms is a real value.
    durationMs ?? null,
  ]);
  return result.rows[0].id;
}

/**
 * Create a status alert for the dashboard
 */
async function createAlert(
  dispensaryId: number | null,
  profileId: number | null,
  alertType: string,
  severity: 'info' | 'warning' | 'error',
  message: string,
  fromStage?: string | null,
  toStage?: string | null,
  metadata?: Record<string, unknown>
): Promise<void> {
  await pool.query(`
    INSERT INTO crawler_status_alerts
      (dispensary_id, profile_id, alert_type, severity, message,
       previous_status, new_status, metadata)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
  `, [
    dispensaryId,
    profileId,
    alertType,
    severity,
    message,
    fromStage || null,
    toStage || null,
    metadata ? JSON.stringify(metadata) : null,
  ]);
}

// ============================================================
// STAGE 1 → 2: VALIDATE
// discovered → validated
// ============================================================

/**
 * POST /api/pipeline/discovery/:id/validate
 * Validate a discovered location - check required fields
 */
router.post('/discovery/:id/validate', async (req: Request, res: Response): Promise<void> => {
  const startTime = Date.now();
  const locationId = parseId(req.params.id);
  if (locationId === null) {
    res.status(400).json({ error: 'Invalid discovery location id' });
    return;
  }

  try {
    // Get the discovery location
    const { rows } = await pool.query(`
      SELECT * FROM dutchie_discovery_locations WHERE id = $1
    `, [locationId]);

    if (rows.length === 0) {
      res.status(404).json({ error: 'Discovery location not found' });
      return;
    }

    const loc = rows[0];

    if (loc.stage !== 'discovered') {
      res.status(400).json({
        error: `Cannot validate: current stage is '${loc.stage}', expected 'discovered'`,
      });
      return;
    }

    // Validate required fields
    const errors = collectValidationErrors(loc);

    if (errors.length > 0) {
      // Update to failing stage
      await pool.query(`
        UPDATE dutchie_discovery_locations
        SET stage = 'failing', notes = $1, updated_at = CURRENT_TIMESTAMP
        WHERE id = $2
      `, [errors.join('; '), locationId]);

      await logTransition(
        'discovery_location', locationId, 'discovered', 'failing',
        'api', '/api/pipeline/discovery/:id/validate',
        false, errors.join('; '), { errors }, Date.now() - startTime
      );

      res.status(400).json({
        success: false,
        stage: 'failing',
        errors,
      });
      return;
    }

    // Update to validated stage
    await pool.query(`
      UPDATE dutchie_discovery_locations
      SET stage = 'validated', updated_at = CURRENT_TIMESTAMP
      WHERE id = $1
    `, [locationId]);

    await logTransition(
      'discovery_location', locationId, 'discovered', 'validated',
      'api', '/api/pipeline/discovery/:id/validate',
      true, undefined,
      { name: loc.name, city: loc.city, state: loc.state_code },
      Date.now() - startTime
    );

    res.json({
      success: true,
      locationId,
      stage: 'validated',
      name: loc.name,
    });
  } catch (error: unknown) {
    sendServerError(res, 'Validate', error);
  }
});

/**
 * POST /api/pipeline/discovery/validate-batch
 * Validate all discovered locations (or filtered by state)
 *
 * Body: { stateCode?: string, limit?: number } (limit defaults to 100)
 */
router.post('/discovery/validate-batch', async (req: Request, res: Response): Promise<void> => {
  try {
    // Read the body inside the try and tolerate a missing one, so a request
    // without a parsed body cannot reject the handler before any response is sent.
    const body = req.body ?? {};
    const { stateCode } = body;
    const limit = parseCount(body.limit, 100);

    let query = `
      SELECT id FROM dutchie_discovery_locations
      WHERE stage = 'discovered'
    `;
    const params: unknown[] = [];

    if (stateCode) {
      query += ` AND state_code = $1`;
      params.push(stateCode);
    }

    query += ` ORDER BY first_seen_at LIMIT $${params.length + 1}`;
    params.push(limit);

    const { rows } = await pool.query(query, params);

    const results = {
      processed: 0,
      validated: 0,
      failed: 0,
      errors: [] as Array<{ id: number; errors: string[] }>,
    };

    for (const row of rows) {
      const validateResult = await validateSingleLocation(row.id);
      results.processed++;
      if (validateResult.success) {
        results.validated++;
      } else {
        results.failed++;
        results.errors.push({ id: row.id, errors: validateResult.errors || [] });
      }
    }

    res.json(results);
  } catch (error: unknown) {
    sendServerError(res, 'Validate batch', error);
  }
});

/**
 * Internal helper for batch validation.
 *
 * Applies the same required-field checks as the single-location endpoint,
 * moves the row to 'validated' or 'failing', and logs the transition with
 * trigger type 'scheduler'.
 *
 * @returns `success: true` when validated; otherwise the list of problems
 */
async function validateSingleLocation(locationId: number): Promise<{ success: boolean; errors?: string[] }> {
  const { rows } = await pool.query(`
    SELECT * FROM dutchie_discovery_locations WHERE id = $1
  `, [locationId]);

  if (rows.length === 0) return { success: false, errors: ['Not found'] };

  const loc = rows[0];

  // The batch selects 'discovered' rows up front, but the row may have moved
  // on since then; never re-stage a location that is no longer 'discovered'.
  if (loc.stage !== 'discovered') {
    return {
      success: false,
      errors: [`Cannot validate: current stage is '${loc.stage}', expected 'discovered'`],
    };
  }

  const errors = collectValidationErrors(loc);
  const isValid = errors.length === 0;
  const newStage = isValid ? 'validated' : 'failing';
  const joinedErrors = isValid ? undefined : errors.join('; ');

  await pool.query(`
    UPDATE dutchie_discovery_locations
    SET stage = $1, notes = $2, updated_at = CURRENT_TIMESTAMP
    WHERE id = $3
  `, [newStage, joinedErrors ?? null, locationId]);

  await logTransition(
    'discovery_location', locationId, loc.stage, newStage,
    'scheduler', '/api/pipeline/discovery/validate-batch',
    isValid, joinedErrors
  );

  return { success: isValid, errors: isValid ? undefined : errors };
}

// ============================================================
// STAGE 2 → 3: PROMOTE
// validated → promoted
// ============================================================

/**
 * Shared promotion work used by both the single and the batch endpoint:
 * 1. upsert the location into `dispensaries` (stage 'promoted'),
 * 2. upsert its enabled crawler profile,
 * 3. mark the discovery location as promoted and link it to the dispensary.
 *
 * Does not write to stage_transitions or create alerts; callers do that with
 * their own trigger type.
 *
 * @param loc - Row from dutchie_discovery_locations (expected stage 'validated')
 * @param locationId - Primary key of `loc`
 * @returns the dispensary id and whether the dispensary row was newly inserted
 */
async function promoteToDispensary(
  loc: DbRow,
  locationId: number
): Promise<{ dispensaryId: number; wasCreated: boolean }> {
  // Generate slug
  const slug = slugify(loc.platform_slug || `${loc.name}-${loc.city}-${loc.state_code}`, 100);

  // Upsert to dispensaries. `xmax = 0` is true only for a freshly inserted row.
  const upsertResult = await pool.query(`
    INSERT INTO dispensaries (
      platform, name, slug, city, state, address1, postal_code,
      latitude, longitude, timezone, platform_dispensary_id, menu_url, menu_type,
      offer_pickup, offer_delivery, is_medical, is_recreational, country,
      stage, stage_changed_at, crawl_enabled, dutchie_discovery_id,
      created_at, updated_at
    ) VALUES (
      $1, $2, $3, $4, $5, $6, $7,
      $8, $9, $10, $11, $12, 'dutchie',
      $13, $14, $15, $16, $17,
      'promoted', CURRENT_TIMESTAMP, true, $18,
      CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
    )
    ON CONFLICT (platform_dispensary_id) WHERE platform_dispensary_id IS NOT NULL
    DO UPDATE SET
      name = EXCLUDED.name,
      city = EXCLUDED.city,
      state = EXCLUDED.state,
      menu_url = EXCLUDED.menu_url,
      stage = 'promoted',
      stage_changed_at = CURRENT_TIMESTAMP,
      updated_at = CURRENT_TIMESTAMP
    RETURNING id, (xmax = 0) AS inserted
  `, [
    loc.platform || 'dutchie',
    loc.name,
    slug,
    loc.city,
    loc.state_code,
    loc.address_line1,
    loc.postal_code,
    loc.latitude,
    loc.longitude,
    loc.timezone,
    loc.platform_location_id,
    loc.platform_menu_url,
    loc.offers_pickup ?? true,
    loc.offers_delivery ?? false,
    loc.is_medical ?? false,
    loc.is_recreational ?? true,
    loc.country || 'United States',
    locationId,
  ]);

  const dispensaryId: number = upsertResult.rows[0].id;
  const wasCreated: boolean = upsertResult.rows[0].inserted;

  // Create crawler profile
  const profileKey = slugify(loc.name, 50);

  await pool.query(`
    INSERT INTO dispensary_crawler_profiles (
      dispensary_id, profile_name, profile_key, crawler_type, status, status_reason,
      config, enabled, consecutive_successes, consecutive_failures,
      created_at, updated_at
    ) VALUES (
      $1, $2, $3, 'dutchie', 'promoted', 'Promoted from discovery',
      $4::jsonb, true, 0, 0,
      CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
    )
    ON CONFLICT (dispensary_id) WHERE enabled = true
    DO UPDATE SET
      status = 'promoted',
      status_reason = 'Re-promoted from discovery',
      updated_at = CURRENT_TIMESTAMP
  `, [
    dispensaryId,
    loc.name,
    profileKey,
    JSON.stringify({
      platformDispensaryId: loc.platform_location_id,
      useBothModes: true,
      downloadImages: true,
      trackStock: true,
    }),
  ]);

  // Update discovery location
  await pool.query(`
    UPDATE dutchie_discovery_locations
    SET stage = 'promoted',
        dispensary_id = $1,
        verified_at = CURRENT_TIMESTAMP,
        updated_at = CURRENT_TIMESTAMP
    WHERE id = $2
  `, [dispensaryId, locationId]);

  return { dispensaryId, wasCreated };
}

/**
 * POST /api/pipeline/discovery/:id/promote
 * Promote a validated location to dispensaries table
 */
router.post('/discovery/:id/promote', async (req: Request, res: Response): Promise<void> => {
  const startTime = Date.now();
  const locationId = parseId(req.params.id);
  if (locationId === null) {
    res.status(400).json({ error: 'Invalid discovery location id' });
    return;
  }

  try {
    // Get the discovery location
    const { rows } = await pool.query(`
      SELECT * FROM dutchie_discovery_locations WHERE id = $1
    `, [locationId]);

    if (rows.length === 0) {
      res.status(404).json({ error: 'Discovery location not found' });
      return;
    }

    const loc = rows[0];

    if (loc.stage !== 'validated') {
      res.status(400).json({
        error: `Cannot promote: current stage is '${loc.stage}', expected 'validated'`,
      });
      return;
    }

    const { dispensaryId, wasCreated } = await promoteToDispensary(loc, locationId);

    await logTransition(
      'discovery_location', locationId, 'validated', 'promoted',
      'api', '/api/pipeline/discovery/:id/promote',
      true, undefined,
      { dispensaryId, wasCreated, name: loc.name },
      Date.now() - startTime
    );

    await createAlert(
      dispensaryId, null, 'stage_change', 'info',
      `${loc.name} promoted to dispensaries table`,
      'validated', 'promoted',
      { locationId, wasCreated }
    );

    res.json({
      success: true,
      locationId,
      dispensaryId,
      stage: 'promoted',
      action: wasCreated ? 'created' : 'updated',
      name: loc.name,
    });
  } catch (error: unknown) {
    sendServerError(res, 'Promote', error);
  }
});

/**
 * POST /api/pipeline/discovery/promote-batch
 * Promote all validated locations (or filtered by state)
 *
 * Body: { stateCode?: string, limit?: number } (limit defaults to 100)
 */
router.post('/discovery/promote-batch', async (req: Request, res: Response): Promise<void> => {
  try {
    const body = req.body ?? {};
    const { stateCode } = body;
    const limit = parseCount(body.limit, 100);

    let query = `
      SELECT id FROM dutchie_discovery_locations
      WHERE stage = 'validated'
    `;
    const params: unknown[] = [];

    if (stateCode) {
      query += ` AND state_code = $1`;
      params.push(stateCode);
    }

    query += ` ORDER BY first_seen_at LIMIT $${params.length + 1}`;
    params.push(limit);

    const { rows } = await pool.query(query, params);

    const results = {
      processed: 0,
      promoted: 0,
      failed: 0,
      dispensaryIds: [] as number[],
    };

    for (const row of rows) {
      results.processed++;
      try {
        const promoteResult = await promoteSingleLocation(row.id);
        if (promoteResult.success) {
          results.promoted++;
          if (promoteResult.dispensaryId) {
            results.dispensaryIds.push(promoteResult.dispensaryId);
          }
        } else {
          results.failed++;
        }
      } catch (e: unknown) {
        // One bad row must not abort the batch, but it should leave a trace.
        console.error(`[Pipeline] Promote batch: location ${row.id} failed:`, errorMessageOf(e));
        results.failed++;
      }
    }

    res.json(results);
  } catch (error: unknown) {
    sendServerError(res, 'Promote batch', error);
  }
});

/**
 * Internal helper for batch promotion.
 *
 * Performs the same dispensary upsert, crawler-profile upsert and discovery
 * update as the single-location endpoint, then logs the transition with
 * trigger type 'scheduler'. No dashboard alert is created for batch runs.
 *
 * @returns `success: false` when the location does not exist or is not 'validated'
 */
async function promoteSingleLocation(locationId: number): Promise<{ success: boolean; dispensaryId?: number }> {
  const { rows } = await pool.query(`
    SELECT * FROM dutchie_discovery_locations WHERE id = $1 AND stage = 'validated'
  `, [locationId]);

  if (rows.length === 0) return { success: false };

  const { dispensaryId } = await promoteToDispensary(rows[0], locationId);

  await logTransition(
    'discovery_location', locationId, 'validated', 'promoted',
    'scheduler', '/api/pipeline/discovery/promote-batch',
    true, undefined, { dispensaryId }
  );

  return { success: true, dispensaryId };
}

// ============================================================
// STAGE 3 → 4: CRAWL (First Crawl)
// promoted → sandbox
// ============================================================

/**
 * POST /api/pipeline/stores/:id/crawl
 * Attempt first crawl for a promoted store
 */
router.post('/stores/:id/crawl', async (req: Request, res: Response): Promise<void> => {
  const startTime = Date.now();
  const dispensaryId = parseId(req.params.id);
  if (dispensaryId === null) {
    res.status(400).json({ error: 'Invalid dispensary id' });
    return;
  }

  try {
    // Get the dispensary
    const { rows } = await pool.query(`
      SELECT d.*, dcp.id as profile_id, dcp.config
      FROM dispensaries d
      LEFT JOIN dispensary_crawler_profiles dcp
        ON dcp.dispensary_id = d.id AND dcp.enabled = true
      WHERE d.id = $1
    `, [dispensaryId]);

    if (rows.length === 0) {
      res.status(404).json({ error: 'Dispensary not found' });
      return;
    }

    const disp = rows[0];

    if (disp.stage !== 'promoted') {
      res.status(400).json({
        error: `Cannot crawl: current stage is '${disp.stage}', expected 'promoted'`,
      });
      return;
    }

    if (!disp.platform_dispensary_id) {
      res.status(400).json({ error: 'Missing platform_dispensary_id' });
      return;
    }

    // TODO: Actually call the Dutchie GraphQL API to fetch products
    // For now, we'll just transition to sandbox and mark first_crawl_at
    // The actual crawl will be implemented in the crawler module

    // Update to sandbox stage
    await pool.query(`
      UPDATE dispensaries
      SET stage = 'sandbox',
          stage_changed_at = CURRENT_TIMESTAMP,
          first_crawl_at = CURRENT_TIMESTAMP,
          updated_at = CURRENT_TIMESTAMP
      WHERE id = $1
    `, [dispensaryId]);

    await pool.query(`
      UPDATE dispensary_crawler_profiles
      SET status = 'sandbox',
          status_reason = 'First crawl initiated',
          updated_at = CURRENT_TIMESTAMP
      WHERE dispensary_id = $1 AND enabled = true
    `, [dispensaryId]);

    await logTransition(
      'dispensary', dispensaryId, 'promoted', 'sandbox',
      'api', '/api/pipeline/stores/:id/crawl',
      true, undefined,
      { name: disp.name, platformId: disp.platform_dispensary_id },
      Date.now() - startTime
    );

    await createAlert(
      dispensaryId, disp.profile_id, 'stage_change', 'info',
      `${disp.name} moved to sandbox - first crawl initiated`,
      'promoted', 'sandbox'
    );

    res.json({
      success: true,
      dispensaryId,
      stage: 'sandbox',
      name: disp.name,
      message: 'First crawl initiated - store is now in sandbox',
    });
  } catch (error: unknown) {
    sendServerError(res, 'Crawl', error);
  }
});

/**
 * POST /api/pipeline/stores/crawl-batch
 * Initiate first crawl for all promoted stores (or filtered by state)
 *
 * Body: { stateCode?: string, limit?: number } (limit defaults to 50)
 */
router.post('/stores/crawl-batch', async (req: Request, res: Response): Promise<void> => {
  try {
    const body = req.body ?? {};
    const { stateCode } = body;
    const limit = parseCount(body.limit, 50);

    let query = `
      SELECT id FROM dispensaries
      WHERE stage = 'promoted' AND platform_dispensary_id IS NOT NULL
    `;
    const params: unknown[] = [];

    if (stateCode) {
      query += ` AND state = $1`;
      params.push(stateCode);
    }

    query += ` ORDER BY created_at LIMIT $${params.length + 1}`;
    params.push(limit);

    const { rows } = await pool.query(query, params);

    const results = {
      processed: 0,
      crawled: 0,
      failed: 0,
    };

    for (const row of rows) {
      try {
        await pool.query(`
          UPDATE dispensaries
          SET stage = 'sandbox',
              stage_changed_at = CURRENT_TIMESTAMP,
              first_crawl_at = CURRENT_TIMESTAMP,
              updated_at = CURRENT_TIMESTAMP
          WHERE id = $1
        `, [row.id]);

        await pool.query(`
          UPDATE dispensary_crawler_profiles
          SET status = 'sandbox',
              status_reason = 'First crawl initiated (batch)',
              updated_at = CURRENT_TIMESTAMP
          WHERE dispensary_id = $1 AND enabled = true
        `, [row.id]);

        await logTransition(
          'dispensary', row.id, 'promoted', 'sandbox',
          'scheduler', '/api/pipeline/stores/crawl-batch',
          true
        );

        results.crawled++;
      } catch (e: unknown) {
        // Keep going with the remaining stores, but record why this one failed.
        console.error(`[Pipeline] Crawl batch: dispensary ${row.id} failed:`, errorMessageOf(e));
        results.failed++;
      }
      results.processed++;
    }

    res.json(results);
  } catch (error: unknown) {
    sendServerError(res, 'Crawl batch', error);
  }
});

// ============================================================
// STAGE 4 → 5: APPROVE
// sandbox → production
// ============================================================

/**
 * POST /api/pipeline/stores/:id/approve
 * Approve a sandbox store for production (requires products)
 *
 * Body: { force?: boolean } - force=true approves even with zero products
 */
router.post('/stores/:id/approve', async (req: Request, res: Response): Promise<void> => {
  const startTime = Date.now();
  const dispensaryId = parseId(req.params.id);
  if (dispensaryId === null) {
    res.status(400).json({ error: 'Invalid dispensary id' });
    return;
  }

  try {
    const force = Boolean((req.body ?? {}).force);

    // Get the dispensary with product count
    const { rows } = await pool.query(`
      SELECT d.*,
             dcp.id as profile_id,
             (SELECT COUNT(*) FROM store_products sp WHERE sp.dispensary_id = d.id) as product_count
      FROM dispensaries d
      LEFT JOIN dispensary_crawler_profiles dcp
        ON dcp.dispensary_id = d.id AND dcp.enabled = true
      WHERE d.id = $1
    `, [dispensaryId]);

    if (rows.length === 0) {
      res.status(404).json({ error: 'Dispensary not found' });
      return;
    }

    const disp = rows[0];

    if (disp.stage !== 'sandbox') {
      res.status(400).json({
        error: `Cannot approve: current stage is '${disp.stage}', expected 'sandbox'`,
      });
      return;
    }

    const productCount = parseInt(disp.product_count || '0', 10);

    // Require products unless force=true
    if (productCount === 0 && !force) {
      res.status(400).json({
        error: 'Cannot approve: no products found. Use force=true to override.',
        productCount,
      });
      return;
    }

    // Update to production stage
    await pool.query(`
      UPDATE dispensaries
      SET stage = 'production',
          stage_changed_at = CURRENT_TIMESTAMP,
          updated_at = CURRENT_TIMESTAMP
      WHERE id = $1
    `, [dispensaryId]);

    await pool.query(`
      UPDATE dispensary_crawler_profiles
      SET status = 'production',
          status_reason = 'Approved for production',
          consecutive_successes = 1,
          consecutive_failures = 0,
          updated_at = CURRENT_TIMESTAMP
      WHERE dispensary_id = $1 AND enabled = true
    `, [dispensaryId]);

    await logTransition(
      'dispensary', dispensaryId, 'sandbox', 'production',
      'api', '/api/pipeline/stores/:id/approve',
      true, undefined,
      { name: disp.name, productCount, forced: force },
      Date.now() - startTime
    );

    await createAlert(
      dispensaryId, disp.profile_id, 'stage_change', 'info',
      `${disp.name} approved for production with ${productCount} products`,
      'sandbox', 'production',
      { productCount }
    );

    res.json({
      success: true,
      dispensaryId,
      stage: 'production',
      name: disp.name,
      productCount,
    });
  } catch (error: unknown) {
    sendServerError(res, 'Approve', error);
  }
});

/**
 * POST /api/pipeline/stores/approve-batch
 * Approve all sandbox stores that have products
 *
 * Body: { stateCode?: string, minProducts?: number, limit?: number }
 * (minProducts defaults to 1, limit defaults to 50)
 */
router.post('/stores/approve-batch', async (req: Request, res: Response): Promise<void> => {
  try {
    const body = req.body ?? {};
    const { stateCode } = body;
    const minProducts = parseCount(body.minProducts, 1);
    const limit = parseCount(body.limit, 50);

    let query = `
      SELECT d.id, d.name,
             (SELECT COUNT(*) FROM store_products sp WHERE sp.dispensary_id = d.id) as product_count
      FROM dispensaries d
      WHERE d.stage = 'sandbox'
    `;
    const params: unknown[] = [];

    if (stateCode) {
      query += ` AND d.state = $1`;
      params.push(stateCode);
    }

    query += ` ORDER BY d.first_crawl_at LIMIT $${params.length + 1}`;
    params.push(limit);

    const { rows } = await pool.query(query, params);

    const results = {
      processed: 0,
      approved: 0,
      skipped: 0,
      failed: 0,
      skippedReasons: [] as Array<{ id: number; name: string; productCount: number }>,
    };

    for (const row of rows) {
      const productCount = parseInt(row.product_count || '0', 10);
      results.processed++;

      if (productCount < minProducts) {
        results.skipped++;
        results.skippedReasons.push({ id: row.id, name: row.name, productCount });
        continue;
      }

      // Isolate each store so a single failure does not abort the batch after
      // earlier stores have already been moved to production.
      try {
        await pool.query(`
          UPDATE dispensaries
          SET stage = 'production',
              stage_changed_at = CURRENT_TIMESTAMP,
              updated_at = CURRENT_TIMESTAMP
          WHERE id = $1
        `, [row.id]);

        await pool.query(`
          UPDATE dispensary_crawler_profiles
          SET status = 'production',
              status_reason = 'Auto-approved (batch)',
              updated_at = CURRENT_TIMESTAMP
          WHERE dispensary_id = $1 AND enabled = true
        `, [row.id]);

        await logTransition(
          'dispensary', row.id, 'sandbox', 'production',
          'scheduler', '/api/pipeline/stores/approve-batch',
          true, undefined, { productCount }
        );

        results.approved++;
      } catch (e: unknown) {
        console.error(`[Pipeline] Approve batch: dispensary ${row.id} failed:`, errorMessageOf(e));
        results.failed++;
      }
    }

    res.json(results);
  } catch (error: unknown) {
    sendServerError(res, 'Approve batch', error);
  }
});

// ============================================================
// STAGE 5 → 6: FAIL
// production → failing (called on crawl error)
// ============================================================

/**
 * POST /api/pipeline/stores/:id/fail
 * Mark a store as failing (usually called by crawler on error)
 *
 * Body: { error?: string, consecutiveFailures?: number }
 */
router.post('/stores/:id/fail', async (req: Request, res: Response): Promise<void> => {
  const startTime = Date.now();
  const dispensaryId = parseId(req.params.id);
  if (dispensaryId === null) {
    res.status(400).json({ error: 'Invalid dispensary id' });
    return;
  }

  try {
    const body = req.body ?? {};
    const errorMessage: string | undefined = body.error == null ? undefined : String(body.error);
    // Only recorded in the transition metadata; the stored counter is
    // incremented in SQL below.
    const consecutiveFailures = body.consecutiveFailures ?? 1;

    const { rows } = await pool.query(`
      SELECT d.*, dcp.id as profile_id, dcp.consecutive_failures
      FROM dispensaries d
      LEFT JOIN dispensary_crawler_profiles dcp
        ON dcp.dispensary_id = d.id AND dcp.enabled = true
      WHERE d.id = $1
    `, [dispensaryId]);

    if (rows.length === 0) {
      res.status(404).json({ error: 'Dispensary not found' });
      return;
    }

    const disp = rows[0];
    const previousStage = disp.stage;

    // Only production or sandbox stores can move to failing
    if (previousStage !== 'production' && previousStage !== 'sandbox') {
      res.status(400).json({
        error: `Cannot fail: current stage is '${previousStage}'`,
      });
      return;
    }

    await pool.query(`
      UPDATE dispensaries
      SET stage = 'failing',
          stage_changed_at = CURRENT_TIMESTAMP,
          updated_at = CURRENT_TIMESTAMP
      WHERE id = $1
    `, [dispensaryId]);

    await pool.query(`
      UPDATE dispensary_crawler_profiles
      SET status = 'failing',
          status_reason = $1,
          consecutive_failures = consecutive_failures + 1,
          consecutive_successes = 0,
          updated_at = CURRENT_TIMESTAMP
      WHERE dispensary_id = $2 AND enabled = true
    `, [errorMessage || 'Crawl failed', dispensaryId]);

    await logTransition(
      'dispensary', dispensaryId, previousStage, 'failing',
      'api', '/api/pipeline/stores/:id/fail',
      true, errorMessage,
      { consecutiveFailures },
      Date.now() - startTime
    );

    await createAlert(
      dispensaryId, disp.profile_id, 'crawl_error', 'error',
      `${disp.name} moved to failing: ${errorMessage || 'Unknown error'}`,
      previousStage, 'failing',
      { error: errorMessage }
    );

    res.json({
      success: true,
      dispensaryId,
      stage: 'failing',
      previousStage,
    });
  } catch (error: unknown) {
    sendServerError(res, 'Fail', error);
  }
});

// ============================================================
// STAGE 6 → 4: RETRY
// failing → sandbox (manual retry)
// ============================================================

/**
 * POST /api/pipeline/stores/:id/retry
 * Retry a failing store (moves back to sandbox)
 */
router.post('/stores/:id/retry', async (req: Request, res: Response): Promise<void> => {
  const startTime = Date.now();
  const dispensaryId = parseId(req.params.id);
  if (dispensaryId === null) {
    res.status(400).json({ error: 'Invalid dispensary id' });
    return;
  }

  try {
    const { rows } = await pool.query(`
      SELECT d.*, dcp.id as profile_id
      FROM dispensaries d
      LEFT JOIN dispensary_crawler_profiles dcp
        ON dcp.dispensary_id = d.id AND dcp.enabled = true
      WHERE d.id = $1
    `, [dispensaryId]);

    if (rows.length === 0) {
      res.status(404).json({ error: 'Dispensary not found' });
      return;
    }

    const disp = rows[0];

    if (disp.stage !== 'failing') {
      res.status(400).json({
        error: `Cannot retry: current stage is '${disp.stage}', expected 'failing'`,
      });
      return;
    }

    await pool.query(`
      UPDATE dispensaries
      SET stage = 'sandbox',
          stage_changed_at = CURRENT_TIMESTAMP,
          updated_at = CURRENT_TIMESTAMP
      WHERE id = $1
    `, [dispensaryId]);

    await pool.query(`
      UPDATE dispensary_crawler_profiles
      SET status = 'sandbox',
          status_reason = 'Manual retry',
          consecutive_failures = 0,
          consecutive_successes = 0,
          updated_at = CURRENT_TIMESTAMP
      WHERE dispensary_id = $1 AND enabled = true
    `, [dispensaryId]);

    await logTransition(
      'dispensary', dispensaryId, 'failing', 'sandbox',
      'api', '/api/pipeline/stores/:id/retry',
      true, undefined,
      { name: disp.name },
      Date.now() - startTime
    );

    await createAlert(
      dispensaryId, disp.profile_id, 'stage_change', 'info',
      `${disp.name} moved back to sandbox for retry`,
      'failing', 'sandbox'
    );

    res.json({
      success: true,
      dispensaryId,
      stage: 'sandbox',
      name: disp.name,
    });
  } catch (error: unknown) {
    sendServerError(res, 'Retry', error);
  }
});

// ============================================================
// PIPELINE STATS
// ============================================================

/**
 * GET /api/pipeline/stats
 * Get counts for each stage
 */
router.get('/stats', async (_req: Request, res: Response): Promise<void> => {
  try {
    // Discovery locations by stage
    const { rows: discoveryStats } = await pool.query(`
      SELECT stage, COUNT(*) as count
      FROM dutchie_discovery_locations
      WHERE active = true
      GROUP BY stage
    `);

    // Dispensaries by stage
    const { rows: dispensaryStats } = await pool.query(`
      SELECT stage, COUNT(*) as count
      FROM dispensaries
      WHERE crawl_enabled = true
      GROUP BY stage
    `);

    // By state for dispensaries
    const { rows: byState } = await pool.query(`
      SELECT state, stage, COUNT(*) as count
      FROM dispensaries
      WHERE crawl_enabled = true AND state IS NOT NULL
      GROUP BY state, stage
      ORDER BY state, stage
    `);

    res.json({
      discovery: countsByStage(discoveryStats),
      dispensaries: countsByStage(dispensaryStats),
      byState: byState.map((r: DbRow) => ({
        state: r.state,
        stage: r.stage,
        count: parseInt(r.count, 10),
      })),
    });
  } catch (error: unknown) {
    sendServerError(res, 'Stats', error);
  }
});

/**
 * GET /api/pipeline/transitions
 * Get recent stage transitions
 *
 * Query: limit (default 50), entityType, toStage
 */
router.get('/transitions', async (req: Request, res: Response): Promise<void> => {
  try {
    const { entityType, toStage } = req.query;
    const limit = parseCount(req.query.limit, 50);

    // Placeholders are numbered from the current length of `params`, so the
    // SQL text and the bound values cannot drift apart.
    const conditions: string[] = ['1=1'];
    const params: unknown[] = [];

    if (entityType) {
      params.push(entityType);
      conditions.push(`entity_type = $${params.length}`);
    }

    if (toStage) {
      params.push(toStage);
      conditions.push(`to_stage = $${params.length}`);
    }

    params.push(limit);

    const { rows } = await pool.query(`
      SELECT st.*,
             CASE
               WHEN st.entity_type = 'dispensary'
                 THEN (SELECT name FROM dispensaries WHERE id = st.entity_id)
               WHEN st.entity_type = 'discovery_location'
                 THEN (SELECT name FROM dutchie_discovery_locations WHERE id = st.entity_id)
             END as entity_name
      FROM stage_transitions st
      WHERE ${conditions.join(' AND ')}
      ORDER BY st.created_at DESC
      LIMIT $${params.length}
    `, params);

    res.json({
      transitions: rows.map((r: DbRow) => ({
        id: r.id,
        entityType: r.entity_type,
        entityId: r.entity_id,
        entityName: r.entity_name,
        fromStage: r.from_stage,
        toStage: r.to_stage,
        triggerType: r.trigger_type,
        triggerEndpoint: r.trigger_endpoint,
        success: r.success,
        errorMessage: r.error_message,
        metadata: r.metadata,
        durationMs: r.duration_ms,
        createdAt: r.created_at,
      })),
    });
  } catch (error: unknown) {
    sendServerError(res, 'Transitions', error);
  }
});

export default router;