Compare commits
4 Commits
fix/worker
...
feat/task-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8a09691e91 | ||
|
|
01810c40a1 | ||
|
|
61e915968f | ||
|
|
a4338669a9 |
@@ -13,6 +13,12 @@ import {
|
||||
TaskFilter,
|
||||
} from '../tasks/task-service';
|
||||
import { pool } from '../db/pool';
|
||||
import {
|
||||
isTaskPoolPaused,
|
||||
pauseTaskPool,
|
||||
resumeTaskPool,
|
||||
getTaskPoolStatus,
|
||||
} from '../tasks/task-pool-state';
|
||||
|
||||
const router = Router();
|
||||
|
||||
@@ -592,4 +598,42 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/tasks/pool/status
|
||||
* Check if task pool is paused
|
||||
*/
|
||||
router.get('/pool/status', async (_req: Request, res: Response) => {
|
||||
const status = getTaskPoolStatus();
|
||||
res.json({
|
||||
success: true,
|
||||
...status,
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/pool/pause
|
||||
* Pause the task pool - workers won't pick up new tasks
|
||||
*/
|
||||
router.post('/pool/pause', async (_req: Request, res: Response) => {
|
||||
pauseTaskPool();
|
||||
res.json({
|
||||
success: true,
|
||||
paused: true,
|
||||
message: 'Task pool paused - workers will not pick up new tasks',
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/pool/resume
|
||||
* Resume the task pool - workers will pick up tasks again
|
||||
*/
|
||||
router.post('/pool/resume', async (_req: Request, res: Response) => {
|
||||
resumeTaskPool();
|
||||
res.json({
|
||||
success: true,
|
||||
paused: false,
|
||||
message: 'Task pool resumed - workers will pick up new tasks',
|
||||
});
|
||||
});
|
||||
|
||||
export default router;
|
||||
|
||||
@@ -25,7 +25,7 @@ export async function handleStoreDiscovery(ctx: TaskContext): Promise<TaskResult
|
||||
try {
|
||||
// Get states to discover
|
||||
const statesResult = await pool.query(`
|
||||
SELECT code FROM states WHERE active = true ORDER BY code
|
||||
SELECT code FROM states WHERE is_active = true ORDER BY code
|
||||
`);
|
||||
const stateCodes = statesResult.rows.map(r => r.code);
|
||||
|
||||
|
||||
35
backend/src/tasks/task-pool-state.ts
Normal file
35
backend/src/tasks/task-pool-state.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Task Pool State
|
||||
*
|
||||
* Shared state for task pool pause/resume functionality.
|
||||
* This is kept separate to avoid circular dependencies between
|
||||
* task-service.ts and routes/tasks.ts.
|
||||
*
|
||||
* State is in-memory and resets on server restart.
|
||||
* By default, the pool is OPEN (not paused).
|
||||
*/
|
||||
|
||||
let taskPoolPaused = false;
|
||||
|
||||
export function isTaskPoolPaused(): boolean {
|
||||
return taskPoolPaused;
|
||||
}
|
||||
|
||||
export function pauseTaskPool(): void {
|
||||
taskPoolPaused = true;
|
||||
console.log('[TaskPool] Task pool PAUSED - workers will not pick up new tasks');
|
||||
}
|
||||
|
||||
export function resumeTaskPool(): void {
|
||||
taskPoolPaused = false;
|
||||
console.log('[TaskPool] Task pool RESUMED - workers can pick up tasks');
|
||||
}
|
||||
|
||||
export function getTaskPoolStatus(): { paused: boolean; message: string } {
|
||||
return {
|
||||
paused: taskPoolPaused,
|
||||
message: taskPoolPaused
|
||||
? 'Task pool is paused - workers will not pick up new tasks'
|
||||
: 'Task pool is open - workers are picking up tasks',
|
||||
};
|
||||
}
|
||||
@@ -9,6 +9,7 @@
|
||||
*/
|
||||
|
||||
import { pool } from '../db/pool';
|
||||
import { isTaskPoolPaused } from './task-pool-state';
|
||||
|
||||
// Helper to check if a table exists
|
||||
async function tableExists(tableName: string): Promise<boolean> {
|
||||
@@ -149,8 +150,14 @@ class TaskService {
|
||||
/**
|
||||
* Claim a task atomically for a worker
|
||||
* If role is null, claims ANY available task (role-agnostic worker)
|
||||
* Returns null if task pool is paused.
|
||||
*/
|
||||
async claimTask(role: TaskRole | null, workerId: string): Promise<WorkerTask | null> {
|
||||
// Check if task pool is paused - don't claim any tasks
|
||||
if (isTaskPoolPaused()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (role) {
|
||||
// Role-specific claiming - use the SQL function
|
||||
const result = await pool.query(
|
||||
@@ -170,7 +177,6 @@ class TaskService {
|
||||
WHERE id = (
|
||||
SELECT id FROM worker_tasks
|
||||
WHERE status = 'pending'
|
||||
AND active = true
|
||||
AND (scheduled_for IS NULL OR scheduled_for <= NOW())
|
||||
-- Exclude stores that already have an active task
|
||||
AND (dispensary_id IS NULL OR dispensary_id NOT IN (
|
||||
|
||||
@@ -2888,6 +2888,27 @@ class ApiClient {
|
||||
`/api/tasks/store/${dispensaryId}/active`
|
||||
);
|
||||
}
|
||||
|
||||
// Task Pool Control
|
||||
async getTaskPoolStatus() {
|
||||
return this.request<{ success: boolean; paused: boolean; message: string }>(
|
||||
'/api/tasks/pool/status'
|
||||
);
|
||||
}
|
||||
|
||||
async pauseTaskPool() {
|
||||
return this.request<{ success: boolean; paused: boolean; message: string }>(
|
||||
'/api/tasks/pool/pause',
|
||||
{ method: 'POST' }
|
||||
);
|
||||
}
|
||||
|
||||
async resumeTaskPool() {
|
||||
return this.request<{ success: boolean; paused: boolean; message: string }>(
|
||||
'/api/tasks/pool/resume',
|
||||
{ method: 'POST' }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export const api = new ApiClient(API_URL);
|
||||
|
||||
@@ -14,8 +14,9 @@ import {
|
||||
ChevronUp,
|
||||
Gauge,
|
||||
Users,
|
||||
Calendar,
|
||||
Zap,
|
||||
Power,
|
||||
Play,
|
||||
Square,
|
||||
} from 'lucide-react';
|
||||
|
||||
interface Task {
|
||||
@@ -82,6 +83,27 @@ const STATUS_COLORS: Record<string, string> = {
|
||||
stale: 'bg-gray-100 text-gray-800',
|
||||
};
|
||||
|
||||
const getStatusIcon = (status: string, poolPaused: boolean): React.ReactNode => {
|
||||
switch (status) {
|
||||
case 'pending':
|
||||
return <Clock className="w-4 h-4" />;
|
||||
case 'claimed':
|
||||
return <PlayCircle className="w-4 h-4" />;
|
||||
case 'running':
|
||||
// Don't spin when pool is paused
|
||||
return <RefreshCw className={`w-4 h-4 ${!poolPaused ? 'animate-spin' : ''}`} />;
|
||||
case 'completed':
|
||||
return <CheckCircle2 className="w-4 h-4" />;
|
||||
case 'failed':
|
||||
return <XCircle className="w-4 h-4" />;
|
||||
case 'stale':
|
||||
return <AlertTriangle className="w-4 h-4" />;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
// Static version for summary cards (always shows animation)
|
||||
const STATUS_ICONS: Record<string, React.ReactNode> = {
|
||||
pending: <Clock className="w-4 h-4" />,
|
||||
claimed: <PlayCircle className="w-4 h-4" />,
|
||||
@@ -116,6 +138,8 @@ export default function TasksDashboard() {
|
||||
const [capacity, setCapacity] = useState<CapacityMetric[]>([]);
|
||||
const [loading, setLoading] = useState(true);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [poolPaused, setPoolPaused] = useState(false);
|
||||
const [poolLoading, setPoolLoading] = useState(false);
|
||||
|
||||
// Filters
|
||||
const [roleFilter, setRoleFilter] = useState<string>('');
|
||||
@@ -123,13 +147,10 @@ export default function TasksDashboard() {
|
||||
const [searchQuery, setSearchQuery] = useState('');
|
||||
const [showCapacity, setShowCapacity] = useState(true);
|
||||
|
||||
// Actions
|
||||
const [actionLoading, setActionLoading] = useState(false);
|
||||
const [actionMessage, setActionMessage] = useState<string | null>(null);
|
||||
|
||||
const fetchData = async () => {
|
||||
try {
|
||||
const [tasksRes, countsRes, capacityRes] = await Promise.all([
|
||||
const [tasksRes, countsRes, capacityRes, poolStatus] = await Promise.all([
|
||||
api.getTasks({
|
||||
role: roleFilter || undefined,
|
||||
status: statusFilter || undefined,
|
||||
@@ -137,11 +158,13 @@ export default function TasksDashboard() {
|
||||
}),
|
||||
api.getTaskCounts(),
|
||||
api.getTaskCapacity(),
|
||||
api.getTaskPoolStatus(),
|
||||
]);
|
||||
|
||||
setTasks(tasksRes.tasks || []);
|
||||
setCounts(countsRes);
|
||||
setCapacity(capacityRes.metrics || []);
|
||||
setPoolPaused(poolStatus.paused);
|
||||
setError(null);
|
||||
} catch (err: any) {
|
||||
setError(err.message || 'Failed to load tasks');
|
||||
@@ -150,40 +173,29 @@ export default function TasksDashboard() {
|
||||
}
|
||||
};
|
||||
|
||||
const togglePool = async () => {
|
||||
setPoolLoading(true);
|
||||
try {
|
||||
if (poolPaused) {
|
||||
await api.resumeTaskPool();
|
||||
setPoolPaused(false);
|
||||
} else {
|
||||
await api.pauseTaskPool();
|
||||
setPoolPaused(true);
|
||||
}
|
||||
} catch (err: any) {
|
||||
setError(err.message || 'Failed to toggle pool');
|
||||
} finally {
|
||||
setPoolLoading(false);
|
||||
}
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
fetchData();
|
||||
const interval = setInterval(fetchData, 10000); // Refresh every 10 seconds
|
||||
const interval = setInterval(fetchData, 15000); // Auto-refresh every 15 seconds
|
||||
return () => clearInterval(interval);
|
||||
}, [roleFilter, statusFilter]);
|
||||
|
||||
const handleGenerateResync = async () => {
|
||||
setActionLoading(true);
|
||||
try {
|
||||
const result = await api.generateResyncTasks();
|
||||
setActionMessage(`Generated ${result.tasks_created} resync tasks`);
|
||||
fetchData();
|
||||
} catch (err: any) {
|
||||
setActionMessage(`Error: ${err.message}`);
|
||||
} finally {
|
||||
setActionLoading(false);
|
||||
setTimeout(() => setActionMessage(null), 5000);
|
||||
}
|
||||
};
|
||||
|
||||
const handleRecoverStale = async () => {
|
||||
setActionLoading(true);
|
||||
try {
|
||||
const result = await api.recoverStaleTasks();
|
||||
setActionMessage(`Recovered ${result.tasks_recovered} stale tasks`);
|
||||
fetchData();
|
||||
} catch (err: any) {
|
||||
setActionMessage(`Error: ${err.message}`);
|
||||
} finally {
|
||||
setActionLoading(false);
|
||||
setTimeout(() => setActionMessage(null), 5000);
|
||||
}
|
||||
};
|
||||
|
||||
const filteredTasks = tasks.filter((task) => {
|
||||
if (searchQuery) {
|
||||
const query = searchQuery.toLowerCase();
|
||||
@@ -225,45 +237,32 @@ export default function TasksDashboard() {
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<div className="flex gap-2">
|
||||
<div className="flex items-center gap-4">
|
||||
{/* Pool Toggle */}
|
||||
<button
|
||||
onClick={handleGenerateResync}
|
||||
disabled={actionLoading}
|
||||
className="flex items-center gap-2 px-4 py-2 bg-emerald-600 text-white rounded-lg hover:bg-emerald-700 disabled:opacity-50"
|
||||
>
|
||||
<Calendar className="w-4 h-4" />
|
||||
Generate Resync
|
||||
</button>
|
||||
<button
|
||||
onClick={handleRecoverStale}
|
||||
disabled={actionLoading}
|
||||
className="flex items-center gap-2 px-4 py-2 bg-gray-600 text-white rounded-lg hover:bg-gray-700 disabled:opacity-50"
|
||||
>
|
||||
<Zap className="w-4 h-4" />
|
||||
Recover Stale
|
||||
</button>
|
||||
<button
|
||||
onClick={fetchData}
|
||||
className="flex items-center gap-2 px-4 py-2 bg-gray-100 text-gray-700 rounded-lg hover:bg-gray-200"
|
||||
>
|
||||
<RefreshCw className="w-4 h-4" />
|
||||
Refresh
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* Action Message */}
|
||||
{actionMessage && (
|
||||
<div
|
||||
className={`p-4 rounded-lg ${
|
||||
actionMessage.startsWith('Error')
|
||||
? 'bg-red-50 text-red-700'
|
||||
: 'bg-green-50 text-green-700'
|
||||
onClick={togglePool}
|
||||
disabled={poolLoading}
|
||||
className={`flex items-center gap-2 px-4 py-2 rounded-lg font-medium transition-colors ${
|
||||
poolPaused
|
||||
? 'bg-emerald-100 text-emerald-700 hover:bg-emerald-200'
|
||||
: 'bg-red-100 text-red-700 hover:bg-red-200'
|
||||
}`}
|
||||
>
|
||||
{actionMessage}
|
||||
</div>
|
||||
{poolPaused ? (
|
||||
<>
|
||||
<Play className={`w-5 h-5 ${poolLoading ? 'animate-pulse' : ''}`} />
|
||||
Start Pool
|
||||
</>
|
||||
) : (
|
||||
<>
|
||||
<Square className={`w-5 h-5 ${poolLoading ? 'animate-pulse' : ''}`} />
|
||||
Stop Pool
|
||||
</>
|
||||
)}
|
||||
</button>
|
||||
<span className="text-sm text-gray-400">Auto-refreshes every 15s</span>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{error && (
|
||||
<div className="p-4 bg-red-50 text-red-700 rounded-lg">{error}</div>
|
||||
@@ -496,7 +495,7 @@ export default function TasksDashboard() {
|
||||
STATUS_COLORS[task.status]
|
||||
}`}
|
||||
>
|
||||
{STATUS_ICONS[task.status]}
|
||||
{getStatusIcon(task.status, poolPaused)}
|
||||
{task.status}
|
||||
</span>
|
||||
</td>
|
||||
|
||||
Reference in New Issue
Block a user