Files
cannaiq/backend/src/_deprecated/canonical-hydration/hydration-service.ts
Kelly a35976b9e9 chore: Clean up deprecated code and docs
- Move deprecated directories to src/_deprecated/:
  - hydration/ (old pipeline approach)
  - scraper-v2/ (old Puppeteer scraper)
  - canonical-hydration/ (merged into tasks)
  - Unused services: availability, crawler-logger, geolocation, etc
  - Unused utils: age-gate-playwright, HomepageValidator, stealthBrowser

- Archive outdated docs to docs/_archive/:
  - ANALYTICS_RUNBOOK.md
  - ANALYTICS_V2_EXAMPLES.md
  - BRAND_INTELLIGENCE_API.md
  - CRAWL_PIPELINE.md
  - TASK_WORKFLOW_2024-12-10.md
  - WORKER_TASK_ARCHITECTURE.md
  - ORGANIC_SCRAPING_GUIDE.md

- Add docs/CODEBASE_MAP.md as single source of truth
- Add warning files to deprecated/archived directories
- Slim down CLAUDE.md to essential rules only

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 22:17:40 -07:00

562 lines
18 KiB
TypeScript

/**
* CanonicalHydrationService
* Orchestrates the full hydration pipeline from dutchie_* to canonical tables
*/
import { Pool } from 'pg';
import { CrawlRunRecorder } from './crawl-run-recorder';
import { StoreProductNormalizer } from './store-product-normalizer';
import { SnapshotWriter } from './snapshot-writer';
import { HydrationOptions, HydrationResult, ServiceContext, SourceJob } from './types';
/**
 * Orchestrates hydration of the canonical tables (`crawl_runs`,
 * `store_products`, `store_product_snapshots`) from the `dutchie_*` source
 * tables.
 *
 * Three entry points are offered:
 * - {@link hydrate}: job-driven hydration in `backfill` or incremental mode.
 * - {@link hydrateDispensary}: convenience wrapper around `hydrate` for one dispensary.
 * - {@link hydrateProductsOnly}: hydration from current product rows via a
 *   synthetic crawl run, for when no source job records exist.
 *
 * Failures are collected as strings in `HydrationResult.errors`; the public
 * hydrate methods resolve with the result rather than rejecting.
 */
export class CanonicalHydrationService {
  /** Upper bound on the number of source jobs fetched by one backfill run. */
  private static readonly MAX_BACKFILL_JOBS = 1000;

  /** Number of snapshot rows written per multi-row INSERT statement. */
  private static readonly SNAPSHOT_INSERT_BATCH_SIZE = 100;

  private pool: Pool;
  private log: (message: string) => void;
  private crawlRunRecorder: CrawlRunRecorder;
  private productNormalizer: StoreProductNormalizer;
  private snapshotWriter: SnapshotWriter;

  /**
   * @param ctx Shared service context. `ctx.pool` is used for direct queries,
   *   `ctx.logger` (falling back to `console.log`) for progress messages, and
   *   the whole context is handed to the recorder/normalizer/writer helpers.
   */
  constructor(ctx: ServiceContext) {
    this.pool = ctx.pool;
    this.log = ctx.logger || console.log;
    this.crawlRunRecorder = new CrawlRunRecorder(ctx);
    this.productNormalizer = new StoreProductNormalizer(ctx);
    this.snapshotWriter = new SnapshotWriter(ctx);
  }

  /**
   * Extract a printable message from anything that can be thrown.
   * Non-Error values (strings, null, plain objects) must not produce
   * "undefined" or raise a secondary TypeError inside a catch block.
   */
  private static describeError(err: unknown): string {
    if (err instanceof Error) {
      return err.message;
    }
    if (typeof err === 'object' && err !== null && 'message' in err) {
      return String((err as { message: unknown }).message);
    }
    return String(err);
  }

  /** Build a zeroed result accumulator. */
  private static emptyResult(): HydrationResult {
    return {
      crawlRunsCreated: 0,
      crawlRunsSkipped: 0,
      productsUpserted: 0,
      snapshotsWritten: 0,
      errors: [],
      durationMs: 0,
    };
  }

  /**
   * Read the `count` column of the first row of a `SELECT COUNT(...)` result.
   * The value is parsed as a base-10 integer.
   */
  private static readCount(res: { rows: Array<{ count: string }> }): number {
    return parseInt(res.rows[0].count, 10);
  }

  /**
   * Run the job-driven hydration pipeline.
   *
   * `options.mode === 'backfill'` processes historical source jobs; any other
   * mode runs the incremental path (unhydrated jobs only).
   *
   * @returns Counters, collected error strings and the elapsed time. An
   *   exception escaping the selected mode is recorded as a `Fatal error: …`
   *   entry instead of being rethrown.
   */
  async hydrate(options: HydrationOptions): Promise<HydrationResult> {
    const startTime = Date.now();
    const result = CanonicalHydrationService.emptyResult();

    this.log(`Starting hydration in ${options.mode} mode`);

    try {
      if (options.mode === 'backfill') {
        await this.runBackfill(options, result);
      } else {
        await this.runIncremental(options, result);
      }
    } catch (err: unknown) {
      const message = CanonicalHydrationService.describeError(err);
      result.errors.push(`Fatal error: ${message}`);
      this.log(`Hydration failed: ${message}`);
    }

    result.durationMs = Date.now() - startTime;
    this.log(`Hydration completed in ${result.durationMs}ms: ${JSON.stringify({
      crawlRunsCreated: result.crawlRunsCreated,
      crawlRunsSkipped: result.crawlRunsSkipped,
      productsUpserted: result.productsUpserted,
      snapshotsWritten: result.snapshotsWritten,
      errors: result.errors.length,
    })}`);
    return result;
  }

  /**
   * Backfill mode: fetch up to `MAX_BACKFILL_JOBS` source jobs matching the
   * date range / dispensary filter in `options` and process them.
   */
  private async runBackfill(options: HydrationOptions, result: HydrationResult): Promise<void> {
    const sourceJobs = await this.crawlRunRecorder.getSourceJobsForBackfill(
      options.startDate,
      options.endDate,
      options.dispensaryId,
      CanonicalHydrationService.MAX_BACKFILL_JOBS
    );
    this.log(`Found ${sourceJobs.length} source jobs to backfill`);

    await this.processJobsByDispensary(sourceJobs, result, options.dryRun);
  }

  /**
   * Incremental mode: fetch unhydrated jobs (at most `options.batchSize`,
   * default 100) and process them.
   */
  private async runIncremental(options: HydrationOptions, result: HydrationResult): Promise<void> {
    const limit = options.batchSize || 100;
    const unhydratedJobs = await this.crawlRunRecorder.getUnhydratedJobs(
      options.dispensaryId,
      options.startDate,
      limit
    );
    this.log(`Found ${unhydratedJobs.length} unhydrated jobs`);

    await this.processJobsByDispensary(unhydratedJobs, result, options.dryRun);
  }

  /**
   * Shared per-dispensary loop used by both backfill and incremental modes.
   *
   * For each dispensary: upsert its products once (skipped in dry run), load
   * the store-product id map, then process every job. A failing job or
   * dispensary is recorded in `result.errors` and does not stop the others.
   */
  private async processJobsByDispensary(
    jobs: SourceJob[],
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    const jobsByDispensary = this.groupJobsByDispensary(jobs);

    for (const [dispensaryId, dispensaryJobs] of jobsByDispensary) {
      this.log(`Processing dispensary ${dispensaryId} (${dispensaryJobs.length} jobs)`);
      try {
        // Step 1: upsert products once per dispensary, not once per job.
        if (!dryRun) {
          const productResult = await this.productNormalizer.upsertProductsForDispensary(dispensaryId);
          result.productsUpserted += productResult.upserted;
          if (productResult.errors.length > 0) {
            result.errors.push(...productResult.errors.map(e => `Dispensary ${dispensaryId}: ${e}`));
          }
        }

        // The id map is passed to the snapshot writer for every job below.
        const storeProductIdMap = await this.productNormalizer.getStoreProductIdMap(dispensaryId);

        // Step 2: record a crawl run and write snapshots for each job.
        for (const job of dispensaryJobs) {
          try {
            await this.processJob(job, storeProductIdMap, result, dryRun);
          } catch (err: unknown) {
            result.errors.push(`Job ${job.id}: ${CanonicalHydrationService.describeError(err)}`);
          }
        }
      } catch (err: unknown) {
        result.errors.push(`Dispensary ${dispensaryId}: ${CanonicalHydrationService.describeError(err)}`);
      }
    }
  }

  /**
   * Process a single job: record its crawl run and write its snapshots.
   *
   * In dry run nothing is written; the job is counted as skipped when a crawl
   * run already exists for it and as created otherwise. In a real run, a
   * falsy id from the recorder counts as skipped and no snapshots are written.
   * Snapshots are also not written for jobs without `completed_at`.
   */
  private async processJob(
    job: SourceJob,
    storeProductIdMap: Map<string, number>,
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    if (dryRun) {
      const existingId = await this.crawlRunRecorder.getCrawlRunIdBySourceJob(
        'dispensary_crawl_jobs',
        job.id
      );
      if (existingId) {
        result.crawlRunsSkipped++;
      } else {
        result.crawlRunsCreated++;
      }
      return; // Never write snapshots in dry run
    }

    // Step 1: record the crawl run
    const crawlRunId = await this.crawlRunRecorder.recordCrawlRun(job);
    if (!crawlRunId) {
      result.crawlRunsSkipped++;
      return; // Skip snapshot writing if the crawl run wasn't created
    }
    result.crawlRunsCreated++;

    // Step 2: write snapshots for this crawl run
    if (!job.completed_at) {
      return;
    }
    const snapshotResult = await this.snapshotWriter.writeSnapshotsForCrawlRun(
      crawlRunId,
      job.dispensary_id,
      storeProductIdMap,
      job.completed_at
    );
    result.snapshotsWritten += snapshotResult.written;
    if (snapshotResult.errors.length > 0) {
      result.errors.push(...snapshotResult.errors);
    }

    // Keep the crawl_run row's snapshot count in sync with what was written.
    await this.crawlRunRecorder.updateSnapshotsWritten(crawlRunId, snapshotResult.written);
  }

  /**
   * Hydrate a single dispensary (convenience wrapper around {@link hydrate}).
   *
   * @param dispensaryId Dispensary to hydrate.
   * @param mode Hydration mode; defaults to `'incremental'`.
   */
  async hydrateDispensary(
    dispensaryId: number,
    mode: 'backfill' | 'incremental' = 'incremental'
  ): Promise<HydrationResult> {
    return this.hydrate({
      mode,
      dispensaryId,
    });
  }

  /**
   * Get source-vs-canonical row counts for one dispensary.
   *
   * `unhydratedJobs` is the plain difference `sourceJobs - hydratedJobs`; the
   * two counts come from independent queries, so it is not clamped.
   */
  async getHydrationStatus(dispensaryId: number): Promise<{
    sourceJobs: number;
    hydratedJobs: number;
    unhydratedJobs: number;
    sourceProducts: number;
    storeProducts: number;
    sourceSnapshots: number;
    storeSnapshots: number;
  }> {
    const params = [dispensaryId];
    // The six counts are independent, so they are issued concurrently.
    const [sourceJobs, hydratedJobs, sourceProducts, storeProducts, sourceSnapshots, storeSnapshots] =
      await Promise.all([
        this.pool.query(
          `SELECT COUNT(*) FROM dispensary_crawl_jobs
           WHERE dispensary_id = $1 AND status = 'completed' AND job_type = 'dutchie_product_crawl'`,
          params
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM crawl_runs
           WHERE dispensary_id = $1 AND source_job_type = 'dispensary_crawl_jobs'`,
          params
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM dutchie_products WHERE dispensary_id = $1`,
          params
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM store_products WHERE dispensary_id = $1 AND provider = 'dutchie'`,
          params
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM dutchie_product_snapshots WHERE dispensary_id = $1`,
          params
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM store_product_snapshots WHERE dispensary_id = $1`,
          params
        ),
      ]);

    const readCount = CanonicalHydrationService.readCount;
    const sourceJobCount = readCount(sourceJobs);
    const hydratedJobCount = readCount(hydratedJobs);
    return {
      sourceJobs: sourceJobCount,
      hydratedJobs: hydratedJobCount,
      unhydratedJobs: sourceJobCount - hydratedJobCount,
      sourceProducts: readCount(sourceProducts),
      storeProducts: readCount(storeProducts),
      sourceSnapshots: readCount(sourceSnapshots),
      storeSnapshots: readCount(storeSnapshots),
    };
  }

  /**
   * Get source-vs-canonical row counts across all dispensaries, plus the
   * number of distinct dispensaries that have rows in `dutchie_products`.
   */
  async getOverallStatus(): Promise<{
    totalSourceJobs: number;
    totalHydratedJobs: number;
    totalSourceProducts: number;
    totalStoreProducts: number;
    totalSourceSnapshots: number;
    totalStoreSnapshots: number;
    dispensariesWithData: number;
  }> {
    const [sourceJobs, hydratedJobs, sourceProducts, storeProducts, sourceSnapshots, storeSnapshots, dispensaries] =
      await Promise.all([
        this.pool.query(
          `SELECT COUNT(*) FROM dispensary_crawl_jobs
           WHERE status = 'completed' AND job_type = 'dutchie_product_crawl'`
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM crawl_runs WHERE source_job_type = 'dispensary_crawl_jobs'`
        ),
        this.pool.query(`SELECT COUNT(*) FROM dutchie_products`),
        this.pool.query(`SELECT COUNT(*) FROM store_products WHERE provider = 'dutchie'`),
        this.pool.query(`SELECT COUNT(*) FROM dutchie_product_snapshots`),
        this.pool.query(`SELECT COUNT(*) FROM store_product_snapshots`),
        this.pool.query(
          `SELECT COUNT(DISTINCT dispensary_id) FROM dutchie_products`
        ),
      ]);

    const readCount = CanonicalHydrationService.readCount;
    return {
      totalSourceJobs: readCount(sourceJobs),
      totalHydratedJobs: readCount(hydratedJobs),
      totalSourceProducts: readCount(sourceProducts),
      totalStoreProducts: readCount(storeProducts),
      totalSourceSnapshots: readCount(sourceSnapshots),
      totalStoreSnapshots: readCount(storeSnapshots),
      dispensariesWithData: readCount(dispensaries),
    };
  }

  /**
   * Group jobs by dispensary ID, preserving the input order both of the
   * dispensaries (first appearance) and of the jobs within each group.
   */
  private groupJobsByDispensary(jobs: SourceJob[]): Map<number, SourceJob[]> {
    const grouped = new Map<number, SourceJob[]>();
    for (const job of jobs) {
      const bucket = grouped.get(job.dispensary_id);
      if (bucket) {
        bucket.push(job);
      } else {
        grouped.set(job.dispensary_id, [job]);
      }
    }
    return grouped;
  }

  /**
   * Products-only hydration mode.
   * Used when there are no historical job records: creates (or reuses) one
   * synthetic crawl run per dispensary from current product data.
   *
   * @param options.dispensaryId Restrict to one dispensary; when omitted,
   *   every dispensary present in `dutchie_products` is processed.
   * @param options.dryRun When true, only counts what would be done.
   * @returns Counters and collected errors; failures are recorded per
   *   dispensary (or as `Fatal error: …`) rather than thrown.
   */
  async hydrateProductsOnly(options: {
    dispensaryId?: number;
    dryRun?: boolean;
  } = {}): Promise<HydrationResult> {
    const startTime = Date.now();
    const result = CanonicalHydrationService.emptyResult();

    this.log('Starting products-only hydration mode');

    try {
      let dispensaryIds: number[];
      // Nullish check rather than truthiness: an explicit id of 0 must not
      // silently widen the run to every dispensary.
      if (options.dispensaryId != null) {
        dispensaryIds = [options.dispensaryId];
      } else {
        const dispResult = await this.pool.query(
          'SELECT DISTINCT dispensary_id FROM dutchie_products ORDER BY dispensary_id'
        );
        dispensaryIds = dispResult.rows.map(r => r.dispensary_id);
      }
      this.log(`Processing ${dispensaryIds.length} dispensaries`);

      for (const dispensaryId of dispensaryIds) {
        try {
          await this.hydrateDispensaryProductsOnly(dispensaryId, result, options.dryRun);
        } catch (err: unknown) {
          result.errors.push(`Dispensary ${dispensaryId}: ${CanonicalHydrationService.describeError(err)}`);
        }
      }
    } catch (err: unknown) {
      const message = CanonicalHydrationService.describeError(err);
      result.errors.push(`Fatal error: ${message}`);
      this.log(`Products-only hydration failed: ${message}`);
    }

    result.durationMs = Date.now() - startTime;
    this.log(`Products-only hydration completed in ${result.durationMs}ms: ${JSON.stringify({
      crawlRunsCreated: result.crawlRunsCreated,
      crawlRunsSkipped: result.crawlRunsSkipped,
      productsUpserted: result.productsUpserted,
      snapshotsWritten: result.snapshotsWritten,
      errors: result.errors.length,
    })}`);
    return result;
  }

  /**
   * Hydrate a single dispensary in products-only mode.
   *
   * Steps (real run): find or create the synthetic crawl run, upsert
   * products, insert initial snapshots, then store the snapshot count on the
   * crawl run. These statements are issued separately on the pool, not
   * inside a transaction.
   *
   * Dry run performs only read queries. It reports the crawl run as skipped
   * when a synthetic run already exists (matching what a real run would do)
   * and uses the source product count as the estimate for products and
   * snapshots.
   */
  private async hydrateDispensaryProductsOnly(
    dispensaryId: number,
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    // Product count and timestamp range for this dispensary
    const statsResult = await this.pool.query(
      `SELECT COUNT(*) as cnt, MIN(created_at) as min_date, MAX(updated_at) as max_date
       FROM dutchie_products WHERE dispensary_id = $1`,
      [dispensaryId]
    );
    const stats = statsResult.rows[0];
    const productCount = parseInt(stats.cnt, 10);
    if (productCount === 0) {
      this.log(`Dispensary ${dispensaryId}: No products, skipping`);
      return;
    }
    this.log(`Dispensary ${dispensaryId}: ${productCount} products`);

    // Read-only lookup, so it is safe in dry run too.
    const existingRun = await this.pool.query(
      `SELECT id FROM crawl_runs
       WHERE dispensary_id = $1
       AND source_job_type = 'products_only_hydration'
       LIMIT 1`,
      [dispensaryId]
    );
    const existingRunId: number | null =
      existingRun.rows.length > 0 ? existingRun.rows[0].id : null;

    if (dryRun) {
      // Dry run - just count what would be done
      if (existingRunId !== null) {
        result.crawlRunsSkipped++;
      } else {
        result.crawlRunsCreated++;
      }
      result.productsUpserted += productCount;
      result.snapshotsWritten += productCount;
      return;
    }

    // Step 1: reuse or create the synthetic crawl run
    let crawlRunId: number;
    if (existingRunId !== null) {
      crawlRunId = existingRunId;
      this.log(`Dispensary ${dispensaryId}: Using existing synthetic crawl run ${crawlRunId}`);
      result.crawlRunsSkipped++;
    } else {
      const now = new Date();
      const insertResult = await this.pool.query(
        `INSERT INTO crawl_runs (
           dispensary_id, provider, started_at, finished_at, duration_ms,
           status, products_found, trigger_type, metadata,
           source_job_type, source_job_id
         ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
         RETURNING id`,
        [
          dispensaryId,
          'dutchie',
          stats.min_date || now, // earliest product creation, else "now"
          stats.max_date || now, // latest product update, else "now"
          0,
          'success',
          productCount,
          'hydration',
          JSON.stringify({ mode: 'products_only', hydratedAt: now.toISOString() }),
          'products_only_hydration',
          dispensaryId, // Use dispensary_id as synthetic job_id
        ]
      );
      crawlRunId = insertResult.rows[0].id;
      result.crawlRunsCreated++;
      this.log(`Dispensary ${dispensaryId}: Created synthetic crawl run ${crawlRunId}`);
    }

    // Step 2: upsert products
    const productResult = await this.productNormalizer.upsertProductsForDispensary(dispensaryId);
    result.productsUpserted += productResult.upserted;
    if (productResult.errors.length > 0) {
      result.errors.push(...productResult.errors.map(e => `Dispensary ${dispensaryId}: ${e}`));
    }

    // Step 3: create initial snapshots from the current product state
    const snapshotsWritten = await this.createInitialSnapshots(dispensaryId, crawlRunId);
    result.snapshotsWritten += snapshotsWritten;

    // Record the snapshot count on the crawl run
    await this.pool.query(
      'UPDATE crawl_runs SET snapshots_written = $1 WHERE id = $2',
      [snapshotsWritten, crawlRunId]
    );
  }

  /**
   * Create initial snapshots from the current `store_products` state of a
   * dispensary (provider `dutchie`), attached to the given crawl run.
   *
   * Rows are inserted in batches with `ON CONFLICT … DO NOTHING`, so
   * snapshots that already exist for the same product and crawl run are left
   * untouched.
   *
   * @returns Sum of the `rowCount` values reported for the INSERT statements.
   */
  private async createInitialSnapshots(
    dispensaryId: number,
    crawlRunId: number
  ): Promise<number> {
    const products = await this.pool.query(
      `SELECT sp.id, sp.price_rec, sp.price_med, sp.is_on_special, sp.is_in_stock,
              sp.stock_quantity, sp.thc_percent, sp.cbd_percent
       FROM store_products sp
       WHERE sp.dispensary_id = $1 AND sp.provider = 'dutchie'`,
      [dispensaryId]
    );
    if (products.rows.length === 0) return 0;

    const capturedAt = new Date(); // same capture time for every snapshot in this call
    const batchSize = CanonicalHydrationService.SNAPSHOT_INSERT_BATCH_SIZE;
    let totalInserted = 0;

    for (let offset = 0; offset < products.rows.length; offset += batchSize) {
      const batch = products.rows.slice(offset, offset + batchSize);
      const values: unknown[] = [];
      const placeholders: string[] = [];
      let paramIndex = 1;

      for (const product of batch) {
        // Order must match the column list in the INSERT below
        // (created_at is supplied by NOW() rather than a parameter).
        const row: unknown[] = [
          dispensaryId,
          product.id,
          crawlRunId,
          capturedAt,
          product.price_rec,
          product.price_med,
          product.is_on_special || false,
          product.is_in_stock || false,
          product.stock_quantity,
          product.thc_percent,
          product.cbd_percent,
          JSON.stringify({ source: 'initial_hydration' }),
        ];
        // Derive placeholders from the row itself so the two cannot drift apart.
        const slots = row.map(() => `$${paramIndex++}`);
        placeholders.push(`(${slots.join(', ')}, NOW())`);
        values.push(...row);
      }

      const query = `
        INSERT INTO store_product_snapshots (
          dispensary_id, store_product_id, crawl_run_id, captured_at,
          price_rec, price_med, is_on_special, is_in_stock, stock_quantity,
          thc_percent, cbd_percent, raw_data, created_at
        ) VALUES ${placeholders.join(', ')}
        ON CONFLICT (store_product_id, crawl_run_id)
        WHERE store_product_id IS NOT NULL AND crawl_run_id IS NOT NULL
        DO NOTHING
      `;
      const inserted = await this.pool.query(query, values);
      totalInserted += inserted.rowCount || 0;
    }
    return totalInserted;
  }
}