/**
 * Incremental Sync
 *
 * Hooks into the crawler to automatically write to canonical tables
 * after each crawl completes. This ensures store_products and
 * store_product_snapshots stay in sync with new data.
 *
 * Two modes:
 * 1. Inline - Called directly from crawler after saving to legacy tables
 * 2. Async - Called from a background worker that processes recent crawls
 *
 * Usage:
 *   // Inline mode (in crawler)
 *   import { syncCrawlToCanonical } from './hydration/incremental-sync';
 *   await syncCrawlToCanonical(pool, crawlResult);
 *
 *   // Async mode (background worker)
 *   import { syncRecentCrawls } from './hydration/incremental-sync';
 *   await syncRecentCrawls(pool, { since: '1 hour' });
 */

import type { Pool } from 'pg';

/** Number of products handled per slice of the product loop. */
const BATCH_SIZE = 100;

// ============================================================
// TYPES
// ============================================================

/** Outcome of one crawl, as reported by the crawler or rebuilt from a legacy job row. */
export interface CrawlResult {
  dispensaryId: number;
  stateId?: number;
  platformDispensaryId?: string;
  crawlJobId?: number; // legacy dispensary_crawl_jobs.id
  startedAt: Date;
  finishedAt?: Date;
  status: 'success' | 'failed' | 'running';
  errorMessage?: string;
  productsFound: number;
  productsCreated: number;
  productsUpdated: number;
  productsMissing?: number;
  brandsFound?: number;
}

/** Switches shared by every sync entry point in this module. */
export interface SyncOptions {
  /** When true, no INSERT is issued; counters reflect what would have been written. */
  dryRun?: boolean;
  /** When true, progress is logged to the console. */
  verbose?: boolean;
  /** When true, syncCrawlToCanonical skips the snapshot step. */
  skipSnapshots?: boolean;
}

/** Summary returned by syncCrawlToCanonical. */
export interface SyncResult {
  crawlRunId: number | null;
  productsUpserted: number;
  productsNew: number;
  productsUpdated: number;
  snapshotsCreated: number;
  durationMs: number;
  errors: string[];
}

/**
 * Fields this module reads from a snapshot's raw payload.
 * NOTE(review): element types are assumed from how the values are used
 * below; the payload is not validated.
 */
interface RawProductData {
  recPrices?: number[];
  Prices?: number[];
  medicalPrices?: number[];
  recSpecialPrices?: number[];
  special?: boolean;
  THCContent?: { range?: number[] };
  CBDContent?: { range?: number[] };
  THC?: number;
  CBD?: number;
  Status?: string;
}

/** Values derived from a snapshot's raw payload before insertion. */
interface SnapshotFields {
  priceRec: number | null;
  priceMed: number | null;
  priceRecSpecial: number | null;
  isOnSpecial: boolean;
  isInStock: boolean;
  thcPercent: number | null;
  cbdPercent: number | null;
  stockStatus: string;
  platformStatus: string | null;
}

/** Turn any thrown value into a printable message (uses `.message` when present). */
function describeError(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * Derive pricing, stock and potency fields from a snapshot's raw payload.
 *
 * A falsy payload yields the defaults (no prices, in stock, status 'unknown').
 * A string payload is JSON-parsed; parse failures propagate to the caller.
 * The `||` fallbacks intentionally treat 0 / empty values as "missing".
 */
function extractSnapshotFields(rawProductData: unknown): SnapshotFields {
  const fields: SnapshotFields = {
    priceRec: null,
    priceMed: null,
    priceRecSpecial: null,
    isOnSpecial: false,
    isInStock: true,
    thcPercent: null,
    cbdPercent: null,
    stockStatus: 'unknown',
    platformStatus: null,
  };

  if (!rawProductData) {
    return fields;
  }

  const raw = (
    typeof rawProductData === 'string' ? JSON.parse(rawProductData) : rawProductData
  ) as RawProductData;

  fields.priceRec = raw.recPrices?.[0] || raw.Prices?.[0] || null;
  fields.priceMed = raw.medicalPrices?.[0] || null;
  fields.priceRecSpecial = raw.recSpecialPrices?.[0] || null;
  fields.isOnSpecial = raw.special === true || fields.priceRecSpecial !== null;
  fields.thcPercent = raw.THCContent?.range?.[0] || raw.THC || null;
  fields.cbdPercent = raw.CBDContent?.range?.[0] || raw.CBD || null;
  fields.platformStatus = raw.Status || null;
  // Only the 'Active' platform status counts as in stock.
  fields.isInStock = fields.platformStatus === 'Active';
  fields.stockStatus = fields.isInStock ? 'in_stock' : 'out_of_stock';

  return fields;
}

// ============================================================
// CREATE OR GET CRAWL RUN
// ============================================================

/**
 * Create a crawl_run record for a completed crawl.
 * Returns existing if already synced (idempotent).
 *
 * @param pool - pg connection pool used for all queries.
 * @param crawlResult - Crawl outcome; `crawlJobId`, when set, is the lookup key.
 * @param options - `dryRun` suppresses the INSERT; `verbose` enables logging.
 * @returns The crawl_runs id (existing or newly inserted), or `null` in dry-run
 *          mode when no existing row was found.
 */
export async function getOrCreateCrawlRun(
  pool: Pool,
  crawlResult: CrawlResult,
  options: SyncOptions = {}
): Promise<number | null> {
  const { dryRun = false, verbose = false } = options;

  // Check if already exists (by legacy job ID)
  if (crawlResult.crawlJobId != null) {
    const existing = await pool.query(
      `SELECT id FROM crawl_runs WHERE legacy_dispensary_crawl_job_id = $1`,
      [crawlResult.crawlJobId]
    );

    if (existing.rows.length > 0) {
      if (verbose) {
        console.log(`[IncrSync] Found existing crawl_run ${existing.rows[0].id} for job ${crawlResult.crawlJobId}`);
      }
      return existing.rows[0].id;
    }
  }

  if (dryRun) {
    console.log(`[IncrSync][DryRun] Would create crawl_run for dispensary ${crawlResult.dispensaryId}`);
    return null;
  }

  // Duration is only known once the crawl has a finish time.
  const durationMs =
    crawlResult.finishedAt && crawlResult.startedAt
      ? crawlResult.finishedAt.getTime() - crawlResult.startedAt.getTime()
      : null;

  const result = await pool.query(
    `INSERT INTO crawl_runs (
      dispensary_id, state_id, provider,
      legacy_dispensary_crawl_job_id,
      started_at, finished_at, duration_ms,
      status, error_message,
      products_found, products_new, products_updated, products_missing,
      brands_found, trigger_type, created_at
    ) VALUES (
      $1, $2, 'dutchie', $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, 'scheduled', NOW()
    )
    RETURNING id`,
    [
      crawlResult.dispensaryId,
      crawlResult.stateId,
      crawlResult.crawlJobId,
      crawlResult.startedAt,
      crawlResult.finishedAt,
      durationMs,
      crawlResult.status,
      crawlResult.errorMessage,
      crawlResult.productsFound,
      crawlResult.productsCreated,
      crawlResult.productsUpdated,
      crawlResult.productsMissing || 0,
      crawlResult.brandsFound || 0,
    ]
  );

  if (verbose) {
    console.log(`[IncrSync] Created crawl_run ${result.rows[0].id}`);
  }

  return result.rows[0].id;
}

// ============================================================
// SYNC PRODUCTS TO CANONICAL
// ============================================================

/**
 * Sync dutchie_products to store_products for a single dispensary.
 * Called after a crawl completes.
 *
 * Every dutchie_products row of the dispensary is upserted one at a time;
 * a failing row is recorded in `errors` and does not stop the loop.
 *
 * @param pool - pg connection pool.
 * @param dispensaryId - Dispensary whose products are synced.
 * @param stateId - Accepted for signature parity; not used by this function.
 * @param crawlRunId - Accepted for signature parity; not used by this function.
 * @param options - `dryRun` counts every product as new without writing.
 * @returns Counts of inserted/updated rows plus per-product error messages.
 */
export async function syncProductsToCanonical(
  pool: Pool,
  dispensaryId: number,
  stateId: number | null,
  crawlRunId: number | null,
  options: SyncOptions = {}
): Promise<{ upserted: number; new: number; updated: number; errors: string[] }> {
  const { dryRun = false, verbose = false } = options;
  const errors: string[] = [];
  let newCount = 0;
  let updatedCount = 0;

  // Get all products for this dispensary
  const { rows: products } = await pool.query(
    `SELECT
      dp.id, dp.external_product_id, dp.name, dp.brand_name, dp.brand_id,
      dp.category, dp.subcategory, dp.type, dp.strain_type,
      dp.description, dp.effects, dp.cannabinoids_v2,
      dp.thc, dp.thc_content, dp.cbd, dp.cbd_content,
      dp.primary_image_url, dp.local_image_url, dp.local_image_thumb_url,
      dp.local_image_medium_url, dp.original_image_url, dp.additional_images,
      dp.stock_status, dp.c_name, dp.enterprise_product_id,
      dp.weight, dp.options, dp.measurements, dp.status,
      dp.featured, dp.special, dp.medical_only, dp.rec_only,
      dp.is_below_threshold, dp.is_below_kiosk_threshold,
      dp.total_quantity_available, dp.total_kiosk_quantity_available,
      dp.first_seen_at, dp.last_seen_at, dp.updated_at,
      d.platform_dispensary_id
    FROM dutchie_products dp
    LEFT JOIN dispensaries d ON d.id = dp.dispensary_id
    WHERE dp.dispensary_id = $1`,
    [dispensaryId]
  );

  if (verbose) {
    console.log(`[IncrSync] Found ${products.length} products for dispensary ${dispensaryId}`);
  }

  // Process in batches
  for (let i = 0; i < products.length; i += BATCH_SIZE) {
    const batch = products.slice(i, i + BATCH_SIZE);

    for (const p of batch) {
      try {
        // `||` deliberately treats 0 and NaN as "no value" and falls through.
        const thcPercent = parseFloat(p.thc) || parseFloat(p.thc_content) || null;
        const cbdPercent = parseFloat(p.cbd) || parseFloat(p.cbd_content) || null;
        const stockStatus = p.stock_status || 'unknown';
        // An unknown status is counted as in stock.
        const isInStock = stockStatus === 'in_stock' || stockStatus === 'unknown';

        if (dryRun) {
          if (verbose) {
            console.log(`[IncrSync][DryRun] Would upsert product ${p.external_product_id}`);
          }
          newCount++;
          continue;
        }

        const result = await pool.query(
          `INSERT INTO store_products (
            dispensary_id, provider, provider_product_id, provider_brand_id,
            platform_dispensary_id, external_product_id,
            name_raw, brand_name_raw, category_raw, subcategory_raw, strain_type,
            description, effects, cannabinoids_v2,
            thc_percent, cbd_percent, thc_content, cbd_content,
            is_in_stock, stock_status, stock_quantity, total_quantity_available,
            image_url, primary_image_url, images,
            is_on_special, featured, medical_only, rec_only,
            is_below_threshold, is_below_kiosk_threshold,
            status, c_name, weight, measurements,
            first_seen_at, last_seen_at, created_at, updated_at
          ) VALUES (
            $1, 'dutchie', $2, $3, $4, $5, $6, $7, $8, $9, $10,
            $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
            $21, $22, $23, $24, $25, $26, $27, $28, $29, $30,
            $31, $32, $33, $34, $35, $36, NOW(), NOW()
          )
          ON CONFLICT (dispensary_id, provider, provider_product_id)
          DO UPDATE SET
            name_raw = EXCLUDED.name_raw,
            brand_name_raw = EXCLUDED.brand_name_raw,
            category_raw = EXCLUDED.category_raw,
            subcategory_raw = EXCLUDED.subcategory_raw,
            strain_type = EXCLUDED.strain_type,
            is_in_stock = EXCLUDED.is_in_stock,
            stock_status = EXCLUDED.stock_status,
            stock_quantity = EXCLUDED.stock_quantity,
            total_quantity_available = EXCLUDED.total_quantity_available,
            thc_percent = EXCLUDED.thc_percent,
            cbd_percent = EXCLUDED.cbd_percent,
            thc_content = EXCLUDED.thc_content,
            cbd_content = EXCLUDED.cbd_content,
            image_url = EXCLUDED.image_url,
            primary_image_url = EXCLUDED.primary_image_url,
            is_on_special = EXCLUDED.is_on_special,
            status = EXCLUDED.status,
            description = COALESCE(EXCLUDED.description, store_products.description),
            effects = COALESCE(EXCLUDED.effects, store_products.effects),
            cannabinoids_v2 = COALESCE(EXCLUDED.cannabinoids_v2, store_products.cannabinoids_v2),
            weight = EXCLUDED.weight,
            measurements = EXCLUDED.measurements,
            last_seen_at = NOW(),
            updated_at = NOW()
          RETURNING (xmax = 0) as is_new`,
          [
            dispensaryId, // $1
            p.external_product_id, // $2 provider_product_id
            p.brand_id, // $3
            p.platform_dispensary_id, // $4
            p.external_product_id, // $5 external_product_id
            p.name, // $6
            p.brand_name, // $7
            p.type || p.category, // $8 category_raw
            p.subcategory, // $9
            p.strain_type, // $10
            p.description, // $11
            p.effects, // $12
            p.cannabinoids_v2, // $13
            thcPercent, // $14
            cbdPercent, // $15
            p.thc_content, // $16
            p.cbd_content, // $17
            isInStock, // $18
            stockStatus, // $19
            p.total_quantity_available || 0, // $20 stock_quantity
            p.total_quantity_available || 0, // $21
            p.primary_image_url, // $22 image_url
            p.primary_image_url, // $23
            p.additional_images, // $24 images
            p.special || false, // $25
            p.featured || false, // $26
            p.medical_only || false, // $27
            p.rec_only || false, // $28
            p.is_below_threshold || false, // $29
            p.is_below_kiosk_threshold || false, // $30
            p.status, // $31
            p.c_name, // $32
            p.weight, // $33
            p.measurements, // $34
            p.first_seen_at || p.updated_at, // $35
            p.last_seen_at || p.updated_at, // $36
          ]
        );

        // The query reports `xmax = 0` as is_new; anything else is counted as an update.
        if (result.rows[0]?.is_new) {
          newCount++;
        } else {
          updatedCount++;
        }
      } catch (error: unknown) {
        errors.push(`Product ${p.id}: ${describeError(error)}`);
      }
    }
  }

  return {
    upserted: newCount + updatedCount,
    new: newCount,
    updated: updatedCount,
    errors,
  };
}

// ============================================================
// SYNC SNAPSHOTS TO CANONICAL
// ============================================================

/**
 * Sync dutchie_product_snapshots to store_product_snapshots for recent crawls.
 *
 * Only legacy snapshots crawled at or after `since` that have no
 * store_product_snapshots row pointing at them are copied. A failing row is
 * recorded in `errors` and does not stop the loop.
 *
 * @param pool - pg connection pool.
 * @param dispensaryId - Dispensary whose snapshots are synced.
 * @param stateId - Written to each new snapshot's state_id.
 * @param crawlRunId - Written to each new snapshot's crawl_run_id.
 * @param since - Lower bound (inclusive) on the legacy `crawled_at`.
 * @param options - `dryRun` counts rows without inserting.
 * @returns Number of snapshots created (or that would be) and error messages.
 */
export async function syncSnapshotsToCanonical(
  pool: Pool,
  dispensaryId: number,
  stateId: number | null,
  crawlRunId: number | null,
  since: Date,
  options: SyncOptions = {}
): Promise<{ created: number; errors: string[] }> {
  const { dryRun = false, verbose = false } = options;
  const errors: string[] = [];
  let created = 0;

  // Get recent snapshots that haven't been synced yet
  const { rows: snapshots } = await pool.query(
    `SELECT
      dps.id, dps.dutchie_product_id, dps.dispensary_id,
      dps.options, dps.raw_product_data, dps.crawled_at, dps.created_at,
      dp.external_product_id, dp.name, dp.brand_name, dp.category, dp.subcategory,
      sp.id as store_product_id,
      d.platform_dispensary_id
    FROM dutchie_product_snapshots dps
    JOIN dutchie_products dp ON dp.id = dps.dutchie_product_id
    LEFT JOIN store_products sp
      ON sp.dispensary_id = dps.dispensary_id
      AND sp.provider_product_id = dp.external_product_id
      AND sp.provider = 'dutchie'
    LEFT JOIN dispensaries d ON d.id = dps.dispensary_id
    LEFT JOIN store_product_snapshots sps ON sps.legacy_snapshot_id = dps.id
    WHERE dps.dispensary_id = $1
      AND dps.crawled_at >= $2
      AND sps.id IS NULL
    ORDER BY dps.id`,
    [dispensaryId, since]
  );

  if (verbose) {
    console.log(`[IncrSync] Found ${snapshots.length} new snapshots since ${since.toISOString()}`);
  }

  if (snapshots.length === 0) {
    return { created: 0, errors: [] };
  }

  for (const s of snapshots) {
    try {
      // Extract pricing from raw_product_data
      const fields = extractSnapshotFields(s.raw_product_data);

      if (dryRun) {
        if (verbose) {
          console.log(`[IncrSync][DryRun] Would create snapshot for legacy ${s.id}`);
        }
        created++;
        continue;
      }

      await pool.query(
        `INSERT INTO store_product_snapshots (
          dispensary_id, store_product_id, state_id,
          provider, provider_product_id, provider_dispensary_id,
          crawl_run_id, legacy_snapshot_id, legacy_dutchie_product_id,
          captured_at,
          name, brand_name, category, subcategory,
          price_rec, price_med, price_rec_special,
          is_on_special, is_in_stock, stock_status,
          thc_percent, cbd_percent,
          platform_status, options, raw_data,
          created_at
        ) VALUES (
          $1, $2, $3, 'dutchie', $4, $5, $6, $7, $8, $9,
          $10, $11, $12, $13, $14, $15, $16, $17, $18, $19,
          $20, $21, $22, $23, $24, NOW()
        )`,
        [
          s.dispensary_id, // $1
          s.store_product_id, // $2 (null when the LEFT JOIN found no store product)
          stateId, // $3
          s.external_product_id, // $4
          s.platform_dispensary_id, // $5
          crawlRunId, // $6
          s.id, // $7 legacy_snapshot_id
          s.dutchie_product_id, // $8
          s.crawled_at, // $9 captured_at
          s.name, // $10
          s.brand_name, // $11
          s.category, // $12
          s.subcategory, // $13
          fields.priceRec, // $14
          fields.priceMed, // $15
          fields.priceRecSpecial, // $16
          fields.isOnSpecial, // $17
          fields.isInStock, // $18
          fields.stockStatus, // $19
          fields.thcPercent, // $20
          fields.cbdPercent, // $21
          fields.platformStatus, // $22
          s.options, // $23
          s.raw_product_data, // $24
        ]
      );

      created++;
    } catch (error: unknown) {
      errors.push(`Snapshot ${s.id}: ${describeError(error)}`);
    }
  }

  return { created, errors };
}

// ============================================================
// MAIN SYNC FUNCTION
// ============================================================

/**
 * Sync a single crawl result to canonical tables.
 * Call this from the crawler after each crawl completes.
 *
 * Steps: get-or-create the crawl_run, upsert products, then (unless
 * `skipSnapshots`) copy snapshots crawled from one minute before the crawl
 * started. Row-level failures are collected in `errors`; failures of the
 * top-level queries reject the returned promise.
 *
 * @param pool - pg connection pool.
 * @param crawlResult - The crawl to sync.
 * @param options - See SyncOptions.
 * @returns Counters, elapsed time and collected row-level errors.
 */
export async function syncCrawlToCanonical(
  pool: Pool,
  crawlResult: CrawlResult,
  options: SyncOptions = {}
): Promise<SyncResult> {
  const startTime = Date.now();
  const errors: string[] = [];
  const { verbose = false, skipSnapshots = false } = options;
  const stateId = crawlResult.stateId ?? null;

  if (verbose) {
    console.log(`[IncrSync] Starting sync for dispensary ${crawlResult.dispensaryId}`);
  }

  // 1. Create crawl_run record
  const crawlRunId = await getOrCreateCrawlRun(pool, crawlResult, options);

  // 2. Sync products
  const productResult = await syncProductsToCanonical(
    pool,
    crawlResult.dispensaryId,
    stateId,
    crawlRunId,
    options
  );
  errors.push(...productResult.errors);

  // 3. Sync snapshots (if not skipped)
  let snapshotsCreated = 0;
  if (!skipSnapshots) {
    const since = new Date(crawlResult.startedAt.getTime() - 60 * 1000); // 1 min before
    const snapshotResult = await syncSnapshotsToCanonical(
      pool,
      crawlResult.dispensaryId,
      stateId,
      crawlRunId,
      since,
      options
    );
    snapshotsCreated = snapshotResult.created;
    errors.push(...snapshotResult.errors);
  }

  const durationMs = Date.now() - startTime;

  if (verbose) {
    console.log(`[IncrSync] Completed in ${durationMs}ms: ${productResult.upserted} products, ${snapshotsCreated} snapshots`);
  }

  return {
    crawlRunId,
    productsUpserted: productResult.upserted,
    productsNew: productResult.new,
    productsUpdated: productResult.updated,
    snapshotsCreated,
    durationMs,
    errors,
  };
}

// ============================================================
// BATCH SYNC FOR RECENT CRAWLS
// ============================================================

/** Options for syncRecentCrawls. */
export interface RecentSyncOptions extends SyncOptions {
  since?: string; // e.g., '1 hour', '30 minutes', '1 day'
  dispensaryId?: number;
  limit?: number;
}

/**
 * Sync recent crawls that haven't been synced yet.
 * Run this as a background job to catch any missed syncs.
 *
 * Selects 'completed' / 'failed' legacy crawl jobs started within `since`
 * that have no crawl_runs row, newest first, and syncs each one.
 *
 * @param pool - pg connection pool.
 * @param options - `since` is a PostgreSQL interval literal (default '1 hour')
 *                  and is sent as a bound parameter, never spliced into SQL;
 *                  `dispensaryId` narrows to one dispensary; `limit` caps the
 *                  number of jobs (default 100). SyncOptions flags are
 *                  forwarded to each per-job sync.
 * @returns Number of jobs whose sync ran to completion, plus every error
 *          encountered (job-level failures and row-level errors), each
 *          prefixed with `Job <id>: `.
 */
export async function syncRecentCrawls(
  pool: Pool,
  options: RecentSyncOptions = {}
): Promise<{ synced: number; errors: string[] }> {
  const {
    since = '1 hour',
    dispensaryId,
    limit = 100,
    verbose = false,
    dryRun = false,
    skipSnapshots = false,
  } = options;

  const errors: string[] = [];
  let synced = 0;

  // Find recent completed crawl jobs that don't have a crawl_run.
  // `since` is bound as $1 and cast server-side so it cannot alter the query.
  const params: unknown[] = [since];
  let query = `
    SELECT
      dcj.id as crawl_job_id,
      dcj.dispensary_id,
      dcj.status,
      dcj.started_at,
      dcj.completed_at,
      dcj.products_found,
      dcj.products_created,
      dcj.products_updated,
      dcj.brands_found,
      dcj.error_message,
      d.state_id
    FROM dispensary_crawl_jobs dcj
    LEFT JOIN dispensaries d ON d.id = dcj.dispensary_id
    LEFT JOIN crawl_runs cr ON cr.legacy_dispensary_crawl_job_id = dcj.id
    WHERE dcj.status IN ('completed', 'failed')
      AND dcj.started_at > NOW() - $1::interval
      AND cr.id IS NULL
  `;

  if (dispensaryId != null) {
    params.push(dispensaryId);
    query += ` AND dcj.dispensary_id = $${params.length}`;
  }

  params.push(limit);
  query += ` ORDER BY dcj.started_at DESC LIMIT $${params.length}`;

  const { rows: unsynced } = await pool.query(query, params);

  if (verbose) {
    console.log(`[IncrSync] Found ${unsynced.length} unsynced crawls from last ${since}`);
  }

  for (const job of unsynced) {
    try {
      const crawlResult: CrawlResult = {
        dispensaryId: job.dispensary_id,
        stateId: job.state_id,
        crawlJobId: job.crawl_job_id,
        startedAt: new Date(job.started_at),
        finishedAt: job.completed_at ? new Date(job.completed_at) : undefined,
        status: job.status === 'completed' ? 'success' : 'failed',
        errorMessage: job.error_message,
        productsFound: job.products_found || 0,
        productsCreated: job.products_created || 0,
        productsUpdated: job.products_updated || 0,
        brandsFound: job.brands_found || 0,
      };

      const result = await syncCrawlToCanonical(pool, crawlResult, {
        dryRun,
        verbose,
        skipSnapshots,
      });
      synced++;

      // Surface row-level failures instead of dropping them.
      for (const message of result.errors) {
        errors.push(`Job ${job.crawl_job_id}: ${message}`);
      }
    } catch (error: unknown) {
      errors.push(`Job ${job.crawl_job_id}: ${describeError(error)}`);
    }
  }

  return { synced, errors };
}

// Types CrawlResult, SyncOptions, and SyncResult are already exported at their declarations