diff --git a/backend/src/index.ts b/backend/src/index.ts
index 931374ab..178c3a65 100755
--- a/backend/src/index.ts
+++ b/backend/src/index.ts
@@ -146,6 +146,7 @@ import tasksRoutes from './routes/tasks';
 import workerRegistryRoutes from './routes/worker-registry';
 // Per TASK_WORKFLOW_2024-12-10.md: Raw payload access API
 import payloadsRoutes from './routes/payloads';
+import k8sRoutes from './routes/k8s';
 
 // Mark requests from trusted domains (cannaiq.co, findagram.co, findadispo.com)
 // These domains can access the API without authentication
@@ -230,6 +231,10 @@ console.log('[WorkerRegistry] Routes registered at /api/worker-registry');
 app.use('/api/payloads', payloadsRoutes);
 console.log('[Payloads] Routes registered at /api/payloads');
 
+// K8s control routes - worker scaling from admin UI
+app.use('/api/k8s', k8sRoutes);
+console.log('[K8s] Routes registered at /api/k8s');
+
 // Phase 3: Analytics V2 - Enhanced analytics with rec/med state segmentation
 try {
   const analyticsV2Router = createAnalyticsV2Router(getPool());
diff --git a/backend/src/routes/k8s.ts b/backend/src/routes/k8s.ts
new file mode 100644
index 00000000..db6348dd
--- /dev/null
+++ b/backend/src/routes/k8s.ts
@@ -0,0 +1,144 @@
+/**
+ * Kubernetes Control Routes
+ *
+ * Provides admin UI control over k8s resources like worker scaling.
+ * Uses in-cluster config when running in k8s, or kubeconfig locally.
+ */
+
+import { Router, Request, Response } from 'express';
+import * as k8s from '@kubernetes/client-node';
+
+const router = Router();
+
+// K8s client setup - lazy initialization
+let appsApi: k8s.AppsV1Api | null = null;
+let k8sError: string | null = null;
+
+function getK8sClient(): k8s.AppsV1Api | null {
+  if (appsApi) return appsApi;
+  if (k8sError) return null;
+
+  try {
+    const kc = new k8s.KubeConfig();
+
+    // Try in-cluster config first (when running in k8s)
+    try {
+      kc.loadFromCluster();
+      console.log('[K8s] Loaded in-cluster config');
+    } catch {
+      // Fall back to default kubeconfig (local dev)
+      try {
+        kc.loadFromDefault();
+        console.log('[K8s] Loaded default kubeconfig');
+      } catch (e) {
+        k8sError = 'No k8s config available';
+        console.log('[K8s] No config available - k8s routes disabled');
+        return null;
+      }
+    }
+
+    appsApi = kc.makeApiClient(k8s.AppsV1Api);
+    return appsApi;
+  } catch (e: any) {
+    k8sError = e.message;
+    console.error('[K8s] Failed to initialize client:', e.message);
+    return null;
+  }
+}
+
+const NAMESPACE = process.env.K8S_NAMESPACE || 'dispensary-scraper';
+const WORKER_DEPLOYMENT = 'scraper-worker';
+
+/**
+ * GET /api/k8s/workers
+ * Get current worker deployment status
+ */
+router.get('/workers', async (_req: Request, res: Response) => {
+  const client = getK8sClient();
+
+  if (!client) {
+    return res.json({
+      success: true,
+      available: false,
+      error: k8sError || 'K8s not available',
+      replicas: 0,
+      readyReplicas: 0,
+    });
+  }
+
+  try {
+    const deployment = await client.readNamespacedDeployment({
+      name: WORKER_DEPLOYMENT,
+      namespace: NAMESPACE,
+    });
+
+    res.json({
+      success: true,
+      available: true,
+      replicas: deployment.spec?.replicas || 0,
+      readyReplicas: deployment.status?.readyReplicas || 0,
+      availableReplicas: deployment.status?.availableReplicas || 0,
+      updatedReplicas: deployment.status?.updatedReplicas || 0,
+    });
+  } catch (e: any) {
+    console.error('[K8s] Error getting deployment:', e.message);
+    res.status(500).json({
+      success: false,
+      error: e.message,
+    });
+  }
+});
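+
+// Illustrative response from GET /api/k8s/workers when a cluster is
+// reachable (the shape mirrors the handler above; the numbers are made up):
+//   { "success": true, "available": true, "replicas": 3,
+//     "readyReplicas": 2, "availableReplicas": 2, "updatedReplicas": 3 }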
+
+/**
+ * POST /api/k8s/workers/scale
+ * Scale the worker deployment
+ * Body: { replicas: number }
+ */
+router.post('/workers/scale', async (req: Request, res: Response) => {
+  const client = getK8sClient();
+
+  if (!client) {
+    return res.status(503).json({
+      success: false,
+      error: k8sError || 'K8s not available',
+    });
+  }
+
+  const { replicas } = req.body;
+
+  if (typeof replicas !== 'number' || replicas < 0 || replicas > 50) {
+    return res.status(400).json({
+      success: false,
+      error: 'replicas must be a number between 0 and 50',
+    });
+  }
+
+  try {
+    // Set replicas through the deployment's scale subresource. Read-modify-
+    // write via replaceNamespacedDeploymentScale keeps the call in the same
+    // request-object style as readNamespacedDeployment above and needs no
+    // patch Content-Type negotiation.
+    const scale = await client.readNamespacedDeploymentScale({
+      name: WORKER_DEPLOYMENT,
+      namespace: NAMESPACE,
+    });
+    scale.spec = { ...scale.spec, replicas };
+    await client.replaceNamespacedDeploymentScale({
+      name: WORKER_DEPLOYMENT,
+      namespace: NAMESPACE,
+      body: scale,
+    });
+
+    console.log(`[K8s] Scaled ${WORKER_DEPLOYMENT} to ${replicas} replicas`);
+
+    res.json({
+      success: true,
+      replicas,
+      message: `Scaled to ${replicas} workers`,
+    });
+  } catch (e: any) {
+    console.error('[K8s] Error scaling deployment:', e.message);
+    res.status(500).json({
+      success: false,
+      error: e.message,
+    });
+  }
+});
+
+export default router;
diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts
index ee9f1107..bacf9f56 100755
--- a/cannaiq/src/lib/api.ts
+++ b/cannaiq/src/lib/api.ts
@@ -2950,6 +2950,25 @@ class ApiClient {
       { method: 'POST' }
     );
   }
+
+  // K8s Worker Control
+  async getK8sWorkers() {
+    return this.request<{
+      success: boolean;
+      available: boolean;
+      replicas: number;
+      readyReplicas: number;
+      availableReplicas?: number;
+      error?: string;
+    }>('/api/k8s/workers');
+  }
+
+  async scaleK8sWorkers(replicas: number) {
+    return this.request<{ success: boolean; replicas: number; message?: string; error?: string }>(
+      '/api/k8s/workers/scale',
+      { method: 'POST', body: JSON.stringify({ replicas }) }
+    );
+  }
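+
+  // Usage sketch from a component (both methods are defined above; error
+  // handling elided):
+  //   const workers = await api.getK8sWorkers();
+  //   if (workers.available) await api.scaleK8sWorkers(workers.replicas + 1);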
 }
 
 export const api = new ApiClient(API_URL);
diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx
index 7fb7fce6..8ed8f134 100644
--- a/cannaiq/src/pages/TasksDashboard.tsx
+++ b/cannaiq/src/pages/TasksDashboard.tsx
@@ -14,9 +14,11 @@ import {
   ChevronUp,
   Gauge,
   Users,
-  Power,
   Play,
   Square,
+  Plus,
+  Minus,
+  Server,
 } from 'lucide-react';
 
 interface Task {
@@ -141,6 +143,12 @@ export default function TasksDashboard() {
   const [poolPaused, setPoolPaused] = useState(false);
   const [poolLoading, setPoolLoading] = useState(false);
 
+  // K8s worker state
+  const [k8sAvailable, setK8sAvailable] = useState(false);
+  const [workerReplicas, setWorkerReplicas] = useState(0);
+  const [workerReady, setWorkerReady] = useState(0);
+  const [scalingWorkers, setScalingWorkers] = useState(false);
+
   // Filters
   const [roleFilter, setRoleFilter] = useState('');
   const [statusFilter, setStatusFilter] = useState('');
@@ -150,7 +158,7 @@
 
   const fetchData = async () => {
     try {
-      const [tasksRes, countsRes, capacityRes, poolStatus] = await Promise.all([
+      const [tasksRes, countsRes, capacityRes, poolStatus, k8sRes] = await Promise.all([
         api.getTasks({
           role: roleFilter || undefined,
           status: statusFilter || undefined,
@@ -159,12 +167,16 @@
         api.getTaskCounts(),
         api.getTaskCapacity(),
         api.getTaskPoolStatus(),
+        api.getK8sWorkers(),
       ]);
 
      setTasks(tasksRes.tasks || []);
      setCounts(countsRes);
      setCapacity(capacityRes.metrics || []);
      setPoolPaused(poolStatus.paused);
+      setK8sAvailable(k8sRes.available);
+      setWorkerReplicas(k8sRes.replicas);
+      setWorkerReady(k8sRes.readyReplicas);
       setError(null);
     } catch (err: any) {
       setError(err.message || 'Failed to load tasks');
@@ -173,6 +185,25 @@
     }
   };
 
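+  // Nudge the desired worker count up or down from the toolbar. The local
+  // 0-50 clamp mirrors the validation the backend applies in routes/k8s.ts.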
+  const scaleWorkers = async (delta: number) => {
+    const newReplicas = Math.max(0, Math.min(50, workerReplicas + delta));
+    if (newReplicas === workerReplicas) return;
+
+    setScalingWorkers(true);
+    try {
+      const res = await api.scaleK8sWorkers(newReplicas);
+      if (res.success) {
+        setWorkerReplicas(res.replicas);
+      } else {
+        setError(res.error || 'Failed to scale workers');
+      }
+    } catch (err: any) {
+      setError(err.message || 'Failed to scale workers');
+    } finally {
+      setScalingWorkers(false);
+    }
+  };
+
   const togglePool = async () => {
     setPoolLoading(true);
     try {
@@ -238,6 +269,32 @@ export default function TasksDashboard() {
+          {/* Worker Scaling */}
+          {k8sAvailable && (
+            <div className="flex items-center gap-2">
+              <Server className="w-4 h-4" />
+              <button
+                onClick={() => scaleWorkers(-1)}
+                disabled={scalingWorkers || workerReplicas <= 0}
+                title="Scale workers down"
+              >
+                <Minus className="w-4 h-4" />
+              </button>
+              <span>
+                {workerReady}/{workerReplicas}
+              </span>
+              <button
+                onClick={() => scaleWorkers(1)}
+                disabled={scalingWorkers || workerReplicas >= 50}
+                title="Scale workers up"
+              >
+                <Plus className="w-4 h-4" />
+              </button>
+            </div>
+          )}
+
           {/* Pool Toggle */}