/**
 * Canonical Database Pipeline
 *
 * Writes scraped products to the canonical tables:
 * - store_products (current state)
 * - store_product_snapshots (historical)
 * - product_variants (per-weight pricing)
 * - product_variant_snapshots (variant history)
 *
 * This replaces the legacy DatabasePipeline that wrote to `products` table.
 */

import { ItemPipeline, Product } from './types';
import { logger } from '../services/logger';
import { pool } from '../db/pool';
import { v4 as uuidv4 } from 'uuid';

/**
 * Per-option (e.g. per-weight) pricing and stock data that a spider attaches
 * to an item under the `variants` property.
 */
interface VariantData {
  option: string;
  priceRec: number | null;
  priceMed: number | null;
  priceRecSpecial: number | null;
  priceMedSpecial: number | null;
  quantity: number | null;
  inStock: boolean;
  isOnSpecial: boolean;
}

/**
 * Parse a weight string like "1g", "3.5g", "1/8oz" into value and unit.
 *
 * @param option - Raw variant option label.
 * @returns The numeric value (fractions are evaluated) and the lower-cased
 *   unit. Either part is `null` when it is absent or cannot be parsed; both
 *   are `null` when the label does not match the expected pattern at all.
 */
function parseWeight(option: string): { value: number | null; unit: string | null } {
  if (!option) return { value: null, unit: null };

  // Match patterns like "1g", "3.5g", "1/8oz", "100mg"
  const match = option.match(/^([\d.\/]+)\s*(g|oz|mg|ml|each|pk|ct)?$/i);
  if (!match) return { value: null, unit: null };

  const rawValue = match[1];
  const unit = match[2]?.toLowerCase() || null;

  let value: number;
  if (rawValue.includes('/')) {
    // Handle fractions like "1/8"
    const [num, denom] = rawValue.split('/');
    value = parseFloat(num) / parseFloat(denom);
  } else {
    value = parseFloat(rawValue);
  }

  // Number.isFinite rejects NaN (e.g. ".") as well as +/-Infinity produced by
  // a zero denominator (e.g. "1/0"), neither of which is a usable weight.
  return { value: Number.isFinite(value) ? value : null, unit };
}

/**
 * Canonical Database Pipeline - saves items to canonical tables
 *
 * TABLES:
 * - store_products: Current product state per store
 * - store_product_snapshots: Historical snapshot per crawl
 * - product_variants: Current variant state (per-weight pricing)
 * - product_variant_snapshots: Historical variant snapshots
 */
export class CanonicalDatabasePipeline implements ItemPipeline {
  name = 'CanonicalDatabasePipeline';
  priority = 10; // Low priority - runs last

  private crawlRunId: number | null = null;

  /**
   * Set the crawl run id that is written to every snapshot row produced by
   * subsequent `process` calls.
   */
  setCrawlRunId(id: number): void {
    this.crawlRunId = id;
  }

  /**
   * Persist one scraped product, its snapshot, and its variants.
   *
   * All writes for a single item run inside one transaction, so a failure
   * part-way through leaves no partially written product behind.
   *
   * @param item - Scraped product. The spider is expected to have attached
   *   `dispensaryId`, and optionally `categoryName`, `variants`, `inStock`,
   *   `stockQuantity` and `priceMed`.
   * @param spider - Name of the originating spider (unused here).
   * @returns The same item, annotated with `isNewProduct` and
   *   `storeProductId`, or `null` when `dispensaryId` is missing or any
   *   database write fails (the error is logged, not thrown).
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Spider-provided metadata is not part of the declared Product type.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const raw = item as any;

    const dispensaryId = raw.dispensaryId;
    const categoryName = raw.categoryName;
    const variants: VariantData[] = raw.variants || [];

    // Validate before acquiring a connection so bad items cost nothing.
    if (!dispensaryId) {
      logger.error('pipeline', `Missing dispensaryId for ${item.name}`);
      return null;
    }

    // NOTE(review): when dutchieProductId is missing this is NULL; unique
    // constraints treat NULLs as distinct by default, so the ON CONFLICT
    // below may never match such rows - confirm against the table definition.
    const externalProductId = item.dutchieProductId || null;
    const provider = 'dutchie';

    // Determine stock status
    const isInStock = raw.inStock !== false;
    // `??` keeps a real quantity of 0 (sold out) instead of turning it into
    // NULL, matching how variant quantities are passed through untouched.
    const stockQuantity = raw.stockQuantity ?? null;

    // Extract pricing
    const priceRec = item.price || null;
    const priceMed = raw.priceMed || null;

    const client = await pool.connect();

    try {
      await client.query('BEGIN');

      // ============================================================
      // UPSERT store_products
      // ============================================================
      const upsertResult = await client.query(
        `
        INSERT INTO store_products (
          dispensary_id, provider, provider_product_id,
          name_raw, brand_name_raw, category_raw,
          price_rec, price_med,
          thc_percent, cbd_percent,
          is_in_stock, stock_quantity,
          image_url, source_url,
          raw_data,
          first_seen_at, last_seen_at,
          created_at, updated_at
        ) VALUES (
          $1, $2, $3,
          $4, $5, $6,
          $7, $8,
          $9, $10,
          $11, $12,
          $13, $14,
          $15,
          NOW(), NOW(),
          NOW(), NOW()
        )
        ON CONFLICT (dispensary_id, provider, provider_product_id)
        DO UPDATE SET
          name_raw = EXCLUDED.name_raw,
          brand_name_raw = EXCLUDED.brand_name_raw,
          category_raw = EXCLUDED.category_raw,
          price_rec = EXCLUDED.price_rec,
          price_med = EXCLUDED.price_med,
          thc_percent = EXCLUDED.thc_percent,
          cbd_percent = EXCLUDED.cbd_percent,
          is_in_stock = EXCLUDED.is_in_stock,
          stock_quantity = EXCLUDED.stock_quantity,
          image_url = COALESCE(EXCLUDED.image_url, store_products.image_url),
          source_url = EXCLUDED.source_url,
          raw_data = EXCLUDED.raw_data,
          last_seen_at = NOW(),
          updated_at = NOW()
        RETURNING id, (xmax = 0) as is_new
        `,
        [
          dispensaryId,
          provider,
          externalProductId,
          item.name,
          item.brand || null,
          categoryName || null,
          priceRec,
          priceMed,
          item.thcPercentage || null,
          item.cbdPercentage || null,
          isInStock,
          stockQuantity,
          item.imageUrl || null,
          item.dutchieUrl || null,
          JSON.stringify(item.metadata || {}),
        ]
      );

      const storeProductId: number = upsertResult.rows[0].id;
      // `xmax = 0` is reported by the RETURNING clause above as `is_new`.
      const isNewProduct: boolean = upsertResult.rows[0].is_new;

      // ============================================================
      // INSERT store_product_snapshots
      // ============================================================
      await client.query(
        `
        INSERT INTO store_product_snapshots (
          store_product_id, dispensary_id, crawl_run_id,
          price_rec, price_med,
          is_in_stock, stock_quantity,
          is_present_in_feed,
          captured_at, created_at
        ) VALUES (
          $1, $2, $3,
          $4, $5,
          $6, $7,
          TRUE,
          NOW(), NOW()
        )
        ON CONFLICT (store_product_id, crawl_run_id) WHERE crawl_run_id IS NOT NULL
        DO UPDATE SET
          price_rec = EXCLUDED.price_rec,
          price_med = EXCLUDED.price_med,
          is_in_stock = EXCLUDED.is_in_stock,
          stock_quantity = EXCLUDED.stock_quantity
        `,
        [
          storeProductId,
          dispensaryId,
          this.crawlRunId,
          priceRec,
          priceMed,
          isInStock,
          stockQuantity,
        ]
      );

      // ============================================================
      // UPSERT product_variants (if variants exist)
      // ============================================================
      for (const variant of variants) {
        const { value: weightValue, unit: weightUnit } = parseWeight(variant.option);

        // $8 is intentionally used twice: quantity and quantity_available
        // receive the same value.
        const variantResult = await client.query(
          `
          INSERT INTO product_variants (
            store_product_id, dispensary_id,
            option,
            price_rec, price_med,
            price_rec_special, price_med_special,
            quantity, quantity_available,
            in_stock, is_on_special,
            weight_value, weight_unit,
            first_seen_at, last_seen_at,
            created_at, updated_at
          ) VALUES (
            $1, $2,
            $3,
            $4, $5,
            $6, $7,
            $8, $8,
            $9, $10,
            $11, $12,
            NOW(), NOW(),
            NOW(), NOW()
          )
          ON CONFLICT (store_product_id, option)
          DO UPDATE SET
            price_rec = EXCLUDED.price_rec,
            price_med = EXCLUDED.price_med,
            price_rec_special = EXCLUDED.price_rec_special,
            price_med_special = EXCLUDED.price_med_special,
            quantity = EXCLUDED.quantity,
            quantity_available = EXCLUDED.quantity_available,
            in_stock = EXCLUDED.in_stock,
            is_on_special = EXCLUDED.is_on_special,
            weight_value = EXCLUDED.weight_value,
            weight_unit = EXCLUDED.weight_unit,
            last_seen_at = NOW(),
            last_price_change_at = CASE
              WHEN product_variants.price_rec IS DISTINCT FROM EXCLUDED.price_rec
                OR product_variants.price_rec_special IS DISTINCT FROM EXCLUDED.price_rec_special
              THEN NOW()
              ELSE product_variants.last_price_change_at
            END,
            last_stock_change_at = CASE
              WHEN product_variants.in_stock IS DISTINCT FROM EXCLUDED.in_stock
              THEN NOW()
              ELSE product_variants.last_stock_change_at
            END,
            updated_at = NOW()
          RETURNING id
          `,
          [
            storeProductId,
            dispensaryId,
            variant.option,
            variant.priceRec,
            variant.priceMed,
            variant.priceRecSpecial,
            variant.priceMedSpecial,
            variant.quantity,
            variant.inStock,
            variant.isOnSpecial,
            weightValue,
            weightUnit,
          ]
        );

        const variantId = variantResult.rows[0].id;

        // Insert variant snapshot
        await client.query(
          `
          INSERT INTO product_variant_snapshots (
            product_variant_id, store_product_id, dispensary_id, crawl_run_id,
            option,
            price_rec, price_med,
            price_rec_special, price_med_special,
            quantity,
            in_stock, is_on_special,
            is_present_in_feed,
            captured_at, created_at
          ) VALUES (
            $1, $2, $3, $4,
            $5,
            $6, $7,
            $8, $9,
            $10,
            $11, $12,
            TRUE,
            NOW(), NOW()
          )
          `,
          [
            variantId,
            storeProductId,
            dispensaryId,
            this.crawlRunId,
            variant.option,
            variant.priceRec,
            variant.priceMed,
            variant.priceRecSpecial,
            variant.priceMedSpecial,
            variant.quantity,
            variant.inStock,
            variant.isOnSpecial,
          ]
        );
      }

      await client.query('COMMIT');

      // Log only after the commit so the messages describe persisted state.
      logger.debug(
        'pipeline',
        `${isNewProduct ? 'Inserted' : 'Updated'} canonical product: ${item.name} (ID: ${storeProductId})`
      );
      if (variants.length > 0) {
        logger.debug('pipeline', `Upserted ${variants.length} variants for ${item.name}`);
      }

      // Attach metadata for stats tracking
      raw.isNewProduct = isNewProduct;
      raw.storeProductId = storeProductId;

      return item;
    } catch (error) {
      // Undo any partial writes; a failing ROLLBACK must not mask the
      // original error, so it is logged separately.
      try {
        await client.query('ROLLBACK');
      } catch (rollbackError) {
        logger.error('pipeline', `Rollback failed for ${item.name}: ${rollbackError}`);
      }
      logger.error('pipeline', `Failed to save canonical product ${item.name}: ${error}`);
      return null;
    } finally {
      client.release();
    }
  }
}

/**
 * Create a crawl run record before starting crawl.
 *
 * Inserts a row into `crawl_runs` with status 'running' and the current time
 * as `started_at`.
 *
 * @param dispensaryId - Dispensary being crawled.
 * @param provider - Value stored in the `provider` column (defaults to 'dutchie').
 * @param triggerType - Value stored in the `trigger_type` column (defaults to 'manual').
 * @returns The id of the newly created crawl run.
 */
export async function createCrawlRun(
  dispensaryId: number,
  provider: string = 'dutchie',
  triggerType: string = 'manual'
): Promise<number> {
  const result = await pool.query(
    `
    INSERT INTO crawl_runs (
      dispensary_id, provider,
      started_at, status, trigger_type
    ) VALUES ($1, $2, NOW(), 'running', $3)
    RETURNING id
    `,
    [dispensaryId, provider, triggerType]
  );

  return result.rows[0].id;
}

/**
 * Complete a crawl run with stats.
 *
 * Sets `finished_at` to the current time, records the final status and the
 * counters, and stores `variants_upserted` and `error` in the `metadata`
 * JSON object.
 *
 * @param crawlRunId - Id returned by `createCrawlRun`.
 * @param stats - Final counters; `status` defaults to 'completed',
 *   `variantsUpserted` to 0 and `error` to null.
 */
export async function completeCrawlRun(
  crawlRunId: number,
  stats: {
    productsFound: number;
    productsNew: number;
    productsUpdated: number;
    snapshotsWritten: number;
    variantsUpserted?: number;
    status?: 'completed' | 'failed' | 'partial';
    error?: string;
  }
): Promise<void> {
  // jsonb_build_object takes VARIADIC "any" arguments, so bind parameters
  // passed to it need explicit casts for PostgreSQL to resolve their types.
  await pool.query(
    `
    UPDATE crawl_runs
    SET
      finished_at = NOW(),
      status = $2,
      products_found = $3,
      products_new = $4,
      products_updated = $5,
      snapshots_written = $6,
      metadata = jsonb_build_object(
        'variants_upserted', $7::int,
        'error', $8::text
      )
    WHERE id = $1
    `,
    [
      crawlRunId,
      stats.status || 'completed',
      stats.productsFound,
      stats.productsNew,
      stats.productsUpdated,
      stats.snapshotsWritten,
      stats.variantsUpserted || 0,
      stats.error || null,
    ]
  );
}