diff --git a/backend/migrations/082_proxy_notification_trigger.sql b/backend/migrations/082_proxy_notification_trigger.sql
new file mode 100644
index 00000000..4dfe1732
--- /dev/null
+++ b/backend/migrations/082_proxy_notification_trigger.sql
@@ -0,0 +1,27 @@
+-- Migration: 082_proxy_notification_trigger
+-- Date: 2024-12-11
+-- Description: Add PostgreSQL NOTIFY trigger to alert workers when proxies are added
+
+-- Create function to notify workers when active proxy is added/activated
+CREATE OR REPLACE FUNCTION notify_proxy_added()
+RETURNS TRIGGER AS $$
+BEGIN
+  -- Only notify if proxy is active
+  IF NEW.active = true THEN
+    PERFORM pg_notify('proxy_added', NEW.id::text);
+  END IF;
+  RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Drop existing trigger if any
+DROP TRIGGER IF EXISTS proxy_added_trigger ON proxies;
+
+-- Create trigger on insert and update of active column
+CREATE TRIGGER proxy_added_trigger
+AFTER INSERT OR UPDATE OF active ON proxies
+FOR EACH ROW
+EXECUTE FUNCTION notify_proxy_added();
+
+COMMENT ON FUNCTION notify_proxy_added() IS
+'Sends PostgreSQL NOTIFY to proxy_added channel when an active proxy is added or activated. Workers LISTEN on this channel to wake up immediately.';
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index 39604d45..214fc357 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -117,40 +117,106 @@ export class TaskWorker {
    * Called once on worker startup before processing any tasks.
    *
    * IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
+   * Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added,
+   * and fall back to re-checking every 30 seconds when no notification arrives.
+   *
+   * @throws Error when the attempt budget (60 minutes of 30-second polls) is used up without finding an active proxy.
    */
   private async initializeStealth(): Promise<void> {
     const MAX_WAIT_MINUTES = 60;
-    const RETRY_INTERVAL_MS = 30000; // 30 seconds
-    const maxAttempts = (MAX_WAIT_MINUTES * 60 * 1000) / RETRY_INTERVAL_MS;
+    const POLL_INTERVAL_MS = 30000; // 30 seconds fallback polling
+    const maxAttempts = (MAX_WAIT_MINUTES * 60 * 1000) / POLL_INTERVAL_MS;
     let attempts = 0;
+    let notifyClient: any = null;
+    // True when a notification arrived since the last load started, so one that
+    // lands while nobody is waiting is not lost.
+    let notified = false;
+    // Resolver of the wait currently in progress; null when not waiting.
+    let wake: (() => void) | null = null;
 
-    while (attempts < maxAttempts) {
-      try {
-        // Load proxies from database
-        await this.crawlRotator.initialize();
-
-        const stats = this.crawlRotator.proxy.getStats();
-        if (stats.activeProxies > 0) {
-          console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
-
-          // Wire rotator to Dutchie client - proxies will be used for ALL requests
-          setCrawlRotator(this.crawlRotator);
-
-          console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
-          return;
-        }
-
-        attempts++;
-        console.log(`[TaskWorker] No active proxies available (attempt ${attempts}). Waiting ${RETRY_INTERVAL_MS / 1000}s for proxies to be added...`);
-        await this.sleep(RETRY_INTERVAL_MS);
-      } catch (error: any) {
-        attempts++;
-        console.log(`[TaskWorker] Error loading proxies (attempt ${attempts}): ${error.message}. Retrying in ${RETRY_INTERVAL_MS / 1000}s...`);
-        await this.sleep(RETRY_INTERVAL_MS);
-      }
-    }
-
-    throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
+    const onNotification = (msg: any): void => {
+      if (msg.channel !== 'proxy_added') return;
+      console.log(`[TaskWorker] Received proxy_added notification!`);
+      notified = true;
+      if (wake) wake();
+    };
+    // A checked-out pg client without an 'error' listener takes the process down if
+    // its connection drops; log it instead and keep going on fallback polling.
+    const onClientError = (err: any): void => {
+      console.log(`[TaskWorker] proxy_added listener connection error (will poll): ${err.message}`);
+    };
+
+    // Set up PostgreSQL LISTEN for proxy notifications
+    try {
+      notifyClient = await this.pool.connect();
+      notifyClient.on('error', onClientError);
+      notifyClient.on('notification', onNotification);
+      await notifyClient.query('LISTEN proxy_added');
+      console.log(`[TaskWorker] Listening for proxy_added notifications...`);
+    } catch (err: any) {
+      console.log(`[TaskWorker] Could not set up LISTEN (will poll): ${err.message}`);
+    }
+
+    try {
+      while (attempts < maxAttempts) {
+        // Reset before loading so a notification received during the load still
+        // skips the wait below.
+        notified = false;
+
+        try {
+          // Load proxies from database
+          await this.crawlRotator.initialize();
+
+          const stats = this.crawlRotator.proxy.getStats();
+          if (stats.activeProxies > 0) {
+            console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
+
+            // Wire rotator to Dutchie client - proxies will be used for ALL requests
+            setCrawlRotator(this.crawlRotator);
+
+            console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
+            return;
+          }
+
+          attempts++;
+          console.log(`[TaskWorker] No active proxies available (attempt ${attempts}). Waiting for proxies...`);
+        } catch (error: any) {
+          attempts++;
+          console.log(`[TaskWorker] Error loading proxies (attempt ${attempts}): ${error.message}. Retrying...`);
+        }
+
+        // Wait for either notification or timeout (both the empty and the error case)
+        if (!notified) {
+          await new Promise<void>((resolve) => {
+            const timer = setTimeout(resolve, POLL_INTERVAL_MS);
+            wake = () => {
+              clearTimeout(timer); // do not leave a stale timer behind
+              resolve();
+            };
+          });
+          wake = null;
+        }
+      }
+
+      throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
+    } finally {
+      // Clean up LISTEN connection
+      if (notifyClient) {
+        // The client goes back to a shared pool: detach our handler first.
+        notifyClient.removeListener('notification', onNotification);
+        let cleanupError: Error | undefined;
+        try {
+          await notifyClient.query('UNLISTEN proxy_added');
+        } catch (err: any) {
+          cleanupError = err;
+        } finally {
+          notifyClient.removeListener('error', onClientError);
+          // Always release; a truthy argument makes the pool destroy the client
+          // rather than reuse one that may still be subscribed.
+          notifyClient.release(cleanupError);
+        }
+      }
+    }
   }
 
   /**