Compare commits
4 Commits
fix/worker
...
feat/task-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8a09691e91 | ||
|
|
01810c40a1 | ||
|
|
61e915968f | ||
|
|
a4338669a9 |
@@ -13,6 +13,12 @@ import {
|
|||||||
TaskFilter,
|
TaskFilter,
|
||||||
} from '../tasks/task-service';
|
} from '../tasks/task-service';
|
||||||
import { pool } from '../db/pool';
|
import { pool } from '../db/pool';
|
||||||
|
import {
|
||||||
|
isTaskPoolPaused,
|
||||||
|
pauseTaskPool,
|
||||||
|
resumeTaskPool,
|
||||||
|
getTaskPoolStatus,
|
||||||
|
} from '../tasks/task-pool-state';
|
||||||
|
|
||||||
const router = Router();
|
const router = Router();
|
||||||
|
|
||||||
@@ -592,4 +598,42 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GET /api/tasks/pool/status
|
||||||
|
* Check if task pool is paused
|
||||||
|
*/
|
||||||
|
router.get('/pool/status', async (_req: Request, res: Response) => {
|
||||||
|
const status = getTaskPoolStatus();
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
...status,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POST /api/tasks/pool/pause
|
||||||
|
* Pause the task pool - workers won't pick up new tasks
|
||||||
|
*/
|
||||||
|
router.post('/pool/pause', async (_req: Request, res: Response) => {
|
||||||
|
pauseTaskPool();
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
paused: true,
|
||||||
|
message: 'Task pool paused - workers will not pick up new tasks',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POST /api/tasks/pool/resume
|
||||||
|
* Resume the task pool - workers will pick up tasks again
|
||||||
|
*/
|
||||||
|
router.post('/pool/resume', async (_req: Request, res: Response) => {
|
||||||
|
resumeTaskPool();
|
||||||
|
res.json({
|
||||||
|
success: true,
|
||||||
|
paused: false,
|
||||||
|
message: 'Task pool resumed - workers will pick up new tasks',
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
export default router;
|
export default router;
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ export async function handleStoreDiscovery(ctx: TaskContext): Promise<TaskResult
|
|||||||
try {
|
try {
|
||||||
// Get states to discover
|
// Get states to discover
|
||||||
const statesResult = await pool.query(`
|
const statesResult = await pool.query(`
|
||||||
SELECT code FROM states WHERE active = true ORDER BY code
|
SELECT code FROM states WHERE is_active = true ORDER BY code
|
||||||
`);
|
`);
|
||||||
const stateCodes = statesResult.rows.map(r => r.code);
|
const stateCodes = statesResult.rows.map(r => r.code);
|
||||||
|
|
||||||
|
|||||||
35
backend/src/tasks/task-pool-state.ts
Normal file
35
backend/src/tasks/task-pool-state.ts
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
/**
|
||||||
|
* Task Pool State
|
||||||
|
*
|
||||||
|
* Shared state for task pool pause/resume functionality.
|
||||||
|
* This is kept separate to avoid circular dependencies between
|
||||||
|
* task-service.ts and routes/tasks.ts.
|
||||||
|
*
|
||||||
|
* State is in-memory and resets on server restart.
|
||||||
|
* By default, the pool is OPEN (not paused).
|
||||||
|
*/
|
||||||
|
|
||||||
|
let taskPoolPaused = false;
|
||||||
|
|
||||||
|
export function isTaskPoolPaused(): boolean {
|
||||||
|
return taskPoolPaused;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function pauseTaskPool(): void {
|
||||||
|
taskPoolPaused = true;
|
||||||
|
console.log('[TaskPool] Task pool PAUSED - workers will not pick up new tasks');
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resumeTaskPool(): void {
|
||||||
|
taskPoolPaused = false;
|
||||||
|
console.log('[TaskPool] Task pool RESUMED - workers can pick up tasks');
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getTaskPoolStatus(): { paused: boolean; message: string } {
|
||||||
|
return {
|
||||||
|
paused: taskPoolPaused,
|
||||||
|
message: taskPoolPaused
|
||||||
|
? 'Task pool is paused - workers will not pick up new tasks'
|
||||||
|
: 'Task pool is open - workers are picking up tasks',
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -9,6 +9,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { pool } from '../db/pool';
|
import { pool } from '../db/pool';
|
||||||
|
import { isTaskPoolPaused } from './task-pool-state';
|
||||||
|
|
||||||
// Helper to check if a table exists
|
// Helper to check if a table exists
|
||||||
async function tableExists(tableName: string): Promise<boolean> {
|
async function tableExists(tableName: string): Promise<boolean> {
|
||||||
@@ -149,8 +150,14 @@ class TaskService {
|
|||||||
/**
|
/**
|
||||||
* Claim a task atomically for a worker
|
* Claim a task atomically for a worker
|
||||||
* If role is null, claims ANY available task (role-agnostic worker)
|
* If role is null, claims ANY available task (role-agnostic worker)
|
||||||
|
* Returns null if task pool is paused.
|
||||||
*/
|
*/
|
||||||
async claimTask(role: TaskRole | null, workerId: string): Promise<WorkerTask | null> {
|
async claimTask(role: TaskRole | null, workerId: string): Promise<WorkerTask | null> {
|
||||||
|
// Check if task pool is paused - don't claim any tasks
|
||||||
|
if (isTaskPoolPaused()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
if (role) {
|
if (role) {
|
||||||
// Role-specific claiming - use the SQL function
|
// Role-specific claiming - use the SQL function
|
||||||
const result = await pool.query(
|
const result = await pool.query(
|
||||||
@@ -170,7 +177,6 @@ class TaskService {
|
|||||||
WHERE id = (
|
WHERE id = (
|
||||||
SELECT id FROM worker_tasks
|
SELECT id FROM worker_tasks
|
||||||
WHERE status = 'pending'
|
WHERE status = 'pending'
|
||||||
AND active = true
|
|
||||||
AND (scheduled_for IS NULL OR scheduled_for <= NOW())
|
AND (scheduled_for IS NULL OR scheduled_for <= NOW())
|
||||||
-- Exclude stores that already have an active task
|
-- Exclude stores that already have an active task
|
||||||
AND (dispensary_id IS NULL OR dispensary_id NOT IN (
|
AND (dispensary_id IS NULL OR dispensary_id NOT IN (
|
||||||
|
|||||||
@@ -2888,6 +2888,27 @@ class ApiClient {
|
|||||||
`/api/tasks/store/${dispensaryId}/active`
|
`/api/tasks/store/${dispensaryId}/active`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Task Pool Control
|
||||||
|
async getTaskPoolStatus() {
|
||||||
|
return this.request<{ success: boolean; paused: boolean; message: string }>(
|
||||||
|
'/api/tasks/pool/status'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async pauseTaskPool() {
|
||||||
|
return this.request<{ success: boolean; paused: boolean; message: string }>(
|
||||||
|
'/api/tasks/pool/pause',
|
||||||
|
{ method: 'POST' }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async resumeTaskPool() {
|
||||||
|
return this.request<{ success: boolean; paused: boolean; message: string }>(
|
||||||
|
'/api/tasks/pool/resume',
|
||||||
|
{ method: 'POST' }
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const api = new ApiClient(API_URL);
|
export const api = new ApiClient(API_URL);
|
||||||
|
|||||||
@@ -14,8 +14,9 @@ import {
|
|||||||
ChevronUp,
|
ChevronUp,
|
||||||
Gauge,
|
Gauge,
|
||||||
Users,
|
Users,
|
||||||
Calendar,
|
Power,
|
||||||
Zap,
|
Play,
|
||||||
|
Square,
|
||||||
} from 'lucide-react';
|
} from 'lucide-react';
|
||||||
|
|
||||||
interface Task {
|
interface Task {
|
||||||
@@ -82,6 +83,27 @@ const STATUS_COLORS: Record<string, string> = {
|
|||||||
stale: 'bg-gray-100 text-gray-800',
|
stale: 'bg-gray-100 text-gray-800',
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const getStatusIcon = (status: string, poolPaused: boolean): React.ReactNode => {
|
||||||
|
switch (status) {
|
||||||
|
case 'pending':
|
||||||
|
return <Clock className="w-4 h-4" />;
|
||||||
|
case 'claimed':
|
||||||
|
return <PlayCircle className="w-4 h-4" />;
|
||||||
|
case 'running':
|
||||||
|
// Don't spin when pool is paused
|
||||||
|
return <RefreshCw className={`w-4 h-4 ${!poolPaused ? 'animate-spin' : ''}`} />;
|
||||||
|
case 'completed':
|
||||||
|
return <CheckCircle2 className="w-4 h-4" />;
|
||||||
|
case 'failed':
|
||||||
|
return <XCircle className="w-4 h-4" />;
|
||||||
|
case 'stale':
|
||||||
|
return <AlertTriangle className="w-4 h-4" />;
|
||||||
|
default:
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Static version for summary cards (always shows animation)
|
||||||
const STATUS_ICONS: Record<string, React.ReactNode> = {
|
const STATUS_ICONS: Record<string, React.ReactNode> = {
|
||||||
pending: <Clock className="w-4 h-4" />,
|
pending: <Clock className="w-4 h-4" />,
|
||||||
claimed: <PlayCircle className="w-4 h-4" />,
|
claimed: <PlayCircle className="w-4 h-4" />,
|
||||||
@@ -116,6 +138,8 @@ export default function TasksDashboard() {
|
|||||||
const [capacity, setCapacity] = useState<CapacityMetric[]>([]);
|
const [capacity, setCapacity] = useState<CapacityMetric[]>([]);
|
||||||
const [loading, setLoading] = useState(true);
|
const [loading, setLoading] = useState(true);
|
||||||
const [error, setError] = useState<string | null>(null);
|
const [error, setError] = useState<string | null>(null);
|
||||||
|
const [poolPaused, setPoolPaused] = useState(false);
|
||||||
|
const [poolLoading, setPoolLoading] = useState(false);
|
||||||
|
|
||||||
// Filters
|
// Filters
|
||||||
const [roleFilter, setRoleFilter] = useState<string>('');
|
const [roleFilter, setRoleFilter] = useState<string>('');
|
||||||
@@ -123,13 +147,10 @@ export default function TasksDashboard() {
|
|||||||
const [searchQuery, setSearchQuery] = useState('');
|
const [searchQuery, setSearchQuery] = useState('');
|
||||||
const [showCapacity, setShowCapacity] = useState(true);
|
const [showCapacity, setShowCapacity] = useState(true);
|
||||||
|
|
||||||
// Actions
|
|
||||||
const [actionLoading, setActionLoading] = useState(false);
|
|
||||||
const [actionMessage, setActionMessage] = useState<string | null>(null);
|
|
||||||
|
|
||||||
const fetchData = async () => {
|
const fetchData = async () => {
|
||||||
try {
|
try {
|
||||||
const [tasksRes, countsRes, capacityRes] = await Promise.all([
|
const [tasksRes, countsRes, capacityRes, poolStatus] = await Promise.all([
|
||||||
api.getTasks({
|
api.getTasks({
|
||||||
role: roleFilter || undefined,
|
role: roleFilter || undefined,
|
||||||
status: statusFilter || undefined,
|
status: statusFilter || undefined,
|
||||||
@@ -137,11 +158,13 @@ export default function TasksDashboard() {
|
|||||||
}),
|
}),
|
||||||
api.getTaskCounts(),
|
api.getTaskCounts(),
|
||||||
api.getTaskCapacity(),
|
api.getTaskCapacity(),
|
||||||
|
api.getTaskPoolStatus(),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
setTasks(tasksRes.tasks || []);
|
setTasks(tasksRes.tasks || []);
|
||||||
setCounts(countsRes);
|
setCounts(countsRes);
|
||||||
setCapacity(capacityRes.metrics || []);
|
setCapacity(capacityRes.metrics || []);
|
||||||
|
setPoolPaused(poolStatus.paused);
|
||||||
setError(null);
|
setError(null);
|
||||||
} catch (err: any) {
|
} catch (err: any) {
|
||||||
setError(err.message || 'Failed to load tasks');
|
setError(err.message || 'Failed to load tasks');
|
||||||
@@ -150,40 +173,29 @@ export default function TasksDashboard() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const togglePool = async () => {
|
||||||
|
setPoolLoading(true);
|
||||||
|
try {
|
||||||
|
if (poolPaused) {
|
||||||
|
await api.resumeTaskPool();
|
||||||
|
setPoolPaused(false);
|
||||||
|
} else {
|
||||||
|
await api.pauseTaskPool();
|
||||||
|
setPoolPaused(true);
|
||||||
|
}
|
||||||
|
} catch (err: any) {
|
||||||
|
setError(err.message || 'Failed to toggle pool');
|
||||||
|
} finally {
|
||||||
|
setPoolLoading(false);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
fetchData();
|
fetchData();
|
||||||
const interval = setInterval(fetchData, 10000); // Refresh every 10 seconds
|
const interval = setInterval(fetchData, 15000); // Auto-refresh every 15 seconds
|
||||||
return () => clearInterval(interval);
|
return () => clearInterval(interval);
|
||||||
}, [roleFilter, statusFilter]);
|
}, [roleFilter, statusFilter]);
|
||||||
|
|
||||||
const handleGenerateResync = async () => {
|
|
||||||
setActionLoading(true);
|
|
||||||
try {
|
|
||||||
const result = await api.generateResyncTasks();
|
|
||||||
setActionMessage(`Generated ${result.tasks_created} resync tasks`);
|
|
||||||
fetchData();
|
|
||||||
} catch (err: any) {
|
|
||||||
setActionMessage(`Error: ${err.message}`);
|
|
||||||
} finally {
|
|
||||||
setActionLoading(false);
|
|
||||||
setTimeout(() => setActionMessage(null), 5000);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const handleRecoverStale = async () => {
|
|
||||||
setActionLoading(true);
|
|
||||||
try {
|
|
||||||
const result = await api.recoverStaleTasks();
|
|
||||||
setActionMessage(`Recovered ${result.tasks_recovered} stale tasks`);
|
|
||||||
fetchData();
|
|
||||||
} catch (err: any) {
|
|
||||||
setActionMessage(`Error: ${err.message}`);
|
|
||||||
} finally {
|
|
||||||
setActionLoading(false);
|
|
||||||
setTimeout(() => setActionMessage(null), 5000);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const filteredTasks = tasks.filter((task) => {
|
const filteredTasks = tasks.filter((task) => {
|
||||||
if (searchQuery) {
|
if (searchQuery) {
|
||||||
const query = searchQuery.toLowerCase();
|
const query = searchQuery.toLowerCase();
|
||||||
@@ -225,45 +237,32 @@ export default function TasksDashboard() {
|
|||||||
</p>
|
</p>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div className="flex gap-2">
|
<div className="flex items-center gap-4">
|
||||||
|
{/* Pool Toggle */}
|
||||||
<button
|
<button
|
||||||
onClick={handleGenerateResync}
|
onClick={togglePool}
|
||||||
disabled={actionLoading}
|
disabled={poolLoading}
|
||||||
className="flex items-center gap-2 px-4 py-2 bg-emerald-600 text-white rounded-lg hover:bg-emerald-700 disabled:opacity-50"
|
className={`flex items-center gap-2 px-4 py-2 rounded-lg font-medium transition-colors ${
|
||||||
>
|
poolPaused
|
||||||
<Calendar className="w-4 h-4" />
|
? 'bg-emerald-100 text-emerald-700 hover:bg-emerald-200'
|
||||||
Generate Resync
|
: 'bg-red-100 text-red-700 hover:bg-red-200'
|
||||||
</button>
|
|
||||||
<button
|
|
||||||
onClick={handleRecoverStale}
|
|
||||||
disabled={actionLoading}
|
|
||||||
className="flex items-center gap-2 px-4 py-2 bg-gray-600 text-white rounded-lg hover:bg-gray-700 disabled:opacity-50"
|
|
||||||
>
|
|
||||||
<Zap className="w-4 h-4" />
|
|
||||||
Recover Stale
|
|
||||||
</button>
|
|
||||||
<button
|
|
||||||
onClick={fetchData}
|
|
||||||
className="flex items-center gap-2 px-4 py-2 bg-gray-100 text-gray-700 rounded-lg hover:bg-gray-200"
|
|
||||||
>
|
|
||||||
<RefreshCw className="w-4 h-4" />
|
|
||||||
Refresh
|
|
||||||
</button>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
{/* Action Message */}
|
|
||||||
{actionMessage && (
|
|
||||||
<div
|
|
||||||
className={`p-4 rounded-lg ${
|
|
||||||
actionMessage.startsWith('Error')
|
|
||||||
? 'bg-red-50 text-red-700'
|
|
||||||
: 'bg-green-50 text-green-700'
|
|
||||||
}`}
|
}`}
|
||||||
>
|
>
|
||||||
{actionMessage}
|
{poolPaused ? (
|
||||||
</div>
|
<>
|
||||||
|
<Play className={`w-5 h-5 ${poolLoading ? 'animate-pulse' : ''}`} />
|
||||||
|
Start Pool
|
||||||
|
</>
|
||||||
|
) : (
|
||||||
|
<>
|
||||||
|
<Square className={`w-5 h-5 ${poolLoading ? 'animate-pulse' : ''}`} />
|
||||||
|
Stop Pool
|
||||||
|
</>
|
||||||
)}
|
)}
|
||||||
|
</button>
|
||||||
|
<span className="text-sm text-gray-400">Auto-refreshes every 15s</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
{error && (
|
{error && (
|
||||||
<div className="p-4 bg-red-50 text-red-700 rounded-lg">{error}</div>
|
<div className="p-4 bg-red-50 text-red-700 rounded-lg">{error}</div>
|
||||||
@@ -496,7 +495,7 @@ export default function TasksDashboard() {
|
|||||||
STATUS_COLORS[task.status]
|
STATUS_COLORS[task.status]
|
||||||
}`}
|
}`}
|
||||||
>
|
>
|
||||||
{STATUS_ICONS[task.status]}
|
{getStatusIcon(task.status, poolPaused)}
|
||||||
{task.status}
|
{task.status}
|
||||||
</span>
|
</span>
|
||||||
</td>
|
</td>
|
||||||
|
|||||||
Reference in New Issue
Block a user