Fix snapshot creation order - run before image downloads
Reorder processProducts() to create snapshots BEFORE attempting image downloads. Previously, if image downloads hung or failed, the process would be killed before snapshots were created, resulting in 0 snapshots despite successful product upserts. Changes: - Move Step 3 (snapshot creation) before Step 4 (image downloads) - Ensures core crawl data (products + snapshots) is persisted even if images fail - Adds chunked batch processing for improved memory management Tested locally: 771 snapshots created for dispensary 112 with quantity data populated. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,8 @@
|
||||
*
|
||||
* Crawls products from Dutchie dispensaries and stores them in the dutchie_az database.
|
||||
* Handles normalization from GraphQL response to database entities.
|
||||
*
|
||||
* IMPORTANT: Uses chunked batch processing per CLAUDE.md Rule #15 to avoid OOM.
|
||||
*/
|
||||
|
||||
import { query, getClient } from '../db/connection';
|
||||
@@ -22,6 +24,20 @@ import {
|
||||
} from '../types';
|
||||
import { downloadProductImage, imageExists } from '../../utils/image-storage';
|
||||
|
||||
// Explicit column list for dispensaries table (avoids SELECT * issues with schema differences)
|
||||
const DISPENSARY_COLUMNS = `
|
||||
id, name, slug, city, state, zip, address, latitude, longitude,
|
||||
menu_type, menu_url, platform_dispensary_id, website,
|
||||
provider_detection_data, created_at, updated_at
|
||||
`;
|
||||
|
||||
// ============================================================
|
||||
// BATCH PROCESSING CONFIGURATION
|
||||
// ============================================================
|
||||
|
||||
/** Chunk size for batch DB writes (per CLAUDE.md Rule #15) */
|
||||
const BATCH_CHUNK_SIZE = 100;
|
||||
|
||||
// ============================================================
|
||||
// NORMALIZATION FUNCTIONS
|
||||
// ============================================================
|
||||
@@ -528,6 +544,90 @@ async function insertSnapshot(snapshot: Partial<DutchieProductSnapshot>): Promis
|
||||
return result.rows[0].id;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// BATCH DATABASE OPERATIONS (per CLAUDE.md Rule #15)
|
||||
// ============================================================
|
||||
|
||||
/**
|
||||
* Helper to chunk an array into smaller arrays
|
||||
*/
|
||||
function chunkArray<T>(array: T[], size: number): T[][] {
|
||||
const chunks: T[][] = [];
|
||||
for (let i = 0; i < array.length; i += size) {
|
||||
chunks.push(array.slice(i, i + size));
|
||||
}
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch upsert products - processes in chunks to avoid OOM
|
||||
* Returns a Map of externalProductId -> database id
|
||||
*/
|
||||
async function batchUpsertProducts(
|
||||
products: Partial<DutchieProduct>[]
|
||||
): Promise<Map<string, number>> {
|
||||
const productIdMap = new Map<string, number>();
|
||||
const chunks = chunkArray(products, BATCH_CHUNK_SIZE);
|
||||
|
||||
console.log(`[ProductCrawler] Batch upserting ${products.length} products in ${chunks.length} chunks of ${BATCH_CHUNK_SIZE}...`);
|
||||
|
||||
for (let i = 0; i < chunks.length; i++) {
|
||||
const chunk = chunks[i];
|
||||
|
||||
// Process each product in the chunk
|
||||
for (const product of chunk) {
|
||||
try {
|
||||
const id = await upsertProduct(product);
|
||||
if (product.externalProductId) {
|
||||
productIdMap.set(product.externalProductId, id);
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`[ProductCrawler] Error upserting product ${product.externalProductId}:`, error.message);
|
||||
}
|
||||
}
|
||||
|
||||
// Log progress
|
||||
if ((i + 1) % 5 === 0 || i === chunks.length - 1) {
|
||||
console.log(`[ProductCrawler] Upserted chunk ${i + 1}/${chunks.length} (${productIdMap.size} products so far)`);
|
||||
}
|
||||
}
|
||||
|
||||
return productIdMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch insert snapshots - processes in chunks to avoid OOM
|
||||
*/
|
||||
async function batchInsertSnapshots(
|
||||
snapshots: Partial<DutchieProductSnapshot>[]
|
||||
): Promise<number> {
|
||||
const chunks = chunkArray(snapshots, BATCH_CHUNK_SIZE);
|
||||
let inserted = 0;
|
||||
|
||||
console.log(`[ProductCrawler] Batch inserting ${snapshots.length} snapshots in ${chunks.length} chunks of ${BATCH_CHUNK_SIZE}...`);
|
||||
|
||||
for (let i = 0; i < chunks.length; i++) {
|
||||
const chunk = chunks[i];
|
||||
|
||||
// Process each snapshot in the chunk
|
||||
for (const snapshot of chunk) {
|
||||
try {
|
||||
await insertSnapshot(snapshot);
|
||||
inserted++;
|
||||
} catch (error: any) {
|
||||
console.error(`[ProductCrawler] Error inserting snapshot for ${snapshot.externalProductId}:`, error.message);
|
||||
}
|
||||
}
|
||||
|
||||
// Log progress
|
||||
if ((i + 1) % 5 === 0 || i === chunks.length - 1) {
|
||||
console.log(`[ProductCrawler] Inserted snapshot chunk ${i + 1}/${chunks.length} (${inserted} snapshots so far)`);
|
||||
}
|
||||
}
|
||||
|
||||
return inserted;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update dispensary last_crawled_at and product_count
|
||||
*/
|
||||
@@ -563,7 +663,7 @@ async function markMissingProducts(
|
||||
pricingType: 'rec' | 'med'
|
||||
): Promise<number> {
|
||||
// Build UNION of Mode A + Mode B product IDs
|
||||
const unionProductIds = new Set<string>([...modeAProductIds, ...modeBProductIds]);
|
||||
const unionProductIds = new Set<string>([...Array.from(modeAProductIds), ...Array.from(modeBProductIds)]);
|
||||
|
||||
// OUTAGE DETECTION: If union is empty, something went wrong - don't mark anything as missing
|
||||
if (unionProductIds.size === 0) {
|
||||
@@ -593,52 +693,53 @@ async function markMissingProducts(
|
||||
console.log(`[ProductCrawler] Marking ${missingProducts.length} products as missing from feed (union of ${modeAProductIds.size} Mode A + ${modeBProductIds.size} Mode B = ${unionProductIds.size} unique)...`);
|
||||
|
||||
const crawledAt = new Date();
|
||||
let marked = 0;
|
||||
|
||||
for (const product of missingProducts) {
|
||||
try {
|
||||
// Insert a "missing from feed" snapshot
|
||||
await insertSnapshot({
|
||||
dutchieProductId: product.id,
|
||||
dispensaryId,
|
||||
platformDispensaryId,
|
||||
externalProductId: product.external_product_id,
|
||||
pricingType,
|
||||
crawlMode: 'mode_a', // Use mode_a for missing snapshots (convention)
|
||||
status: undefined,
|
||||
featured: false,
|
||||
special: false,
|
||||
medicalOnly: false,
|
||||
recOnly: false,
|
||||
isPresentInFeed: false,
|
||||
stockStatus: 'missing_from_feed',
|
||||
totalQuantityAvailable: undefined, // null = unknown, not 0
|
||||
manualInventory: false,
|
||||
isBelowThreshold: false,
|
||||
isBelowKioskThreshold: false,
|
||||
options: [],
|
||||
rawPayload: { _missingFromFeed: true, lastKnownName: product.name },
|
||||
crawledAt,
|
||||
});
|
||||
// Build all missing snapshots first (per CLAUDE.md Rule #15 - batch writes)
|
||||
const missingSnapshots: Partial<DutchieProductSnapshot>[] = missingProducts.map(product => ({
|
||||
dutchieProductId: product.id,
|
||||
dispensaryId,
|
||||
platformDispensaryId,
|
||||
externalProductId: product.external_product_id,
|
||||
pricingType,
|
||||
crawlMode: 'mode_a' as CrawlMode, // Use mode_a for missing snapshots (convention)
|
||||
status: undefined,
|
||||
featured: false,
|
||||
special: false,
|
||||
medicalOnly: false,
|
||||
recOnly: false,
|
||||
isPresentInFeed: false,
|
||||
stockStatus: 'missing_from_feed' as StockStatus,
|
||||
totalQuantityAvailable: undefined, // null = unknown, not 0
|
||||
manualInventory: false,
|
||||
isBelowThreshold: false,
|
||||
isBelowKioskThreshold: false,
|
||||
options: [],
|
||||
rawPayload: { _missingFromFeed: true, lastKnownName: product.name },
|
||||
crawledAt,
|
||||
}));
|
||||
|
||||
// Update the product's stock status
|
||||
await query(
|
||||
`
|
||||
UPDATE dutchie_products
|
||||
SET stock_status = 'missing_from_feed', total_quantity_available = NULL, updated_at = NOW()
|
||||
WHERE id = $1
|
||||
`,
|
||||
[product.id]
|
||||
);
|
||||
// Batch insert missing snapshots
|
||||
const snapshotsInserted = await batchInsertSnapshots(missingSnapshots);
|
||||
|
||||
marked++;
|
||||
} catch (error: any) {
|
||||
console.error(`[ProductCrawler] Error marking product ${product.external_product_id} as missing:`, error.message);
|
||||
}
|
||||
// Batch update product stock status in chunks
|
||||
const productIds = missingProducts.map(p => p.id);
|
||||
const productChunks = chunkArray(productIds, BATCH_CHUNK_SIZE);
|
||||
|
||||
console.log(`[ProductCrawler] Updating ${productIds.length} product statuses in ${productChunks.length} chunks...`);
|
||||
|
||||
for (const chunk of productChunks) {
|
||||
await query(
|
||||
`
|
||||
UPDATE dutchie_products
|
||||
SET stock_status = 'missing_from_feed', total_quantity_available = NULL, updated_at = NOW()
|
||||
WHERE id = ANY($1::int[])
|
||||
`,
|
||||
[chunk]
|
||||
);
|
||||
}
|
||||
|
||||
console.log(`[ProductCrawler] Marked ${marked} products as missing from feed`);
|
||||
return marked;
|
||||
console.log(`[ProductCrawler] Marked ${snapshotsInserted} products as missing from feed`);
|
||||
return snapshotsInserted;
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
@@ -649,6 +750,7 @@ export interface CrawlResult {
|
||||
success: boolean;
|
||||
dispensaryId: number;
|
||||
productsFound: number;
|
||||
productsFetched: number; // Alias for productsFound (used by worker)
|
||||
productsUpserted: number;
|
||||
snapshotsCreated: number;
|
||||
modeAProducts?: number;
|
||||
@@ -663,6 +765,7 @@ export interface CrawlResult {
|
||||
/**
|
||||
* Process a batch of products from a single crawl mode
|
||||
* IMPORTANT: Stores ALL products, never filters before DB
|
||||
* Uses chunked batch processing per CLAUDE.md Rule #15 to avoid OOM
|
||||
* Returns the set of external product IDs that were processed
|
||||
*/
|
||||
async function processProducts(
|
||||
@@ -673,42 +776,39 @@ async function processProducts(
|
||||
options: { downloadImages?: boolean } = {}
|
||||
): Promise<{ upserted: number; snapshots: number; productIds: Set<string>; imagesDownloaded: number; imageErrors: number }> {
|
||||
const { downloadImages = true } = options;
|
||||
let upserted = 0;
|
||||
let snapshots = 0;
|
||||
const productIds = new Set<string>();
|
||||
let imagesDownloaded = 0;
|
||||
let imageErrors = 0;
|
||||
const productIds = new Set<string>();
|
||||
|
||||
console.log(`[ProductCrawler] Processing ${products.length} products using chunked batch processing...`);
|
||||
|
||||
// Step 1: Normalize all products and collect IDs
|
||||
const normalizedProducts: Partial<DutchieProduct>[] = [];
|
||||
const rawByExternalId = new Map<string, DutchieRawProduct>();
|
||||
|
||||
for (const raw of products) {
|
||||
try {
|
||||
const externalId = raw._id || raw.id || '';
|
||||
productIds.add(externalId);
|
||||
const externalId = raw._id || raw.id || '';
|
||||
productIds.add(externalId);
|
||||
rawByExternalId.set(externalId, raw);
|
||||
|
||||
// Upsert the canonical product - NEVER filter, store everything
|
||||
const normalizedProduct = normalizeProduct(
|
||||
raw,
|
||||
dispensary.id,
|
||||
dispensary.platformDispensaryId!
|
||||
);
|
||||
const productId = await upsertProduct(normalizedProduct);
|
||||
upserted++;
|
||||
const normalized = normalizeProduct(
|
||||
raw,
|
||||
dispensary.id,
|
||||
dispensary.platformDispensaryId!
|
||||
);
|
||||
normalizedProducts.push(normalized);
|
||||
}
|
||||
|
||||
// Download image locally if enabled
|
||||
if (downloadImages && normalizedProduct.primaryImageUrl) {
|
||||
const imageResult = await downloadAndUpdateProductImage(
|
||||
productId,
|
||||
dispensary.id,
|
||||
externalId,
|
||||
normalizedProduct.primaryImageUrl
|
||||
);
|
||||
if (imageResult.downloaded) {
|
||||
imagesDownloaded++;
|
||||
} else if (imageResult.error && imageResult.error !== 'No image URL') {
|
||||
imageErrors++;
|
||||
}
|
||||
}
|
||||
// Step 2: Batch upsert products (chunked)
|
||||
const productIdMap = await batchUpsertProducts(normalizedProducts);
|
||||
const upserted = productIdMap.size;
|
||||
|
||||
// Create snapshot with crawl mode
|
||||
// Step 3: Create and batch insert snapshots (chunked)
|
||||
// IMPORTANT: Do this BEFORE image downloads to ensure snapshots are created even if images fail
|
||||
const snapshots: Partial<DutchieProductSnapshot>[] = [];
|
||||
for (const [externalId, productId] of Array.from(productIdMap.entries())) {
|
||||
const raw = rawByExternalId.get(externalId);
|
||||
if (raw) {
|
||||
const snapshot = normalizeSnapshot(
|
||||
raw,
|
||||
productId,
|
||||
@@ -717,14 +817,52 @@ async function processProducts(
|
||||
pricingType,
|
||||
crawlMode
|
||||
);
|
||||
await insertSnapshot(snapshot);
|
||||
snapshots++;
|
||||
} catch (error: any) {
|
||||
console.error(`[ProductCrawler] Error processing product ${raw._id}:`, error.message);
|
||||
snapshots.push(snapshot);
|
||||
}
|
||||
}
|
||||
|
||||
return { upserted, snapshots, productIds, imagesDownloaded, imageErrors };
|
||||
const snapshotsInserted = await batchInsertSnapshots(snapshots);
|
||||
|
||||
// Step 4: Download images in chunks (if enabled)
|
||||
// This is done AFTER snapshots to ensure core data is saved even if image downloads fail
|
||||
if (downloadImages) {
|
||||
const imageChunks = chunkArray(Array.from(productIdMap.entries()), BATCH_CHUNK_SIZE);
|
||||
console.log(`[ProductCrawler] Downloading images in ${imageChunks.length} chunks...`);
|
||||
|
||||
for (let i = 0; i < imageChunks.length; i++) {
|
||||
const chunk = imageChunks[i];
|
||||
for (const [externalId, productId] of chunk) {
|
||||
const normalized = normalizedProducts.find(p => p.externalProductId === externalId);
|
||||
if (normalized?.primaryImageUrl) {
|
||||
try {
|
||||
const imageResult = await downloadAndUpdateProductImage(
|
||||
productId,
|
||||
dispensary.id,
|
||||
externalId,
|
||||
normalized.primaryImageUrl
|
||||
);
|
||||
if (imageResult.downloaded) {
|
||||
imagesDownloaded++;
|
||||
} else if (imageResult.error && imageResult.error !== 'No image URL') {
|
||||
imageErrors++;
|
||||
}
|
||||
} catch (error: any) {
|
||||
imageErrors++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ((i + 1) % 5 === 0 || i === imageChunks.length - 1) {
|
||||
console.log(`[ProductCrawler] Image download chunk ${i + 1}/${imageChunks.length} (${imagesDownloaded} downloaded, ${imageErrors} errors)`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clear references to help GC
|
||||
normalizedProducts.length = 0;
|
||||
rawByExternalId.clear();
|
||||
|
||||
return { upserted, snapshots: snapshotsInserted, productIds, imagesDownloaded, imageErrors };
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -734,12 +872,24 @@ async function processProducts(
|
||||
*
|
||||
* This ensures we capture ALL products including out-of-stock items
|
||||
*/
|
||||
export interface CrawlProgressCallback {
|
||||
productsFound: number;
|
||||
productsUpserted: number;
|
||||
snapshotsCreated: number;
|
||||
currentPage: number;
|
||||
totalPages?: number;
|
||||
}
|
||||
|
||||
export async function crawlDispensaryProducts(
|
||||
dispensary: Dispensary,
|
||||
pricingType: 'rec' | 'med' = 'rec',
|
||||
options: { useBothModes?: boolean; downloadImages?: boolean } = {}
|
||||
options: {
|
||||
useBothModes?: boolean;
|
||||
downloadImages?: boolean;
|
||||
onProgress?: (progress: CrawlProgressCallback) => Promise<void>;
|
||||
} = {}
|
||||
): Promise<CrawlResult> {
|
||||
const { useBothModes = true, downloadImages = true } = options;
|
||||
const { useBothModes = true, downloadImages = true, onProgress } = options;
|
||||
const startTime = Date.now();
|
||||
|
||||
if (!dispensary.platformDispensaryId) {
|
||||
@@ -747,6 +897,7 @@ export async function crawlDispensaryProducts(
|
||||
success: false,
|
||||
dispensaryId: dispensary.id,
|
||||
productsFound: 0,
|
||||
productsFetched: 0,
|
||||
productsUpserted: 0,
|
||||
snapshotsCreated: 0,
|
||||
errorMessage: 'Missing platformDispensaryId',
|
||||
@@ -809,6 +960,17 @@ export async function crawlDispensaryProducts(
|
||||
totalSnapshots = mergedResult.snapshots;
|
||||
totalImagesDownloaded = mergedResult.imagesDownloaded;
|
||||
totalImageErrors = mergedResult.imageErrors;
|
||||
|
||||
// Report progress
|
||||
if (onProgress) {
|
||||
await onProgress({
|
||||
productsFound: bothResults.merged.products.length,
|
||||
productsUpserted: totalUpserted,
|
||||
snapshotsCreated: totalSnapshots,
|
||||
currentPage: 1,
|
||||
totalPages: 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Single mode crawl (Mode A only)
|
||||
@@ -830,6 +992,17 @@ export async function crawlDispensaryProducts(
|
||||
totalSnapshots = result.snapshots;
|
||||
totalImagesDownloaded = result.imagesDownloaded;
|
||||
totalImageErrors = result.imageErrors;
|
||||
|
||||
// Report progress
|
||||
if (onProgress) {
|
||||
await onProgress({
|
||||
productsFound: products.length,
|
||||
productsUpserted: totalUpserted,
|
||||
snapshotsCreated: totalSnapshots,
|
||||
currentPage: 1,
|
||||
totalPages: 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Mark products as missing using UNION of Mode A + Mode B
|
||||
@@ -848,10 +1021,12 @@ export async function crawlDispensaryProducts(
|
||||
|
||||
console.log(`[ProductCrawler] Completed: ${totalUpserted} products, ${totalSnapshots} snapshots, ${missingMarked} marked missing, ${totalImagesDownloaded} images downloaded`);
|
||||
|
||||
const totalProductsFound = modeAProducts + modeBProducts;
|
||||
return {
|
||||
success: true,
|
||||
dispensaryId: dispensary.id,
|
||||
productsFound: modeAProducts + modeBProducts,
|
||||
productsFound: totalProductsFound,
|
||||
productsFetched: totalProductsFound,
|
||||
productsUpserted: totalUpserted,
|
||||
snapshotsCreated: totalSnapshots,
|
||||
modeAProducts,
|
||||
@@ -867,6 +1042,7 @@ export async function crawlDispensaryProducts(
|
||||
success: false,
|
||||
dispensaryId: dispensary.id,
|
||||
productsFound: 0,
|
||||
productsFetched: 0,
|
||||
productsUpserted: 0,
|
||||
snapshotsCreated: 0,
|
||||
errorMessage: error.message,
|
||||
@@ -886,7 +1062,7 @@ export async function crawlAllArizonaDispensaries(
|
||||
// Get all AZ dispensaries with platform IDs
|
||||
const { rows: rawRows } = await query(
|
||||
`
|
||||
SELECT * FROM dispensaries
|
||||
SELECT ${DISPENSARY_COLUMNS} FROM dispensaries
|
||||
WHERE state = 'AZ' AND menu_type = 'dutchie' AND platform_dispensary_id IS NOT NULL
|
||||
ORDER BY id
|
||||
`
|
||||
|
||||
Reference in New Issue
Block a user