Files
cannaiq/backend/src/hydration/worker.ts
Kelly 2f483b3084 feat: SEO template library, discovery pipeline, and orchestrator enhancements
## SEO Template Library
- Add complete template library with 7 page types (state, city, category, brand, product, search, regeneration)
- Add Template Library tab in SEO Orchestrator with accordion-based editors
- Add template preview, validation, and variable injection engine
- Add API endpoints: /api/seo/templates, preview, validate, generate, regenerate

## Discovery Pipeline
- Add promotion.ts for discovery location validation and promotion
- Add discover-all-states.ts script for multi-state discovery
- Add promotion log migration (067)
- Enhance discovery routes and types

## Orchestrator & Admin
- Add crawl_enabled filter to stores page
- Add API permissions page
- Add job queue management
- Add price analytics routes
- Add markets and intelligence routes
- Enhance dashboard and worker monitoring

## Infrastructure
- Add migrations for worker definitions, SEO settings, field alignment
- Add canonical pipeline for scraper v2
- Update hydration and sync orchestrator
- Enhance multi-state query service

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 00:05:34 -07:00

372 lines
10 KiB
TypeScript

/**
* Hydration Worker
*
* Processes raw payloads and hydrates them to canonical tables.
* Features:
* - Distributed locking to prevent double-processing
* - Batch processing for efficiency
* - Automatic retry with backoff
* - Dry-run mode for testing
*/
import { Pool } from 'pg';
import { v4 as uuidv4 } from 'uuid';
import {
RawPayload,
HydrationOptions,
HydrationResult,
HydrationBatchResult,
} from './types';
import { getNormalizer } from './normalizers';
import {
getUnprocessedPayloads,
markPayloadProcessed,
markPayloadFailed,
} from './payload-store';
import { HydrationLockManager, LOCK_NAMES } from './locking';
import { hydrateToCanonical } from './canonical-upsert';
/** Payloads fetched per batch when the caller does not pass `batchSize`. */
const DEFAULT_BATCH_SIZE = 50;
/** Default `maxRetries`; the worker forwards it to `getUnprocessedPayloads` as `maxAttempts`. */
const DEFAULT_MAX_RETRIES = 3;
/** Default timeout handed to the batch-lock acquisition: ten minutes, in milliseconds. */
const DEFAULT_LOCK_TIMEOUT_MS = 10 * 60_000;
// ============================================================
// HYDRATION WORKER CLASS
// ============================================================
export class HydrationWorker {
  private pool: Pool;
  private lockManager: HydrationLockManager;
  private workerId: string;
  private options: HydrationOptions;
  private isRunning = false;

  /**
   * @param pool - Postgres pool used for payload reads/writes, canonical upserts and `hydration_runs`.
   * @param options - Overrides for `dryRun`, `batchSize`, `maxRetries`, `lockTimeoutMs`. An option that is
   *   omitted *or explicitly `undefined`/`null`* falls back to the module default.
   */
  constructor(pool: Pool, options: HydrationOptions = {}) {
    this.pool = pool;
    this.workerId = `hydration-${uuidv4().slice(0, 8)}`;
    this.lockManager = new HydrationLockManager(pool, this.workerId);
    // Apply defaults with `??` instead of relying on spread order: `{ ...defaults, ...options }` lets an
    // explicit `undefined` (e.g. from an unset CLI flag) overwrite the default.
    this.options = {
      ...options,
      dryRun: options.dryRun ?? false,
      batchSize: options.batchSize ?? DEFAULT_BATCH_SIZE,
      maxRetries: options.maxRetries ?? DEFAULT_MAX_RETRIES,
      lockTimeoutMs: options.lockTimeoutMs ?? DEFAULT_LOCK_TIMEOUT_MS,
    };
  }

  /**
   * Normalize a single raw payload and hydrate it into the canonical tables.
   *
   * Processing failures do not reject: they are captured in the returned result (`success: false`) and,
   * outside dry-run mode, recorded on the payload via `markPayloadFailed`.
   *
   * @param payload - Raw payload row to hydrate.
   * @returns Per-payload counts, collected error messages and timing information.
   */
  async processPayload(payload: RawPayload): Promise<HydrationResult> {
    const startedAt = new Date();
    const errors: string[] = [];

    try {
      // Get normalizer for this platform
      const normalizer = getNormalizer(payload.platform);
      if (!normalizer) {
        throw new Error(`No normalizer found for platform: ${payload.platform}`);
      }

      // Validation errors are recorded but only abort processing when the payload has no raw_json at all.
      const validation = normalizer.validatePayload(payload.raw_json);
      if (!validation.valid) {
        errors.push(...validation.errors);
        if (errors.length > 0 && !payload.raw_json) {
          throw new Error(`Invalid payload: ${errors.join(', ')}`);
        }
      }

      // Normalize; field-level normalization errors are non-fatal and reported in the result.
      const normResult = normalizer.normalize(payload);
      if (normResult.errors.length > 0) {
        errors.push(...normResult.errors.map((e) => `${e.field}: ${e.message}`));
      }
      if (normResult.products.length === 0) {
        console.warn(`[HydrationWorker] No products in payload ${payload.id}`);
      }

      // Hydrate to canonical tables (the dry-run flag is forwarded so no rows are written).
      const hydrateResult = await hydrateToCanonical(
        this.pool,
        payload.dispensary_id,
        normResult,
        payload.crawl_run_id,
        { dryRun: this.options.dryRun }
      );

      // Mark as processed
      if (!this.options.dryRun) {
        await markPayloadProcessed(this.pool, payload.id);
      }

      const finishedAt = new Date();
      console.log(
        `[HydrationWorker] ${this.options.dryRun ? '[DryRun] ' : ''}Processed payload ${payload.id}: ` +
          `${hydrateResult.productsNew} new, ${hydrateResult.productsUpdated} updated, ` +
          `${hydrateResult.productsDiscontinued} discontinued, ${hydrateResult.snapshotsCreated} snapshots, ` +
          `${hydrateResult.variantsUpserted} variants (${hydrateResult.variantSnapshotsCreated} variant snapshots)`
      );

      return {
        success: true,
        payloadId: payload.id,
        dispensaryId: payload.dispensary_id,
        productsUpserted: hydrateResult.productsUpserted,
        productsNew: hydrateResult.productsNew,
        productsUpdated: hydrateResult.productsUpdated,
        productsDiscontinued: hydrateResult.productsDiscontinued,
        snapshotsCreated: hydrateResult.snapshotsCreated,
        brandsCreated: hydrateResult.brandsCreated,
        categoriesCreated: 0,
        errors,
        startedAt,
        finishedAt,
        durationMs: finishedAt.getTime() - startedAt.getTime(),
      };
    } catch (error: unknown) {
      const finishedAt = new Date();
      // Non-Error throwables have no `.message`; stringify them instead of recording `undefined`.
      const message = error instanceof Error ? error.message : String(error);
      errors.push(message);

      // Mark as failed. A failure of this bookkeeping write must not escape: it would abort the
      // remaining payloads in the batch. The payload simply stays unprocessed.
      if (!this.options.dryRun) {
        try {
          await markPayloadFailed(this.pool, payload.id, message);
        } catch (markError: unknown) {
          console.error(
            `[HydrationWorker] Could not mark payload ${payload.id} as failed:`,
            markError instanceof Error ? markError.message : String(markError)
          );
        }
      }

      console.error(`[HydrationWorker] Failed to process payload ${payload.id}:`, message);

      return {
        success: false,
        payloadId: payload.id,
        dispensaryId: payload.dispensary_id,
        productsUpserted: 0,
        productsNew: 0,
        productsUpdated: 0,
        productsDiscontinued: 0,
        snapshotsCreated: 0,
        brandsCreated: 0,
        categoriesCreated: 0,
        errors,
        startedAt,
        finishedAt,
        durationMs: finishedAt.getTime() - startedAt.getTime(),
      };
    }
  }

  /**
   * Process one batch of unprocessed payloads while holding the batch lock.
   *
   * Returns an all-zero result if the lock is held elsewhere. Outside dry-run mode the batch is recorded
   * in `hydration_runs`; if the batch throws, that row is closed out before the error is rethrown.
   *
   * @param platform - Optional platform filter forwarded to `getUnprocessedPayloads`.
   * @returns Aggregated counts and per-payload error messages for the batch.
   */
  async processBatch(
    platform?: string
  ): Promise<HydrationBatchResult> {
    const startTime = Date.now();
    const errors: Array<{ payloadId: string; error: string }> = [];
    let payloadsProcessed = 0;
    let payloadsFailed = 0;
    let totalProductsUpserted = 0;
    let totalSnapshotsCreated = 0;
    let totalBrandsCreated = 0;

    // Acquire lock
    const lockAcquired = await this.lockManager.acquireLock(
      LOCK_NAMES.HYDRATION_BATCH,
      this.options.lockTimeoutMs
    );
    if (!lockAcquired) {
      console.log('[HydrationWorker] Could not acquire batch lock, skipping');
      return {
        payloadsProcessed: 0,
        payloadsFailed: 0,
        totalProductsUpserted: 0,
        totalSnapshotsCreated: 0,
        totalBrandsCreated: 0,
        errors: [],
        durationMs: Date.now() - startTime,
      };
    }

    // Declared outside `try` so the catch handler can close the run row if anything below throws.
    let runId: number | null = null;
    try {
      // Create hydration run record
      if (!this.options.dryRun) {
        const result = await this.pool.query(
          `INSERT INTO hydration_runs (worker_id, started_at, status)
 VALUES ($1, NOW(), 'running')
 RETURNING id`,
          [this.workerId]
        );
        runId = result.rows[0].id;
      }

      // Get unprocessed payloads
      const payloads = await getUnprocessedPayloads(this.pool, {
        limit: this.options.batchSize,
        platform,
        maxAttempts: this.options.maxRetries,
      });
      console.log(`[HydrationWorker] Processing ${payloads.length} payloads`);

      // Process sequentially; the batch lock is held for the whole loop.
      for (const payload of payloads) {
        const result = await this.processPayload(payload);
        if (result.success) {
          payloadsProcessed++;
          totalProductsUpserted += result.productsUpserted;
          totalSnapshotsCreated += result.snapshotsCreated;
          totalBrandsCreated += result.brandsCreated;
        } else {
          payloadsFailed++;
          if (result.errors.length > 0) {
            errors.push({
              payloadId: payload.id,
              error: result.errors.join('; '),
            });
          }
        }
      }

      // Update hydration run record
      if (runId !== null) {
        await this.pool.query(
          `UPDATE hydration_runs SET
 finished_at = NOW(),
 status = $2,
 payloads_processed = $3,
 products_upserted = $4,
 snapshots_created = $5,
 brands_created = $6,
 errors_count = $7
 WHERE id = $1`,
          [
            runId,
            payloadsFailed > 0 ? 'completed_with_errors' : 'completed',
            payloadsProcessed,
            totalProductsUpserted,
            totalSnapshotsCreated,
            totalBrandsCreated,
            payloadsFailed,
          ]
        );
      }

      console.log(
        `[HydrationWorker] Batch complete: ${payloadsProcessed} processed, ${payloadsFailed} failed`
      );
      return {
        payloadsProcessed,
        payloadsFailed,
        totalProductsUpserted,
        totalSnapshotsCreated,
        totalBrandsCreated,
        errors,
        durationMs: Date.now() - startTime,
      };
    } catch (error: unknown) {
      // Without this, an exception after the INSERT above leaves the run row stuck in 'running'.
      // Best-effort: a failure here is logged so the caller still sees the original error.
      // NOTE(review): 'failed' is a new status value — confirm hydration_runs.status (and any
      // CHECK constraint / dashboard consumer) accepts it.
      if (runId !== null) {
        try {
          await this.pool.query(
            `UPDATE hydration_runs SET
 finished_at = NOW(),
 status = 'failed',
 payloads_processed = $2,
 products_upserted = $3,
 snapshots_created = $4,
 brands_created = $5,
 errors_count = $6
 WHERE id = $1`,
            [
              runId,
              payloadsProcessed,
              totalProductsUpserted,
              totalSnapshotsCreated,
              totalBrandsCreated,
              payloadsFailed,
            ]
          );
        } catch (updateError: unknown) {
          console.error(
            `[HydrationWorker] Could not mark hydration run ${runId} as failed:`,
            updateError instanceof Error ? updateError.message : String(updateError)
          );
        }
      }
      throw error;
    } finally {
      await this.lockManager.releaseLock(LOCK_NAMES.HYDRATION_BATCH);
    }
  }

  /**
   * Run batches continuously until `stop()` is called, then release every lock held by this worker.
   *
   * @param intervalMs - Pause between batches when idle, after an error, and after every batch in
   *   dry-run mode.
   */
  async runLoop(intervalMs: number = 30000): Promise<void> {
    this.isRunning = true;
    console.log(`[HydrationWorker] Starting loop (interval: ${intervalMs}ms)`);

    while (this.isRunning) {
      try {
        const result = await this.processBatch();
        // Sleep when there was no successful work. In dry-run mode payloads are never marked processed,
        // so the same batch would presumably be selected again right away; always pause to avoid a
        // tight loop hammering the database.
        if (result.payloadsProcessed === 0 || this.options.dryRun) {
          await HydrationWorker.sleep(intervalMs);
        }
      } catch (error: unknown) {
        console.error(
          '[HydrationWorker] Loop error:',
          error instanceof Error ? error.message : String(error)
        );
        await HydrationWorker.sleep(intervalMs);
      }
    }

    await this.lockManager.releaseAllLocks();
    console.log('[HydrationWorker] Loop stopped');
  }

  /**
   * Ask the hydration loop to stop. Takes effect before the next batch starts (after any in-progress
   * batch or sleep completes).
   */
  stop(): void {
    this.isRunning = false;
  }

  /**
   * Get worker ID
   */
  getWorkerId(): string {
    return this.workerId;
  }

  /** Resolve after `ms` milliseconds. */
  private static sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// ============================================================
// STANDALONE FUNCTIONS
// ============================================================
/**
* Run a single hydration batch (for cron jobs)
*/
/**
 * Execute exactly one hydration batch using a freshly created worker (intended for cron jobs).
 *
 * @param pool - Postgres pool handed to the worker.
 * @param options - Worker options (dryRun, batchSize, maxRetries, lockTimeoutMs).
 * @returns The summary produced by `HydrationWorker.processBatch` (no platform filter).
 */
export async function runHydrationBatch(
  pool: Pool,
  options: HydrationOptions = {}
): Promise<HydrationBatchResult> {
  return new HydrationWorker(pool, options).processBatch();
}
/**
* Process a specific payload by ID
*/
/**
 * Load one row from `raw_payloads` by id and hydrate it with a fresh worker.
 *
 * @param pool - Postgres pool used for the lookup and the hydration.
 * @param payloadId - Primary key of the payload to process.
 * @param options - Worker options (dryRun, batchSize, ...).
 * @returns The per-payload hydration result.
 * @throws Error when no payload with `payloadId` exists.
 */
export async function processPayloadById(
  pool: Pool,
  payloadId: string,
  options: HydrationOptions = {}
): Promise<HydrationResult> {
  const { rows } = await pool.query(
    `SELECT * FROM raw_payloads WHERE id = $1`,
    [payloadId]
  );

  if (!rows.length) {
    throw new Error(`Payload not found: ${payloadId}`);
  }

  return new HydrationWorker(pool, options).processPayload(rows[0]);
}
/**
* Reprocess failed payloads
*/
/**
 * Make failed payloads eligible again, then run a single hydration batch.
 *
 * The reset clears `hydration_attempts` and `hydration_error` on every unprocessed payload that has an
 * error recorded, presumably so payloads that exhausted `maxRetries` are selected again. Only one batch
 * (up to `batchSize` payloads) is processed by this call.
 *
 * In dry-run mode the reset is skipped. The worker otherwise performs no bookkeeping writes when
 * `dryRun` is set, and this function follows the same rule. As a result a dry run only covers payloads
 * that are still within their retry budget.
 *
 * @param pool - Postgres pool used for the reset and the batch.
 * @param options - Worker options (dryRun, batchSize, ...).
 * @returns The batch summary from `HydrationWorker.processBatch`.
 */
export async function reprocessFailedPayloads(
  pool: Pool,
  options: HydrationOptions = {}
): Promise<HydrationBatchResult> {
  if (options.dryRun) {
    console.log('[HydrationWorker] [DryRun] Skipping reset of failed payloads');
  } else {
    // Reset failed payloads for reprocessing
    await pool.query(
      `UPDATE raw_payloads
 SET hydration_attempts = 0,
 hydration_error = NULL
 WHERE processed = FALSE
 AND hydration_error IS NOT NULL`
    );
  }

  const worker = new HydrationWorker(pool, options);
  return worker.processBatch();
}