feat(tasks): Dual transport handlers and self-healing product_refresh
- Rename product-discovery.ts to product-discovery-curl.ts (axios-based)
- Rename payload-fetch.ts to payload-fetch-curl.ts
- Add product-discovery-http.ts (Puppeteer browser-based handler)
- Add method field to CreateTaskParams for transport selection
- Update task-service to insert method column on task creation
- Update task-worker with getHandlerForTask() for dual transport routing
- product_refresh now queues upstream tasks when no payload exists:
  - Has platform_dispensary_id → queues product_discovery (http)
  - No platform_dispensary_id → queues entry_point_discovery

This enables HTTP workers to pick up browser-based tasks while curl workers
handle axios-based tasks, and prevents product_refresh from failing repeatedly
when no crawl has been performed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -69,9 +69,11 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser
|
||||
|
||||
// Task handlers by role
|
||||
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
|
||||
import { handlePayloadFetch } from './handlers/payload-fetch';
|
||||
// Dual-transport: curl vs http (browser-based) handlers
|
||||
import { handlePayloadFetch } from './handlers/payload-fetch-curl';
|
||||
import { handleProductRefresh } from './handlers/product-refresh';
|
||||
import { handleProductDiscovery } from './handlers/product-discovery';
|
||||
import { handleProductDiscovery } from './handlers/product-discovery-curl';
|
||||
import { handleProductDiscoveryHttp } from './handlers/product-discovery-http';
|
||||
import { handleStoreDiscovery } from './handlers/store-discovery';
|
||||
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
|
||||
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
|
||||
@@ -144,17 +146,38 @@ type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
|
||||
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
|
||||
// payload_fetch: Fetches from Dutchie API, saves to disk
|
||||
// product_refresh: Reads local payload, normalizes, upserts to DB
|
||||
// product_discovery: Main handler for product crawling
|
||||
// product_discovery: Main handler for product crawling (has curl and http variants)
|
||||
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
|
||||
payload_fetch: handlePayloadFetch, // API fetch -> disk
|
||||
payload_fetch: handlePayloadFetch, // API fetch -> disk (curl)
|
||||
product_refresh: handleProductRefresh, // disk -> DB
|
||||
product_discovery: handleProductDiscovery,
|
||||
product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
|
||||
store_discovery: handleStoreDiscovery,
|
||||
entry_point_discovery: handleEntryPointDiscovery,
|
||||
analytics_refresh: handleAnalyticsRefresh,
|
||||
whoami: handleWhoami, // Tests proxy + anti-detect
|
||||
};
|
||||
|
||||
/**
 * Resolve the handler that should execute a task, taking the task's
 * transport method into account in addition to its role.
 *
 * Only product_discovery has a transport-specific override:
 *   - method='http'            -> handleProductDiscoveryHttp (browser-based, for Evomi proxies)
 *   - method='curl' or not set -> the registry entry (curl-based handleProductDiscovery)
 *
 * Every other role ignores the method and is looked up in TASK_HANDLERS.
 *
 * @param task - The task being dispatched; only `role` and `method` are read.
 * @returns The handler to run, or `undefined` if the registry lookup yields nothing.
 */
function getHandlerForTask(task: WorkerTask): TaskHandler | undefined {
  const taskRole = task.role as TaskRole;
  // A missing (or otherwise falsy) method falls back to the curl transport.
  const transport = task.method ? task.method : 'curl';

  const wantsBrowserDiscovery = taskRole === 'product_discovery' && transport === 'http';
  if (!wantsBrowserDiscovery) {
    // No override applies: defer to the static role -> handler registry.
    return TASK_HANDLERS[taskRole];
  }

  console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${transport})`);
  return handleProductDiscoveryHttp;
}
|
||||
|
||||
/**
|
||||
* Resource usage stats reported to the registry and used for backoff decisions.
|
||||
* These values are included in worker heartbeats and displayed in the UI.
|
||||
@@ -783,13 +806,32 @@ export class TaskWorker {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
|
||||
|
||||
// =================================================================
|
||||
// PREFLIGHT CHECK - CRITICAL: Worker MUST pass before task execution
|
||||
// Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready
|
||||
// PREFLIGHT CHECK - Use stored preflight results based on task method
|
||||
// We already ran dual-transport preflights at startup, so just verify
|
||||
// the correct preflight passed for this task's required method.
|
||||
// =================================================================
|
||||
const preflight = await this.crawlRotator.preflight();
|
||||
if (!preflight.passed) {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${preflight.error}`);
|
||||
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without proxy/anti-detect`);
|
||||
const taskMethod = task.method || 'http'; // Default to http if not specified
|
||||
let preflightPassed = false;
|
||||
let preflightMsg = '';
|
||||
|
||||
if (taskMethod === 'http' && this.preflightHttpPassed) {
|
||||
preflightPassed = true;
|
||||
preflightMsg = `HTTP preflight passed (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`;
|
||||
} else if (taskMethod === 'curl' && this.preflightCurlPassed) {
|
||||
preflightPassed = true;
|
||||
preflightMsg = `CURL preflight passed (IP: ${this.preflightCurlResult?.proxyIp || 'unknown'})`;
|
||||
} else if (!task.method && (this.preflightHttpPassed || this.preflightCurlPassed)) {
|
||||
// No method preference - either transport works
|
||||
preflightPassed = true;
|
||||
preflightMsg = this.preflightHttpPassed ? 'HTTP preflight passed' : 'CURL preflight passed';
|
||||
}
|
||||
|
||||
if (!preflightPassed) {
|
||||
const errorMsg = taskMethod === 'http'
|
||||
? 'HTTP preflight not passed - cannot execute http tasks'
|
||||
: 'CURL preflight not passed - cannot execute curl tasks';
|
||||
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${errorMsg}`);
|
||||
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without preflight`);
|
||||
|
||||
// Release task back to pending so another worker can pick it up
|
||||
await taskService.releaseTask(task.id);
|
||||
@@ -799,7 +841,7 @@ export class TaskWorker {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${task.id} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`);
|
||||
console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);
|
||||
|
||||
this.activeTasks.set(task.id, task);
|
||||
|
||||
@@ -843,8 +885,8 @@ export class TaskWorker {
|
||||
// Mark as running
|
||||
await taskService.startTask(task.id);
|
||||
|
||||
// Get handler for this role
|
||||
const handler = TASK_HANDLERS[task.role];
|
||||
// Get handler for this role (considers method for dual-transport)
|
||||
const handler = getHandlerForTask(task);
|
||||
if (!handler) {
|
||||
throw new Error(`No handler registered for role: ${task.role}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user