feat(cannaiq): Add Workers Dashboard and visibility tracking

Workers Dashboard:
- New /workers route with two-pane layout
- Workers table showing Alice, Henry, Bella, Oscar with role badges
- Run history with visibility stats (lost/restored counts)
- "Run Now" action to trigger workers immediately

Migrations:
- 057: Add visibility tracking columns (visibility_lost, visibility_lost_at, visibility_restored_at)
- 058: Add ID resolution columns for Henry worker
- 059: Add job queue columns (max_retries, retry_count, worker_id, locked_at, locked_by)

Backend fixes:
- Add httpStatus to CrawlResult interface for error classification
- Fix pool.ts typing for event listener
- Update completeJob to accept visibility stats in metadata

Frontend fixes:
- Fix NationalDashboard crash with safe formatMoney helper
- Fix OrchestratorDashboard/Stores StoreInfo type mismatches
- Add workerName/workerRole to getDutchieAZSchedules API type

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-07 11:04:12 -07:00
parent 1d1263afc6
commit 8ac64ba077
22 changed files with 7022 additions and 134 deletions

View File

@@ -0,0 +1,643 @@
/**
* StateQueryService
*
* Core service for multi-state queries and analytics.
* Phase 4: Multi-State Expansion
*/
import { Pool } from 'pg';
import {
State,
StateMetrics,
StateSummary,
BrandInState,
BrandStatePenetration,
BrandCrossStateComparison,
CategoryInState,
CategoryStateDist,
CategoryCrossStateComparison,
StoreInState,
StatePriceDistribution,
NationalSummary,
NationalPenetrationTrend,
StateHeatmapData,
StateQueryOptions,
CrossStateQueryOptions,
} from './types';
/**
 * Read-side service for per-state and cross-state analytics.
 *
 * Every method issues parameterized SQL through the injected `pg` pool and
 * maps the resulting rows to the camelCase shapes declared in `./types`.
 * The only SQL fragments that are interpolated rather than bound are
 * ORDER BY columns/directions and heatmap column names, all of which are
 * resolved from fixed allow-lists defined on this class.
 */
export class StateQueryService {
  /** Column list (with camelCase aliases) shared by every read of `mv_state_metrics`. */
  private static readonly STATE_METRICS_COLUMNS = `
        state,
        state_name AS "stateName",
        dispensary_count AS "storeCount",
        dispensary_count AS "dutchieStores",
        dispensary_count AS "activeStores",
        total_products AS "totalProducts",
        in_stock_products AS "inStockProducts",
        out_of_stock_products AS "outOfStockProducts",
        unique_brands AS "uniqueBrands",
        unique_categories AS "uniqueCategories",
        avg_price_rec AS "avgPriceRec",
        min_price_rec AS "minPriceRec",
        max_price_rec AS "maxPriceRec",
        refreshed_at AS "refreshedAt"`;

  /** Allowed `sortBy` keys for brand listings, mapped to SQL column names. */
  private static readonly BRAND_SORT_COLUMNS: Readonly<Record<string, string>> = {
    productCount: 'product_count',
    storeCount: 'store_count',
    avgPrice: 'avg_price',
    name: 'brand_name',
  };

  /** Allowed `sortBy` keys for category listings, mapped to SQL column names. */
  private static readonly CATEGORY_SORT_COLUMNS: Readonly<Record<string, string>> = {
    productCount: 'product_count',
    storeCount: 'store_count',
    avgPrice: 'avg_price',
    category: 'category',
  };

  /** Allowed `sortBy` keys for store listings, mapped to SQL column names. */
  private static readonly STORE_SORT_COLUMNS: Readonly<Record<string, string>> = {
    productCount: 'product_count',
    brandCount: 'brand_count',
    avgPrice: 'avg_price',
    name: 'dispensary_name',
    city: 'city',
    lastCrawl: 'last_crawl_at',
  };

  /** Heatmap metrics that are read straight from a column of `mv_state_metrics`. */
  private static readonly HEATMAP_COLUMNS: Readonly<
    Record<string, { column: string; label: string; excludeNull: boolean }>
  > = {
    stores: { column: 'dispensary_count', label: 'stores', excludeNull: false },
    products: { column: 'total_products', label: 'products', excludeNull: false },
    brands: { column: 'unique_brands', label: 'brands', excludeNull: false },
    avgPrice: { column: 'avg_price_rec', label: 'avg price', excludeNull: true },
  };

  constructor(private readonly pool: Pool) {}

  // =========================================================================
  // Internal helpers
  // =========================================================================

  /**
   * Build the `<column> <direction>` part of an ORDER BY clause.
   *
   * Only own keys of `columns` are honoured. A plain `columns[sortBy]` lookup
   * would also resolve inherited names such as "constructor" or "toString" to
   * functions, which would then be interpolated into the SQL text.
   * Unknown keys fall back to `product_count`; any direction other than
   * `'asc'` sorts descending.
   */
  private static orderBy(
    columns: Readonly<Record<string, string>>,
    sortBy: unknown,
    sortDir: unknown
  ): string {
    const column =
      typeof sortBy === 'string' && Object.prototype.hasOwnProperty.call(columns, sortBy)
        ? columns[sortBy]
        : undefined;
    const direction = sortDir === 'asc' ? 'ASC' : 'DESC';
    return `${column ?? 'product_count'} ${direction}`;
  }

  /** Parse a count-like value into a base-10 integer, treating null/empty as 0. */
  private static toInt(value: unknown): number {
    return parseInt(String(value || '0'), 10);
  }

  /** Parse a numeric value into a float; yields NaN when the value is not numeric (e.g. null). */
  private static toFloat(value: unknown): number {
    return parseFloat(String(value));
  }

  /** Round a price-like value to two decimals, or return null when the value is absent. */
  private static toRoundedPrice(value: unknown): number | null {
    if (!value) {
      return null;
    }
    return Math.round(parseFloat(String(value)) * 100) / 100;
  }

  /**
   * Format a date-only value as `YYYY-MM-DD`.
   *
   * node-postgres materialises DATE columns as a JS `Date` at midnight in the
   * process's local time zone, so the calendar fields must be read in local
   * time. Going through `toISOString()` converts to UTC first and reports the
   * previous day whenever the process runs ahead of UTC.
   */
  private static toDateOnly(value: unknown): string {
    if (value instanceof Date) {
      const year = String(value.getFullYear()).padStart(4, '0');
      const month = String(value.getMonth() + 1).padStart(2, '0');
      const day = String(value.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    // Drivers configured to return DATE as text already give 'YYYY-MM-DD...'.
    return String(value).slice(0, 10);
  }

  /** Map one row of `fn_national_price_comparison` to the API shape. */
  private static toPriceDistribution(row: {
    state: string;
    state_name: string;
    product_count: unknown;
    avg_price: unknown;
    min_price: unknown;
    max_price: unknown;
    median_price: unknown;
    price_stddev: unknown;
  }): StatePriceDistribution {
    return {
      state: row.state,
      stateName: row.state_name,
      productCount: StateQueryService.toInt(row.product_count),
      avgPrice: StateQueryService.toFloat(row.avg_price),
      minPrice: StateQueryService.toFloat(row.min_price),
      maxPrice: StateQueryService.toFloat(row.max_price),
      medianPrice: StateQueryService.toFloat(row.median_price),
      priceStddev: StateQueryService.toFloat(row.price_stddev),
    };
  }

  /**
   * Look up a brand's name by id.
   *
   * @throws Error with message `Brand <id> not found` when no row matches.
   */
  private async requireBrandName(brandId: number): Promise<string> {
    const brandResult = await this.pool.query(
      `
      SELECT id, name FROM brands WHERE id = $1
    `,
      [brandId]
    );
    const brand = brandResult.rows[0];
    if (!brand) {
      throw new Error(`Brand ${brandId} not found`);
    }
    return brand.name;
  }

  // =========================================================================
  // State List & Basic Queries
  // =========================================================================

  /**
   * Get all available states, ordered by name.
   */
  async listStates(): Promise<State[]> {
    const result = await this.pool.query(`
      SELECT code, name
      FROM states
      ORDER BY name
    `);
    return result.rows;
  }

  /**
   * Get states that have dispensary data, i.e. at least one dispensary row
   * for the state with a non-null `menu_type`.
   */
  async listActiveStates(): Promise<State[]> {
    const result = await this.pool.query(`
      SELECT DISTINCT s.code, s.name
      FROM states s
      JOIN dispensaries d ON d.state = s.code
      WHERE d.menu_type IS NOT NULL
      ORDER BY s.name
    `);
    return result.rows;
  }

  // =========================================================================
  // State Summary & Metrics
  // =========================================================================

  /**
   * Get summary metrics for a single state.
   *
   * Combines the state's row from the `mv_state_metrics` materialized view
   * with crawl counts for the last 24 hours and the top five brands and
   * categories.
   *
   * @param state State code matched against `mv_state_metrics.state`.
   * @returns The summary, or `null` when the view has no row for the state.
   */
  async getStateSummary(state: string): Promise<StateSummary | null> {
    // Base metrics come first: with no row there is nothing to enrich.
    const metricsResult = await this.pool.query(
      `
      SELECT${StateQueryService.STATE_METRICS_COLUMNS}
      FROM mv_state_metrics
      WHERE state = $1
    `,
      [state]
    );
    const metrics = metricsResult.rows[0];
    if (!metrics) {
      return null;
    }

    // The remaining lookups are independent of each other, so run them together.
    const [crawlResult, topBrands, topCategories] = await Promise.all([
      this.pool.query(
        `
        SELECT
          COUNT(*) FILTER (WHERE cr.status = 'success' AND cr.started_at > NOW() - INTERVAL '24 hours') AS recent_crawls,
          COUNT(*) FILTER (WHERE cr.status = 'failed' AND cr.started_at > NOW() - INTERVAL '24 hours') AS failed_crawls,
          MAX(cr.finished_at) AS last_crawl_at
        FROM crawl_runs cr
        JOIN dispensaries d ON cr.dispensary_id = d.id
        WHERE d.state = $1
      `,
        [state]
      ),
      this.getBrandsByState(state, { limit: 5 }),
      this.getCategoriesByState(state, { limit: 5 }),
    ]);
    const crawl = crawlResult.rows[0];

    return {
      ...metrics,
      recentCrawls: StateQueryService.toInt(crawl?.recent_crawls),
      failedCrawls: StateQueryService.toInt(crawl?.failed_crawls),
      lastCrawlAt: crawl?.last_crawl_at ?? null,
      topBrands,
      topCategories,
    };
  }

  /**
   * Get metrics for all states, largest dispensary count first.
   */
  async getAllStateMetrics(): Promise<StateMetrics[]> {
    const result = await this.pool.query(`
      SELECT${StateQueryService.STATE_METRICS_COLUMNS}
      FROM mv_state_metrics
      ORDER BY dispensary_count DESC
    `);
    return result.rows;
  }

  // =========================================================================
  // Brand Queries
  // =========================================================================

  /**
   * Get brands present in a specific state.
   *
   * @param state State code matched against `v_brand_state_presence.state`.
   * @param options Paging (`limit` default 50, `offset` default 0) and sorting
   *   (`sortBy` one of productCount | storeCount | avgPrice | name, default
   *   productCount; `sortDir` default desc). Unknown sort keys fall back to
   *   product count.
   */
  async getBrandsByState(state: string, options: StateQueryOptions = {}): Promise<BrandInState[]> {
    const { limit = 50, offset = 0, sortBy = 'productCount', sortDir = 'desc' } = options;
    const orderBy = StateQueryService.orderBy(StateQueryService.BRAND_SORT_COLUMNS, sortBy, sortDir);

    const result = await this.pool.query(
      `
      SELECT
        brand_id AS "brandId",
        brand_name AS "brandName",
        brand_slug AS "brandSlug",
        store_count AS "storeCount",
        product_count AS "productCount",
        avg_price AS "avgPrice",
        first_seen_in_state AS "firstSeenInState",
        last_seen_in_state AS "lastSeenInState"
      FROM v_brand_state_presence
      WHERE state = $1
      ORDER BY ${orderBy}
      LIMIT $2 OFFSET $3
    `,
      [state, limit, offset]
    );
    return result.rows;
  }

  /**
   * Get brand penetration across all states, as returned by the
   * `fn_brand_state_penetration` SQL function.
   */
  async getBrandStatePenetration(brandId: number): Promise<BrandStatePenetration[]> {
    const result = await this.pool.query(
      `
      SELECT
        state,
        state_name AS "stateName",
        total_stores AS "totalStores",
        stores_with_brand AS "storesWithBrand",
        penetration_pct AS "penetrationPct",
        product_count AS "productCount",
        avg_price AS "avgPrice"
      FROM fn_brand_state_penetration($1)
    `,
      [brandId]
    );
    return result.rows;
  }

  /**
   * Compare a brand across multiple states.
   *
   * @param brandId Brand to compare.
   * @param states State codes to keep from the brand's penetration rows.
   * @returns Per-state rows for the requested states plus national
   *   penetration / average price and the best and worst state by
   *   penetration percentage (both `null` when no requested state matches).
   * @throws Error when the brand does not exist.
   */
  async compareBrandAcrossStates(
    brandId: number,
    states: string[]
  ): Promise<BrandCrossStateComparison> {
    const brandName = await this.requireBrandName(brandId);

    // Per-state penetration and the national aggregate do not depend on each other.
    const [allPenetration, nationalResult] = await Promise.all([
      this.getBrandStatePenetration(brandId),
      this.pool.query(
        `
        SELECT
          COUNT(DISTINCT d.id) AS total_stores,
          COUNT(DISTINCT CASE WHEN sp.brand_id = $1 THEN d.id END) AS stores_with_brand,
          AVG(sp.price_rec) FILTER (WHERE sp.brand_id = $1) AS avg_price
        FROM dispensaries d
        LEFT JOIN store_products sp ON d.id = sp.dispensary_id
        WHERE d.state IS NOT NULL
      `,
        [brandId]
      ),
    ]);

    const requested = new Set(states);
    const filteredStates = allPenetration.filter((p) => requested.has(p.state));

    // The driver hands aggregates back as strings; convert before doing arithmetic.
    const nationalData = nationalResult.rows[0];
    const totalStores = StateQueryService.toInt(nationalData?.total_stores);
    const storesWithBrand = StateQueryService.toInt(nationalData?.stores_with_brand);
    const nationalPenetration = totalStores > 0 ? (storesWithBrand / totalStores) * 100 : 0;

    // Highest penetration first; sort a copy so `states` keeps its original order.
    const sortedByPenetration = [...filteredStates].sort(
      (a, b) => Number(b.penetrationPct) - Number(a.penetrationPct)
    );

    return {
      brandId,
      brandName,
      states: filteredStates,
      nationalPenetration: Math.round(nationalPenetration * 100) / 100,
      nationalAvgPrice: StateQueryService.toRoundedPrice(nationalData?.avg_price),
      bestPerformingState: sortedByPenetration[0]?.state || null,
      worstPerformingState: sortedByPenetration[sortedByPenetration.length - 1]?.state || null,
    };
  }

  // =========================================================================
  // Category Queries
  // =========================================================================

  /**
   * Get categories in a specific state.
   *
   * @param state State code matched against `v_category_state_distribution.state`.
   * @param options Paging (`limit` default 50, `offset` default 0) and sorting
   *   (`sortBy` one of productCount | storeCount | avgPrice | category,
   *   default productCount; `sortDir` default desc). Unknown sort keys fall
   *   back to product count.
   */
  async getCategoriesByState(state: string, options: StateQueryOptions = {}): Promise<CategoryInState[]> {
    const { limit = 50, offset = 0, sortBy = 'productCount', sortDir = 'desc' } = options;
    const orderBy = StateQueryService.orderBy(StateQueryService.CATEGORY_SORT_COLUMNS, sortBy, sortDir);

    const result = await this.pool.query(
      `
      SELECT
        category,
        product_count AS "productCount",
        store_count AS "storeCount",
        avg_price AS "avgPrice",
        in_stock_count AS "inStockCount",
        on_special_count AS "onSpecialCount"
      FROM v_category_state_distribution
      WHERE state = $1
      ORDER BY ${orderBy}
      LIMIT $2 OFFSET $3
    `,
      [state, limit, offset]
    );
    return result.rows;
  }

  /**
   * Compare a category across multiple states.
   *
   * `marketShare` is each state's share of the product count summed over the
   * returned (requested) states only. `dominantState` is the state with the
   * highest product count among them, or `null` when none match.
   */
  async compareCategoryAcrossStates(
    category: string,
    states: string[]
  ): Promise<CategoryCrossStateComparison> {
    // Per-state rows and national totals are independent queries.
    const [result, nationalResult] = await Promise.all([
      this.pool.query(
        `
        SELECT
          v.state,
          s.name AS "stateName",
          v.category,
          v.product_count AS "productCount",
          v.store_count AS "storeCount",
          v.avg_price AS "avgPrice",
          ROUND(v.product_count::NUMERIC / SUM(v.product_count) OVER () * 100, 2) AS "marketShare"
        FROM v_category_state_distribution v
        JOIN states s ON v.state = s.code
        WHERE v.category = $1
          AND v.state = ANY($2)
        ORDER BY v.product_count DESC
      `,
        [category, states]
      ),
      this.pool.query(
        `
        SELECT
          COUNT(DISTINCT sp.id) AS product_count,
          AVG(sp.price_rec) AS avg_price
        FROM store_products sp
        WHERE sp.category_raw = $1
      `,
        [category]
      ),
    ]);

    const national = nationalResult.rows[0];
    // Rows are ordered by product count, so the first row is the dominant state.
    const dominantState = result.rows.length > 0 ? result.rows[0].state : null;

    return {
      category,
      states: result.rows,
      nationalProductCount: StateQueryService.toInt(national?.product_count),
      nationalAvgPrice: StateQueryService.toRoundedPrice(national?.avg_price),
      dominantState,
    };
  }

  // =========================================================================
  // Store Queries
  // =========================================================================

  /**
   * Get stores in a specific state.
   *
   * @param state State code matched against `v_store_state_summary.state`.
   * @param options Paging (`limit` default 100, `offset` default 0),
   *   `includeInactive` (default false: rows with `crawl_status = 'disabled'`
   *   are dropped) and sorting (`sortBy` one of productCount | brandCount |
   *   avgPrice | name | city | lastCrawl, default productCount; `sortDir`
   *   default desc). NULL sort values are always placed last.
   */
  async getStoresByState(state: string, options: StateQueryOptions = {}): Promise<StoreInState[]> {
    const { limit = 100, offset = 0, includeInactive = false, sortBy = 'productCount', sortDir = 'desc' } = options;
    const orderBy = StateQueryService.orderBy(StateQueryService.STORE_SORT_COLUMNS, sortBy, sortDir);

    // NOTE(review): `!=` also filters out rows whose crawl_status is NULL.
    // If that column is nullable and such stores should be listed, this needs
    // `IS DISTINCT FROM 'disabled'` — confirm against the view definition.
    const whereClause = includeInactive
      ? 'WHERE state = $1'
      : `WHERE state = $1 AND crawl_status != 'disabled'`;

    const result = await this.pool.query(
      `
      SELECT
        dispensary_id AS "dispensaryId",
        dispensary_name AS "dispensaryName",
        dispensary_slug AS "dispensarySlug",
        state,
        city,
        menu_type AS "menuType",
        crawl_status AS "crawlStatus",
        last_crawl_at AS "lastCrawlAt",
        product_count AS "productCount",
        in_stock_count AS "inStockCount",
        brand_count AS "brandCount",
        avg_price AS "avgPrice",
        special_count AS "specialCount"
      FROM v_store_state_summary
      ${whereClause}
      ORDER BY ${orderBy} NULLS LAST
      LIMIT $2 OFFSET $3
    `,
      [state, limit, offset]
    );
    return result.rows;
  }

  // =========================================================================
  // Price Analytics
  // =========================================================================

  /**
   * Get price distribution for one state.
   *
   * @param state State code to keep from `fn_national_price_comparison`.
   * @param options Optional `category` / `brandId` filters passed to the SQL
   *   function. Falsy values ('' or 0) are sent as NULL.
   */
  async getStorePriceDistribution(
    state: string,
    options: { category?: string; brandId?: number } = {}
  ): Promise<StatePriceDistribution[]> {
    const { category, brandId } = options;
    // `||` (not `??`) is deliberate: '' and 0 are passed as NULL, as before.
    const result = await this.pool.query(
      `
      SELECT * FROM fn_national_price_comparison($1, $2)
      WHERE state = $3
    `,
      [category || null, brandId || null, state]
    );
    return result.rows.map((row) => StateQueryService.toPriceDistribution(row));
  }

  /**
   * Get national price comparison across all states.
   *
   * @param options Optional `category` / `brandId` filters passed to the SQL
   *   function. Falsy values ('' or 0) are sent as NULL.
   */
  async getNationalPriceComparison(
    options: { category?: string; brandId?: number } = {}
  ): Promise<StatePriceDistribution[]> {
    const { category, brandId } = options;
    // `||` (not `??`) is deliberate: '' and 0 are passed as NULL, as before.
    const result = await this.pool.query(
      `
      SELECT * FROM fn_national_price_comparison($1, $2)
    `,
      [category || null, brandId || null]
    );
    return result.rows.map((row) => StateQueryService.toPriceDistribution(row));
  }

  // =========================================================================
  // National Analytics
  // =========================================================================

  /**
   * Get national summary across all states: state/store/product/brand totals,
   * the national average recreational price (two decimals, `null` when the
   * average is absent) and the per-state metrics list.
   */
  async getNationalSummary(): Promise<NationalSummary> {
    // Per-state metrics and the national totals are independent reads.
    const [stateMetrics, result] = await Promise.all([
      this.getAllStateMetrics(),
      this.pool.query(`
        SELECT
          COUNT(DISTINCT s.code) AS total_states,
          COUNT(DISTINCT CASE WHEN EXISTS (
            SELECT 1 FROM dispensaries d WHERE d.state = s.code AND d.menu_type IS NOT NULL
          ) THEN s.code END) AS active_states,
          (SELECT COUNT(*) FROM dispensaries WHERE state IS NOT NULL) AS total_stores,
          (SELECT COUNT(*) FROM store_products sp
            JOIN dispensaries d ON sp.dispensary_id = d.id
            WHERE d.state IS NOT NULL) AS total_products,
          (SELECT COUNT(DISTINCT brand_id) FROM store_products sp
            JOIN dispensaries d ON sp.dispensary_id = d.id
            WHERE d.state IS NOT NULL AND sp.brand_id IS NOT NULL) AS total_brands,
          (SELECT AVG(price_rec) FROM store_products sp
            JOIN dispensaries d ON sp.dispensary_id = d.id
            WHERE d.state IS NOT NULL AND sp.price_rec > 0) AS avg_price_national
        FROM states s
      `),
    ]);

    const data = result.rows[0];
    return {
      totalStates: StateQueryService.toInt(data?.total_states),
      activeStates: StateQueryService.toInt(data?.active_states),
      totalStores: StateQueryService.toInt(data?.total_stores),
      totalProducts: StateQueryService.toInt(data?.total_products),
      totalBrands: StateQueryService.toInt(data?.total_brands),
      avgPriceNational: StateQueryService.toRoundedPrice(data?.avg_price_national),
      stateMetrics,
    };
  }

  /**
   * Get heatmap data (one `{ state, stateName, value, label }` row per state)
   * for a specific metric.
   *
   * @param metric Which value to plot. `penetration` is computed by
   *   `fn_brand_state_penetration`; the others are columns of `mv_state_metrics`.
   * @param options `brandId` is required for the `penetration` metric.
   * @throws Error when `penetration` is requested without a `brandId`, or
   *   when `metric` is not one of the supported values.
   */
  async getStateHeatmapData(
    metric: 'stores' | 'products' | 'brands' | 'avgPrice' | 'penetration',
    options: { brandId?: number; category?: string } = {}
  ): Promise<StateHeatmapData[]> {
    let query: string;
    let params: unknown[] = [];

    if (metric === 'penetration') {
      if (!options.brandId) {
        throw new Error('brandId required for penetration heatmap');
      }
      query = `
        SELECT state, state_name AS "stateName", penetration_pct AS value, 'penetration %' AS label
        FROM fn_brand_state_penetration($1)
        ORDER BY state
      `;
      params = [options.brandId];
    } else {
      // Own-property lookup so an unexpected runtime value cannot resolve to
      // an inherited Object.prototype member.
      const spec = Object.prototype.hasOwnProperty.call(StateQueryService.HEATMAP_COLUMNS, metric)
        ? StateQueryService.HEATMAP_COLUMNS[metric]
        : undefined;
      if (!spec) {
        throw new Error(`Unknown metric: ${metric}`);
      }
      const valueFilter = spec.excludeNull ? ` AND ${spec.column} IS NOT NULL` : '';
      query = `
        SELECT state, state_name AS "stateName", ${spec.column} AS value, '${spec.label}' AS label
        FROM mv_state_metrics
        WHERE state IS NOT NULL${valueFilter}
        ORDER BY state
      `;
    }

    const result = await this.pool.query(query, params);
    return result.rows;
  }

  /**
   * Get national penetration trend for a brand.
   *
   * Produces one data point per calendar day (from snapshot capture times)
   * within the look-back window on which the brand appeared in at least one
   * snapshot.
   *
   * @param brandId Brand to chart.
   * @param options `days` is the look-back window in days (default 30).
   * @throws Error when the brand does not exist.
   */
  async getNationalPenetrationTrend(
    brandId: number,
    options: { days?: number } = {}
  ): Promise<NationalPenetrationTrend> {
    const { days = 30 } = options;

    const brandName = await this.requireBrandName(brandId);

    // Get historical data from snapshots
    const result = await this.pool.query(
      `
      WITH daily_presence AS (
        SELECT
          DATE(sps.captured_at) AS date,
          COUNT(DISTINCT d.state) AS states_present,
          COUNT(DISTINCT d.id) AS stores_with_brand
        FROM store_product_snapshots sps
        JOIN dispensaries d ON sps.dispensary_id = d.id
        JOIN store_products sp ON sps.store_product_id = sp.id
        WHERE sp.brand_id = $1
          AND sps.captured_at > NOW() - INTERVAL '1 day' * $2
          AND d.state IS NOT NULL
        GROUP BY DATE(sps.captured_at)
      ),
      daily_totals AS (
        SELECT
          DATE(sps.captured_at) AS date,
          COUNT(DISTINCT d.id) AS total_stores
        FROM store_product_snapshots sps
        JOIN dispensaries d ON sps.dispensary_id = d.id
        WHERE sps.captured_at > NOW() - INTERVAL '1 day' * $2
          AND d.state IS NOT NULL
        GROUP BY DATE(sps.captured_at)
      )
      SELECT
        dp.date,
        dp.states_present,
        dt.total_stores,
        ROUND(dp.stores_with_brand::NUMERIC / NULLIF(dt.total_stores, 0) * 100, 2) AS penetration_pct
      FROM daily_presence dp
      JOIN daily_totals dt ON dp.date = dt.date
      ORDER BY dp.date
    `,
      [brandId, days]
    );

    return {
      brandId,
      brandName,
      dataPoints: result.rows.map((row) => ({
        // Local-calendar formatting; see toDateOnly for why toISOString() is wrong here.
        date: StateQueryService.toDateOnly(row.date),
        statesPresent: StateQueryService.toInt(row.states_present),
        totalStores: StateQueryService.toInt(row.total_stores),
        penetrationPct: parseFloat(String(row.penetration_pct || '0')),
      })),
    };
  }

  // =========================================================================
  // Utility Methods
  // =========================================================================

  /**
   * Refresh materialized views
   * Uses direct REFRESH MATERIALIZED VIEW for compatibility
   */
  async refreshMetrics(): Promise<void> {
    // Use direct refresh command instead of function call for better compatibility
    // CONCURRENTLY requires a unique index (idx_mv_state_metrics_state exists)
    await this.pool.query('REFRESH MATERIALIZED VIEW CONCURRENTLY mv_state_metrics');
  }

  /**
   * Validate state code: resolves to true when `states` has a row with this code.
   */
  async isValidState(state: string): Promise<boolean> {
    const result = await this.pool.query(
      `
      SELECT 1 FROM states WHERE code = $1
    `,
      [state]
    );
    return result.rows.length > 0;
  }
}