feat: Add v2 architecture with multi-state support and orchestrator services
Major additions:
- Multi-state expansion: states table, StateSelector, NationalDashboard, StateHeatmap, CrossStateCompare
- Orchestrator services: trace service, error taxonomy, retry manager, proxy rotator
- Discovery system: dutchie discovery service, geo validation, city seeding scripts
- Analytics infrastructure: analytics v2 routes, brand/pricing/stores intelligence pages
- Local development: setup-local.sh starts all 5 services (postgres, backend, cannaiq, findadispo, findagram)
- Migrations 037-056: crawler profiles, states, analytics indexes, worker metadata

Frontend pages added:
- Discovery, ChainsDashboard, IntelligenceBrands, IntelligencePricing, IntelligenceStores
- StateHeatmap, CrossStateCompare, SyncInfoPanel

Components added:
- StateSelector, OrchestratorTraceModal, WorkflowStepper

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
370
backend/src/hydration/worker.ts
Normal file
370
backend/src/hydration/worker.ts
Normal file
@@ -0,0 +1,370 @@
|
||||
/**
|
||||
* Hydration Worker
|
||||
*
|
||||
* Processes raw payloads and hydrates them to canonical tables.
|
||||
* Features:
|
||||
* - Distributed locking to prevent double-processing
|
||||
* - Batch processing for efficiency
|
||||
* - Automatic retry with backoff
|
||||
* - Dry-run mode for testing
|
||||
*/
|
||||
|
||||
import { Pool } from 'pg';
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
import {
|
||||
RawPayload,
|
||||
HydrationOptions,
|
||||
HydrationResult,
|
||||
HydrationBatchResult,
|
||||
} from './types';
|
||||
import { getNormalizer } from './normalizers';
|
||||
import {
|
||||
getUnprocessedPayloads,
|
||||
markPayloadProcessed,
|
||||
markPayloadFailed,
|
||||
} from './payload-store';
|
||||
import { HydrationLockManager, LOCK_NAMES } from './locking';
|
||||
import { hydrateToCanonical } from './canonical-upsert';
|
||||
|
||||
/** Fallback for `HydrationOptions.batchSize` (used as the `limit` when fetching unprocessed payloads). */
const DEFAULT_BATCH_SIZE = 50;

/** Fallback for `HydrationOptions.maxRetries` (passed as `maxAttempts` when fetching unprocessed payloads). */
const DEFAULT_MAX_RETRIES = 3;

/** Fallback for `HydrationOptions.lockTimeoutMs`: ten minutes, expressed in milliseconds. */
const DEFAULT_LOCK_TIMEOUT_MS = 1000 * 60 * 10;
|
||||
|
||||
// ============================================================
|
||||
// HYDRATION WORKER CLASS
|
||||
// ============================================================
|
||||
|
||||
/**
 * Turns raw payloads into canonical rows.
 *
 * A worker can process a single payload (`processPayload`), one locked batch
 * (`processBatch`), or poll for batches until stopped (`runLoop` / `stop`).
 * When `options.dryRun` is set, none of the bookkeeping writes issued by this
 * class (payload processed/failed markers, `hydration_runs` rows) are executed,
 * and the flag is forwarded to `hydrateToCanonical`.
 */
export class HydrationWorker {
  private pool: Pool;
  private lockManager: HydrationLockManager;
  private workerId: string;
  private options: HydrationOptions;
  private isRunning: boolean = false;
  /** Resolves the idle sleep of `runLoop` early; non-null only while the loop is sleeping. */
  private wakeLoop: (() => void) | null = null;

  /**
   * @param pool    Connection pool used for all queries and handed to the lock manager.
   * @param options Overrides for the defaults (dryRun=false, batch size, max retries,
   *                lock timeout). Properties that are absent or `undefined` fall back
   *                to the defaults instead of overwriting them.
   */
  constructor(pool: Pool, options: HydrationOptions = {}) {
    this.pool = pool;
    this.workerId = `hydration-${uuidv4().slice(0, 8)}`;
    this.lockManager = new HydrationLockManager(pool, this.workerId);
    // Spread first, then resolve each defaulted field with `??` so that an
    // explicit `undefined` (e.g. from an optional CLI flag) cannot clobber a default.
    this.options = {
      ...options,
      dryRun: options.dryRun ?? false,
      batchSize: options.batchSize ?? DEFAULT_BATCH_SIZE,
      maxRetries: options.maxRetries ?? DEFAULT_MAX_RETRIES,
      lockTimeoutMs: options.lockTimeoutMs ?? DEFAULT_LOCK_TIMEOUT_MS,
    };
  }

  /**
   * Process a single payload: validate, normalize, hydrate, then mark it processed.
   *
   * Never rejects: any failure is reported through a result with `success: false`
   * and the collected messages in `errors`.
   *
   * @param payload Raw payload row to hydrate.
   * @returns Per-payload counters, timing and accumulated (non-fatal and fatal) errors.
   */
  async processPayload(payload: RawPayload): Promise<HydrationResult> {
    const startedAt = new Date();
    const errors: string[] = [];

    try {
      // Get normalizer for this platform
      const normalizer = getNormalizer(payload.platform);
      if (!normalizer) {
        throw new Error(`No normalizer found for platform: ${payload.platform}`);
      }

      // Validate payload. Validation errors are always recorded, but they are
      // only fatal when there is no raw JSON at all; otherwise normalization
      // is still attempted.
      const validation = normalizer.validatePayload(payload.raw_json);
      if (!validation.valid) {
        errors.push(...validation.errors);
        if (errors.length > 0 && !payload.raw_json) {
          throw new Error(`Invalid payload: ${errors.join(', ')}`);
        }
      }

      // Normalize
      const normResult = normalizer.normalize(payload);

      if (normResult.errors.length > 0) {
        errors.push(...normResult.errors.map((e) => `${e.field}: ${e.message}`));
      }

      if (normResult.products.length === 0) {
        console.warn(`[HydrationWorker] No products in payload ${payload.id}`);
      }

      // Hydrate to canonical tables
      const hydrateResult = await hydrateToCanonical(
        this.pool,
        payload.dispensary_id,
        normResult,
        payload.crawl_run_id,
        { dryRun: this.options.dryRun }
      );

      // Mark as processed
      if (!this.options.dryRun) {
        await markPayloadProcessed(this.pool, payload.id);
      }

      const finishedAt = new Date();

      console.log(
        `[HydrationWorker] ${this.options.dryRun ? '[DryRun] ' : ''}Processed payload ${payload.id}: ` +
          `${hydrateResult.productsNew} new, ${hydrateResult.productsUpdated} updated, ` +
          `${hydrateResult.productsDiscontinued} discontinued, ${hydrateResult.snapshotsCreated} snapshots`
      );

      return {
        success: true,
        payloadId: payload.id,
        dispensaryId: payload.dispensary_id,
        productsUpserted: hydrateResult.productsUpserted,
        productsNew: hydrateResult.productsNew,
        productsUpdated: hydrateResult.productsUpdated,
        productsDiscontinued: hydrateResult.productsDiscontinued,
        snapshotsCreated: hydrateResult.snapshotsCreated,
        brandsCreated: hydrateResult.brandsCreated,
        categoriesCreated: 0,
        errors,
        startedAt,
        finishedAt,
        durationMs: finishedAt.getTime() - startedAt.getTime(),
      };
    } catch (error: unknown) {
      const finishedAt = new Date();
      const message = HydrationWorker.describeError(error);
      errors.push(message);

      // Mark as failed. This bookkeeping write is best-effort: if it throws
      // too, the failure is recorded in the result instead of rejecting, so a
      // single payload cannot abort the surrounding batch.
      if (!this.options.dryRun) {
        try {
          await markPayloadFailed(this.pool, payload.id, message);
        } catch (markError: unknown) {
          const markMessage = HydrationWorker.describeError(markError);
          errors.push(`Could not mark payload as failed: ${markMessage}`);
          console.error(
            `[HydrationWorker] Could not mark payload ${payload.id} as failed:`,
            markMessage
          );
        }
      }

      console.error(`[HydrationWorker] Failed to process payload ${payload.id}:`, message);

      return {
        success: false,
        payloadId: payload.id,
        dispensaryId: payload.dispensary_id,
        productsUpserted: 0,
        productsNew: 0,
        productsUpdated: 0,
        productsDiscontinued: 0,
        snapshotsCreated: 0,
        brandsCreated: 0,
        categoriesCreated: 0,
        errors,
        startedAt,
        finishedAt,
        durationMs: finishedAt.getTime() - startedAt.getTime(),
      };
    }
  }

  /**
   * Process a batch of payloads under the `HYDRATION_BATCH` lock.
   *
   * If the lock cannot be acquired the batch is skipped and an all-zero result
   * is returned. Outside dry-run mode a `hydration_runs` row is inserted with
   * status `running` and finalized with the batch counters; if the batch throws
   * part-way through, the row is closed as `failed` before the error is rethrown.
   *
   * @param platform Optional platform filter forwarded to `getUnprocessedPayloads`.
   * @returns Aggregated counters and the per-payload errors of failed payloads.
   */
  async processBatch(
    platform?: string
  ): Promise<HydrationBatchResult> {
    const startTime = Date.now();
    const errors: Array<{ payloadId: string; error: string }> = [];
    let payloadsProcessed = 0;
    let payloadsFailed = 0;
    let totalProductsUpserted = 0;
    let totalSnapshotsCreated = 0;
    let totalBrandsCreated = 0;

    // Acquire lock
    const lockAcquired = await this.lockManager.acquireLock(
      LOCK_NAMES.HYDRATION_BATCH,
      this.options.lockTimeoutMs
    );

    if (!lockAcquired) {
      console.log('[HydrationWorker] Could not acquire batch lock, skipping');
      return {
        payloadsProcessed: 0,
        payloadsFailed: 0,
        totalProductsUpserted: 0,
        totalSnapshotsCreated: 0,
        totalBrandsCreated: 0,
        errors: [],
        durationMs: Date.now() - startTime,
      };
    }

    // Declared outside the try so the catch handler can close the run record.
    let runId: number | null = null;

    try {
      // Create hydration run record
      if (!this.options.dryRun) {
        const result = await this.pool.query(
          `INSERT INTO hydration_runs (worker_id, started_at, status)
           VALUES ($1, NOW(), 'running')
           RETURNING id`,
          [this.workerId]
        );
        runId = result.rows[0].id;
      }

      // Get unprocessed payloads
      const payloads = await getUnprocessedPayloads(this.pool, {
        limit: this.options.batchSize,
        platform,
        maxAttempts: this.options.maxRetries,
      });

      console.log(`[HydrationWorker] Processing ${payloads.length} payloads`);

      // Process each payload sequentially
      for (const payload of payloads) {
        const result = await this.processPayload(payload);

        if (result.success) {
          payloadsProcessed++;
          totalProductsUpserted += result.productsUpserted;
          totalSnapshotsCreated += result.snapshotsCreated;
          totalBrandsCreated += result.brandsCreated;
        } else {
          payloadsFailed++;
          if (result.errors.length > 0) {
            errors.push({
              payloadId: payload.id,
              error: result.errors.join('; '),
            });
          }
        }
      }

      // Update hydration run record (explicit null check: an id of 0 is still a valid row)
      if (!this.options.dryRun && runId !== null) {
        await this.pool.query(
          `UPDATE hydration_runs SET
             finished_at = NOW(),
             status = $2,
             payloads_processed = $3,
             products_upserted = $4,
             snapshots_created = $5,
             brands_created = $6,
             errors_count = $7
           WHERE id = $1`,
          [
            runId,
            payloadsFailed > 0 ? 'completed_with_errors' : 'completed',
            payloadsProcessed,
            totalProductsUpserted,
            totalSnapshotsCreated,
            totalBrandsCreated,
            payloadsFailed,
          ]
        );
      }

      console.log(
        `[HydrationWorker] Batch complete: ${payloadsProcessed} processed, ${payloadsFailed} failed`
      );

      return {
        payloadsProcessed,
        payloadsFailed,
        totalProductsUpserted,
        totalSnapshotsCreated,
        totalBrandsCreated,
        errors,
        durationMs: Date.now() - startTime,
      };
    } catch (error: unknown) {
      // The batch aborted after the run row was created: close the row so it
      // does not stay in 'running' forever. Best-effort only; the original
      // error is always the one that propagates.
      // NOTE(review): assumes hydration_runs.status accepts 'failed' — confirm
      // against the migration if the column has a CHECK/enum constraint.
      if (runId !== null) {
        try {
          await this.pool.query(
            `UPDATE hydration_runs SET
               finished_at = NOW(),
               status = 'failed',
               payloads_processed = $2,
               products_upserted = $3,
               snapshots_created = $4,
               brands_created = $5,
               errors_count = $6
             WHERE id = $1`,
            [
              runId,
              payloadsProcessed,
              totalProductsUpserted,
              totalSnapshotsCreated,
              totalBrandsCreated,
              payloadsFailed,
            ]
          );
        } catch (updateError: unknown) {
          console.error(
            `[HydrationWorker] Could not mark hydration run ${runId} as failed:`,
            HydrationWorker.describeError(updateError)
          );
        }
      }
      throw error;
    } finally {
      await this.lockManager.releaseLock(LOCK_NAMES.HYDRATION_BATCH);
    }
  }

  /**
   * Run continuous hydration loop until `stop()` is called.
   *
   * Batches run back-to-back while they process at least one payload. When a
   * batch processes nothing, or throws, the loop sleeps for `intervalMs`
   * before trying again. All locks held by this worker are released on exit.
   *
   * @param intervalMs Idle/back-off delay in milliseconds.
   */
  async runLoop(intervalMs: number = 30000): Promise<void> {
    this.isRunning = true;
    console.log(`[HydrationWorker] Starting loop (interval: ${intervalMs}ms)`);

    while (this.isRunning) {
      let shouldWait: boolean;

      try {
        const result = await this.processBatch();
        // No work to do, wait before checking again
        shouldWait = result.payloadsProcessed === 0;
      } catch (error: unknown) {
        console.error('[HydrationWorker] Loop error:', HydrationWorker.describeError(error));
        shouldWait = true;
      }

      // Skip the sleep entirely if stop() was requested while the batch ran.
      if (shouldWait && this.isRunning) {
        await this.sleep(intervalMs);
      }
    }

    await this.lockManager.releaseAllLocks();
    console.log('[HydrationWorker] Loop stopped');
  }

  /**
   * Stop the hydration loop.
   *
   * A batch that is currently running finishes first; an idle sleep is cut
   * short so the loop exits without waiting out the remaining interval.
   */
  stop(): void {
    this.isRunning = false;
    if (this.wakeLoop !== null) {
      this.wakeLoop();
    }
  }

  /**
   * Get worker ID
   */
  getWorkerId(): string {
    return this.workerId;
  }

  /** Sleeps for `ms` milliseconds, or until `stop()` wakes the loop, whichever comes first. */
  private sleep(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      const finish = (): void => {
        clearTimeout(timer);
        this.wakeLoop = null;
        resolve();
      };
      const timer: ReturnType<typeof setTimeout> = setTimeout(finish, ms);
      this.wakeLoop = finish;
    });
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
|
||||
|
||||
// ============================================================
|
||||
// STANDALONE FUNCTIONS
|
||||
// ============================================================
|
||||
|
||||
/**
 * One-shot entry point (e.g. for cron jobs): builds a throwaway
 * `HydrationWorker` and runs exactly one batch with it.
 *
 * @param pool    Connection pool handed to the worker.
 * @param options Worker options; defaults apply when omitted.
 * @returns The result of the worker's `processBatch()` call.
 */
export async function runHydrationBatch(
  pool: Pool,
  options: HydrationOptions = {}
): Promise<HydrationBatchResult> {
  return new HydrationWorker(pool, options).processBatch();
}
|
||||
|
||||
/**
 * Looks up one row of `raw_payloads` by id and hydrates it with a fresh worker.
 *
 * @param pool      Connection pool used for the lookup and by the worker.
 * @param payloadId Id of the payload row to process.
 * @param options   Worker options; defaults apply when omitted.
 * @returns The result of the worker's `processPayload()` call for that row.
 * @throws Error when no row with the given id exists.
 */
export async function processPayloadById(
  pool: Pool,
  payloadId: string,
  options: HydrationOptions = {}
): Promise<HydrationResult> {
  const { rows } = await pool.query(
    `SELECT * FROM raw_payloads WHERE id = $1`,
    [payloadId]
  );

  // Guard clause: nothing matched the id.
  if (rows.length === 0) {
    throw new Error(`Payload not found: ${payloadId}`);
  }

  return new HydrationWorker(pool, options).processPayload(rows[0]);
}
|
||||
|
||||
/**
 * Reprocess failed payloads.
 *
 * Resets every unprocessed payload that carries a hydration error (attempt
 * counter back to 0, error cleared) and then runs a single batch with a fresh
 * worker. The reset covers all such rows, while the batch that follows is
 * bounded by the worker's batch size.
 *
 * In dry-run mode the reset is skipped: it is a destructive write (stored
 * error messages are lost), and dry runs elsewhere in this module do not
 * issue bookkeeping writes. The dry-run batch therefore only sees payloads
 * that are still eligible under the configured retry limit.
 *
 * @param pool    Connection pool used for the reset and by the worker.
 * @param options Worker options; defaults apply when omitted.
 * @returns The result of the worker's `processBatch()` call.
 */
export async function reprocessFailedPayloads(
  pool: Pool,
  options: HydrationOptions = {}
): Promise<HydrationBatchResult> {
  if (options.dryRun) {
    console.log('[HydrationWorker] [DryRun] Skipping reset of failed payloads');
  } else {
    // Reset failed payloads for reprocessing
    await pool.query(
      `UPDATE raw_payloads
       SET hydration_attempts = 0,
           hydration_error = NULL
       WHERE processed = FALSE
         AND hydration_error IS NOT NULL`
    );
  }

  const worker = new HydrationWorker(pool, options);
  return worker.processBatch();
}
|
||||
Reference in New Issue
Block a user