Files
cannaiq/backend/src/hydration/incremental-sync.ts
Kelly b7cfec0770 feat: AZ dispensary harmonization with Dutchie source of truth
Major changes:
- Add harmonize-az-dispensaries.ts script to sync dispensaries with Dutchie API
- Add migration 057 for crawl_enabled and dutchie_verified fields
- Remove legacy dutchie-az module (replaced by platforms/dutchie)
- Clean up deprecated crawlers, scrapers, and orchestrator code
- Update location-discovery to not fallback to slug when ID is missing
- Add crawl-rotator service for proxy rotation
- Add types/index.ts for shared type definitions
- Add woodpecker-agent k8s manifest

Harmonization script:
- Queries ConsumerDispensaries API for all 32 AZ cities
- Matches dispensaries by platform_dispensary_id (not slug)
- Updates existing records with full Dutchie data
- Creates new records for unmatched Dutchie dispensaries
- Disables dispensaries not found in Dutchie

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-08 10:19:49 -07:00

673 lines
19 KiB
TypeScript

/**
* Incremental Sync
*
* Hooks into the crawler to automatically write to canonical tables
* after each crawl completes. This ensures store_products and
* store_product_snapshots stay in sync with new data.
*
* Two modes:
* 1. Inline - Called directly from crawler after saving to legacy tables
* 2. Async - Called from a background worker that processes recent crawls
*
* Usage:
* // Inline mode (in crawler)
* import { syncCrawlToCanonical } from './hydration/incremental-sync';
* await syncCrawlToCanonical(pool, crawlResult);
*
* // Async mode (background worker)
* import { syncRecentCrawls } from './hydration/incremental-sync';
* await syncRecentCrawls(pool, { since: '1 hour' });
*/
import { Pool } from 'pg';
// Slice size used by the batching loop in syncProductsToCanonical.
// Rows inside a slice are still upserted one statement at a time.
const BATCH_SIZE = 100;
// ============================================================
// TYPES
// ============================================================
/**
* Outcome of one crawl of a single dispensary, as passed to the sync functions
* in this file. getOrCreateCrawlRun writes these values into a crawl_runs row.
*/
export interface CrawlResult {
// Dispensary that was crawled; also selects which legacy rows get synced.
dispensaryId: number;
// Stored as crawl_runs.state_id; syncCrawlToCanonical forwards null when this is absent or falsy.
stateId?: number;
// Not read by the sync functions visible in this file.
platformDispensaryId?: string;
// When set, getOrCreateCrawlRun looks for an existing crawl_run with this legacy job id first.
crawlJobId?: number; // legacy dispensary_crawl_jobs.id
// Crawl start; syncCrawlToCanonical syncs snapshots crawled from one minute before this time.
startedAt: Date;
// Crawl end; duration_ms is only computed when this is present.
finishedAt?: Date;
// Stored as-is in crawl_runs.status.
status: 'success' | 'failed' | 'running';
// Stored in crawl_runs.error_message.
errorMessage?: string;
// Stored in crawl_runs.products_found.
productsFound: number;
// Stored in crawl_runs.products_new.
productsCreated: number;
// Stored in crawl_runs.products_updated.
productsUpdated: number;
// Stored in crawl_runs.products_missing; written as 0 when absent.
productsMissing?: number;
// Stored in crawl_runs.brands_found; written as 0 when absent.
brandsFound?: number;
}
/**
* Flags shared by every sync function in this file. All default to false.
*/
export interface SyncOptions {
// When true the INSERT statements are skipped; the SELECTs still run and
// the would-be writes are counted (and logged when verbose is also set).
dryRun?: boolean;
// When true progress messages are written with console.log.
verbose?: boolean;
// When true syncCrawlToCanonical does not run the snapshot sync step.
skipSnapshots?: boolean;
}
/**
* Summary returned by syncCrawlToCanonical for one crawl.
*/
export interface SyncResult {
// Id returned by getOrCreateCrawlRun; null in a dry run when no existing crawl_run was found.
crawlRunId: number | null;
// productsNew + productsUpdated.
productsUpserted: number;
// Products the upsert reported as newly inserted (in a dry run every product is counted here).
productsNew: number;
// Products the upsert reported as already existing.
productsUpdated: number;
// Snapshots inserted (or that would be inserted in a dry run); 0 when snapshots are skipped.
snapshotsCreated: number;
// Wall-clock time spent inside syncCrawlToCanonical, in milliseconds.
durationMs: number;
// Per-product and per-snapshot error messages collected during the sync.
errors: string[];
}
// ============================================================
// CREATE OR GET CRAWL RUN
// ============================================================
/**
 * Resolve the crawl_runs row for a crawl, inserting one when none exists yet.
 *
 * When the crawl carries a legacy job id, an existing crawl_run with that id is
 * returned instead of inserting a second row. In dry-run mode nothing is
 * inserted and null is returned (unless an existing row was found first).
 *
 * @param pool - pg pool used for the lookup and the insert.
 * @param crawlResult - crawl summary whose fields populate the new row.
 * @param options - dryRun suppresses the insert; verbose enables logging.
 * @returns the crawl_runs id, or null when a dry run would have inserted.
 */
export async function getOrCreateCrawlRun(
  pool: Pool,
  crawlResult: CrawlResult,
  options: SyncOptions = {}
): Promise<number | null> {
  const { dryRun = false, verbose = false } = options;
  const {
    dispensaryId,
    stateId,
    crawlJobId,
    startedAt,
    finishedAt,
    status,
    errorMessage,
    productsFound,
    productsCreated,
    productsUpdated,
    productsMissing,
    brandsFound,
  } = crawlResult;

  // Reuse the row that an earlier sync created for the same legacy job.
  if (crawlJobId) {
    const lookup = await pool.query(
      `SELECT id FROM crawl_runs WHERE legacy_dispensary_crawl_job_id = $1`,
      [crawlJobId]
    );
    const [match] = lookup.rows;
    if (lookup.rows.length > 0) {
      if (verbose) {
        console.log(`[IncrSync] Found existing crawl_run ${match.id} for job ${crawlJobId}`);
      }
      return match.id;
    }
  }

  if (dryRun) {
    console.log(`[IncrSync][DryRun] Would create crawl_run for dispensary ${dispensaryId}`);
    return null;
  }

  // Duration is only known once the crawl has a finish time.
  let durationMs: number | null = null;
  if (finishedAt && startedAt) {
    durationMs = finishedAt.getTime() - startedAt.getTime();
  }

  const insertValues = [
    dispensaryId,
    stateId,
    crawlJobId,
    startedAt,
    finishedAt,
    durationMs,
    status,
    errorMessage,
    productsFound,
    productsCreated,
    productsUpdated,
    productsMissing || 0,
    brandsFound || 0,
  ];

  const inserted = await pool.query(
`INSERT INTO crawl_runs (
dispensary_id, state_id, provider,
legacy_dispensary_crawl_job_id,
started_at, finished_at, duration_ms,
status, error_message,
products_found, products_new, products_updated, products_missing,
brands_found, trigger_type, created_at
) VALUES (
$1, $2, 'dutchie',
$3,
$4, $5, $6,
$7, $8,
$9, $10, $11, $12,
$13, 'scheduled', NOW()
)
RETURNING id`,
    insertValues
  );

  const newId = inserted.rows[0].id;
  if (verbose) {
    console.log(`[IncrSync] Created crawl_run ${newId}`);
  }
  return newId;
}
// ============================================================
// SYNC PRODUCTS TO CANONICAL
// ============================================================
/**
* Copy every dutchie_products row for one dispensary into store_products.
*
* Each row is upserted with its own statement, keyed on
* (dispensary_id, provider, provider_product_id) with provider fixed to 'dutchie'.
* When the key already exists only a subset of columns is refreshed
* (legacy id, name, brand, category, subcategory, stock flag/status,
* THC/CBD percent, image_url, local_image_url, is_on_special, platform_status)
* and last_seen_at / updated_at are set to NOW().
* A failure on one row is recorded in `errors` and the loop continues.
*
* @param pool - pg pool used for the SELECT and for every upsert.
* @param dispensaryId - dispensary whose dutchie_products rows are read.
* @param stateId - written to store_products.state_id on insert; the conflict
*   branch does not touch it.
* @param crawlRunId - accepted but not referenced in the function body.
* @param options - dryRun skips the upserts (every row is then counted as new);
*   verbose enables console logging.
* @returns counts of new and updated rows, their sum as `upserted`, and the
*   collected per-row error messages.
*/
export async function syncProductsToCanonical(
pool: Pool,
dispensaryId: number,
stateId: number | null,
crawlRunId: number | null,
options: SyncOptions = {}
): Promise<{ upserted: number; new: number; updated: number; errors: string[] }> {
const { dryRun = false, verbose = false } = options;
const errors: string[] = [];
let newCount = 0;
let updatedCount = 0;
// Get all products for this dispensary (plus the dispensary's platform id via the join)
const { rows: products } = await pool.query(
`SELECT
dp.id,
dp.external_product_id,
dp.name,
dp.brand_name,
dp.brand_id,
dp.category,
dp.subcategory,
dp.type,
dp.strain_type,
dp.description,
dp.effects,
dp.cannabinoids_v2,
dp.thc,
dp.thc_content,
dp.cbd,
dp.cbd_content,
dp.primary_image_url,
dp.local_image_url,
dp.local_image_thumb_url,
dp.local_image_medium_url,
dp.original_image_url,
dp.additional_images,
dp.stock_status,
dp.c_name,
dp.enterprise_product_id,
dp.weight,
dp.options,
dp.measurements,
dp.status,
dp.featured,
dp.special,
dp.medical_only,
dp.rec_only,
dp.is_below_threshold,
dp.is_below_kiosk_threshold,
dp.total_quantity_available,
dp.total_kiosk_quantity_available,
dp.first_seen_at,
dp.last_seen_at,
dp.updated_at,
d.platform_dispensary_id
FROM dutchie_products dp
LEFT JOIN dispensaries d ON d.id = dp.dispensary_id
WHERE dp.dispensary_id = $1`,
[dispensaryId]
);
if (verbose) {
console.log(`[IncrSync] Found ${products.length} products for dispensary ${dispensaryId}`);
}
// Process in batches: rows are walked in slices of BATCH_SIZE, but each row
// inside a slice is still awaited and upserted individually.
for (let i = 0; i < products.length; i += BATCH_SIZE) {
const batch = products.slice(i, i + BATCH_SIZE);
for (const p of batch) {
try {
// Prefer the thc/cbd column and fall back to the *_content column. A value
// that parses to NaN or 0 falls through the ||, so a genuine 0 is stored as null.
const thcPercent = parseFloat(p.thc) || parseFloat(p.thc_content) || null;
const cbdPercent = parseFloat(p.cbd) || parseFloat(p.cbd_content) || null;
const stockStatus = p.stock_status || 'unknown';
// A missing/unknown stock status is treated as in stock.
const isInStock = stockStatus === 'in_stock' || stockStatus === 'unknown';
if (dryRun) {
if (verbose) {
console.log(`[IncrSync][DryRun] Would upsert product ${p.external_product_id}`);
}
// Without running the upsert there is no insert/update signal, so every
// row is tallied as new in a dry run.
newCount++;
continue;
}
// NOTE(review): several values below (effects, cannabinoids_v2, additional_images,
// options, measurements) are forwarded exactly as the driver returned them;
// confirm they serialize correctly for the target column types.
const result = await pool.query(
`INSERT INTO store_products (
dispensary_id, state_id, provider, provider_product_id,
provider_brand_id, provider_dispensary_id, enterprise_product_id,
legacy_dutchie_product_id,
name, brand_name, category, subcategory, product_type, strain_type,
description, effects, cannabinoids,
thc_percent, cbd_percent, thc_content_text, cbd_content_text,
is_in_stock, stock_status, stock_quantity,
total_quantity_available, total_kiosk_quantity_available,
image_url, local_image_url, local_image_thumb_url, local_image_medium_url,
original_image_url, additional_images,
is_on_special, is_featured, medical_only, rec_only,
is_below_threshold, is_below_kiosk_threshold,
platform_status, c_name, weight, options, measurements,
first_seen_at, last_seen_at, updated_at
) VALUES (
$1, $2, 'dutchie', $3,
$4, $5, $6,
$7,
$8, $9, $10, $11, $12, $13,
$14, $15, $16,
$17, $18, $19, $20,
$21, $22, $23,
$24, $25,
$26, $27, $28, $29,
$30, $31,
$32, $33, $34, $35,
$36, $37,
$38, $39, $40, $41, $42,
$43, $44, NOW()
)
ON CONFLICT (dispensary_id, provider, provider_product_id)
DO UPDATE SET
legacy_dutchie_product_id = EXCLUDED.legacy_dutchie_product_id,
name = EXCLUDED.name,
brand_name = EXCLUDED.brand_name,
category = EXCLUDED.category,
subcategory = EXCLUDED.subcategory,
is_in_stock = EXCLUDED.is_in_stock,
stock_status = EXCLUDED.stock_status,
thc_percent = EXCLUDED.thc_percent,
cbd_percent = EXCLUDED.cbd_percent,
image_url = EXCLUDED.image_url,
local_image_url = EXCLUDED.local_image_url,
is_on_special = EXCLUDED.is_on_special,
platform_status = EXCLUDED.platform_status,
last_seen_at = NOW(),
updated_at = NOW()
RETURNING (xmax = 0) as is_new`,
[
// $1-$7: ownership and provider identifiers
dispensaryId,
stateId,
p.external_product_id,
p.brand_id,
p.platform_dispensary_id,
p.enterprise_product_id,
p.id,
// $8-$13: naming and classification (category falls back to type)
p.name,
p.brand_name,
p.category || p.type,
p.subcategory,
p.type,
p.strain_type,
// $14-$16: descriptive content
p.description,
p.effects,
p.cannabinoids_v2,
// $17-$20: parsed potency numbers followed by the raw *_content values
thcPercent,
cbdPercent,
p.thc_content,
p.cbd_content,
// $21-$25: stock; $23 (stock_quantity) and $24 (total_quantity_available)
// both receive total_quantity_available
isInStock,
stockStatus,
p.total_quantity_available,
p.total_quantity_available,
p.total_kiosk_quantity_available,
// $26-$31: images (image_url is fed from primary_image_url)
p.primary_image_url,
p.local_image_url,
p.local_image_thumb_url,
p.local_image_medium_url,
p.original_image_url,
p.additional_images,
// $32-$37: boolean flags, defaulting to false when unset
p.special || false,
p.featured || false,
p.medical_only || false,
p.rec_only || false,
p.is_below_threshold || false,
p.is_below_kiosk_threshold || false,
// $38-$42: platform status and remaining product attributes
p.status,
p.c_name,
p.weight,
p.options,
p.measurements,
// $43-$44: seen timestamps, falling back to the legacy row's updated_at
p.first_seen_at || p.updated_at,
p.last_seen_at || p.updated_at,
]
);
// is_new is the value of the RETURNING (xmax = 0) expression; true is
// counted as an insert, anything else as an update.
if (result.rows[0]?.is_new) {
newCount++;
} else {
updatedCount++;
}
} catch (error: any) {
// Record the failure against the legacy product id and keep going.
errors.push(`Product ${p.id}: ${error.message}`);
}
}
}
return {
upserted: newCount + updatedCount,
new: newCount,
updated: updatedCount,
errors,
};
}
// ============================================================
// SYNC SNAPSHOTS TO CANONICAL
// ============================================================
/**
 * Copy not-yet-synced dutchie_product_snapshots rows for one dispensary into
 * store_product_snapshots.
 *
 * Only legacy snapshots crawled at or after `since` and without a matching
 * store_product_snapshots.legacy_snapshot_id are selected. Pricing, potency and
 * availability are derived from each snapshot's raw product payload. A failure
 * on one snapshot is recorded in `errors` and the loop moves on.
 *
 * @param pool - pg pool used for the SELECT and every INSERT.
 * @param dispensaryId - dispensary whose legacy snapshots are read.
 * @param stateId - written to store_product_snapshots.state_id.
 * @param crawlRunId - written to store_product_snapshots.crawl_run_id.
 * @param since - lower bound (inclusive) on the legacy crawled_at timestamp.
 * @param options - dryRun counts snapshots without inserting; verbose logs.
 * @returns number of snapshots created (or counted in a dry run) and errors.
 */
export async function syncSnapshotsToCanonical(
  pool: Pool,
  dispensaryId: number,
  stateId: number | null,
  crawlRunId: number | null,
  since: Date,
  options: SyncOptions = {}
): Promise<{ created: number; errors: string[] }> {
  const { dryRun = false, verbose = false } = options;

  // Legacy snapshots in the window that have no canonical counterpart yet.
  const pending = await pool.query(
`SELECT
dps.id,
dps.dutchie_product_id,
dps.dispensary_id,
dps.options,
dps.raw_product_data,
dps.crawled_at,
dps.created_at,
dp.external_product_id,
dp.name,
dp.brand_name,
dp.category,
dp.subcategory,
sp.id as store_product_id,
d.platform_dispensary_id
FROM dutchie_product_snapshots dps
JOIN dutchie_products dp ON dp.id = dps.dutchie_product_id
LEFT JOIN store_products sp ON sp.dispensary_id = dps.dispensary_id
AND sp.provider_product_id = dp.external_product_id
AND sp.provider = 'dutchie'
LEFT JOIN dispensaries d ON d.id = dps.dispensary_id
LEFT JOIN store_product_snapshots sps ON sps.legacy_snapshot_id = dps.id
WHERE dps.dispensary_id = $1
AND dps.crawled_at >= $2
AND sps.id IS NULL
ORDER BY dps.id`,
    [dispensaryId, since]
  );
  const snapshots = pending.rows;

  if (verbose) {
    console.log(`[IncrSync] Found ${snapshots.length} new snapshots since ${since.toISOString()}`);
  }
  if (snapshots.length === 0) {
    return { created: 0, errors: [] };
  }

  const errors: string[] = [];
  let created = 0;

  for (const legacy of snapshots) {
    try {
      // Parsing happens inside the try so a malformed payload only fails this row.
      const derived = deriveSnapshotFields(legacy.raw_product_data);

      if (dryRun) {
        if (verbose) {
          console.log(`[IncrSync][DryRun] Would create snapshot for legacy ${legacy.id}`);
        }
        created++;
        continue;
      }

      const insertValues = [
        legacy.dispensary_id,
        legacy.store_product_id,
        stateId,
        legacy.external_product_id,
        legacy.platform_dispensary_id,
        crawlRunId,
        legacy.id,
        legacy.dutchie_product_id,
        legacy.crawled_at,
        legacy.name,
        legacy.brand_name,
        legacy.category,
        legacy.subcategory,
        derived.priceRec,
        derived.priceMed,
        derived.priceRecSpecial,
        derived.isOnSpecial,
        derived.isInStock,
        derived.stockStatus,
        derived.thcPercent,
        derived.cbdPercent,
        derived.platformStatus,
        legacy.options,
        legacy.raw_product_data,
      ];

      await pool.query(
`INSERT INTO store_product_snapshots (
dispensary_id, store_product_id, state_id,
provider, provider_product_id, provider_dispensary_id,
crawl_run_id,
legacy_snapshot_id, legacy_dutchie_product_id,
captured_at,
name, brand_name, category, subcategory,
price_rec, price_med, price_rec_special,
is_on_special, is_in_stock, stock_status,
thc_percent, cbd_percent,
platform_status, options, raw_data,
created_at
) VALUES (
$1, $2, $3,
'dutchie', $4, $5,
$6,
$7, $8,
$9,
$10, $11, $12, $13,
$14, $15, $16,
$17, $18, $19,
$20, $21,
$22, $23, $24,
NOW()
)`,
        insertValues
      );
      created++;
    } catch (error: any) {
      errors.push(`Snapshot ${legacy.id}: ${error.message}`);
    }
  }

  return { created, errors };
}

/** Values extracted from a legacy snapshot's raw product payload. */
interface DerivedSnapshotFields {
  priceRec: number | null;
  priceMed: number | null;
  priceRecSpecial: number | null;
  isOnSpecial: boolean;
  isInStock: boolean;
  stockStatus: string;
  thcPercent: number | null;
  cbdPercent: number | null;
  platformStatus: string | null;
}

/**
 * Derive pricing, potency and availability from a raw product payload.
 *
 * A falsy payload yields the defaults (no prices, in stock, status 'unknown').
 * A string payload is JSON-parsed first; a parse failure propagates to the caller.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- payload arrives untyped from the driver
function deriveSnapshotFields(rawProductData: any): DerivedSnapshotFields {
  const derived: DerivedSnapshotFields = {
    priceRec: null,
    priceMed: null,
    priceRecSpecial: null,
    isOnSpecial: false,
    isInStock: true,
    stockStatus: 'unknown',
    thcPercent: null,
    cbdPercent: null,
    platformStatus: null,
  };
  if (!rawProductData) {
    return derived;
  }

  const raw = typeof rawProductData === 'string' ? JSON.parse(rawProductData) : rawProductData;

  derived.priceRec = raw.recPrices?.[0] || raw.Prices?.[0] || null;
  derived.priceMed = raw.medicalPrices?.[0] || null;
  derived.priceRecSpecial = raw.recSpecialPrices?.[0] || null;
  derived.isOnSpecial = raw.special === true || derived.priceRecSpecial !== null;
  derived.thcPercent = raw.THCContent?.range?.[0] || raw.THC || null;
  derived.cbdPercent = raw.CBDContent?.range?.[0] || raw.CBD || null;
  derived.platformStatus = raw.Status || null;
  // Only a platform status of 'Active' counts as in stock.
  derived.isInStock = derived.platformStatus === 'Active';
  derived.stockStatus = derived.isInStock ? 'in_stock' : 'out_of_stock';
  return derived;
}
// ============================================================
// MAIN SYNC FUNCTION
// ============================================================
/**
 * Push one finished crawl into the canonical tables.
 *
 * Runs three steps in order: resolve/create the crawl_run, upsert the
 * dispensary's products, then (unless skipSnapshots is set) copy snapshots
 * crawled from one minute before the crawl's start time onward.
 *
 * @param pool - pg pool handed to each step.
 * @param crawlResult - summary of the crawl being synced.
 * @param options - forwarded unchanged to every step.
 * @returns counts from each step, elapsed wall-clock time and collected errors.
 */
export async function syncCrawlToCanonical(
  pool: Pool,
  crawlResult: CrawlResult,
  options: SyncOptions = {}
): Promise<SyncResult> {
  const beganAt = Date.now();
  const { verbose = false, skipSnapshots = false } = options;
  const { dispensaryId } = crawlResult;
  const stateId = crawlResult.stateId || null;

  if (verbose) {
    console.log(`[IncrSync] Starting sync for dispensary ${dispensaryId}`);
  }

  // Step 1: crawl_run row.
  const crawlRunId = await getOrCreateCrawlRun(pool, crawlResult, options);

  // Step 2: products.
  const products = await syncProductsToCanonical(pool, dispensaryId, stateId, crawlRunId, options);
  const errors: string[] = [];
  errors.push(...products.errors);

  // Step 3: snapshots, starting one minute before the crawl began.
  let snapshotsCreated = 0;
  if (!skipSnapshots) {
    const windowStart = new Date(crawlResult.startedAt.getTime() - 60 * 1000);
    const snapshots = await syncSnapshotsToCanonical(
      pool,
      dispensaryId,
      stateId,
      crawlRunId,
      windowStart,
      options
    );
    snapshotsCreated = snapshots.created;
    errors.push(...snapshots.errors);
  }

  const durationMs = Date.now() - beganAt;
  if (verbose) {
    console.log(`[IncrSync] Completed in ${durationMs}ms: ${products.upserted} products, ${snapshotsCreated} snapshots`);
  }

  return {
    crawlRunId,
    productsUpserted: products.upserted,
    productsNew: products.new,
    productsUpdated: products.updated,
    snapshotsCreated,
    durationMs,
    errors,
  };
}
// ============================================================
// BATCH SYNC FOR RECENT CRAWLS
// ============================================================
/**
* Options for syncRecentCrawls, in addition to the shared SyncOptions flags.
*/
export interface RecentSyncOptions extends SyncOptions {
// Postgres interval text giving how far back (by job started_at) to look; defaults to '1 hour'.
since?: string; // e.g., '1 hour', '30 minutes', '1 day'
// Restrict the scan to one dispensary; when omitted all dispensaries are considered.
dispensaryId?: number;
// Maximum number of crawl jobs picked up per call; defaults to 100.
limit?: number;
}
/**
 * Sync recent crawl jobs that do not yet have a crawl_runs row.
 * Run this as a background job to catch any missed syncs.
 *
 * Selects 'completed' / 'failed' rows from dispensary_crawl_jobs started within
 * the `since` window that have no crawl_run pointing at them (newest first, up
 * to `limit`), converts each to a CrawlResult and hands it to
 * syncCrawlToCanonical. A job that throws is recorded in `errors` and the loop
 * continues with the next job.
 *
 * @param pool - pg pool used for the lookup and passed to each per-job sync.
 * @param options - look-back window, optional dispensary filter (a falsy
 *   dispensaryId means no filter), limit, and the shared dryRun / verbose /
 *   skipSnapshots flags, which are forwarded to every per-job sync.
 * @returns how many jobs were processed without throwing, plus every error
 *   message: thrown errors and the per-product / per-snapshot errors reported
 *   by each job's sync, all prefixed with `Job <id>: `.
 */
export async function syncRecentCrawls(
  pool: Pool,
  options: RecentSyncOptions = {}
): Promise<{ synced: number; errors: string[] }> {
  const {
    since = '1 hour',
    dispensaryId,
    limit = 100,
    verbose = false,
    dryRun = false,
    skipSnapshots = false,
  } = options;

  const errors: string[] = [];
  let synced = 0;

  // `since` is caller-supplied text, so it is bound as a parameter and cast to
  // interval server-side instead of being spliced into the SQL string.
  const params: Array<string | number> = [since];

  // Find recent completed crawl jobs that don't have a crawl_run
  let query = `
    SELECT
      dcj.id as crawl_job_id,
      dcj.dispensary_id,
      dcj.status,
      dcj.started_at,
      dcj.completed_at,
      dcj.products_found,
      dcj.products_created,
      dcj.products_updated,
      dcj.brands_found,
      dcj.error_message,
      d.state_id
    FROM dispensary_crawl_jobs dcj
    LEFT JOIN dispensaries d ON d.id = dcj.dispensary_id
    LEFT JOIN crawl_runs cr ON cr.legacy_dispensary_crawl_job_id = dcj.id
    WHERE dcj.status IN ('completed', 'failed')
      AND dcj.started_at > NOW() - $1::interval
      AND cr.id IS NULL
  `;

  // Placeholder numbers follow the position each value takes in `params`.
  if (dispensaryId) {
    params.push(dispensaryId);
    query += ` AND dcj.dispensary_id = $${params.length}`;
  }
  params.push(limit);
  query += ` ORDER BY dcj.started_at DESC LIMIT $${params.length}`;

  const { rows: unsynced } = await pool.query(query, params);

  if (verbose) {
    console.log(`[IncrSync] Found ${unsynced.length} unsynced crawls from last ${since}`);
  }

  for (const job of unsynced) {
    try {
      const crawlResult: CrawlResult = {
        dispensaryId: job.dispensary_id,
        stateId: job.state_id,
        crawlJobId: job.crawl_job_id,
        startedAt: new Date(job.started_at),
        finishedAt: job.completed_at ? new Date(job.completed_at) : undefined,
        // The query only returns 'completed' or 'failed' jobs.
        status: job.status === 'completed' ? 'success' : 'failed',
        errorMessage: job.error_message,
        productsFound: job.products_found || 0,
        productsCreated: job.products_created || 0,
        productsUpdated: job.products_updated || 0,
        brandsFound: job.brands_found || 0,
      };

      const result = await syncCrawlToCanonical(pool, crawlResult, {
        dryRun,
        verbose,
        skipSnapshots,
      });

      // Surface item-level failures instead of dropping them with the result.
      for (const message of result.errors) {
        errors.push(`Job ${job.crawl_job_id}: ${message}`);
      }
      synced++;
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      errors.push(`Job ${job.crawl_job_id}: ${message}`);
    }
  }

  return { synced, errors };
}
// Types CrawlResult, SyncOptions, and SyncResult are already exported at their declarations