- Add brand promotional history endpoint (GET /api/analytics/v2/brand/:name/promotions) - Tracks when products go on special, duration, discounts, quantity sold estimates - Aggregates by category with frequency metrics (weekly/monthly) - Add quantity changes endpoint (GET /api/analytics/v2/store/:id/quantity-changes) - Filter by direction (increase/decrease/all) for sales vs restock estimation - Fix canonical-upsert to populate stock_quantity and total_quantity_available - Add API key edit functionality in admin UI - Edit allowed domains and IPs - Display domains in list view 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
888 lines
28 KiB
TypeScript
888 lines
28 KiB
TypeScript
/**
|
|
* Canonical Upsert Functions
|
|
*
|
|
* Upserts normalized data into canonical tables:
|
|
* - store_products
|
|
* - store_product_snapshots
|
|
* - brands
* - product_variants
* - product_variant_snapshots
* - categories (future)
|
|
*/
|
|
|
|
import { Pool, PoolClient } from 'pg';
|
|
import {
|
|
NormalizedProduct,
|
|
NormalizedPricing,
|
|
NormalizedAvailability,
|
|
NormalizedBrand,
|
|
NormalizationResult,
|
|
} from './types';
|
|
import {
|
|
downloadProductImage,
|
|
ProductImageContext,
|
|
isImageStorageReady,
|
|
LocalImageSizes,
|
|
} from '../utils/image-storage';
|
|
|
|
/**
 * Number of products handled per chunk: each chunk is one transaction in
 * `upsertStoreProducts` and one multi-row INSERT in
 * `createStoreProductSnapshots`.
 */
const BATCH_SIZE = 100;
|
|
|
|
// ============================================================
|
|
// PRODUCT UPSERTS
|
|
// ============================================================
|
|
|
|
/**
 * Summary of a `store_products` row touched by `upsertStoreProducts`,
 * carrying what is needed to locate and download its product image.
 */
export interface NewProductInfo {
  /** Primary key of the row in `store_products`. */
  id: number;
  /** Provider-side identifier (`store_products.provider_product_id`). */
  externalProductId: string;
  /** Product name as received from the provider (`name_raw`). */
  name: string;
  /** Raw brand name, or null when none was supplied. */
  brandName: string | null;
  /** Remote URL of the primary product image, if any. */
  primaryImageUrl: string | null;
  /** True when the row's `local_image_path` was already populated. */
  hasLocalImage?: boolean;
}
|
|
|
|
/** Outcome of `upsertStoreProducts`. */
export interface UpsertProductsResult {
  /** Rows written in total (`new + updated`). */
  upserted: number;
  /** Rows that did not exist before and were inserted. */
  new: number;
  /** Rows that already existed and were updated in place. */
  updated: number;
  /** One entry per newly inserted product. */
  newProducts: NewProductInfo[];
  /** New or existing products with a source image URL but no local image yet. */
  productsNeedingImages: NewProductInfo[];
}
|
|
|
|
/**
 * Upsert normalized products into `store_products`.
 *
 * Rows are keyed on (dispensary_id, provider, provider_product_id). Existing
 * rows have their descriptive, pricing, stock, potency and image columns
 * refreshed and `last_seen_at` bumped. Products are written in chunks of
 * `BATCH_SIZE`, each chunk in its own transaction. A failure rolls back the
 * current chunk only; earlier chunks stay committed. The original error is
 * rethrown.
 *
 * Prices from `pricing` are divided by 100 before storage, and a falsy price
 * is stored as NULL. `stock_quantity` and `total_quantity_available` both
 * receive the availability quantity.
 *
 * @param pool - Postgres connection pool.
 * @param products - Normalized products to write.
 * @param pricing - Pricing keyed by `externalProductId`.
 * @param availability - Availability keyed by `externalProductId`.
 * @param options.dryRun - When true, logs per chunk and writes nothing.
 * @returns Inserted vs updated counts, the newly inserted products, and every
 *   product (new or existing) that has a source image URL but no local image yet.
 * @throws The database error raised while writing a chunk.
 */
export async function upsertStoreProducts(
  pool: Pool,
  products: NormalizedProduct[],
  pricing: Map<string, NormalizedPricing>,
  availability: Map<string, NormalizedAvailability>,
  options: { dryRun?: boolean } = {}
): Promise<UpsertProductsResult> {
  if (products.length === 0) {
    return { upserted: 0, new: 0, updated: 0, newProducts: [], productsNeedingImages: [] };
  }

  const { dryRun = false } = options;
  let newCount = 0;
  let updatedCount = 0;
  const newProducts: NewProductInfo[] = [];
  const productsNeedingImages: NewProductInfo[] = [];

  // Pricing arrives in cents; a missing or zero price is stored as NULL.
  const toDollars = (cents: number | null | undefined): number | null =>
    cents ? cents / 100 : null;
  // Keep only plausible percentages (0-100). Some products report mg in the
  // percentage field, and negative values are never valid; both become NULL.
  const toPercent = (value: number | null | undefined): number | null =>
    value != null && value >= 0 && value <= 100 ? value : null;

  for (let i = 0; i < products.length; i += BATCH_SIZE) {
    const batch = products.slice(i, i + BATCH_SIZE);

    if (dryRun) {
      console.log(`[DryRun] Would upsert ${batch.length} products`);
      continue;
    }

    const client = await pool.connect();
    try {
      await client.query('BEGIN');

      for (const product of batch) {
        const productPricing = pricing.get(product.externalProductId);
        const productAvailability = availability.get(product.externalProductId);

        // `xmax = 0` is true only for freshly inserted rows, which separates
        // inserts from conflict updates.
        const result = await client.query(
          `INSERT INTO store_products (
            dispensary_id, provider, provider_product_id, provider_brand_id,
            name_raw, brand_name_raw, category_raw, subcategory_raw,
            price_rec, price_med, price_rec_special, price_med_special,
            is_on_special, discount_percent,
            is_in_stock, stock_status, stock_quantity, total_quantity_available,
            thc_percent, cbd_percent,
            image_url,
            first_seen_at, last_seen_at, updated_at
          ) VALUES (
            $1, $2, $3, $4,
            $5, $6, $7, $8,
            $9, $10, $11, $12,
            $13, $14,
            $15, $16, $17, $17,
            $18, $19,
            $20,
            NOW(), NOW(), NOW()
          )
          ON CONFLICT (dispensary_id, provider, provider_product_id)
          DO UPDATE SET
            name_raw = EXCLUDED.name_raw,
            brand_name_raw = EXCLUDED.brand_name_raw,
            category_raw = EXCLUDED.category_raw,
            subcategory_raw = EXCLUDED.subcategory_raw,
            price_rec = EXCLUDED.price_rec,
            price_med = EXCLUDED.price_med,
            price_rec_special = EXCLUDED.price_rec_special,
            price_med_special = EXCLUDED.price_med_special,
            is_on_special = EXCLUDED.is_on_special,
            discount_percent = EXCLUDED.discount_percent,
            is_in_stock = EXCLUDED.is_in_stock,
            stock_status = EXCLUDED.stock_status,
            stock_quantity = EXCLUDED.stock_quantity,
            total_quantity_available = EXCLUDED.total_quantity_available,
            thc_percent = EXCLUDED.thc_percent,
            cbd_percent = EXCLUDED.cbd_percent,
            image_url = EXCLUDED.image_url,
            last_seen_at = NOW(),
            updated_at = NOW()
          RETURNING id, (xmax = 0) as is_new, (local_image_path IS NOT NULL) as has_local_image`,
          [
            product.dispensaryId,
            product.platform,
            product.externalProductId,
            product.brandId,
            product.name,
            product.brandName,
            product.category,
            product.subcategory,
            toDollars(productPricing?.priceRec),
            toDollars(productPricing?.priceMed),
            toDollars(productPricing?.priceRecSpecial),
            toDollars(productPricing?.priceMedSpecial),
            productPricing?.isOnSpecial || false,
            productPricing?.discountPercent,
            productAvailability?.inStock ?? true,
            productAvailability?.stockStatus || 'unknown',
            productAvailability?.quantity ?? null, // stock_quantity and total_quantity_available
            toPercent(product.thcPercent),
            toPercent(product.cbdPercent),
            product.primaryImageUrl,
          ]
        );

        const row = result.rows[0];
        const productInfo: NewProductInfo = {
          id: row.id,
          externalProductId: product.externalProductId,
          name: product.name,
          brandName: product.brandName,
          primaryImageUrl: product.primaryImageUrl,
          hasLocalImage: row.has_local_image,
        };

        if (row.is_new) {
          newCount++;
          newProducts.push(productInfo);
        } else {
          updatedCount++;
        }

        // New rows, and existing rows never backfilled, need an image
        // download whenever the provider supplied a source URL.
        if (product.primaryImageUrl && !row.has_local_image) {
          productsNeedingImages.push(productInfo);
        }
      }

      await client.query('COMMIT');
    } catch (error) {
      // A failed ROLLBACK (e.g. a broken connection) must not mask the error
      // that caused it.
      await client.query('ROLLBACK').catch((rollbackError: unknown) => {
        console.error('[upsertStoreProducts] ROLLBACK failed:', rollbackError);
      });
      throw error;
    } finally {
      client.release();
    }
  }

  return {
    upserted: newCount + updatedCount,
    new: newCount,
    updated: updatedCount,
    newProducts,
    productsNeedingImages,
  };
}
|
|
|
|
// ============================================================
|
|
// SNAPSHOT CREATION
|
|
// ============================================================
|
|
|
|
/** Outcome of `createStoreProductSnapshots`. */
export interface CreateSnapshotsResult {
  /** Snapshot rows inserted (or that would be inserted, in dry-run mode). */
  created: number;
}
|
|
|
|
/**
 * Append one `store_product_snapshots` row per product for a crawl.
 *
 * Rows are inserted with one multi-row INSERT per chunk of `BATCH_SIZE`
 * products. Chunks are not wrapped in a transaction, so a failure leaves
 * earlier chunks in place. Each row records pricing (cents divided by 100;
 * falsy prices become NULL), availability, potency (values outside 0-100
 * become NULL) and the JSON-serialized raw product.
 *
 * @param pool - Postgres connection pool.
 * @param dispensaryId - Store the snapshots belong to.
 * @param products - Normalized products captured in this crawl.
 * @param pricing - Pricing keyed by `externalProductId`.
 * @param availability - Availability keyed by `externalProductId`.
 * @param crawlRunId - Crawl run recorded on every row (may be null).
 * @param options.dryRun - When true, logs and returns the would-be count.
 * @returns Number of snapshot rows created.
 */
export async function createStoreProductSnapshots(
  pool: Pool,
  dispensaryId: number,
  products: NormalizedProduct[],
  pricing: Map<string, NormalizedPricing>,
  availability: Map<string, NormalizedAvailability>,
  crawlRunId: number | null,
  options: { dryRun?: boolean } = {}
): Promise<CreateSnapshotsResult> {
  if (products.length === 0) {
    return { created: 0 };
  }

  const { dryRun = false } = options;

  if (dryRun) {
    console.log(`[DryRun] Would create ${products.length} snapshots`);
    return { created: products.length };
  }

  // Single source of truth for the column list. Each value tuple built below
  // must follow this order, and the placeholder count is derived from it.
  const columns = [
    'dispensary_id', 'provider', 'provider_product_id', 'crawl_run_id',
    'captured_at',
    'name_raw', 'brand_name_raw', 'category_raw', 'subcategory_raw',
    'price_rec', 'price_med', 'price_rec_special', 'price_med_special',
    'is_on_special', 'discount_percent',
    'is_in_stock', 'stock_quantity', 'stock_status',
    'thc_percent', 'cbd_percent',
    'image_url', 'raw_data',
  ];

  const toDollars = (cents: number | null | undefined): number | null =>
    cents ? cents / 100 : null;
  // Some products report mg as %, and negative values are never valid; keep 0-100 only.
  const toPercent = (value: number | null | undefined): number | null =>
    value != null && value >= 0 && value <= 100 ? value : null;

  let created = 0;

  for (let i = 0; i < products.length; i += BATCH_SIZE) {
    const batch = products.slice(i, i + BATCH_SIZE);

    const values: unknown[][] = batch.map((product) => {
      const productPricing = pricing.get(product.externalProductId);
      const productAvailability = availability.get(product.externalProductId);
      return [
        dispensaryId,
        product.platform,
        product.externalProductId,
        crawlRunId,
        new Date(), // captured_at
        product.name,
        product.brandName,
        product.category,
        product.subcategory,
        toDollars(productPricing?.priceRec),
        toDollars(productPricing?.priceMed),
        toDollars(productPricing?.priceRecSpecial),
        toDollars(productPricing?.priceMedSpecial),
        productPricing?.isOnSpecial || false,
        productPricing?.discountPercent,
        productAvailability?.inStock ?? true,
        productAvailability?.quantity,
        productAvailability?.stockStatus || 'unknown',
        toPercent(product.thcPercent),
        toPercent(product.cbdPercent),
        product.primaryImageUrl,
        JSON.stringify(product.rawProduct),
      ];
    });

    // Placeholders are numbered $1..$N consecutively across all rows of the chunk.
    const placeholders = values
      .map((_tuple, row) =>
        `(${columns.map((_name, col) => `$${row * columns.length + col + 1}`).join(', ')})`
      )
      .join(', ');

    await pool.query(
      `INSERT INTO store_product_snapshots (${columns.join(', ')}) VALUES ${placeholders}`,
      values.flat()
    );

    created += batch.length;
  }

  return { created };
}
|
|
|
|
// ============================================================
|
|
// VARIANT UPSERTS
|
|
// ============================================================
|
|
|
|
/** Outcome of `upsertProductVariants`. */
export interface UpsertVariantsResult {
  /** Variant rows written in total (`new + updated`). */
  upserted: number;
  /** Variants inserted for the first time. */
  new: number;
  /** Existing variants refreshed in place. */
  updated: number;
  /** Rows appended to `product_variant_snapshots`. */
  snapshotsCreated: number;
}
|
|
|
|
/**
 * Pull per-option variant records out of a raw Dutchie product payload.
 *
 * Each entry of `rawProduct.POSMetaData.children` becomes one variant. Missing
 * fields fall back to `null`, or to `''` for the option label. A variant
 * counts as in stock only when its available quantity is greater than zero.
 *
 * @param rawProduct - Raw provider payload; its shape is not validated here.
 * @returns One plain object per child, or an empty array when there are none.
 */
function extractVariantsFromRaw(rawProduct: any): any[] {
  const children = rawProduct?.POSMetaData?.children || [];

  return children.map((child: any) => {
    // Prefer the "available" quantity; fall back to the plain quantity field.
    const quantity = child.quantityAvailable ?? child.quantity ?? null;

    return {
      option: child.option || child.key || '',
      canonicalSku: child.canonicalSKU || null,
      canonicalId: child.canonicalID || null,
      canonicalName: child.canonicalName || null,
      priceRec: child.recPrice || child.price || null,
      priceMed: child.medPrice || null,
      priceRecSpecial: child.recSpecialPrice || null,
      priceMedSpecial: child.medSpecialPrice || null,
      quantity,
      inStock: (quantity ?? 0) > 0,
    };
  });
}
|
|
|
|
/**
 * Parse a variant option label into a numeric weight and unit.
 *
 * Examples:
 *   "1g"    -> { value: 1, unit: "g" }
 *   "3.5g"  -> { value: 3.5, unit: "g" }
 *   "1/8oz" -> { value: 0.125, unit: "oz" }
 *   "1/8"   -> { value: 0.125, unit: "oz" }   (unitless fractions default to oz)
 *   "100mg" -> { value: 100, unit: "mg" }
 *   "2"     -> { value: 2, unit: null }
 *
 * Units (g, oz, mg, ml, and "each" for plain numbers) are matched
 * case-insensitively and returned lower-cased; surrounding whitespace is
 * ignored. Anything that is not one well-formed number or fraction returns
 * `{ value: null, unit: null }`. This includes ".", "1.2.3g", and fractions
 * with a zero denominator, so NaN/Infinity never reach the database.
 */
function parseWeight(option: string): { value: number | null; unit: string | null } {
  if (!option) return { value: null, unit: null };
  const label = option.trim();

  // Fractions like "1/8oz"
  const fractionMatch = label.match(/^(\d+)\/(\d+)\s*(g|oz|mg|ml)?$/i);
  if (fractionMatch) {
    const denominator = parseInt(fractionMatch[2], 10);
    if (denominator === 0) return { value: null, unit: null };
    const value = parseInt(fractionMatch[1], 10) / denominator;
    return { value, unit: fractionMatch[3]?.toLowerCase() || 'oz' };
  }

  // Exactly one number ("3", "3.", "3.5", ".5"), optionally followed by a unit.
  const decimalMatch = label.match(/^(\d+(?:\.\d*)?|\.\d+)\s*(g|oz|mg|ml|each)?$/i);
  if (decimalMatch) {
    return {
      value: parseFloat(decimalMatch[1]),
      unit: decimalMatch[2]?.toLowerCase() || null,
    };
  }

  return { value: null, unit: null };
}
|
|
|
|
/**
 * Upsert every POS variant (option) of each product into `product_variants`,
 * and append one `product_variant_snapshots` row per variant for this crawl.
 *
 * Variants come from each product's raw payload via `extractVariantsFromRaw`.
 * A product is skipped when it has no matching `store_products` row or has no
 * variants. On conflict (store_product_id, option) the existing row is
 * refreshed:
 * - canonical ids and weight keep their previous values when the new ones
 *   are NULL;
 * - `last_price_change_at` moves only when the rec price or rec special price
 *   changed;
 * - `last_stock_change_at` moves only when the quantity changed.
 *
 * Statements run one at a time on the pool, with no explicit transaction.
 *
 * @param pool - Postgres connection pool.
 * @param dispensaryId - Store the variants belong to.
 * @param products - Normalized products whose raw payloads hold the variants.
 * @param crawlRunId - Crawl run recorded on each variant snapshot (may be null).
 * @param options.dryRun - When true, logs the variant count per product instead
 *   of writing. The `store_products` lookup still runs.
 * @returns Inserted/updated variant counts and the number of snapshots written.
 */
export async function upsertProductVariants(
  pool: Pool,
  dispensaryId: number,
  products: NormalizedProduct[],
  crawlRunId: number | null,
  options: { dryRun?: boolean } = {}
): Promise<UpsertVariantsResult> {
  if (products.length === 0) {
    return { upserted: 0, new: 0, updated: 0, snapshotsCreated: 0 };
  }

  const dryRun = options.dryRun ?? false;

  const lookupSql = `SELECT id FROM store_products
    WHERE dispensary_id = $1 AND provider = $2 AND provider_product_id = $3`;

  const upsertVariantSql = `INSERT INTO product_variants (
      store_product_id, dispensary_id,
      option, canonical_sku, canonical_id, canonical_name,
      price_rec, price_med, price_rec_special, price_med_special,
      quantity, quantity_available, in_stock, is_on_special,
      weight_value, weight_unit,
      first_seen_at, last_seen_at, updated_at
    ) VALUES (
      $1, $2,
      $3, $4, $5, $6,
      $7, $8, $9, $10,
      $11, $11, $12, $13,
      $14, $15,
      NOW(), NOW(), NOW()
    )
    ON CONFLICT (store_product_id, option)
    DO UPDATE SET
      canonical_sku = COALESCE(EXCLUDED.canonical_sku, product_variants.canonical_sku),
      canonical_id = COALESCE(EXCLUDED.canonical_id, product_variants.canonical_id),
      canonical_name = COALESCE(EXCLUDED.canonical_name, product_variants.canonical_name),
      price_rec = EXCLUDED.price_rec,
      price_med = EXCLUDED.price_med,
      price_rec_special = EXCLUDED.price_rec_special,
      price_med_special = EXCLUDED.price_med_special,
      quantity = EXCLUDED.quantity,
      quantity_available = EXCLUDED.quantity_available,
      in_stock = EXCLUDED.in_stock,
      is_on_special = EXCLUDED.is_on_special,
      weight_value = COALESCE(EXCLUDED.weight_value, product_variants.weight_value),
      weight_unit = COALESCE(EXCLUDED.weight_unit, product_variants.weight_unit),
      last_seen_at = NOW(),
      last_price_change_at = CASE
        WHEN product_variants.price_rec IS DISTINCT FROM EXCLUDED.price_rec
          OR product_variants.price_rec_special IS DISTINCT FROM EXCLUDED.price_rec_special
        THEN NOW()
        ELSE product_variants.last_price_change_at
      END,
      last_stock_change_at = CASE
        WHEN product_variants.quantity IS DISTINCT FROM EXCLUDED.quantity
        THEN NOW()
        ELSE product_variants.last_stock_change_at
      END,
      updated_at = NOW()
    RETURNING id, (xmax = 0) as is_new`;

  const variantSnapshotSql = `INSERT INTO product_variant_snapshots (
      product_variant_id, store_product_id, dispensary_id, crawl_run_id,
      option,
      price_rec, price_med, price_rec_special, price_med_special,
      quantity, in_stock, is_on_special,
      captured_at
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, NOW())`;

  let inserted = 0;
  let refreshed = 0;
  let snapshots = 0;

  for (const product of products) {
    const lookup = await pool.query(lookupSql, [
      dispensaryId,
      product.platform,
      product.externalProductId,
    ]);
    // No parent row: nothing to attach variants to.
    if (lookup.rows.length === 0) continue;

    const storeProductId = lookup.rows[0].id;
    const variants = extractVariantsFromRaw(product.rawProduct);
    if (variants.length === 0) continue;

    if (dryRun) {
      console.log(`[DryRun] Would upsert ${variants.length} variants for product ${product.externalProductId}`);
      continue;
    }

    for (const variant of variants) {
      const weight = parseWeight(variant.option);
      const recDiscounted =
        variant.priceRecSpecial !== null && variant.priceRecSpecial < variant.priceRec;
      const medDiscounted =
        variant.priceMedSpecial !== null && variant.priceMedSpecial < variant.priceMed;
      const isOnSpecial = recDiscounted || medDiscounted;

      const upserted = await pool.query(upsertVariantSql, [
        storeProductId, dispensaryId,
        variant.option, variant.canonicalSku, variant.canonicalId, variant.canonicalName,
        variant.priceRec, variant.priceMed, variant.priceRecSpecial, variant.priceMedSpecial,
        variant.quantity, variant.inStock, isOnSpecial,
        weight.value, weight.unit,
      ]);

      const [variantRow] = upserted.rows;
      const variantId = variantRow.id;
      if (variantRow?.is_new) {
        inserted++;
      } else {
        refreshed++;
      }

      await pool.query(variantSnapshotSql, [
        variantId, storeProductId, dispensaryId, crawlRunId,
        variant.option,
        variant.priceRec, variant.priceMed, variant.priceRecSpecial, variant.priceMedSpecial,
        variant.quantity, variant.inStock, isOnSpecial,
      ]);
      snapshots++;
    }
  }

  return {
    upserted: inserted + refreshed,
    new: inserted,
    updated: refreshed,
    snapshotsCreated: snapshots,
  };
}
|
|
|
|
// ============================================================
|
|
// DISCONTINUED PRODUCTS
|
|
// ============================================================
|
|
|
|
/**
 * Mark products that disappeared from the latest crawl as discontinued.
 *
 * Only rows currently flagged `is_in_stock = TRUE` for this dispensary and
 * provider are considered. Each of those whose `provider_product_id` is not in
 * `currentProductIds` is set out of stock with `stock_status = 'discontinued'`,
 * and one 'discontinued' snapshot row is recorded for it.
 *
 * The UPDATE and the snapshot INSERT run in one transaction. The UPDATE
 * clears `is_in_stock`, so later crawls no longer select these products; a
 * snapshot lost after a committed UPDATE could therefore never be recreated.
 *
 * NOTE(review): an empty `currentProductIds` (e.g. a crawl that returned no
 * products) marks every in-stock product of the store as discontinued. Callers
 * that cannot tell an empty menu from a failed crawl should guard first.
 *
 * @param pool - Postgres connection pool.
 * @param dispensaryId - Store being reconciled.
 * @param currentProductIds - Provider product ids seen in the current crawl.
 * @param platform - Provider name (`store_products.provider`).
 * @param crawlRunId - Crawl run recorded on the snapshots (may be null).
 * @param options.dryRun - When true, only logs and returns the would-be count.
 * @returns Number of products marked (or that would be marked) discontinued.
 * @throws The database error raised by the UPDATE/INSERT (after rollback).
 */
export async function markDiscontinuedProducts(
  pool: Pool,
  dispensaryId: number,
  currentProductIds: Set<string>,
  platform: string,
  crawlRunId: number | null,
  options: { dryRun?: boolean } = {}
): Promise<number> {
  const { dryRun = false } = options;

  // Candidates: this store's products that are currently flagged in stock.
  const result = await pool.query(
    `SELECT provider_product_id FROM store_products
     WHERE dispensary_id = $1 AND provider = $2 AND is_in_stock = TRUE`,
    [dispensaryId, platform]
  );

  const discontinuedIds: string[] = result.rows
    .map((r: any) => r.provider_product_id)
    .filter((id: string) => !currentProductIds.has(id));

  if (discontinuedIds.length === 0) {
    return 0;
  }

  if (dryRun) {
    console.log(`[DryRun] Would mark ${discontinuedIds.length} products as discontinued`);
    return discontinuedIds.length;
  }

  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    await client.query(
      `UPDATE store_products
       SET is_in_stock = FALSE,
           stock_status = 'discontinued',
           updated_at = NOW()
       WHERE dispensary_id = $1
         AND provider = $2
         AND provider_product_id = ANY($3)`,
      [dispensaryId, platform, discontinuedIds]
    );

    // One set-based INSERT instead of a round trip per discontinued product.
    await client.query(
      `INSERT INTO store_product_snapshots (
         dispensary_id, provider, provider_product_id, crawl_run_id,
         captured_at, is_in_stock, stock_status
       )
       SELECT
         dispensary_id, provider, provider_product_id, $4,
         NOW(), FALSE, 'discontinued'
       FROM store_products
       WHERE dispensary_id = $1 AND provider = $2 AND provider_product_id = ANY($3)`,
      [dispensaryId, platform, discontinuedIds, crawlRunId]
    );

    await client.query('COMMIT');
  } catch (error) {
    // Do not let a failed ROLLBACK hide the original error.
    await client.query('ROLLBACK').catch((rollbackError: unknown) => {
      console.error('[markDiscontinuedProducts] ROLLBACK failed:', rollbackError);
    });
    throw error;
  } finally {
    client.release();
  }

  return discontinuedIds.length;
}
|
|
|
|
// ============================================================
|
|
// BRAND UPSERTS
|
|
// ============================================================
|
|
|
|
/** Outcome of `upsertBrands`. */
export interface UpsertBrandsResult {
  /** Number of brands submitted, counted whether or not a row changed. */
  upserted: number;
  /** Brands inserted as new rows (always 0 in dry-run mode). */
  new: number;
}
|
|
|
|
/**
 * Insert brands into `brands`, keyed on `slug`.
 *
 * With `skipIfExists` (the default) an existing slug is left untouched.
 * Otherwise its `logo_url` is replaced when the incoming logo is non-null, and
 * `updated_at` is bumped. Brands are written one statement at a time.
 *
 * @param pool - Postgres connection pool.
 * @param brands - Normalized brands to write.
 * @param options.dryRun - When true, only logs; reports every brand as
 *   upserted and none as new.
 * @param options.skipIfExists - Leave existing slugs unchanged (default true).
 * @returns `upserted` is always `brands.length`; `new` counts inserted rows.
 */
export async function upsertBrands(
  pool: Pool,
  brands: NormalizedBrand[],
  options: { dryRun?: boolean; skipIfExists?: boolean } = {}
): Promise<UpsertBrandsResult> {
  if (brands.length === 0) return { upserted: 0, new: 0 };

  const { dryRun = false, skipIfExists = true } = options;

  if (dryRun) {
    console.log(`[DryRun] Would upsert ${brands.length} brands`);
    return { upserted: brands.length, new: 0 };
  }

  // The conflict action does not vary per brand, so build the statement once.
  const conflictAction = skipIfExists
    ? 'NOTHING'
    : 'UPDATE SET logo_url = COALESCE(EXCLUDED.logo_url, brands.logo_url), updated_at = NOW()';
  const insertSql = `INSERT INTO brands (name, slug, external_id, logo_url, created_at, updated_at)
     VALUES ($1, $2, $3, $4, NOW(), NOW())
     ON CONFLICT (slug) DO ${conflictAction}
     RETURNING (xmax = 0) as is_new`;

  let inserted = 0;
  for (const { name, slug, externalBrandId, logoUrl } of brands) {
    const { rows } = await pool.query(insertSql, [name, slug, externalBrandId, logoUrl]);
    // DO NOTHING returns no row for an existing slug, hence the optional chain.
    if (rows[0]?.is_new) inserted += 1;
  }

  return { upserted: brands.length, new: inserted };
}
|
|
|
|
// ============================================================
|
|
// FULL HYDRATION
|
|
// ============================================================
|
|
|
|
/** Tally produced by `downloadProductImages`. */
export interface ImageDownloadResult {
  /** Images fetched and stored during the call. */
  downloaded: number;
  /** Images not fetched: already stored, storage not ready, or dry run. */
  skipped: number;
  /** Downloads, or their follow-up database updates, that failed. */
  failed: number;
  /** Sum of bytes fetched across downloaded images. */
  bytesTotal: number;
}
|
|
|
|
/** Per-store values used to build local image storage paths. */
export interface DispensaryContext {
  /** Store state code (`dispensaries.state`, or 'unknown' when missing). */
  stateCode: string;
  /** Store slug, falling back to a slug derived from the store name. */
  storeSlug: string;
  /** True if the store already has at least one product with a local image. */
  hasExistingProducts?: boolean;
}
|
|
|
|
/** Combined counters returned by `hydrateToCanonical`. */
export interface HydratePayloadResult {
  /** store_products rows written (new + updated). */
  productsUpserted: number;
  /** store_products rows inserted for the first time. */
  productsNew: number;
  /** Existing store_products rows updated in place. */
  productsUpdated: number;
  /** Previously in-stock products missing from this crawl. */
  productsDiscontinued: number;
  /** store_product_snapshots rows written for this crawl. */
  snapshotsCreated: number;
  /** Brands inserted as new rows. */
  brandsCreated: number;
  /** product_variants rows written (new + updated). */
  variantsUpserted: number;
  /** product_variants rows inserted for the first time. */
  variantsNew: number;
  /** product_variant_snapshots rows written. */
  variantSnapshotsCreated: number;
  /** Images fetched during this hydration. */
  imagesDownloaded: number;
  /** Images not fetched (already stored, storage not ready, or dry run). */
  imagesSkipped: number;
  /** Image downloads or path updates that failed. */
  imagesFailed: number;
  /** Bytes fetched across downloaded images. */
  imagesBytesTotal: number;
}
|
|
|
|
/**
 * Convert arbitrary text into a lowercase, hyphen-separated slug.
 *
 * After lower-casing, each run of characters outside [a-z0-9] collapses to a
 * single '-', and leading/trailing hyphens are removed. The result is then
 * capped at `maxLength` characters. Trailing hyphens are stripped again after
 * truncation, so a cut never leaves a dangling '-'. Returns 'unknown' when
 * nothing usable remains.
 *
 * @param str - Text to slugify.
 * @param maxLength - Maximum slug length (default 50).
 * @returns The slug, or 'unknown' if it would be empty.
 */
function slugify(str: string, maxLength = 50): string {
  const slug = str
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .substring(0, maxLength)
    // Truncation can land right after a separator; drop it.
    .replace(/-+$/, '');
  return slug || 'unknown';
}
|
|
|
|
/**
 * Download product images to local storage and record their paths on
 * `store_products`.
 *
 * Works for any product that needs an image: newly inserted rows or existing
 * rows being backfilled. Products without `primaryImageUrl` are ignored.
 * Downloads run in groups of `concurrency`, and each group is awaited before
 * the next starts. After every successful download the row's
 * `local_image_path` and `images` JSON are updated. This includes downloads
 * the storage layer reports as `skipped` because the file already exists.
 *
 * @param pool - Postgres pool used for the per-product UPDATE.
 * @param newProducts - Products to process. The name is kept for
 *   compatibility; the list is not limited to new rows.
 * @param dispensaryContext - State code and store slug used in the image path.
 * @param options.dryRun - When true, reports every candidate as skipped.
 * @param options.concurrency - Downloads in flight per group (default 5).
 *   Values below 1 or non-finite fall back to 1.
 * @returns Counts of downloaded, skipped, and failed images plus bytes fetched.
 */
export async function downloadProductImages(
  pool: Pool,
  newProducts: NewProductInfo[],
  dispensaryContext: DispensaryContext,
  options: { dryRun?: boolean; concurrency?: number } = {}
): Promise<ImageDownloadResult> {
  const { dryRun = false, concurrency = 5 } = options;
  // A step of 0 would spin the batching loop forever, and NaN would skip it
  // entirely, so normalise to a positive integer.
  const groupSize = Number.isFinite(concurrency) && concurrency >= 1 ? Math.floor(concurrency) : 1;

  const productsWithImages = newProducts.filter(
    (p): p is NewProductInfo & { primaryImageUrl: string } => Boolean(p.primaryImageUrl)
  );

  if (productsWithImages.length === 0) {
    return { downloaded: 0, skipped: 0, failed: 0, bytesTotal: 0 };
  }

  if (!isImageStorageReady()) {
    console.warn('[ImageDownload] Image storage not initialized, skipping downloads');
    return { downloaded: 0, skipped: productsWithImages.length, failed: 0, bytesTotal: 0 };
  }

  if (dryRun) {
    console.log(`[DryRun] Would download ${productsWithImages.length} images`);
    return { downloaded: 0, skipped: productsWithImages.length, failed: 0, bytesTotal: 0 };
  }

  let downloaded = 0;
  let skipped = 0;
  let failed = 0;
  let bytesTotal = 0;

  for (let i = 0; i < productsWithImages.length; i += groupSize) {
    const group = productsWithImages.slice(i, i + groupSize);

    const results = await Promise.allSettled(
      group.map(async (product) => {
        const ctx: ProductImageContext = {
          stateCode: dispensaryContext.stateCode,
          storeSlug: dispensaryContext.storeSlug,
          brandSlug: slugify(product.brandName || 'unknown'),
          productId: product.externalProductId,
        };

        const result = await downloadProductImage(product.primaryImageUrl, ctx, { skipIfExists: true });

        if (result.success) {
          // A success without URLs throws here; the rejection is counted as a failure below.
          const urls = result.urls!;
          const imagesJson = JSON.stringify({
            full: urls.full,
            medium: urls.medium,
            thumb: urls.thumb,
          });

          await pool.query(
            `UPDATE store_products
             SET local_image_path = $1, images = $2
             WHERE id = $3`,
            [urls.full, imagesJson, product.id]
          );
        }

        return result;
      })
    );

    for (const outcome of results) {
      if (outcome.status === 'rejected') {
        failed++;
        console.error(`[ImageDownload] Error:`, outcome.reason);
        continue;
      }
      const downloadResult = outcome.value;
      if (!downloadResult.success) {
        failed++;
        console.warn(`[ImageDownload] Failed: ${downloadResult.error}`);
      } else if (downloadResult.skipped) {
        skipped++;
      } else {
        downloaded++;
        bytesTotal += downloadResult.bytesDownloaded || 0;
      }
    }
  }

  console.log(`[ImageDownload] Downloaded: ${downloaded}, Skipped: ${skipped}, Failed: ${failed}, Bytes: ${bytesTotal}`);
  return { downloaded, skipped, failed, bytesTotal };
}
|
|
|
|
/**
 * Load the per-store values used to build local image paths.
 *
 * Reads the dispensary's state and slug, and whether any of its products
 * already has a `local_image_path`. A missing state becomes 'unknown'. A
 * missing slug falls back to a slug of the store name, or of
 * `store-<id>` when the name is missing too.
 *
 * Best-effort: any error is logged and reported as `null`, the same as an
 * unknown dispensary id.
 *
 * @param pool - Postgres connection pool.
 * @param dispensaryId - Store to look up.
 * @returns The context, or null when the store is unknown or the query fails.
 */
async function getDispensaryContext(pool: Pool, dispensaryId: number): Promise<DispensaryContext | null> {
  try {
    const { rows } = await pool.query(
      `SELECT
         d.state,
         d.slug,
         d.name,
         EXISTS(
           SELECT 1 FROM store_products sp
           WHERE sp.dispensary_id = d.id
             AND sp.local_image_path IS NOT NULL
           LIMIT 1
         ) as has_local_images
       FROM dispensaries d
       WHERE d.id = $1`,
      [dispensaryId]
    );

    const [row] = rows;
    if (!row) {
      return null;
    }

    const nameForSlug = row.name || `store-${dispensaryId}`;
    return {
      stateCode: row.state || 'unknown',
      storeSlug: row.slug || slugify(nameForSlug),
      hasExistingProducts: row.has_local_images,
    };
  } catch (error) {
    console.error('[getDispensaryContext] Error:', error);
    return null;
  }
}
|
|
|
|
/**
 * Write one crawl's normalization result into the canonical tables.
 *
 * Steps, in order:
 *   1. upsert brands
 *   2. upsert store_products
 *   3. insert store_product_snapshots
 *   4. upsert product_variants and insert variant snapshots
 *   5. mark products missing from this crawl as discontinued
 *   6. download images for products with a source URL but no local image
 *
 * NOTE(review): when `normResult.products` is empty, `currentProductIds` is
 * empty and the platform falls back to 'dutchie'. Step 5 then marks every
 * in-stock 'dutchie' product of the dispensary as discontinued. Confirm that
 * an empty result always means a genuinely empty menu.
 *
 * @param pool - Postgres connection pool.
 * @param dispensaryId - Store being hydrated.
 * @param normResult - Normalizer output for one crawl.
 * @param crawlRunId - Crawl run recorded on every snapshot (may be null).
 * @param options.dryRun - Propagated to every step; nothing is written.
 * @param options.downloadImages - Set false to skip step 6 (default true).
 * @returns Aggregated counters from every step.
 */
export async function hydrateToCanonical(
  pool: Pool,
  dispensaryId: number,
  normResult: NormalizationResult,
  crawlRunId: number | null,
  options: { dryRun?: boolean; downloadImages?: boolean } = {}
): Promise<HydratePayloadResult> {
  const { dryRun = false, downloadImages: shouldDownloadImages = true } = options;

  // 1. Upsert brands
  const brandResult = await upsertBrands(pool, normResult.brands, { dryRun });

  // 2. Upsert products
  const productResult = await upsertStoreProducts(
    pool,
    normResult.products,
    normResult.pricing,
    normResult.availability,
    { dryRun }
  );

  // 3. Create product snapshots
  const snapshotResult = await createStoreProductSnapshots(
    pool,
    dispensaryId,
    normResult.products,
    normResult.pricing,
    normResult.availability,
    crawlRunId,
    { dryRun }
  );

  // 4. Upsert variants and create variant snapshots
  const variantResult = await upsertProductVariants(
    pool,
    dispensaryId,
    normResult.products,
    crawlRunId,
    { dryRun }
  );

  // 5. Mark discontinued products
  const currentProductIds = new Set(
    normResult.products.map((p) => p.externalProductId)
  );
  const platform = normResult.products[0]?.platform || 'dutchie';
  const discontinuedCount = await markDiscontinuedProducts(
    pool,
    dispensaryId,
    currentProductIds,
    platform,
    crawlRunId,
    { dryRun }
  );

  // 6. Download images for products that need them: new products, plus
  // existing rows still missing a local image (backfill). Products that
  // already have a local image are never queued, which avoids filesystem
  // checks and HTTP requests for them.
  let imageResult: ImageDownloadResult = { downloaded: 0, skipped: 0, failed: 0, bytesTotal: 0 };
  const needingImages = productResult.productsNeedingImages;

  if (shouldDownloadImages && needingImages.length > 0) {
    const dispensaryContext = await getDispensaryContext(pool, dispensaryId);

    if (dispensaryContext) {
      // Every queued product lacks a local image, so `hasLocalImage` cannot
      // separate new rows from backfills; membership in `newProducts` can.
      const newIds = new Set(productResult.newProducts.map((p) => p.id));
      const newNeedingImages = needingImages.filter((p) => newIds.has(p.id)).length;
      const backfillCount = needingImages.length - newNeedingImages;
      console.log(
        `[Hydration] Downloading images for ${needingImages.length} products (${newNeedingImages} new, ${backfillCount} backfill)...`
      );
      imageResult = await downloadProductImages(
        pool,
        needingImages,
        dispensaryContext,
        { dryRun }
      );
    } else {
      console.warn(`[Hydration] Could not get dispensary context for ID ${dispensaryId}, skipping image downloads`);
    }
  } else if (needingImages.length === 0 && productResult.upserted > 0) {
    // Nothing queued: every product either has a local image or has no source URL.
    console.log(
      `[Hydration] No image downloads needed for ${productResult.upserted} products (local image present or no source URL)`
    );
  }

  return {
    productsUpserted: productResult.upserted,
    productsNew: productResult.new,
    productsUpdated: productResult.updated,
    productsDiscontinued: discontinuedCount,
    snapshotsCreated: snapshotResult.created,
    brandsCreated: brandResult.new,
    variantsUpserted: variantResult.upserted,
    variantsNew: variantResult.new,
    variantSnapshotsCreated: variantResult.snapshotsCreated,
    imagesDownloaded: imageResult.downloaded,
    imagesSkipped: imageResult.skipped,
    imagesFailed: imageResult.failed,
    imagesBytesTotal: imageResult.bytesTotal,
  };
}
|