diff --git a/backend/docs/WORKER_TASK_ARCHITECTURE.md b/backend/docs/WORKER_TASK_ARCHITECTURE.md
index bb53eacf..2752c65b 100644
--- a/backend/docs/WORKER_TASK_ARCHITECTURE.md
+++ b/backend/docs/WORKER_TASK_ARCHITECTURE.md
@@ -362,6 +362,148 @@ SET status = 'pending', retry_count = retry_count + 1
 WHERE status = 'failed' AND retry_count < max_retries;
 ```
 
+## Concurrent Task Processing (Added 2024-12)
+
+Workers can now process multiple tasks concurrently within a single worker instance. This improves throughput by utilizing async I/O efficiently.
+
+### Architecture
+
+```
+┌─────────────────────────────────────────────────────────────┐
+│                          Pod (K8s)                          │
+│                                                             │
+│  ┌─────────────────────────────────────────────────────┐    │
+│  │                     TaskWorker                      │    │
+│  │                                                     │    │
+│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐              │    │
+│  │  │ Task 1  │  │ Task 2  │  │ Task 3  │ (concurrent) │    │
+│  │  └─────────┘  └─────────┘  └─────────┘              │    │
+│  │                                                     │    │
+│  │  Resource Monitor                                   │    │
+│  │  ├── Memory: 65% (threshold: 85%)                   │    │
+│  │  ├── CPU: 45% (threshold: 90%)                      │    │
+│  │  └── Status: Normal                                 │    │
+│  └─────────────────────────────────────────────────────┘    │
+└─────────────────────────────────────────────────────────────┘
+```
+
+### Environment Variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `MAX_CONCURRENT_TASKS` | 3 | Maximum tasks a worker will run concurrently |
+| `MEMORY_BACKOFF_THRESHOLD` | 0.85 | Back off when heap memory exceeds 85% |
+| `CPU_BACKOFF_THRESHOLD` | 0.90 | Back off when CPU exceeds 90% |
+| `BACKOFF_DURATION_MS` | 10000 | How long to wait when backing off (10s) |
+
+### How It Works
+
+1. **Main Loop**: Worker continuously tries to fill up to `MAX_CONCURRENT_TASKS`
+2. **Resource Monitoring**: Before claiming a new task, worker checks memory and CPU
+3. **Backoff**: If resources exceed thresholds, worker pauses and stops claiming new tasks
+4. **Concurrent Execution**: Tasks run as independent promises - the worker does not `await` them, so they don't block each other
+5. **Graceful Shutdown**: On SIGTERM/decommission, worker stops claiming but waits for active tasks
+
+### Resource Monitoring
+
+```typescript
+// ResourceStats interface
+interface ResourceStats {
+  memoryPercent: number;        // Current heap usage as decimal (0.0-1.0)
+  memoryMb: number;             // Current heap used in MB
+  memoryTotalMb: number;        // Total heap available in MB
+  cpuPercent: number;           // CPU usage as percentage (0-100)
+  isBackingOff: boolean;        // True if worker is in backoff state
+  backoffReason: string | null; // Why the worker is backing off (null when not)
+}
+```
+
+### Heartbeat Data
+
+Workers report the following in their heartbeat:
+
+```json
+{
+  "worker_id": "worker-abc123",
+  "current_task_id": 456,
+  "current_task_ids": [456, 457, 458],
+  "active_task_count": 3,
+  "max_concurrent_tasks": 3,
+  "status": "active",
+  "resources": {
+    "memory_mb": 256,
+    "memory_total_mb": 512,
+    "memory_rss_mb": 320,
+    "memory_percent": 50,
+    "cpu_user_ms": 12500,
+    "cpu_system_ms": 3200,
+    "cpu_percent": 45,
+    "is_backing_off": false,
+    "backoff_reason": null
+  }
+}
+```
+
+### Backoff Behavior
+
+When resources exceed thresholds:
+
+1. Worker logs the backoff reason:
+   ```
+   [TaskWorker] MyWorker backing off: Memory at 87.3% (threshold: 85%)
+   ```
+
+2. Worker stops claiming new tasks but continues existing tasks
+
+3. After `BACKOFF_DURATION_MS`, worker rechecks resources
+
+4. When resources return to normal:
+   ```
+   [TaskWorker] MyWorker resuming normal operation
+   ```
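+
+As a mental model, one iteration of `mainLoop()` behaves like the following simplified sketch (not the exact implementation - `claim()` and `run()` stand in for `taskService.claimTask()` and `executeTask()`):
+
+```typescript
+// Simplified sketch of one iteration of the worker's main loop.
+async function mainLoopSketch(w: {
+  activeTasks: Map<number, unknown>;
+  maxConcurrentTasks: number;
+  shouldBackOff(): { backoff: boolean; reason: string | null };
+  claim(): Promise<{ id: number } | null>;   // stand-in for taskService.claimTask()
+  run(task: { id: number }): Promise<void>;  // stand-in for executeTask(); handles its own errors
+  sleep(ms: number): Promise<void>;
+}): Promise<void> {
+  if (w.shouldBackOff().backoff) {
+    await w.sleep(10_000); // BACKOFF_DURATION_MS: pause, then recheck
+    return;
+  }
+  if (w.activeTasks.size < w.maxConcurrentTasks) {
+    const task = await w.claim(); // atomic claim against the tasks table
+    if (task) {
+      w.activeTasks.set(task.id, task);
+      // Intentionally NOT awaited - tasks run concurrently
+      w.run(task).finally(() => w.activeTasks.delete(task.id));
+      return; // loop again immediately to fill remaining capacity
+    }
+  }
+  await w.sleep(5_000); // POLL_INTERVAL_MS: idle or at capacity
+}
+```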
+
+### UI Display
+
+The Workers Dashboard shows:
+
+- **Tasks Column**: `2/3 tasks` (active/max concurrent)
+- **Resources Column**: Memory % and CPU % with color coding
+  - Green: < 50%
+  - Yellow: 50-74%
+  - Amber: 75-89%
+  - Red: 90%+
+- **Backing Off**: Orange warning badge when worker is in backoff state
+
+### Task Count Badge Details
+
+```
+┌─────────────────────────────────────────────┐
+│ Worker: "MyWorker"                          │
+│ Tasks: 2/3 tasks  #456, #457                │
+│ Resources: 🧠 65%  💻 45%                   │
+│ Status: ● Active                            │
+└─────────────────────────────────────────────┘
+```
+
+### Best Practices
+
+1. **Start Conservative**: Use `MAX_CONCURRENT_TASKS=3` initially
+2. **Monitor Resources**: Watch for frequent backoffs in logs
+3. **Tune Per Workload**: I/O-bound tasks benefit from higher concurrency
+4. **Scale Horizontally**: Add more pods rather than cranking concurrency too high
+
+### Code References
+
+| File | Purpose |
+|------|---------|
+| `src/tasks/task-worker.ts:68-71` | Concurrency environment variables |
+| `src/tasks/task-worker.ts:104-111` | ResourceStats interface |
+| `src/tasks/task-worker.ts:149-179` | getResourceStats() method |
+| `src/tasks/task-worker.ts:184-196` | shouldBackOff() method |
+| `src/tasks/task-worker.ts:462-516` | mainLoop() with concurrent claiming |
+| `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling |
+| `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources |
+
 ## Monitoring
 
 ### Logs
diff --git a/backend/migrations/074_worker_commands.sql b/backend/migrations/074_worker_commands.sql
new file mode 100644
index 00000000..39ad1c72
--- /dev/null
+++ b/backend/migrations/074_worker_commands.sql
@@ -0,0 +1,27 @@
+-- Migration: Worker Commands Table
+-- Purpose: Store commands for workers (decommission, etc.)
+-- Workers poll this table after each task to check for commands
+
+CREATE TABLE IF NOT EXISTS worker_commands (
+  id SERIAL PRIMARY KEY,
+  worker_id TEXT NOT NULL,
+  command TEXT NOT NULL,         -- 'decommission', 'pause', 'resume'
+  reason TEXT,
+  issued_by TEXT,
+  issued_at TIMESTAMPTZ DEFAULT NOW(),
+  acknowledged_at TIMESTAMPTZ,
+  executed_at TIMESTAMPTZ,
+  status TEXT DEFAULT 'pending'  -- 'pending', 'acknowledged', 'executed', 'cancelled'
+);
+
+-- Index for worker lookups
+CREATE INDEX IF NOT EXISTS idx_worker_commands_worker_id ON worker_commands(worker_id);
+CREATE INDEX IF NOT EXISTS idx_worker_commands_pending ON worker_commands(worker_id, status) WHERE status = 'pending';
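+
+-- Example (illustrative) of the poll a worker can run after finishing a task;
+-- the partial index above keeps this lookup cheap:
+--   SELECT id, command, reason
+--   FROM worker_commands
+--   WHERE worker_id = $1 AND status = 'pending'
+--   ORDER BY issued_at LIMIT 1;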
+
+-- Add decommission_requested column to worker_registry for quick checks
+ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_requested BOOLEAN DEFAULT FALSE;
+ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_reason TEXT;
+ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_requested_at TIMESTAMPTZ;
+
+-- Comment
+COMMENT ON TABLE worker_commands IS 'Commands issued to workers (decommission after task, pause, etc.)';
diff --git a/backend/src/routes/worker-registry.ts b/backend/src/routes/worker-registry.ts
index a46698f4..264ed3d1 100644
--- a/backend/src/routes/worker-registry.ts
+++ b/backend/src/routes/worker-registry.ts
@@ -138,17 +138,36 @@ router.post('/register', async (req: Request, res: Response) => {
  *
  * Body:
  * - worker_id: string (required)
- * - current_task_id: number (optional) - task currently being processed
+ * - current_task_id: number (optional) - task currently being processed (primary task)
+ * - current_task_ids: number[] (optional) - all tasks currently being processed (concurrent)
+ * - active_task_count: number (optional) - number of tasks currently running
+ * - max_concurrent_tasks: number (optional) - max concurrent tasks this worker can handle
  * - status: string (optional) - 'active', 'idle'
+ * - resources: object (optional) - memory_mb, cpu_user_ms, cpu_system_ms, etc.
  */
 router.post('/heartbeat', async (req: Request, res: Response) => {
   try {
-    const { worker_id, current_task_id, status = 'active', resources } = req.body;
+    const {
+      worker_id,
+      current_task_id,
+      current_task_ids,
+      active_task_count,
+      max_concurrent_tasks,
+      status = 'active',
+      resources
+    } = req.body;
 
     if (!worker_id) {
       return res.status(400).json({ success: false, error: 'worker_id is required' });
     }
 
+    // Build metadata object with all the new fields
+    const metadata: Record<string, any> = {};
+    if (resources) Object.assign(metadata, resources);
+    if (current_task_ids) metadata.current_task_ids = current_task_ids;
+    if (active_task_count !== undefined) metadata.active_task_count = active_task_count;
+    if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks;
+
     // Store resources in metadata jsonb column
     const { rows } = await pool.query(`
       UPDATE worker_registry
@@ -159,7 +178,7 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
         updated_at = NOW()
       WHERE worker_id = $3
       RETURNING id, friendly_name, status
-    `, [current_task_id || null, status, worker_id, resources ? JSON.stringify(resources) : null]);
+    `, [current_task_id || null, status, worker_id, Object.keys(metadata).length > 0 ?
JSON.stringify(metadata) : null]); if (rows.length === 0) { return res.status(404).json({ success: false, error: 'Worker not found - please register first' }); @@ -330,12 +349,21 @@ router.get('/workers', async (req: Request, res: Response) => { tasks_completed, tasks_failed, current_task_id, + -- Concurrent task fields from metadata + (metadata->>'current_task_ids')::jsonb as current_task_ids, + (metadata->>'active_task_count')::int as active_task_count, + (metadata->>'max_concurrent_tasks')::int as max_concurrent_tasks, + -- Decommission fields + COALESCE(decommission_requested, false) as decommission_requested, + decommission_reason, + -- Full metadata for resources metadata, EXTRACT(EPOCH FROM (NOW() - last_heartbeat_at)) as seconds_since_heartbeat, CASE WHEN status = 'offline' OR status = 'terminated' THEN status WHEN last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale' WHEN current_task_id IS NOT NULL THEN 'busy' + WHEN (metadata->>'active_task_count')::int > 0 THEN 'busy' ELSE 'ready' END as health_status, created_at @@ -672,4 +700,163 @@ router.get('/capacity', async (_req: Request, res: Response) => { } }); +// ============================================================ +// WORKER LIFECYCLE MANAGEMENT +// ============================================================ + +/** + * POST /api/worker-registry/workers/:workerId/decommission + * Request graceful decommission of a worker (will stop after current task) + */ +router.post('/workers/:workerId/decommission', async (req: Request, res: Response) => { + try { + const { workerId } = req.params; + const { reason, issued_by } = req.body; + + // Update worker_registry to flag for decommission + const result = await pool.query( + `UPDATE worker_registry + SET decommission_requested = true, + decommission_reason = $2, + decommission_requested_at = NOW() + WHERE worker_id = $1 + RETURNING friendly_name, status, current_task_id`, + [workerId, reason || 'Manual decommission from admin'] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ success: false, error: 'Worker not found' }); + } + + const worker = result.rows[0]; + + // Also log to worker_commands for audit trail + await pool.query( + `INSERT INTO worker_commands (worker_id, command, reason, issued_by) + VALUES ($1, 'decommission', $2, $3) + ON CONFLICT DO NOTHING`, + [workerId, reason || 'Manual decommission', issued_by || 'admin'] + ).catch(() => { + // Table might not exist yet - ignore + }); + + res.json({ + success: true, + message: worker.current_task_id + ? 
`Worker ${worker.friendly_name} will stop after completing task #${worker.current_task_id}` + : `Worker ${worker.friendly_name} will stop on next poll`, + worker: { + friendly_name: worker.friendly_name, + status: worker.status, + current_task_id: worker.current_task_id, + decommission_requested: true + } + }); + } catch (error: any) { + res.status(500).json({ success: false, error: error.message }); + } +}); + +/** + * POST /api/worker-registry/workers/:workerId/cancel-decommission + * Cancel a pending decommission request + */ +router.post('/workers/:workerId/cancel-decommission', async (req: Request, res: Response) => { + try { + const { workerId } = req.params; + + const result = await pool.query( + `UPDATE worker_registry + SET decommission_requested = false, + decommission_reason = NULL, + decommission_requested_at = NULL + WHERE worker_id = $1 + RETURNING friendly_name`, + [workerId] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ success: false, error: 'Worker not found' }); + } + + res.json({ + success: true, + message: `Decommission cancelled for ${result.rows[0].friendly_name}` + }); + } catch (error: any) { + res.status(500).json({ success: false, error: error.message }); + } +}); + +/** + * POST /api/worker-registry/spawn + * Spawn a new worker in the current pod (only works in multi-worker-per-pod mode) + * For now, this is a placeholder - actual spawning requires the pod supervisor + */ +router.post('/spawn', async (req: Request, res: Response) => { + try { + const { pod_name, role } = req.body; + + // For now, we can't actually spawn workers from the API + // This would require a supervisor process in each pod that listens for spawn commands + // Instead, return instructions for how to scale + res.json({ + success: false, + error: 'Direct worker spawning not yet implemented', + instructions: 'To add workers, scale the K8s deployment: kubectl scale deployment/scraper-worker --replicas=N' + }); + } catch (error: any) { + res.status(500).json({ success: false, error: error.message }); + } +}); + +/** + * GET /api/worker-registry/pods + * Get workers grouped by pod + */ +router.get('/pods', async (_req: Request, res: Response) => { + try { + const { rows } = await pool.query(` + SELECT + COALESCE(pod_name, 'Unknown') as pod_name, + COUNT(*) as worker_count, + COUNT(*) FILTER (WHERE current_task_id IS NOT NULL) as busy_count, + COUNT(*) FILTER (WHERE current_task_id IS NULL) as idle_count, + SUM(tasks_completed) as total_completed, + SUM(tasks_failed) as total_failed, + SUM((metadata->>'memory_rss_mb')::int) as total_memory_mb, + array_agg(json_build_object( + 'worker_id', worker_id, + 'friendly_name', friendly_name, + 'status', status, + 'current_task_id', current_task_id, + 'tasks_completed', tasks_completed, + 'tasks_failed', tasks_failed, + 'decommission_requested', COALESCE(decommission_requested, false), + 'last_heartbeat_at', last_heartbeat_at + )) as workers + FROM worker_registry + WHERE status NOT IN ('offline', 'terminated') + GROUP BY pod_name + ORDER BY pod_name + `); + + res.json({ + success: true, + pods: rows.map(row => ({ + pod_name: row.pod_name, + worker_count: parseInt(row.worker_count), + busy_count: parseInt(row.busy_count), + idle_count: parseInt(row.idle_count), + total_completed: parseInt(row.total_completed) || 0, + total_failed: parseInt(row.total_failed) || 0, + total_memory_mb: parseInt(row.total_memory_mb) || 0, + workers: row.workers + })) + }); + } catch (error: any) { + res.status(500).json({ success: false, error: 
error.message }); + } +}); + export default router; diff --git a/backend/src/routes/workers.ts b/backend/src/routes/workers.ts index 6fd69aea..d4e072c5 100644 --- a/backend/src/routes/workers.ts +++ b/backend/src/routes/workers.ts @@ -35,7 +35,7 @@ const router = Router(); // ============================================================ const K8S_NAMESPACE = process.env.K8S_NAMESPACE || 'dispensary-scraper'; -const K8S_STATEFULSET_NAME = process.env.K8S_WORKER_STATEFULSET || 'scraper-worker'; +const K8S_DEPLOYMENT_NAME = process.env.K8S_WORKER_DEPLOYMENT || 'scraper-worker'; // Initialize K8s client - uses in-cluster config when running in K8s, // or kubeconfig when running locally @@ -70,7 +70,7 @@ function getK8sClient(): k8s.AppsV1Api | null { /** * GET /api/workers/k8s/replicas - Get current worker replica count - * Returns current and desired replica counts from the StatefulSet + * Returns current and desired replica counts from the Deployment */ router.get('/k8s/replicas', async (_req: Request, res: Response) => { const client = getK8sClient(); @@ -84,21 +84,21 @@ router.get('/k8s/replicas', async (_req: Request, res: Response) => { } try { - const response = await client.readNamespacedStatefulSet({ - name: K8S_STATEFULSET_NAME, + const response = await client.readNamespacedDeployment({ + name: K8S_DEPLOYMENT_NAME, namespace: K8S_NAMESPACE, }); - const statefulSet = response; + const deployment = response; res.json({ success: true, replicas: { - current: statefulSet.status?.readyReplicas || 0, - desired: statefulSet.spec?.replicas || 0, - available: statefulSet.status?.availableReplicas || 0, - updated: statefulSet.status?.updatedReplicas || 0, + current: deployment.status?.readyReplicas || 0, + desired: deployment.spec?.replicas || 0, + available: deployment.status?.availableReplicas || 0, + updated: deployment.status?.updatedReplicas || 0, }, - statefulset: K8S_STATEFULSET_NAME, + deployment: K8S_DEPLOYMENT_NAME, namespace: K8S_NAMESPACE, }); } catch (err: any) { @@ -112,7 +112,7 @@ router.get('/k8s/replicas', async (_req: Request, res: Response) => { /** * POST /api/workers/k8s/scale - Scale worker replicas - * Body: { replicas: number } - desired replica count (1-20) + * Body: { replicas: number } - desired replica count (0-20) */ router.post('/k8s/scale', async (req: Request, res: Response) => { const client = getK8sClient(); @@ -136,21 +136,21 @@ router.post('/k8s/scale', async (req: Request, res: Response) => { try { // Get current state first - const currentResponse = await client.readNamespacedStatefulSetScale({ - name: K8S_STATEFULSET_NAME, + const currentResponse = await client.readNamespacedDeploymentScale({ + name: K8S_DEPLOYMENT_NAME, namespace: K8S_NAMESPACE, }); const currentReplicas = currentResponse.spec?.replicas || 0; - // Update scale using replaceNamespacedStatefulSetScale - await client.replaceNamespacedStatefulSetScale({ - name: K8S_STATEFULSET_NAME, + // Update scale using replaceNamespacedDeploymentScale + await client.replaceNamespacedDeploymentScale({ + name: K8S_DEPLOYMENT_NAME, namespace: K8S_NAMESPACE, body: { apiVersion: 'autoscaling/v1', kind: 'Scale', metadata: { - name: K8S_STATEFULSET_NAME, + name: K8S_DEPLOYMENT_NAME, namespace: K8S_NAMESPACE, }, spec: { @@ -159,14 +159,14 @@ router.post('/k8s/scale', async (req: Request, res: Response) => { }, }); - console.log(`[Workers] Scaled ${K8S_STATEFULSET_NAME} from ${currentReplicas} to ${replicas} replicas`); + console.log(`[Workers] Scaled ${K8S_DEPLOYMENT_NAME} from ${currentReplicas} to ${replicas} 
replicas`); res.json({ success: true, message: `Scaled from ${currentReplicas} to ${replicas} replicas`, previous: currentReplicas, desired: replicas, - statefulset: K8S_STATEFULSET_NAME, + deployment: K8S_DEPLOYMENT_NAME, namespace: K8S_NAMESPACE, }); } catch (err: any) { @@ -178,6 +178,73 @@ router.post('/k8s/scale', async (req: Request, res: Response) => { } }); +/** + * POST /api/workers/k8s/scale-up - Scale up worker replicas by 1 + * Convenience endpoint for adding a single worker + */ +router.post('/k8s/scale-up', async (_req: Request, res: Response) => { + const client = getK8sClient(); + + if (!client) { + return res.status(503).json({ + success: false, + error: 'K8s client not available (not running in cluster or no kubeconfig)', + }); + } + + try { + // Get current replica count + const currentResponse = await client.readNamespacedDeploymentScale({ + name: K8S_DEPLOYMENT_NAME, + namespace: K8S_NAMESPACE, + }); + const currentReplicas = currentResponse.spec?.replicas || 0; + const newReplicas = currentReplicas + 1; + + // Cap at 20 replicas + if (newReplicas > 20) { + return res.status(400).json({ + success: false, + error: 'Maximum replica count (20) reached', + }); + } + + // Scale up by 1 + await client.replaceNamespacedDeploymentScale({ + name: K8S_DEPLOYMENT_NAME, + namespace: K8S_NAMESPACE, + body: { + apiVersion: 'autoscaling/v1', + kind: 'Scale', + metadata: { + name: K8S_DEPLOYMENT_NAME, + namespace: K8S_NAMESPACE, + }, + spec: { + replicas: newReplicas, + }, + }, + }); + + console.log(`[Workers] Scaled up ${K8S_DEPLOYMENT_NAME} from ${currentReplicas} to ${newReplicas} replicas`); + + res.json({ + success: true, + message: `Added worker (${currentReplicas} → ${newReplicas} replicas)`, + previous: currentReplicas, + desired: newReplicas, + deployment: K8S_DEPLOYMENT_NAME, + namespace: K8S_NAMESPACE, + }); + } catch (err: any) { + console.error('[Workers] K8s scale-up error:', err.body?.message || err.message); + res.status(500).json({ + success: false, + error: err.body?.message || err.message, + }); + } +}); + // ============================================================ // STATIC ROUTES (must come before parameterized routes) // ============================================================ diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 214fc357..bdc44506 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -64,6 +64,33 @@ const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000'); const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000'); const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010'; +// ============================================================================= +// CONCURRENT TASK PROCESSING SETTINGS +// ============================================================================= +// Workers can process multiple tasks simultaneously using async I/O. +// This improves throughput for I/O-bound tasks (network calls, DB queries). +// +// Resource thresholds trigger "backoff" - the worker stops claiming new tasks +// but continues processing existing ones until resources return to normal. 
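+//
+// Example (hypothetical values) for an I/O-heavy deployment:
+//   MAX_CONCURRENT_TASKS=5
+//   MEMORY_BACKOFF_THRESHOLD=0.80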
+//
+// See: docs/WORKER_TASK_ARCHITECTURE.md#concurrent-task-processing
+// =============================================================================
+
+// Maximum number of tasks this worker will run concurrently
+// Tune based on workload: I/O-bound tasks benefit from higher concurrency
+const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
+
+// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
+// Default 85% - gives headroom before OOM
+const MEMORY_BACKOFF_THRESHOLD = parseFloat(process.env.MEMORY_BACKOFF_THRESHOLD || '0.85');
+
+// When CPU usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
+// Default 90% - allows some burst capacity
+const CPU_BACKOFF_THRESHOLD = parseFloat(process.env.CPU_BACKOFF_THRESHOLD || '0.90');
+
+// How long to wait (ms) when in backoff state before rechecking resources
+const BACKOFF_DURATION_MS = parseInt(process.env.BACKOFF_DURATION_MS || '10000');
+
 export interface TaskContext {
   pool: Pool;
   workerId: string;
@@ -94,6 +121,25 @@ const TASK_HANDLERS: Record = {
   analytics_refresh: handleAnalyticsRefresh,
 };
 
+/**
+ * Resource usage stats reported to the registry and used for backoff decisions.
+ * These values are included in worker heartbeats and displayed in the UI.
+ */
+interface ResourceStats {
+  /** Current heap memory usage as decimal (0.0 to 1.0) */
+  memoryPercent: number;
+  /** Current heap used in MB */
+  memoryMb: number;
+  /** Total heap available in MB */
+  memoryTotalMb: number;
+  /** CPU usage percentage since last check (0 to 100) */
+  cpuPercent: number;
+  /** True if worker is currently in backoff state */
+  isBackingOff: boolean;
+  /** Reason for backoff (e.g., "Memory at 87.3% (threshold: 85%)") */
+  backoffReason: string | null;
+}
+
 export class TaskWorker {
   private pool: Pool;
   private workerId: string;
@@ -102,14 +148,106 @@ export class TaskWorker {
   private isRunning: boolean = false;
   private heartbeatInterval: NodeJS.Timeout | null = null;
   private registryHeartbeatInterval: NodeJS.Timeout | null = null;
-  private currentTask: WorkerTask | null = null;
   private crawlRotator: CrawlRotator;
 
+  // ==========================================================================
+  // CONCURRENT TASK TRACKING
+  // ==========================================================================
+  // activeTasks: Map of task ID -> task object for all currently running tasks
+  // taskPromises: Map of task ID -> Promise for cleanup when task completes
+  // maxConcurrentTasks: How many tasks this worker will run in parallel
+  // ==========================================================================
+  private activeTasks: Map<number, WorkerTask> = new Map();
+  private taskPromises: Map<number, Promise<void>> = new Map();
+  private maxConcurrentTasks: number = MAX_CONCURRENT_TASKS;
+
+  // ==========================================================================
+  // RESOURCE MONITORING FOR BACKOFF
+  // ==========================================================================
+  // CPU tracking uses differential measurement - we track last values and
+  // calculate percentage based on elapsed time since last check.
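+  // Example: if user+system CPU time grew by 450ms over a 1000ms wall-clock
+  // window, cpuPercent = (450 / 1000) * 100 = 45%. The raw value can exceed
+  // 100 on multi-core bursts, so getResourceStats() caps it at 100.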
+ // ========================================================================== + private lastCpuUsage: { user: number; system: number } = { user: 0, system: 0 }; + private lastCpuCheck: number = Date.now(); + private isBackingOff: boolean = false; + private backoffReason: string | null = null; + constructor(role: TaskRole | null = null, workerId?: string) { this.pool = getPool(); this.role = role; this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`; this.crawlRotator = new CrawlRotator(this.pool); + + // Initialize CPU tracking + const cpuUsage = process.cpuUsage(); + this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system }; + this.lastCpuCheck = Date.now(); + } + + /** + * Get current resource usage + */ + private getResourceStats(): ResourceStats { + const memUsage = process.memoryUsage(); + const heapUsedMb = memUsage.heapUsed / 1024 / 1024; + const heapTotalMb = memUsage.heapTotal / 1024 / 1024; + const memoryPercent = heapUsedMb / heapTotalMb; + + // Calculate CPU usage since last check + const cpuUsage = process.cpuUsage(); + const now = Date.now(); + const elapsed = now - this.lastCpuCheck; + + let cpuPercent = 0; + if (elapsed > 0) { + const userDiff = (cpuUsage.user - this.lastCpuUsage.user) / 1000; // microseconds to ms + const systemDiff = (cpuUsage.system - this.lastCpuUsage.system) / 1000; + cpuPercent = ((userDiff + systemDiff) / elapsed) * 100; + } + + // Update last values + this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system }; + this.lastCpuCheck = now; + + return { + memoryPercent, + memoryMb: Math.round(heapUsedMb), + memoryTotalMb: Math.round(heapTotalMb), + cpuPercent: Math.min(100, cpuPercent), // Cap at 100% + isBackingOff: this.isBackingOff, + backoffReason: this.backoffReason, + }; + } + + /** + * Check if we should back off from taking new tasks + */ + private shouldBackOff(): { backoff: boolean; reason: string | null } { + const stats = this.getResourceStats(); + + if (stats.memoryPercent > MEMORY_BACKOFF_THRESHOLD) { + return { backoff: true, reason: `Memory at ${(stats.memoryPercent * 100).toFixed(1)}% (threshold: ${MEMORY_BACKOFF_THRESHOLD * 100}%)` }; + } + + if (stats.cpuPercent > CPU_BACKOFF_THRESHOLD * 100) { + return { backoff: true, reason: `CPU at ${stats.cpuPercent.toFixed(1)}% (threshold: ${CPU_BACKOFF_THRESHOLD * 100}%)` }; + } + + return { backoff: false, reason: null }; + } + + /** + * Get count of currently running tasks + */ + get activeTaskCount(): number { + return this.activeTasks.size; + } + + /** + * Check if we can accept more tasks + */ + private canAcceptMoreTasks(): boolean { + return this.activeTasks.size < this.maxConcurrentTasks; } /** @@ -252,21 +390,32 @@ export class TaskWorker { const memUsage = process.memoryUsage(); const cpuUsage = process.cpuUsage(); const proxyLocation = this.crawlRotator.getProxyLocation(); + const resourceStats = this.getResourceStats(); + + // Get array of active task IDs + const activeTaskIds = Array.from(this.activeTasks.keys()); await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ worker_id: this.workerId, - current_task_id: this.currentTask?.id || null, - status: this.currentTask ? 'active' : 'idle', + current_task_id: activeTaskIds[0] || null, // Primary task for backwards compat + current_task_ids: activeTaskIds, // All active tasks + active_task_count: this.activeTasks.size, + max_concurrent_tasks: this.maxConcurrentTasks, + status: this.activeTasks.size > 0 ? 
'active' : 'idle',
        resources: {
          memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
          memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
          memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024),
+          memory_percent: Math.round(resourceStats.memoryPercent * 100),
          cpu_user_ms: Math.round(cpuUsage.user / 1000),
          cpu_system_ms: Math.round(cpuUsage.system / 1000),
+          cpu_percent: Math.round(resourceStats.cpuPercent),
          proxy_location: proxyLocation,
+          is_backing_off: this.isBackingOff,
+          backoff_reason: this.backoffReason,
        }
      })
    });
@@ -328,20 +477,85 @@
     this.startRegistryHeartbeat();
 
     const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
-    console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg}`);
+    console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (max ${this.maxConcurrentTasks} concurrent tasks)`);
 
     while (this.isRunning) {
       try {
-        await this.processNextTask();
+        await this.mainLoop();
       } catch (error: any) {
         console.error(`[TaskWorker] Loop error:`, error.message);
         await this.sleep(POLL_INTERVAL_MS);
       }
     }
 
+    // Wait for any remaining tasks to complete
+    if (this.taskPromises.size > 0) {
+      console.log(`[TaskWorker] Waiting for ${this.taskPromises.size} active tasks to complete...`);
+      await Promise.allSettled(this.taskPromises.values());
+    }
+
     console.log(`[TaskWorker] Worker ${this.workerId} stopped`);
   }
 
+  /**
+   * Main loop - tries to fill up to maxConcurrentTasks
+   */
+  private async mainLoop(): Promise<void> {
+    // Check resource usage and back off if needed
+    const { backoff, reason } = this.shouldBackOff();
+    if (backoff) {
+      if (!this.isBackingOff) {
+        console.log(`[TaskWorker] ${this.friendlyName} backing off: ${reason}`);
+      }
+      this.isBackingOff = true;
+      this.backoffReason = reason;
+      await this.sleep(BACKOFF_DURATION_MS);
+      return;
+    }
+
+    // Clear backoff state
+    if (this.isBackingOff) {
+      console.log(`[TaskWorker] ${this.friendlyName} resuming normal operation`);
+      this.isBackingOff = false;
+      this.backoffReason = null;
+    }
+
+    // Check for decommission signal
+    const shouldDecommission = await this.checkDecommission();
+    if (shouldDecommission) {
+      console.log(`[TaskWorker] ${this.friendlyName} received decommission signal - waiting for ${this.activeTasks.size} tasks to complete`);
+      // Stop accepting new tasks, wait for current to finish
+      this.isRunning = false;
+      return;
+    }
+
+    // Try to claim more tasks if we have capacity
+    if (this.canAcceptMoreTasks()) {
+      const task = await taskService.claimTask(this.role, this.workerId);
+
+      if (task) {
+        console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
+        this.activeTasks.set(task.id, task);
+
+        // Start task in background (don't await)
+        const taskPromise = this.executeTask(task);
+        this.taskPromises.set(task.id, taskPromise);
+
+        // Clean up when done
+        taskPromise.finally(() => {
+          this.activeTasks.delete(task.id);
+          this.taskPromises.delete(task.id);
+        });
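+        // NOTE: executeTask() catches handler errors itself (marking the task
+        // failed), so this un-awaited promise is not expected to reject and
+        // trigger an unhandledRejection; the .finally() above frees the slot
+        // either way.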
+
+        // Immediately try to claim more tasks (don't wait for poll interval)
+        return;
+      }
+    }
+
+    // No task claimed or at capacity - wait before next poll
+    await this.sleep(POLL_INTERVAL_MS);
+  }
+
   /**
    * Stop the worker
    */
@@ -354,23 +568,10 @@
   }
 
   /**
-   * Process the next available task
+   * Execute a single task (runs concurrently with other tasks)
    */
-  private async processNextTask(): Promise<void> {
-    // Try to claim a task
-    const task = await taskService.claimTask(this.role, this.workerId);
-
-    if (!task) {
-      // No tasks available, wait and retry
-      await this.sleep(POLL_INTERVAL_MS);
-      return;
-    }
-
-    this.currentTask = task;
-    console.log(`[TaskWorker] Claimed task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
-
-    // Start heartbeat
-    this.startHeartbeat(task.id);
+  private async executeTask(task: WorkerTask): Promise<void> {
+    console.log(`[TaskWorker] ${this.friendlyName} starting task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
 
     try {
       // Mark as running
@@ -399,7 +600,7 @@
       // Mark as completed
       await taskService.completeTask(task.id, result);
       await this.reportTaskCompletion(true);
-      console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id}`);
+      console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);
 
       // Chain next task if applicable
       const chainedTask = await taskService.chainNextTask({
@@ -421,9 +622,35 @@
       await taskService.failTask(task.id, error.message);
       await this.reportTaskCompletion(false);
       console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, error.message);
-    } finally {
-      this.stopHeartbeat();
-      this.currentTask = null;
+    }
+    // Note: cleanup (removing the task from activeTasks and taskPromises)
+    // is handled by the .finally() callback registered in mainLoop()
+  }
+
+  /**
+   * Check if this worker has been flagged for decommission
+   * Returns true if worker should stop after current task
+   */
+  private async checkDecommission(): Promise<boolean> {
+    try {
+      // Check worker_registry for decommission flag
+      const result = await this.pool.query(
+        `SELECT decommission_requested, decommission_reason
+         FROM worker_registry
+         WHERE worker_id = $1`,
+        [this.workerId]
+      );
+
+      if (result.rows.length > 0 && result.rows[0].decommission_requested) {
+        const reason = result.rows[0].decommission_reason || 'No reason provided';
+        console.log(`[TaskWorker] Decommission requested: ${reason}`);
+        return true;
+      }
+
+      return false;
+    } catch (error: any) {
+      // If we can't check, continue running
+      console.warn(`[TaskWorker] Could not check decommission status: ${error.message}`);
+      return false;
+    }
   }
 
@@ -460,12 +687,25 @@
   /**
    * Get worker info
    */
-  getInfo(): { workerId: string; role: TaskRole | null; isRunning: boolean; currentTaskId: number | null } {
+  getInfo(): {
+    workerId: string;
+    role: TaskRole | null;
+    isRunning: boolean;
+    activeTaskIds: number[];
+    activeTaskCount: number;
+    maxConcurrentTasks: number;
+    isBackingOff: boolean;
+    backoffReason: string | null;
+  } {
     return {
      workerId: this.workerId,
      role: this.role,
      isRunning: this.isRunning,
-      currentTaskId: this.currentTask?.id || null,
+      activeTaskIds: Array.from(this.activeTasks.keys()),
+      activeTaskCount: this.activeTasks.size,
+      maxConcurrentTasks: this.maxConcurrentTasks,
+      isBackingOff: this.isBackingOff,
+      backoffReason: this.backoffReason,
    };
  }
}
diff --git a/cannaiq/src/components/Layout.tsx b/cannaiq/src/components/Layout.tsx
index aa2ac1a5..b3e3f63a 100755
--- a/cannaiq/src/components/Layout.tsx
+++ b/cannaiq/src/components/Layout.tsx
@@ -1,4 +1,4 @@
-import { ReactNode, useEffect, useState } from 'react';
+import { ReactNode, useEffect, useState, useRef } from 'react';
 import { useNavigate, useLocation, Link } from 'react-router-dom';
 import { useAuthStore } from '../store/authStore';
 import { api } from '../lib/api';
@@ -86,6 +86,8 @@ export function Layout({ children }: LayoutProps) {
   const { user, logout } = useAuthStore();
   const [versionInfo, setVersionInfo] = useState(null);
   const [sidebarOpen, setSidebarOpen] = useState(false);
+  const navRef = useRef<HTMLElement | null>(null);
+  const scrollPositionRef = useRef(0);
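+  // Refs (rather than state) so scroll tracking never triggers re-renders;
+  // the values persist across route changes while Layout stays mounted.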
 
   useEffect(() => {
     const fetchVersion = async () => {
@@ -111,9 +113,27 @@
     return location.pathname.startsWith(path);
   };
 
-  // Close sidebar on route change (mobile)
+  // Save scroll position before route change
+  useEffect(() => {
+    const nav = navRef.current;
+    if (nav) {
+      const handleScroll = () => {
+        scrollPositionRef.current = nav.scrollTop;
+      };
+      nav.addEventListener('scroll', handleScroll);
+      return () => nav.removeEventListener('scroll', handleScroll);
+    }
+  }, []);
+
+  // Restore scroll position after route change and close mobile sidebar
   useEffect(() => {
     setSidebarOpen(false);
+    // Restore scroll position after render
+    requestAnimationFrame(() => {
+      if (navRef.current) {
+        navRef.current.scrollTop = scrollPositionRef.current;
+      }
+    });
   }, [location.pathname]);
 
   const sidebarContent = (
@@ -145,7 +165,7 @@
       {/* Navigation */}
-