Files
cannaiq/backend/src/tasks/handlers/product-refresh.ts
Kelly 4949b22457 feat(tasks): Refactor task workflow with payload/refresh separation
Major changes:
- Split crawl into payload_fetch (API → disk) and product_refresh (disk → DB)
- Add task chaining: store_discovery → product_discovery → payload_fetch → product_refresh
- Add payload storage utilities for gzipped JSON on filesystem
- Add /api/payloads endpoints for payload access and diffing
- Add DB-driven TaskScheduler with schedule persistence
- Track newDispensaryIds through discovery promotion for chaining
- Add stealth improvements: HTTP fingerprinting, proxy rotation enhancements
- Add Workers dashboard K8s scaling controls

New files:
- src/tasks/handlers/payload-fetch.ts - Fetches from API, saves to disk
- src/services/task-scheduler.ts - DB-driven schedule management
- src/utils/payload-storage.ts - Payload save/load utilities
- src/routes/payloads.ts - Payload API endpoints
- src/services/http-fingerprint.ts - Browser fingerprint generation
- docs/TASK_WORKFLOW_2024-12-10.md - Complete workflow documentation

Migrations:
- 078: Proxy consecutive 403 tracking
- 079: task_schedules table
- 080: raw_crawl_payloads table
- 081: payload column and last_fetch_at

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 22:15:35 -07:00

302 lines
10 KiB
TypeScript

/**
* Product Refresh Handler
*
* Per TASK_WORKFLOW_2024-12-10.md: Processes a locally-stored payload.
*
* This handler reads from the filesystem (NOT the Dutchie API).
* The payload_fetch handler is responsible for API calls.
*
* Flow:
* 1. Load payload from filesystem (by payload_id or latest for dispensary)
* 2. Normalize data via DutchieNormalizer
* 3. Upsert to store_products and store_product_snapshots
* 4. Track missing products (increment consecutive_misses, mark OOS at 3)
* 5. Download new product images
*
* Benefits of separation:
* - Retry-friendly: If this fails, re-run without re-crawling
* - Replay-able: Run against any historical payload
* - Faster: Local file read vs network call
*/
import { TaskContext, TaskResult } from '../task-worker';
import { DutchieNormalizer } from '../../hydration/normalizers/dutchie';
import {
upsertStoreProducts,
createStoreProductSnapshots,
downloadProductImages,
} from '../../hydration/canonical-upsert';
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';
const normalizer = new DutchieNormalizer();
export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
return { success: false, error: 'No dispensary_id specified for product_refresh task' };
}
try {
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
FROM dispensaries
WHERE id = $1
`, [dispensaryId]);
if (dispResult.rows.length === 0) {
return { success: false, error: `Dispensary ${dispensaryId} not found` };
}
const dispensary = dispResult.rows[0];
// Extract cName from menu_url for image storage context
const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
const cName = cNameMatch ? cNameMatch[1] : 'dispensary';
console.log(`[ProductRefresh] Starting refresh for ${dispensary.name} (ID: ${dispensaryId})`);
await ctx.heartbeat();
// ============================================================
// STEP 2: Load payload from filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Read local payload, not API
// ============================================================
let payloadData: any;
let payloadId: number;
// Check if specific payload_id was provided (from task chaining)
const taskPayload = task.payload as { payload_id?: number } | null;
if (taskPayload?.payload_id) {
// Load specific payload (from payload_fetch chaining)
const result = await loadRawPayloadById(pool, taskPayload.payload_id);
if (!result) {
return { success: false, error: `Payload ${taskPayload.payload_id} not found` };
}
payloadData = result.payload;
payloadId = result.metadata.id;
console.log(`[ProductRefresh] Loaded specific payload #${payloadId}`);
} else {
// Load latest payload for this dispensary
const result = await getLatestPayload(pool, dispensaryId);
if (!result) {
return { success: false, error: `No payload found for dispensary ${dispensaryId}` };
}
payloadData = result.payload;
payloadId = result.metadata.id;
console.log(`[ProductRefresh] Loaded latest payload #${payloadId} (${result.metadata.fetchedAt})`);
}
const allProducts = payloadData.products || [];
if (allProducts.length === 0) {
return {
success: false,
error: 'Payload contains no products',
payloadId,
productsProcessed: 0,
};
}
console.log(`[ProductRefresh] Processing ${allProducts.length} products from payload #${payloadId}`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Normalize data
// ============================================================
console.log(`[ProductRefresh] Normalizing ${allProducts.length} products...`);
// Build RawPayload for the normalizer
const rawPayload = {
id: `refresh-${dispensaryId}-${Date.now()}`,
dispensary_id: dispensaryId,
crawl_run_id: null,
platform: 'dutchie',
payload_version: 1,
raw_json: { data: { filteredProducts: { products: allProducts } } },
product_count: allProducts.length,
pricing_type: 'dual',
crawl_mode: 'dual_mode',
fetched_at: new Date(),
processed: false,
normalized_at: null,
hydration_error: null,
hydration_attempts: 0,
created_at: new Date(),
};
const normalizationResult = normalizer.normalize(rawPayload);
if (normalizationResult.errors.length > 0) {
console.warn(`[ProductRefresh] Normalization warnings: ${normalizationResult.errors.map(e => e.message).join(', ')}`);
}
if (normalizationResult.products.length === 0) {
return {
success: false,
error: 'Normalization produced no products',
payloadId,
productsProcessed: 0,
};
}
console.log(`[ProductRefresh] Normalized ${normalizationResult.products.length} products`);
await ctx.heartbeat();
// ============================================================
// STEP 4: Upsert to canonical tables
// ============================================================
console.log(`[ProductRefresh] Upserting to store_products...`);
const upsertResult = await upsertStoreProducts(
pool,
normalizationResult.products,
normalizationResult.pricing,
normalizationResult.availability
);
console.log(`[ProductRefresh] Upserted: ${upsertResult.upserted} (${upsertResult.new} new, ${upsertResult.updated} updated)`);
await ctx.heartbeat();
// Create snapshots
console.log(`[ProductRefresh] Creating snapshots...`);
const snapshotsResult = await createStoreProductSnapshots(
pool,
dispensaryId,
normalizationResult.products,
normalizationResult.pricing,
normalizationResult.availability,
null // No crawl_run_id in new system
);
console.log(`[ProductRefresh] Created ${snapshotsResult.created} snapshots`);
await ctx.heartbeat();
// ============================================================
// STEP 5: Track missing products (consecutive_misses logic)
// - Products in feed: reset consecutive_misses to 0
// - Products not in feed: increment consecutive_misses
// - At 3 consecutive misses: mark as OOS
// ============================================================
const currentProductIds = allProducts
.map((p: any) => p._id || p.id)
.filter(Boolean);
// Reset consecutive_misses for products that ARE in the feed
if (currentProductIds.length > 0) {
await pool.query(`
UPDATE store_products
SET consecutive_misses = 0, last_seen_at = NOW()
WHERE dispensary_id = $1
AND provider = 'dutchie'
AND provider_product_id = ANY($2)
`, [dispensaryId, currentProductIds]);
}
// Increment consecutive_misses for products NOT in the feed
const incrementResult = await pool.query(`
UPDATE store_products
SET consecutive_misses = consecutive_misses + 1
WHERE dispensary_id = $1
AND provider = 'dutchie'
AND provider_product_id NOT IN (SELECT unnest($2::text[]))
AND consecutive_misses < 3
RETURNING id
`, [dispensaryId, currentProductIds]);
const incrementedCount = incrementResult.rowCount || 0;
if (incrementedCount > 0) {
console.log(`[ProductRefresh] Incremented consecutive_misses for ${incrementedCount} products`);
}
// Mark as OOS any products that hit 3 consecutive misses
const oosResult = await pool.query(`
UPDATE store_products
SET stock_status = 'oos', is_in_stock = false
WHERE dispensary_id = $1
AND provider = 'dutchie'
AND consecutive_misses >= 3
AND stock_status != 'oos'
RETURNING id
`, [dispensaryId]);
const markedOosCount = oosResult.rowCount || 0;
if (markedOosCount > 0) {
console.log(`[ProductRefresh] Marked ${markedOosCount} products as OOS (3+ consecutive misses)`);
}
await ctx.heartbeat();
// ============================================================
// STEP 6: Download images for new products
// ============================================================
if (upsertResult.productsNeedingImages.length > 0) {
console.log(`[ProductRefresh] Downloading images for ${upsertResult.productsNeedingImages.length} products...`);
try {
const dispensaryContext = {
stateCode: dispensary.state || 'AZ',
storeSlug: cName,
};
await downloadProductImages(
pool,
upsertResult.productsNeedingImages,
dispensaryContext
);
} catch (imgError: any) {
// Image download errors shouldn't fail the whole task
console.warn(`[ProductRefresh] Image download error (non-fatal): ${imgError.message}`);
}
}
// ============================================================
// STEP 7: Update dispensary last_crawl_at
// ============================================================
await pool.query(`
UPDATE dispensaries
SET last_crawl_at = NOW()
WHERE id = $1
`, [dispensaryId]);
// ============================================================
// STEP 8: Mark payload as processed
// ============================================================
await pool.query(`
UPDATE raw_crawl_payloads
SET processed_at = NOW()
WHERE id = $1
`, [payloadId]);
console.log(`[ProductRefresh] Completed ${dispensary.name}`);
return {
success: true,
payloadId,
productsProcessed: normalizationResult.products.length,
snapshotsCreated: snapshotsResult.created,
newProducts: upsertResult.new,
updatedProducts: upsertResult.updated,
markedOos: markedOosCount,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[ProductRefresh] Error for dispensary ${dispensaryId}:`, errorMessage);
return {
success: false,
error: errorMessage,
};
}
}