/**
 * Hydration Worker
 *
 * Processes raw payloads and hydrates them to canonical tables.
 * Features:
 * - Distributed locking to prevent double-processing
 * - Batch processing for efficiency
 * - Automatic retry with backoff
 * - Dry-run mode for testing
 */

import { Pool } from 'pg';
import { v4 as uuidv4 } from 'uuid';
import {
  RawPayload,
  HydrationOptions,
  HydrationResult,
  HydrationBatchResult,
} from './types';
import { getNormalizer } from './normalizers';
import {
  getUnprocessedPayloads,
  markPayloadProcessed,
  markPayloadFailed,
} from './payload-store';
import { HydrationLockManager, LOCK_NAMES } from './locking';
import { hydrateToCanonical } from './canonical-upsert';

const DEFAULT_BATCH_SIZE = 50;
const DEFAULT_MAX_RETRIES = 3;
const DEFAULT_LOCK_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes

/**
 * Extracts a printable message from an arbitrary thrown value.
 * Thrown values are not guaranteed to be `Error` instances, so reading
 * `.message` unguarded can yield `undefined`.
 */
function toErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/** Resolves after `ms` milliseconds. */
function delay(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// ============================================================
// HYDRATION WORKER CLASS
// ============================================================

export class HydrationWorker {
  private readonly pool: Pool;
  private readonly lockManager: HydrationLockManager;
  private readonly workerId: string;
  private readonly options: HydrationOptions;
  private isRunning: boolean = false;

  /**
   * @param pool    Connection pool used for every query issued by this worker.
   * @param options Overrides for the defaults (dryRun=false, batchSize=50,
   *                maxRetries=3, lockTimeoutMs=10 minutes).
   */
  constructor(pool: Pool, options: HydrationOptions = {}) {
    this.pool = pool;
    // Short random suffix so each worker instance gets its own identity
    this.workerId = `hydration-${uuidv4().slice(0, 8)}`;
    this.lockManager = new HydrationLockManager(pool, this.workerId);
    this.options = {
      dryRun: false,
      batchSize: DEFAULT_BATCH_SIZE,
      maxRetries: DEFAULT_MAX_RETRIES,
      lockTimeoutMs: DEFAULT_LOCK_TIMEOUT_MS,
      ...options,
    };
  }

  /**
   * Process a single payload: validate, normalize, hydrate to the canonical
   * tables and (unless in dry-run mode) mark it processed or failed.
   *
   * Processing errors never reject: they are reported through
   * `success: false` and the `errors` array of the returned result.
   *
   * @param payload Raw payload row to hydrate.
   * @returns Per-payload counters, collected error messages and timing.
   */
  async processPayload(payload: RawPayload): Promise<HydrationResult> {
    const startedAt = new Date();
    const errors: string[] = [];

    try {
      // Get normalizer for this platform
      const normalizer = getNormalizer(payload.platform);
      if (!normalizer) {
        throw new Error(`No normalizer found for platform: ${payload.platform}`);
      }

      // Validate payload. Validation errors are always recorded, but they are
      // only fatal when there is no raw JSON at all to normalize.
      const validation = normalizer.validatePayload(payload.raw_json);
      if (!validation.valid) {
        errors.push(...validation.errors);
        if (errors.length > 0 && !payload.raw_json) {
          throw new Error(`Invalid payload: ${errors.join(', ')}`);
        }
      }

      // Normalize
      const normResult = normalizer.normalize(payload);

      if (normResult.errors.length > 0) {
        errors.push(...normResult.errors.map((e) => `${e.field}: ${e.message}`));
      }

      if (normResult.products.length === 0) {
        console.warn(`[HydrationWorker] No products in payload ${payload.id}`);
      }

      // Hydrate to canonical tables
      const hydrateResult = await hydrateToCanonical(
        this.pool,
        payload.dispensary_id,
        normResult,
        payload.crawl_run_id,
        { dryRun: this.options.dryRun }
      );

      // Mark as processed
      if (!this.options.dryRun) {
        await markPayloadProcessed(this.pool, payload.id);
      }

      const finishedAt = new Date();

      console.log(
        `[HydrationWorker] ${this.options.dryRun ? '[DryRun] ' : ''}Processed payload ${payload.id}: ` +
          `${hydrateResult.productsNew} new, ${hydrateResult.productsUpdated} updated, ` +
          `${hydrateResult.productsDiscontinued} discontinued, ${hydrateResult.snapshotsCreated} snapshots, ` +
          `${hydrateResult.variantsUpserted} variants (${hydrateResult.variantSnapshotsCreated} variant snapshots)`
      );

      return {
        success: true,
        payloadId: payload.id,
        dispensaryId: payload.dispensary_id,
        productsUpserted: hydrateResult.productsUpserted,
        productsNew: hydrateResult.productsNew,
        productsUpdated: hydrateResult.productsUpdated,
        productsDiscontinued: hydrateResult.productsDiscontinued,
        snapshotsCreated: hydrateResult.snapshotsCreated,
        brandsCreated: hydrateResult.brandsCreated,
        categoriesCreated: 0,
        errors,
        startedAt,
        finishedAt,
        durationMs: finishedAt.getTime() - startedAt.getTime(),
      };
    } catch (error: unknown) {
      const finishedAt = new Date();
      const message = toErrorMessage(error);
      errors.push(message);

      // Mark as failed. A failure to record the failure must not mask the
      // original error or abort the surrounding batch, so it is logged and
      // reported in the result instead of being rethrown.
      if (!this.options.dryRun) {
        try {
          await markPayloadFailed(this.pool, payload.id, message);
        } catch (markError: unknown) {
          const markMessage = toErrorMessage(markError);
          errors.push(`Failed to mark payload as failed: ${markMessage}`);
          console.error(
            `[HydrationWorker] Could not mark payload ${payload.id} as failed:`,
            markMessage
          );
        }
      }

      console.error(`[HydrationWorker] Failed to process payload ${payload.id}:`, message);

      return {
        success: false,
        payloadId: payload.id,
        dispensaryId: payload.dispensary_id,
        productsUpserted: 0,
        productsNew: 0,
        productsUpdated: 0,
        productsDiscontinued: 0,
        snapshotsCreated: 0,
        brandsCreated: 0,
        categoriesCreated: 0,
        errors,
        startedAt,
        finishedAt,
        durationMs: finishedAt.getTime() - startedAt.getTime(),
      };
    }
  }

  /**
   * Process a batch of payloads under the batch lock.
   *
   * If the lock cannot be acquired an all-zero result is returned. Otherwise
   * up to `batchSize` unprocessed payloads are handled sequentially and,
   * outside dry-run mode, a `hydration_runs` row records the outcome.
   *
   * @param platform Optional platform filter passed to the payload query.
   * @returns Aggregated counters and per-payload errors for the batch.
   */
  async processBatch(platform?: string): Promise<HydrationBatchResult> {
    const startTime = Date.now();
    const errors: Array<{ payloadId: string; error: string }> = [];
    let payloadsProcessed = 0;
    let payloadsFailed = 0;
    let totalProductsUpserted = 0;
    let totalSnapshotsCreated = 0;
    let totalBrandsCreated = 0;

    // Acquire lock
    const lockAcquired = await this.lockManager.acquireLock(
      LOCK_NAMES.HYDRATION_BATCH,
      this.options.lockTimeoutMs
    );

    if (!lockAcquired) {
      console.log('[HydrationWorker] Could not acquire batch lock, skipping');
      return {
        payloadsProcessed: 0,
        payloadsFailed: 0,
        totalProductsUpserted: 0,
        totalSnapshotsCreated: 0,
        totalBrandsCreated: 0,
        errors: [],
        durationMs: Date.now() - startTime,
      };
    }

    try {
      // Create hydration run record
      let runId: number | null = null;
      if (!this.options.dryRun) {
        const result = await this.pool.query(
          `INSERT INTO hydration_runs (worker_id, started_at, status) VALUES ($1, NOW(), 'running') RETURNING id`,
          [this.workerId]
        );
        runId = result.rows[0].id;
      }

      // Get unprocessed payloads
      const payloads = await getUnprocessedPayloads(this.pool, {
        limit: this.options.batchSize,
        platform,
        maxAttempts: this.options.maxRetries,
      });

      console.log(`[HydrationWorker] Processing ${payloads.length} payloads`);

      // Process each payload
      for (const payload of payloads) {
        const result = await this.processPayload(payload);

        if (result.success) {
          payloadsProcessed++;
          totalProductsUpserted += result.productsUpserted;
          totalSnapshotsCreated += result.snapshotsCreated;
          totalBrandsCreated += result.brandsCreated;
        } else {
          payloadsFailed++;
          if (result.errors.length > 0) {
            errors.push({
              payloadId: payload.id,
              error: result.errors.join('; '),
            });
          }
        }
      }

      // Update hydration run record (explicit null check rather than truthiness)
      if (!this.options.dryRun && runId !== null) {
        await this.pool.query(
          `UPDATE hydration_runs SET finished_at = NOW(), status = $2, payloads_processed = $3, products_upserted = $4, snapshots_created = $5, brands_created = $6, errors_count = $7 WHERE id = $1`,
          [
            runId,
            payloadsFailed > 0 ? 'completed_with_errors' : 'completed',
            payloadsProcessed,
            totalProductsUpserted,
            totalSnapshotsCreated,
            totalBrandsCreated,
            payloadsFailed,
          ]
        );
      }

      console.log(
        `[HydrationWorker] Batch complete: ${payloadsProcessed} processed, ${payloadsFailed} failed`
      );

      return {
        payloadsProcessed,
        payloadsFailed,
        totalProductsUpserted,
        totalSnapshotsCreated,
        totalBrandsCreated,
        errors,
        durationMs: Date.now() - startTime,
      };
    } finally {
      // Always give the lock back, even when the batch throws
      await this.lockManager.releaseLock(LOCK_NAMES.HYDRATION_BATCH);
    }
  }

  /**
   * Run continuous hydration loop until `stop()` is called.
   *
   * @param intervalMs Pause between batches when there was nothing to
   *                   process, after a loop error, and in dry-run mode.
   */
  async runLoop(intervalMs: number = 30000): Promise<void> {
    this.isRunning = true;
    console.log(`[HydrationWorker] Starting loop (interval: ${intervalMs}ms)`);

    while (this.isRunning) {
      try {
        const result = await this.processBatch();

        // No work to do: wait before checking again. Dry-run batches never
        // mark payloads as processed or failed, so the same payloads would
        // presumably be fetched again right away; pause in that mode too
        // rather than spinning in a hot loop.
        if (result.payloadsProcessed === 0 || this.options.dryRun) {
          await delay(intervalMs);
        }
      } catch (error: unknown) {
        console.error('[HydrationWorker] Loop error:', toErrorMessage(error));
        await delay(intervalMs);
      }
    }

    await this.lockManager.releaseAllLocks();
    console.log('[HydrationWorker] Loop stopped');
  }

  /**
   * Stop the hydration loop. Takes effect once the current iteration
   * (batch and any pending pause) has finished.
   */
  stop(): void {
    this.isRunning = false;
  }

  /**
   * Get worker ID
   */
  getWorkerId(): string {
    return this.workerId;
  }
}

// ============================================================
// STANDALONE FUNCTIONS
// ============================================================

/**
 * Run a single hydration batch (for cron jobs)
 *
 * @param pool    Connection pool handed to a freshly created worker.
 * @param options Worker options; see `HydrationWorker`.
 * @returns The result of one `processBatch()` call.
 */
export async function runHydrationBatch(
  pool: Pool,
  options: HydrationOptions = {}
): Promise<HydrationBatchResult> {
  const worker = new HydrationWorker(pool, options);
  return worker.processBatch();
}

/**
 * Process a specific payload by ID
 *
 * @param pool      Connection pool used to load and process the payload.
 * @param payloadId `raw_payloads.id` of the payload to process.
 * @param options   Worker options; see `HydrationWorker`.
 * @throws Error when no row with the given id exists.
 */
export async function processPayloadById(
  pool: Pool,
  payloadId: string,
  options: HydrationOptions = {}
): Promise<HydrationResult> {
  const result = await pool.query(
    `SELECT * FROM raw_payloads WHERE id = $1`,
    [payloadId]
  );

  if (result.rows.length === 0) {
    throw new Error(`Payload not found: ${payloadId}`);
  }

  const worker = new HydrationWorker(pool, options);
  return worker.processPayload(result.rows[0]);
}

/**
 * Reprocess failed payloads
 *
 * Clears the attempt counter and stored error on every unprocessed payload
 * that has a recorded hydration error, then runs one batch.
 * NOTE(review): the reset query runs even when `options.dryRun` is set —
 * confirm that this is intended.
 *
 * @param pool    Connection pool used for the reset and the batch.
 * @param options Worker options; see `HydrationWorker`.
 * @returns The result of the batch that follows the reset.
 */
export async function reprocessFailedPayloads(
  pool: Pool,
  options: HydrationOptions = {}
): Promise<HydrationBatchResult> {
  // Reset failed payloads for reprocessing
  await pool.query(
    `UPDATE raw_payloads SET hydration_attempts = 0, hydration_error = NULL WHERE processed = FALSE AND hydration_error IS NOT NULL`
  );

  const worker = new HydrationWorker(pool, options);
  return worker.processBatch();
}