From 63023a406185007dca873fbfdc5c9b59320c60ba Mon Sep 17 00:00:00 2001 From: Kelly Date: Fri, 12 Dec 2025 20:11:31 -0700 Subject: [PATCH 1/4] feat: Worker improvements and Run Now duplicate prevention - Fix Run Now to prevent duplicate task creation - Add loading state to Run Now button in UI - Return early when no stores need refresh - Worker dashboard improvements - Browser pooling architecture updates - K8s worker config updates (8 replicas, 3 concurrent tasks) --- CLAUDE.md | 25 +- .../docs/_archive/WORKER_TASK_ARCHITECTURE.md | 97 ++++++ backend/src/routes/tasks.ts | 212 +++++++++++- backend/src/routes/worker-registry.ts | 12 +- .../tasks/handlers/product-discovery-http.ts | 7 +- backend/src/tasks/handlers/product-refresh.ts | 5 +- backend/src/tasks/task-service.ts | 67 ++-- backend/src/tasks/task-worker.ts | 108 +++++- cannaiq/src/lib/api.ts | 2 +- cannaiq/src/pages/TasksDashboard.tsx | 28 +- cannaiq/src/pages/WorkersDashboard.tsx | 310 +++++++++++++++++- k8s/scraper-worker.yaml | 175 +--------- 12 files changed, 809 insertions(+), 239 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 57a290ea..ebff9867 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -25,13 +25,26 @@ Never import `src/db/migrate.ts` at runtime. Use `src/db/pool.ts` for DB access. - **Worker** = Concurrent task runner INSIDE a pod (controlled by `MAX_CONCURRENT_TASKS` env var) - Formula: `8 pods × MAX_CONCURRENT_TASKS = total concurrent workers` -**To increase workers:** Change `MAX_CONCURRENT_TASKS` env var, NOT replicas. -```bash -# CORRECT - increase workers per pod -kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=5 +**Browser Task Memory Limits:** +- Each Puppeteer/Chrome browser uses ~400 MB RAM +- Pod memory limit is 2 GB +- **MAX_CONCURRENT_TASKS=3** is the safe maximum for browser tasks +- More than 3 concurrent browsers per pod = OOM crash -# WRONG - never scale above 8 replicas -kubectl scale deployment/scraper-worker --replicas=20 # NEVER DO THIS +| Browsers | RAM Used | Status | +|----------|----------|--------| +| 3 | ~1.3 GB | Safe (recommended) | +| 4 | ~1.7 GB | Risky | +| 5+ | >2 GB | OOM crash | + +**To increase throughput:** Add more pods (up to 8), NOT more concurrent tasks per pod. + +```bash +# CORRECT - scale pods (up to 8) +kubectl scale deployment/scraper-worker -n dispensary-scraper --replicas=8 + +# WRONG - will cause OOM crashes +kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=10 ``` **If K8s API returns ServiceUnavailable:** STOP IMMEDIATELY. Do not retry. The cluster is overloaded. diff --git a/backend/docs/_archive/WORKER_TASK_ARCHITECTURE.md b/backend/docs/_archive/WORKER_TASK_ARCHITECTURE.md index 2752c65b..d539abf7 100644 --- a/backend/docs/_archive/WORKER_TASK_ARCHITECTURE.md +++ b/backend/docs/_archive/WORKER_TASK_ARCHITECTURE.md @@ -504,6 +504,103 @@ The Workers Dashboard shows: | `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling | | `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources | +## Browser Task Memory Limits (Updated 2025-12) + +Browser-based tasks (Puppeteer/Chrome) have strict memory constraints that limit concurrency. + +### Why Browser Tasks Are Different + +Each browser task launches a Chrome process. Unlike I/O-bound API calls, browsers consume significant RAM: + +| Component | RAM Usage | +|-----------|-----------| +| Node.js runtime | ~150 MB | +| Chrome browser (base) | ~200-250 MB | +| Dutchie menu page (loaded) | ~100-150 MB | +| **Per browser total** | **~350-450 MB** | + +### Memory Math for Pod Limits + +``` +Pod memory limit: 2 GB (2000 MB) +Node.js runtime: -150 MB +Safety buffer: -100 MB +──────────────────────────────── +Available for browsers: 1750 MB + +Per browser + page: ~400 MB + +Max browsers: 1750 ÷ 400 = ~4 browsers + +Recommended: 3 browsers (leaves headroom for spikes) +``` + +### MAX_CONCURRENT_TASKS for Browser Tasks + +| Browsers per Pod | RAM Used | Risk Level | +|------------------|----------|------------| +| 1 | ~500 MB | Very safe | +| 2 | ~900 MB | Safe | +| **3** | **~1.3 GB** | **Recommended** | +| 4 | ~1.7 GB | Tight (may OOM) | +| 5+ | >2 GB | Will OOM crash | + +**CRITICAL**: `MAX_CONCURRENT_TASKS=3` is the maximum safe value for browser tasks with current pod limits. + +### Scaling Strategy + +Scale **horizontally** (more pods) rather than vertically (more concurrency per pod): + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ Cluster: 8 pods × 3 browsers = 24 concurrent tasks │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Pod 0 │ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ │ +│ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Pod 4 │ │ Pod 5 │ │ Pod 6 │ │ Pod 7 │ │ +│ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +### Browser Lifecycle Per Task + +Each task gets a fresh browser with fresh IP/identity: + +``` +1. Claim task from queue +2. Get fresh proxy from pool +3. Launch browser with proxy +4. Run preflight (verify IP) +5. Execute scrape +6. Close browser +7. Repeat +``` + +This ensures: +- Fresh IP per task (proxy rotation) +- Fresh fingerprint per task (UA rotation) +- No cookie/session bleed between tasks +- Predictable memory usage + +### Increasing Capacity + +To handle more concurrent tasks: + +1. **Add more pods** (up to 8 per CLAUDE.md limit) +2. **Increase pod memory** (allows 4 browsers per pod): + ```yaml + resources: + limits: + memory: "2.5Gi" # from 2Gi + ``` + +**DO NOT** simply increase `MAX_CONCURRENT_TASKS` without also increasing pod memory limits. + ## Monitoring ### Logs diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index 6b8783d1..b2c5a812 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -526,14 +526,17 @@ router.delete('/schedules/:id', async (req: Request, res: Response) => { /** * POST /api/tasks/schedules/:id/run-now * Manually trigger a scheduled task to run immediately + * + * For product_discovery schedules with state_code, this creates individual + * tasks for each store in that state (fans out properly). */ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => { try { const scheduleId = parseInt(req.params.id, 10); - // Get the schedule + // Get the full schedule const scheduleResult = await pool.query(` - SELECT id, name, role, state_code, platform, priority + SELECT id, name, role, state_code, platform, priority, interval_hours, method FROM task_schedules WHERE id = $1 `, [scheduleId]); @@ -542,27 +545,80 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => { } const schedule = scheduleResult.rows[0]; + let tasksCreated = 0; - // Create a task based on the schedule - const task = await taskService.createTask({ - role: schedule.role, - platform: schedule.platform, - priority: schedule.priority + 10, // Boost priority for manual runs - }); + // For product_discovery with state_code, fan out to individual stores + if (schedule.role === 'product_discovery' && schedule.state_code) { + // Find stores in this state needing refresh + const storeResult = await pool.query(` + SELECT d.id + FROM dispensaries d + JOIN states s ON d.state_id = s.id + WHERE d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + AND s.code = $1 + -- No pending/running product_discovery task already + AND NOT EXISTS ( + SELECT 1 FROM worker_tasks t + WHERE t.dispensary_id = d.id + AND t.role = 'product_discovery' + AND t.status IN ('pending', 'claimed', 'running') + ) + ORDER BY d.last_fetch_at NULLS FIRST, d.id + `, [schedule.state_code]); + + const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id); + + if (dispensaryIds.length > 0) { + // Create staggered tasks for all stores + const result = await taskService.createStaggeredTasks( + dispensaryIds, + 'product_discovery', + 15, // 15 seconds stagger + schedule.platform || 'dutchie', + schedule.method || 'http' + ); + tasksCreated = result.created; + } else { + // No stores need refresh - return early with message + return res.json({ + success: true, + message: `No ${schedule.state_code} stores need refresh at this time`, + tasksCreated: 0, + stateCode: schedule.state_code, + }); + } + } else if (schedule.role !== 'product_discovery') { + // For other schedules (store_discovery, analytics_refresh), create a single task + await taskService.createTask({ + role: schedule.role, + platform: schedule.platform, + priority: schedule.priority + 10, + method: schedule.method, + }); + tasksCreated = 1; + } else { + // product_discovery without state_code - shouldn't happen, reject + return res.status(400).json({ + error: 'product_discovery schedules require a state_code', + }); + } // Update last_run_at on the schedule await pool.query(` UPDATE task_schedules SET last_run_at = NOW(), next_run_at = NOW() + (interval_hours || ' hours')::interval, + last_task_count = $2, updated_at = NOW() WHERE id = $1 - `, [scheduleId]); + `, [scheduleId, tasksCreated]); res.json({ success: true, message: `Schedule "${schedule.name}" triggered`, - task, + tasksCreated, + stateCode: schedule.state_code, }); } catch (error: unknown) { console.error('Error running schedule:', error); @@ -1187,6 +1243,142 @@ router.post('/batch/az-stores', async (req: Request, res: Response) => { } }); +// ============================================================ +// STATE-BASED CRAWL ENDPOINTS +// ============================================================ + +/** + * POST /api/tasks/crawl-state/:stateCode + * Create product_discovery tasks for all stores in a state + * + * This is the primary endpoint for triggering crawls by state. + * Creates staggered tasks for all crawl-enabled stores in the specified state. + * + * Params: + * - stateCode: State code (e.g., 'AZ', 'CA', 'CO') + * + * Body (optional): + * - stagger_seconds: number (default: 15) - Seconds between each task + * - priority: number (default: 10) - Task priority + * - method: 'curl' | 'http' | null (default: 'http') + * + * Returns: + * - tasks_created: Number of tasks created + * - stores_in_state: Total stores found for the state + * - skipped: Number skipped (already have active tasks) + */ +router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => { + try { + const stateCode = req.params.stateCode.toUpperCase(); + const { + stagger_seconds = 15, + priority = 10, + method = 'http', + } = req.body; + + // Verify state exists + const stateResult = await pool.query(` + SELECT id, code, name FROM states WHERE code = $1 + `, [stateCode]); + + if (stateResult.rows.length === 0) { + return res.status(404).json({ + error: 'State not found', + state_code: stateCode, + }); + } + + const state = stateResult.rows[0]; + + // Get all crawl-enabled dispensaries in this state + const dispensariesResult = await pool.query(` + SELECT d.id, d.name + FROM dispensaries d + WHERE d.state_id = $1 + AND d.crawl_enabled = true + AND d.platform_dispensary_id IS NOT NULL + ORDER BY d.last_fetch_at NULLS FIRST, d.id + `, [state.id]); + + if (dispensariesResult.rows.length === 0) { + return res.status(200).json({ + success: true, + message: `No crawl-enabled stores found in ${state.name}`, + state_code: stateCode, + state_name: state.name, + tasks_created: 0, + stores_in_state: 0, + }); + } + + const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id); + + // Create staggered tasks + const result = await taskService.createStaggeredTasks( + dispensaryIds, + 'product_discovery', + stagger_seconds, + 'dutchie', + method + ); + + const totalDuration = (result.created - 1) * stagger_seconds; + const estimatedEndTime = new Date(Date.now() + totalDuration * 1000); + + res.status(201).json({ + success: true, + state_code: stateCode, + state_name: state.name, + tasks_created: result.created, + stores_in_state: dispensariesResult.rows.length, + skipped: dispensariesResult.rows.length - result.created, + stagger_seconds, + total_duration_seconds: totalDuration, + estimated_completion: estimatedEndTime.toISOString(), + message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`, + }); + } catch (error: unknown) { + console.error('Error creating state crawl tasks:', error); + res.status(500).json({ error: 'Failed to create state crawl tasks' }); + } +}); + +/** + * GET /api/tasks/states + * List all states with their store counts and crawl status + */ +router.get('/states', async (_req: Request, res: Response) => { + try { + const result = await pool.query(` + SELECT + s.code, + s.name, + COUNT(d.id)::int as total_stores, + COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NOT NULL)::int as crawl_enabled_stores, + COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NULL)::int as missing_platform_id, + MAX(d.last_fetch_at) as last_crawl_at, + (SELECT COUNT(*) FROM worker_tasks t + JOIN dispensaries d2 ON t.dispensary_id = d2.id + WHERE d2.state_id = s.id + AND t.role = 'product_discovery' + AND t.status IN ('pending', 'claimed', 'running'))::int as active_tasks + FROM states s + LEFT JOIN dispensaries d ON d.state_id = s.id + GROUP BY s.id, s.code, s.name + HAVING COUNT(d.id) > 0 + ORDER BY COUNT(d.id) DESC + `); + + res.json({ + states: result.rows, + total_states: result.rows.length, + }); + } catch (error: unknown) { + console.error('Error listing states:', error); + res.status(500).json({ error: 'Failed to list states' }); + } +}); + // ============================================================ // TASK POOL MANAGEMENT // ============================================================ diff --git a/backend/src/routes/worker-registry.ts b/backend/src/routes/worker-registry.ts index 3cab0333..088f0b9f 100644 --- a/backend/src/routes/worker-registry.ts +++ b/backend/src/routes/worker-registry.ts @@ -155,7 +155,12 @@ router.post('/heartbeat', async (req: Request, res: Response) => { active_task_count, max_concurrent_tasks, status = 'active', - resources + resources, + // Step tracking fields + current_step, + current_step_detail, + current_step_started_at, + task_steps, } = req.body; if (!worker_id) { @@ -168,6 +173,11 @@ router.post('/heartbeat', async (req: Request, res: Response) => { if (current_task_ids) metadata.current_task_ids = current_task_ids; if (active_task_count !== undefined) metadata.active_task_count = active_task_count; if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks; + // Step tracking + if (current_step) metadata.current_step = current_step; + if (current_step_detail) metadata.current_step_detail = current_step_detail; + if (current_step_started_at) metadata.current_step_started_at = current_step_started_at; + if (task_steps) metadata.task_steps = task_steps; // Store resources in metadata jsonb column const { rows } = await pool.query(` diff --git a/backend/src/tasks/handlers/product-discovery-http.ts b/backend/src/tasks/handlers/product-discovery-http.ts index ac4aedea..ef14341c 100644 --- a/backend/src/tasks/handlers/product-discovery-http.ts +++ b/backend/src/tasks/handlers/product-discovery-http.ts @@ -27,7 +27,7 @@ import { taskService } from '../task-service'; const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0'; export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise { - const { pool, task, crawlRotator } = ctx; + const { pool, task, crawlRotator, updateStep } = ctx; const dispensaryId = task.dispensary_id; if (!dispensaryId) { @@ -40,6 +40,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise { const allProducts: any[] = []; const logs: string[] = []; @@ -301,6 +305,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise { - const { pool, task } = ctx; + const { pool, task, updateStep } = ctx; const dispensaryId = task.dispensary_id; if (!dispensaryId) { @@ -43,6 +43,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise pattern.test(errorMessage)); + } + + /** + * Mark a task as failed + * + * Soft failures (timeouts, connection issues): Requeue back to pending for later pickup + * Hard failures (business logic errors): Mark as failed permanently */ async failTask(taskId: number, errorMessage: string): Promise { - // Get current retry state - const result = await pool.query( - `SELECT retry_count, max_retries FROM worker_tasks WHERE id = $1`, - [taskId] - ); + const isSoft = this.isSoftFailure(errorMessage); - if (result.rows.length === 0) { - return false; - } - - const { retry_count, max_retries } = result.rows[0]; - const newRetryCount = (retry_count || 0) + 1; - - if (newRetryCount < (max_retries || 3)) { - // Re-queue for retry - reset to pending with incremented retry_count + if (isSoft) { + // Soft failure: put back in queue immediately for another worker await pool.query( `UPDATE worker_tasks SET status = 'pending', worker_id = NULL, claimed_at = NULL, started_at = NULL, - retry_count = $2, - error_message = $3, + error_message = $2, + scheduled_for = NULL, updated_at = NOW() WHERE id = $1`, - [taskId, newRetryCount, `Retry ${newRetryCount}: ${errorMessage}`] + [taskId, `Requeued: ${errorMessage}`] ); - console.log(`[TaskService] Task ${taskId} queued for retry ${newRetryCount}/${max_retries || 3}`); + console.log(`[TaskService] Task ${taskId} requeued for another worker`); return true; } - // Max retries exceeded - mark as permanently failed + // Hard failure: mark as permanently failed await pool.query( `UPDATE worker_tasks SET status = 'failed', completed_at = NOW(), - retry_count = $2, - error_message = $3 + error_message = $2 WHERE id = $1`, - [taskId, newRetryCount, `Failed after ${newRetryCount} attempts: ${errorMessage}`] + [taskId, `Hard failure: ${errorMessage}`] ); - console.log(`[TaskService] Task ${taskId} permanently failed after ${newRetryCount} attempts`); + console.log(`[TaskService] Task ${taskId} hard failed: ${errorMessage}`); return false; } diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 578b7535..8e4d9f05 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -97,8 +97,12 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010'; // ============================================================================= // Maximum number of tasks this worker will run concurrently -// Tune based on workload: I/O-bound tasks benefit from higher concurrency -const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '15'); +// Browser tasks (Puppeteer) use ~400MB RAM each. With 2GB pod limit: +// - 3 browsers = ~1.3GB = SAFE +// - 4 browsers = ~1.7GB = RISKY +// - 5+ browsers = OOM CRASH +// See: docs/WORKER_TASK_ARCHITECTURE.md#browser-task-memory-limits +const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3'); // When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks // Default 85% - gives headroom before OOM @@ -131,6 +135,8 @@ export interface TaskContext { task: WorkerTask; heartbeat: () => Promise; crawlRotator?: CrawlRotator; + /** Update the current step being executed (shown in dashboard) */ + updateStep: (step: string, detail?: string) => void; } export interface TaskResult { @@ -264,6 +270,18 @@ export class TaskWorker { private preflightsCompleted: boolean = false; private initializingPromise: Promise | null = null; + // ========================================================================== + // STEP TRACKING FOR DASHBOARD VISIBILITY + // ========================================================================== + // Workers report their current step in heartbeats so the dashboard can show + // real-time progress like "preflight", "loading page", "processing products" + // ========================================================================== + private currentStep: string = 'idle'; + private currentStepDetail: string | null = null; + private currentStepStartedAt: Date | null = null; + /** Map of task ID -> step info for concurrent tasks */ + private taskSteps: Map = new Map(); + constructor(role: TaskRole | null = null, workerId?: string) { this.pool = getPool(); this.role = role; @@ -346,6 +364,65 @@ export class TaskWorker { return this.activeTasks.size < this.maxConcurrentTasks; } + // ========================================================================== + // STEP TRACKING METHODS + // ========================================================================== + + /** + * Update the current step for a task (for dashboard visibility) + * @param taskId - The task ID to update + * @param step - Short step name (e.g., "preflight", "loading", "processing") + * @param detail - Optional detail (e.g., "Verifying IP 1.2.3.4") + */ + public updateTaskStep(taskId: number, step: string, detail?: string): void { + this.taskSteps.set(taskId, { + step, + detail: detail || null, + startedAt: new Date(), + }); + + // Also update the "primary" step for single-task backwards compat + if (this.activeTasks.size === 1 || taskId === Array.from(this.activeTasks.keys())[0]) { + this.currentStep = step; + this.currentStepDetail = detail || null; + this.currentStepStartedAt = new Date(); + } + + console.log(`[TaskWorker] Step: ${step}${detail ? ` - ${detail}` : ''} (task #${taskId})`); + } + + /** + * Clear step tracking for a task (when task completes) + */ + private clearTaskStep(taskId: number): void { + this.taskSteps.delete(taskId); + + // Reset primary step if no more active tasks + if (this.activeTasks.size === 0) { + this.currentStep = 'idle'; + this.currentStepDetail = null; + this.currentStepStartedAt = null; + } + } + + /** + * Get current step info for all active tasks (for heartbeat) + */ + private getTaskStepsInfo(): Array<{ + task_id: number; + step: string; + detail: string | null; + elapsed_ms: number; + }> { + const now = Date.now(); + return Array.from(this.taskSteps.entries()).map(([taskId, info]) => ({ + task_id: taskId, + step: info.step, + detail: info.detail, + elapsed_ms: now - info.startedAt.getTime(), + })); + } + /** * Initialize stealth systems (proxy rotation, fingerprints) * Called LAZILY on first task claim attempt (NOT at worker startup). @@ -635,7 +712,7 @@ export class TaskWorker { } /** - * Send heartbeat to registry with resource usage and proxy location + * Send heartbeat to registry with resource usage, proxy location, and step info */ private async sendRegistryHeartbeat(): Promise { try { @@ -647,6 +724,9 @@ export class TaskWorker { // Get array of active task IDs const activeTaskIds = Array.from(this.activeTasks.keys()); + // Get step info for all active tasks + const taskSteps = this.getTaskStepsInfo(); + await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -657,6 +737,11 @@ export class TaskWorker { active_task_count: this.activeTasks.size, max_concurrent_tasks: this.maxConcurrentTasks, status: this.activeTasks.size > 0 ? 'active' : 'idle', + // Step tracking for dashboard visibility + current_step: this.currentStep, + current_step_detail: this.currentStepDetail, + current_step_started_at: this.currentStepStartedAt?.toISOString() || null, + task_steps: taskSteps, // Per-task step info for concurrent workers resources: { memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024), memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024), @@ -915,7 +1000,7 @@ export class TaskWorker { throw new Error(`No handler registered for role: ${task.role}`); } - // Create context + // Create context with step tracking const ctx: TaskContext = { pool: this.pool, workerId: this.workerId, @@ -924,12 +1009,21 @@ export class TaskWorker { await taskService.heartbeat(task.id); }, crawlRotator: this.crawlRotator, + updateStep: (step: string, detail?: string) => { + this.updateTaskStep(task.id, step, detail); + }, }; + // Initialize step tracking for this task + this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`); + // Execute the task const result = await handler(ctx); if (result.success) { + // Clear step tracking + this.clearTaskStep(task.id); + // Mark as completed await taskService.completeTask(task.id, result); await this.reportTaskCompletion(true); @@ -945,12 +1039,18 @@ export class TaskWorker { console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`); } } else { + // Clear step tracking + this.clearTaskStep(task.id); + // Mark as failed await taskService.failTask(task.id, result.error || 'Unknown error'); await this.reportTaskCompletion(false); console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`); } } catch (error: any) { + // Clear step tracking + this.clearTaskStep(task.id); + // Mark as failed await taskService.failTask(task.id, error.message); await this.reportTaskCompletion(false); diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts index 1f6ba742..0f8f4b4f 100755 --- a/cannaiq/src/lib/api.ts +++ b/cannaiq/src/lib/api.ts @@ -3051,7 +3051,7 @@ class ApiClient { } async runTaskScheduleNow(id: number) { - return this.request<{ success: boolean; message: string; task: any }>(`/api/tasks/schedules/${id}/run-now`, { + return this.request<{ success: boolean; message: string; tasksCreated?: number; stateCode?: string }>(`/api/tasks/schedules/${id}/run-now`, { method: 'POST', }); } diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx index e882462f..65ee5243 100644 --- a/cannaiq/src/pages/TasksDashboard.tsx +++ b/cannaiq/src/pages/TasksDashboard.tsx @@ -718,6 +718,7 @@ export default function TasksDashboard() { const [selectedSchedules, setSelectedSchedules] = useState>(new Set()); const [editingSchedule, setEditingSchedule] = useState(null); const [showScheduleModal, setShowScheduleModal] = useState(false); + const [runningScheduleId, setRunningScheduleId] = useState(null); // Pagination const [page, setPage] = useState(0); @@ -812,13 +813,17 @@ export default function TasksDashboard() { }; const handleRunScheduleNow = async (scheduleId: number) => { + if (runningScheduleId !== null) return; // Prevent duplicate clicks + setRunningScheduleId(scheduleId); try { - const result = await api.runTaskScheduleNow(scheduleId); - alert(result.message); + const result = await api.runTaskScheduleNow(scheduleId) as { success: boolean; message: string; tasksCreated?: number }; + alert(result.message + (result.tasksCreated ? ` (${result.tasksCreated} tasks created)` : '')); fetchData(); } catch (err: any) { console.error('Run schedule error:', err); alert(err.response?.data?.error || 'Failed to run schedule'); + } finally { + setRunningScheduleId(null); } }; @@ -1046,18 +1051,18 @@ export default function TasksDashboard() { {formatDuration(metric.avg_duration_sec)} - {metric.tasks_per_worker_hour?.toFixed(1) || '-'} + {metric.tasks_per_worker_hour ? Number(metric.tasks_per_worker_hour).toFixed(1) : '-'} {metric.estimated_hours_to_drain ? ( 4 + Number(metric.estimated_hours_to_drain) > 4 ? 'text-red-600 font-medium' : 'text-gray-600' } > - {metric.estimated_hours_to_drain.toFixed(1)}h + {Number(metric.estimated_hours_to_drain).toFixed(1)}h ) : ( '-' @@ -1257,10 +1262,17 @@ export default function TasksDashboard() {