From 6490df9faf0128eec0fd37b98832eadc1387b576 Mon Sep 17 00:00:00 2001 From: Kelly Date: Fri, 12 Dec 2025 01:15:21 -0700 Subject: [PATCH] feat(tasks): Consolidate schedule management into task_schedules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add schedule CRUD endpoints to /api/tasks/schedules - Add Schedules section to TasksDashboard with edit/delete/bulk actions - Deprecate job_schedules table (entries disabled in DB) - Mark CrawlSchedulePage as deprecated (removed from menu) - Add deprecation comments to legacy schedule methods in api.ts - Add migration comments to workers.ts explaining consolidation Key changes: - Schedule management now at /admin/tasks instead of /admin/schedule - task_schedules uses interval_hours (simpler than base_interval_minutes + jitter) - All schedule routes placed before /:id to avoid Express route conflicts 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/src/routes/tasks.ts | 378 +++++++++++++++++ backend/src/routes/workers.ts | 17 +- backend/src/tasks/task-worker.ts | 117 +++++- cannaiq/src/lib/api.ts | 107 ++++- cannaiq/src/pages/CrawlSchedulePage.tsx | 15 + cannaiq/src/pages/TasksDashboard.tsx | 512 +++++++++++++++++++++++- 6 files changed, 1124 insertions(+), 22 deletions(-) diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index 97946f72..bcf1bbe7 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -3,6 +3,24 @@ * * Endpoints for managing worker tasks, viewing capacity metrics, * and generating batch tasks. + * + * SCHEDULE MANAGEMENT (added 2025-12-12): + * This file now contains the canonical schedule management endpoints. + * The job_schedules table has been deprecated and all schedule management + * is now consolidated into task_schedules: + * + * Schedule endpoints: + * GET /api/tasks/schedules - List all schedules + * POST /api/tasks/schedules - Create new schedule + * GET /api/tasks/schedules/:id - Get schedule by ID + * PUT /api/tasks/schedules/:id - Update schedule + * DELETE /api/tasks/schedules/:id - Delete schedule + * DELETE /api/tasks/schedules - Bulk delete schedules + * POST /api/tasks/schedules/:id/run-now - Trigger schedule immediately + * POST /api/tasks/schedules/:id/toggle - Toggle schedule enabled/disabled + * + * Note: Schedule routes are defined BEFORE /:id to avoid route conflicts + * (Express matches routes in order, and "schedules" would match /:id otherwise) */ import { Router, Request, Response } from 'express'; @@ -131,6 +149,366 @@ router.get('/capacity/:role', async (req: Request, res: Response) => { } }); +// ============================================================ +// SCHEDULE MANAGEMENT ROUTES +// (Must be before /:id to avoid route conflicts) +// ============================================================ + +/** + * GET /api/tasks/schedules + * List all task schedules + */ +router.get('/schedules', async (req: Request, res: Response) => { + try { + const enabledOnly = req.query.enabled === 'true'; + + let query = ` + SELECT id, name, role, description, enabled, interval_hours, + priority, state_code, platform, last_run_at, next_run_at, + last_task_count, last_error, created_at, updated_at + FROM task_schedules + `; + + if (enabledOnly) { + query += ` WHERE enabled = true`; + } + + query += ` ORDER BY name`; + + const result = await pool.query(query); + res.json({ schedules: result.rows }); + } catch (error: unknown) { + console.error('Error listing schedules:', error); + res.status(500).json({ error: 'Failed to list schedules' }); + } +}); + +/** + * DELETE /api/tasks/schedules + * Bulk delete schedules + * + * Body: + * - ids: number[] (required) - array of schedule IDs to delete + * - all: boolean (optional) - if true, delete all schedules (ids ignored) + */ +router.delete('/schedules', async (req: Request, res: Response) => { + try { + const { ids, all } = req.body; + + let result; + + if (all === true) { + // Delete all schedules + result = await pool.query(` + DELETE FROM task_schedules RETURNING id, name + `); + } else if (Array.isArray(ids) && ids.length > 0) { + // Delete specific schedules by IDs + result = await pool.query(` + DELETE FROM task_schedules WHERE id = ANY($1) RETURNING id, name + `, [ids]); + } else { + return res.status(400).json({ + error: 'Either provide ids array or set all=true', + }); + } + + res.json({ + success: true, + deleted_count: result.rowCount, + deleted: result.rows, + message: `Deleted ${result.rowCount} schedule(s)`, + }); + } catch (error: unknown) { + console.error('Error bulk deleting schedules:', error); + res.status(500).json({ error: 'Failed to delete schedules' }); + } +}); + +/** + * POST /api/tasks/schedules + * Create a new schedule + * + * Body: + * - name: string (required, unique) + * - role: TaskRole (required) + * - description: string (optional) + * - enabled: boolean (default true) + * - interval_hours: number (required) + * - priority: number (default 0) + * - state_code: string (optional) + * - platform: string (optional) + */ +router.post('/schedules', async (req: Request, res: Response) => { + try { + const { + name, + role, + description, + enabled = true, + interval_hours, + priority = 0, + state_code, + platform, + } = req.body; + + if (!name || !role || !interval_hours) { + return res.status(400).json({ + error: 'name, role, and interval_hours are required', + }); + } + + // Calculate next_run_at based on interval + const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000); + + const result = await pool.query(` + INSERT INTO task_schedules + (name, role, description, enabled, interval_hours, priority, state_code, platform, next_run_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING id, name, role, description, enabled, interval_hours, + priority, state_code, platform, last_run_at, next_run_at, + last_task_count, last_error, created_at, updated_at + `, [name, role, description, enabled, interval_hours, priority, state_code, platform, nextRunAt]); + + res.status(201).json(result.rows[0]); + } catch (error: any) { + if (error.code === '23505') { + // Unique constraint violation + return res.status(409).json({ error: 'A schedule with this name already exists' }); + } + console.error('Error creating schedule:', error); + res.status(500).json({ error: 'Failed to create schedule' }); + } +}); + +/** + * GET /api/tasks/schedules/:id + * Get a specific schedule by ID + */ +router.get('/schedules/:id', async (req: Request, res: Response) => { + try { + const scheduleId = parseInt(req.params.id, 10); + + const result = await pool.query(` + SELECT id, name, role, description, enabled, interval_hours, + priority, state_code, platform, last_run_at, next_run_at, + last_task_count, last_error, created_at, updated_at + FROM task_schedules + WHERE id = $1 + `, [scheduleId]); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Schedule not found' }); + } + + res.json(result.rows[0]); + } catch (error: unknown) { + console.error('Error getting schedule:', error); + res.status(500).json({ error: 'Failed to get schedule' }); + } +}); + +/** + * PUT /api/tasks/schedules/:id + * Update an existing schedule + */ +router.put('/schedules/:id', async (req: Request, res: Response) => { + try { + const scheduleId = parseInt(req.params.id, 10); + const { + name, + role, + description, + enabled, + interval_hours, + priority, + state_code, + platform, + } = req.body; + + // Build dynamic update query + const updates: string[] = []; + const values: any[] = []; + let paramIndex = 1; + + if (name !== undefined) { + updates.push(`name = $${paramIndex++}`); + values.push(name); + } + if (role !== undefined) { + updates.push(`role = $${paramIndex++}`); + values.push(role); + } + if (description !== undefined) { + updates.push(`description = $${paramIndex++}`); + values.push(description); + } + if (enabled !== undefined) { + updates.push(`enabled = $${paramIndex++}`); + values.push(enabled); + } + if (interval_hours !== undefined) { + updates.push(`interval_hours = $${paramIndex++}`); + values.push(interval_hours); + + // Recalculate next_run_at if interval changed + const nextRunAt = new Date(Date.now() + interval_hours * 60 * 60 * 1000); + updates.push(`next_run_at = $${paramIndex++}`); + values.push(nextRunAt); + } + if (priority !== undefined) { + updates.push(`priority = $${paramIndex++}`); + values.push(priority); + } + if (state_code !== undefined) { + updates.push(`state_code = $${paramIndex++}`); + values.push(state_code || null); + } + if (platform !== undefined) { + updates.push(`platform = $${paramIndex++}`); + values.push(platform || null); + } + + if (updates.length === 0) { + return res.status(400).json({ error: 'No fields to update' }); + } + + updates.push('updated_at = NOW()'); + values.push(scheduleId); + + const result = await pool.query(` + UPDATE task_schedules + SET ${updates.join(', ')} + WHERE id = $${paramIndex} + RETURNING id, name, role, description, enabled, interval_hours, + priority, state_code, platform, last_run_at, next_run_at, + last_task_count, last_error, created_at, updated_at + `, values); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Schedule not found' }); + } + + res.json(result.rows[0]); + } catch (error: any) { + if (error.code === '23505') { + return res.status(409).json({ error: 'A schedule with this name already exists' }); + } + console.error('Error updating schedule:', error); + res.status(500).json({ error: 'Failed to update schedule' }); + } +}); + +/** + * DELETE /api/tasks/schedules/:id + * Delete a schedule + */ +router.delete('/schedules/:id', async (req: Request, res: Response) => { + try { + const scheduleId = parseInt(req.params.id, 10); + + const result = await pool.query(` + DELETE FROM task_schedules WHERE id = $1 RETURNING id, name + `, [scheduleId]); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Schedule not found' }); + } + + res.json({ + success: true, + message: `Schedule "${result.rows[0].name}" deleted`, + }); + } catch (error: unknown) { + console.error('Error deleting schedule:', error); + res.status(500).json({ error: 'Failed to delete schedule' }); + } +}); + +/** + * POST /api/tasks/schedules/:id/run-now + * Manually trigger a scheduled task to run immediately + */ +router.post('/schedules/:id/run-now', async (req: Request, res: Response) => { + try { + const scheduleId = parseInt(req.params.id, 10); + + // Get the schedule + const scheduleResult = await pool.query(` + SELECT id, name, role, state_code, platform, priority + FROM task_schedules WHERE id = $1 + `, [scheduleId]); + + if (scheduleResult.rows.length === 0) { + return res.status(404).json({ error: 'Schedule not found' }); + } + + const schedule = scheduleResult.rows[0]; + + // Create a task based on the schedule + const task = await taskService.createTask({ + role: schedule.role, + platform: schedule.platform, + priority: schedule.priority + 10, // Boost priority for manual runs + }); + + // Update last_run_at on the schedule + await pool.query(` + UPDATE task_schedules + SET last_run_at = NOW(), + next_run_at = NOW() + (interval_hours || ' hours')::interval, + updated_at = NOW() + WHERE id = $1 + `, [scheduleId]); + + res.json({ + success: true, + message: `Schedule "${schedule.name}" triggered`, + task, + }); + } catch (error: unknown) { + console.error('Error running schedule:', error); + res.status(500).json({ error: 'Failed to run schedule' }); + } +}); + +/** + * POST /api/tasks/schedules/:id/toggle + * Toggle a schedule's enabled status + */ +router.post('/schedules/:id/toggle', async (req: Request, res: Response) => { + try { + const scheduleId = parseInt(req.params.id, 10); + + const result = await pool.query(` + UPDATE task_schedules + SET enabled = NOT enabled, + updated_at = NOW() + WHERE id = $1 + RETURNING id, name, enabled + `, [scheduleId]); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Schedule not found' }); + } + + res.json({ + success: true, + schedule: result.rows[0], + message: result.rows[0].enabled + ? `Schedule "${result.rows[0].name}" enabled` + : `Schedule "${result.rows[0].name}" disabled`, + }); + } catch (error: unknown) { + console.error('Error toggling schedule:', error); + res.status(500).json({ error: 'Failed to toggle schedule' }); + } +}); + +// ============================================================ +// TASK-SPECIFIC ROUTES (with :id parameter) +// ============================================================ + /** * GET /api/tasks/:id * Get a specific task by ID diff --git a/backend/src/routes/workers.ts b/backend/src/routes/workers.ts index d4e072c5..d6914650 100644 --- a/backend/src/routes/workers.ts +++ b/backend/src/routes/workers.ts @@ -4,10 +4,25 @@ * Provider-agnostic worker management and job monitoring. * Replaces legacy /api/dutchie-az/admin/schedules and /api/dutchie-az/monitor/* routes. * + * DEPRECATION NOTE (2025-12-12): + * This file still queries job_schedules for backwards compatibility with + * the /api/workers endpoints that display worker status. However, the + * job_schedules table is DEPRECATED - all entries have been disabled. + * + * Schedule management has been consolidated into task_schedules: + * - Use /api/tasks/schedules for schedule CRUD operations + * - Use TasksDashboard.tsx (/admin/tasks) for schedule management UI + * - task_schedules uses interval_hours (simpler than base_interval_minutes + jitter) + * + * The /api/workers endpoints remain useful for: + * - Monitoring active workers and job status + * - K8s scaling controls + * - Job history and logs + * * Endpoints: * GET /api/workers - List all workers/schedules * GET /api/workers/active - List currently active workers - * GET /api/workers/schedule - Get all job schedules + * GET /api/workers/schedule - Get all job schedules (DEPRECATED - use /api/tasks/schedules) * GET /api/workers/:workerName - Get specific worker details * GET /api/workers/:workerName/scope - Get worker's scope (states, etc.) * GET /api/workers/:workerName/stats - Get worker statistics diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index c80302de..7cb99bd3 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -11,10 +11,17 @@ * - Workers report heartbeats to worker_registry * - Workers are ROLE-AGNOSTIC by default (can handle any task type) * - * Stealth & Anti-Detection: - * PROXIES ARE REQUIRED - workers will fail to start if no proxies available. + * Stealth & Anti-Detection (LAZY INITIALIZATION): + * Workers start IMMEDIATELY without waiting for proxies. + * Stealth systems (proxies, fingerprints, preflights) are initialized + * on first task claim, not at worker startup. * - * On startup, workers initialize the CrawlRotator which provides: + * This allows workers to: + * - Register and send heartbeats immediately + * - Wait in main loop without blocking on proxy availability + * - Initialize proxies/preflights only when tasks are actually available + * + * On first task claim attempt, workers initialize the CrawlRotator which provides: * - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy * - User-Agent rotation: Cycles through realistic browser fingerprints * - Fingerprint rotation: Changes browser profile on blocks @@ -34,11 +41,16 @@ * * Environment: * WORKER_ROLE - Which task role to process (optional, null = any task) - * WORKER_ID - Optional custom worker ID (auto-generated if not provided) - * POD_NAME - Kubernetes pod name (optional) + * POD_NAME - K8s StatefulSet pod name (PRIMARY - use this for persistent identity) + * WORKER_ID - Custom worker ID (fallback if POD_NAME not set) * POLL_INTERVAL_MS - How often to check for tasks (default: 5000) * HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000) * API_BASE_URL - Backend API URL for registration (default: http://localhost:3010) + * + * Worker Identity: + * Workers use POD_NAME as their worker_id for persistent identity across restarts. + * In K8s StatefulSet, POD_NAME = "scraper-worker-0" through "scraper-worker-7". + * This ensures workers re-register with the same ID instead of creating new entries. */ import { Pool } from 'pg'; @@ -209,6 +221,16 @@ export class TaskWorker { private preflightCurlResult: CurlPreflightResult | null = null; private preflightHttpResult: PuppeteerPreflightResult | null = null; + // ========================================================================== + // LAZY INITIALIZATION FLAGS + // ========================================================================== + // Stealth/proxy initialization is deferred until first task claim. + // Workers register immediately and enter main loop without blocking. + // ========================================================================== + private stealthInitialized: boolean = false; + private preflightsCompleted: boolean = false; + private initializingPromise: Promise | null = null; + constructor(role: TaskRole | null = null, workerId?: string) { this.pool = getPool(); this.role = role; @@ -293,9 +315,9 @@ export class TaskWorker { /** * Initialize stealth systems (proxy rotation, fingerprints) - * Called once on worker startup before processing any tasks. + * Called LAZILY on first task claim attempt (NOT at worker startup). * - * IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available. + * IMPORTANT: Proxies are REQUIRED to claim tasks. This method waits until proxies are available. * Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added. */ private async initializeStealth(): Promise { @@ -482,6 +504,51 @@ export class TaskWorker { } } + /** + * Lazy initialization of stealth systems. + * Called BEFORE claiming first task (not at worker startup). + * This allows workers to register and enter main loop immediately. + * + * Returns true if initialization succeeded, false otherwise. + */ + private async ensureStealthInitialized(): Promise { + // Already initialized + if (this.stealthInitialized && this.preflightsCompleted) { + return true; + } + + // Already initializing (prevent concurrent init attempts) + if (this.initializingPromise) { + await this.initializingPromise; + return this.stealthInitialized && this.preflightsCompleted; + } + + console.log(`[TaskWorker] ${this.friendlyName} lazy-initializing stealth systems (first task claim)...`); + + this.initializingPromise = (async () => { + try { + // Initialize proxy/fingerprint rotation + await this.initializeStealth(); + this.stealthInitialized = true; + + // Run dual-transport preflights + await this.runDualPreflights(); + this.preflightsCompleted = true; + + const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`; + console.log(`[TaskWorker] ${this.friendlyName} stealth ready (${preflightMsg})`); + } catch (err: any) { + console.error(`[TaskWorker] ${this.friendlyName} stealth init failed: ${err.message}`); + this.stealthInitialized = false; + this.preflightsCompleted = false; + } + })(); + + await this.initializingPromise; + this.initializingPromise = null; + return this.stealthInitialized && this.preflightsCompleted; + } + /** * Register worker with the registry (get friendly name) */ @@ -615,25 +682,22 @@ export class TaskWorker { /** * Start the worker loop + * + * Workers start IMMEDIATELY without blocking on proxy/preflight init. + * Stealth systems are lazy-initialized on first task claim. + * This allows workers to register and send heartbeats even when proxies aren't ready. */ async start(): Promise { this.isRunning = true; - // Initialize stealth systems (proxy rotation, fingerprints) - await this.initializeStealth(); - - // Register with the API to get a friendly name + // Register with the API to get a friendly name (non-blocking) await this.register(); - // Run dual-transport preflights - await this.runDualPreflights(); - - // Start registry heartbeat + // Start registry heartbeat immediately this.startRegistryHeartbeat(); const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)'; - const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`; - console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (${preflightMsg}, max ${this.maxConcurrentTasks} concurrent tasks)`); + console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`); while (this.isRunning) { try { @@ -687,6 +751,20 @@ export class TaskWorker { // Try to claim more tasks if we have capacity if (this.canAcceptMoreTasks()) { + // ================================================================= + // LAZY INITIALIZATION - Initialize stealth on first task claim + // Workers start immediately and init proxies only when needed + // ================================================================= + if (!this.stealthInitialized) { + const initSuccess = await this.ensureStealthInitialized(); + if (!initSuccess) { + // Init failed - wait and retry next loop + console.log(`[TaskWorker] ${this.friendlyName} stealth init failed, waiting before retry...`); + await this.sleep(30000); + return; + } + } + // Pass preflight capabilities to only claim compatible tasks const task = await taskService.claimTask( this.role, @@ -922,7 +1000,10 @@ async function main(): Promise { process.exit(1); } - const workerId = process.env.WORKER_ID; + // Use POD_NAME for persistent identity in K8s StatefulSet + // This ensures workers keep the same ID across restarts + // Falls back to WORKER_ID, then generates UUID if neither is set + const workerId = process.env.POD_NAME || process.env.WORKER_ID; // Pass null for role-agnostic, or the specific role const worker = new TaskWorker(role || null, workerId); diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts index 43ab3fd7..5ed66c28 100755 --- a/cannaiq/src/lib/api.ts +++ b/cannaiq/src/lib/api.ts @@ -2666,13 +2666,25 @@ class ApiClient { // Dashboard methods getMarketDashboard = this.getMarketsDashboard.bind(this); - // Schedule methods (no conflicts) + // ============================================================ + // LEGACY SCHEDULE METHODS (DEPRECATED 2025-12-12) + // These use /api/markets/admin/schedules which queries job_schedules + // Use getTaskSchedules(), updateTaskSchedule(), etc. instead + // (defined below, use /api/tasks/schedules which queries task_schedules) + // ============================================================ + /** @deprecated Use getTaskSchedules() - queries task_schedules table */ getSchedules = this.getCrawlSchedules.bind(this); + /** @deprecated Use getTaskSchedule() - queries task_schedules table */ getSchedule = this.getDutchieAZSchedule.bind(this); + /** @deprecated Use createTaskSchedule() - queries task_schedules table */ createSchedule = this.createDutchieAZSchedule.bind(this); + /** @deprecated Use updateTaskSchedule() - queries task_schedules table */ updateSchedule = this.updateDutchieAZSchedule.bind(this); + /** @deprecated Use deleteTaskSchedule() - queries task_schedules table */ deleteSchedule = this.deleteDutchieAZSchedule.bind(this); + /** @deprecated Use runTaskScheduleNow() - queries task_schedules table */ triggerSchedule = this.triggerDutchieAZSchedule.bind(this); + /** @deprecated - job_schedules init not needed for task_schedules */ initSchedules = this.initDutchieAZSchedules.bind(this); getScheduleLogs = this.getCrawlScheduleLogs.bind(this); getRunLogs = this.getDutchieAZRunLogs.bind(this); @@ -2976,6 +2988,99 @@ class ApiClient { { method: 'POST', body: JSON.stringify({ replicas }) } ); } + + // ========================================== + // Task Schedules API (recurring task definitions) + // ========================================== + + async getTaskSchedules(enabledOnly?: boolean) { + const qs = enabledOnly ? '?enabled=true' : ''; + return this.request<{ schedules: TaskSchedule[] }>(`/api/tasks/schedules${qs}`); + } + + async getTaskSchedule(id: number) { + return this.request(`/api/tasks/schedules/${id}`); + } + + async createTaskSchedule(data: { + name: string; + role: string; + description?: string; + enabled?: boolean; + interval_hours: number; + priority?: number; + state_code?: string; + platform?: string; + }) { + return this.request('/api/tasks/schedules', { + method: 'POST', + body: JSON.stringify(data), + }); + } + + async updateTaskSchedule(id: number, data: Partial<{ + name: string; + role: string; + description: string; + enabled: boolean; + interval_hours: number; + priority: number; + state_code: string; + platform: string; + }>) { + return this.request(`/api/tasks/schedules/${id}`, { + method: 'PUT', + body: JSON.stringify(data), + }); + } + + async deleteTaskSchedule(id: number) { + return this.request<{ success: boolean; message: string }>(`/api/tasks/schedules/${id}`, { + method: 'DELETE', + }); + } + + async deleteTaskSchedulesBulk(ids?: number[], all?: boolean) { + return this.request<{ success: boolean; deleted_count: number; deleted: { id: number; name: string }[]; message: string }>( + '/api/tasks/schedules', + { + method: 'DELETE', + body: JSON.stringify({ ids, all }), + } + ); + } + + async runTaskScheduleNow(id: number) { + return this.request<{ success: boolean; message: string; task: any }>(`/api/tasks/schedules/${id}/run-now`, { + method: 'POST', + }); + } + + async toggleTaskSchedule(id: number) { + return this.request<{ success: boolean; schedule: { id: number; name: string; enabled: boolean }; message: string }>( + `/api/tasks/schedules/${id}/toggle`, + { method: 'POST' } + ); + } +} + +// Type for task schedules +export interface TaskSchedule { + id: number; + name: string; + role: string; + description: string | null; + enabled: boolean; + interval_hours: number; + priority: number; + state_code: string | null; + platform: string | null; + last_run_at: string | null; + next_run_at: string | null; + last_task_count: number; + last_error: string | null; + created_at: string; + updated_at: string; } export const api = new ApiClient(API_URL); diff --git a/cannaiq/src/pages/CrawlSchedulePage.tsx b/cannaiq/src/pages/CrawlSchedulePage.tsx index cdf1a0f4..461d5827 100644 --- a/cannaiq/src/pages/CrawlSchedulePage.tsx +++ b/cannaiq/src/pages/CrawlSchedulePage.tsx @@ -1,3 +1,18 @@ +/** + * @deprecated 2025-12-12 + * + * This page used the legacy job_schedules table which has been deprecated. + * All schedule management has been consolidated into task_schedules and + * is now managed via the /admin/tasks page (TasksDashboard.tsx). + * + * The job_schedules table entries have been disabled and marked deprecated. + * This page is no longer in the navigation menu but kept for reference. + * + * Migration details: + * - job_schedules used base_interval_minutes + jitter_minutes + * - task_schedules uses interval_hours (simpler model) + * - All CRUD operations now via /api/tasks/schedules endpoints + */ import { useEffect, useState } from 'react'; import { Layout } from '../components/Layout'; import { api } from '../lib/api'; diff --git a/cannaiq/src/pages/TasksDashboard.tsx b/cannaiq/src/pages/TasksDashboard.tsx index 1600cc3e..59022023 100644 --- a/cannaiq/src/pages/TasksDashboard.tsx +++ b/cannaiq/src/pages/TasksDashboard.tsx @@ -1,5 +1,5 @@ import { useState, useEffect } from 'react'; -import { api } from '../lib/api'; +import { api, TaskSchedule } from '../lib/api'; import { Layout } from '../components/Layout'; import { ListChecks, @@ -21,6 +21,10 @@ import { X, Calendar, Trash2, + Edit2, + Play, + Pause, + Timer, } from 'lucide-react'; interface Task { @@ -383,8 +387,232 @@ const ROLES = [ 'product_discovery', 'product_refresh', 'analytics_refresh', + 'payload_fetch', ]; +// ============================================================ +// Schedule Edit Modal +// ============================================================ + +interface ScheduleEditModalProps { + isOpen: boolean; + schedule: TaskSchedule | null; + onClose: () => void; + onSave: () => void; +} + +function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditModalProps) { + const [name, setName] = useState(''); + const [role, setRole] = useState('product_refresh'); + const [description, setDescription] = useState(''); + const [enabled, setEnabled] = useState(true); + const [intervalHours, setIntervalHours] = useState(4); + const [priority, setPriority] = useState(0); + const [stateCode, setStateCode] = useState(''); + const [platform, setPlatform] = useState('dutchie'); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + + const isNew = !schedule; + + useEffect(() => { + if (schedule) { + setName(schedule.name); + setRole(schedule.role); + setDescription(schedule.description || ''); + setEnabled(schedule.enabled); + setIntervalHours(schedule.interval_hours); + setPriority(schedule.priority); + setStateCode(schedule.state_code || ''); + setPlatform(schedule.platform || 'dutchie'); + } else { + // Reset for new schedule + setName(''); + setRole('product_refresh'); + setDescription(''); + setEnabled(true); + setIntervalHours(4); + setPriority(0); + setStateCode(''); + setPlatform('dutchie'); + } + setError(null); + }, [schedule, isOpen]); + + const handleSubmit = async () => { + if (!name.trim()) { + setError('Name is required'); + return; + } + + setLoading(true); + setError(null); + + try { + const data = { + name: name.trim(), + role, + description: description.trim() || undefined, + enabled, + interval_hours: intervalHours, + priority, + state_code: stateCode.trim() || undefined, + platform: platform.trim() || undefined, + }; + + if (isNew) { + await api.createTaskSchedule(data as any); + } else { + await api.updateTaskSchedule(schedule!.id, data); + } + + onSave(); + onClose(); + } catch (err: any) { + setError(err.response?.data?.error || err.message || 'Failed to save schedule'); + } finally { + setLoading(false); + } + }; + + if (!isOpen) return null; + + return ( +
+
+
+
+
+

+ {isNew ? 'Create Schedule' : 'Edit Schedule'} +

+ +
+ +
+ {error && ( +
+ {error} +
+ )} + +
+ + setName(e.target.value)} + placeholder="e.g., product_refresh_all" + className="w-full px-3 py-2 border border-gray-200 rounded-lg" + /> +
+ +
+ + +
+ +
+ + setDescription(e.target.value)} + placeholder="Optional description" + className="w-full px-3 py-2 border border-gray-200 rounded-lg" + /> +
+ +
+
+ + setIntervalHours(parseInt(e.target.value) || 4)} + className="w-full px-3 py-2 border border-gray-200 rounded-lg" + /> +
+
+ + setPriority(parseInt(e.target.value) || 0)} + className="w-full px-3 py-2 border border-gray-200 rounded-lg" + /> +
+
+ +
+
+ + setStateCode(e.target.value.toUpperCase())} + placeholder="e.g., AZ" + maxLength={2} + className="w-full px-3 py-2 border border-gray-200 rounded-lg" + /> +
+
+ + setPlatform(e.target.value)} + placeholder="e.g., dutchie" + className="w-full px-3 py-2 border border-gray-200 rounded-lg" + /> +
+
+ +
+ setEnabled(e.target.checked)} + className="w-4 h-4 text-emerald-600 rounded" + /> + +
+
+ +
+ + +
+
+
+
+ ); +} + const STATUS_COLORS: Record = { pending: 'bg-yellow-100 text-yellow-800', claimed: 'bg-blue-100 text-blue-800', @@ -452,6 +680,13 @@ export default function TasksDashboard() { const [poolPaused, setPoolPaused] = useState(false); const [showCreateModal, setShowCreateModal] = useState(false); + // Schedules state + const [schedules, setSchedules] = useState([]); + const [showSchedules, setShowSchedules] = useState(true); + const [selectedSchedules, setSelectedSchedules] = useState>(new Set()); + const [editingSchedule, setEditingSchedule] = useState(null); + const [showScheduleModal, setShowScheduleModal] = useState(false); + // Pagination const [page, setPage] = useState(0); const tasksPerPage = 25; @@ -465,7 +700,7 @@ export default function TasksDashboard() { const fetchData = async () => { try { - const [tasksRes, countsRes, capacityRes, poolStatus] = await Promise.all([ + const [tasksRes, countsRes, capacityRes, poolStatus, schedulesRes] = await Promise.all([ api.getTasks({ role: roleFilter || undefined, status: statusFilter || undefined, @@ -474,12 +709,14 @@ export default function TasksDashboard() { api.getTaskCounts(), api.getTaskCapacity(), api.getTaskPoolStatus(), + api.getTaskSchedules(), ]); setTasks(tasksRes.tasks || []); setCounts(countsRes); setCapacity(capacityRes.metrics || []); setPoolPaused(poolStatus.paused); + setSchedules(schedulesRes.schedules || []); setError(null); } catch (err: any) { setError(err.message || 'Failed to load tasks'); @@ -488,6 +725,76 @@ export default function TasksDashboard() { } }; + const handleDeleteSchedule = async (scheduleId: number) => { + if (!confirm('Delete this schedule?')) return; + try { + await api.deleteTaskSchedule(scheduleId); + setSelectedSchedules(prev => { + const next = new Set(prev); + next.delete(scheduleId); + return next; + }); + fetchData(); + } catch (err: any) { + console.error('Delete schedule error:', err); + alert(err.response?.data?.error || 'Failed to delete schedule'); + } + }; + + const handleBulkDeleteSchedules = async () => { + if (selectedSchedules.size === 0) return; + if (!confirm(`Delete ${selectedSchedules.size} selected schedule(s)?`)) return; + try { + await api.deleteTaskSchedulesBulk(Array.from(selectedSchedules)); + setSelectedSchedules(new Set()); + fetchData(); + } catch (err: any) { + console.error('Bulk delete error:', err); + alert(err.response?.data?.error || 'Failed to delete schedules'); + } + }; + + const handleToggleSchedule = async (scheduleId: number) => { + try { + await api.toggleTaskSchedule(scheduleId); + fetchData(); + } catch (err: any) { + console.error('Toggle schedule error:', err); + alert(err.response?.data?.error || 'Failed to toggle schedule'); + } + }; + + const handleRunScheduleNow = async (scheduleId: number) => { + try { + const result = await api.runTaskScheduleNow(scheduleId); + alert(result.message); + fetchData(); + } catch (err: any) { + console.error('Run schedule error:', err); + alert(err.response?.data?.error || 'Failed to run schedule'); + } + }; + + const toggleSelectSchedule = (id: number) => { + setSelectedSchedules(prev => { + const next = new Set(prev); + if (next.has(id)) { + next.delete(id); + } else { + next.add(id); + } + return next; + }); + }; + + const toggleSelectAllSchedules = () => { + if (selectedSchedules.size === schedules.length) { + setSelectedSchedules(new Set()); + } else { + setSelectedSchedules(new Set(schedules.map(s => s.id))); + } + }; + const handleDeleteTask = async (taskId: number) => { if (!confirm('Delete this task?')) return; try { @@ -583,6 +890,17 @@ export default function TasksDashboard() { onTaskCreated={fetchData} /> + {/* Schedule Edit Modal */} + { + setShowScheduleModal(false); + setEditingSchedule(null); + }} + onSave={fetchData} + /> + {/* Status Summary Cards */}
{Object.entries(counts || {}).map(([status, count]) => ( @@ -714,6 +1032,196 @@ export default function TasksDashboard() { )}
+ {/* Schedules Section */} +
+ + + {showSchedules && ( +
+ {/* Schedule Actions */} +
+
+ + {selectedSchedules.size > 0 && ( + + )} +
+ + {schedules.filter(s => s.enabled).length} enabled + +
+ + {schedules.length === 0 ? ( +
+ No schedules configured. Click "New Schedule" to create one. +
+ ) : ( +
+ + + + + + + + + + + + + + + {schedules.map((schedule) => ( + + + + + + + + + + + ))} + +
+ 0} + onChange={toggleSelectAllSchedules} + className="w-4 h-4 text-emerald-600 rounded" + /> + + Name + + Role + + Interval + + Last Run + + Next Run + + Status + + Actions +
+ toggleSelectSchedule(schedule.id)} + className="w-4 h-4 text-emerald-600 rounded" + /> + +
{schedule.name}
+ {schedule.description && ( +
{schedule.description}
+ )} +
+ {schedule.role.replace(/_/g, ' ')} + + Every {schedule.interval_hours}h + + {schedule.last_run_at ? formatTimeAgo(schedule.last_run_at) : '-'} + + {schedule.next_run_at ? formatTimeAgo(schedule.next_run_at) : '-'} + + + {schedule.enabled ? ( + <> + + Active + + ) : ( + <> + + Paused + + )} + + +
+ + + + +
+
+
+ )} +
+ )} +
+ {/* Filters */}