Major changes:
- Add harmonize-az-dispensaries.ts script to sync dispensaries with the Dutchie API
- Add migration 057 for crawl_enabled and dutchie_verified fields
- Remove legacy dutchie-az module (replaced by platforms/dutchie)
- Clean up deprecated crawlers, scrapers, and orchestrator code
- Update location-discovery to not fall back to slug when ID is missing
- Add crawl-rotator service for proxy rotation
- Add types/index.ts for shared type definitions
- Add woodpecker-agent k8s manifest

Harmonization script:
- Queries ConsumerDispensaries API for all 32 AZ cities
- Matches dispensaries by platform_dispensary_id (not slug)
- Updates existing records with full Dutchie data
- Creates new records for unmatched Dutchie dispensaries
- Disables dispensaries not found in Dutchie

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
562 lines
18 KiB
TypeScript
562 lines
18 KiB
TypeScript
/**
 * CanonicalHydrationService
 *
 * Orchestrates the full hydration pipeline from the dutchie_* source tables
 * into the canonical tables (crawl_runs, store_products, store_product_snapshots).
 */
|
|
|
|
import { Pool } from 'pg';
|
|
import { CrawlRunRecorder } from './crawl-run-recorder';
|
|
import { StoreProductNormalizer } from './store-product-normalizer';
|
|
import { SnapshotWriter } from './snapshot-writer';
|
|
import { HydrationOptions, HydrationResult, ServiceContext, SourceJob } from './types';
|
|
|
|
/** Upper bound on source jobs fetched per backfill invocation. */
const BACKFILL_MAX_JOBS = 1000;

/** Jobs fetched per incremental run when `options.batchSize` is not set (or is 0). */
const DEFAULT_INCREMENTAL_LIMIT = 100;

/** Rows per multi-row INSERT in `createInitialSnapshots` (12 bind parameters per row). */
const SNAPSHOT_INSERT_BATCH_SIZE = 100;

/** Returns a zeroed result accumulator. */
function createEmptyResult(): HydrationResult {
  return {
    crawlRunsCreated: 0,
    crawlRunsSkipped: 0,
    productsUpserted: 0,
    snapshotsWritten: 0,
    errors: [],
    durationMs: 0,
  };
}

/** Compact, loggable view of a result: error messages are collapsed to a count. */
function summarizeResult(result: HydrationResult) {
  return {
    crawlRunsCreated: result.crawlRunsCreated,
    crawlRunsSkipped: result.crawlRunsSkipped,
    productsUpserted: result.productsUpserted,
    snapshotsWritten: result.snapshotsWritten,
    errors: result.errors.length,
  };
}

/** Extracts a human-readable message from an arbitrary thrown value. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Reads the value of a single-row `SELECT COUNT(*) ...` result.
 * node-postgres returns COUNT(*) (a bigint) as a string by default, hence the parse.
 */
function readCount(res: { rows: Array<{ count: string | number }> }): number {
  const row = res.rows[0];
  return row ? Number.parseInt(String(row.count), 10) : 0;
}

/**
 * Hydrates the canonical tables from the legacy dutchie_* source tables.
 *
 * Entry points:
 *  - `hydrate({ mode: 'backfill' })` replays historical jobs from `getSourceJobsForBackfill`.
 *  - `hydrate({ mode: 'incremental' })` processes the jobs returned by `getUnhydratedJobs`.
 *  - `hydrateProductsOnly()` creates one synthetic crawl run per dispensary for use
 *    when no job history exists.
 *
 * Per-dispensary and per-job failures are collected in `HydrationResult.errors`
 * rather than aborting the whole run.
 */
export class CanonicalHydrationService {
  private readonly pool: Pool;
  private readonly log: (message: string) => void;
  private readonly crawlRunRecorder: CrawlRunRecorder;
  private readonly productNormalizer: StoreProductNormalizer;
  private readonly snapshotWriter: SnapshotWriter;

  /**
   * @param ctx Shared service context; `ctx.logger` falls back to `console.log`.
   */
  constructor(ctx: ServiceContext) {
    this.pool = ctx.pool;
    this.log = ctx.logger || console.log;
    this.crawlRunRecorder = new CrawlRunRecorder(ctx);
    this.productNormalizer = new StoreProductNormalizer(ctx);
    this.snapshotWriter = new SnapshotWriter(ctx);
  }

  /**
   * Runs the full hydration pipeline.
   * `mode: 'backfill'` processes historical jobs; any other mode runs incrementally.
   *
   * Never rejects for pipeline failures: a fatal error is recorded as
   * `"Fatal error: ..."` in `result.errors` and the partial result is returned.
   */
  async hydrate(options: HydrationOptions): Promise<HydrationResult> {
    const startTime = Date.now();
    const result = createEmptyResult();

    this.log(`Starting hydration in ${options.mode} mode`);

    try {
      if (options.mode === 'backfill') {
        await this.runBackfill(options, result);
      } else {
        await this.runIncremental(options, result);
      }
    } catch (err: unknown) {
      const message = errorMessage(err);
      result.errors.push(`Fatal error: ${message}`);
      this.log(`Hydration failed: ${message}`);
    }

    result.durationMs = Date.now() - startTime;
    this.log(`Hydration completed in ${result.durationMs}ms: ${JSON.stringify(summarizeResult(result))}`);

    return result;
  }

  /**
   * Backfill mode: process historical jobs from the source tables.
   * Fetches at most BACKFILL_MAX_JOBS jobs; `options.batchSize` is not used in this mode.
   */
  private async runBackfill(options: HydrationOptions, result: HydrationResult): Promise<void> {
    const sourceJobs = await this.crawlRunRecorder.getSourceJobsForBackfill(
      options.startDate,
      options.endDate,
      options.dispensaryId,
      BACKFILL_MAX_JOBS
    );

    this.log(`Found ${sourceJobs.length} source jobs to backfill`);

    await this.processJobsByDispensary(sourceJobs, result, options.dryRun);
  }

  /**
   * Incremental mode: process only the jobs reported as unhydrated.
   * `options.batchSize` caps how many jobs are fetched (a falsy value uses the default).
   */
  private async runIncremental(options: HydrationOptions, result: HydrationResult): Promise<void> {
    const limit = options.batchSize || DEFAULT_INCREMENTAL_LIMIT;

    const unhydratedJobs = await this.crawlRunRecorder.getUnhydratedJobs(
      options.dispensaryId,
      options.startDate,
      limit
    );

    this.log(`Found ${unhydratedJobs.length} unhydrated jobs`);

    await this.processJobsByDispensary(unhydratedJobs, result, options.dryRun);
  }

  /**
   * Shared per-dispensary loop for backfill and incremental modes.
   *
   * Steps for each dispensary:
   *  1. Upsert its products (skipped in dry run).
   *  2. Load its store_product_id map.
   *  3. Run each of its jobs through `processJob`.
   *
   * A failing job is recorded and the remaining jobs still run. A failing dispensary
   * step is recorded and the loop moves on to the next dispensary.
   */
  private async processJobsByDispensary(
    jobs: SourceJob[],
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    for (const [dispensaryId, dispensaryJobs] of this.groupJobsByDispensary(jobs)) {
      this.log(`Processing dispensary ${dispensaryId} (${dispensaryJobs.length} jobs)`);

      try {
        if (!dryRun) {
          await this.upsertProducts(dispensaryId, result);
        }

        const storeProductIdMap = await this.productNormalizer.getStoreProductIdMap(dispensaryId);

        for (const job of dispensaryJobs) {
          try {
            await this.processJob(job, storeProductIdMap, result, dryRun);
          } catch (err: unknown) {
            result.errors.push(`Job ${job.id}: ${errorMessage(err)}`);
          }
        }
      } catch (err: unknown) {
        result.errors.push(`Dispensary ${dispensaryId}: ${errorMessage(err)}`);
      }
    }
  }

  /**
   * Upserts a dispensary's products into store_products and folds the outcome into `result`.
   * Normalizer errors are prefixed with the dispensary id.
   */
  private async upsertProducts(dispensaryId: number, result: HydrationResult): Promise<void> {
    const productResult = await this.productNormalizer.upsertProductsForDispensary(dispensaryId);
    result.productsUpserted += productResult.upserted;
    result.errors.push(...productResult.errors.map((e) => `Dispensary ${dispensaryId}: ${e}`));
  }

  /**
   * Processes a single job: records its crawl run, then writes snapshots for it.
   *
   * If `recordCrawlRun` returns no id, the job counts as skipped and no snapshots
   * are written. In dry run, the method only checks whether a crawl run already
   * exists for the job, and nothing is written.
   */
  private async processJob(
    job: SourceJob,
    storeProductIdMap: Map<string, number>,
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    if (dryRun) {
      const existingId = await this.crawlRunRecorder.getCrawlRunIdBySourceJob(
        'dispensary_crawl_jobs',
        job.id
      );
      if (existingId) {
        result.crawlRunsSkipped++;
      } else {
        result.crawlRunsCreated++;
      }
      return;
    }

    const crawlRunId = await this.crawlRunRecorder.recordCrawlRun(job);
    if (!crawlRunId) {
      result.crawlRunsSkipped++;
      return;
    }
    result.crawlRunsCreated++;

    // writeSnapshotsForCrawlRun needs job.completed_at. Jobs without it keep their
    // crawl run but get no snapshots.
    if (!job.completed_at) {
      return;
    }

    const snapshotResult = await this.snapshotWriter.writeSnapshotsForCrawlRun(
      crawlRunId,
      job.dispensary_id,
      storeProductIdMap,
      job.completed_at
    );

    result.snapshotsWritten += snapshotResult.written;
    result.errors.push(...snapshotResult.errors);

    await this.crawlRunRecorder.updateSnapshotsWritten(crawlRunId, snapshotResult.written);
  }

  /**
   * Hydrates a single dispensary (convenience wrapper around `hydrate`).
   */
  async hydrateDispensary(
    dispensaryId: number,
    mode: 'backfill' | 'incremental' = 'incremental'
  ): Promise<HydrationResult> {
    return this.hydrate({
      mode,
      dispensaryId,
    });
  }

  /**
   * Returns source-vs-canonical row counts for one dispensary.
   */
  async getHydrationStatus(dispensaryId: number): Promise<{
    sourceJobs: number;
    hydratedJobs: number;
    unhydratedJobs: number;
    sourceProducts: number;
    storeProducts: number;
    sourceSnapshots: number;
    storeSnapshots: number;
  }> {
    const [sourceJobs, hydratedJobs, sourceProducts, storeProducts, sourceSnapshots, storeSnapshots] =
      await Promise.all([
        this.pool.query(
          `SELECT COUNT(*) FROM dispensary_crawl_jobs
           WHERE dispensary_id = $1 AND status = 'completed' AND job_type = 'dutchie_product_crawl'`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM crawl_runs
           WHERE dispensary_id = $1 AND source_job_type = 'dispensary_crawl_jobs'`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM dutchie_products WHERE dispensary_id = $1`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM store_products WHERE dispensary_id = $1 AND provider = 'dutchie'`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM dutchie_product_snapshots WHERE dispensary_id = $1`,
          [dispensaryId]
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM store_product_snapshots WHERE dispensary_id = $1`,
          [dispensaryId]
        ),
      ]);

    const sourceJobCount = readCount(sourceJobs);
    const hydratedJobCount = readCount(hydratedJobs);

    return {
      sourceJobs: sourceJobCount,
      hydratedJobs: hydratedJobCount,
      // The hydrated count does not apply the status/job_type filters used for source jobs,
      // so the raw difference may be negative; clamp to keep the figure meaningful.
      unhydratedJobs: Math.max(0, sourceJobCount - hydratedJobCount),
      sourceProducts: readCount(sourceProducts),
      storeProducts: readCount(storeProducts),
      sourceSnapshots: readCount(sourceSnapshots),
      storeSnapshots: readCount(storeSnapshots),
    };
  }

  /**
   * Returns source-vs-canonical row counts across all dispensaries.
   */
  async getOverallStatus(): Promise<{
    totalSourceJobs: number;
    totalHydratedJobs: number;
    totalSourceProducts: number;
    totalStoreProducts: number;
    totalSourceSnapshots: number;
    totalStoreSnapshots: number;
    dispensariesWithData: number;
  }> {
    const [sourceJobs, hydratedJobs, sourceProducts, storeProducts, sourceSnapshots, storeSnapshots, dispensaries] =
      await Promise.all([
        this.pool.query(
          `SELECT COUNT(*) FROM dispensary_crawl_jobs
           WHERE status = 'completed' AND job_type = 'dutchie_product_crawl'`
        ),
        this.pool.query(
          `SELECT COUNT(*) FROM crawl_runs WHERE source_job_type = 'dispensary_crawl_jobs'`
        ),
        this.pool.query(`SELECT COUNT(*) FROM dutchie_products`),
        this.pool.query(`SELECT COUNT(*) FROM store_products WHERE provider = 'dutchie'`),
        this.pool.query(`SELECT COUNT(*) FROM dutchie_product_snapshots`),
        this.pool.query(`SELECT COUNT(*) FROM store_product_snapshots`),
        this.pool.query(
          `SELECT COUNT(DISTINCT dispensary_id) FROM dutchie_products`
        ),
      ]);

    return {
      totalSourceJobs: readCount(sourceJobs),
      totalHydratedJobs: readCount(hydratedJobs),
      totalSourceProducts: readCount(sourceProducts),
      totalStoreProducts: readCount(storeProducts),
      totalSourceSnapshots: readCount(sourceSnapshots),
      totalStoreSnapshots: readCount(storeSnapshots),
      dispensariesWithData: readCount(dispensaries),
    };
  }

  /**
   * Groups jobs by `dispensary_id`, preserving the input order within each group.
   */
  private groupJobsByDispensary(jobs: SourceJob[]): Map<number, SourceJob[]> {
    const map = new Map<number, SourceJob[]>();
    for (const job of jobs) {
      const list = map.get(job.dispensary_id) || [];
      list.push(job);
      map.set(job.dispensary_id, list);
    }
    return map;
  }

  /**
   * Products-only hydration mode.
   *
   * For use when there are no historical job records: creates (or reuses) one
   * synthetic crawl run per dispensary from its current product data. Without
   * `options.dispensaryId`, every dispensary that has rows in dutchie_products
   * is processed.
   */
  async hydrateProductsOnly(options: {
    dispensaryId?: number;
    dryRun?: boolean;
  } = {}): Promise<HydrationResult> {
    const startTime = Date.now();
    const result = createEmptyResult();

    this.log('Starting products-only hydration mode');

    try {
      let dispensaryIds: number[];
      if (options.dispensaryId) {
        dispensaryIds = [options.dispensaryId];
      } else {
        const dispResult = await this.pool.query(
          'SELECT DISTINCT dispensary_id FROM dutchie_products ORDER BY dispensary_id'
        );
        dispensaryIds = dispResult.rows.map((row: { dispensary_id: number }) => row.dispensary_id);
      }

      this.log(`Processing ${dispensaryIds.length} dispensaries`);

      for (const dispensaryId of dispensaryIds) {
        try {
          await this.hydrateDispensaryProductsOnly(dispensaryId, result, options.dryRun);
        } catch (err: unknown) {
          result.errors.push(`Dispensary ${dispensaryId}: ${errorMessage(err)}`);
        }
      }
    } catch (err: unknown) {
      const message = errorMessage(err);
      result.errors.push(`Fatal error: ${message}`);
      this.log(`Products-only hydration failed: ${message}`);
    }

    result.durationMs = Date.now() - startTime;
    this.log(`Products-only hydration completed in ${result.durationMs}ms: ${JSON.stringify(summarizeResult(result))}`);

    return result;
  }

  /**
   * Hydrates a single dispensary in products-only mode.
   *
   * Steps:
   *  1. Find or create the synthetic crawl run.
   *  2. Upsert products.
   *  3. Write one snapshot per store product.
   *  4. Add the number of newly written snapshots to the run's `snapshots_written`.
   *
   * In dry run, nothing is written; the product count is reported as both upserted
   * products and snapshots.
   */
  private async hydrateDispensaryProductsOnly(
    dispensaryId: number,
    result: HydrationResult,
    dryRun?: boolean
  ): Promise<void> {
    const statsResult = await this.pool.query(
      `SELECT COUNT(*) as cnt, MIN(created_at) as min_date, MAX(updated_at) as max_date
       FROM dutchie_products WHERE dispensary_id = $1`,
      [dispensaryId]
    );
    const stats = statsResult.rows[0];
    const productCount = Number.parseInt(stats.cnt, 10);

    if (productCount === 0) {
      this.log(`Dispensary ${dispensaryId}: No products, skipping`);
      return;
    }

    this.log(`Dispensary ${dispensaryId}: ${productCount} products`);

    if (dryRun) {
      result.crawlRunsCreated++;
      result.productsUpserted += productCount;
      result.snapshotsWritten += productCount;
      return;
    }

    const crawlRunId = await this.findOrCreateSyntheticCrawlRun(
      dispensaryId,
      productCount,
      stats.min_date,
      stats.max_date,
      result
    );

    await this.upsertProducts(dispensaryId, result);

    const snapshotsWritten = await this.createInitialSnapshots(dispensaryId, crawlRunId);
    result.snapshotsWritten += snapshotsWritten;

    // Accumulate rather than overwrite. When the run is reused, ON CONFLICT DO NOTHING
    // means only products without a snapshot are inserted. Assigning that (often zero)
    // number would wipe out the count recorded by the earlier hydration.
    await this.pool.query(
      'UPDATE crawl_runs SET snapshots_written = COALESCE(snapshots_written, 0) + $1 WHERE id = $2',
      [snapshotsWritten, crawlRunId]
    );
  }

  /**
   * Returns the id of the dispensary's synthetic products-only crawl run, creating it
   * if none exists. Increments `crawlRunsSkipped` (reused) or `crawlRunsCreated` (inserted).
   *
   * The run spans `firstCreatedAt`..`lastUpdatedAt`; either bound falls back to "now"
   * when missing. The lookup and the insert are separate statements.
   * NOTE(review): concurrent calls for one dispensary could both insert — confirm
   * whether a uniqueness constraint guards this.
   */
  private async findOrCreateSyntheticCrawlRun(
    dispensaryId: number,
    productCount: number,
    firstCreatedAt: Date | null,
    lastUpdatedAt: Date | null,
    result: HydrationResult
  ): Promise<number> {
    const existingRun = await this.pool.query(
      `SELECT id FROM crawl_runs
       WHERE dispensary_id = $1
         AND source_job_type = 'products_only_hydration'
       LIMIT 1`,
      [dispensaryId]
    );

    if (existingRun.rows.length > 0) {
      const existingId: number = existingRun.rows[0].id;
      this.log(`Dispensary ${dispensaryId}: Using existing synthetic crawl run ${existingId}`);
      result.crawlRunsSkipped++;
      return existingId;
    }

    const now = new Date();
    const insertResult = await this.pool.query(
      `INSERT INTO crawl_runs (
         dispensary_id, provider, started_at, finished_at, duration_ms,
         status, products_found, trigger_type, metadata,
         source_job_type, source_job_id
       ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
       RETURNING id`,
      [
        dispensaryId,
        'dutchie',
        firstCreatedAt ?? now,
        lastUpdatedAt ?? now,
        0,
        'success',
        productCount,
        'hydration',
        JSON.stringify({ mode: 'products_only', hydratedAt: now.toISOString() }),
        'products_only_hydration',
        dispensaryId, // dispensary_id doubles as the synthetic source_job_id
      ]
    );

    const crawlRunId: number = insertResult.rows[0].id;
    result.crawlRunsCreated++;
    this.log(`Dispensary ${dispensaryId}: Created synthetic crawl run ${crawlRunId}`);
    return crawlRunId;
  }

  /**
   * Writes one store_product_snapshots row per Dutchie store product of the dispensary,
   * all attached to `crawlRunId` and stamped with a single capture time.
   *
   * Rows that conflict on (store_product_id, crawl_run_id) are skipped, which makes
   * re-running against the same crawl run safe.
   *
   * @returns the number of rows actually inserted.
   */
  private async createInitialSnapshots(
    dispensaryId: number,
    crawlRunId: number
  ): Promise<number> {
    const products = await this.pool.query(
      `SELECT sp.id, sp.price_rec, sp.price_med, sp.is_on_special, sp.is_in_stock,
              sp.stock_quantity, sp.thc_percent, sp.cbd_percent
       FROM store_products sp
       WHERE sp.dispensary_id = $1 AND sp.provider = 'dutchie'`,
      [dispensaryId]
    );

    const rows = products.rows;
    if (rows.length === 0) return 0;

    const capturedAt = new Date();
    const rawData = JSON.stringify({ source: 'initial_hydration' });
    let totalInserted = 0;

    for (let offset = 0; offset < rows.length; offset += SNAPSHOT_INSERT_BATCH_SIZE) {
      const values: unknown[] = [];
      const tuples: string[] = [];

      for (const product of rows.slice(offset, offset + SNAPSHOT_INSERT_BATCH_SIZE)) {
        const rowValues = [
          dispensaryId,
          product.id,
          crawlRunId,
          capturedAt,
          product.price_rec,
          product.price_med,
          product.is_on_special ?? false,
          product.is_in_stock ?? false,
          product.stock_quantity,
          product.thc_percent,
          product.cbd_percent,
          rawData,
        ];
        // Build placeholders from the row itself so their count cannot drift from the values.
        const firstParam = values.length + 1;
        tuples.push(`(${rowValues.map((_, k) => `$${firstParam + k}`).join(', ')}, NOW())`);
        values.push(...rowValues);
      }

      const query = `
        INSERT INTO store_product_snapshots (
          dispensary_id, store_product_id, crawl_run_id, captured_at,
          price_rec, price_med, is_on_special, is_in_stock, stock_quantity,
          thc_percent, cbd_percent, raw_data, created_at
        ) VALUES ${tuples.join(', ')}
        ON CONFLICT (store_product_id, crawl_run_id)
        WHERE store_product_id IS NOT NULL AND crawl_run_id IS NOT NULL
        DO NOTHING
      `;

      const insertResult = await this.pool.query(query, values);
      totalInserted += insertResult.rowCount || 0;
    }

    return totalInserted;
  }
}
|