Merge pull request 'feat: Worker improvements and Run Now duplicate prevention' (#64) from feat/minio-payload-storage into master

This commit is contained in:
kelly
2025-12-13 03:35:48 +00:00
15 changed files with 949 additions and 245 deletions

View File

@@ -526,14 +526,17 @@ router.delete('/schedules/:id', async (req: Request, res: Response) => {
/**
* POST /api/tasks/schedules/:id/run-now
* Manually trigger a scheduled task to run immediately
*
* For product_discovery schedules with state_code, this creates individual
* tasks for each store in that state (fans out properly).
*/
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// Get the schedule
// Get the full schedule
const scheduleResult = await pool.query(`
SELECT id, name, role, state_code, platform, priority
SELECT id, name, role, state_code, platform, priority, interval_hours, method
FROM task_schedules WHERE id = $1
`, [scheduleId]);
@@ -542,27 +545,80 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
}
const schedule = scheduleResult.rows[0];
let tasksCreated = 0;
// Create a task based on the schedule
const task = await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10, // Boost priority for manual runs
});
// For product_discovery with state_code, fan out to individual stores
if (schedule.role === 'product_discovery' && schedule.state_code) {
// Find stores in this state needing refresh
const storeResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND s.code = $1
-- No pending/running product_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code]);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length > 0) {
// Create staggered tasks for all stores
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
15, // 15 seconds stagger
schedule.platform || 'dutchie',
schedule.method || 'http'
);
tasksCreated = result.created;
} else {
// No stores need refresh - return early with message
return res.json({
success: true,
message: `No ${schedule.state_code} stores need refresh at this time`,
tasksCreated: 0,
stateCode: schedule.state_code,
});
}
} else if (schedule.role !== 'product_discovery') {
// For other schedules (store_discovery, analytics_refresh), create a single task
await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10,
method: schedule.method,
});
tasksCreated = 1;
} else {
// product_discovery without state_code - shouldn't happen, reject
return res.status(400).json({
error: 'product_discovery schedules require a state_code',
});
}
// Update last_run_at on the schedule
await pool.query(`
UPDATE task_schedules
SET last_run_at = NOW(),
next_run_at = NOW() + (interval_hours || ' hours')::interval,
last_task_count = $2,
updated_at = NOW()
WHERE id = $1
`, [scheduleId]);
`, [scheduleId, tasksCreated]);
res.json({
success: true,
message: `Schedule "${schedule.name}" triggered`,
task,
tasksCreated,
stateCode: schedule.state_code,
});
} catch (error: unknown) {
console.error('Error running schedule:', error);
@@ -1187,6 +1243,225 @@ router.post('/batch/az-stores', async (req: Request, res: Response) => {
}
});
/**
* POST /api/tasks/batch/entry-point-discovery
* Create entry_point_discovery tasks for stores missing platform_dispensary_id
*
* This is idempotent - stores that already have platform_dispensary_id are skipped.
* Only creates tasks for stores with menu_url set and crawl_enabled = true.
*
* Body (optional):
* - state_code: string (optional) - Filter by state code
* - stagger_seconds: number (default: 5) - Seconds between tasks
* - force: boolean (default: false) - Re-run even for previously failed stores
*/
router.post('/batch/entry-point-discovery', async (req: Request, res: Response) => {
try {
const {
state_code,
stagger_seconds = 5,
force = false,
} = req.body;
// Find stores that need entry point discovery
const storeResult = await pool.query(`
SELECT d.id, d.name, d.menu_url
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.menu_url IS NOT NULL
AND d.platform_dispensary_id IS NULL
${state_code ? 'AND s.code = $1' : ''}
${!force ? "AND (d.id_resolution_status IS NULL OR d.id_resolution_status = 'pending')" : ''}
-- No pending/running entry_point_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'entry_point_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.id
`, state_code ? [state_code.toUpperCase()] : []);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length === 0) {
return res.json({
success: true,
message: state_code
? `No ${state_code.toUpperCase()} stores need entry point discovery`
: 'No stores need entry point discovery',
tasks_created: 0,
});
}
// Create staggered tasks
const taskIds: number[] = [];
for (let i = 0; i < dispensaryIds.length; i++) {
const scheduledFor = new Date(Date.now() + i * stagger_seconds * 1000);
const result = await pool.query(`
INSERT INTO worker_tasks (role, dispensary_id, priority, scheduled_for, method)
VALUES ('entry_point_discovery', $1, 10, $2, 'http')
RETURNING id
`, [dispensaryIds[i], scheduledFor]);
taskIds.push(result.rows[0].id);
}
const totalDuration = dispensaryIds.length * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.json({
success: true,
tasks_created: taskIds.length,
task_ids: taskIds,
stores: storeResult.rows.map((r: { id: number; name: string }) => ({ id: r.id, name: r.name })),
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${taskIds.length} entry_point_discovery tasks${state_code ? ` for ${state_code.toUpperCase()}` : ''}`,
});
} catch (error: unknown) {
console.error('Error creating entry point discovery tasks:', error);
res.status(500).json({ error: 'Failed to create entry point discovery tasks' });
}
});
// ============================================================
// STATE-BASED CRAWL ENDPOINTS
// ============================================================
/**
* POST /api/tasks/crawl-state/:stateCode
* Create product_discovery tasks for all stores in a state
*
* This is the primary endpoint for triggering crawls by state.
* Creates staggered tasks for all crawl-enabled stores in the specified state.
*
* Params:
* - stateCode: State code (e.g., 'AZ', 'CA', 'CO')
*
* Body (optional):
* - stagger_seconds: number (default: 15) - Seconds between each task
* - priority: number (default: 10) - Task priority
* - method: 'curl' | 'http' | null (default: 'http')
*
* Returns:
* - tasks_created: Number of tasks created
* - stores_in_state: Total stores found for the state
* - skipped: Number skipped (already have active tasks)
*/
router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
try {
const stateCode = req.params.stateCode.toUpperCase();
const {
stagger_seconds = 15,
priority = 10,
method = 'http',
} = req.body;
// Verify state exists
const stateResult = await pool.query(`
SELECT id, code, name FROM states WHERE code = $1
`, [stateCode]);
if (stateResult.rows.length === 0) {
return res.status(404).json({
error: 'State not found',
state_code: stateCode,
});
}
const state = stateResult.rows[0];
// Get all crawl-enabled dispensaries in this state
const dispensariesResult = await pool.query(`
SELECT d.id, d.name
FROM dispensaries d
WHERE d.state_id = $1
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [state.id]);
if (dispensariesResult.rows.length === 0) {
return res.status(200).json({
success: true,
message: `No crawl-enabled stores found in ${state.name}`,
state_code: stateCode,
state_name: state.name,
tasks_created: 0,
stores_in_state: 0,
});
}
const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id);
// Create staggered tasks
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
stagger_seconds,
'dutchie',
method
);
const totalDuration = (result.created - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
state_code: stateCode,
state_name: state.name,
tasks_created: result.created,
stores_in_state: dispensariesResult.rows.length,
skipped: dispensariesResult.rows.length - result.created,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating state crawl tasks:', error);
res.status(500).json({ error: 'Failed to create state crawl tasks' });
}
});
/**
* GET /api/tasks/states
* List all states with their store counts and crawl status
*/
router.get('/states', async (_req: Request, res: Response) => {
try {
const result = await pool.query(`
SELECT
s.code,
s.name,
COUNT(d.id)::int as total_stores,
COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NOT NULL)::int as crawl_enabled_stores,
COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NULL)::int as missing_platform_id,
MAX(d.last_fetch_at) as last_crawl_at,
(SELECT COUNT(*) FROM worker_tasks t
JOIN dispensaries d2 ON t.dispensary_id = d2.id
WHERE d2.state_id = s.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running'))::int as active_tasks
FROM states s
LEFT JOIN dispensaries d ON d.state_id = s.id
GROUP BY s.id, s.code, s.name
HAVING COUNT(d.id) > 0
ORDER BY COUNT(d.id) DESC
`);
res.json({
states: result.rows,
total_states: result.rows.length,
});
} catch (error: unknown) {
console.error('Error listing states:', error);
res.status(500).json({ error: 'Failed to list states' });
}
});
// ============================================================
// TASK POOL MANAGEMENT
// ============================================================

View File

@@ -155,7 +155,12 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
active_task_count,
max_concurrent_tasks,
status = 'active',
resources
resources,
// Step tracking fields
current_step,
current_step_detail,
current_step_started_at,
task_steps,
} = req.body;
if (!worker_id) {
@@ -168,6 +173,11 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
if (current_task_ids) metadata.current_task_ids = current_task_ids;
if (active_task_count !== undefined) metadata.active_task_count = active_task_count;
if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks;
// Step tracking
if (current_step) metadata.current_step = current_step;
if (current_step_detail) metadata.current_step_detail = current_step_detail;
if (current_step_started_at) metadata.current_step_started_at = current_step_started_at;
if (task_steps) metadata.task_steps = task_steps;
// Store resources in metadata jsonb column
const { rows } = await pool.query(`

View File

@@ -41,9 +41,16 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
const dispensary = dispResult.rows[0];
// If already has platform_dispensary_id, we're done
// If already has platform_dispensary_id, we're done (idempotent)
if (dispensary.platform_dispensary_id) {
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID: ${dispensary.platform_dispensary_id}`);
// Update last_id_resolution_at to show we checked it
await pool.query(`
UPDATE dispensaries
SET last_id_resolution_at = NOW(),
id_resolution_status = 'resolved'
WHERE id = $1
`, [dispensaryId]);
return {
success: true,
alreadyResolved: true,
@@ -51,6 +58,15 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
};
}
// Increment attempt counter
await pool.query(`
UPDATE dispensaries
SET id_resolution_attempts = COALESCE(id_resolution_attempts, 0) + 1,
last_id_resolution_at = NOW(),
id_resolution_status = 'pending'
WHERE id = $1
`, [dispensaryId]);
const menuUrl = dispensary.menu_url;
if (!menuUrl) {
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
@@ -114,7 +130,7 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
console.log(`[EntryPointDiscovery] Failed to resolve ${slug}: ${reason}`);
// Mark as failed resolution but keep menu_type as dutchie
// Mark as failed resolution
await pool.query(`
UPDATE dispensaries
SET
@@ -123,9 +139,11 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
WHEN $2 = 403 THEN 'blocked'
ELSE 'dutchie'
END,
id_resolution_status = 'failed',
id_resolution_error = $3,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, result.httpStatus || 0]);
`, [dispensaryId, result.httpStatus || 0, reason]);
return {
success: false,
@@ -149,6 +167,8 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
platform_dispensary_id = $2,
menu_type = 'dutchie',
crawl_enabled = true,
id_resolution_status = 'resolved',
id_resolution_error = NULL,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, platformId]);

View File

@@ -28,7 +28,7 @@ import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -39,6 +39,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -67,6 +68,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 2: Start stealth session
// ============================================================
updateStep('preflight', 'Starting stealth session');
const session = startSession();
console.log(`[PayloadFetch] Session started: ${session.sessionId}`);
@@ -75,6 +77,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 3: Fetch products via GraphQL (Status: 'All')
// ============================================================
updateStep('fetching', 'Executing GraphQL query');
const allProducts: any[] = [];
let page = 0;
let totalCount = 0;
@@ -162,6 +165,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// STEP 4: Save raw payload to filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Metadata/Payload separation
// ============================================================
updateStep('saving', `Saving ${allProducts.length} products`);
const rawPayload = {
dispensaryId,
platformId,

View File

@@ -27,7 +27,7 @@ import { taskService } from '../task-service';
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const { pool, task, crawlRotator, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -40,6 +40,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -70,6 +71,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 2: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', `Launching browser for ${dispensary.name}`);
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
@@ -114,6 +116,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 3: Establish session by visiting embedded menu
// ============================================================
updateStep('navigating', `Loading menu page`);
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);
@@ -178,6 +181,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 4: Fetch ALL products via GraphQL from browser context
// ============================================================
updateStep('fetching', `Executing GraphQL query`);
const result = await page.evaluate(async (platformId: string, graphqlHash: string) => {
const allProducts: any[] = [];
const logs: string[] = [];
@@ -301,6 +305,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 5: Save raw payload to filesystem
// ============================================================
updateStep('saving', `Saving ${result.products.length} products`);
const rawPayload = {
dispensaryId,
platformId,

View File

@@ -32,7 +32,7 @@ import { taskService } from '../task-service';
const normalizer = new DutchieNormalizer();
export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -43,6 +43,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -68,6 +69,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// STEP 2: Load payload from filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Read local payload, not API
// ============================================================
updateStep('loading', 'Loading payload from storage');
let payloadData: any;
let payloadId: number;
@@ -142,6 +144,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 3: Normalize data
// ============================================================
updateStep('normalizing', `Normalizing ${allProducts.length} products`);
console.log(`[ProductRefresh] Normalizing ${allProducts.length} products...`);
// Build RawPayload for the normalizer
@@ -185,6 +188,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 4: Upsert to canonical tables
// ============================================================
updateStep('upserting', `Saving ${normalizationResult.products.length} products to DB`);
console.log(`[ProductRefresh] Upserting to store_products...`);
const upsertResult = await upsertStoreProducts(

View File

@@ -71,17 +71,19 @@ interface DiscoveredLocation {
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
const { pool, task, crawlRotator } = ctx;
const { pool, task, crawlRotator, updateStep } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
updateStep('starting', 'Initializing store discovery');
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', 'Launching browser');
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
@@ -126,6 +128,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
updateStep('navigating', 'Loading session page');
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
@@ -174,6 +177,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
updateStep('fetching', `Fetching cities for ${stateCodesToDiscover.length} states`);
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {

View File

@@ -271,52 +271,69 @@ class TaskService {
}
/**
* Mark a task as failed, with auto-retry if under max_retries
* Returns true if task was re-queued for retry, false if permanently failed
* Determine if an error is a "soft failure" (transient) that should be requeued
* Soft failures: timeouts, connection issues, browser launch issues
* Hard failures: business logic errors like "No products returned"
*/
private isSoftFailure(errorMessage: string): boolean {
const softFailurePatterns = [
/timeout/i,
/timed out/i,
/connection.*terminated/i,
/connection.*refused/i,
/ECONNRESET/i,
/ECONNREFUSED/i,
/ETIMEDOUT/i,
/socket hang up/i,
/WS endpoint/i,
/browser process/i,
/Failed to launch/i,
/Navigation.*exceeded/i,
/net::ERR_/i,
/ENOENT.*storage/i, // Storage path issues (transient)
/ENOENT.*payload/i, // Payload path issues (transient)
];
return softFailurePatterns.some(pattern => pattern.test(errorMessage));
}
/**
* Mark a task as failed
*
* Soft failures (timeouts, connection issues): Requeue back to pending for later pickup
* Hard failures (business logic errors): Mark as failed permanently
*/
async failTask(taskId: number, errorMessage: string): Promise<boolean> {
// Get current retry state
const result = await pool.query(
`SELECT retry_count, max_retries FROM worker_tasks WHERE id = $1`,
[taskId]
);
const isSoft = this.isSoftFailure(errorMessage);
if (result.rows.length === 0) {
return false;
}
const { retry_count, max_retries } = result.rows[0];
const newRetryCount = (retry_count || 0) + 1;
if (newRetryCount < (max_retries || 3)) {
// Re-queue for retry - reset to pending with incremented retry_count
if (isSoft) {
// Soft failure: put back in queue immediately for another worker
await pool.query(
`UPDATE worker_tasks
SET status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
retry_count = $2,
error_message = $3,
error_message = $2,
scheduled_for = NULL,
updated_at = NOW()
WHERE id = $1`,
[taskId, newRetryCount, `Retry ${newRetryCount}: ${errorMessage}`]
[taskId, `Requeued: ${errorMessage}`]
);
console.log(`[TaskService] Task ${taskId} queued for retry ${newRetryCount}/${max_retries || 3}`);
console.log(`[TaskService] Task ${taskId} requeued for another worker`);
return true;
}
// Max retries exceeded - mark as permanently failed
// Hard failure: mark as permanently failed
await pool.query(
`UPDATE worker_tasks
SET status = 'failed',
completed_at = NOW(),
retry_count = $2,
error_message = $3
error_message = $2
WHERE id = $1`,
[taskId, newRetryCount, `Failed after ${newRetryCount} attempts: ${errorMessage}`]
[taskId, `Hard failure: ${errorMessage}`]
);
console.log(`[TaskService] Task ${taskId} permanently failed after ${newRetryCount} attempts`);
console.log(`[TaskService] Task ${taskId} hard failed: ${errorMessage}`);
return false;
}

View File

@@ -97,8 +97,12 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// =============================================================================
// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '15');
// Browser tasks (Puppeteer) use ~400MB RAM each. With 2GB pod limit:
// - 3 browsers = ~1.3GB = SAFE
// - 4 browsers = ~1.7GB = RISKY
// - 5+ browsers = OOM CRASH
// See: docs/WORKER_TASK_ARCHITECTURE.md#browser-task-memory-limits
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
@@ -131,6 +135,8 @@ export interface TaskContext {
task: WorkerTask;
heartbeat: () => Promise<void>;
crawlRotator?: CrawlRotator;
/** Update the current step being executed (shown in dashboard) */
updateStep: (step: string, detail?: string) => void;
}
export interface TaskResult {
@@ -264,6 +270,18 @@ export class TaskWorker {
private preflightsCompleted: boolean = false;
private initializingPromise: Promise<void> | null = null;
// ==========================================================================
// STEP TRACKING FOR DASHBOARD VISIBILITY
// ==========================================================================
// Workers report their current step in heartbeats so the dashboard can show
// real-time progress like "preflight", "loading page", "processing products"
// ==========================================================================
private currentStep: string = 'idle';
private currentStepDetail: string | null = null;
private currentStepStartedAt: Date | null = null;
/** Map of task ID -> step info for concurrent tasks */
private taskSteps: Map<number, { step: string; detail: string | null; startedAt: Date }> = new Map();
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -346,6 +364,65 @@ export class TaskWorker {
return this.activeTasks.size < this.maxConcurrentTasks;
}
// ==========================================================================
// STEP TRACKING METHODS
// ==========================================================================
/**
* Update the current step for a task (for dashboard visibility)
* @param taskId - The task ID to update
* @param step - Short step name (e.g., "preflight", "loading", "processing")
* @param detail - Optional detail (e.g., "Verifying IP 1.2.3.4")
*/
public updateTaskStep(taskId: number, step: string, detail?: string): void {
this.taskSteps.set(taskId, {
step,
detail: detail || null,
startedAt: new Date(),
});
// Also update the "primary" step for single-task backwards compat
if (this.activeTasks.size === 1 || taskId === Array.from(this.activeTasks.keys())[0]) {
this.currentStep = step;
this.currentStepDetail = detail || null;
this.currentStepStartedAt = new Date();
}
console.log(`[TaskWorker] Step: ${step}${detail ? ` - ${detail}` : ''} (task #${taskId})`);
}
/**
* Clear step tracking for a task (when task completes)
*/
private clearTaskStep(taskId: number): void {
this.taskSteps.delete(taskId);
// Reset primary step if no more active tasks
if (this.activeTasks.size === 0) {
this.currentStep = 'idle';
this.currentStepDetail = null;
this.currentStepStartedAt = null;
}
}
/**
* Get current step info for all active tasks (for heartbeat)
*/
private getTaskStepsInfo(): Array<{
task_id: number;
step: string;
detail: string | null;
elapsed_ms: number;
}> {
const now = Date.now();
return Array.from(this.taskSteps.entries()).map(([taskId, info]) => ({
task_id: taskId,
step: info.step,
detail: info.detail,
elapsed_ms: now - info.startedAt.getTime(),
}));
}
/**
* Initialize stealth systems (proxy rotation, fingerprints)
* Called LAZILY on first task claim attempt (NOT at worker startup).
@@ -635,7 +712,7 @@ export class TaskWorker {
}
/**
* Send heartbeat to registry with resource usage and proxy location
* Send heartbeat to registry with resource usage, proxy location, and step info
*/
private async sendRegistryHeartbeat(): Promise<void> {
try {
@@ -647,6 +724,9 @@ export class TaskWorker {
// Get array of active task IDs
const activeTaskIds = Array.from(this.activeTasks.keys());
// Get step info for all active tasks
const taskSteps = this.getTaskStepsInfo();
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -657,6 +737,11 @@ export class TaskWorker {
active_task_count: this.activeTasks.size,
max_concurrent_tasks: this.maxConcurrentTasks,
status: this.activeTasks.size > 0 ? 'active' : 'idle',
// Step tracking for dashboard visibility
current_step: this.currentStep,
current_step_detail: this.currentStepDetail,
current_step_started_at: this.currentStepStartedAt?.toISOString() || null,
task_steps: taskSteps, // Per-task step info for concurrent workers
resources: {
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
@@ -915,7 +1000,7 @@ export class TaskWorker {
throw new Error(`No handler registered for role: ${task.role}`);
}
// Create context
// Create context with step tracking
const ctx: TaskContext = {
pool: this.pool,
workerId: this.workerId,
@@ -924,12 +1009,21 @@ export class TaskWorker {
await taskService.heartbeat(task.id);
},
crawlRotator: this.crawlRotator,
updateStep: (step: string, detail?: string) => {
this.updateTaskStep(task.id, step, detail);
},
};
// Initialize step tracking for this task
this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);
// Execute the task
const result = await handler(ctx);
if (result.success) {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as completed
await taskService.completeTask(task.id, result);
await this.reportTaskCompletion(true);
@@ -945,12 +1039,18 @@ export class TaskWorker {
console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`);
}
} else {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as failed
await taskService.failTask(task.id, result.error || 'Unknown error');
await this.reportTaskCompletion(false);
console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`);
}
} catch (error: any) {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as failed
await taskService.failTask(task.id, error.message);
await this.reportTaskCompletion(false);