Files
cannaiq/backend/src/hydration/canonical-upsert.ts
Kelly e11400566e feat: Store full Dutchie payload in latest_raw_payload
Now stores the complete raw product JSON from Dutchie on every
product refresh. This enables querying any Dutchie field
(terpenes, effects, description, etc.) without schema changes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 02:31:00 -07:00

890 lines
29 KiB
TypeScript

/**
* Canonical Upsert Functions
*
* Upserts normalized data into canonical tables:
* - store_products
* - store_product_snapshots
* - brands
* - categories (future)
*/
import { Pool, PoolClient } from 'pg';
import {
NormalizedProduct,
NormalizedPricing,
NormalizedAvailability,
NormalizedBrand,
NormalizationResult,
} from './types';
import {
downloadProductImage,
ProductImageContext,
isImageStorageReady,
LocalImageSizes,
} from '../utils/image-storage';
// Number of products taken per slice by the batched loops in this file
// (product upserts and snapshot bulk inserts).
const BATCH_SIZE = 100;
// ============================================================
// PRODUCT UPSERTS
// ============================================================
/**
 * Minimal description of a product row touched by an upsert.
 * Carries what the image downloader needs: the row id, the provider id,
 * the brand name and the source image URL.
 */
export interface NewProductInfo {
id: number; // store_products.id
externalProductId: string; // provider_product_id
name: string;
brandName: string | null;
primaryImageUrl: string | null; // Source image URL from the normalized product, if any
hasLocalImage?: boolean; // True if local_image_path is already set
}
/**
 * Outcome of upsertStoreProducts.
 * `upserted` is the sum of `new` and `updated`.
 */
export interface UpsertProductsResult {
upserted: number;
new: number;
updated: number;
newProducts: NewProductInfo[]; // Details of newly created products
productsNeedingImages: NewProductInfo[]; // Products (new or updated) that need image downloads
}
/**
 * Write every normalized product into `store_products`, inserting rows that
 * do not exist yet and refreshing the ones that do.
 *
 * Products are handled in slices of BATCH_SIZE. Each slice runs on its own
 * pooled client inside one BEGIN/COMMIT transaction; on error the slice is
 * rolled back and the error is re-thrown (earlier slices stay committed).
 *
 * @param pool - pg pool used to check out one client per slice
 * @param products - normalized products to persist
 * @param pricing - pricing keyed by externalProductId; prices are divided by 100 before storage
 * @param availability - availability keyed by externalProductId
 * @param options - `dryRun: true` only logs slice sizes and touches no data
 * @returns insert/update counts, the newly inserted products, and the products
 *          that have a source image URL but no local image yet
 */
export async function upsertStoreProducts(
  pool: Pool,
  products: NormalizedProduct[],
  pricing: Map<string, NormalizedPricing>,
  availability: Map<string, NormalizedAvailability>,
  options: { dryRun?: boolean } = {}
): Promise<UpsertProductsResult> {
  const newProducts: NewProductInfo[] = [];
  const productsNeedingImages: NewProductInfo[] = [];
  let insertedTotal = 0;
  let refreshedTotal = 0;

  if (products.length === 0) {
    return { upserted: 0, new: 0, updated: 0, newProducts, productsNeedingImages };
  }

  const { dryRun = false } = options;

  // Falsy prices (missing, null, 0) are stored as NULL; everything else is divided by 100.
  const toDollars = (cents: number | null | undefined): number | null =>
    cents ? cents / 100 : null;

  // `xmax = 0` is true only for a freshly inserted row, which is how the
  // statement reports insert vs. update.
  const upsertSql = `INSERT INTO store_products (
dispensary_id, provider, provider_product_id, provider_brand_id,
name_raw, brand_name_raw, category_raw, subcategory_raw,
price_rec, price_med, price_rec_special, price_med_special,
is_on_special, discount_percent,
is_in_stock, stock_status, stock_quantity, total_quantity_available,
thc_percent, cbd_percent,
image_url, latest_raw_payload,
first_seen_at, last_seen_at, updated_at
) VALUES (
$1, $2, $3, $4,
$5, $6, $7, $8,
$9, $10, $11, $12,
$13, $14,
$15, $16, $17, $17,
$18, $19,
$20, $21,
NOW(), NOW(), NOW()
)
ON CONFLICT (dispensary_id, provider, provider_product_id)
DO UPDATE SET
name_raw = EXCLUDED.name_raw,
brand_name_raw = EXCLUDED.brand_name_raw,
category_raw = EXCLUDED.category_raw,
subcategory_raw = EXCLUDED.subcategory_raw,
price_rec = EXCLUDED.price_rec,
price_med = EXCLUDED.price_med,
price_rec_special = EXCLUDED.price_rec_special,
price_med_special = EXCLUDED.price_med_special,
is_on_special = EXCLUDED.is_on_special,
discount_percent = EXCLUDED.discount_percent,
is_in_stock = EXCLUDED.is_in_stock,
stock_status = EXCLUDED.stock_status,
stock_quantity = EXCLUDED.stock_quantity,
total_quantity_available = EXCLUDED.total_quantity_available,
thc_percent = EXCLUDED.thc_percent,
cbd_percent = EXCLUDED.cbd_percent,
image_url = EXCLUDED.image_url,
latest_raw_payload = EXCLUDED.latest_raw_payload,
last_seen_at = NOW(),
updated_at = NOW()
RETURNING id, (xmax = 0) as is_new, (local_image_path IS NOT NULL) as has_local_image`;

  for (let start = 0; start < products.length; start += BATCH_SIZE) {
    const slice = products.slice(start, start + BATCH_SIZE);

    if (dryRun) {
      console.log(`[DryRun] Would upsert ${slice.length} products`);
      continue;
    }

    const client = await pool.connect();
    try {
      await client.query('BEGIN');

      for (const product of slice) {
        const price = pricing.get(product.externalProductId);
        const stock = availability.get(product.externalProductId);

        const params = [
          product.dispensaryId,
          product.platform,
          product.externalProductId,
          product.brandId,
          product.name,
          product.brandName,
          product.category,
          product.subcategory,
          toDollars(price?.priceRec),
          toDollars(price?.priceMed),
          toDollars(price?.priceRecSpecial),
          toDollars(price?.priceMedSpecial),
          price?.isOnSpecial || false,
          price?.discountPercent,
          stock?.inStock ?? true,
          stock?.stockStatus || 'unknown',
          // $17 feeds both stock_quantity and total_quantity_available
          stock?.quantity ?? null,
          // Values above 100 are dropped: some products report mg in the percent field
          product.thcPercent !== null && product.thcPercent <= 100 ? product.thcPercent : null,
          product.cbdPercent !== null && product.cbdPercent <= 100 ? product.cbdPercent : null,
          product.primaryImageUrl,
          // Full raw provider payload
          JSON.stringify(product.rawProduct),
        ];

        const { rows } = await client.query(upsertSql, params);
        const saved = rows[0];

        const info: NewProductInfo = {
          id: saved.id,
          externalProductId: product.externalProductId,
          name: product.name,
          brandName: product.brandName,
          primaryImageUrl: product.primaryImageUrl,
          hasLocalImage: saved.has_local_image,
        };

        if (saved.is_new) {
          insertedTotal++;
          newProducts.push(info);
        } else {
          refreshedTotal++;
        }

        // New and refreshed rows alike need a download when a source URL
        // exists and no local copy has been recorded.
        if (product.primaryImageUrl && !saved.has_local_image) {
          productsNeedingImages.push(info);
        }
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  return {
    upserted: insertedTotal + refreshedTotal,
    new: insertedTotal,
    updated: refreshedTotal,
    newProducts,
    productsNeedingImages,
  };
}
// ============================================================
// SNAPSHOT CREATION
// ============================================================
/** Outcome of createStoreProductSnapshots. */
export interface CreateSnapshotsResult {
created: number; // Number of snapshot rows inserted (or that would be, in dry-run mode)
}
/**
 * Append one `store_product_snapshots` row per product in the crawl.
 *
 * Rows are written with one multi-row INSERT per slice of BATCH_SIZE
 * products, issued directly on the pool (no explicit transaction).
 *
 * @param pool - pg pool used for the inserts
 * @param dispensaryId - dispensary the snapshots belong to
 * @param products - normalized products to snapshot
 * @param pricing - pricing keyed by externalProductId; prices are divided by 100 before storage
 * @param availability - availability keyed by externalProductId
 * @param crawlRunId - crawl run to attribute the snapshots to, or null
 * @param options - `dryRun: true` logs the count and inserts nothing
 * @returns the number of snapshots created (the product count in dry-run mode)
 */
export async function createStoreProductSnapshots(
  pool: Pool,
  dispensaryId: number,
  products: NormalizedProduct[],
  pricing: Map<string, NormalizedPricing>,
  availability: Map<string, NormalizedAvailability>,
  crawlRunId: number | null,
  options: { dryRun?: boolean } = {}
): Promise<CreateSnapshotsResult> {
  if (products.length === 0) {
    return { created: 0 };
  }

  const { dryRun = false } = options;
  if (dryRun) {
    console.log(`[DryRun] Would create ${products.length} snapshots`);
    return { created: products.length };
  }

  // Must equal the number of columns named in the INSERT below.
  const COLUMNS_PER_ROW = 22;

  // Falsy prices (missing, null, 0) are stored as NULL; everything else is divided by 100.
  const toDollars = (cents: number | null | undefined): number | null =>
    cents ? cents / 100 : null;

  let created = 0;

  for (let start = 0; start < products.length; start += BATCH_SIZE) {
    const slice = products.slice(start, start + BATCH_SIZE);

    const rows: any[][] = slice.map((product) => {
      const price = pricing.get(product.externalProductId);
      const stock = availability.get(product.externalProductId);
      return [
        dispensaryId,
        product.platform,
        product.externalProductId,
        crawlRunId,
        new Date(), // captured_at
        product.name,
        product.brandName,
        product.category,
        product.subcategory,
        toDollars(price?.priceRec),
        toDollars(price?.priceMed),
        toDollars(price?.priceRecSpecial),
        toDollars(price?.priceMedSpecial),
        price?.isOnSpecial || false,
        price?.discountPercent,
        stock?.inStock ?? true,
        stock?.quantity,
        stock?.stockStatus || 'unknown',
        // Values above 100 are dropped: some products report mg in the percent field
        product.thcPercent !== null && product.thcPercent <= 100 ? product.thcPercent : null,
        product.cbdPercent !== null && product.cbdPercent <= 100 ? product.cbdPercent : null,
        product.primaryImageUrl,
        JSON.stringify(product.rawProduct),
      ];
    });

    // One "($a, $b, ...)" group per row, numbered consecutively across rows.
    const placeholders = rows
      .map((_, rowIdx) => {
        const slots: string[] = [];
        for (let col = 1; col <= COLUMNS_PER_ROW; col++) {
          slots.push(`$${rowIdx * COLUMNS_PER_ROW + col}`);
        }
        return `(${slots.join(', ')})`;
      })
      .join(', ');

    await pool.query(
      `INSERT INTO store_product_snapshots (
dispensary_id, provider, provider_product_id, crawl_run_id,
captured_at,
name_raw, brand_name_raw, category_raw, subcategory_raw,
price_rec, price_med, price_rec_special, price_med_special,
is_on_special, discount_percent,
is_in_stock, stock_quantity, stock_status,
thc_percent, cbd_percent,
image_url, raw_data
) VALUES ${placeholders}`,
      rows.flat()
    );

    created += slice.length;
  }

  return { created };
}
// ============================================================
// VARIANT UPSERTS
// ============================================================
/**
 * Outcome of upsertProductVariants.
 * `upserted` is the sum of `new` and `updated`.
 */
export interface UpsertVariantsResult {
upserted: number;
new: number;
updated: number;
snapshotsCreated: number; // Rows written to product_variant_snapshots
}
/**
 * Extract variant data from a raw Dutchie product.
 *
 * Reads `rawProduct.POSMetaData.children` and maps each child to a flat
 * variant record. The payload is external data, so anything that is not an
 * array yields no variants, and child entries that are not objects are
 * skipped instead of throwing.
 *
 * @param rawProduct - raw provider payload for one product (may be null/undefined)
 * @returns one record per usable child; an empty array when there are none
 */
function extractVariantsFromRaw(rawProduct: any): any[] {
  const children: unknown = rawProduct?.POSMetaData?.children;
  if (!Array.isArray(children)) {
    return [];
  }

  return children
    .filter((child: any) => child !== null && typeof child === 'object')
    .map((child: any) => {
      // quantityAvailable wins over quantity; a reported 0 is kept (?? not ||)
      const quantity = child.quantityAvailable ?? child.quantity ?? null;
      return {
        option: child.option || child.key || '',
        canonicalSku: child.canonicalSKU || null,
        canonicalId: child.canonicalID || null,
        canonicalName: child.canonicalName || null,
        // Falsy prices (missing or 0) are treated as "no price"
        priceRec: child.recPrice || child.price || null,
        priceMed: child.medPrice || null,
        priceRecSpecial: child.recSpecialPrice || null,
        priceMedSpecial: child.medSpecialPrice || null,
        quantity,
        inStock: (quantity ?? 0) > 0,
      };
    });
}
/**
 * Parse a weight value and unit from a variant option string.
 *
 * Examples:
 *   "1g"    -> { value: 1, unit: "g" }
 *   "3.5g"  -> { value: 3.5, unit: "g" }
 *   "1/8oz" -> { value: 0.125, unit: "oz" }
 *   "1/8"   -> { value: 0.125, unit: "oz" }  (fractions default to oz)
 *   "2"     -> { value: 2, unit: null }
 *
 * Leading/trailing whitespace is ignored and units are lower-cased.
 * Anything that is not a well-formed number (".", "1.2.3g"), a fraction
 * with a zero denominator, or a non-string input yields
 * `{ value: null, unit: null }` so NaN/Infinity never reach the database.
 *
 * @param option - variant option label
 * @returns the numeric value and unit, or nulls when the label is not a weight
 */
function parseWeight(option: string): { value: number | null; unit: string | null } {
  // Raw payloads are untyped, so defend against non-string labels at runtime.
  if (typeof option !== 'string') return { value: null, unit: null };

  const text = option.trim();
  if (!text) return { value: null, unit: null };

  // Fractions like "1/8oz"
  const fraction = /^(\d+)\/(\d+)\s*(g|oz|mg|ml)?$/i.exec(text);
  if (fraction) {
    const numerator = parseInt(fraction[1], 10);
    const denominator = parseInt(fraction[2], 10);
    if (denominator === 0) return { value: null, unit: null };
    return { value: numerator / denominator, unit: fraction[3]?.toLowerCase() || 'oz' };
  }

  // Plain numbers like "3.5g", ".5g" or "100mg"; at most one decimal point
  const decimal = /^(\d+\.?\d*|\.\d+)\s*(g|oz|mg|ml|each)?$/i.exec(text);
  if (decimal) {
    return {
      value: parseFloat(decimal[1]),
      unit: decimal[2]?.toLowerCase() || null,
    };
  }

  return { value: null, unit: null };
}
/**
 * Persist the variants found in each product's raw payload and record a
 * snapshot for every variant written.
 *
 * For each product the matching `store_products` row is looked up first;
 * products without a row, or without variants in their raw payload, are
 * skipped. Every variant is upserted into `product_variants` (keyed on
 * store_product_id + option) and then appended to
 * `product_variant_snapshots`. Statements run directly on the pool, one at
 * a time, without an explicit transaction.
 *
 * @param pool - pg pool used for all statements
 * @param dispensaryId - dispensary owning the products
 * @param products - normalized products whose `rawProduct` carries the variants
 * @param crawlRunId - crawl run recorded on the snapshots, or null
 * @param options - `dryRun: true` still performs the product lookup but only
 *                  logs what would be written
 * @returns variant insert/update counts and the number of snapshots created
 */
export async function upsertProductVariants(
  pool: Pool,
  dispensaryId: number,
  products: NormalizedProduct[],
  crawlRunId: number | null,
  options: { dryRun?: boolean } = {}
): Promise<UpsertVariantsResult> {
  if (products.length === 0) {
    return { upserted: 0, new: 0, updated: 0, snapshotsCreated: 0 };
  }

  const { dryRun = false } = options;

  const findProductSql = `SELECT id FROM store_products
WHERE dispensary_id = $1 AND provider = $2 AND provider_product_id = $3`;

  // `xmax = 0` is true only for a freshly inserted row.
  const upsertVariantSql = `INSERT INTO product_variants (
store_product_id, dispensary_id,
option, canonical_sku, canonical_id, canonical_name,
price_rec, price_med, price_rec_special, price_med_special,
quantity, quantity_available, in_stock, is_on_special,
weight_value, weight_unit,
first_seen_at, last_seen_at, updated_at
) VALUES (
$1, $2,
$3, $4, $5, $6,
$7, $8, $9, $10,
$11, $11, $12, $13,
$14, $15,
NOW(), NOW(), NOW()
)
ON CONFLICT (store_product_id, option)
DO UPDATE SET
canonical_sku = COALESCE(EXCLUDED.canonical_sku, product_variants.canonical_sku),
canonical_id = COALESCE(EXCLUDED.canonical_id, product_variants.canonical_id),
canonical_name = COALESCE(EXCLUDED.canonical_name, product_variants.canonical_name),
price_rec = EXCLUDED.price_rec,
price_med = EXCLUDED.price_med,
price_rec_special = EXCLUDED.price_rec_special,
price_med_special = EXCLUDED.price_med_special,
quantity = EXCLUDED.quantity,
quantity_available = EXCLUDED.quantity_available,
in_stock = EXCLUDED.in_stock,
is_on_special = EXCLUDED.is_on_special,
weight_value = COALESCE(EXCLUDED.weight_value, product_variants.weight_value),
weight_unit = COALESCE(EXCLUDED.weight_unit, product_variants.weight_unit),
last_seen_at = NOW(),
last_price_change_at = CASE
WHEN product_variants.price_rec IS DISTINCT FROM EXCLUDED.price_rec
OR product_variants.price_rec_special IS DISTINCT FROM EXCLUDED.price_rec_special
THEN NOW()
ELSE product_variants.last_price_change_at
END,
last_stock_change_at = CASE
WHEN product_variants.quantity IS DISTINCT FROM EXCLUDED.quantity
THEN NOW()
ELSE product_variants.last_stock_change_at
END,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_new`;

  const insertSnapshotSql = `INSERT INTO product_variant_snapshots (
product_variant_id, store_product_id, dispensary_id, crawl_run_id,
option,
price_rec, price_med, price_rec_special, price_med_special,
quantity, in_stock, is_on_special,
captured_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, NOW())`;

  let insertedTotal = 0;
  let refreshedTotal = 0;
  let snapshotsCreated = 0;

  for (const product of products) {
    // Resolve the store_products row this product's variants hang off.
    const lookup = await pool.query(findProductSql, [
      dispensaryId,
      product.platform,
      product.externalProductId,
    ]);
    if (lookup.rows.length === 0) {
      continue; // no parent row, nothing to attach variants to
    }
    const storeProductId = lookup.rows[0].id;

    const variants = extractVariantsFromRaw(product.rawProduct);
    if (variants.length === 0) {
      continue;
    }

    if (dryRun) {
      console.log(`[DryRun] Would upsert ${variants.length} variants for product ${product.externalProductId}`);
      continue;
    }

    for (const variant of variants) {
      const weight = parseWeight(variant.option);

      // On special when either special price undercuts its regular price.
      const recDiscounted =
        variant.priceRecSpecial !== null && variant.priceRecSpecial < variant.priceRec;
      const medDiscounted =
        variant.priceMedSpecial !== null && variant.priceMedSpecial < variant.priceMed;
      const isOnSpecial = recDiscounted || medDiscounted;

      const upserted = await pool.query(upsertVariantSql, [
        storeProductId,
        dispensaryId,
        variant.option,
        variant.canonicalSku,
        variant.canonicalId,
        variant.canonicalName,
        variant.priceRec,
        variant.priceMed,
        variant.priceRecSpecial,
        variant.priceMedSpecial,
        variant.quantity, // $11 feeds both quantity and quantity_available
        variant.inStock,
        isOnSpecial,
        weight.value,
        weight.unit,
      ]);

      const saved = upserted.rows[0];
      const variantId = saved.id;
      if (saved.is_new) {
        insertedTotal++;
      } else {
        refreshedTotal++;
      }

      await pool.query(insertSnapshotSql, [
        variantId,
        storeProductId,
        dispensaryId,
        crawlRunId,
        variant.option,
        variant.priceRec,
        variant.priceMed,
        variant.priceRecSpecial,
        variant.priceMedSpecial,
        variant.quantity,
        variant.inStock,
        isOnSpecial,
      ]);
      snapshotsCreated++;
    }
  }

  return {
    upserted: insertedTotal + refreshedTotal,
    new: insertedTotal,
    updated: refreshedTotal,
    snapshotsCreated,
  };
}
// ============================================================
// DISCONTINUED PRODUCTS
// ============================================================
/**
 * Mark products as discontinued if they weren't in the current crawl.
 *
 * Loads every in-stock `store_products` row for the dispensary/platform,
 * treats the ones whose provider_product_id is absent from
 * `currentProductIds` as discontinued, flips them to out of stock with
 * stock_status 'discontinued', and records one snapshot row per product.
 *
 * The snapshots are written with a single set-based INSERT ... SELECT
 * instead of one round trip per product.
 *
 * NOTE(review): an empty `currentProductIds` marks every in-stock product of
 * the dispensary/platform as discontinued. Confirm callers never pass the
 * result of a failed or empty crawl unintentionally.
 *
 * @param pool - pg pool used for all statements
 * @param dispensaryId - dispensary being reconciled
 * @param currentProductIds - provider product ids seen in the current crawl
 * @param platform - provider name matched against store_products.provider
 * @param crawlRunId - crawl run recorded on the snapshots, or null
 * @param options - `dryRun: true` performs the lookup but writes nothing
 * @returns number of products marked (or that would be marked) discontinued
 */
export async function markDiscontinuedProducts(
  pool: Pool,
  dispensaryId: number,
  currentProductIds: Set<string>,
  platform: string,
  crawlRunId: number | null,
  options: { dryRun?: boolean } = {}
): Promise<number> {
  const { dryRun = false } = options;

  // Only rows still flagged in stock can newly become discontinued.
  const result = await pool.query(
    `SELECT provider_product_id FROM store_products
WHERE dispensary_id = $1 AND provider = $2 AND is_in_stock = TRUE`,
    [dispensaryId, platform]
  );

  const discontinuedIds: string[] = result.rows
    .map((r: any) => r.provider_product_id)
    .filter((id: string) => !currentProductIds.has(id));

  if (discontinuedIds.length === 0) {
    return 0;
  }

  if (dryRun) {
    console.log(`[DryRun] Would mark ${discontinuedIds.length} products as discontinued`);
    return discontinuedIds.length;
  }

  // Update store_products to mark as out of stock
  await pool.query(
    `UPDATE store_products
SET is_in_stock = FALSE,
stock_status = 'discontinued',
updated_at = NOW()
WHERE dispensary_id = $1
AND provider = $2
AND provider_product_id = ANY($3)`,
    [dispensaryId, platform, discontinuedIds]
  );

  // Snapshot all discontinued products in one statement, using the same
  // array parameter form as the UPDATE above.
  await pool.query(
    `INSERT INTO store_product_snapshots (
dispensary_id, provider, provider_product_id, crawl_run_id,
captured_at, is_in_stock, stock_status
)
SELECT
dispensary_id, provider, provider_product_id, $4,
NOW(), FALSE, 'discontinued'
FROM store_products
WHERE dispensary_id = $1 AND provider = $2 AND provider_product_id = ANY($3)`,
    [dispensaryId, platform, discontinuedIds, crawlRunId]
  );

  return discontinuedIds.length;
}
// ============================================================
// BRAND UPSERTS
// ============================================================
/** Outcome of upsertBrands. */
export interface UpsertBrandsResult {
upserted: number; // Number of brands submitted (reported as brands.length by upsertBrands)
new: number; // Number of brands whose statement reported a fresh insert
}
/**
 * Insert brands into the `brands` table, one statement per brand, keyed on slug.
 *
 * With `skipIfExists` (the default) an existing slug is left untouched;
 * otherwise the logo URL is refreshed when the incoming brand has one.
 *
 * @param pool - pg pool used for the inserts
 * @param brands - normalized brands to persist
 * @param options - `dryRun: true` logs the count and writes nothing;
 *                  `skipIfExists: false` updates existing rows instead of skipping them
 * @returns `upserted` is always the number of brands passed in; `new` counts
 *          the rows reported as freshly inserted (always 0 in dry-run mode)
 */
export async function upsertBrands(
  pool: Pool,
  brands: NormalizedBrand[],
  options: { dryRun?: boolean; skipIfExists?: boolean } = {}
): Promise<UpsertBrandsResult> {
  if (brands.length === 0) {
    return { upserted: 0, new: 0 };
  }

  const { dryRun = false, skipIfExists = true } = options;

  if (dryRun) {
    console.log(`[DryRun] Would upsert ${brands.length} brands`);
    return { upserted: brands.length, new: 0 };
  }

  // The conflict action is chosen from two fixed strings, never from input data.
  const conflictAction = skipIfExists
    ? 'NOTHING'
    : 'UPDATE SET logo_url = COALESCE(EXCLUDED.logo_url, brands.logo_url), updated_at = NOW()';
  const insertSql = `INSERT INTO brands (name, slug, external_id, logo_url, created_at, updated_at)
VALUES ($1, $2, $3, $4, NOW(), NOW())
ON CONFLICT (slug) DO ${conflictAction}
RETURNING (xmax = 0) as is_new`;

  let insertedTotal = 0;
  for (const { name, slug, externalBrandId, logoUrl } of brands) {
    const { rows } = await pool.query(insertSql, [name, slug, externalBrandId, logoUrl]);
    // DO NOTHING returns no row on conflict, hence the optional access.
    if (rows[0]?.is_new) {
      insertedTotal++;
    }
  }

  return { upserted: brands.length, new: insertedTotal };
}
// ============================================================
// FULL HYDRATION
// ============================================================
/** Tally returned by downloadProductImages. */
export interface ImageDownloadResult {
downloaded: number; // Images fetched successfully
skipped: number; // Downloads reported as skipped, or not attempted (storage not ready / dry run)
failed: number; // Downloads that reported failure or threw
bytesTotal: number; // Sum of bytesDownloaded over the downloaded images
}
/**
 * Dispensary details passed to the image downloader, which copies
 * stateCode and storeSlug into each product's image context.
 */
export interface DispensaryContext {
stateCode: string;
storeSlug: string;
hasExistingProducts?: boolean; // True if store already has products with local images
}
/**
 * Summary returned by hydrateToCanonical: one counter per hydration step
 * (products, discontinued marking, snapshots, brands, variants, images).
 */
export interface HydratePayloadResult {
// Product upsert counts
productsUpserted: number;
productsNew: number;
productsUpdated: number;
productsDiscontinued: number;
snapshotsCreated: number;
brandsCreated: number;
// Variant upsert counts
variantsUpserted: number;
variantsNew: number;
variantSnapshotsCreated: number;
// Image download tallies
imagesDownloaded: number;
imagesSkipped: number;
imagesFailed: number;
imagesBytesTotal: number;
}
/**
 * Turn an arbitrary string into a slug.
 *
 * Lower-cases the input, collapses every run of characters outside a-z/0-9
 * into a single hyphen, strips leading and trailing hyphens, and keeps at
 * most the first 50 characters. Falls back to 'unknown' when nothing is left.
 */
function slugify(str: string): string {
  const lowered = str.toLowerCase();
  const hyphenated = lowered.replace(/[^a-z0-9]+/g, '-');
  const trimmed = hyphenated.replace(/^-+|-+$/g, '');
  // Truncation happens after trimming, so a cut can still end on a hyphen.
  const shortened = trimmed.substring(0, 50);
  return shortened || 'unknown';
}
/**
 * Download images for the given products and record their local paths.
 *
 * Products without a `primaryImageUrl` are ignored. The rest are processed
 * in groups of `concurrency`; each group is downloaded in parallel and the
 * next group starts once every download in the current one has settled.
 * After a successful download the product's `store_products` row gets
 * `local_image_path` and the `images` JSON (full/medium/thumb URLs).
 *
 * A single failing download never aborts the run: it is logged and counted
 * in `failed`.
 *
 * @param pool - pg pool used to update store_products
 * @param newProducts - products to fetch images for
 * @param dispensaryContext - state code and store slug used to build the image context
 * @param options - `dryRun: true` downloads nothing and reports everything as skipped;
 *                  `concurrency` is the group size (default 5). Values below 1
 *                  or NaN fall back to 1, since a zero or negative step would
 *                  never advance the batching loop.
 * @returns download/skip/failure counts and total bytes downloaded
 */
export async function downloadProductImages(
  pool: Pool,
  newProducts: NewProductInfo[],
  dispensaryContext: DispensaryContext,
  options: { dryRun?: boolean; concurrency?: number } = {}
): Promise<ImageDownloadResult> {
  const { dryRun = false, concurrency = 5 } = options;

  // Guard the loop step: 0, negatives and NaN would otherwise spin forever
  // or silently process nothing.
  const groupSize = concurrency >= 1 ? Math.floor(concurrency) : 1;

  // Filter products that have images to download
  const productsWithImages = newProducts.filter((p) => p.primaryImageUrl);
  if (productsWithImages.length === 0) {
    return { downloaded: 0, skipped: 0, failed: 0, bytesTotal: 0 };
  }

  // Check if image storage is ready
  if (!isImageStorageReady()) {
    console.warn('[ImageDownload] Image storage not initialized, skipping downloads');
    return { downloaded: 0, skipped: productsWithImages.length, failed: 0, bytesTotal: 0 };
  }

  if (dryRun) {
    console.log(`[DryRun] Would download ${productsWithImages.length} images`);
    return { downloaded: 0, skipped: productsWithImages.length, failed: 0, bytesTotal: 0 };
  }

  let downloaded = 0;
  let skipped = 0;
  let failed = 0;
  let bytesTotal = 0;

  // Process in groups with a concurrency limit
  for (let i = 0; i < productsWithImages.length; i += groupSize) {
    const batch = productsWithImages.slice(i, i + groupSize);

    const results = await Promise.allSettled(
      batch.map(async (product) => {
        const ctx: ProductImageContext = {
          stateCode: dispensaryContext.stateCode,
          storeSlug: dispensaryContext.storeSlug,
          brandSlug: slugify(product.brandName || 'unknown'),
          productId: product.externalProductId,
        };

        // primaryImageUrl is non-null here: products without one were filtered out above
        const result = await downloadProductImage(product.primaryImageUrl!, ctx, { skipIfExists: true });

        if (result.success) {
          // Update the database with local image path
          const urls = result.urls!;
          const imagesJson = JSON.stringify({
            full: urls.full,
            medium: urls.medium,
            thumb: urls.thumb,
          });
          await pool.query(
            `UPDATE store_products
SET local_image_path = $1, images = $2
WHERE id = $3`,
            [urls.full, imagesJson, product.id]
          );
        }

        return result;
      })
    );

    for (const result of results) {
      if (result.status !== 'fulfilled') {
        // The download or the follow-up UPDATE threw
        failed++;
        console.error(`[ImageDownload] Error:`, result.reason);
        continue;
      }

      const downloadResult = result.value;
      if (!downloadResult.success) {
        failed++;
        console.warn(`[ImageDownload] Failed: ${downloadResult.error}`);
      } else if (downloadResult.skipped) {
        skipped++;
      } else {
        downloaded++;
        bytesTotal += downloadResult.bytesDownloaded || 0;
      }
    }
  }

  console.log(`[ImageDownload] Downloaded: ${downloaded}, Skipped: ${skipped}, Failed: ${failed}, Bytes: ${bytesTotal}`);
  return { downloaded, skipped, failed, bytesTotal };
}
/**
 * Look up the values needed to build image paths for a dispensary.
 *
 * Reads state, slug and name from `dispensaries`, plus a flag telling
 * whether any of its store_products already has a local image. A missing
 * state becomes 'unknown'; a missing slug is derived from the name (or from
 * `store-<id>` when the name is missing too).
 *
 * Best effort: any query error is logged and reported as null.
 *
 * @param pool - pg pool used for the lookup
 * @param dispensaryId - id of the dispensary
 * @returns the context, or null when the dispensary does not exist or the query failed
 */
async function getDispensaryContext(pool: Pool, dispensaryId: number): Promise<DispensaryContext | null> {
  const contextSql = `SELECT
d.state,
d.slug,
d.name,
EXISTS(
SELECT 1 FROM store_products sp
WHERE sp.dispensary_id = d.id
AND sp.local_image_path IS NOT NULL
LIMIT 1
) as has_local_images
FROM dispensaries d
WHERE d.id = $1`;

  try {
    const { rows } = await pool.query(contextSql, [dispensaryId]);
    if (rows.length === 0) {
      return null;
    }

    const dispensary = rows[0];
    const stateCode = dispensary.state || 'unknown';
    const storeSlug = dispensary.slug || slugify(dispensary.name || `store-${dispensaryId}`);

    return {
      stateCode,
      storeSlug,
      hasExistingProducts: dispensary.has_local_images,
    };
  } catch (error) {
    console.error('[getDispensaryContext] Error:', error);
    return null;
  }
}
/**
 * Hydrate a complete normalization result into the canonical tables.
 *
 * Runs, in order:
 *   1. brand upserts
 *   2. product upserts
 *   3. product snapshots
 *   4. variant upserts and variant snapshots
 *   5. discontinued-product marking
 *   6. image downloads for products that have a source image but no local copy
 *
 * NOTE(review): when `normResult.products` is empty the platform falls back
 * to 'dutchie' and step 5 receives an empty id set, so every in-stock
 * product of that dispensary/platform is marked discontinued. Confirm that
 * this is the intended handling of an empty crawl.
 *
 * @param pool - pg pool shared by every step
 * @param dispensaryId - dispensary being hydrated
 * @param normResult - normalized brands, products, pricing and availability
 * @param crawlRunId - crawl run recorded on snapshots, or null
 * @param options - `dryRun` is forwarded to every step; `downloadImages: false`
 *                  skips step 6 (default true)
 * @returns counters collected from each step
 */
export async function hydrateToCanonical(
  pool: Pool,
  dispensaryId: number,
  normResult: NormalizationResult,
  crawlRunId: number | null,
  options: { dryRun?: boolean; downloadImages?: boolean } = {}
): Promise<HydratePayloadResult> {
  const { dryRun = false, downloadImages: shouldDownloadImages = true } = options;

  // 1. Upsert brands
  const brandResult = await upsertBrands(pool, normResult.brands, { dryRun });

  // 2. Upsert products
  const productResult = await upsertStoreProducts(
    pool,
    normResult.products,
    normResult.pricing,
    normResult.availability,
    { dryRun }
  );

  // 3. Create product snapshots
  const snapshotResult = await createStoreProductSnapshots(
    pool,
    dispensaryId,
    normResult.products,
    normResult.pricing,
    normResult.availability,
    crawlRunId,
    { dryRun }
  );

  // 4. Upsert variants and create variant snapshots
  const variantResult = await upsertProductVariants(
    pool,
    dispensaryId,
    normResult.products,
    crawlRunId,
    { dryRun }
  );

  // 5. Mark discontinued products
  const currentProductIds = new Set(
    normResult.products.map((p) => p.externalProductId)
  );
  const platform = normResult.products[0]?.platform || 'dutchie';
  const discontinuedCount = await markDiscontinuedProducts(
    pool,
    dispensaryId,
    currentProductIds,
    platform,
    crawlRunId,
    { dryRun }
  );

  // 6. Download images for products that need them:
  //    - new products with a source image URL
  //    - updated products that don't have a local image yet (backfill)
  //    Products that already have a local image are never re-checked.
  let imageResult: ImageDownloadResult = { downloaded: 0, skipped: 0, failed: 0, bytesTotal: 0 };
  const needingImages = productResult.productsNeedingImages;

  if (shouldDownloadImages && needingImages.length > 0) {
    const dispensaryContext = await getDispensaryContext(pool, dispensaryId);

    if (dispensaryContext) {
      // Split the work list into brand-new rows vs. backfill of existing rows.
      // hasLocalImage cannot be used for this: it is false for every entry
      // in productsNeedingImages.
      const newProductIds = new Set(productResult.newProducts.map((p) => p.id));
      const newCount = needingImages.filter((p) => newProductIds.has(p.id)).length;
      const backfillCount = needingImages.length - newCount;
      console.log(`[Hydration] Downloading images for ${needingImages.length} products (${newCount} new, ${backfillCount} backfill)...`);

      imageResult = await downloadProductImages(
        pool,
        needingImages,
        dispensaryContext,
        { dryRun }
      );
    } else {
      console.warn(`[Hydration] Could not get dispensary context for ID ${dispensaryId}, skipping image downloads`);
    }
  } else if (needingImages.length === 0 && productResult.upserted > 0) {
    // Nothing is queued for download
    console.log(`[Hydration] All ${productResult.upserted} products already have local images, skipping downloads`);
  }

  return {
    productsUpserted: productResult.upserted,
    productsNew: productResult.new,
    productsUpdated: productResult.updated,
    productsDiscontinued: discontinuedCount,
    snapshotsCreated: snapshotResult.created,
    brandsCreated: brandResult.new,
    variantsUpserted: variantResult.upserted,
    variantsNew: variantResult.new,
    variantSnapshotsCreated: variantResult.snapshotsCreated,
    imagesDownloaded: imageResult.downloaded,
    imagesSkipped: imageResult.skipped,
    imagesFailed: imageResult.failed,
    imagesBytesTotal: imageResult.bytesTotal,
  };
}