feat(worker): Listen for proxy_added notifications
- Workers now use PostgreSQL LISTEN/NOTIFY to wake up immediately when proxies are added
- Added trigger on proxies table to NOTIFY 'proxy_added' when an active proxy is inserted or updated
- Falls back to 30s polling if LISTEN fails

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
27
backend/migrations/082_proxy_notification_trigger.sql
Normal file
27
backend/migrations/082_proxy_notification_trigger.sql
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
-- Migration: 082_proxy_notification_trigger
|
||||||
|
-- Date: 2024-12-11
|
||||||
|
-- Description: Add PostgreSQL NOTIFY trigger to alert workers when proxies are added
|
||||||
|
|
||||||
|
-- Create function to notify workers when an active proxy is added or activated.
--
-- Trigger function intended for a row-level AFTER INSERT OR UPDATE trigger.
-- Sends pg_notify('proxy_added', <proxy id as text>) when:
--   * a row is inserted with active = true, or
--   * an update changes active to true from some other value.
-- An update that re-writes active = true on an already-active row sends nothing,
-- so listeners are not woken for rows whose state did not change.
CREATE OR REPLACE FUNCTION notify_proxy_added()
RETURNS TRIGGER AS $$
BEGIN
    -- Only notify if proxy is active
    IF NEW.active = true THEN
        -- Branch on TG_OP first so OLD is never referenced for INSERT,
        -- where there is no previous row.
        IF TG_OP = 'INSERT' THEN
            PERFORM pg_notify('proxy_added', NEW.id::text);
        ELSIF OLD.active IS DISTINCT FROM NEW.active THEN
            -- IS DISTINCT FROM treats a NULL -> true change as a transition too.
            PERFORM pg_notify('proxy_added', NEW.id::text);
        END IF;
    END IF;

    -- The return value of a row-level AFTER trigger is ignored; NEW is returned by convention.
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
-- Remove any previous definition first so this migration can be applied repeatedly.
DROP TRIGGER IF EXISTS proxy_added_trigger ON proxies;

-- Row-level trigger: calls notify_proxy_added() after every INSERT on proxies,
-- and after every UPDATE whose SET list names the active column.
CREATE TRIGGER proxy_added_trigger
    AFTER INSERT OR UPDATE OF active
    ON proxies
    FOR EACH ROW
    EXECUTE FUNCTION notify_proxy_added();

-- Catalog description shown by tools such as psql's \df+.
COMMENT ON FUNCTION notify_proxy_added()
    IS 'Sends PostgreSQL NOTIFY to proxy_added channel when an active proxy is added or activated. Workers LISTEN on this channel to wake up immediately.';
|
||||||
@@ -117,40 +117,79 @@ export class TaskWorker {
|
|||||||
* Called once on worker startup before processing any tasks.
|
* Called once on worker startup before processing any tasks.
|
||||||
*
|
*
|
||||||
* IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
|
* IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
|
||||||
|
* Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added.
|
||||||
*/
|
*/
|
||||||
private async initializeStealth(): Promise<void> {
  // Overview:
  //   1. Try to check out a dedicated pool connection and LISTEN on 'proxy_added'.
  //      If that fails, the connection is handed back and we rely on polling alone.
  //   2. Repeatedly load proxies via crawlRotator.initialize(). On success, wire the
  //      rotator into the Dutchie client and return.
  //   3. Between loads, wait for a notification or POLL_INTERVAL_MS, whichever is first.
  //   4. Always detach listeners, UNLISTEN and release the connection on the way out.
  // Throws if no active proxy shows up within MAX_WAIT_MINUTES.
  const MAX_WAIT_MINUTES = 60;
  const POLL_INTERVAL_MS = 30000; // 30 seconds fallback polling
  // Wall-clock deadline: notifications wake the loop early, so counting attempts
  // would no longer correspond to MAX_WAIT_MINUTES of actual waiting.
  const deadline = Date.now() + MAX_WAIT_MINUTES * 60 * 1000;
  let attempts = 0;

  const describeError = (err: unknown): string =>
    err instanceof Error ? err.message : String(err);

  // Set by the notification handler. Checked before sleeping so that a
  // notification arriving while proxies are being loaded is not lost.
  let notified = false;
  // Resolver of the wait currently in progress, if any.
  let wake: (() => void) | null = null;

  const onNotification = (msg: { channel?: string }): void => {
    if (msg.channel !== 'proxy_added') return;
    console.log(`[TaskWorker] Received proxy_added notification!`);
    notified = true;
    if (wake) wake();
  };

  // A checked-out client emits 'error' if its connection drops; without a
  // listener that becomes an unhandled 'error' event. Polling keeps working.
  const onClientError = (err: unknown): void => {
    console.log(`[TaskWorker] LISTEN connection error (will poll): ${describeError(err)}`);
  };

  // Typed loosely because the pool's client type is not visible from this method.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  let notifyClient: any = null;

  // Set up PostgreSQL LISTEN for proxy notifications
  try {
    notifyClient = await this.pool.connect();
    // Attach handlers before LISTEN so nothing delivered right after it is missed.
    notifyClient.on('error', onClientError);
    notifyClient.on('notification', onNotification);
    await notifyClient.query('LISTEN proxy_added');
    console.log(`[TaskWorker] Listening for proxy_added notifications...`);
  } catch (err: unknown) {
    console.log(`[TaskWorker] Could not set up LISTEN (will poll): ${describeError(err)}`);
    if (notifyClient) {
      // connect() worked but LISTEN did not: do not hold a useless connection
      // for the whole wait. A truthy argument makes the pool discard it.
      notifyClient.removeListener('notification', onNotification);
      notifyClient.removeListener('error', onClientError);
      notifyClient.release(true);
      notifyClient = null;
    }
  }

  try {
    while (Date.now() < deadline) {
      // Anything notified before this point is covered by the load below.
      notified = false;

      try {
        // Load proxies from database
        await this.crawlRotator.initialize();

        const stats = this.crawlRotator.proxy.getStats();
        if (stats.activeProxies > 0) {
          console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);

          // Wire rotator to Dutchie client - proxies will be used for ALL requests
          setCrawlRotator(this.crawlRotator);

          console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
          return;
        }

        attempts++;
        console.log(`[TaskWorker] No active proxies available (attempt ${attempts}). Waiting for proxies...`);
      } catch (error: unknown) {
        attempts++;
        console.log(`[TaskWorker] Error loading proxies (attempt ${attempts}): ${describeError(error)}. Retrying...`);
        // After a load failure, back off for the full interval before retrying.
        await this.sleep(POLL_INTERVAL_MS);
        continue;
      }

      // Wait for either notification or timeout. Skip the wait entirely if a
      // notification already arrived while the load above was running.
      if (!notified) {
        await new Promise<void>((resolve) => {
          const timer = setTimeout(resolve, POLL_INTERVAL_MS);
          wake = () => {
            clearTimeout(timer);
            resolve();
          };
        });
        wake = null;
      }
    }

    throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
  } finally {
    // Clean up LISTEN connection
    if (notifyClient) {
      // Detach first: the client object goes back to the pool and may be reused.
      notifyClient.removeListener('notification', onNotification);

      let unlistenFailed = false;
      try {
        await notifyClient.query('UNLISTEN proxy_added');
      } catch (err: unknown) {
        unlistenFailed = true;
        console.log(`[TaskWorker] UNLISTEN failed, discarding connection: ${describeError(err)}`);
      } finally {
        notifyClient.removeListener('error', onClientError);
        // Always release. If UNLISTEN failed, the connection may still be
        // subscribed or broken, so ask the pool to destroy it instead of reusing it.
        notifyClient.release(unlistenFailed);
      }
    }
  }
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user