feat: Add step reporting to all task handlers

Added updateStep() calls to:
- payload-fetch-curl: loading → preflight → fetching → saving
- product-refresh: loading → normalizing → upserting
- store-discovery-http: starting → preflight → navigating → fetching

This enables real-time visibility of worker progress in the dashboard.
This commit is contained in:
Kelly
2025-12-12 20:14:00 -07:00
parent 63023a4061
commit 4ea7139ed5
3 changed files with 11 additions and 2 deletions

View File

@@ -28,7 +28,7 @@ import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -39,6 +39,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -67,6 +68,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 2: Start stealth session
// ============================================================
updateStep('preflight', 'Starting stealth session');
const session = startSession();
console.log(`[PayloadFetch] Session started: ${session.sessionId}`);
@@ -75,6 +77,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 3: Fetch products via GraphQL (Status: 'All')
// ============================================================
updateStep('fetching', 'Executing GraphQL query');
const allProducts: any[] = [];
let page = 0;
let totalCount = 0;
@@ -162,6 +165,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// STEP 4: Save raw payload to filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Metadata/Payload separation
// ============================================================
updateStep('saving', `Saving ${allProducts.length} products`);
const rawPayload = {
dispensaryId,
platformId,

View File

@@ -188,6 +188,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 4: Upsert to canonical tables
// ============================================================
updateStep('upserting', `Saving ${normalizationResult.products.length} products to DB`);
console.log(`[ProductRefresh] Upserting to store_products...`);
const upsertResult = await upsertStoreProducts(

View File

@@ -71,17 +71,19 @@ interface DiscoveredLocation {
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const { pool, task, crawlRotator, updateStep } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
updateStep('starting', 'Initializing store discovery');
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', 'Launching browser');
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
@@ -126,6 +128,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
updateStep('navigating', 'Loading session page');
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
@@ -174,6 +177,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
updateStep('fetching', `Fetching cities for ${stateCodesToDiscover.length} states`);
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {