feat(tasks): Refactor task workflow with payload/refresh separation
Major changes: - Split crawl into payload_fetch (API → disk) and product_refresh (disk → DB) - Add task chaining: store_discovery → product_discovery → payload_fetch → product_refresh - Add payload storage utilities for gzipped JSON on filesystem - Add /api/payloads endpoints for payload access and diffing - Add DB-driven TaskScheduler with schedule persistence - Track newDispensaryIds through discovery promotion for chaining - Add stealth improvements: HTTP fingerprinting, proxy rotation enhancements - Add Workers dashboard K8s scaling controls New files: - src/tasks/handlers/payload-fetch.ts - Fetches from API, saves to disk - src/services/task-scheduler.ts - DB-driven schedule management - src/utils/payload-storage.ts - Payload save/load utilities - src/routes/payloads.ts - Payload API endpoints - src/services/http-fingerprint.ts - Browser fingerprint generation - docs/TASK_WORKFLOW_2024-12-10.md - Complete workflow documentation Migrations: - 078: Proxy consecutive 403 tracking - 079: task_schedules table - 080: raw_crawl_payloads table - 081: payload column and last_fetch_at 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,33 +1,32 @@
|
||||
/**
|
||||
* Product Refresh Handler
|
||||
*
|
||||
* Re-crawls a store to capture price/stock changes using the GraphQL pipeline.
|
||||
* Per TASK_WORKFLOW_2024-12-10.md: Processes a locally-stored payload.
|
||||
*
|
||||
* This handler reads from the filesystem (NOT the Dutchie API).
|
||||
* The payload_fetch handler is responsible for API calls.
|
||||
*
|
||||
* Flow:
|
||||
* 1. Load dispensary info from database
|
||||
* 2. Start stealth session (fingerprint + optional proxy)
|
||||
* 3. Fetch products via GraphQL (Status: 'All')
|
||||
* 4. Normalize data via DutchieNormalizer
|
||||
* 5. Upsert to store_products and store_product_snapshots
|
||||
* 6. Track missing products (increment consecutive_misses, mark OOS at 3)
|
||||
* 7. Download new product images
|
||||
* 8. End session
|
||||
* 1. Load payload from filesystem (by payload_id or latest for dispensary)
|
||||
* 2. Normalize data via DutchieNormalizer
|
||||
* 3. Upsert to store_products and store_product_snapshots
|
||||
* 4. Track missing products (increment consecutive_misses, mark OOS at 3)
|
||||
* 5. Download new product images
|
||||
*
|
||||
* Benefits of separation:
|
||||
* - Retry-friendly: If this fails, re-run without re-crawling
|
||||
* - Replay-able: Run against any historical payload
|
||||
* - Faster: Local file read vs network call
|
||||
*/
|
||||
|
||||
import { TaskContext, TaskResult } from '../task-worker';
|
||||
import {
|
||||
executeGraphQL,
|
||||
startSession,
|
||||
endSession,
|
||||
GRAPHQL_HASHES,
|
||||
DUTCHIE_CONFIG,
|
||||
} from '../../platforms/dutchie';
|
||||
import { DutchieNormalizer } from '../../hydration/normalizers/dutchie';
|
||||
import {
|
||||
upsertStoreProducts,
|
||||
createStoreProductSnapshots,
|
||||
downloadProductImages,
|
||||
} from '../../hydration/canonical-upsert';
|
||||
import { loadRawPayloadById, getLatestPayload } from '../../utils/payload-storage';
|
||||
|
||||
const normalizer = new DutchieNormalizer();
|
||||
|
||||
@@ -47,129 +46,76 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
SELECT
|
||||
id, name, platform_dispensary_id, menu_url, menu_type, city, state
|
||||
FROM dispensaries
|
||||
WHERE id = $1 AND crawl_enabled = true
|
||||
WHERE id = $1
|
||||
`, [dispensaryId]);
|
||||
|
||||
if (dispResult.rows.length === 0) {
|
||||
return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
|
||||
return { success: false, error: `Dispensary ${dispensaryId} not found` };
|
||||
}
|
||||
|
||||
const dispensary = dispResult.rows[0];
|
||||
const platformId = dispensary.platform_dispensary_id;
|
||||
|
||||
if (!platformId) {
|
||||
return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
|
||||
}
|
||||
|
||||
// Extract cName from menu_url
|
||||
// Extract cName from menu_url for image storage context
|
||||
const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
|
||||
const cName = cNameMatch ? cNameMatch[1] : 'dispensary';
|
||||
|
||||
console.log(`[ProductResync] Starting crawl for ${dispensary.name} (ID: ${dispensaryId})`);
|
||||
console.log(`[ProductResync] Platform ID: ${platformId}, cName: ${cName}`);
|
||||
|
||||
// ============================================================
|
||||
// STEP 2: Start stealth session
|
||||
// ============================================================
|
||||
const session = startSession(dispensary.state || 'AZ', 'America/Phoenix');
|
||||
console.log(`[ProductResync] Session started: ${session.sessionId}`);
|
||||
console.log(`[ProductRefresh] Starting refresh for ${dispensary.name} (ID: ${dispensaryId})`);
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// ============================================================
|
||||
// STEP 3: Fetch products via GraphQL (Status: 'All')
|
||||
// STEP 2: Load payload from filesystem
|
||||
// Per TASK_WORKFLOW_2024-12-10.md: Read local payload, not API
|
||||
// ============================================================
|
||||
const allProducts: any[] = [];
|
||||
let page = 0;
|
||||
let totalCount = 0;
|
||||
const perPage = DUTCHIE_CONFIG.perPage;
|
||||
const maxPages = DUTCHIE_CONFIG.maxPages;
|
||||
let payloadData: any;
|
||||
let payloadId: number;
|
||||
|
||||
try {
|
||||
while (page < maxPages) {
|
||||
const variables = {
|
||||
includeEnterpriseSpecials: false,
|
||||
productsFilter: {
|
||||
dispensaryId: platformId,
|
||||
pricingType: 'rec',
|
||||
Status: 'All',
|
||||
types: [],
|
||||
useCache: false,
|
||||
isDefaultSort: true,
|
||||
sortBy: 'popularSortIdx',
|
||||
sortDirection: 1,
|
||||
bypassOnlineThresholds: true,
|
||||
isKioskMenu: false,
|
||||
removeProductsBelowOptionThresholds: false,
|
||||
},
|
||||
page,
|
||||
perPage,
|
||||
};
|
||||
// Check if specific payload_id was provided (from task chaining)
|
||||
const taskPayload = task.payload as { payload_id?: number } | null;
|
||||
|
||||
console.log(`[ProductResync] Fetching page ${page + 1}...`);
|
||||
|
||||
const result = await executeGraphQL(
|
||||
'FilteredProducts',
|
||||
variables,
|
||||
GRAPHQL_HASHES.FilteredProducts,
|
||||
{ cName, maxRetries: 3 }
|
||||
);
|
||||
|
||||
const data = result?.data?.filteredProducts;
|
||||
if (!data || !data.products) {
|
||||
if (page === 0) {
|
||||
throw new Error('No product data returned from GraphQL');
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
const products = data.products;
|
||||
allProducts.push(...products);
|
||||
|
||||
if (page === 0) {
|
||||
totalCount = data.queryInfo?.totalCount || products.length;
|
||||
console.log(`[ProductResync] Total products reported: ${totalCount}`);
|
||||
}
|
||||
|
||||
if (allProducts.length >= totalCount || products.length < perPage) {
|
||||
break;
|
||||
}
|
||||
|
||||
page++;
|
||||
|
||||
if (page < maxPages) {
|
||||
await new Promise(r => setTimeout(r, DUTCHIE_CONFIG.pageDelayMs));
|
||||
}
|
||||
|
||||
if (page % 5 === 0) {
|
||||
await ctx.heartbeat();
|
||||
}
|
||||
if (taskPayload?.payload_id) {
|
||||
// Load specific payload (from payload_fetch chaining)
|
||||
const result = await loadRawPayloadById(pool, taskPayload.payload_id);
|
||||
if (!result) {
|
||||
return { success: false, error: `Payload ${taskPayload.payload_id} not found` };
|
||||
}
|
||||
|
||||
console.log(`[ProductResync] Fetched ${allProducts.length} products in ${page + 1} pages`);
|
||||
|
||||
} finally {
|
||||
endSession();
|
||||
payloadData = result.payload;
|
||||
payloadId = result.metadata.id;
|
||||
console.log(`[ProductRefresh] Loaded specific payload #${payloadId}`);
|
||||
} else {
|
||||
// Load latest payload for this dispensary
|
||||
const result = await getLatestPayload(pool, dispensaryId);
|
||||
if (!result) {
|
||||
return { success: false, error: `No payload found for dispensary ${dispensaryId}` };
|
||||
}
|
||||
payloadData = result.payload;
|
||||
payloadId = result.metadata.id;
|
||||
console.log(`[ProductRefresh] Loaded latest payload #${payloadId} (${result.metadata.fetchedAt})`);
|
||||
}
|
||||
|
||||
const allProducts = payloadData.products || [];
|
||||
|
||||
if (allProducts.length === 0) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'No products returned from GraphQL',
|
||||
error: 'Payload contains no products',
|
||||
payloadId,
|
||||
productsProcessed: 0,
|
||||
};
|
||||
}
|
||||
|
||||
console.log(`[ProductRefresh] Processing ${allProducts.length} products from payload #${payloadId}`);
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// ============================================================
|
||||
// STEP 4: Normalize data
|
||||
// STEP 3: Normalize data
|
||||
// ============================================================
|
||||
console.log(`[ProductResync] Normalizing ${allProducts.length} products...`);
|
||||
console.log(`[ProductRefresh] Normalizing ${allProducts.length} products...`);
|
||||
|
||||
// Build RawPayload for the normalizer
|
||||
const rawPayload = {
|
||||
id: `resync-${dispensaryId}-${Date.now()}`,
|
||||
id: `refresh-${dispensaryId}-${Date.now()}`,
|
||||
dispensary_id: dispensaryId,
|
||||
crawl_run_id: null,
|
||||
platform: 'dutchie',
|
||||
@@ -189,25 +135,26 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
const normalizationResult = normalizer.normalize(rawPayload);
|
||||
|
||||
if (normalizationResult.errors.length > 0) {
|
||||
console.warn(`[ProductResync] Normalization warnings: ${normalizationResult.errors.map(e => e.message).join(', ')}`);
|
||||
console.warn(`[ProductRefresh] Normalization warnings: ${normalizationResult.errors.map(e => e.message).join(', ')}`);
|
||||
}
|
||||
|
||||
if (normalizationResult.products.length === 0) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'Normalization produced no products',
|
||||
payloadId,
|
||||
productsProcessed: 0,
|
||||
};
|
||||
}
|
||||
|
||||
console.log(`[ProductResync] Normalized ${normalizationResult.products.length} products`);
|
||||
console.log(`[ProductRefresh] Normalized ${normalizationResult.products.length} products`);
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// ============================================================
|
||||
// STEP 5: Upsert to canonical tables
|
||||
// STEP 4: Upsert to canonical tables
|
||||
// ============================================================
|
||||
console.log(`[ProductResync] Upserting to store_products...`);
|
||||
console.log(`[ProductRefresh] Upserting to store_products...`);
|
||||
|
||||
const upsertResult = await upsertStoreProducts(
|
||||
pool,
|
||||
@@ -216,12 +163,12 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
normalizationResult.availability
|
||||
);
|
||||
|
||||
console.log(`[ProductResync] Upserted: ${upsertResult.upserted} (${upsertResult.new} new, ${upsertResult.updated} updated)`);
|
||||
console.log(`[ProductRefresh] Upserted: ${upsertResult.upserted} (${upsertResult.new} new, ${upsertResult.updated} updated)`);
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// Create snapshots
|
||||
console.log(`[ProductResync] Creating snapshots...`);
|
||||
console.log(`[ProductRefresh] Creating snapshots...`);
|
||||
|
||||
const snapshotsResult = await createStoreProductSnapshots(
|
||||
pool,
|
||||
@@ -232,12 +179,12 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
null // No crawl_run_id in new system
|
||||
);
|
||||
|
||||
console.log(`[ProductResync] Created ${snapshotsResult.created} snapshots`);
|
||||
console.log(`[ProductRefresh] Created ${snapshotsResult.created} snapshots`);
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// ============================================================
|
||||
// STEP 6: Track missing products (consecutive_misses logic)
|
||||
// STEP 5: Track missing products (consecutive_misses logic)
|
||||
// - Products in feed: reset consecutive_misses to 0
|
||||
// - Products not in feed: increment consecutive_misses
|
||||
// - At 3 consecutive misses: mark as OOS
|
||||
@@ -270,7 +217,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
|
||||
const incrementedCount = incrementResult.rowCount || 0;
|
||||
if (incrementedCount > 0) {
|
||||
console.log(`[ProductResync] Incremented consecutive_misses for ${incrementedCount} products`);
|
||||
console.log(`[ProductRefresh] Incremented consecutive_misses for ${incrementedCount} products`);
|
||||
}
|
||||
|
||||
// Mark as OOS any products that hit 3 consecutive misses
|
||||
@@ -286,16 +233,16 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
|
||||
const markedOosCount = oosResult.rowCount || 0;
|
||||
if (markedOosCount > 0) {
|
||||
console.log(`[ProductResync] Marked ${markedOosCount} products as OOS (3+ consecutive misses)`);
|
||||
console.log(`[ProductRefresh] Marked ${markedOosCount} products as OOS (3+ consecutive misses)`);
|
||||
}
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// ============================================================
|
||||
// STEP 7: Download images for new products
|
||||
// STEP 6: Download images for new products
|
||||
// ============================================================
|
||||
if (upsertResult.productsNeedingImages.length > 0) {
|
||||
console.log(`[ProductResync] Downloading images for ${upsertResult.productsNeedingImages.length} products...`);
|
||||
console.log(`[ProductRefresh] Downloading images for ${upsertResult.productsNeedingImages.length} products...`);
|
||||
|
||||
try {
|
||||
const dispensaryContext = {
|
||||
@@ -309,12 +256,12 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
);
|
||||
} catch (imgError: any) {
|
||||
// Image download errors shouldn't fail the whole task
|
||||
console.warn(`[ProductResync] Image download error (non-fatal): ${imgError.message}`);
|
||||
console.warn(`[ProductRefresh] Image download error (non-fatal): ${imgError.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
// STEP 8: Update dispensary last_crawl_at
|
||||
// STEP 7: Update dispensary last_crawl_at
|
||||
// ============================================================
|
||||
await pool.query(`
|
||||
UPDATE dispensaries
|
||||
@@ -322,10 +269,20 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
WHERE id = $1
|
||||
`, [dispensaryId]);
|
||||
|
||||
console.log(`[ProductResync] Completed ${dispensary.name}`);
|
||||
// ============================================================
|
||||
// STEP 8: Mark payload as processed
|
||||
// ============================================================
|
||||
await pool.query(`
|
||||
UPDATE raw_crawl_payloads
|
||||
SET processed_at = NOW()
|
||||
WHERE id = $1
|
||||
`, [payloadId]);
|
||||
|
||||
console.log(`[ProductRefresh] Completed ${dispensary.name}`);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
payloadId,
|
||||
productsProcessed: normalizationResult.products.length,
|
||||
snapshotsCreated: snapshotsResult.created,
|
||||
newProducts: upsertResult.new,
|
||||
@@ -335,7 +292,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
console.error(`[ProductResync] Error for dispensary ${dispensaryId}:`, errorMessage);
|
||||
console.error(`[ProductRefresh] Error for dispensary ${dispensaryId}:`, errorMessage);
|
||||
return {
|
||||
success: false,
|
||||
error: errorMessage,
|
||||
|
||||
Reference in New Issue
Block a user