diff --git a/backend/migrations/092_store_intelligence_cache.sql b/backend/migrations/092_store_intelligence_cache.sql
new file mode 100644
index 00000000..f7c1db3e
--- /dev/null
+++ b/backend/migrations/092_store_intelligence_cache.sql
@@ -0,0 +1,35 @@
+-- Migration 092: Store Intelligence Cache
+-- Pre-computed store intelligence data refreshed by analytics_refresh task
+-- Eliminates costly aggregation queries on /intelligence/stores endpoint
+
+CREATE TABLE IF NOT EXISTS store_intelligence_cache (
+  dispensary_id INTEGER PRIMARY KEY REFERENCES dispensaries(id) ON DELETE CASCADE,
+
+  -- Basic counts
+  sku_count INTEGER NOT NULL DEFAULT 0,
+  brand_count INTEGER NOT NULL DEFAULT 0,
+  snapshot_count INTEGER NOT NULL DEFAULT 0,
+
+  -- Pricing
+  avg_price_rec NUMERIC(10,2),
+  avg_price_med NUMERIC(10,2),
+  min_price NUMERIC(10,2),
+  max_price NUMERIC(10,2),
+
+  -- Category breakdown (JSONB for flexibility)
+  category_counts JSONB DEFAULT '{}',
+
+  -- Timestamps
+  last_crawl_at TIMESTAMPTZ,
+  last_refresh_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+
+  -- Metadata
+  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+-- Index on refresh time: serves the MIN/MAX(last_refresh_at) cache-freshness query
+CREATE INDEX IF NOT EXISTS idx_store_intelligence_cache_refresh
+ON store_intelligence_cache (last_refresh_at DESC);
+
+COMMENT ON TABLE store_intelligence_cache IS 'Pre-computed store intelligence metrics, refreshed by analytics_refresh task';
+COMMENT ON COLUMN store_intelligence_cache.category_counts IS 'JSON object mapping category_raw to product count';
diff --git a/backend/src/routes/intelligence.ts b/backend/src/routes/intelligence.ts
index a01a541b..ff48f4ef 100644
--- a/backend/src/routes/intelligence.ts
+++ b/backend/src/routes/intelligence.ts
@@ -308,6 +308,8 @@ router.get('/stores', async (req: Request, res: Response) => {
       params.push(state);
     }
 
+    // Quick query without costly snapshot_count subquery
+    // Full data available from store_intelligence_cache (refreshed by analytics_refresh)
     const { rows: storeRows } = await pool.query(`
       SELECT
         d.id,
@@ -321,9 +323,7 @@ router.get('/stores', async (req: Request, res: Response) => {
         COUNT(DISTINCT sp.id) as sku_count,
         COUNT(DISTINCT sp.brand_name_raw) as brand_count,
         ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
-        MAX(sp.updated_at) as last_crawl,
-        (SELECT COUNT(*) FROM store_product_snapshots sps
-         WHERE sps.store_product_id IN (SELECT id FROM store_products WHERE dispensary_id = d.id)) as snapshot_count
+        MAX(sp.updated_at) as last_crawl
       FROM dispensaries d
       INNER JOIN store_products sp ON sp.dispensary_id = d.id
       LEFT JOIN chains c ON d.chain_id = c.id
@@ -346,11 +346,9 @@ router.get('/stores', async (req: Request, res: Response) => {
         crawlEnabled: r.crawl_enabled,
         chainName: r.chain_name || null,
         skuCount: parseInt(r.sku_count || '0', 10),
-        snapshotCount: parseInt(r.snapshot_count || '0', 10),
         brandCount: parseInt(r.brand_count || '0', 10),
         avgPrice: r.avg_price ? parseFloat(r.avg_price) : null,
         lastCrawl: r.last_crawl,
-        crawlFrequencyHours: 4, // Default crawl frequency
       })),
       total: storeRows.length,
     });
@@ -360,4 +358,111 @@ router.get('/stores', async (req: Request, res: Response) => {
   }
 });
 
+/**
+ * GET /api/admin/intelligence/stores/cached
+ * Get store intelligence from pre-computed cache (fast)
+ * Query params:
+ *   - state: Filter by state (e.g., "AZ"); omitted or "all" applies no filter
+ *   - limit: Max results (default 200, capped at 500)
+ *
+ * Responds 503 when the cache table does not exist yet.
+ */
+router.get('/stores/cached', async (req: Request, res: Response) => {
+  try {
+    const { state, limit } = req.query;
+
+    // A missing, non-numeric or negative limit falls back to the default so it
+    // never reaches Postgres as NaN or a negative LIMIT (both fail the query).
+    const requestedLimit = typeof limit === 'string' ? parseInt(limit, 10) : NaN;
+    const limitNum =
+      Number.isFinite(requestedLimit) && requestedLimit >= 0
+        ? Math.min(requestedLimit, 500)
+        : 200;
+
+    // Check that the cache table exists. to_regclass resolves the name through
+    // the search_path, the same way the unqualified queries below do.
+    const cacheCheck = await pool.query(`
+      SELECT to_regclass('store_intelligence_cache') IS NOT NULL as exists
+    `);
+
+    if (!cacheCheck.rows[0].exists) {
+      return res.status(503).json({
+        error: 'Cache not available. Run analytics_refresh task first.',
+      });
+    }
+
+    // Build WHERE clause based on state filter ($1 is always the limit)
+    let stateFilter = '';
+    const params: any[] = [limitNum];
+    if (state && state !== 'all') {
+      stateFilter = 'AND d.state = $2';
+      params.push(state);
+    }
+
+    const { rows } = await pool.query(`
+      SELECT
+        d.id,
+        d.name,
+        d.dba_name,
+        d.city,
+        d.state,
+        d.menu_type,
+        d.crawl_enabled,
+        c.name as chain_name,
+        sic.sku_count,
+        sic.brand_count,
+        sic.snapshot_count,
+        sic.avg_price_rec,
+        sic.avg_price_med,
+        sic.min_price,
+        sic.max_price,
+        sic.last_crawl_at,
+        sic.last_refresh_at
+      FROM store_intelligence_cache sic
+      JOIN dispensaries d ON sic.dispensary_id = d.id
+      LEFT JOIN chains c ON d.chain_id = c.id
+      WHERE d.crawl_enabled = true
+        ${stateFilter}
+      ORDER BY sic.sku_count DESC
+      LIMIT $1
+    `, params);
+
+    // Get cache freshness
+    const freshnessResult = await pool.query(`
+      SELECT MIN(last_refresh_at) as oldest, MAX(last_refresh_at) as newest
+      FROM store_intelligence_cache
+    `);
+    const freshness = freshnessResult.rows[0];
+
+    res.json({
+      stores: rows.map((r: any) => ({
+        id: r.id,
+        name: r.name,
+        dbaName: r.dba_name,
+        city: r.city,
+        state: r.state,
+        menuType: r.menu_type,
+        crawlEnabled: r.crawl_enabled,
+        chainName: r.chain_name || null,
+        skuCount: r.sku_count || 0,
+        brandCount: r.brand_count || 0,
+        snapshotCount: r.snapshot_count || 0,
+        avgPrice: r.avg_price_rec ? parseFloat(r.avg_price_rec) : null,
+        avgPriceMed: r.avg_price_med ? parseFloat(r.avg_price_med) : null,
+        minPrice: r.min_price ? parseFloat(r.min_price) : null,
+        maxPrice: r.max_price ? parseFloat(r.max_price) : null,
+        lastCrawl: r.last_crawl_at,
+      })),
+      total: rows.length,
+      cache: {
+        lastRefresh: freshness?.newest,
+        oldestEntry: freshness?.oldest,
+      },
+    });
+  } catch (error: any) {
+    console.error('[Intelligence] Error fetching cached stores:', error.message);
+    res.status(500).json({ error: error.message });
+  }
+});
+
 export default router;
diff --git a/backend/src/tasks/handlers/analytics-refresh.ts b/backend/src/tasks/handlers/analytics-refresh.ts
index 82a81a52..3ca4bf36 100644
--- a/backend/src/tasks/handlers/analytics-refresh.ts
+++ b/backend/src/tasks/handlers/analytics-refresh.ts
@@ -81,6 +81,88 @@ export async function handleAnalyticsRefresh(ctx: TaskContext): Promise [r.dispensary_id, parseInt(r.snapshot_count)]));
+
+      // Upsert store intelligence data
+      const result = await pool.query(`
+        INSERT INTO store_intelligence_cache (
+          dispensary_id,
+          sku_count,
+          brand_count,
+          snapshot_count,
+          avg_price_rec,
+          avg_price_med,
+          min_price,
+          max_price,
+          category_counts,
+          last_crawl_at,
+          last_refresh_at
+        )
+        SELECT
+          d.id as dispensary_id,
+          COUNT(DISTINCT sp.id) as sku_count,
+          COUNT(DISTINCT sp.brand_name_raw) as brand_count,
+          0 as snapshot_count,
+          ROUND(AVG(sp.price_rec) FILTER (WHERE sp.price_rec > 0)::numeric, 2) as avg_price_rec,
+          ROUND(AVG(sp.price_med) FILTER (WHERE sp.price_med > 0)::numeric, 2) as avg_price_med,
+          MIN(sp.price_rec) FILTER (WHERE sp.price_rec > 0) as min_price,
+          MAX(sp.price_rec) FILTER (WHERE sp.price_rec > 0) as max_price,
+          '{}'::jsonb as category_counts,
+          MAX(sp.updated_at) as last_crawl_at,
+          NOW() as last_refresh_at
+        FROM dispensaries d
+        LEFT JOIN store_products sp ON sp.dispensary_id = d.id
+        WHERE d.crawl_enabled = true
+        GROUP BY d.id
+        ON CONFLICT (dispensary_id) DO UPDATE SET
+          sku_count = EXCLUDED.sku_count,
+          brand_count = EXCLUDED.brand_count,
+          avg_price_rec = EXCLUDED.avg_price_rec,
+          avg_price_med = EXCLUDED.avg_price_med,
+          min_price = EXCLUDED.min_price,
+          max_price = EXCLUDED.max_price,
+          last_crawl_at = EXCLUDED.last_crawl_at,
+          last_refresh_at = NOW()
+      `);
+
+      // Update snapshot counts from pre-computed map
+      for (const [dispensaryId, count] of snapshotMap) {
+        await pool.query(`
+          UPDATE store_intelligence_cache SET snapshot_count = $2 WHERE dispensary_id = $1
+        `, [dispensaryId, count]);
+      }
+
+      console.log(`[AnalyticsRefresh] Refreshed store_intelligence_cache (${result.rowCount} stores)`);
+      refreshed.push('store_intelligence_cache');
+    } else {
+      console.log(`[AnalyticsRefresh] store_intelligence_cache table does not exist, skipping`);
+    }
+  } catch (error: any) {
+    console.error(`[AnalyticsRefresh] Error refreshing store_intelligence_cache:`, error.message);
+    failed.push('store_intelligence_cache');
+  }
+
   console.log(`[AnalyticsRefresh] Complete: ${refreshed.length} refreshed, ${failed.length} failed`);
 
   return {