From 6eb1babc86ab8df446ea9517de76857e7d924d3d Mon Sep 17 00:00:00 2001 From: Kelly Date: Wed, 10 Dec 2025 01:42:00 -0700 Subject: [PATCH 1/4] feat: Auto-migrations on startup, worker exit location, proxy improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add auto-migration system that runs SQL files from migrations/ on server startup - Track applied migrations in schema_migrations table - Show proxy exit location in Workers dashboard - Add "Cleanup Stale" button to remove old workers - Add remove button for individual workers - Include proxy location (city, state, country) in worker heartbeats - Update Proxy interface with location fields - Re-enable bulk proxy import without ON CONFLICT 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/src/db/auto-migrate.ts | 141 +++++++++++++++++++ backend/src/index.ts | 13 ++ backend/src/routes/proxies.ts | 50 +++++-- backend/src/services/crawl-rotator.ts | 44 +++++- backend/src/services/proxy.ts | 25 +--- backend/src/services/proxyTestQueue.ts | 108 ++++++++++----- backend/src/tasks/task-worker.ts | 4 +- cannaiq/src/pages/Proxies.tsx | 179 ++++++++++++++++++++++++- cannaiq/src/pages/WorkersDashboard.tsx | 119 ++++++++++++++-- 9 files changed, 598 insertions(+), 85 deletions(-) create mode 100644 backend/src/db/auto-migrate.ts diff --git a/backend/src/db/auto-migrate.ts b/backend/src/db/auto-migrate.ts new file mode 100644 index 00000000..04712671 --- /dev/null +++ b/backend/src/db/auto-migrate.ts @@ -0,0 +1,141 @@ +/** + * Auto-Migration System + * + * Runs SQL migration files from the migrations/ folder automatically on server startup. + * Uses a schema_migrations table to track which migrations have been applied. + * + * Safe to run multiple times - only applies new migrations. + */ + +import { Pool } from 'pg'; +import fs from 'fs'; +import path from 'path'; + +const MIGRATIONS_DIR = path.join(__dirname, '../../migrations'); + +/** + * Ensure schema_migrations table exists + */ +async function ensureMigrationsTable(pool: Pool): Promise { + await pool.query(` + CREATE TABLE IF NOT EXISTS schema_migrations ( + id SERIAL PRIMARY KEY, + name VARCHAR(255) UNIQUE NOT NULL, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() + ) + `); +} + +/** + * Get list of already-applied migrations + */ +async function getAppliedMigrations(pool: Pool): Promise> { + const result = await pool.query('SELECT name FROM schema_migrations'); + return new Set(result.rows.map(row => row.name)); +} + +/** + * Get list of migration files from disk + */ +function getMigrationFiles(): string[] { + if (!fs.existsSync(MIGRATIONS_DIR)) { + console.log('[AutoMigrate] No migrations directory found'); + return []; + } + + return fs.readdirSync(MIGRATIONS_DIR) + .filter(f => f.endsWith('.sql')) + .sort(); // Sort alphabetically (001_, 002_, etc.) +} + +/** + * Run a single migration file + */ +async function runMigration(pool: Pool, filename: string): Promise { + const filepath = path.join(MIGRATIONS_DIR, filename); + const sql = fs.readFileSync(filepath, 'utf8'); + + const client = await pool.connect(); + try { + await client.query('BEGIN'); + + // Run the migration SQL + await client.query(sql); + + // Record that this migration was applied + await client.query( + 'INSERT INTO schema_migrations (name) VALUES ($1) ON CONFLICT (name) DO NOTHING', + [filename] + ); + + await client.query('COMMIT'); + console.log(`[AutoMigrate] ✓ Applied: ${filename}`); + } catch (error: any) { + await client.query('ROLLBACK'); + console.error(`[AutoMigrate] ✗ Failed: ${filename}`); + throw error; + } finally { + client.release(); + } +} + +/** + * Run all pending migrations + * + * @param pool - Database connection pool + * @returns Number of migrations applied + */ +export async function runAutoMigrations(pool: Pool): Promise { + console.log('[AutoMigrate] Checking for pending migrations...'); + + try { + // Ensure migrations table exists + await ensureMigrationsTable(pool); + + // Get applied and available migrations + const applied = await getAppliedMigrations(pool); + const available = getMigrationFiles(); + + // Find pending migrations + const pending = available.filter(f => !applied.has(f)); + + if (pending.length === 0) { + console.log('[AutoMigrate] No pending migrations'); + return 0; + } + + console.log(`[AutoMigrate] Found ${pending.length} pending migrations`); + + // Run each pending migration in order + for (const filename of pending) { + await runMigration(pool, filename); + } + + console.log(`[AutoMigrate] Successfully applied ${pending.length} migrations`); + return pending.length; + + } catch (error: any) { + console.error('[AutoMigrate] Migration failed:', error.message); + // Don't crash the server - log and continue + // The specific failing migration will have been rolled back + return -1; + } +} + +/** + * Check migration status without running anything + */ +export async function checkMigrationStatus(pool: Pool): Promise<{ + applied: string[]; + pending: string[]; +}> { + await ensureMigrationsTable(pool); + + const applied = await getAppliedMigrations(pool); + const available = getMigrationFiles(); + + return { + applied: available.filter(f => applied.has(f)), + pending: available.filter(f => !applied.has(f)), + }; +} diff --git a/backend/src/index.ts b/backend/src/index.ts index 6a8d1d3e..4be0011e 100755 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -6,6 +6,8 @@ import { initializeMinio, isMinioEnabled } from './utils/minio'; import { initializeImageStorage } from './utils/image-storage'; import { logger } from './services/logger'; import { cleanupOrphanedJobs } from './services/proxyTestQueue'; +import { runAutoMigrations } from './db/auto-migrate'; +import { getPool } from './db/pool'; import healthRoutes from './routes/health'; import imageProxyRoutes from './routes/image-proxy'; @@ -307,6 +309,17 @@ async function startServer() { try { logger.info('system', 'Starting server...'); + // Run auto-migrations before anything else + const pool = getPool(); + const migrationsApplied = await runAutoMigrations(pool); + if (migrationsApplied > 0) { + logger.info('system', `Applied ${migrationsApplied} database migrations`); + } else if (migrationsApplied === 0) { + logger.info('system', 'Database schema up to date'); + } else { + logger.warn('system', 'Some migrations failed - check logs'); + } + await initializeMinio(); await initializeImageStorage(); logger.info('system', isMinioEnabled() ? 'MinIO storage initialized' : 'Local filesystem storage initialized'); diff --git a/backend/src/routes/proxies.ts b/backend/src/routes/proxies.ts index 36d33468..0ab9f090 100755 --- a/backend/src/routes/proxies.ts +++ b/backend/src/routes/proxies.ts @@ -2,7 +2,7 @@ import { Router } from 'express'; import { authMiddleware, requireRole } from '../auth/middleware'; import { pool } from '../db/pool'; import { testProxy, addProxy, addProxiesFromList } from '../services/proxy'; -import { createProxyTestJob, getProxyTestJob, getActiveProxyTestJob, cancelProxyTestJob } from '../services/proxyTestQueue'; +import { createProxyTestJob, getProxyTestJob, getActiveProxyTestJob, cancelProxyTestJob, ProxyTestMode } from '../services/proxyTestQueue'; const router = Router(); router.use(authMiddleware); @@ -11,9 +11,10 @@ router.use(authMiddleware); router.get('/', async (req, res) => { try { const result = await pool.query(` - SELECT id, host, port, protocol, active, is_anonymous, + SELECT id, host, port, protocol, username, password, active, is_anonymous, last_tested_at, test_result, response_time_ms, created_at, - city, state, country, country_code, location_updated_at + city, state, country, country_code, location_updated_at, + COALESCE(max_connections, 1) as max_connections FROM proxies ORDER BY created_at DESC `); @@ -166,13 +167,39 @@ router.post('/:id/test', requireRole('superadmin', 'admin'), async (req, res) => }); // Start proxy test job +// Query params: mode=all|failed|inactive, concurrency=10 router.post('/test-all', requireRole('superadmin', 'admin'), async (req, res) => { try { - const jobId = await createProxyTestJob(); - res.json({ jobId, message: 'Proxy test job started' }); - } catch (error) { + const mode = (req.query.mode as ProxyTestMode) || 'all'; + const concurrency = parseInt(req.query.concurrency as string) || 10; + + // Validate mode + if (!['all', 'failed', 'inactive'].includes(mode)) { + return res.status(400).json({ error: 'Invalid mode. Use: all, failed, or inactive' }); + } + + // Validate concurrency (1-50) + if (concurrency < 1 || concurrency > 50) { + return res.status(400).json({ error: 'Concurrency must be between 1 and 50' }); + } + + const jobId = await createProxyTestJob(mode, concurrency); + res.json({ jobId, mode, concurrency, message: `Proxy test job started (mode: ${mode}, concurrency: ${concurrency})` }); + } catch (error: any) { console.error('Error starting proxy test job:', error); - res.status(500).json({ error: 'Failed to start proxy test job' }); + res.status(500).json({ error: error.message || 'Failed to start proxy test job' }); + } +}); + +// Convenience endpoint: Test only failed proxies +router.post('/test-failed', requireRole('superadmin', 'admin'), async (req, res) => { + try { + const concurrency = parseInt(req.query.concurrency as string) || 10; + const jobId = await createProxyTestJob('failed', concurrency); + res.json({ jobId, mode: 'failed', concurrency, message: 'Retesting failed proxies...' }); + } catch (error: any) { + console.error('Error starting failed proxy test:', error); + res.status(500).json({ error: error.message || 'Failed to start proxy test job' }); } }); @@ -197,8 +224,8 @@ router.post('/test-job/:jobId/cancel', requireRole('superadmin', 'admin'), async router.put('/:id', requireRole('superadmin', 'admin'), async (req, res) => { try { const { id } = req.params; - const { host, port, protocol, username, password, active } = req.body; - + const { host, port, protocol, username, password, active, max_connections } = req.body; + const result = await pool.query(` UPDATE proxies SET host = COALESCE($1, host), @@ -207,10 +234,11 @@ router.put('/:id', requireRole('superadmin', 'admin'), async (req, res) => { username = COALESCE($4, username), password = COALESCE($5, password), active = COALESCE($6, active), + max_connections = COALESCE($7, max_connections), updated_at = CURRENT_TIMESTAMP - WHERE id = $7 + WHERE id = $8 RETURNING * - `, [host, port, protocol, username, password, active, id]); + `, [host, port, protocol, username, password, active, max_connections, id]); if (result.rows.length === 0) { return res.status(404).json({ error: 'Proxy not found' }); diff --git a/backend/src/services/crawl-rotator.ts b/backend/src/services/crawl-rotator.ts index 4b9708cf..ee072ed3 100644 --- a/backend/src/services/crawl-rotator.ts +++ b/backend/src/services/crawl-rotator.ts @@ -61,6 +61,13 @@ export interface Proxy { failureCount: number; successCount: number; avgResponseTimeMs: number | null; + maxConnections: number; // Number of concurrent connections allowed (for rotating proxies) + // Location info (if known) + city?: string; + state?: string; + country?: string; + countryCode?: string; + timezone?: string; } export interface ProxyStats { @@ -113,14 +120,23 @@ export class ProxyRotator { last_tested_at as "lastUsedAt", failure_count as "failureCount", 0 as "successCount", - response_time_ms as "avgResponseTimeMs" + response_time_ms as "avgResponseTimeMs", + COALESCE(max_connections, 1) as "maxConnections", + city, + state, + country, + country_code as "countryCode", + timezone FROM proxies WHERE active = true ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST `); this.proxies = result.rows; - console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies`); + + // Calculate total concurrent capacity + const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0); + console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`); } catch (error) { // Table might not exist - that's okay console.warn(`[ProxyRotator] Could not load proxies: ${error}`); @@ -256,7 +272,7 @@ export class ProxyRotator { */ getStats(): ProxyStats { const totalProxies = this.proxies.length; - const activeProxies = this.proxies.filter(p => p.isActive).length; + const activeProxies = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0); // Total concurrent capacity const blockedProxies = this.proxies.filter(p => p.failureCount >= 5).length; const successRates = this.proxies @@ -269,7 +285,7 @@ export class ProxyRotator { return { totalProxies, - activeProxies, + activeProxies, // Total concurrent capacity across all proxies blockedProxies, avgSuccessRate, }; @@ -403,6 +419,26 @@ export class CrawlRotator { await this.proxy.markFailed(current.id, error); } } + + /** + * Get current proxy location info (for reporting) + * Note: For rotating proxies (like IPRoyal), the actual exit location varies per request + */ + getProxyLocation(): { city?: string; state?: string; country?: string; timezone?: string; isRotating: boolean } | null { + const current = this.proxy.getCurrent(); + if (!current) return null; + + // Check if this is a rotating proxy (max_connections > 1 usually indicates rotating) + const isRotating = current.maxConnections > 1; + + return { + city: current.city, + state: current.state, + country: current.country, + timezone: current.timezone, + isRotating + }; + } } // ============================================================ diff --git a/backend/src/services/proxy.ts b/backend/src/services/proxy.ts index 15cb6b34..a00d6f8f 100755 --- a/backend/src/services/proxy.ts +++ b/backend/src/services/proxy.ts @@ -276,7 +276,6 @@ export async function addProxiesFromList(proxies: Array<{ await pool.query(` INSERT INTO proxies (host, port, protocol, username, password, active) VALUES ($1, $2, $3, $4, $5, false) - ON CONFLICT (host, port, protocol) DO NOTHING `, [ proxy.host, proxy.port, @@ -285,27 +284,9 @@ export async function addProxiesFromList(proxies: Array<{ proxy.password ]); - // Check if it was actually inserted - const result = await pool.query(` - SELECT id FROM proxies - WHERE host = $1 AND port = $2 AND protocol = $3 - `, [proxy.host, proxy.port, proxy.protocol]); - - if (result.rows.length > 0) { - // Check if it was just inserted (no last_tested_at means new) - const checkResult = await pool.query(` - SELECT last_tested_at FROM proxies - WHERE host = $1 AND port = $2 AND protocol = $3 - `, [proxy.host, proxy.port, proxy.protocol]); - - if (checkResult.rows[0].last_tested_at === null) { - added++; - if (added % 100 === 0) { - console.log(`📥 Imported ${added} proxies...`); - } - } else { - duplicates++; - } + added++; + if (added % 100 === 0) { + console.log(`📥 Imported ${added} proxies...`); } } catch (error: any) { failed++; diff --git a/backend/src/services/proxyTestQueue.ts b/backend/src/services/proxyTestQueue.ts index 42b24128..c17b897c 100644 --- a/backend/src/services/proxyTestQueue.ts +++ b/backend/src/services/proxyTestQueue.ts @@ -8,8 +8,12 @@ interface ProxyTestJob { tested_proxies: number; passed_proxies: number; failed_proxies: number; + mode?: string; // 'all' | 'failed' | 'inactive' } +// Concurrency settings +const DEFAULT_CONCURRENCY = 10; // Test 10 proxies at a time + // Simple in-memory queue - could be replaced with Bull/Bee-Queue for production const activeJobs = new Map(); @@ -33,18 +37,35 @@ export async function cleanupOrphanedJobs(): Promise { } } -export async function createProxyTestJob(): Promise { +export type ProxyTestMode = 'all' | 'failed' | 'inactive'; + +export async function createProxyTestJob(mode: ProxyTestMode = 'all', concurrency: number = DEFAULT_CONCURRENCY): Promise { // Check for existing running jobs first const existingJob = await getActiveProxyTestJob(); if (existingJob) { throw new Error('A proxy test job is already running. Please cancel it first.'); } - const result = await pool.query(` - SELECT COUNT(*) as count FROM proxies - `); + // Get count based on mode + let countQuery: string; + switch (mode) { + case 'failed': + countQuery = `SELECT COUNT(*) as count FROM proxies WHERE test_result = 'failed' OR active = false`; + break; + case 'inactive': + countQuery = `SELECT COUNT(*) as count FROM proxies WHERE active = false`; + break; + default: + countQuery = `SELECT COUNT(*) as count FROM proxies`; + } + + const result = await pool.query(countQuery); const totalProxies = parseInt(result.rows[0].count); + if (totalProxies === 0) { + throw new Error(`No proxies to test with mode '${mode}'`); + } + const jobResult = await pool.query(` INSERT INTO proxy_test_jobs (status, total_proxies) VALUES ('pending', $1) @@ -53,8 +74,8 @@ export async function createProxyTestJob(): Promise { const jobId = jobResult.rows[0].id; - // Start job in background - runProxyTestJob(jobId).catch(err => { + // Start job in background with mode and concurrency + runProxyTestJob(jobId, mode, concurrency).catch(err => { console.error(`❌ Proxy test job ${jobId} failed:`, err); }); @@ -111,7 +132,7 @@ export async function cancelProxyTestJob(jobId: number): Promise { return result.rows.length > 0; } -async function runProxyTestJob(jobId: number): Promise { +async function runProxyTestJob(jobId: number, mode: ProxyTestMode = 'all', concurrency: number = DEFAULT_CONCURRENCY): Promise { // Register job as active activeJobs.set(jobId, { cancelled: false }); @@ -125,20 +146,30 @@ async function runProxyTestJob(jobId: number): Promise { WHERE id = $1 `, [jobId]); - console.log(`🔍 Starting proxy test job ${jobId}...`); + console.log(`🔍 Starting proxy test job ${jobId} (mode: ${mode}, concurrency: ${concurrency})...`); - // Get all proxies - const result = await pool.query(` - SELECT id, host, port, protocol, username, password - FROM proxies - ORDER BY id - `); + // Get proxies based on mode + let query: string; + switch (mode) { + case 'failed': + query = `SELECT id, host, port, protocol, username, password FROM proxies WHERE test_result = 'failed' OR active = false ORDER BY id`; + break; + case 'inactive': + query = `SELECT id, host, port, protocol, username, password FROM proxies WHERE active = false ORDER BY id`; + break; + default: + query = `SELECT id, host, port, protocol, username, password FROM proxies ORDER BY id`; + } + + const result = await pool.query(query); + const proxies = result.rows; let tested = 0; let passed = 0; let failed = 0; - for (const proxy of result.rows) { + // Process proxies in batches for parallel testing + for (let i = 0; i < proxies.length; i += concurrency) { // Check if job was cancelled const jobControl = activeJobs.get(jobId); if (jobControl?.cancelled) { @@ -146,23 +177,34 @@ async function runProxyTestJob(jobId: number): Promise { break; } - // Test the proxy - const testResult = await testProxy( - proxy.host, - proxy.port, - proxy.protocol, - proxy.username, - proxy.password + const batch = proxies.slice(i, i + concurrency); + + // Test batch in parallel + const batchResults = await Promise.all( + batch.map(async (proxy) => { + const testResult = await testProxy( + proxy.host, + proxy.port, + proxy.protocol, + proxy.username, + proxy.password + ); + + // Save result + await saveProxyTestResult(proxy.id, testResult); + + return testResult.success; + }) ); - // Save result - await saveProxyTestResult(proxy.id, testResult); - - tested++; - if (testResult.success) { - passed++; - } else { - failed++; + // Count results + for (const success of batchResults) { + tested++; + if (success) { + passed++; + } else { + failed++; + } } // Update job progress @@ -175,10 +217,8 @@ async function runProxyTestJob(jobId: number): Promise { WHERE id = $4 `, [tested, passed, failed, jobId]); - // Log progress every 10 proxies - if (tested % 10 === 0) { - console.log(`📊 Job ${jobId}: ${tested}/${result.rows.length} proxies tested (${passed} passed, ${failed} failed)`); - } + // Log progress + console.log(`📊 Job ${jobId}: ${tested}/${proxies.length} proxies tested (${passed} passed, ${failed} failed)`); } // Mark job as completed diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index df33597f..7adf7bd9 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -182,12 +182,13 @@ export class TaskWorker { } /** - * Send heartbeat to registry with resource usage + * Send heartbeat to registry with resource usage and proxy location */ private async sendRegistryHeartbeat(): Promise { try { const memUsage = process.memoryUsage(); const cpuUsage = process.cpuUsage(); + const proxyLocation = this.crawlRotator.getProxyLocation(); await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, { method: 'POST', @@ -202,6 +203,7 @@ export class TaskWorker { memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024), cpu_user_ms: Math.round(cpuUsage.user / 1000), cpu_system_ms: Math.round(cpuUsage.system / 1000), + proxy_location: proxyLocation, } }) }); diff --git a/cannaiq/src/pages/Proxies.tsx b/cannaiq/src/pages/Proxies.tsx index 4b6cf17d..130dfbfd 100755 --- a/cannaiq/src/pages/Proxies.tsx +++ b/cannaiq/src/pages/Proxies.tsx @@ -2,12 +2,13 @@ import { useEffect, useState } from 'react'; import { Layout } from '../components/Layout'; import { api } from '../lib/api'; import { Toast } from '../components/Toast'; -import { Shield, CheckCircle, XCircle, RefreshCw, Plus, MapPin, Clock, TrendingUp, Trash2, AlertCircle, Upload, FileText, X } from 'lucide-react'; +import { Shield, CheckCircle, XCircle, RefreshCw, Plus, MapPin, Clock, TrendingUp, Trash2, AlertCircle, Upload, FileText, X, Edit2 } from 'lucide-react'; export function Proxies() { const [proxies, setProxies] = useState([]); const [loading, setLoading] = useState(true); const [showAddForm, setShowAddForm] = useState(false); + const [editingProxy, setEditingProxy] = useState(null); const [testing, setTesting] = useState<{ [key: number]: boolean }>({}); const [activeJob, setActiveJob] = useState(null); const [notification, setNotification] = useState<{ message: string; type: 'success' | 'error' | 'info' } | null>(null); @@ -342,6 +343,18 @@ export function Proxies() { /> )} + {/* Edit Proxy Modal */} + {editingProxy && ( + setEditingProxy(null)} + onSuccess={() => { + setEditingProxy(null); + loadProxies(); + }} + /> + )} + {/* Proxy List */}
{proxies.map(proxy => ( @@ -360,6 +373,9 @@ export function Proxies() { {proxy.is_anonymous && ( Anonymous )} + {proxy.max_connections > 1 && ( + {proxy.max_connections} connections + )} {(proxy.city || proxy.state || proxy.country) && ( @@ -394,6 +410,13 @@ export function Proxies() {
+ {!proxy.active ? (
); } + +function EditProxyModal({ proxy, onClose, onSuccess }: { proxy: any; onClose: () => void; onSuccess: () => void }) { + const [host, setHost] = useState(proxy.host || ''); + const [port, setPort] = useState(proxy.port?.toString() || ''); + const [protocol, setProtocol] = useState(proxy.protocol || 'http'); + const [username, setUsername] = useState(proxy.username || ''); + const [password, setPassword] = useState(proxy.password || ''); + const [maxConnections, setMaxConnections] = useState(proxy.max_connections?.toString() || '1'); + const [loading, setSaving] = useState(false); + const [notification, setNotification] = useState<{ message: string; type: 'success' | 'error' | 'info' } | null>(null); + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + setSaving(true); + + try { + await api.updateProxy(proxy.id, { + host, + port: parseInt(port), + protocol, + username: username || undefined, + password: password || undefined, + max_connections: parseInt(maxConnections) || 1, + }); + onSuccess(); + } catch (error: any) { + setNotification({ message: 'Failed to update proxy: ' + error.message, type: 'error' }); + } finally { + setSaving(false); + } + }; + + return ( +
+
+ {notification && ( + setNotification(null)} + /> + )} +
+
+

Edit Proxy

+ +
+ +
+
+
+ + setHost(e.target.value)} + required + className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + /> +
+ +
+
+ + setPort(e.target.value)} + required + className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + /> +
+ +
+ + +
+
+ +
+ + setUsername(e.target.value)} + placeholder="Optional" + className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + /> +
+ +
+ + setPassword(e.target.value)} + placeholder="Optional" + className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + /> +
+ +
+ + setMaxConnections(e.target.value)} + min="1" + max="500" + className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + /> +

For rotating proxies - allows concurrent connections

+
+
+ +
+ + +
+
+
+
+
+ ); +} diff --git a/cannaiq/src/pages/WorkersDashboard.tsx b/cannaiq/src/pages/WorkersDashboard.tsx index 253ff631..277f81ea 100644 --- a/cannaiq/src/pages/WorkersDashboard.tsx +++ b/cannaiq/src/pages/WorkersDashboard.tsx @@ -15,6 +15,9 @@ import { Cpu, Heart, Gauge, + Server, + MapPin, + Trash2, } from 'lucide-react'; // Worker from registry @@ -26,6 +29,7 @@ interface Worker { status: string; pod_name: string | null; hostname: string | null; + ip_address: string | null; started_at: string; last_heartbeat_at: string; last_task_at: string | null; @@ -34,6 +38,22 @@ interface Worker { current_task_id: number | null; health_status: string; seconds_since_heartbeat: number; + metadata: { + cpu?: number; + memory?: number; + memoryTotal?: number; + memory_mb?: number; + memory_total_mb?: number; + cpu_user_ms?: number; + cpu_system_ms?: number; + proxy_location?: { + city?: string; + state?: string; + country?: string; + timezone?: string; + isRotating?: boolean; + }; + } | null; } // Current task info @@ -140,7 +160,7 @@ function LiveTimer({ startedAt }: { startedAt: string | null }) { ); } -function RoleBadge({ role }: { role: string }) { +function RoleBadge({ role }: { role: string | null }) { const colors: Record = { product_refresh: 'bg-emerald-100 text-emerald-700', product_discovery: 'bg-blue-100 text-blue-700', @@ -149,6 +169,14 @@ function RoleBadge({ role }: { role: string }) { analytics_refresh: 'bg-pink-100 text-pink-700', }; + if (!role) { + return ( + + any task + + ); + } + return ( {role.replace(/_/g, ' ')} @@ -210,6 +238,27 @@ export function WorkersDashboard() { } }, []); + // Cleanup stale workers + const handleCleanupStale = async () => { + try { + await api.post('/api/worker-registry/cleanup', { stale_threshold_minutes: 2 }); + fetchData(); + } catch (err: any) { + console.error('Cleanup error:', err); + } + }; + + // Remove a single worker + const handleRemoveWorker = async (workerId: string) => { + if (!confirm('Remove this worker from the registry?')) return; + try { + await api.delete(`/api/worker-registry/workers/${workerId}`); + fetchData(); + } catch (err: any) { + console.error('Remove error:', err); + } + }; + useEffect(() => { fetchData(); const interval = setInterval(fetchData, 5000); @@ -256,13 +305,23 @@ export function WorkersDashboard() { {workers.length} registered workers ({busyWorkers.length} busy, {idleWorkers.length} idle)

- +
+ + +
{error && ( @@ -372,11 +431,12 @@ export function WorkersDashboard() { Worker Role Status + Exit Location Current Task - Task Duration + Duration Utilization Heartbeat - Uptime + @@ -409,6 +469,35 @@ export function WorkersDashboard() { + + {(() => { + const loc = worker.metadata?.proxy_location; + if (!loc) { + return -; + } + const parts = [loc.city, loc.state, loc.country].filter(Boolean); + if (parts.length === 0) { + return loc.isRotating ? ( + + Rotating + + ) : ( + Unknown + ); + } + return ( +
+ + + {parts.join(', ')} + + {loc.isRotating && ( + * + )} +
+ ); + })()} + {worker.current_task_id ? (
@@ -452,8 +541,14 @@ export function WorkersDashboard() { {formatRelativeTime(worker.last_heartbeat_at)}
- - {formatUptime(worker.started_at)} + + ); From b20a0a4fa582309206fcbd5cb5d19eb71d7fce46 Mon Sep 17 00:00:00 2001 From: Kelly Date: Wed, 10 Dec 2025 08:12:04 -0700 Subject: [PATCH 2/4] fix: Add generic delete method to ApiClient + CI speedups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add delete() method to ApiClient for WorkersDashboard cleanup - Add npm cache volume for faster npm ci - Add TypeScript incremental builds with tsBuildInfoFile cache - Should significantly speed up repeated CI runs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .woodpecker/.ci.yml | 26 ++++++++++++++++++-------- backend/src/index.ts | 1 - cannaiq/src/lib/api.ts | 7 +++++++ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/.woodpecker/.ci.yml b/.woodpecker/.ci.yml index 1d23337a..660f6b22 100644 --- a/.woodpecker/.ci.yml +++ b/.woodpecker/.ci.yml @@ -1,46 +1,56 @@ when: - event: [push, pull_request] +# Volume mounts for caching npm and TypeScript across builds +variables: + - &node_volumes + - npm-cache:/root/.npm + - tsc-cache:/tmp/tsc-cache + steps: # =========================================== # PR VALIDATION: Parallel type checks (PRs only) # =========================================== typecheck-backend: image: code.cannabrands.app/creationshop/node:20 + volumes: *node_volumes commands: - cd backend - - npm ci --prefer-offline - - npx tsc --noEmit + - npm ci --prefer-offline --cache /root/.npm + - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/backend.tsbuildinfo --noEmit depends_on: [] when: event: pull_request typecheck-cannaiq: image: code.cannabrands.app/creationshop/node:20 + volumes: *node_volumes commands: - cd cannaiq - - npm ci --prefer-offline - - npx tsc --noEmit + - npm ci --prefer-offline --cache /root/.npm + - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/cannaiq.tsbuildinfo --noEmit depends_on: [] when: event: pull_request typecheck-findadispo: image: code.cannabrands.app/creationshop/node:20 + volumes: *node_volumes commands: - cd findadispo/frontend - - npm ci --prefer-offline - - npx tsc --noEmit 2>/dev/null || true + - npm ci --prefer-offline --cache /root/.npm + - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/findadispo.tsbuildinfo --noEmit 2>/dev/null || true depends_on: [] when: event: pull_request typecheck-findagram: image: code.cannabrands.app/creationshop/node:20 + volumes: *node_volumes commands: - cd findagram/frontend - - npm ci --prefer-offline - - npx tsc --noEmit 2>/dev/null || true + - npm ci --prefer-offline --cache /root/.npm + - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/findagram.tsbuildinfo --noEmit 2>/dev/null || true depends_on: [] when: event: pull_request diff --git a/backend/src/index.ts b/backend/src/index.ts index 4be0011e..89198438 100755 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -129,7 +129,6 @@ import { createStatesRouter } from './routes/states'; import { createAnalyticsV2Router } from './routes/analytics-v2'; import { createDiscoveryRoutes } from './discovery'; import pipelineRoutes from './routes/pipeline'; -import { getPool } from './db/pool'; // Consumer API routes (findadispo.com, findagram.co) import consumerAuthRoutes from './routes/consumer-auth'; diff --git a/cannaiq/src/lib/api.ts b/cannaiq/src/lib/api.ts index b96f7fd9..c1df5b10 100755 --- a/cannaiq/src/lib/api.ts +++ b/cannaiq/src/lib/api.ts @@ -69,6 +69,13 @@ class ApiClient { return { data }; } + async delete(endpoint: string): Promise<{ data: T }> { + const data = await this.request(endpoint, { + method: 'DELETE', + }); + return { data }; + } + // Auth async login(email: string, password: string) { return this.request<{ token: string; user: any }>('/api/auth/login', { From 4e84f30f8b51b9221633314aeec59275d17159c0 Mon Sep 17 00:00:00 2001 From: Kelly Date: Wed, 10 Dec 2025 08:41:14 -0700 Subject: [PATCH 3/4] feat: Auto-retry tasks, 403 proxy rotation, task deletion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix 403 handler to rotate BOTH proxy and fingerprint (was only fingerprint) - Add auto-retry logic to task service (retry up to max_retries before failing) - Add error tooltip on task status badge showing retry count and error message - Add DELETE /api/tasks/:id endpoint (only for non-running tasks) - Add delete button to JobQueue task table 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/src/platforms/dutchie/client.ts | 6 ++- backend/src/routes/tasks.ts | 30 +++++++++++++++ backend/src/tasks/task-service.ts | 46 ++++++++++++++++++++-- cannaiq/src/pages/JobQueue.tsx | 51 +++++++++++++++++++++++-- 4 files changed, 123 insertions(+), 10 deletions(-) diff --git a/backend/src/platforms/dutchie/client.ts b/backend/src/platforms/dutchie/client.ts index 4817e476..817b0fd4 100644 --- a/backend/src/platforms/dutchie/client.ts +++ b/backend/src/platforms/dutchie/client.ts @@ -534,7 +534,8 @@ export async function executeGraphQL( } if (response.status === 403 && retryOn403) { - console.warn(`[Dutchie Client] 403 blocked - rotating fingerprint...`); + console.warn(`[Dutchie Client] 403 blocked - rotating proxy and fingerprint...`); + await rotateProxyOn403('403 Forbidden on GraphQL'); rotateFingerprint(); attempt++; await sleep(1000 * attempt); @@ -617,7 +618,8 @@ export async function fetchPage( } if (response.status === 403 && retryOn403) { - console.warn(`[Dutchie Client] 403 blocked - rotating fingerprint...`); + console.warn(`[Dutchie Client] 403 blocked - rotating proxy and fingerprint...`); + await rotateProxyOn403('403 Forbidden on page fetch'); rotateFingerprint(); attempt++; await sleep(1000 * attempt); diff --git a/backend/src/routes/tasks.ts b/backend/src/routes/tasks.ts index 6c68e3cc..83c97ed3 100644 --- a/backend/src/routes/tasks.ts +++ b/backend/src/routes/tasks.ts @@ -145,6 +145,36 @@ router.get('/:id', async (req: Request, res: Response) => { } }); +/** + * DELETE /api/tasks/:id + * Delete a specific task by ID + * Only allows deletion of failed, completed, or pending tasks (not running) + */ +router.delete('/:id', async (req: Request, res: Response) => { + try { + const taskId = parseInt(req.params.id, 10); + + // First check if task exists and its status + const task = await taskService.getTask(taskId); + if (!task) { + return res.status(404).json({ error: 'Task not found' }); + } + + // Don't allow deleting running tasks + if (task.status === 'running' || task.status === 'claimed') { + return res.status(400).json({ error: 'Cannot delete a running or claimed task' }); + } + + // Delete the task + await pool.query('DELETE FROM worker_tasks WHERE id = $1', [taskId]); + + res.json({ success: true, message: `Task ${taskId} deleted` }); + } catch (error: unknown) { + console.error('Error deleting task:', error); + res.status(500).json({ error: 'Failed to delete task' }); + } +}); + /** * POST /api/tasks * Create a new task diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts index 979e3401..c28c6a76 100644 --- a/backend/src/tasks/task-service.ts +++ b/backend/src/tasks/task-service.ts @@ -206,15 +206,53 @@ class TaskService { } /** - * Mark a task as failed + * Mark a task as failed, with auto-retry if under max_retries + * Returns true if task was re-queued for retry, false if permanently failed */ - async failTask(taskId: number, errorMessage: string): Promise { + async failTask(taskId: number, errorMessage: string): Promise { + // Get current retry state + const result = await pool.query( + `SELECT retry_count, max_retries FROM worker_tasks WHERE id = $1`, + [taskId] + ); + + if (result.rows.length === 0) { + return false; + } + + const { retry_count, max_retries } = result.rows[0]; + const newRetryCount = (retry_count || 0) + 1; + + if (newRetryCount < (max_retries || 3)) { + // Re-queue for retry - reset to pending with incremented retry_count + await pool.query( + `UPDATE worker_tasks + SET status = 'pending', + worker_id = NULL, + claimed_at = NULL, + started_at = NULL, + retry_count = $2, + error_message = $3, + updated_at = NOW() + WHERE id = $1`, + [taskId, newRetryCount, `Retry ${newRetryCount}: ${errorMessage}`] + ); + console.log(`[TaskService] Task ${taskId} queued for retry ${newRetryCount}/${max_retries || 3}`); + return true; + } + + // Max retries exceeded - mark as permanently failed await pool.query( `UPDATE worker_tasks - SET status = 'failed', completed_at = NOW(), error_message = $2 + SET status = 'failed', + completed_at = NOW(), + retry_count = $2, + error_message = $3 WHERE id = $1`, - [taskId, errorMessage] + [taskId, newRetryCount, `Failed after ${newRetryCount} attempts: ${errorMessage}`] ); + console.log(`[TaskService] Task ${taskId} permanently failed after ${newRetryCount} attempts`); + return false; } /** diff --git a/cannaiq/src/pages/JobQueue.tsx b/cannaiq/src/pages/JobQueue.tsx index 3dbdace9..37b6cf3e 100644 --- a/cannaiq/src/pages/JobQueue.tsx +++ b/cannaiq/src/pages/JobQueue.tsx @@ -17,6 +17,7 @@ import { X, Search, Calendar, + Trash2, } from 'lucide-react'; // Worker from registry @@ -61,6 +62,9 @@ interface Task { started_at: string | null; completed_at: string | null; error: string | null; + error_message: string | null; + retry_count: number; + max_retries: number; result: any; created_at: string; updated_at: string; @@ -499,7 +503,7 @@ function WorkerStatusBadge({ status, healthStatus }: { status: string; healthSta ); } -function TaskStatusBadge({ status }: { status: string }) { +function TaskStatusBadge({ status, error, retryCount }: { status: string; error?: string | null; retryCount?: number }) { const config: Record = { pending: { bg: 'bg-yellow-100', text: 'text-yellow-700', icon: Clock }, running: { bg: 'bg-blue-100', text: 'text-blue-700', icon: Activity }, @@ -510,10 +514,25 @@ function TaskStatusBadge({ status }: { status: string }) { const cfg = config[status] || { bg: 'bg-gray-100', text: 'text-gray-700', icon: Clock }; const Icon = cfg.icon; + // Build tooltip text + let tooltip = ''; + if (error) { + tooltip = error; + } + if (retryCount && retryCount > 0) { + tooltip = `Attempt ${retryCount + 1}${error ? `: ${error}` : ''}`; + } + return ( - + {status} + {retryCount && retryCount > 0 && status !== 'failed' && ( + ({retryCount}) + )} ); } @@ -735,6 +754,18 @@ export function JobQueue() { return () => clearInterval(interval); }, [fetchWorkers]); + // Delete a task + const handleDeleteTask = async (taskId: number) => { + if (!confirm('Delete this task?')) return; + try { + await api.delete(`/api/tasks/${taskId}`); + fetchTasks(); + } catch (err: any) { + console.error('Delete error:', err); + alert(err.response?.data?.error || 'Failed to delete task'); + } + }; + // Get active workers (for display) const activeWorkers = workers.filter(w => w.status !== 'offline' && w.status !== 'terminated'); const busyWorkers = workers.filter(w => w.current_task_id !== null); @@ -910,12 +941,13 @@ export function JobQueue() { Assigned To Created Duration + {tasks.length === 0 ? ( - +

No tasks found

@@ -958,7 +990,7 @@ export function JobQueue() { )} - + {assignedWorker ? ( @@ -986,6 +1018,17 @@ export function JobQueue() { '-' )} + + {(task.status === 'failed' || task.status === 'completed' || task.status === 'pending') && ( + + )} + ); }) From aea93bc96b9e962a22fdd9e77f3ddb6742b87580 Mon Sep 17 00:00:00 2001 From: Kelly Date: Wed, 10 Dec 2025 08:53:10 -0700 Subject: [PATCH 4/4] fix(ci): Revert volume caching - may have broken CI trigger --- .woodpecker/.ci.yml | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/.woodpecker/.ci.yml b/.woodpecker/.ci.yml index 660f6b22..1d23337a 100644 --- a/.woodpecker/.ci.yml +++ b/.woodpecker/.ci.yml @@ -1,56 +1,46 @@ when: - event: [push, pull_request] -# Volume mounts for caching npm and TypeScript across builds -variables: - - &node_volumes - - npm-cache:/root/.npm - - tsc-cache:/tmp/tsc-cache - steps: # =========================================== # PR VALIDATION: Parallel type checks (PRs only) # =========================================== typecheck-backend: image: code.cannabrands.app/creationshop/node:20 - volumes: *node_volumes commands: - cd backend - - npm ci --prefer-offline --cache /root/.npm - - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/backend.tsbuildinfo --noEmit + - npm ci --prefer-offline + - npx tsc --noEmit depends_on: [] when: event: pull_request typecheck-cannaiq: image: code.cannabrands.app/creationshop/node:20 - volumes: *node_volumes commands: - cd cannaiq - - npm ci --prefer-offline --cache /root/.npm - - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/cannaiq.tsbuildinfo --noEmit + - npm ci --prefer-offline + - npx tsc --noEmit depends_on: [] when: event: pull_request typecheck-findadispo: image: code.cannabrands.app/creationshop/node:20 - volumes: *node_volumes commands: - cd findadispo/frontend - - npm ci --prefer-offline --cache /root/.npm - - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/findadispo.tsbuildinfo --noEmit 2>/dev/null || true + - npm ci --prefer-offline + - npx tsc --noEmit 2>/dev/null || true depends_on: [] when: event: pull_request typecheck-findagram: image: code.cannabrands.app/creationshop/node:20 - volumes: *node_volumes commands: - cd findagram/frontend - - npm ci --prefer-offline --cache /root/.npm - - npx tsc --incremental --tsBuildInfoFile /tmp/tsc-cache/findagram.tsbuildinfo --noEmit 2>/dev/null || true + - npm ci --prefer-offline + - npx tsc --noEmit 2>/dev/null || true depends_on: [] when: event: pull_request