/**
 * CanonicalHydrationService
 * Orchestrates the full hydration pipeline from dutchie_* to canonical tables
 */

import { Pool } from 'pg';
import { CrawlRunRecorder } from './crawl-run-recorder';
import { StoreProductNormalizer } from './store-product-normalizer';
import { SnapshotWriter } from './snapshot-writer';
import { HydrationOptions, HydrationResult, ServiceContext, SourceJob } from './types';

/** Upper bound on the number of source jobs fetched by a single backfill run. */
const BACKFILL_MAX_JOBS = 1000;

/** Number of unhydrated jobs fetched by an incremental run when no batch size is given. */
const DEFAULT_INCREMENTAL_LIMIT = 100;

/** Number of snapshot rows sent per multi-row INSERT statement. */
const SNAPSHOT_INSERT_BATCH_SIZE = 100;

/**
 * Type of the store-product lookup handed to the snapshot writer.
 * Derived from the consumer's signature so this file does not restate it.
 */
type StoreProductIdMap = Parameters<SnapshotWriter['writeSnapshotsForCrawlRun']>[2];

/** Key used to group jobs: whatever type `SourceJob.dispensary_id` has. */
type DispensaryKey = SourceJob['dispensary_id'];

/**
 * Extract a printable message from an arbitrary thrown value.
 * Never throws, even when the thrown value is `null` or `undefined`.
 */
function errorMessage(err: unknown): string {
  if (err instanceof Error) {
    return err.message;
  }
  if (typeof err === 'object' && err !== null && 'message' in err) {
    return String((err as { message: unknown }).message);
  }
  return String(err);
}

/** Parse the textual value of a `COUNT(...)` column as a base-10 integer. */
function parseCount(value: unknown): number {
  return parseInt(String(value), 10);
}

/** Build a zeroed result accumulator. */
function createEmptyResult(): HydrationResult {
  return {
    crawlRunsCreated: 0,
    crawlRunsSkipped: 0,
    productsUpserted: 0,
    snapshotsWritten: 0,
    errors: [],
    durationMs: 0,
  };
}

export class CanonicalHydrationService {
  private readonly pool: Pool;
  private readonly log: (message: string) => void;
  private readonly crawlRunRecorder: CrawlRunRecorder;
  private readonly productNormalizer: StoreProductNormalizer;
  private readonly snapshotWriter: SnapshotWriter;

  /**
   * @param ctx Shared service context; supplies the pg pool and an optional
   *            logger (falls back to `console.log`).
   */
  constructor(ctx: ServiceContext) {
    this.pool = ctx.pool;
    this.log = ctx.logger || console.log;
    this.crawlRunRecorder = new CrawlRunRecorder(ctx);
    this.productNormalizer = new StoreProductNormalizer(ctx);
    this.snapshotWriter = new SnapshotWriter(ctx);
  }

  /**
   * Run the full hydration pipeline
   * Supports both backfill (historical) and incremental (ongoing) modes
   *
   * Never rejects: a failure that escapes the per-dispensary and per-job
   * handlers is recorded in `result.errors` as a "Fatal error" entry.
   *
   * @param options Mode, optional filters, and the dry-run flag.
   * @returns Counters, collected error messages, and elapsed time.
   */
  async hydrate(options: HydrationOptions): Promise<HydrationResult> {
    const startTime = Date.now();
    const result = createEmptyResult();

    this.log(`Starting hydration in ${options.mode} mode`);

    try {
      if (options.mode === 'backfill') {
        await this.runBackfill(options, result);
      } else {
        await this.runIncremental(options, result);
      }
    } catch (err: unknown) {
      const message = errorMessage(err);
      result.errors.push(`Fatal error: ${message}`);
      this.log(`Hydration failed: ${message}`);
    }

    result.durationMs = Date.now() - startTime;
    this.log(`Hydration completed in ${result.durationMs}ms: ${JSON.stringify({
      crawlRunsCreated: result.crawlRunsCreated,
      crawlRunsSkipped: result.crawlRunsSkipped,
      productsUpserted: result.productsUpserted,
      snapshotsWritten: result.snapshotsWritten,
      errors: result.errors.length,
    })}`);

    return result;
  }

  /**
   * Backfill mode: Process historical data from source tables
   *
   * Fetches at most `BACKFILL_MAX_JOBS` jobs for the requested date range and
   * dispensary, then hydrates them dispensary by dispensary.
   */
  private async runBackfill(options: HydrationOptions, result: HydrationResult): Promise<void> {
    const sourceJobs = await this.crawlRunRecorder.getSourceJobsForBackfill(
      options.startDate,
      options.endDate,
      options.dispensaryId,
      BACKFILL_MAX_JOBS
    );

    this.log(`Found ${sourceJobs.length} source jobs to backfill`);

    await this.hydrateJobs(sourceJobs, options, result);
  }

  /**
   * Incremental mode: Process only unhydrated jobs
   *
   * `options.batchSize` caps the number of jobs fetched; a missing or zero
   * value falls back to `DEFAULT_INCREMENTAL_LIMIT`.
   */
  private async runIncremental(options: HydrationOptions, result: HydrationResult): Promise<void> {
    const limit = options.batchSize || DEFAULT_INCREMENTAL_LIMIT;

    const unhydratedJobs = await this.crawlRunRecorder.getUnhydratedJobs(
      options.dispensaryId,
      options.startDate,
      limit
    );

    this.log(`Found ${unhydratedJobs.length} unhydrated jobs`);

    await this.hydrateJobs(unhydratedJobs, options, result);
  }

  /**
   * Shared body of backfill and incremental modes.
   *
   * Jobs are grouped by dispensary so products are upserted once per
   * dispensary rather than once per job. A failing job or dispensary is
   * recorded in `result.errors` and does not stop the remaining work.
   */
  private async hydrateJobs(
    jobs: SourceJob[],
    options: HydrationOptions,
    result: HydrationResult
  ): Promise<void> {
    for (const [dispensaryId, dispensaryJobs] of this.groupJobsByDispensary(jobs)) {
      this.log(`Processing dispensary ${dispensaryId} (${dispensaryJobs.length} jobs)`);

      try {
        // Step 1: Upsert products for this dispensary (skipped in dry run)
        if (!options.dryRun) {
          const productResult = await this.productNormalizer.upsertProductsForDispensary(dispensaryId);
          result.productsUpserted += productResult.upserted;
          if (productResult.errors.length > 0) {
            result.errors.push(...productResult.errors.map(e => `Dispensary ${dispensaryId}: ${e}`));
          }
        }

        // Get store_product_id map for snapshot writing
        const storeProductIdMap = await this.productNormalizer.getStoreProductIdMap(dispensaryId);

        // Step 2: Record crawl runs and write snapshots for each job
        for (const job of dispensaryJobs) {
          try {
            await this.processJob(job, storeProductIdMap, result, options.dryRun);
          } catch (err: unknown) {
            result.errors.push(`Job ${job.id}: ${errorMessage(err)}`);
          }
        }
      } catch (err: unknown) {
        result.errors.push(`Dispensary ${dispensaryId}: ${errorMessage(err)}`);
      }
    }
  }

  /**
   * Process a single job: record crawl run and write snapshots
   *
   * In dry-run mode nothing is written; the job is only classified as
   * "would be created" or "would be skipped" based on whether a crawl run
   * already exists for it.
   */
  private async processJob(
    job: SourceJob,
    storeProductIdMap: StoreProductIdMap,
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    if (dryRun) {
      const existingId = await this.crawlRunRecorder.getCrawlRunIdBySourceJob(
        'dispensary_crawl_jobs',
        job.id
      );
      if (existingId) {
        result.crawlRunsSkipped++;
      } else {
        result.crawlRunsCreated++;
      }
      return; // Skip snapshot writing in dry run
    }

    // Step 1: Record the crawl run
    const crawlRunId = await this.crawlRunRecorder.recordCrawlRun(job);
    if (!crawlRunId) {
      result.crawlRunsSkipped++;
      return; // Skip snapshot writing if crawl run wasn't created
    }
    result.crawlRunsCreated++;

    // Step 2: Write snapshots for this crawl run (requires a completion time)
    if (!job.completed_at) {
      return;
    }

    const snapshotResult = await this.snapshotWriter.writeSnapshotsForCrawlRun(
      crawlRunId,
      job.dispensary_id,
      storeProductIdMap,
      job.completed_at
    );

    result.snapshotsWritten += snapshotResult.written;
    if (snapshotResult.errors.length > 0) {
      result.errors.push(...snapshotResult.errors);
    }

    // Update crawl_run with snapshots_written count
    await this.crawlRunRecorder.updateSnapshotsWritten(crawlRunId, snapshotResult.written);
  }

  /**
   * Hydrate a single dispensary (convenience method)
   *
   * @param dispensaryId Dispensary to hydrate.
   * @param mode Hydration mode; defaults to `'incremental'`.
   */
  async hydrateDispensary(
    dispensaryId: number,
    mode: 'backfill' | 'incremental' = 'incremental'
  ): Promise<HydrationResult> {
    return this.hydrate({
      mode,
      dispensaryId,
    });
  }

  /**
   * Get hydration status for a dispensary
   *
   * Runs six independent COUNT queries in parallel. `unhydratedJobs` is the
   * difference between completed source jobs and crawl runs created from them.
   */
  async getHydrationStatus(dispensaryId: number): Promise<{
    sourceJobs: number;
    hydratedJobs: number;
    unhydratedJobs: number;
    sourceProducts: number;
    storeProducts: number;
    sourceSnapshots: number;
    storeSnapshots: number;
  }> {
    const [sourceJobs, hydratedJobs, sourceProducts, storeProducts, sourceSnapshots, storeSnapshots] =
      await Promise.all([
        this.pool.query(
          `SELECT COUNT(*) FROM dispensary_crawl_jobs WHERE dispensary_id = $1 AND status = 'completed' AND job_type = 'dutchie_product_crawl'`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM crawl_runs WHERE dispensary_id = $1 AND source_job_type = 'dispensary_crawl_jobs'`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM dutchie_products WHERE dispensary_id = $1`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM store_products WHERE dispensary_id = $1 AND provider = 'dutchie'`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM dutchie_product_snapshots WHERE dispensary_id = $1`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM store_product_snapshots WHERE dispensary_id = $1`,
          [dispensaryId]
        ),
      ]);

    const sourceJobCount = parseCount(sourceJobs.rows[0].count);
    const hydratedJobCount = parseCount(hydratedJobs.rows[0].count);

    return {
      sourceJobs: sourceJobCount,
      hydratedJobs: hydratedJobCount,
      unhydratedJobs: sourceJobCount - hydratedJobCount,
      sourceProducts: parseCount(sourceProducts.rows[0].count),
      storeProducts: parseCount(storeProducts.rows[0].count),
      sourceSnapshots: parseCount(sourceSnapshots.rows[0].count),
      storeSnapshots: parseCount(storeSnapshots.rows[0].count),
    };
  }

  /**
   * Get overall hydration status
   *
   * Same counters as `getHydrationStatus`, across all dispensaries, plus the
   * number of distinct dispensaries present in `dutchie_products`.
   */
  async getOverallStatus(): Promise<{
    totalSourceJobs: number;
    totalHydratedJobs: number;
    totalSourceProducts: number;
    totalStoreProducts: number;
    totalSourceSnapshots: number;
    totalStoreSnapshots: number;
    dispensariesWithData: number;
  }> {
    const [sourceJobs, hydratedJobs, sourceProducts, storeProducts, sourceSnapshots, storeSnapshots, dispensaries] =
      await Promise.all([
        this.pool.query(
          `SELECT COUNT(*) FROM dispensary_crawl_jobs WHERE status = 'completed' AND job_type = 'dutchie_product_crawl'`
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM crawl_runs WHERE source_job_type = 'dispensary_crawl_jobs'`
        ),
        this.pool.query(`SELECT COUNT(*) FROM dutchie_products`),
        this.pool.query(`SELECT COUNT(*) FROM store_products WHERE provider = 'dutchie'`),
        this.pool.query(`SELECT COUNT(*) FROM dutchie_product_snapshots`),
        this.pool.query(`SELECT COUNT(*) FROM store_product_snapshots`),
        this.pool.query(
          `SELECT COUNT(DISTINCT dispensary_id) FROM dutchie_products`
        ),
      ]);

    return {
      totalSourceJobs: parseCount(sourceJobs.rows[0].count),
      totalHydratedJobs: parseCount(hydratedJobs.rows[0].count),
      totalSourceProducts: parseCount(sourceProducts.rows[0].count),
      totalStoreProducts: parseCount(storeProducts.rows[0].count),
      totalSourceSnapshots: parseCount(sourceSnapshots.rows[0].count),
      totalStoreSnapshots: parseCount(storeSnapshots.rows[0].count),
      dispensariesWithData: parseCount(dispensaries.rows[0].count),
    };
  }

  /**
   * Group jobs by dispensary ID
   *
   * Preserves the input order of jobs within each group, and the order in
   * which dispensaries are first seen.
   */
  private groupJobsByDispensary(jobs: SourceJob[]): Map<DispensaryKey, SourceJob[]> {
    const map = new Map<DispensaryKey, SourceJob[]>();
    for (const job of jobs) {
      const list = map.get(job.dispensary_id) || [];
      list.push(job);
      map.set(job.dispensary_id, list);
    }
    return map;
  }

  /**
   * Products-only hydration mode
   * Used when there are no historical job records - creates synthetic crawl runs
   * from current product data
   *
   * Never rejects: failures are collected in `result.errors`.
   *
   * @param options.dispensaryId Restrict the run to one dispensary; when
   *        omitted, every dispensary present in `dutchie_products` is processed.
   * @param options.dryRun When true, only counts what would be done.
   */
  async hydrateProductsOnly(options: {
    dispensaryId?: number;
    dryRun?: boolean;
  } = {}): Promise<HydrationResult> {
    const startTime = Date.now();
    const result = createEmptyResult();

    this.log('Starting products-only hydration mode');

    try {
      // Get all dispensaries with products
      let dispensaryIds: number[];
      if (options.dispensaryId) {
        dispensaryIds = [options.dispensaryId];
      } else {
        const dispResult = await this.pool.query(
          'SELECT DISTINCT dispensary_id FROM dutchie_products ORDER BY dispensary_id'
        );
        dispensaryIds = dispResult.rows.map(r => r.dispensary_id);
      }

      this.log(`Processing ${dispensaryIds.length} dispensaries`);

      for (const dispensaryId of dispensaryIds) {
        try {
          await this.hydrateDispensaryProductsOnly(dispensaryId, result, options.dryRun);
        } catch (err: unknown) {
          result.errors.push(`Dispensary ${dispensaryId}: ${errorMessage(err)}`);
        }
      }
    } catch (err: unknown) {
      const message = errorMessage(err);
      result.errors.push(`Fatal error: ${message}`);
      // Log fatal failures here too, matching hydrate().
      this.log(`Products-only hydration failed: ${message}`);
    }

    result.durationMs = Date.now() - startTime;
    this.log(`Products-only hydration completed in ${result.durationMs}ms: ${JSON.stringify({
      crawlRunsCreated: result.crawlRunsCreated,
      productsUpserted: result.productsUpserted,
      snapshotsWritten: result.snapshotsWritten,
      errors: result.errors.length,
    })}`);

    return result;
  }

  /**
   * Hydrate a single dispensary in products-only mode
   *
   * Reuses the dispensary's existing synthetic crawl run when there is one,
   * otherwise inserts a new one; then upserts products and snapshots their
   * current state against that run.
   */
  private async hydrateDispensaryProductsOnly(
    dispensaryId: number,
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    // Get product count and timestamps for this dispensary
    const statsResult = await this.pool.query(
      `SELECT COUNT(*) as cnt, MIN(created_at) as min_date, MAX(updated_at) as max_date FROM dutchie_products WHERE dispensary_id = $1`,
      [dispensaryId]
    );
    const stats = statsResult.rows[0];
    const productCount = parseCount(stats.cnt);

    if (productCount === 0) {
      this.log(`Dispensary ${dispensaryId}: No products, skipping`);
      return;
    }

    this.log(`Dispensary ${dispensaryId}: ${productCount} products`);

    if (dryRun) {
      // Dry run - just count what would be done
      result.crawlRunsCreated++;
      result.productsUpserted += productCount;
      result.snapshotsWritten += productCount;
      return;
    }

    // Step 1: Find or create the synthetic crawl run
    const crawlRunId = await this.findOrCreateSyntheticCrawlRun(dispensaryId, stats, productCount, result);

    // Step 2: Upsert products
    const productResult = await this.productNormalizer.upsertProductsForDispensary(dispensaryId);
    result.productsUpserted += productResult.upserted;
    if (productResult.errors.length > 0) {
      result.errors.push(...productResult.errors.map(e => `Dispensary ${dispensaryId}: ${e}`));
    }

    // Step 3: Create initial snapshots from current product state
    const snapshotsWritten = await this.createInitialSnapshots(dispensaryId, crawlRunId);
    result.snapshotsWritten += snapshotsWritten;

    // Accumulate rather than overwrite: when an existing synthetic run is
    // reused, rows already present are skipped by ON CONFLICT DO NOTHING, so
    // `snapshotsWritten` only counts rows added by THIS invocation. Overwriting
    // would reset the stored count (typically to 0) on every re-run.
    await this.pool.query(
      'UPDATE crawl_runs SET snapshots_written = COALESCE(snapshots_written, 0) + $1 WHERE id = $2',
      [snapshotsWritten, crawlRunId]
    );
  }

  /**
   * Return the id of the dispensary's synthetic crawl run, inserting one if
   * none exists. Updates `crawlRunsSkipped` / `crawlRunsCreated` accordingly.
   *
   * @param stats Row carrying `min_date` / `max_date` of the dispensary's
   *              products; used as the run's start/finish times, falling back
   *              to the current time when null.
   */
  private async findOrCreateSyntheticCrawlRun(
    dispensaryId: number,
    stats: { min_date?: unknown; max_date?: unknown },
    productCount: number,
    result: HydrationResult
  ): Promise<number> {
    // Check if we already have a synthetic run for this dispensary
    const existingRun = await this.pool.query(
      `SELECT id FROM crawl_runs WHERE dispensary_id = $1 AND source_job_type = 'products_only_hydration' LIMIT 1`,
      [dispensaryId]
    );

    if (existingRun.rows.length > 0) {
      const existingId: number = existingRun.rows[0].id;
      this.log(`Dispensary ${dispensaryId}: Using existing synthetic crawl run ${existingId}`);
      result.crawlRunsSkipped++;
      return existingId;
    }

    // Create new synthetic crawl run
    const now = new Date();
    const insertResult = await this.pool.query(
      `INSERT INTO crawl_runs ( dispensary_id, provider, started_at, finished_at, duration_ms, status, products_found, trigger_type, metadata, source_job_type, source_job_id ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id`,
      [
        dispensaryId,
        'dutchie',
        stats.min_date || now,
        stats.max_date || now,
        0,
        'success',
        productCount,
        'hydration',
        JSON.stringify({ mode: 'products_only', hydratedAt: now.toISOString() }),
        'products_only_hydration',
        dispensaryId, // Use dispensary_id as synthetic job_id
      ]
    );

    const createdId: number = insertResult.rows[0].id;
    result.crawlRunsCreated++;
    this.log(`Dispensary ${dispensaryId}: Created synthetic crawl run ${createdId}`);
    return createdId;
  }

  /**
   * Create initial snapshots from current product state
   *
   * Inserts one snapshot per dutchie store product of the dispensary, in
   * multi-row batches. Rows that already exist for the same
   * (store_product_id, crawl_run_id) pair are skipped by the ON CONFLICT clause.
   *
   * @returns Number of rows actually inserted (conflicts excluded).
   */
  private async createInitialSnapshots(
    dispensaryId: number,
    crawlRunId: number
  ): Promise<number> {
    // Get all store products for this dispensary
    const products = await this.pool.query(
      `SELECT sp.id, sp.price_rec, sp.price_med, sp.is_on_special, sp.is_in_stock, sp.stock_quantity, sp.thc_percent, sp.cbd_percent FROM store_products sp WHERE sp.dispensary_id = $1 AND sp.provider = 'dutchie'`,
      [dispensaryId]
    );

    if (products.rows.length === 0) return 0;

    const now = new Date();
    let totalInserted = 0;

    // Process in batches
    for (let i = 0; i < products.rows.length; i += SNAPSHOT_INSERT_BATCH_SIZE) {
      const batch = products.rows.slice(i, i + SNAPSHOT_INSERT_BATCH_SIZE);

      const values: unknown[] = [];
      const placeholders: string[] = [];

      for (const product of batch) {
        // Order must match the column list of the INSERT below.
        const row: unknown[] = [
          dispensaryId,
          product.id,
          crawlRunId,
          now,
          product.price_rec,
          product.price_med,
          product.is_on_special || false,
          product.is_in_stock || false,
          product.stock_quantity,
          product.thc_percent,
          product.cbd_percent,
          JSON.stringify({ source: 'initial_hydration' }),
        ];

        // Placeholders are numbered from the values already queued, so they
        // cannot drift out of sync with the row's value count.
        const firstParam = values.length + 1;
        const rowPlaceholders = row.map((_, j) => `$${firstParam + j}`);
        values.push(...row);

        // created_at is filled by the database via NOW().
        placeholders.push(`(${rowPlaceholders.join(', ')}, NOW())`);
      }

      const query = ` INSERT INTO store_product_snapshots ( dispensary_id, store_product_id, crawl_run_id, captured_at, price_rec, price_med, is_on_special, is_in_stock, stock_quantity, thc_percent, cbd_percent, raw_data, created_at ) VALUES ${placeholders.join(', ')} ON CONFLICT (store_product_id, crawl_run_id) WHERE store_product_id IS NOT NULL AND crawl_run_id IS NOT NULL DO NOTHING `;

      const insertResult = await this.pool.query(query, values);
      totalInserted += insertResult.rowCount ?? 0;
    }

    return totalInserted;
  }
}