feat(workers): Concurrent task processing with resource-based backoff
Workers can now process multiple tasks concurrently (default: 3 max). Self-regulate based on resource usage - back off at 85% memory or 90% CPU. Backend changes: - TaskWorker handles concurrent tasks using async Maps - Resource monitoring (memory %, CPU %) with backoff logic - Heartbeat reports active_task_count, max_concurrent_tasks, resource stats - Decommission support via worker_commands table Frontend changes: - Workers Dashboard shows tasks per worker (N/M format) - Resource badges with color-coded thresholds - Pod visualization with clickable selection - Decommission controls per worker New env vars: - MAX_CONCURRENT_TASKS (default: 3) - MEMORY_BACKOFF_THRESHOLD (default: 0.85) - CPU_BACKOFF_THRESHOLD (default: 0.90) - BACKOFF_DURATION_MS (default: 10000) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -362,6 +362,148 @@ SET status = 'pending', retry_count = retry_count + 1
|
||||
WHERE status = 'failed' AND retry_count < max_retries;
|
||||
```
|
||||
|
||||
## Concurrent Task Processing (Added 2024-12)
|
||||
|
||||
Workers can now process multiple tasks concurrently within a single worker instance. This improves throughput by utilizing async I/O efficiently.
|
||||
|
||||
### Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ Pod (K8s) │
|
||||
│ │
|
||||
│ ┌─────────────────────────────────────────────────────┐ │
|
||||
│ │ TaskWorker │ │
|
||||
│ │ │ │
|
||||
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
|
||||
│ │ │ Task 1 │ │ Task 2 │ │ Task 3 │ (concurrent)│ │
|
||||
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
|
||||
│ │ │ │
|
||||
│ │ Resource Monitor │ │
|
||||
│ │ ├── Memory: 65% (threshold: 85%) │ │
|
||||
│ │ ├── CPU: 45% (threshold: 90%) │ │
|
||||
│ │ └── Status: Normal │ │
|
||||
│ └─────────────────────────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Environment Variables
|
||||
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `MAX_CONCURRENT_TASKS` | 3 | Maximum tasks a worker will run concurrently |
|
||||
| `MEMORY_BACKOFF_THRESHOLD` | 0.85 | Back off when heap memory exceeds 85% |
|
||||
| `CPU_BACKOFF_THRESHOLD` | 0.90 | Back off when CPU exceeds 90% |
|
||||
| `BACKOFF_DURATION_MS` | 10000 | How long to wait when backing off (10s) |
|
||||
|
||||
### How It Works
|
||||
|
||||
1. **Main Loop**: Worker continuously tries to fill up to `MAX_CONCURRENT_TASKS`
|
||||
2. **Resource Monitoring**: Before claiming a new task, worker checks memory and CPU
|
||||
3. **Backoff**: If resources exceed thresholds, worker pauses and stops claiming new tasks
|
||||
4. **Concurrent Execution**: Tasks run in parallel using `Promise` - they don't block each other
|
||||
5. **Graceful Shutdown**: On SIGTERM/decommission, worker stops claiming but waits for active tasks
|
||||
|
||||
### Resource Monitoring
|
||||
|
||||
```typescript
|
||||
// ResourceStats interface
|
||||
interface ResourceStats {
|
||||
memoryPercent: number; // Current heap usage as decimal (0.0-1.0)
|
||||
memoryMb: number; // Current heap used in MB
|
||||
memoryTotalMb: number; // Total heap available in MB
|
||||
cpuPercent: number; // CPU usage as percentage (0-100)
|
||||
isBackingOff: boolean; // True if worker is in backoff state
|
||||
backoffReason: string; // Why the worker is backing off
|
||||
}
|
||||
```
|
||||
|
||||
### Heartbeat Data
|
||||
|
||||
Workers report the following in their heartbeat:
|
||||
|
||||
```json
|
||||
{
|
||||
"worker_id": "worker-abc123",
|
||||
"current_task_id": 456,
|
||||
"current_task_ids": [456, 457, 458],
|
||||
"active_task_count": 3,
|
||||
"max_concurrent_tasks": 3,
|
||||
"status": "active",
|
||||
"resources": {
|
||||
"memory_mb": 256,
|
||||
"memory_total_mb": 512,
|
||||
"memory_rss_mb": 320,
|
||||
"memory_percent": 50,
|
||||
"cpu_user_ms": 12500,
|
||||
"cpu_system_ms": 3200,
|
||||
"cpu_percent": 45,
|
||||
"is_backing_off": false,
|
||||
"backoff_reason": null
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Backoff Behavior
|
||||
|
||||
When resources exceed thresholds:
|
||||
|
||||
1. Worker logs the backoff reason:
|
||||
```
|
||||
[TaskWorker] MyWorker backing off: Memory at 87.3% (threshold: 85%)
|
||||
```
|
||||
|
||||
2. Worker stops claiming new tasks but continues existing tasks
|
||||
|
||||
3. After `BACKOFF_DURATION_MS`, worker rechecks resources
|
||||
|
||||
4. When resources return to normal:
|
||||
```
|
||||
[TaskWorker] MyWorker resuming normal operation
|
||||
```
|
||||
|
||||
### UI Display
|
||||
|
||||
The Workers Dashboard shows:
|
||||
|
||||
- **Tasks Column**: `2/3 tasks` (active/max concurrent)
|
||||
- **Resources Column**: Memory % and CPU % with color coding
|
||||
- Green: < 50%
|
||||
- Yellow: 50-74%
|
||||
- Amber: 75-89%
|
||||
- Red: 90%+
|
||||
- **Backing Off**: Orange warning badge when worker is in backoff state
|
||||
|
||||
### Task Count Badge Details
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────┐
|
||||
│ Worker: "MyWorker" │
|
||||
│ Tasks: 2/3 tasks #456, #457 │
|
||||
│ Resources: 🧠 65% 💻 45% │
|
||||
│ Status: ● Active │
|
||||
└─────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Best Practices
|
||||
|
||||
1. **Start Conservative**: Use `MAX_CONCURRENT_TASKS=3` initially
|
||||
2. **Monitor Resources**: Watch for frequent backoffs in logs
|
||||
3. **Tune Per Workload**: I/O-bound tasks benefit from higher concurrency
|
||||
4. **Scale Horizontally**: Add more pods rather than cranking concurrency too high
|
||||
|
||||
### Code References
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `src/tasks/task-worker.ts:68-71` | Concurrency environment variables |
|
||||
| `src/tasks/task-worker.ts:104-111` | ResourceStats interface |
|
||||
| `src/tasks/task-worker.ts:149-179` | getResourceStats() method |
|
||||
| `src/tasks/task-worker.ts:184-196` | shouldBackOff() method |
|
||||
| `src/tasks/task-worker.ts:462-516` | mainLoop() with concurrent claiming |
|
||||
| `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling |
|
||||
| `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources |
|
||||
|
||||
## Monitoring
|
||||
|
||||
### Logs
|
||||
|
||||
27
backend/migrations/074_worker_commands.sql
Normal file
27
backend/migrations/074_worker_commands.sql
Normal file
@@ -0,0 +1,27 @@
|
||||
-- Migration: Worker Commands Table
|
||||
-- Purpose: Store commands for workers (decommission, etc.)
|
||||
-- Workers poll this table after each task to check for commands
|
||||
|
||||
CREATE TABLE IF NOT EXISTS worker_commands (
|
||||
id SERIAL PRIMARY KEY,
|
||||
worker_id TEXT NOT NULL,
|
||||
command TEXT NOT NULL, -- 'decommission', 'pause', 'resume'
|
||||
reason TEXT,
|
||||
issued_by TEXT,
|
||||
issued_at TIMESTAMPTZ DEFAULT NOW(),
|
||||
acknowledged_at TIMESTAMPTZ,
|
||||
executed_at TIMESTAMPTZ,
|
||||
status TEXT DEFAULT 'pending' -- 'pending', 'acknowledged', 'executed', 'cancelled'
|
||||
);
|
||||
|
||||
-- Index for worker lookups
|
||||
CREATE INDEX IF NOT EXISTS idx_worker_commands_worker_id ON worker_commands(worker_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_worker_commands_pending ON worker_commands(worker_id, status) WHERE status = 'pending';
|
||||
|
||||
-- Add decommission_requested column to worker_registry for quick checks
|
||||
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_requested BOOLEAN DEFAULT FALSE;
|
||||
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_reason TEXT;
|
||||
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_requested_at TIMESTAMPTZ;
|
||||
|
||||
-- Comment
|
||||
COMMENT ON TABLE worker_commands IS 'Commands issued to workers (decommission after task, pause, etc.)';
|
||||
@@ -138,17 +138,36 @@ router.post('/register', async (req: Request, res: Response) => {
|
||||
*
|
||||
* Body:
|
||||
* - worker_id: string (required)
|
||||
* - current_task_id: number (optional) - task currently being processed
|
||||
* - current_task_id: number (optional) - task currently being processed (primary task)
|
||||
* - current_task_ids: number[] (optional) - all tasks currently being processed (concurrent)
|
||||
* - active_task_count: number (optional) - number of tasks currently running
|
||||
* - max_concurrent_tasks: number (optional) - max concurrent tasks this worker can handle
|
||||
* - status: string (optional) - 'active', 'idle'
|
||||
* - resources: object (optional) - memory_mb, cpu_user_ms, cpu_system_ms, etc.
|
||||
*/
|
||||
router.post('/heartbeat', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { worker_id, current_task_id, status = 'active', resources } = req.body;
|
||||
const {
|
||||
worker_id,
|
||||
current_task_id,
|
||||
current_task_ids,
|
||||
active_task_count,
|
||||
max_concurrent_tasks,
|
||||
status = 'active',
|
||||
resources
|
||||
} = req.body;
|
||||
|
||||
if (!worker_id) {
|
||||
return res.status(400).json({ success: false, error: 'worker_id is required' });
|
||||
}
|
||||
|
||||
// Build metadata object with all the new fields
|
||||
const metadata: Record<string, unknown> = {};
|
||||
if (resources) Object.assign(metadata, resources);
|
||||
if (current_task_ids) metadata.current_task_ids = current_task_ids;
|
||||
if (active_task_count !== undefined) metadata.active_task_count = active_task_count;
|
||||
if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks;
|
||||
|
||||
// Store resources in metadata jsonb column
|
||||
const { rows } = await pool.query(`
|
||||
UPDATE worker_registry
|
||||
@@ -159,7 +178,7 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
|
||||
updated_at = NOW()
|
||||
WHERE worker_id = $3
|
||||
RETURNING id, friendly_name, status
|
||||
`, [current_task_id || null, status, worker_id, resources ? JSON.stringify(resources) : null]);
|
||||
`, [current_task_id || null, status, worker_id, Object.keys(metadata).length > 0 ? JSON.stringify(metadata) : null]);
|
||||
|
||||
if (rows.length === 0) {
|
||||
return res.status(404).json({ success: false, error: 'Worker not found - please register first' });
|
||||
@@ -330,12 +349,21 @@ router.get('/workers', async (req: Request, res: Response) => {
|
||||
tasks_completed,
|
||||
tasks_failed,
|
||||
current_task_id,
|
||||
-- Concurrent task fields from metadata
|
||||
(metadata->>'current_task_ids')::jsonb as current_task_ids,
|
||||
(metadata->>'active_task_count')::int as active_task_count,
|
||||
(metadata->>'max_concurrent_tasks')::int as max_concurrent_tasks,
|
||||
-- Decommission fields
|
||||
COALESCE(decommission_requested, false) as decommission_requested,
|
||||
decommission_reason,
|
||||
-- Full metadata for resources
|
||||
metadata,
|
||||
EXTRACT(EPOCH FROM (NOW() - last_heartbeat_at)) as seconds_since_heartbeat,
|
||||
CASE
|
||||
WHEN status = 'offline' OR status = 'terminated' THEN status
|
||||
WHEN last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
|
||||
WHEN current_task_id IS NOT NULL THEN 'busy'
|
||||
WHEN (metadata->>'active_task_count')::int > 0 THEN 'busy'
|
||||
ELSE 'ready'
|
||||
END as health_status,
|
||||
created_at
|
||||
@@ -672,4 +700,163 @@ router.get('/capacity', async (_req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// WORKER LIFECYCLE MANAGEMENT
|
||||
// ============================================================
|
||||
|
||||
/**
|
||||
* POST /api/worker-registry/workers/:workerId/decommission
|
||||
* Request graceful decommission of a worker (will stop after current task)
|
||||
*/
|
||||
router.post('/workers/:workerId/decommission', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { workerId } = req.params;
|
||||
const { reason, issued_by } = req.body;
|
||||
|
||||
// Update worker_registry to flag for decommission
|
||||
const result = await pool.query(
|
||||
`UPDATE worker_registry
|
||||
SET decommission_requested = true,
|
||||
decommission_reason = $2,
|
||||
decommission_requested_at = NOW()
|
||||
WHERE worker_id = $1
|
||||
RETURNING friendly_name, status, current_task_id`,
|
||||
[workerId, reason || 'Manual decommission from admin']
|
||||
);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
return res.status(404).json({ success: false, error: 'Worker not found' });
|
||||
}
|
||||
|
||||
const worker = result.rows[0];
|
||||
|
||||
// Also log to worker_commands for audit trail
|
||||
await pool.query(
|
||||
`INSERT INTO worker_commands (worker_id, command, reason, issued_by)
|
||||
VALUES ($1, 'decommission', $2, $3)
|
||||
ON CONFLICT DO NOTHING`,
|
||||
[workerId, reason || 'Manual decommission', issued_by || 'admin']
|
||||
).catch(() => {
|
||||
// Table might not exist yet - ignore
|
||||
});
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
message: worker.current_task_id
|
||||
? `Worker ${worker.friendly_name} will stop after completing task #${worker.current_task_id}`
|
||||
: `Worker ${worker.friendly_name} will stop on next poll`,
|
||||
worker: {
|
||||
friendly_name: worker.friendly_name,
|
||||
status: worker.status,
|
||||
current_task_id: worker.current_task_id,
|
||||
decommission_requested: true
|
||||
}
|
||||
});
|
||||
} catch (error: any) {
|
||||
res.status(500).json({ success: false, error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/worker-registry/workers/:workerId/cancel-decommission
|
||||
* Cancel a pending decommission request
|
||||
*/
|
||||
router.post('/workers/:workerId/cancel-decommission', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { workerId } = req.params;
|
||||
|
||||
const result = await pool.query(
|
||||
`UPDATE worker_registry
|
||||
SET decommission_requested = false,
|
||||
decommission_reason = NULL,
|
||||
decommission_requested_at = NULL
|
||||
WHERE worker_id = $1
|
||||
RETURNING friendly_name`,
|
||||
[workerId]
|
||||
);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
return res.status(404).json({ success: false, error: 'Worker not found' });
|
||||
}
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
message: `Decommission cancelled for ${result.rows[0].friendly_name}`
|
||||
});
|
||||
} catch (error: any) {
|
||||
res.status(500).json({ success: false, error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/worker-registry/spawn
|
||||
* Spawn a new worker in the current pod (only works in multi-worker-per-pod mode)
|
||||
* For now, this is a placeholder - actual spawning requires the pod supervisor
|
||||
*/
|
||||
router.post('/spawn', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const { pod_name, role } = req.body;
|
||||
|
||||
// For now, we can't actually spawn workers from the API
|
||||
// This would require a supervisor process in each pod that listens for spawn commands
|
||||
// Instead, return instructions for how to scale
|
||||
res.json({
|
||||
success: false,
|
||||
error: 'Direct worker spawning not yet implemented',
|
||||
instructions: 'To add workers, scale the K8s deployment: kubectl scale deployment/scraper-worker --replicas=N'
|
||||
});
|
||||
} catch (error: any) {
|
||||
res.status(500).json({ success: false, error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/worker-registry/pods
|
||||
* Get workers grouped by pod
|
||||
*/
|
||||
router.get('/pods', async (_req: Request, res: Response) => {
|
||||
try {
|
||||
const { rows } = await pool.query(`
|
||||
SELECT
|
||||
COALESCE(pod_name, 'Unknown') as pod_name,
|
||||
COUNT(*) as worker_count,
|
||||
COUNT(*) FILTER (WHERE current_task_id IS NOT NULL) as busy_count,
|
||||
COUNT(*) FILTER (WHERE current_task_id IS NULL) as idle_count,
|
||||
SUM(tasks_completed) as total_completed,
|
||||
SUM(tasks_failed) as total_failed,
|
||||
SUM((metadata->>'memory_rss_mb')::int) as total_memory_mb,
|
||||
array_agg(json_build_object(
|
||||
'worker_id', worker_id,
|
||||
'friendly_name', friendly_name,
|
||||
'status', status,
|
||||
'current_task_id', current_task_id,
|
||||
'tasks_completed', tasks_completed,
|
||||
'tasks_failed', tasks_failed,
|
||||
'decommission_requested', COALESCE(decommission_requested, false),
|
||||
'last_heartbeat_at', last_heartbeat_at
|
||||
)) as workers
|
||||
FROM worker_registry
|
||||
WHERE status NOT IN ('offline', 'terminated')
|
||||
GROUP BY pod_name
|
||||
ORDER BY pod_name
|
||||
`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
pods: rows.map(row => ({
|
||||
pod_name: row.pod_name,
|
||||
worker_count: parseInt(row.worker_count),
|
||||
busy_count: parseInt(row.busy_count),
|
||||
idle_count: parseInt(row.idle_count),
|
||||
total_completed: parseInt(row.total_completed) || 0,
|
||||
total_failed: parseInt(row.total_failed) || 0,
|
||||
total_memory_mb: parseInt(row.total_memory_mb) || 0,
|
||||
workers: row.workers
|
||||
}))
|
||||
});
|
||||
} catch (error: any) {
|
||||
res.status(500).json({ success: false, error: error.message });
|
||||
}
|
||||
});
|
||||
|
||||
export default router;
|
||||
|
||||
@@ -35,7 +35,7 @@ const router = Router();
|
||||
// ============================================================
|
||||
|
||||
const K8S_NAMESPACE = process.env.K8S_NAMESPACE || 'dispensary-scraper';
|
||||
const K8S_STATEFULSET_NAME = process.env.K8S_WORKER_STATEFULSET || 'scraper-worker';
|
||||
const K8S_DEPLOYMENT_NAME = process.env.K8S_WORKER_DEPLOYMENT || 'scraper-worker';
|
||||
|
||||
// Initialize K8s client - uses in-cluster config when running in K8s,
|
||||
// or kubeconfig when running locally
|
||||
@@ -70,7 +70,7 @@ function getK8sClient(): k8s.AppsV1Api | null {
|
||||
|
||||
/**
|
||||
* GET /api/workers/k8s/replicas - Get current worker replica count
|
||||
* Returns current and desired replica counts from the StatefulSet
|
||||
* Returns current and desired replica counts from the Deployment
|
||||
*/
|
||||
router.get('/k8s/replicas', async (_req: Request, res: Response) => {
|
||||
const client = getK8sClient();
|
||||
@@ -84,21 +84,21 @@ router.get('/k8s/replicas', async (_req: Request, res: Response) => {
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await client.readNamespacedStatefulSet({
|
||||
name: K8S_STATEFULSET_NAME,
|
||||
const response = await client.readNamespacedDeployment({
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
});
|
||||
|
||||
const statefulSet = response;
|
||||
const deployment = response;
|
||||
res.json({
|
||||
success: true,
|
||||
replicas: {
|
||||
current: statefulSet.status?.readyReplicas || 0,
|
||||
desired: statefulSet.spec?.replicas || 0,
|
||||
available: statefulSet.status?.availableReplicas || 0,
|
||||
updated: statefulSet.status?.updatedReplicas || 0,
|
||||
current: deployment.status?.readyReplicas || 0,
|
||||
desired: deployment.spec?.replicas || 0,
|
||||
available: deployment.status?.availableReplicas || 0,
|
||||
updated: deployment.status?.updatedReplicas || 0,
|
||||
},
|
||||
statefulset: K8S_STATEFULSET_NAME,
|
||||
deployment: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
});
|
||||
} catch (err: any) {
|
||||
@@ -112,7 +112,7 @@ router.get('/k8s/replicas', async (_req: Request, res: Response) => {
|
||||
|
||||
/**
|
||||
* POST /api/workers/k8s/scale - Scale worker replicas
|
||||
* Body: { replicas: number } - desired replica count (1-20)
|
||||
* Body: { replicas: number } - desired replica count (0-20)
|
||||
*/
|
||||
router.post('/k8s/scale', async (req: Request, res: Response) => {
|
||||
const client = getK8sClient();
|
||||
@@ -136,21 +136,21 @@ router.post('/k8s/scale', async (req: Request, res: Response) => {
|
||||
|
||||
try {
|
||||
// Get current state first
|
||||
const currentResponse = await client.readNamespacedStatefulSetScale({
|
||||
name: K8S_STATEFULSET_NAME,
|
||||
const currentResponse = await client.readNamespacedDeploymentScale({
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
});
|
||||
const currentReplicas = currentResponse.spec?.replicas || 0;
|
||||
|
||||
// Update scale using replaceNamespacedStatefulSetScale
|
||||
await client.replaceNamespacedStatefulSetScale({
|
||||
name: K8S_STATEFULSET_NAME,
|
||||
// Update scale using replaceNamespacedDeploymentScale
|
||||
await client.replaceNamespacedDeploymentScale({
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
body: {
|
||||
apiVersion: 'autoscaling/v1',
|
||||
kind: 'Scale',
|
||||
metadata: {
|
||||
name: K8S_STATEFULSET_NAME,
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
},
|
||||
spec: {
|
||||
@@ -159,14 +159,14 @@ router.post('/k8s/scale', async (req: Request, res: Response) => {
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`[Workers] Scaled ${K8S_STATEFULSET_NAME} from ${currentReplicas} to ${replicas} replicas`);
|
||||
console.log(`[Workers] Scaled ${K8S_DEPLOYMENT_NAME} from ${currentReplicas} to ${replicas} replicas`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
message: `Scaled from ${currentReplicas} to ${replicas} replicas`,
|
||||
previous: currentReplicas,
|
||||
desired: replicas,
|
||||
statefulset: K8S_STATEFULSET_NAME,
|
||||
deployment: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
});
|
||||
} catch (err: any) {
|
||||
@@ -178,6 +178,73 @@ router.post('/k8s/scale', async (req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/workers/k8s/scale-up - Scale up worker replicas by 1
|
||||
* Convenience endpoint for adding a single worker
|
||||
*/
|
||||
router.post('/k8s/scale-up', async (_req: Request, res: Response) => {
|
||||
const client = getK8sClient();
|
||||
|
||||
if (!client) {
|
||||
return res.status(503).json({
|
||||
success: false,
|
||||
error: 'K8s client not available (not running in cluster or no kubeconfig)',
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
// Get current replica count
|
||||
const currentResponse = await client.readNamespacedDeploymentScale({
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
});
|
||||
const currentReplicas = currentResponse.spec?.replicas || 0;
|
||||
const newReplicas = currentReplicas + 1;
|
||||
|
||||
// Cap at 20 replicas
|
||||
if (newReplicas > 20) {
|
||||
return res.status(400).json({
|
||||
success: false,
|
||||
error: 'Maximum replica count (20) reached',
|
||||
});
|
||||
}
|
||||
|
||||
// Scale up by 1
|
||||
await client.replaceNamespacedDeploymentScale({
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
body: {
|
||||
apiVersion: 'autoscaling/v1',
|
||||
kind: 'Scale',
|
||||
metadata: {
|
||||
name: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
},
|
||||
spec: {
|
||||
replicas: newReplicas,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`[Workers] Scaled up ${K8S_DEPLOYMENT_NAME} from ${currentReplicas} to ${newReplicas} replicas`);
|
||||
|
||||
res.json({
|
||||
success: true,
|
||||
message: `Added worker (${currentReplicas} → ${newReplicas} replicas)`,
|
||||
previous: currentReplicas,
|
||||
desired: newReplicas,
|
||||
deployment: K8S_DEPLOYMENT_NAME,
|
||||
namespace: K8S_NAMESPACE,
|
||||
});
|
||||
} catch (err: any) {
|
||||
console.error('[Workers] K8s scale-up error:', err.body?.message || err.message);
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
error: err.body?.message || err.message,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// STATIC ROUTES (must come before parameterized routes)
|
||||
// ============================================================
|
||||
|
||||
@@ -64,6 +64,33 @@ const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000');
|
||||
const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000');
|
||||
const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
|
||||
|
||||
// =============================================================================
|
||||
// CONCURRENT TASK PROCESSING SETTINGS
|
||||
// =============================================================================
|
||||
// Workers can process multiple tasks simultaneously using async I/O.
|
||||
// This improves throughput for I/O-bound tasks (network calls, DB queries).
|
||||
//
|
||||
// Resource thresholds trigger "backoff" - the worker stops claiming new tasks
|
||||
// but continues processing existing ones until resources return to normal.
|
||||
//
|
||||
// See: docs/WORKER_TASK_ARCHITECTURE.md#concurrent-task-processing
|
||||
// =============================================================================
|
||||
|
||||
// Maximum number of tasks this worker will run concurrently
|
||||
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
|
||||
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
|
||||
|
||||
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
|
||||
// Default 85% - gives headroom before OOM
|
||||
const MEMORY_BACKOFF_THRESHOLD = parseFloat(process.env.MEMORY_BACKOFF_THRESHOLD || '0.85');
|
||||
|
||||
// When CPU usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
|
||||
// Default 90% - allows some burst capacity
|
||||
const CPU_BACKOFF_THRESHOLD = parseFloat(process.env.CPU_BACKOFF_THRESHOLD || '0.90');
|
||||
|
||||
// How long to wait (ms) when in backoff state before rechecking resources
|
||||
const BACKOFF_DURATION_MS = parseInt(process.env.BACKOFF_DURATION_MS || '10000');
|
||||
|
||||
export interface TaskContext {
|
||||
pool: Pool;
|
||||
workerId: string;
|
||||
@@ -94,6 +121,25 @@ const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
|
||||
analytics_refresh: handleAnalyticsRefresh,
|
||||
};
|
||||
|
||||
/**
|
||||
* Resource usage stats reported to the registry and used for backoff decisions.
|
||||
* These values are included in worker heartbeats and displayed in the UI.
|
||||
*/
|
||||
interface ResourceStats {
|
||||
/** Current heap memory usage as decimal (0.0 to 1.0) */
|
||||
memoryPercent: number;
|
||||
/** Current heap used in MB */
|
||||
memoryMb: number;
|
||||
/** Total heap available in MB */
|
||||
memoryTotalMb: number;
|
||||
/** CPU usage percentage since last check (0 to 100) */
|
||||
cpuPercent: number;
|
||||
/** True if worker is currently in backoff state */
|
||||
isBackingOff: boolean;
|
||||
/** Reason for backoff (e.g., "Memory at 87.3% (threshold: 85%)") */
|
||||
backoffReason: string | null;
|
||||
}
|
||||
|
||||
export class TaskWorker {
|
||||
private pool: Pool;
|
||||
private workerId: string;
|
||||
@@ -102,14 +148,106 @@ export class TaskWorker {
|
||||
private isRunning: boolean = false;
|
||||
private heartbeatInterval: NodeJS.Timeout | null = null;
|
||||
private registryHeartbeatInterval: NodeJS.Timeout | null = null;
|
||||
private currentTask: WorkerTask | null = null;
|
||||
private crawlRotator: CrawlRotator;
|
||||
|
||||
// ==========================================================================
|
||||
// CONCURRENT TASK TRACKING
|
||||
// ==========================================================================
|
||||
// activeTasks: Map of task ID -> task object for all currently running tasks
|
||||
// taskPromises: Map of task ID -> Promise for cleanup when task completes
|
||||
// maxConcurrentTasks: How many tasks this worker will run in parallel
|
||||
// ==========================================================================
|
||||
private activeTasks: Map<number, WorkerTask> = new Map();
|
||||
private taskPromises: Map<number, Promise<void>> = new Map();
|
||||
private maxConcurrentTasks: number = MAX_CONCURRENT_TASKS;
|
||||
|
||||
// ==========================================================================
|
||||
// RESOURCE MONITORING FOR BACKOFF
|
||||
// ==========================================================================
|
||||
// CPU tracking uses differential measurement - we track last values and
|
||||
// calculate percentage based on elapsed time since last check.
|
||||
// ==========================================================================
|
||||
private lastCpuUsage: { user: number; system: number } = { user: 0, system: 0 };
|
||||
private lastCpuCheck: number = Date.now();
|
||||
private isBackingOff: boolean = false;
|
||||
private backoffReason: string | null = null;
|
||||
|
||||
constructor(role: TaskRole | null = null, workerId?: string) {
|
||||
this.pool = getPool();
|
||||
this.role = role;
|
||||
this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`;
|
||||
this.crawlRotator = new CrawlRotator(this.pool);
|
||||
|
||||
// Initialize CPU tracking
|
||||
const cpuUsage = process.cpuUsage();
|
||||
this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
|
||||
this.lastCpuCheck = Date.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current resource usage
|
||||
*/
|
||||
private getResourceStats(): ResourceStats {
|
||||
const memUsage = process.memoryUsage();
|
||||
const heapUsedMb = memUsage.heapUsed / 1024 / 1024;
|
||||
const heapTotalMb = memUsage.heapTotal / 1024 / 1024;
|
||||
const memoryPercent = heapUsedMb / heapTotalMb;
|
||||
|
||||
// Calculate CPU usage since last check
|
||||
const cpuUsage = process.cpuUsage();
|
||||
const now = Date.now();
|
||||
const elapsed = now - this.lastCpuCheck;
|
||||
|
||||
let cpuPercent = 0;
|
||||
if (elapsed > 0) {
|
||||
const userDiff = (cpuUsage.user - this.lastCpuUsage.user) / 1000; // microseconds to ms
|
||||
const systemDiff = (cpuUsage.system - this.lastCpuUsage.system) / 1000;
|
||||
cpuPercent = ((userDiff + systemDiff) / elapsed) * 100;
|
||||
}
|
||||
|
||||
// Update last values
|
||||
this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
|
||||
this.lastCpuCheck = now;
|
||||
|
||||
return {
|
||||
memoryPercent,
|
||||
memoryMb: Math.round(heapUsedMb),
|
||||
memoryTotalMb: Math.round(heapTotalMb),
|
||||
cpuPercent: Math.min(100, cpuPercent), // Cap at 100%
|
||||
isBackingOff: this.isBackingOff,
|
||||
backoffReason: this.backoffReason,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we should back off from taking new tasks
|
||||
*/
|
||||
private shouldBackOff(): { backoff: boolean; reason: string | null } {
|
||||
const stats = this.getResourceStats();
|
||||
|
||||
if (stats.memoryPercent > MEMORY_BACKOFF_THRESHOLD) {
|
||||
return { backoff: true, reason: `Memory at ${(stats.memoryPercent * 100).toFixed(1)}% (threshold: ${MEMORY_BACKOFF_THRESHOLD * 100}%)` };
|
||||
}
|
||||
|
||||
if (stats.cpuPercent > CPU_BACKOFF_THRESHOLD * 100) {
|
||||
return { backoff: true, reason: `CPU at ${stats.cpuPercent.toFixed(1)}% (threshold: ${CPU_BACKOFF_THRESHOLD * 100}%)` };
|
||||
}
|
||||
|
||||
return { backoff: false, reason: null };
|
||||
}
|
||||
|
||||
/**
|
||||
* Get count of currently running tasks
|
||||
*/
|
||||
get activeTaskCount(): number {
|
||||
return this.activeTasks.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we can accept more tasks
|
||||
*/
|
||||
private canAcceptMoreTasks(): boolean {
|
||||
return this.activeTasks.size < this.maxConcurrentTasks;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -252,21 +390,32 @@ export class TaskWorker {
|
||||
const memUsage = process.memoryUsage();
|
||||
const cpuUsage = process.cpuUsage();
|
||||
const proxyLocation = this.crawlRotator.getProxyLocation();
|
||||
const resourceStats = this.getResourceStats();
|
||||
|
||||
// Get array of active task IDs
|
||||
const activeTaskIds = Array.from(this.activeTasks.keys());
|
||||
|
||||
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
worker_id: this.workerId,
|
||||
current_task_id: this.currentTask?.id || null,
|
||||
status: this.currentTask ? 'active' : 'idle',
|
||||
current_task_id: activeTaskIds[0] || null, // Primary task for backwards compat
|
||||
current_task_ids: activeTaskIds, // All active tasks
|
||||
active_task_count: this.activeTasks.size,
|
||||
max_concurrent_tasks: this.maxConcurrentTasks,
|
||||
status: this.activeTasks.size > 0 ? 'active' : 'idle',
|
||||
resources: {
|
||||
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
|
||||
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
|
||||
memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024),
|
||||
memory_percent: Math.round(resourceStats.memoryPercent * 100),
|
||||
cpu_user_ms: Math.round(cpuUsage.user / 1000),
|
||||
cpu_system_ms: Math.round(cpuUsage.system / 1000),
|
||||
cpu_percent: Math.round(resourceStats.cpuPercent),
|
||||
proxy_location: proxyLocation,
|
||||
is_backing_off: this.isBackingOff,
|
||||
backoff_reason: this.backoffReason,
|
||||
}
|
||||
})
|
||||
});
|
||||
@@ -328,20 +477,85 @@ export class TaskWorker {
|
||||
this.startRegistryHeartbeat();
|
||||
|
||||
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
|
||||
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg}`);
|
||||
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (max ${this.maxConcurrentTasks} concurrent tasks)`);
|
||||
|
||||
while (this.isRunning) {
|
||||
try {
|
||||
await this.processNextTask();
|
||||
await this.mainLoop();
|
||||
} catch (error: any) {
|
||||
console.error(`[TaskWorker] Loop error:`, error.message);
|
||||
await this.sleep(POLL_INTERVAL_MS);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for any remaining tasks to complete
|
||||
if (this.taskPromises.size > 0) {
|
||||
console.log(`[TaskWorker] Waiting for ${this.taskPromises.size} active tasks to complete...`);
|
||||
await Promise.allSettled(this.taskPromises.values());
|
||||
}
|
||||
|
||||
console.log(`[TaskWorker] Worker ${this.workerId} stopped`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Main loop - tries to fill up to maxConcurrentTasks
|
||||
*/
|
||||
private async mainLoop(): Promise<void> {
|
||||
// Check resource usage and backoff if needed
|
||||
const { backoff, reason } = this.shouldBackOff();
|
||||
if (backoff) {
|
||||
if (!this.isBackingOff) {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} backing off: ${reason}`);
|
||||
}
|
||||
this.isBackingOff = true;
|
||||
this.backoffReason = reason;
|
||||
await this.sleep(BACKOFF_DURATION_MS);
|
||||
return;
|
||||
}
|
||||
|
||||
// Clear backoff state
|
||||
if (this.isBackingOff) {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} resuming normal operation`);
|
||||
this.isBackingOff = false;
|
||||
this.backoffReason = null;
|
||||
}
|
||||
|
||||
// Check for decommission signal
|
||||
const shouldDecommission = await this.checkDecommission();
|
||||
if (shouldDecommission) {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} received decommission signal - waiting for ${this.activeTasks.size} tasks to complete`);
|
||||
// Stop accepting new tasks, wait for current to finish
|
||||
this.isRunning = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Try to claim more tasks if we have capacity
|
||||
if (this.canAcceptMoreTasks()) {
|
||||
const task = await taskService.claimTask(this.role, this.workerId);
|
||||
|
||||
if (task) {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
|
||||
this.activeTasks.set(task.id, task);
|
||||
|
||||
// Start task in background (don't await)
|
||||
const taskPromise = this.executeTask(task);
|
||||
this.taskPromises.set(task.id, taskPromise);
|
||||
|
||||
// Clean up when done
|
||||
taskPromise.finally(() => {
|
||||
this.activeTasks.delete(task.id);
|
||||
this.taskPromises.delete(task.id);
|
||||
});
|
||||
|
||||
// Immediately try to claim more tasks (don't wait for poll interval)
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// No task claimed or at capacity - wait before next poll
|
||||
await this.sleep(POLL_INTERVAL_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the worker
|
||||
*/
|
||||
@@ -354,23 +568,10 @@ export class TaskWorker {
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the next available task
|
||||
* Execute a single task (runs concurrently with other tasks)
|
||||
*/
|
||||
private async processNextTask(): Promise<void> {
|
||||
// Try to claim a task
|
||||
const task = await taskService.claimTask(this.role, this.workerId);
|
||||
|
||||
if (!task) {
|
||||
// No tasks available, wait and retry
|
||||
await this.sleep(POLL_INTERVAL_MS);
|
||||
return;
|
||||
}
|
||||
|
||||
this.currentTask = task;
|
||||
console.log(`[TaskWorker] Claimed task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
|
||||
|
||||
// Start heartbeat
|
||||
this.startHeartbeat(task.id);
|
||||
private async executeTask(task: WorkerTask): Promise<void> {
|
||||
console.log(`[TaskWorker] ${this.friendlyName} starting task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
|
||||
|
||||
try {
|
||||
// Mark as running
|
||||
@@ -399,7 +600,7 @@ export class TaskWorker {
|
||||
// Mark as completed
|
||||
await taskService.completeTask(task.id, result);
|
||||
await this.reportTaskCompletion(true);
|
||||
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id}`);
|
||||
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);
|
||||
|
||||
// Chain next task if applicable
|
||||
const chainedTask = await taskService.chainNextTask({
|
||||
@@ -421,9 +622,35 @@ export class TaskWorker {
|
||||
await taskService.failTask(task.id, error.message);
|
||||
await this.reportTaskCompletion(false);
|
||||
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, error.message);
|
||||
} finally {
|
||||
this.stopHeartbeat();
|
||||
this.currentTask = null;
|
||||
}
|
||||
// Note: cleanup (removing from activeTasks) is handled in mainLoop's finally block
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this worker has been flagged for decommission
|
||||
* Returns true if worker should stop after current task
|
||||
*/
|
||||
private async checkDecommission(): Promise<boolean> {
|
||||
try {
|
||||
// Check worker_registry for decommission flag
|
||||
const result = await this.pool.query(
|
||||
`SELECT decommission_requested, decommission_reason
|
||||
FROM worker_registry
|
||||
WHERE worker_id = $1`,
|
||||
[this.workerId]
|
||||
);
|
||||
|
||||
if (result.rows.length > 0 && result.rows[0].decommission_requested) {
|
||||
const reason = result.rows[0].decommission_reason || 'No reason provided';
|
||||
console.log(`[TaskWorker] Decommission requested: ${reason}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
} catch (error: any) {
|
||||
// If we can't check, continue running
|
||||
console.warn(`[TaskWorker] Could not check decommission status: ${error.message}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -460,12 +687,25 @@ export class TaskWorker {
|
||||
/**
|
||||
* Get worker info
|
||||
*/
|
||||
getInfo(): { workerId: string; role: TaskRole | null; isRunning: boolean; currentTaskId: number | null } {
|
||||
getInfo(): {
|
||||
workerId: string;
|
||||
role: TaskRole | null;
|
||||
isRunning: boolean;
|
||||
activeTaskIds: number[];
|
||||
activeTaskCount: number;
|
||||
maxConcurrentTasks: number;
|
||||
isBackingOff: boolean;
|
||||
backoffReason: string | null;
|
||||
} {
|
||||
return {
|
||||
workerId: this.workerId,
|
||||
role: this.role,
|
||||
isRunning: this.isRunning,
|
||||
currentTaskId: this.currentTask?.id || null,
|
||||
activeTaskIds: Array.from(this.activeTasks.keys()),
|
||||
activeTaskCount: this.activeTasks.size,
|
||||
maxConcurrentTasks: this.maxConcurrentTasks,
|
||||
isBackingOff: this.isBackingOff,
|
||||
backoffReason: this.backoffReason,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user