Merge pull request 'feat(workers): Add proxy reload, staggered tasks, and bulk proxy import' (#58) from feat/proxy-reload-and-bulk-import into master

Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/58
kelly
2025-12-12 09:11:23 +00:00
8 changed files with 742 additions and 7 deletions

View File

@@ -205,6 +205,78 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
---
## Staggered Task Workflow (Added 2025-12-12)
### Overview
When creating many tasks at once (e.g., product refresh for all AZ stores), staggered scheduling prevents resource contention, proxy assignment lag, and API rate limiting.
### How It Works
```
1. Task created with scheduled_for = NOW() + (index * stagger_seconds)
2. Worker claims task only when scheduled_for <= NOW()
3. Worker runs preflight on EVERY task claim (proxy health check)
4. If preflight passes, worker executes task
5. If preflight fails, task released back to pending for another worker
6. Worker finishes task, polls for next available task
7. Repeat - preflight runs on each new task claim
```
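The stagger gate in step 2 is just a timestamp comparison in the claim query. The real claiming logic lives in `src/tasks/task-worker.ts` (not part of this diff); the following is a minimal sketch assuming a `worker_tasks` table with the `status` and `scheduled_for` columns used in this PR, plus assumed `worker_id`/`updated_at` columns and a `'claimed'` status value:
```typescript
// Hypothetical claim sketch — NOT the actual task-worker.ts implementation.
// Assumed for illustration: a pg Pool, worker_tasks columns
// (status, scheduled_for, worker_id, updated_at), and a 'claimed' status.
import { Pool } from 'pg';

export async function claimNextTask(pool: Pool, workerId: string) {
  const { rows } = await pool.query(
    `UPDATE worker_tasks
        SET status = 'claimed', worker_id = $1, updated_at = NOW()
      WHERE id = (
        SELECT id
          FROM worker_tasks
         WHERE status = 'pending'
           AND scheduled_for <= NOW()  -- the stagger gate: not eligible early
         ORDER BY scheduled_for
         LIMIT 1
        FOR UPDATE SKIP LOCKED         -- concurrent workers skip in-flight rows
      )
      RETURNING id, role, dispensary_id`,
    [workerId]
  );
  return rows[0] ?? null; // null = nothing eligible yet; poll again later
}
```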
### Key Points
- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
- **Stagger prevents thundering herd**: the default spacing is 15 seconds between tasks
- **Task assignment is the trigger**: Worker picks up task → runs preflight → executes if passed
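Put together, the claim/preflight cycle looks roughly like the loop below. This is a hypothetical sketch, not the actual `task-worker.ts` code: the `declare`d helpers are illustrative placeholders, and `claimNextTask` is the sketch above.
```typescript
// Hypothetical worker loop; the declares are placeholders standing in for
// real worker internals, and claimNextTask is the claim sketch above.
import { Pool } from 'pg';

declare function claimNextTask(pool: Pool, workerId: string): Promise<{ id: number } | null>;
declare function runPreflight(): Promise<boolean>;
declare function releaseTask(pool: Pool, taskId: number): Promise<void>;
declare function executeTask(task: { id: number }): Promise<void>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function workerLoop(pool: Pool, workerId: string): Promise<void> {
  for (;;) {
    const task = await claimNextTask(pool, workerId);
    if (!task) {
      await sleep(5_000);               // nothing eligible yet (stagger gate)
      continue;
    }
    if (!(await runPreflight())) {      // preflight runs on EVERY claim
      await releaseTask(pool, task.id); // released back to pending for another worker
      continue;
    }
    await executeTask(task);            // execute, then loop for the next task
  }
}
```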
### API Endpoints
```bash
# Create staggered tasks for specific dispensary IDs
POST /api/tasks/batch/staggered
{
"dispensary_ids": [1, 2, 3, 4],
"role": "product_refresh", # or "product_discovery"
"stagger_seconds": 15, # default: 15
"platform": "dutchie", # default: "dutchie"
"method": null # "curl" | "http" | null
}
# Create staggered tasks for AZ stores (convenience endpoint)
POST /api/tasks/batch/az-stores
{
"total_tasks": 24, # default: 24
"stagger_seconds": 15, # default: 15
"split_roles": true # default: true (12 refresh, 12 discovery)
}
```
### Example: 24 Tasks for AZ Stores
```bash
curl -X POST http://localhost:3010/api/tasks/batch/az-stores \
-H "Content-Type: application/json" \
-d '{"total_tasks": 24, "stagger_seconds": 15, "split_roles": true}'
```
Response:
```json
{
"success": true,
"total": 24,
"product_refresh": 12,
"product_discovery": 12,
"stagger_seconds": 15,
"total_duration_seconds": 345,
"estimated_completion": "2025-12-12T08:40:00.000Z",
"message": "Created 24 staggered tasks for AZ stores (12 refresh, 12 discovery)"
}
```
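The duration fields follow from the stagger arithmetic: the first task is scheduled immediately and the last at (24 − 1) × 15 = 345 seconds, which is `total_duration_seconds`; `estimated_completion` is simply the request time plus that offset.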
### Related Files
| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | `createStaggeredTasks()` and `createAZStoreTasks()` methods |
| `src/routes/tasks.ts` | API endpoints for batch task creation |
| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |
---
## Documentation
| Doc | Purpose |

View File

@@ -0,0 +1,10 @@
-- Migration 086: Add proxy_url column for alternative URL formats
-- Some proxy providers use non-standard URL formats (e.g., host:port:user:pass)
-- This column allows storing the raw URL directly
-- Add proxy_url column - if set, used directly instead of constructing from parts
ALTER TABLE proxies
ADD COLUMN IF NOT EXISTS proxy_url TEXT;
-- Add comment
COMMENT ON COLUMN proxies.proxy_url IS 'Raw proxy URL (if provider uses non-standard format). Takes precedence over constructed URL from host/port/user/pass.';
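-- Illustrative examples (hypothetical values) of how the bulk import script
-- (src/scripts/import-proxies.ts) populates this column:
--   http://user:pass@1.2.3.4:8080  (standard) -> proxy_url = NULL (URL built from parts)
--   http://1.2.3.4:8080:user:pass  (colon)    -> proxy_url stores the raw string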

View File: src/routes/tasks.ts

@@ -976,6 +976,123 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
}
});
// ============================================================
// STAGGERED BATCH TASK CREATION
// ============================================================
/**
* POST /api/tasks/batch/staggered
* Create multiple tasks with staggered start times
*
* This endpoint prevents resource contention when creating many tasks by
* staggering their scheduled_for timestamps. Each task becomes eligible
* for claiming only after its scheduled time.
*
* WORKFLOW:
* 1. Tasks created with scheduled_for = NOW() + (index * stagger_seconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task released back to pending for another worker
*
* Body:
* - dispensary_ids: number[] (required) - Array of dispensary IDs
* - role: TaskRole (required) - 'product_refresh' | 'product_discovery'
* - stagger_seconds: number (default: 15) - Seconds between each task start
* - platform: string (default: 'dutchie')
* - method: 'curl' | 'http' | null (default: null)
*/
router.post('/batch/staggered', async (req: Request, res: Response) => {
try {
const {
dispensary_ids,
role,
stagger_seconds = 15,
platform = 'dutchie',
method = null,
} = req.body;
if (!dispensary_ids || !Array.isArray(dispensary_ids) || dispensary_ids.length === 0) {
return res.status(400).json({ error: 'dispensary_ids array is required' });
}
if (!role) {
return res.status(400).json({ error: 'role is required' });
}
const result = await taskService.createStaggeredTasks(
dispensary_ids,
role as TaskRole,
stagger_seconds,
platform,
method
);
const totalDuration = (dispensary_ids.length - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
created: result.created,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} staggered ${role} tasks (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating staggered tasks:', error);
res.status(500).json({ error: 'Failed to create staggered tasks' });
}
});
/**
* POST /api/tasks/batch/az-stores
* Convenience endpoint to create staggered tasks for Arizona stores
*
* Body:
* - total_tasks: number (default: 24) - Total tasks to create
* - stagger_seconds: number (default: 15) - Seconds between each task
* - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
*/
router.post('/batch/az-stores', async (req: Request, res: Response) => {
try {
const {
total_tasks = 24,
stagger_seconds = 15,
split_roles = true,
} = req.body;
const result = await taskService.createAZStoreTasks(
total_tasks,
stagger_seconds,
split_roles
);
const totalDuration = (result.total - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
total: result.total,
product_refresh: result.product_refresh,
product_discovery: result.product_discovery,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
});
} catch (error: unknown) {
console.error('Error creating AZ store tasks:', error);
res.status(500).json({ error: 'Failed to create AZ store tasks' });
}
});
// ============================================================
// TASK POOL MANAGEMENT
// ============================================================
/**
* GET /api/tasks/pool/status
* Check if task pool is paused

View File

@@ -252,12 +252,9 @@ router.post('/deregister', async (req: Request, res: Response) => {
// Release the name back to the pool
await pool.query('SELECT release_worker_name($1)', [worker_id]);
- // Mark as terminated
+ // Delete the worker entry (clean shutdown)
const { rows } = await pool.query(`
- UPDATE worker_registry
- SET status = 'terminated',
-     current_task_id = NULL,
-     updated_at = NOW()
+ DELETE FROM worker_registry
WHERE worker_id = $1
RETURNING id, friendly_name
`, [worker_id]);

View File: src/scripts/import-proxies.ts

@@ -0,0 +1,263 @@
/**
* Bulk Proxy Import Script
*
* Imports proxies from various formats into the proxies table.
* Supports:
* - Standard format: http://user:pass@host:port
* - Colon format: http://host:port:user:pass
* - Simple format: host:port:user:pass (defaults to http)
*
* Usage:
* npx tsx src/scripts/import-proxies.ts < proxies.txt
* echo "http://host:port:user:pass" | npx tsx src/scripts/import-proxies.ts
* npx tsx src/scripts/import-proxies.ts --file proxies.txt
* npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"
*
* Options:
* --file <path> Read proxies from file (one per line)
* --url <url> Import a single proxy URL
* --max-connections <n> Set max_connections for all imported proxies (default: 1)
* --dry-run Parse and show what would be imported without inserting
*/
import { getPool } from '../db/pool';
import * as fs from 'fs';
import * as readline from 'readline';
interface ParsedProxy {
protocol: string;
host: string;
port: number;
username?: string;
password?: string;
rawUrl: string;
}
/**
* Parse a proxy URL in various formats
*/
function parseProxyUrl(input: string): ParsedProxy | null {
const trimmed = input.trim();
if (!trimmed || trimmed.startsWith('#')) return null;
// Format 1: Standard URL format - http://user:pass@host:port
const standardMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):([^@]+)@([^:]+):(\d+)$/);
if (standardMatch) {
return {
protocol: standardMatch[1],
username: standardMatch[2],
password: standardMatch[3],
host: standardMatch[4],
port: parseInt(standardMatch[5], 10),
rawUrl: trimmed,
};
}
// Format 2: Standard URL without auth - http://host:port
const noAuthMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+)$/);
if (noAuthMatch) {
return {
protocol: noAuthMatch[1],
host: noAuthMatch[2],
port: parseInt(noAuthMatch[3], 10),
rawUrl: trimmed,
};
}
// Format 3: Colon format with protocol - http://host:port:user:pass
const colonWithProtocolMatch = trimmed.match(/^(https?|socks5):\/\/([^:]+):(\d+):([^:]+):(.+)$/);
if (colonWithProtocolMatch) {
return {
protocol: colonWithProtocolMatch[1],
host: colonWithProtocolMatch[2],
port: parseInt(colonWithProtocolMatch[3], 10),
username: colonWithProtocolMatch[4],
password: colonWithProtocolMatch[5],
rawUrl: trimmed, // Keep raw URL for non-standard format
};
}
// Format 4: Colon format without protocol - host:port:user:pass
const colonMatch = trimmed.match(/^([^:]+):(\d+):([^:]+):(.+)$/);
if (colonMatch) {
return {
protocol: 'http',
host: colonMatch[1],
port: parseInt(colonMatch[2], 10),
username: colonMatch[3],
password: colonMatch[4],
rawUrl: `http://${trimmed}`, // Construct raw URL
};
}
// Format 5: Simple host:port
const simpleMatch = trimmed.match(/^([^:]+):(\d+)$/);
if (simpleMatch) {
return {
protocol: 'http',
host: simpleMatch[1],
port: parseInt(simpleMatch[2], 10),
rawUrl: `http://${trimmed}`,
};
}
console.error(`[ImportProxies] Could not parse: ${trimmed}`);
return null;
}
/**
* Check if proxy URL is in non-standard format (needs proxy_url column)
*/
function isNonStandardFormat(rawUrl: string): boolean {
// Colon format: protocol://host:port:user:pass
return /^(https?|socks5):\/\/[^:]+:\d+:[^:]+:.+$/.test(rawUrl);
}
async function importProxies(proxies: ParsedProxy[], maxConnections: number, dryRun: boolean) {
if (dryRun) {
console.log('\n[ImportProxies] DRY RUN - Would import:');
for (const p of proxies) {
const needsRawUrl = isNonStandardFormat(p.rawUrl);
console.log(` ${p.host}:${p.port} (${p.protocol}) user=${p.username || 'none'} needsProxyUrl=${needsRawUrl}`);
}
console.log(`\nTotal: ${proxies.length} proxies`);
return;
}
const pool = getPool();
let inserted = 0;
let skipped = 0;
for (const proxy of proxies) {
try {
// Determine if we need to store the raw URL (non-standard format)
const needsRawUrl = isNonStandardFormat(proxy.rawUrl);
const result = await pool.query(`
INSERT INTO proxies (host, port, protocol, username, password, max_connections, proxy_url, active)
VALUES ($1, $2, $3, $4, $5, $6, $7, true)
ON CONFLICT (host, port, protocol)
DO UPDATE SET
username = EXCLUDED.username,
password = EXCLUDED.password,
max_connections = EXCLUDED.max_connections,
proxy_url = EXCLUDED.proxy_url,
active = true,
updated_at = NOW()
RETURNING id, (xmax = 0) as is_insert
`, [
proxy.host,
proxy.port,
proxy.protocol,
proxy.username || null,
proxy.password || null,
maxConnections,
needsRawUrl ? proxy.rawUrl : null,
]);
const isInsert = result.rows[0]?.is_insert;
if (isInsert) {
inserted++;
console.log(`[ImportProxies] Inserted: ${proxy.host}:${proxy.port}`);
} else {
console.log(`[ImportProxies] Updated: ${proxy.host}:${proxy.port}`);
inserted++; // Count updates too
}
} catch (err: any) {
console.error(`[ImportProxies] Error inserting ${proxy.host}:${proxy.port}: ${err.message}`);
skipped++;
}
}
console.log(`\n[ImportProxies] Complete: ${inserted} imported, ${skipped} skipped`);
// Notify any listening workers
try {
await pool.query(`NOTIFY proxy_added, 'bulk import'`);
console.log('[ImportProxies] Sent proxy_added notification to workers');
} catch {
// Ignore notification errors
}
}
async function readFromStdin(): Promise<string[]> {
return new Promise((resolve) => {
const lines: string[] = [];
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on('line', (line) => {
lines.push(line);
});
rl.on('close', () => {
resolve(lines);
});
});
}
async function main() {
const args = process.argv.slice(2);
let lines: string[] = [];
let maxConnections = 1;
let dryRun = false;
// Parse arguments
for (let i = 0; i < args.length; i++) {
if (args[i] === '--file' && args[i + 1]) {
const content = fs.readFileSync(args[i + 1], 'utf-8');
lines.push(...content.split('\n'));
i++;
} else if (args[i] === '--url' && args[i + 1]) {
lines.push(args[i + 1]);
i++;
} else if (args[i] === '--max-connections' && args[i + 1]) {
maxConnections = parseInt(args[i + 1], 10);
i++;
} else if (args[i] === '--dry-run') {
dryRun = true;
} else if (!args[i].startsWith('--')) {
// Treat as URL directly
lines.push(args[i]);
}
}
// If no lines yet, read from stdin
if (lines.length === 0) {
console.log('[ImportProxies] Reading from stdin...');
lines = await readFromStdin();
}
// Parse all lines
const proxies: ParsedProxy[] = [];
for (const line of lines) {
const parsed = parseProxyUrl(line);
if (parsed) {
proxies.push(parsed);
}
}
if (proxies.length === 0) {
console.error('[ImportProxies] No valid proxies found');
console.error('\nUsage:');
console.error(' npx tsx src/scripts/import-proxies.ts --url "http://host:port:user:pass"');
console.error(' npx tsx src/scripts/import-proxies.ts --file proxies.txt');
console.error(' echo "host:port:user:pass" | npx tsx src/scripts/import-proxies.ts');
console.error('\nSupported formats:');
console.error(' http://user:pass@host:port (standard)');
console.error(' http://host:port:user:pass (colon format)');
console.error(' host:port:user:pass (simple)');
process.exit(1);
}
console.log(`[ImportProxies] Parsed ${proxies.length} proxies (max_connections=${maxConnections})`);
await importProxies(proxies, maxConnections, dryRun);
}
main().catch((err) => {
console.error('[ImportProxies] Fatal error:', err);
process.exit(1);
});

View File

@@ -77,6 +77,11 @@ export interface Proxy {
country?: string;
countryCode?: string;
timezone?: string;
/**
* Raw proxy URL override. If set, used directly instead of constructing from parts.
* Supports non-standard formats like: http://host:port:user:pass
*/
proxyUrl?: string;
}
export interface ProxyStats {
@@ -129,6 +134,10 @@ export class ProxyRotator {
private proxies: Proxy[] = [];
private currentIndex: number = 0;
private lastRotation: Date = new Date();
private lastReloadAt: Date = new Date();
// Proxy reload interval - how often to check for proxy changes (default: 60 seconds)
private reloadIntervalMs: number = 60000;
constructor(pool?: Pool) {
this.pool = pool || null;
@@ -138,6 +147,13 @@ export class ProxyRotator {
this.pool = pool;
}
/**
* Set the reload interval for periodic proxy checks
*/
setReloadInterval(ms: number): void {
this.reloadIntervalMs = ms;
}
/**
* Load proxies from database
*/
@@ -167,22 +183,76 @@ export class ProxyRotator {
state,
country,
country_code as "countryCode",
- timezone
+ timezone,
+ proxy_url as "proxyUrl"
FROM proxies
WHERE active = true
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
`);
this.proxies = result.rows;
this.lastReloadAt = new Date();
const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
- console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
+ console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections / threads)`);
} catch (error) {
console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
this.proxies = [];
}
}
/**
* Check if proxy list is stale and needs reload
*/
isStale(): boolean {
const elapsed = Date.now() - this.lastReloadAt.getTime();
return elapsed > this.reloadIntervalMs;
}
/**
* Reload proxies if the cache is stale.
* This ensures workers pick up new proxies or see disabled proxies removed.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
if (!this.isStale()) {
return false;
}
const oldCount = this.proxies.length;
const oldCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const oldIds = new Set(this.proxies.map(p => p.id));
await this.loadProxies();
const newCount = this.proxies.length;
const newCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
const newIds = new Set(this.proxies.map(p => p.id));
// Log changes
const added = this.proxies.filter(p => !oldIds.has(p.id));
const removed = [...oldIds].filter(id => !newIds.has(id));
if (added.length > 0 || removed.length > 0 || oldCapacity !== newCapacity) {
console.log(`[ProxyRotator] Reloaded proxies: ${oldCount} → ${newCount} proxies, ${oldCapacity} → ${newCapacity} threads`);
if (added.length > 0) {
console.log(`[ProxyRotator] Added: ${added.map(p => `${p.host}:${p.port} (${p.maxConnections} threads)`).join(', ')}`);
}
if (removed.length > 0) {
console.log(`[ProxyRotator] Removed: ${removed.join(', ')}`);
}
}
return true;
}
/**
* Get time since last reload in seconds
*/
getSecondsSinceReload(): number {
return Math.floor((Date.now() - this.lastReloadAt.getTime()) / 1000);
}
/**
* Get next proxy in rotation
*/
@@ -342,8 +412,16 @@ export class ProxyRotator {
/**
* Get proxy URL for HTTP client
* If proxy.proxyUrl is set, uses it directly (supports non-standard formats).
* Otherwise constructs standard format: protocol://user:pass@host:port
*/
getProxyUrl(proxy: Proxy): string {
// Use raw proxyUrl if set (supports non-standard formats like host:port:user:pass)
if (proxy.proxyUrl) {
return proxy.proxyUrl;
}
// Construct standard format
const auth = proxy.username && proxy.password
? `${proxy.username}:${proxy.password}@`
: '';
@@ -584,6 +662,23 @@ export class CrawlRotator {
await this.proxy.loadProxies();
}
/**
* Reload proxy list if stale.
* Workers should call this periodically to pick up proxy changes.
* Returns true if proxies were reloaded.
*/
async reloadIfStale(): Promise<boolean> {
return this.proxy.reloadIfStale();
}
/**
* Set proxy reload interval in milliseconds.
* Default is 60 seconds.
*/
setProxyReloadInterval(ms: number): void {
this.proxy.setReloadInterval(ms);
}
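// Usage sketch (hypothetical wiring; `rotator` is an assumed CrawlRotator
// instance held by a worker):
//   rotator.setProxyReloadInterval(30_000); // tighten the default 60s check
//   await rotator.reloadIfStale();          // in the poll loop; no-op unless stale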
/**
* Rotate proxy only (get new IP)
*/

View File: src/tasks/task-service.ts

@@ -641,6 +641,181 @@ class TaskService {
return (result.rows[0] as { completed_at: Date | null })?.completed_at ?? null;
}
/**
* Create multiple tasks with staggered start times.
*
* STAGGERED TASK WORKFLOW:
* =======================
* This prevents resource contention and proxy assignment lag when creating
* many tasks at once. Each task gets a scheduled_for timestamp offset from
* the previous task.
*
* Workflow:
* 1. Task is created with scheduled_for = NOW() + (index * staggerSeconds)
* 2. Worker claims task only when scheduled_for <= NOW()
* 3. Worker runs preflight check on EVERY task claim
* 4. If preflight passes, worker executes task
* 5. If preflight fails, task is released back to pending for another worker
* 6. Worker finishes task, polls for next available task
* 7. Repeat - preflight runs again on next task claim
*
* Benefits:
* - Prevents all 8 workers from hitting proxies simultaneously
* - Reduces API rate limiting / 403 errors
* - Spreads resource usage over time
* - Each task still runs preflight, ensuring proxy health
*
* @param dispensaryIds - Array of dispensary IDs to create tasks for
* @param role - Task role (e.g., 'product_refresh', 'product_discovery')
* @param staggerSeconds - Seconds between each task's scheduled_for time (default: 15)
* @param platform - Platform identifier (default: 'dutchie')
* @param method - Transport method: 'curl' or 'http' (default: null for any)
* @returns Number of tasks created
*/
async createStaggeredTasks(
dispensaryIds: number[],
role: TaskRole,
staggerSeconds: number = 15,
platform: string = 'dutchie',
method: 'curl' | 'http' | null = null
): Promise<{ created: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) {
return { created: 0, taskIds: [] };
}
// Use a single INSERT with generate_series for efficiency
const result = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status)
SELECT
$2::varchar as role,
td.dispensary_id,
$3::varchar as platform,
$4::varchar as method,
NOW() + (td.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds]);
const taskIds = result.rows.map((r: { id: number }) => r.id);
console.log(`[TaskService] Created ${taskIds.length} staggered ${role} tasks (${staggerSeconds}s apart)`);
return { created: taskIds.length, taskIds };
}
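// Example (illustrative IDs): createStaggeredTasks([101, 102, 103], 'product_refresh', 15)
// schedules the three tasks at offsets 0s, 15s, and 30s from NOW(); each
// becomes claimable only once its scheduled_for has passed.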
/**
* Create a batch of AZ store tasks with automatic distribution.
*
* This is a convenience method for creating tasks for Arizona stores with:
* - Automatic staggering to prevent resource contention
* - Even distribution across both refresh and discovery roles
*
* @param totalTasks - Total number of tasks to create
* @param staggerSeconds - Seconds between each task's start time
* @param splitRoles - If true, split between product_refresh and product_discovery
* @returns Summary of created tasks
*/
async createAZStoreTasks(
totalTasks: number = 24,
staggerSeconds: number = 15,
splitRoles: boolean = true
): Promise<{
total: number;
product_refresh: number;
product_discovery: number;
taskIds: number[];
}> {
// Get AZ stores with platform_id and menu_url
const storesResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE s.code = 'AZ'
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND d.menu_url IS NOT NULL
ORDER BY d.id
`);
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
if (storeIds.length === 0) {
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
}
// Limit tasks to available stores
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
const allTaskIds: number[] = [];
if (splitRoles) {
// Split between refresh and discovery (the same stores receive both roles)
const tasksPerRole = Math.floor(maxTasks / 2);
const refreshStores = storeIds.slice(0, tasksPerRole);
const discoveryStores = storeIds.slice(0, tasksPerRole);
// Create refresh tasks first
const refreshResult = await this.createStaggeredTasks(
refreshStores,
'product_refresh',
staggerSeconds,
'dutchie'
);
allTaskIds.push(...refreshResult.taskIds);
// Create discovery tasks starting after refresh tasks are scheduled
const discoveryStartOffset = tasksPerRole * staggerSeconds;
const discoveryResult = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
SELECT
'product_discovery'::varchar as role,
td.dispensary_id,
'dutchie'::varchar as platform,
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
return {
total: allTaskIds.length,
product_refresh: refreshResult.taskIds.length,
product_discovery: discoveryResult.rowCount ?? 0,
taskIds: allTaskIds
};
}
// Single role mode - all product_discovery
const result = await this.createStaggeredTasks(
storeIds.slice(0, totalTasks),
'product_discovery',
staggerSeconds,
'dutchie'
);
return {
total: result.taskIds.length,
product_refresh: 0,
product_discovery: result.taskIds.length,
taskIds: result.taskIds
};
}
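// Worked example: total_tasks=24, stagger_seconds=15, split_roles=true, and
// at least 12 eligible AZ stores -> 12 refresh tasks at offsets 0..165s,
// discoveryStartOffset = 12 * 15 = 180s, and 12 discovery tasks at 180..345s.
// The final offset (345s) matches total_duration_seconds in the API response.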
/**
* Calculate workers needed to complete tasks within SLA
*/

View File: src/tasks/task-worker.ts

@@ -740,6 +740,12 @@ export class TaskWorker {
this.backoffReason = null;
}
// Periodically reload proxies to pick up changes (new proxies, disabled proxies)
// This runs every ~60 seconds (controlled by setProxyReloadInterval)
if (this.stealthInitialized) {
await this.crawlRotator.reloadIfStale();
}
// Check for decommission signal
const shouldDecommission = await this.checkDecommission();
if (shouldDecommission) {