/**
 * Product Refresh Handler
 *
 * Per TASK_WORKFLOW_2024-12-10.md: Processes a locally-stored payload.
 *
 * This handler reads from the filesystem (NOT the Dutchie API).
 * The payload_fetch handler is responsible for API calls.
 *
 * Flow:
 * 1. Load payload from filesystem (by payload_id or latest for dispensary)
 * 2. Normalize data via DutchieNormalizer
 * 3. Upsert to store_products and store_product_snapshots
 * 4. Track missing products (increment consecutive_misses, mark OOS at 3)
 * 5. Download new product images
 *
 * Benefits of separation:
 * - Retry-friendly: If this fails, re-run without re-crawling
 * - Replay-able: Run against any historical payload
 * - Faster: Local file read vs network call
 */

import { TaskContext, TaskResult } from '../task-worker';
import { DutchieNormalizer } from '../../hydration/normalizers/dutchie';
import {
  upsertStoreProducts,
  createStoreProductSnapshots,
  downloadProductImages,
} from '../../hydration/canonical-upsert';
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';

const normalizer = new DutchieNormalizer();

/**
 * Number of consecutive payloads a product may be absent from before it is
 * marked out of stock. Also the cap for `consecutive_misses`.
 */
const OOS_MISS_THRESHOLD = 3;

type Pool = TaskContext['pool'];
type DispensaryId = NonNullable<TaskContext['task']['dispensary_id']>;

/**
 * Extract the store slug (cName) from a menu URL; used as the storage context
 * for downloaded product images.
 *
 * Matches `/embedded-menu/<slug>` or `/dispensary/<slug>` (slug stops at `/` or `?`).
 * Falls back to `'dispensary'` when the URL is missing or matches neither form.
 */
function extractStoreSlug(menuUrl: string | null | undefined): string {
  const match = menuUrl?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
  return match?.[1] ?? 'dispensary';
}

/**
 * Reconcile `consecutive_misses` for a dispensary's Dutchie products against
 * the product IDs present in the current payload.
 *
 * - Products in the feed: `consecutive_misses` reset to 0, `last_seen_at` = NOW().
 * - Products not in the feed: `consecutive_misses` incremented, capped at OOS_MISS_THRESHOLD.
 * - Products at/above the threshold: `stock_status = 'oos'`, `is_in_stock = false`.
 *
 * The three statements run as separate queries (not in one transaction).
 *
 * @param currentProductIds Non-empty list of provider product IDs seen in the payload.
 *   Callers must not pass an empty list: every stored product would count as missing.
 * @returns Number of rows whose counter was incremented and number newly marked OOS.
 */
async function trackMissingProducts(
  pool: Pool,
  dispensaryId: DispensaryId,
  currentProductIds: string[],
): Promise<{ incremented: number; markedOos: number }> {
  // Reset consecutive_misses for products that ARE in the feed.
  await pool.query(
    `
      UPDATE store_products
      SET consecutive_misses = 0,
          last_seen_at = NOW()
      WHERE dispensary_id = $1
        AND provider = 'dutchie'
        AND provider_product_id = ANY($2::text[])
    `,
    [dispensaryId, currentProductIds],
  );

  // Increment consecutive_misses for products NOT in the feed. The array has no
  // NULLs (filtered by the caller), so `<> ALL` is equivalent to NOT IN.
  const incrementResult = await pool.query(
    `
      UPDATE store_products
      SET consecutive_misses = consecutive_misses + 1
      WHERE dispensary_id = $1
        AND provider = 'dutchie'
        AND provider_product_id <> ALL($2::text[])
        AND consecutive_misses < $3
      RETURNING id
    `,
    [dispensaryId, currentProductIds, OOS_MISS_THRESHOLD],
  );

  // Mark as OOS any products that reached the threshold. IS DISTINCT FROM is
  // used so rows whose stock_status is NULL are included; `!= 'oos'` yields
  // NULL for them and would silently skip them.
  const oosResult = await pool.query(
    `
      UPDATE store_products
      SET stock_status = 'oos',
          is_in_stock = false
      WHERE dispensary_id = $1
        AND provider = 'dutchie'
        AND consecutive_misses >= $2
        AND stock_status IS DISTINCT FROM 'oos'
      RETURNING id
    `,
    [dispensaryId, OOS_MISS_THRESHOLD],
  );

  return {
    incremented: incrementResult.rowCount ?? 0,
    markedOos: oosResult.rowCount ?? 0,
  };
}

/**
 * Process a stored product payload for one dispensary: normalize it, upsert
 * products and snapshots, track products missing from the feed, download
 * images for products that need them, and stamp the dispensary/payload.
 *
 * Uses `task.payload.payload_id` when provided (task chaining), otherwise the
 * latest stored payload for `task.dispensary_id`.
 *
 * Never throws: failures are reported as `{ success: false, error }`.
 */
export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult> {
  const { pool, task } = ctx;
  const dispensaryId = task.dispensary_id;

  if (!dispensaryId) {
    return { success: false, error: 'No dispensary_id specified for product_refresh task' };
  }

  try {
    // ============================================================
    // STEP 1: Load dispensary info
    // ============================================================
    const dispResult = await pool.query(`
      SELECT id, name, platform_dispensary_id, menu_url, menu_type, city, state
      FROM dispensaries
      WHERE id = $1
    `, [dispensaryId]);

    if (dispResult.rows.length === 0) {
      return { success: false, error: `Dispensary ${dispensaryId} not found` };
    }

    const dispensary = dispResult.rows[0];

    // cName from menu_url is the storage context for product images.
    const cName = extractStoreSlug(dispensary.menu_url);

    console.log(`[ProductRefresh] Starting refresh for ${dispensary.name} (ID: ${dispensaryId})`);

    await ctx.heartbeat();

    // ============================================================
    // STEP 2: Load payload from filesystem
    // Per TASK_WORKFLOW_2024-12-10.md: Read local payload, not API
    // ============================================================
    let payloadData: any;
    let payloadId: number;

    // Check if specific payload_id was provided (from task chaining)
    const taskPayload = task.payload as { payload_id?: number } | null;

    if (taskPayload?.payload_id) {
      // Load specific payload (from payload_fetch chaining)
      const result = await loadRawPayloadById(pool, taskPayload.payload_id);
      if (!result) {
        return { success: false, error: `Payload ${taskPayload.payload_id} not found` };
      }
      payloadData = result.payload;
      payloadId = result.metadata.id;
      console.log(`[ProductRefresh] Loaded specific payload #${payloadId}`);
    } else {
      // Load latest payload for this dispensary
      const result = await getLatestPayload(pool, dispensaryId);
      if (!result) {
        return { success: false, error: `No payload found for dispensary ${dispensaryId}` };
      }
      payloadData = result.payload;
      payloadId = result.metadata.id;
      console.log(`[ProductRefresh] Loaded latest payload #${payloadId} (${result.metadata.fetchedAt})`);
    }

    // Guard against a null payload or a non-array `products` field; either is
    // treated as "no products" rather than crashing further down.
    const allProducts: any[] = Array.isArray(payloadData?.products) ? payloadData.products : [];
    if (allProducts.length === 0) {
      return {
        success: false,
        error: 'Payload contains no products',
        payloadId,
        productsProcessed: 0,
      };
    }

    console.log(`[ProductRefresh] Processing ${allProducts.length} products from payload #${payloadId}`);

    await ctx.heartbeat();

    // ============================================================
    // STEP 3: Normalize data
    // ============================================================
    console.log(`[ProductRefresh] Normalizing ${allProducts.length} products...`);

    // Build RawPayload for the normalizer
    const rawPayload = {
      id: `refresh-${dispensaryId}-${Date.now()}`,
      dispensary_id: dispensaryId,
      crawl_run_id: null,
      platform: 'dutchie',
      payload_version: 1,
      raw_json: { data: { filteredProducts: { products: allProducts } } },
      product_count: allProducts.length,
      pricing_type: 'dual',
      crawl_mode: 'dual_mode',
      fetched_at: new Date(),
      processed: false,
      normalized_at: null,
      hydration_error: null,
      hydration_attempts: 0,
      created_at: new Date(),
    };

    const normalizationResult = normalizer.normalize(rawPayload);

    if (normalizationResult.errors.length > 0) {
      console.warn(`[ProductRefresh] Normalization warnings: ${normalizationResult.errors.map(e => e.message).join(', ')}`);
    }

    if (normalizationResult.products.length === 0) {
      return {
        success: false,
        error: 'Normalization produced no products',
        payloadId,
        productsProcessed: 0,
      };
    }

    console.log(`[ProductRefresh] Normalized ${normalizationResult.products.length} products`);

    await ctx.heartbeat();

    // ============================================================
    // STEP 4: Upsert to canonical tables
    // ============================================================
    console.log(`[ProductRefresh] Upserting to store_products...`);

    const upsertResult = await upsertStoreProducts(
      pool,
      normalizationResult.products,
      normalizationResult.pricing,
      normalizationResult.availability
    );

    console.log(`[ProductRefresh] Upserted: ${upsertResult.upserted} (${upsertResult.new} new, ${upsertResult.updated} updated)`);

    await ctx.heartbeat();

    // Create snapshots
    console.log(`[ProductRefresh] Creating snapshots...`);

    const snapshotsResult = await createStoreProductSnapshots(
      pool,
      dispensaryId,
      normalizationResult.products,
      normalizationResult.pricing,
      normalizationResult.availability,
      null // No crawl_run_id in new system
    );

    console.log(`[ProductRefresh] Created ${snapshotsResult.created} snapshots`);

    await ctx.heartbeat();

    // ============================================================
    // STEP 5: Track missing products (consecutive_misses logic)
    // - Products in feed: reset consecutive_misses to 0
    // - Products not in feed: increment consecutive_misses
    // - At OOS_MISS_THRESHOLD consecutive misses: mark as OOS
    // ============================================================
    const currentProductIds: string[] = allProducts
      .map((p: any) => p?._id || p?.id)
      .filter(Boolean)
      .map((id: unknown) => String(id));

    let markedOosCount = 0;
    if (currentProductIds.length === 0) {
      // Without any IDs we cannot tell which products are missing; treating all
      // of them as missing would push the whole catalog toward OOS.
      console.warn(`[ProductRefresh] No product IDs found in payload #${payloadId}; skipping missing-product tracking`);
    } else {
      const { incremented, markedOos } = await trackMissingProducts(pool, dispensaryId, currentProductIds);

      if (incremented > 0) {
        console.log(`[ProductRefresh] Incremented consecutive_misses for ${incremented} products`);
      }

      markedOosCount = markedOos;
      if (markedOosCount > 0) {
        console.log(`[ProductRefresh] Marked ${markedOosCount} products as OOS (${OOS_MISS_THRESHOLD}+ consecutive misses)`);
      }
    }

    await ctx.heartbeat();

    // ============================================================
    // STEP 6: Download images for new products
    // ============================================================
    if (upsertResult.productsNeedingImages.length > 0) {
      console.log(`[ProductRefresh] Downloading images for ${upsertResult.productsNeedingImages.length} products...`);
      try {
        const dispensaryContext = {
          stateCode: dispensary.state || 'AZ',
          storeSlug: cName,
        };
        await downloadProductImages(
          pool,
          upsertResult.productsNeedingImages,
          dispensaryContext
        );
      } catch (imgError: any) {
        // Image download errors shouldn't fail the whole task
        console.warn(`[ProductRefresh] Image download error (non-fatal): ${imgError.message}`);
      }
    }

    // ============================================================
    // STEP 7: Update dispensary last_crawl_at
    // ============================================================
    await pool.query(`
      UPDATE dispensaries
      SET last_crawl_at = NOW()
      WHERE id = $1
    `, [dispensaryId]);

    // ============================================================
    // STEP 8: Mark payload as processed
    // ============================================================
    await pool.query(`
      UPDATE raw_crawl_payloads
      SET processed_at = NOW()
      WHERE id = $1
    `, [payloadId]);

    console.log(`[ProductRefresh] Completed ${dispensary.name}`);

    return {
      success: true,
      payloadId,
      productsProcessed: normalizationResult.products.length,
      snapshotsCreated: snapshotsResult.created,
      newProducts: upsertResult.new,
      updatedProducts: upsertResult.updated,
      markedOos: markedOosCount,
    };

  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    console.error(`[ProductRefresh] Error for dispensary ${dispensaryId}:`, errorMessage);
    return {
      success: false,
      error: errorMessage,
    };
  }
}