feat(workers): Add proxy reload, staggered tasks, and bulk proxy import
- Periodic proxy reload: Workers now reload proxies every 60s to pick up changes - Staggered task scheduling: New API endpoints for creating tasks with delays - Bulk proxy import: Script supports multiple URL formats including host:port:user:pass - Proxy URL column: Migration 086 adds proxy_url for non-standard formats Key changes: - crawl-rotator.ts: Added reloadIfStale(), isStale(), setReloadInterval() - task-worker.ts: Calls reloadIfStale() in main loop - task-service.ts: Added createStaggeredTasks() and createAZStoreTasks() - tasks.ts: Added POST /batch/staggered and /batch/az-stores endpoints - import-proxies.ts: New script for bulk proxy import - CLAUDE.md: Documented staggered task workflow 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
72
CLAUDE.md
72
CLAUDE.md
@@ -205,6 +205,78 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
|
||||
|
||||
---
|
||||
|
||||
## Staggered Task Workflow (Added 2025-12-12)
|
||||
|
||||
### Overview
|
||||
When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting.
|
||||
|
||||
### How It Works
|
||||
```
|
||||
1. Task created with scheduled_for = NOW() + (index * stagger_seconds)
|
||||
2. Worker claims task only when scheduled_for <= NOW()
|
||||
3. Worker runs preflight on EVERY task claim (proxy health check)
|
||||
4. If preflight passes, worker executes task
|
||||
5. If preflight fails, task released back to pending for another worker
|
||||
6. Worker finishes task, polls for next available task
|
||||
7. Repeat - preflight runs on each new task claim
|
||||
```
|
||||
|
||||
### Key Points
|
||||
- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
|
||||
- **Stagger prevents thundering herd**: 15 seconds between tasks is default
|
||||
- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed
|
||||
|
||||
### API Endpoints
|
||||
```bash
|
||||
# Create staggered tasks for specific dispensary IDs
|
||||
POST /api/tasks/batch/staggered
|
||||
{
|
||||
"dispensary_ids": [1, 2, 3, 4],
|
||||
"role": "product_refresh", # or "product_discovery"
|
||||
"stagger_seconds": 15, # default: 15
|
||||
"platform": "dutchie", # default: "dutchie"
|
||||
"method": null # "curl" | "http" | null
|
||||
}
|
||||
|
||||
# Create staggered tasks for AZ stores (convenience endpoint)
|
||||
POST /api/tasks/batch/az-stores
|
||||
{
|
||||
"total_tasks": 24, # default: 24
|
||||
"stagger_seconds": 15, # default: 15
|
||||
"split_roles": true # default: true (12 refresh, 12 discovery)
|
||||
}
|
||||
```
|
||||
|
||||
### Example: 24 Tasks for AZ Stores
|
||||
```bash
|
||||
curl -X POST http://localhost:3010/api/tasks/batch/az-stores \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}'
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"total": 24,
|
||||
"product_refresh": 12,
|
||||
"product_discovery": 12,
|
||||
"stagger_seconds": 15,
|
||||
"total_duration_seconds": 345,
|
||||
"estimated_completion": "2025-12-12T08:40:00.000Z",
|
||||
"message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)"
|
||||
}
|
||||
```
|
||||
|
||||
### Related Files
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods |
|
||||
| `src/routes/tasks.ts` | API endpoints for batch task creation |
|
||||
| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |
|
||||
|
||||
---
|
||||
|
||||
## Documentation
|
||||
|
||||
| Doc | Purpose |
|
||||
|
||||
10
backend/migrations/086_proxy_url_column.sql
Normal file
10
backend/migrations/086_proxy_url_column.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- Migration 086: Add proxy_url column for alternative URL formats
|
||||
-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass)
|
||||
-- This column allows storing the raw URL directly
|
||||
|
||||
-- Add proxy_url column - if set, used directly instead of constructing from parts
|
||||
ALTER TABLE proxies
|
||||
ADD COLUMN IF NOT EXISTS proxy_url TEXT;
|
||||
|
||||
-- Add comment
|
||||
COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.';
|
||||
@@ -976,6 +976,123 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// STAGGERED BATCH TASK CREATION
|
||||
// ============================================================
|
||||
|
||||
/**
|
||||
* POST /api/tasks/batch/staggered
|
||||
* Create multiple tasks with staggered start times
|
||||
*
|
||||
* This endpoint prevents resource contention when creating many tasks by
|
||||
* staggering their scheduled_for timestamps. Each task becomes eligible
|
||||
* for claiming only after its scheduled time.
|
||||
*
|
||||
* WORKFLOW:
|
||||
* 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds)
|
||||
* 2. Worker claims task only when scheduled_for <= NOW()
|
||||
* 3. Worker runs preflight on EVERY task claim
|
||||
* 4. If preflight passes, worker executes task
|
||||
* 5. If preflight fails, task released back to pending for another worker
|
||||
*
|
||||
* Body:
|
||||
* - dispensary_ids: number[] (required) - Array of dispensary IDs
|
||||
* - role: TaskRole (required) - 'product_refresh' | 'product_discovery'
|
||||
* - stagger_seconds: number (default: 15) - Seconds between each task start
|
||||
* - platform: string (default: 'dutchie')
|
||||
* - method: 'curl' | 'http' | null (default: null)
|
||||
*/
|
||||
router.post('/batch/staggered', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const {
|
||||
dispensary_ids,
|
||||
role,
|
||||
stagger_seconds = 15,
|
||||
platform = 'dutchie',
|
||||
method = null,
|
||||
} = req.body;
|
||||
|
||||
if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
|
||||
return res.status(400).json({ error: 'dispensary_ids array is required' });
|
||||
}
|
||||
|
||||
if (!role) {
|
||||
return res.status(400).json({ error: 'role is required' });
|
||||
}
|
||||
|
||||
const result = await taskService.createStaggeredTasks(
|
||||
dispensary_ids,
|
||||
role as TaskRole,
|
||||
stagger_seconds,
|
||||
platform,
|
||||
method
|
||||
);
|
||||
|
||||
const totalDuration = (dispensary_ids.length - 1) * stagger_seconds;
|
||||
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
|
||||
|
||||
res.status(201).json({
|
||||
success: true,
|
||||
created: result.created,
|
||||
task_ids: result.taskIds,
|
||||
stagger_seconds,
|
||||
total_duration_seconds: totalDuration,
|
||||
estimated_completion: estimatedEndTime.toISOString(),
|
||||
message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error creating staggered tasks:', error);
|
||||
res.status(500).json({ error: 'Failed to create staggered tasks' });
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* POST /api/tasks/batch/az-stores
|
||||
* Convenience endpoint to create staggered tasks for Arizona stores
|
||||
*
|
||||
* Body:
|
||||
* - total_tasks: number (default: 24) - Total tasks to create
|
||||
* - stagger_seconds: number (default: 15) - Seconds between each task
|
||||
* - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
|
||||
*/
|
||||
router.post('/batch/az-stores', async (req: Request, res: Response) => {
|
||||
try {
|
||||
const {
|
||||
total_tasks = 24,
|
||||
stagger_seconds = 15,
|
||||
split_roles = true,
|
||||
} = req.body;
|
||||
|
||||
const result = await taskService.createAZStoreTasks(
|
||||
total_tasks,
|
||||
stagger_seconds,
|
||||
split_roles
|
||||
);
|
||||
|
||||
const totalDuration = (result.total - 1) * stagger_seconds;
|
||||
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
|
||||
|
||||
res.status(201).json({
|
||||
success: true,
|
||||
total: result.total,
|
||||
product_refresh: result.product_refresh,
|
||||
product_discovery: result.product_discovery,
|
||||
task_ids: result.taskIds,
|
||||
stagger_seconds,
|
||||
total_duration_seconds: totalDuration,
|
||||
estimated_completion: estimatedEndTime.toISOString(),
|
||||
message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
console.error('Error creating AZ store tasks:', error);
|
||||
res.status(500).json({ error: 'Failed to create AZ store tasks' });
|
||||
}
|
||||
});
|
||||
|
||||
// ============================================================
|
||||
// TASK POOL MANAGEMENT
|
||||
// ============================================================
|
||||
|
||||
/**
|
||||
* GET /api/tasks/pool/status
|
||||
* Check if task pool is paused
|
||||
|
||||
@@ -252,12 +252,9 @@ router.post('/deregister', async (req: Request, res: Response) => {
|
||||
// Release the name back to the pool
|
||||
await pool.query('SELECT release_worker_name($1)', [worker_id]);
|
||||
|
||||
// Mark as terminated
|
||||
// Delete the worker entry (clean shutdown)
|
||||
const { rows } = await pool.query(`
|
||||
UPDATE worker_registry
|
||||
SET status = 'terminated',
|
||||
current_task_id = NULL,
|
||||
updated_at = NOW()
|
||||
DELETE FROM worker_registry
|
||||
WHERE worker_id = $1
|
||||
RETURNING id, friendly_name
|
||||
`, [worker_id]);
|
||||
|
||||
263
backend/src/scripts/import-proxies.ts
Normal file
263
backend/src/scripts/import-proxies.ts
Normal file
@@ -0,0 +1,263 @@
|
||||
/**
|
||||
* Bulk Proxy Import Script
|
||||
*
|
||||
* Imports proxies from various formats into the proxies table.
|
||||
* Supports:
|
||||
* - Standard format: http://user:pass@host:port
|
||||
* - Colon format: http://host:port:user:pass
|
||||
* - Simple format: host:port:user:pass (defaults to http)
|
||||
*
|
||||
* Usage:
|
||||
* npx tsx src/scripts/import-proxies.ts < proxies.txt
|
||||
* echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts
|
||||
* npx tsx src/scripts/import-proxies.ts --file proxies.txt
|
||||
* npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"
|
||||
*
|
||||
* Options:
|
||||
* --file <path> Read proxies from file (one per line)
|
||||
* --url <url> Import a single proxy URL
|
||||
* --max-connections Set max_connections for all imported proxies (default: 1)
|
||||
* --dry-run Parse and show what would be imported without inserting
|
||||
*/
|
||||
|
||||
import { getPool } from '../db/pool';
|
||||
import * as fs from 'fs';
|
||||
import * as readline from 'readline';
|
||||
|
||||
interface ParsedProxy {
|
||||
protocol: string;
|
||||
host: string;
|
||||
port: number;
|
||||
username?: string;
|
||||
password?: string;
|
||||
rawUrl: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a proxy URL in various formats
|
||||
*/
|
||||
function parseProxyUrl(input: string): ParsedProxy | null {
|
||||
const trimmed = input.trim();
|
||||
if (!trimmed || trimmed.startsWith('#')) return null;
|
||||
|
||||
// Format 1: Standard URL format - http://user:pass@host:port
|
||||
const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/);
|
||||
if (standardMatch) {
|
||||
return {
|
||||
protocol: standardMatch[1],
|
||||
username: standardMatch[2],
|
||||
password: standardMatch[3],
|
||||
host: standardMatch[4],
|
||||
port: parseInt(standardMatch[5], 10),
|
||||
rawUrl: trimmed,
|
||||
};
|
||||
}
|
||||
|
||||
// Format 2: Standard URL without auth - http://host:port
|
||||
const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/);
|
||||
if (noAuthMatch) {
|
||||
return {
|
||||
protocol: noAuthMatch[1],
|
||||
host: noAuthMatch[2],
|
||||
port: parseInt(noAuthMatch[3], 10),
|
||||
rawUrl: trimmed,
|
||||
};
|
||||
}
|
||||
|
||||
// Format 3: Colon format with protocol - http://host:port:user:pass
|
||||
const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
|
||||
if (colonWithProtocolMatch) {
|
||||
return {
|
||||
protocol: colonWithProtocolMatch[1],
|
||||
host: colonWithProtocolMatch[2],
|
||||
port: parseInt(colonWithProtocolMatch[3], 10),
|
||||
username: colonWithProtocolMatch[4],
|
||||
password: colonWithProtocolMatch[5],
|
||||
rawUrl: trimmed, // Keep raw URL for non-standard format
|
||||
};
|
||||
}
|
||||
|
||||
// Format 4: Colon format without protocol - host:port:user:pass
|
||||
const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/);
|
||||
if (colonMatch) {
|
||||
return {
|
||||
protocol: 'http',
|
||||
host: colonMatch[1],
|
||||
port: parseInt(colonMatch[2], 10),
|
||||
username: colonMatch[3],
|
||||
password: colonMatch[4],
|
||||
rawUrl: `http://${trimmed}`, // Construct raw URL
|
||||
};
|
||||
}
|
||||
|
||||
// Format 5: Simple host:port
|
||||
const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/);
|
||||
if (simpleMatch) {
|
||||
return {
|
||||
protocol: 'http',
|
||||
host: simpleMatch[1],
|
||||
port: parseInt(simpleMatch[2], 10),
|
||||
rawUrl: `http://${trimmed}`,
|
||||
};
|
||||
}
|
||||
|
||||
console.error(`[ImportProxies] Could not parse: ${trimmed}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if proxy URL is in non-standard format (needs proxy_url column)
|
||||
*/
|
||||
function isNonStandardFormat(rawUrl: string): boolean {
|
||||
// Colon format: protocol://host:port:user:pass
|
||||
return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl);
|
||||
}
|
||||
|
||||
async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) {
|
||||
if (dryRun) {
|
||||
console.log('\n[ImportProxies] DRY RUN - Would import:');
|
||||
for (const p of proxies) {
|
||||
const needsRawUrl = isNonStandardFormat(p.rawUrl);
|
||||
console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`);
|
||||
}
|
||||
console.log(`\nTotal: ${proxies.length} proxies`);
|
||||
return;
|
||||
}
|
||||
|
||||
const pool = getPool();
|
||||
let inserted = 0;
|
||||
let skipped = 0;
|
||||
|
||||
for (const proxy of proxies) {
|
||||
try {
|
||||
// Determine if we need to store the raw URL (non-standard format)
|
||||
const needsRawUrl = isNonStandardFormat(proxy.rawUrl);
|
||||
|
||||
const result = await pool.query(`
|
||||
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
|
||||
ON CONFLICT (host, port, protocol)
|
||||
DO UPDATE SET
|
||||
username = EXCLUDED.username,
|
||||
password = EXCLUDED.password,
|
||||
max_connections = EXCLUDED.max_connections,
|
||||
proxy_url = EXCLUDED.proxy_url,
|
||||
active = true,
|
||||
updated_at = NOW()
|
||||
RETURNING id, (xmax = 0) as is_insert
|
||||
`, [
|
||||
proxy.host,
|
||||
proxy.port,
|
||||
proxy.protocol,
|
||||
proxy.username || null,
|
||||
proxy.password || null,
|
||||
maxConnections,
|
||||
needsRawUrl ? proxy.rawUrl : null,
|
||||
]);
|
||||
|
||||
const isInsert = result.rows[0]?.is_insert;
|
||||
if (isInsert) {
|
||||
inserted++;
|
||||
console.log(`[ImportProxies] Inserted: ${proxy.host}:${proxy.port}`);
|
||||
} else {
|
||||
console.log(`[ImportProxies] Updated: ${proxy.host}:${proxy.port}`);
|
||||
inserted++; // Count updates too
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.error(`[ImportProxies] Error inserting ${proxy.host}:${proxy.port}: ${err.message}`);
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`\n[ImportProxies] Complete: ${inserted} imported, ${skipped} skipped`);
|
||||
|
||||
// Notify any listening workers
|
||||
try {
|
||||
await pool.query(`NOTIFY proxy_added, 'bulk import'`);
|
||||
console.log('[ImportProxies] Sent proxy_added notification to workers');
|
||||
} catch {
|
||||
// Ignore notification errors
|
||||
}
|
||||
}
|
||||
|
||||
async function readFromStdin(): Promise<string[]> {
|
||||
return new Promise((resolve) => {
|
||||
const lines: string[] = [];
|
||||
const rl = readline.createInterface({
|
||||
input: process.stdin,
|
||||
output: process.stdout,
|
||||
terminal: false,
|
||||
});
|
||||
|
||||
rl.on('line', (line) => {
|
||||
lines.push(line);
|
||||
});
|
||||
|
||||
rl.on('close', () => {
|
||||
resolve(lines);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function main() {
|
||||
const args = process.argv.slice(2);
|
||||
let lines: string[] = [];
|
||||
let maxConnections = 1;
|
||||
let dryRun = false;
|
||||
|
||||
// Parse arguments
|
||||
for (let i = 0; i < args.length; i++) {
|
||||
if (args[i] === '--file' && args[i + 1]) {
|
||||
const content = fs.readFileSync(args[i + 1], 'utf-8');
|
||||
lines.push(...content.split('\n'));
|
||||
i++;
|
||||
} else if (args[i] === '--url' && args[i + 1]) {
|
||||
lines.push(args[i + 1]);
|
||||
i++;
|
||||
} else if (args[i] === '--max-connections' && args[i + 1]) {
|
||||
maxConnections = parseInt(args[i + 1], 10);
|
||||
i++;
|
||||
} else if (args[i] === '--dry-run') {
|
||||
dryRun = true;
|
||||
} else if (!args[i].startsWith('--')) {
|
||||
// Treat as URL directly
|
||||
lines.push(args[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// If no lines yet, read from stdin
|
||||
if (lines.length === 0) {
|
||||
console.log('[ImportProxies] Reading from stdin...');
|
||||
lines = await readFromStdin();
|
||||
}
|
||||
|
||||
// Parse all lines
|
||||
const proxies: ParsedProxy[] = [];
|
||||
for (const line of lines) {
|
||||
const parsed = parseProxyUrl(line);
|
||||
if (parsed) {
|
||||
proxies.push(parsed);
|
||||
}
|
||||
}
|
||||
|
||||
if (proxies.length === 0) {
|
||||
console.error('[ImportProxies] No valid proxies found');
|
||||
console.error('\nUsage:');
|
||||
console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"');
|
||||
console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt');
|
||||
console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts');
|
||||
console.error('\nSupported formats:');
|
||||
console.error(' http://user:pass@host:port (standard)');
|
||||
console.error(' http://host:port:user:pass (colon format)');
|
||||
console.error(' host:port:user:pass (simple)');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`);
|
||||
await importProxies(proxies, maxConnections, dryRun);
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error('[ImportProxies] Fatal error:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
@@ -77,6 +77,11 @@ export interface Proxy {
|
||||
country?: string;
|
||||
countryCode?: string;
|
||||
timezone?: string;
|
||||
/**
|
||||
* Raw proxy URL override. If set, used directly instead of constructing from parts.
|
||||
* Supports non-standard formats like: http://host:port:user:pass
|
||||
*/
|
||||
proxyUrl?: string;
|
||||
}
|
||||
|
||||
export interface ProxyStats {
|
||||
@@ -129,6 +134,10 @@ export class ProxyRotator {
|
||||
private proxies: Proxy[] = [];
|
||||
private currentIndex: number = 0;
|
||||
private lastRotation: Date = new Date();
|
||||
private lastReloadAt: Date = new Date();
|
||||
|
||||
// Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
|
||||
private reloadIntervalMs: number = 60000;
|
||||
|
||||
constructor(pool?: Pool) {
|
||||
this.pool = pool || null;
|
||||
@@ -138,6 +147,13 @@ export class ProxyRotator {
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the reload interval for periodic proxy checks
|
||||
*/
|
||||
setReloadInterval(ms: number): void {
|
||||
this.reloadIntervalMs = ms;
|
||||
}
|
||||
|
||||
/**
|
||||
* Load proxies from database
|
||||
*/
|
||||
@@ -167,22 +183,76 @@ export class ProxyRotator {
|
||||
state,
|
||||
country,
|
||||
country_code as "countryCode",
|
||||
timezone
|
||||
timezone,
|
||||
proxy_url as "proxyUrl"
|
||||
FROM proxies
|
||||
WHERE active = true
|
||||
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
|
||||
`);
|
||||
|
||||
this.proxies = result.rows;
|
||||
this.lastReloadAt = new Date();
|
||||
|
||||
const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
|
||||
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
|
||||
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`);
|
||||
} catch (error) {
|
||||
console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
|
||||
this.proxies = [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if proxy list is stale and needs reload
|
||||
*/
|
||||
isStale(): boolean {
|
||||
const elapsed = Date.now() - this.lastReloadAt.getTime();
|
||||
return elapsed > this.reloadIntervalMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reload proxies if the cache is stale.
|
||||
* This ensures workers pick up new proxies or see disabled proxies removed.
|
||||
* Returns true if proxies were reloaded.
|
||||
*/
|
||||
async reloadIfStale(): Promise<boolean> {
|
||||
if (!this.isStale()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const oldCount = this.proxies.length;
|
||||
const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
|
||||
const oldIds = new Set(this.proxies.map(p => p.id));
|
||||
|
||||
await this.loadProxies();
|
||||
|
||||
const newCount = this.proxies.length;
|
||||
const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
|
||||
const newIds = new Set(this.proxies.map(p => p.id));
|
||||
|
||||
// Log changes
|
||||
const added = this.proxies.filter(p => !oldIds.has(p.id));
|
||||
const removed = [...oldIds].filter(id => !newIds.has(id));
|
||||
|
||||
if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) {
|
||||
console.log(`[ProxyRotator] Reloaded proxies: ${oldCount}→${newCount} proxies, ${oldCapacity}→${newCapacity} threads`);
|
||||
if (added.length > 0) {
|
||||
console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`);
|
||||
}
|
||||
if (removed.length > 0) {
|
||||
console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get time since last reload in seconds
|
||||
*/
|
||||
getSecondsSinceReload(): number {
|
||||
return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next proxy in rotation
|
||||
*/
|
||||
@@ -342,8 +412,16 @@ export class ProxyRotator {
|
||||
|
||||
/**
|
||||
* Get proxy URL for HTTP client
|
||||
* If proxy.proxyUrl is set, uses it directly (supports non-standard formats).
|
||||
* Otherwise constructs standard format: protocol://user:pass@host:port
|
||||
*/
|
||||
getProxyUrl(proxy: Proxy): string {
|
||||
// Use raw proxyUrl if set (supports non-standard formats like host:port:user:pass)
|
||||
if (proxy.proxyUrl) {
|
||||
return proxy.proxyUrl;
|
||||
}
|
||||
|
||||
// Construct standard format
|
||||
const auth = proxy.username && proxy.password
|
||||
? `${proxy.username}:${proxy.password}@`
|
||||
: '';
|
||||
@@ -584,6 +662,23 @@ export class CrawlRotator {
|
||||
await this.proxy.loadProxies();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reload proxy list if stale.
|
||||
* Workers should call this periodically to pick up proxy changes.
|
||||
* Returns true if proxies were reloaded.
|
||||
*/
|
||||
async reloadIfStale(): Promise<boolean> {
|
||||
return this.proxy.reloadIfStale();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set proxy reload interval in milliseconds.
|
||||
* Default is 60 seconds.
|
||||
*/
|
||||
setProxyReloadInterval(ms: number): void {
|
||||
this.proxy.setReloadInterval(ms);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rotate proxy only (get new IP)
|
||||
*/
|
||||
|
||||
@@ -641,6 +641,181 @@ class TaskService {
|
||||
return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create multiple tasks with staggered start times.
|
||||
*
|
||||
* STAGGERED TASK WORKFLOW:
|
||||
* =======================
|
||||
* This prevents resource contention and proxy assignment lag when creating
|
||||
* many tasks at once. Each task gets a scheduled_for timestamp offset from
|
||||
* the previous task.
|
||||
*
|
||||
* Workflow:
|
||||
* 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
|
||||
* 2. Worker claims task only when scheduled_for <= NOW()
|
||||
* 3. Worker runs preflight check on EVERY task claim
|
||||
* 4. If preflight passes, worker executes task
|
||||
* 5. If preflight fails, task is released back to pending for another worker
|
||||
* 6. Worker finishes task, polls for next available task
|
||||
* 7. Repeat - preflight runs again on next task claim
|
||||
*
|
||||
* Benefits:
|
||||
* - Prevents all 8 workers from hitting proxies simultaneously
|
||||
* - Reduces API rate limiting / 403 errors
|
||||
* - Spreads resource usage over time
|
||||
* - Each task still runs preflight, ensuring proxy health
|
||||
*
|
||||
* @param dispensaryIds - Array of dispensary IDs to create tasks for
|
||||
* @param role - Task role (e.g., 'product_refresh', 'product_discovery')
|
||||
* @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
|
||||
* @param platform - Platform identifier (default: 'dutchie')
|
||||
* @param method - Transport method: 'curl' or 'http' (default: null for any)
|
||||
* @returns Number of tasks created
|
||||
*/
|
||||
async createStaggeredTasks(
|
||||
dispensaryIds: number[],
|
||||
role: TaskRole,
|
||||
staggerSeconds: number = 15,
|
||||
platform: string = 'dutchie',
|
||||
method: 'curl' | 'http' | null = null
|
||||
): Promise<{ created: number; taskIds: number[] }> {
|
||||
if (dispensaryIds.length === 0) {
|
||||
return { created: 0, taskIds: [] };
|
||||
}
|
||||
|
||||
// Use a single INSERT with generate_series for efficiency
|
||||
const result = await pool.query(`
|
||||
WITH task_data AS (
|
||||
SELECT
|
||||
unnest($1::int[]) as dispensary_id,
|
||||
generate_series(0, array_length($1::int[], 1) - 1) as idx
|
||||
)
|
||||
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
|
||||
SELECT
|
||||
$2::varchar as role,
|
||||
td.dispensary_id,
|
||||
$3::varchar as platform,
|
||||
$4::varchar as method,
|
||||
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for,
|
||||
'pending' as status
|
||||
FROM task_data td
|
||||
ON CONFLICT DO NOTHING
|
||||
RETURNING id
|
||||
`, [dispensaryIds, role, platform, method, staggerSeconds]);
|
||||
|
||||
const taskIds = result.rows.map((r: { id: number }) => r.id);
|
||||
|
||||
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
|
||||
|
||||
return { created: taskIds.length, taskIds };
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a batch of AZ store tasks with automatic distribution.
|
||||
*
|
||||
* This is a convenience method for creating tasks for Arizona stores with:
|
||||
* - Automatic staggering to prevent resource contention
|
||||
* - Even distribution across both refresh and discovery roles
|
||||
*
|
||||
* @param totalTasks - Total number of tasks to create
|
||||
* @param staggerSeconds - Seconds between each task's start time
|
||||
* @param splitRoles - If true, split between product_refresh and product_discovery
|
||||
* @returns Summary of created tasks
|
||||
*/
|
||||
async createAZStoreTasks(
|
||||
totalTasks: number = 24,
|
||||
staggerSeconds: number = 15,
|
||||
splitRoles: boolean = true
|
||||
): Promise<{
|
||||
total: number;
|
||||
product_refresh: number;
|
||||
product_discovery: number;
|
||||
taskIds: number[];
|
||||
}> {
|
||||
// Get AZ stores with platform_id and menu_url
|
||||
const storesResult = await pool.query(`
|
||||
SELECT d.id
|
||||
FROM dispensaries d
|
||||
JOIN states s ON d.state_id = s.id
|
||||
WHERE s.code = 'AZ'
|
||||
AND d.crawl_enabled = true
|
||||
AND d.platform_dispensary_id IS NOT NULL
|
||||
AND d.menu_url IS NOT NULL
|
||||
ORDER BY d.id
|
||||
`);
|
||||
|
||||
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
|
||||
|
||||
if (storeIds.length === 0) {
|
||||
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
|
||||
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
|
||||
}
|
||||
|
||||
// Limit tasks to available stores
|
||||
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
|
||||
const allTaskIds: number[] = [];
|
||||
|
||||
if (splitRoles) {
|
||||
// Split between refresh and discovery
|
||||
const tasksPerRole = Math.floor(maxTasks / 2);
|
||||
const refreshStores = storeIds.slice(0, tasksPerRole);
|
||||
const discoveryStores = storeIds.slice(0, tasksPerRole);
|
||||
|
||||
// Create refresh tasks first
|
||||
const refreshResult = await this.createStaggeredTasks(
|
||||
refreshStores,
|
||||
'product_refresh',
|
||||
staggerSeconds,
|
||||
'dutchie'
|
||||
);
|
||||
allTaskIds.push(...refreshResult.taskIds);
|
||||
|
||||
// Create discovery tasks starting after refresh tasks are scheduled
|
||||
const discoveryStartOffset = tasksPerRole * staggerSeconds;
|
||||
const discoveryResult = await pool.query(`
|
||||
WITH task_data AS (
|
||||
SELECT
|
||||
unnest($1::int[]) as dispensary_id,
|
||||
generate_series(0, array_length($1::int[], 1) - 1) as idx
|
||||
)
|
||||
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
|
||||
SELECT
|
||||
'product_discovery'::varchar as role,
|
||||
td.dispensary_id,
|
||||
'dutchie'::varchar as platform,
|
||||
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
|
||||
'pending' as status
|
||||
FROM task_data td
|
||||
ON CONFLICT DO NOTHING
|
||||
RETURNING id
|
||||
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
|
||||
|
||||
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
|
||||
|
||||
return {
|
||||
total: allTaskIds.length,
|
||||
product_refresh: refreshResult.taskIds.length,
|
||||
product_discovery: discoveryResult.rowCount ?? 0,
|
||||
taskIds: allTaskIds
|
||||
};
|
||||
}
|
||||
|
||||
// Single role mode - all product_discovery
|
||||
const result = await this.createStaggeredTasks(
|
||||
storeIds.slice(0, totalTasks),
|
||||
'product_discovery',
|
||||
staggerSeconds,
|
||||
'dutchie'
|
||||
);
|
||||
|
||||
return {
|
||||
total: result.taskIds.length,
|
||||
product_refresh: 0,
|
||||
product_discovery: result.taskIds.length,
|
||||
taskIds: result.taskIds
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate workers needed to complete tasks within SLA
|
||||
*/
|
||||
|
||||
@@ -740,6 +740,12 @@ export class TaskWorker {
|
||||
this.backoffReason = null;
|
||||
}
|
||||
|
||||
// Periodically reload proxies to pick up changes (new proxies, disabled proxies)
|
||||
// This runs every ~60 seconds (controlled by setProxyReloadInterval)
|
||||
if (this.stealthInitialized) {
|
||||
await this.crawlRotator.reloadIfStale();
|
||||
}
|
||||
|
||||
// Check for decommission signal
|
||||
const shouldDecommission = await this.checkDecommission();
|
||||
if (shouldDecommission) {
|
||||
|
||||
Reference in New Issue
Block a user