Files
cannaiq/backend/src/services/inventory-snapshots.ts
Kelly 9f3bc8a843
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
fix: Worker task concurrency limit and inventory tracking
- Fix claim_task to enforce max 5 tasks per worker (was unlimited)
- Add session_task_count check before ANY claiming path
- Add triggers to auto-decrement count on task complete/release
- Update MAX_CONCURRENT_TASKS default from 3 to 5
- Update frontend fallback to show 5 task slots

- Add Wasabi S3 storage for payload archival
- Add inventory snapshots service (delta-only tracking)
- Add sales analytics views and routes
- Add high-frequency manager UI components
- Reset hardcoded AZ 5-minute intervals (use UI to configure)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-17 01:34:38 -07:00

483 lines
14 KiB
TypeScript

/**
* Inventory Snapshots Service (Delta-Only)
*
* Only stores snapshots when something CHANGES (quantity, price, status).
* This reduces storage by ~95% while capturing all meaningful events.
*
* Part of Real-Time Inventory Tracking feature.
*
* Change types:
* - sale: quantity decreased (qty_delta < 0)
* - restock: quantity increased (qty_delta > 0)
* - price_change: price changed but quantity same
* - oos: went out of stock (quantity -> 0)
* - back_in_stock: came back in stock (0 -> quantity)
* - new_product: first time seeing this product
*/
import { Pool } from 'pg';
export type Platform = 'dutchie' | 'jane' | 'treez';
interface SnapshotRow {
product_id: string;
quantity_available: number | null;
is_below_threshold: boolean;
status: string | null;
price_rec: number | null;
price_med: number | null;
price_rec_special: number | null;
price_med_special: number | null;
is_on_special: boolean;
brand_name: string | null;
category: string | null;
product_name: string | null;
}
interface PreviousState {
quantity_available: number | null;
price_rec: number | null;
price_med: number | null;
status: string | null;
captured_at: Date;
}
interface DeltaSnapshot extends SnapshotRow {
prev_quantity: number | null;
prev_price_rec: number | null;
prev_price_med: number | null;
prev_status: string | null;
qty_delta: number | null;
price_delta: number | null;
change_type: string;
effective_price_rec: number | null;
effective_price_med: number | null;
revenue_rec: number | null;
revenue_med: number | null;
hours_since_last: number | null;
}
/**
* Extract a normalized snapshot row from a raw product based on platform.
*/
function normalizeProduct(product: any, platform: Platform): SnapshotRow | null {
let productId: string | null = null;
let quantityAvailable: number | null = null;
let isBelowThreshold = false;
let status: string | null = null;
let priceRec: number | null = null;
let priceMed: number | null = null;
let priceRecSpecial: number | null = null;
let priceMedSpecial: number | null = null;
let isOnSpecial = false;
let brandName: string | null = null;
let category: string | null = null;
let productName: string | null = null;
switch (platform) {
case 'dutchie': {
productId = product.id;
productName = product.Name || product.name;
brandName = product.brand?.name || null;
category = product.category || null;
status = product.Status || 'Active';
isBelowThreshold = product.isBelowThreshold === true;
// Quantity: sum from children if available
if (product.children && Array.isArray(product.children)) {
quantityAvailable = product.children.reduce(
(sum: number, child: any) => sum + (child.quantityAvailable || child.quantity || 0),
0
);
} else if (product.quantityAvailable != null) {
quantityAvailable = product.quantityAvailable;
}
// Prices: prefer recPrices, fall back to Prices
const recPrices = product.recPrices || product.Prices || [];
priceRec = recPrices.length > 0 ? parseFloat(recPrices[0]) : null;
const medPrices = product.medicalPrices || product.medPrices || [];
priceMed = medPrices.length > 0 ? parseFloat(medPrices[0]) : null;
// Special/sale prices
if (product.specialPrices && product.specialPrices.length > 0) {
priceRecSpecial = parseFloat(product.specialPrices[0]);
isOnSpecial = true;
} else if (product.discountedPrices && product.discountedPrices.length > 0) {
priceRecSpecial = parseFloat(product.discountedPrices[0]);
isOnSpecial = true;
}
break;
}
case 'jane': {
productId = String(product.product_id);
productName = product.name;
brandName = product.brand || null;
category = product.kind || null;
status = 'Active';
isBelowThreshold = false;
quantityAvailable = product.max_cart_quantity ?? null;
priceRec =
product.bucket_price ||
product.price_gram ||
product.price_eighth_ounce ||
product.price_each ||
null;
priceMed = null;
// Jane sale prices
if (product.discounted_price && priceRec && product.discounted_price < priceRec) {
priceRecSpecial = product.discounted_price;
isOnSpecial = true;
}
break;
}
case 'treez': {
productId = product.id;
productName = product.name || product.menuTitle;
brandName = product.brand || null;
category = product.category || null;
status = product.status || (product.isActive ? 'ACTIVE' : 'INACTIVE');
quantityAvailable = product.availableUnits ?? null;
isBelowThreshold = product.isAboveThreshold === false;
priceRec = product.customMinPrice ?? null;
priceMed = null;
// Treez sale prices
if (product.customOnSaleValue && priceRec && product.customOnSaleValue < priceRec) {
priceRecSpecial = product.customOnSaleValue;
isOnSpecial = true;
}
break;
}
}
if (!productId) {
return null;
}
return {
product_id: productId,
quantity_available: quantityAvailable,
is_below_threshold: isBelowThreshold,
status,
price_rec: priceRec,
price_med: priceMed,
price_rec_special: priceRecSpecial,
price_med_special: priceMedSpecial,
is_on_special: isOnSpecial,
brand_name: brandName,
category,
product_name: productName,
};
}
/**
* Determine if product state changed and calculate deltas
*/
function calculateDelta(
current: SnapshotRow,
previous: PreviousState | null,
now: Date
): DeltaSnapshot | null {
const qtyChanged =
previous?.quantity_available !== current.quantity_available;
const priceRecChanged =
previous?.price_rec !== current.price_rec;
const priceMedChanged =
previous?.price_med !== current.price_med;
const statusChanged =
previous?.status !== current.status;
// No change - skip
if (previous && !qtyChanged && !priceRecChanged && !priceMedChanged && !statusChanged) {
return null;
}
// Calculate qty delta
const prevQty = previous?.quantity_available ?? null;
const currQty = current.quantity_available ?? 0;
const qtyDelta = previous ? currQty - (prevQty ?? 0) : null;
// Calculate price delta
const priceDelta = previous && current.price_rec && previous.price_rec
? current.price_rec - previous.price_rec
: null;
// Determine change type
let changeType = 'new_product';
if (previous) {
if (currQty === 0 && (prevQty ?? 0) > 0) {
changeType = 'oos';
} else if (currQty > 0 && (prevQty ?? 0) === 0) {
changeType = 'back_in_stock';
} else if (qtyDelta !== null && qtyDelta < 0) {
changeType = 'sale';
} else if (qtyDelta !== null && qtyDelta > 0) {
changeType = 'restock';
} else if (priceRecChanged || priceMedChanged) {
changeType = 'price_change';
} else {
changeType = 'status_change';
}
}
// Calculate effective prices (sale price if on special, otherwise regular)
const effectivePriceRec = current.is_on_special && current.price_rec_special
? current.price_rec_special
: current.price_rec;
const effectivePriceMed = current.is_on_special && current.price_med_special
? current.price_med_special
: current.price_med;
// Calculate revenue (only for sales)
let revenueRec: number | null = null;
let revenueMed: number | null = null;
if (changeType === 'sale' && qtyDelta !== null && qtyDelta < 0) {
const unitsSold = Math.abs(qtyDelta);
if (effectivePriceRec) {
revenueRec = unitsSold * effectivePriceRec;
}
if (effectivePriceMed) {
revenueMed = unitsSold * effectivePriceMed;
}
}
// Calculate hours since last snapshot
let hoursSinceLast: number | null = null;
if (previous?.captured_at) {
const msDiff = now.getTime() - previous.captured_at.getTime();
hoursSinceLast = Math.round((msDiff / 3600000) * 100) / 100; // 2 decimal places
}
return {
...current,
prev_quantity: prevQty,
prev_price_rec: previous?.price_rec ?? null,
prev_price_med: previous?.price_med ?? null,
prev_status: previous?.status ?? null,
qty_delta: qtyDelta,
price_delta: priceDelta,
change_type: changeType,
effective_price_rec: effectivePriceRec,
effective_price_med: effectivePriceMed,
revenue_rec: revenueRec,
revenue_med: revenueMed,
hours_since_last: hoursSinceLast,
};
}
/**
* Get the previous snapshot state for a dispensary.
* Returns a map of product_id -> previous state.
*/
export async function getPreviousSnapshots(
pool: Pool,
dispensaryId: number
): Promise<Map<string, PreviousState>> {
const result = await pool.query(
`
SELECT DISTINCT ON (product_id)
product_id,
quantity_available,
price_rec,
price_med,
status,
captured_at
FROM inventory_snapshots
WHERE dispensary_id = $1
ORDER BY product_id, captured_at DESC
`,
[dispensaryId]
);
const map = new Map<string, PreviousState>();
for (const row of result.rows) {
map.set(row.product_id, {
quantity_available: row.quantity_available,
price_rec: row.price_rec ? parseFloat(row.price_rec) : null,
price_med: row.price_med ? parseFloat(row.price_med) : null,
status: row.status,
captured_at: row.captured_at,
});
}
return map;
}
/**
* Save delta-only inventory snapshots.
* Only stores rows where something changed (qty, price, or status).
*
* @param pool - Database connection pool
* @param dispensaryId - The dispensary ID
* @param products - Array of raw products from the platform
* @param platform - The platform type
* @returns Object with counts: { total, changed, sales, restocks }
*/
export async function saveInventorySnapshots(
pool: Pool,
dispensaryId: number,
products: any[],
platform: Platform
): Promise<{ total: number; changed: number; sales: number; restocks: number; revenue: number }> {
if (!products || products.length === 0) {
return { total: 0, changed: 0, sales: 0, restocks: 0, revenue: 0 };
}
const now = new Date();
// Get previous state for comparison
const previousStates = await getPreviousSnapshots(pool, dispensaryId);
// Normalize products and calculate deltas
const deltas: DeltaSnapshot[] = [];
let salesCount = 0;
let restockCount = 0;
let totalRevenue = 0;
for (const product of products) {
const normalized = normalizeProduct(product, platform);
if (!normalized) continue;
const previous = previousStates.get(normalized.product_id) || null;
const delta = calculateDelta(normalized, previous, now);
if (delta) {
deltas.push(delta);
if (delta.change_type === 'sale') {
salesCount++;
totalRevenue += (delta.revenue_rec || 0) + (delta.revenue_med || 0);
} else if (delta.change_type === 'restock') {
restockCount++;
}
}
}
if (deltas.length === 0) {
return { total: products.length, changed: 0, sales: 0, restocks: 0, revenue: 0 };
}
// Bulk insert deltas
const values: any[] = [];
const placeholders: string[] = [];
let paramIndex = 1;
for (const d of deltas) {
placeholders.push(
`($${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++})`
);
values.push(
dispensaryId,
d.product_id,
platform,
d.quantity_available,
d.is_below_threshold,
d.status,
d.price_rec,
d.price_med,
d.brand_name,
d.category,
d.product_name,
d.prev_quantity,
d.prev_price_rec,
d.prev_price_med,
d.prev_status,
d.qty_delta,
d.price_delta,
d.change_type,
d.effective_price_rec,
d.effective_price_med,
d.revenue_rec,
d.revenue_med,
d.hours_since_last
);
}
const query = `
INSERT INTO inventory_snapshots (
dispensary_id,
product_id,
platform,
quantity_available,
is_below_threshold,
status,
price_rec,
price_med,
brand_name,
category,
product_name,
prev_quantity,
prev_price_rec,
prev_price_med,
prev_status,
qty_delta,
price_delta,
change_type,
effective_price_rec,
effective_price_med,
revenue_rec,
revenue_med,
hours_since_last
) VALUES ${placeholders.join(', ')}
`;
await pool.query(query, values);
return {
total: products.length,
changed: deltas.length,
sales: salesCount,
restocks: restockCount,
revenue: Math.round(totalRevenue * 100) / 100,
};
}
/**
* Get snapshot statistics for a dispensary
*/
export async function getSnapshotStats(
pool: Pool,
dispensaryId: number,
hours: number = 24
): Promise<{
totalSnapshots: number;
sales: number;
restocks: number;
priceChanges: number;
oosEvents: number;
revenue: number;
}> {
const result = await pool.query(
`
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE change_type = 'sale') as sales,
COUNT(*) FILTER (WHERE change_type = 'restock') as restocks,
COUNT(*) FILTER (WHERE change_type = 'price_change') as price_changes,
COUNT(*) FILTER (WHERE change_type = 'oos') as oos_events,
COALESCE(SUM(revenue_rec), 0) + COALESCE(SUM(revenue_med), 0) as revenue
FROM inventory_snapshots
WHERE dispensary_id = $1
AND captured_at >= NOW() - INTERVAL '1 hour' * $2
`,
[dispensaryId, hours]
);
const row = result.rows[0];
return {
totalSnapshots: parseInt(row.total),
sales: parseInt(row.sales),
restocks: parseInt(row.restocks),
priceChanges: parseInt(row.price_changes),
oosEvents: parseInt(row.oos_events),
revenue: parseFloat(row.revenue) || 0,
};
}