feat: Auto-migrations on startup, worker exit location, proxy improvements
- Add auto-migration system that runs SQL files from migrations/ on server startup - Track applied migrations in schema_migrations table - Show proxy exit location in Workers dashboard - Add "Cleanup Stale" button to remove old workers - Add remove button for individual workers - Include proxy location (city, state, country) in worker heartbeats - Update Proxy interface with location fields - Re-enable bulk proxy import without ON CONFLICT 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
141
backend/src/db/auto-migrate.ts
Normal file
141
backend/src/db/auto-migrate.ts
Normal file
@@ -0,0 +1,141 @@
|
||||
/**
|
||||
* Auto-Migration System
|
||||
*
|
||||
* Runs SQL migration files from the migrations/ folder automatically on server startup.
|
||||
* Uses a schema_migrations table to track which migrations have been applied.
|
||||
*
|
||||
* Safe to run multiple times - only applies new migrations.
|
||||
*/
|
||||
|
||||
import { Pool } from 'pg';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
const MIGRATIONS_DIR = path.join(__dirname, '../../migrations');
|
||||
|
||||
/**
|
||||
* Ensure schema_migrations table exists
|
||||
*/
|
||||
async function ensureMigrationsTable(pool: Pool): Promise<void> {
|
||||
await pool.query(`
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name VARCHAR(255) UNIQUE NOT NULL,
|
||||
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
)
|
||||
`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get list of already-applied migrations
|
||||
*/
|
||||
async function getAppliedMigrations(pool: Pool): Promise<Set<string>> {
|
||||
const result = await pool.query('SELECT name FROM schema_migrations');
|
||||
return new Set(result.rows.map(row => row.name));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get list of migration files from disk
|
||||
*/
|
||||
function getMigrationFiles(): string[] {
|
||||
if (!fs.existsSync(MIGRATIONS_DIR)) {
|
||||
console.log('[AutoMigrate] No migrations directory found');
|
||||
return [];
|
||||
}
|
||||
|
||||
return fs.readdirSync(MIGRATIONS_DIR)
|
||||
.filter(f => f.endsWith('.sql'))
|
||||
.sort(); // Sort alphabetically (001_, 002_, etc.)
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a single migration file
|
||||
*/
|
||||
async function runMigration(pool: Pool, filename: string): Promise<void> {
|
||||
const filepath = path.join(MIGRATIONS_DIR, filename);
|
||||
const sql = fs.readFileSync(filepath, 'utf8');
|
||||
|
||||
const client = await pool.connect();
|
||||
try {
|
||||
await client.query('BEGIN');
|
||||
|
||||
// Run the migration SQL
|
||||
await client.query(sql);
|
||||
|
||||
// Record that this migration was applied
|
||||
await client.query(
|
||||
'INSERT INTO schema_migrations (name) VALUES ($1) ON CONFLICT (name) DO NOTHING',
|
||||
[filename]
|
||||
);
|
||||
|
||||
await client.query('COMMIT');
|
||||
console.log(`[AutoMigrate] ✓ Applied: ${filename}`);
|
||||
} catch (error: any) {
|
||||
await client.query('ROLLBACK');
|
||||
console.error(`[AutoMigrate] ✗ Failed: ${filename}`);
|
||||
throw error;
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run all pending migrations
|
||||
*
|
||||
* @param pool - Database connection pool
|
||||
* @returns Number of migrations applied
|
||||
*/
|
||||
export async function runAutoMigrations(pool: Pool): Promise<number> {
|
||||
console.log('[AutoMigrate] Checking for pending migrations...');
|
||||
|
||||
try {
|
||||
// Ensure migrations table exists
|
||||
await ensureMigrationsTable(pool);
|
||||
|
||||
// Get applied and available migrations
|
||||
const applied = await getAppliedMigrations(pool);
|
||||
const available = getMigrationFiles();
|
||||
|
||||
// Find pending migrations
|
||||
const pending = available.filter(f => !applied.has(f));
|
||||
|
||||
if (pending.length === 0) {
|
||||
console.log('[AutoMigrate] No pending migrations');
|
||||
return 0;
|
||||
}
|
||||
|
||||
console.log(`[AutoMigrate] Found ${pending.length} pending migrations`);
|
||||
|
||||
// Run each pending migration in order
|
||||
for (const filename of pending) {
|
||||
await runMigration(pool, filename);
|
||||
}
|
||||
|
||||
console.log(`[AutoMigrate] Successfully applied ${pending.length} migrations`);
|
||||
return pending.length;
|
||||
|
||||
} catch (error: any) {
|
||||
console.error('[AutoMigrate] Migration failed:', error.message);
|
||||
// Don't crash the server - log and continue
|
||||
// The specific failing migration will have been rolled back
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check migration status without running anything
|
||||
*/
|
||||
export async function checkMigrationStatus(pool: Pool): Promise<{
|
||||
applied: string[];
|
||||
pending: string[];
|
||||
}> {
|
||||
await ensureMigrationsTable(pool);
|
||||
|
||||
const applied = await getAppliedMigrations(pool);
|
||||
const available = getMigrationFiles();
|
||||
|
||||
return {
|
||||
applied: available.filter(f => applied.has(f)),
|
||||
pending: available.filter(f => !applied.has(f)),
|
||||
};
|
||||
}
|
||||
@@ -6,6 +6,8 @@ import { initializeMinio, isMinioEnabled } from './utils/minio';
|
||||
import { initializeImageStorage } from './utils/image-storage';
|
||||
import { logger } from './services/logger';
|
||||
import { cleanupOrphanedJobs } from './services/proxyTestQueue';
|
||||
import { runAutoMigrations } from './db/auto-migrate';
|
||||
import { getPool } from './db/pool';
|
||||
import healthRoutes from './routes/health';
|
||||
import imageProxyRoutes from './routes/image-proxy';
|
||||
|
||||
@@ -307,6 +309,17 @@ async function startServer() {
|
||||
try {
|
||||
logger.info('system', 'Starting server...');
|
||||
|
||||
// Run auto-migrations before anything else
|
||||
const pool = getPool();
|
||||
const migrationsApplied = await runAutoMigrations(pool);
|
||||
if (migrationsApplied > 0) {
|
||||
logger.info('system', `Applied ${migrationsApplied} database migrations`);
|
||||
} else if (migrationsApplied === 0) {
|
||||
logger.info('system', 'Database schema up to date');
|
||||
} else {
|
||||
logger.warn('system', 'Some migrations failed - check logs');
|
||||
}
|
||||
|
||||
await initializeMinio();
|
||||
await initializeImageStorage();
|
||||
logger.info('system', isMinioEnabled() ? 'MinIO storage initialized' : 'Local filesystem storage initialized');
|
||||
|
||||
@@ -2,7 +2,7 @@ import { Router } from 'express';
|
||||
import { authMiddleware, requireRole } from '../auth/middleware';
|
||||
import { pool } from '../db/pool';
|
||||
import { testProxy, addProxy, addProxiesFromList } from '../services/proxy';
|
||||
import { createProxyTestJob, getProxyTestJob, getActiveProxyTestJob, cancelProxyTestJob } from '../services/proxyTestQueue';
|
||||
import { createProxyTestJob, getProxyTestJob, getActiveProxyTestJob, cancelProxyTestJob, ProxyTestMode } from '../services/proxyTestQueue';
|
||||
|
||||
const router = Router();
|
||||
router.use(authMiddleware);
|
||||
@@ -11,9 +11,10 @@ router.use(authMiddleware);
|
||||
router.get('/', async (req, res) => {
|
||||
try {
|
||||
const result = await pool.query(`
|
||||
SELECT id, host, port, protocol, active, is_anonymous,
|
||||
SELECT id, host, port, protocol, username, password, active, is_anonymous,
|
||||
last_tested_at, test_result, response_time_ms, created_at,
|
||||
city, state, country, country_code, location_updated_at
|
||||
city, state, country, country_code, location_updated_at,
|
||||
COALESCE(max_connections, 1) as max_connections
|
||||
FROM proxies
|
||||
ORDER BY created_at DESC
|
||||
`);
|
||||
@@ -166,13 +167,39 @@ router.post('/:id/test', requireRole('superadmin', 'admin'), async (req, res) =>
|
||||
});
|
||||
|
||||
// Start proxy test job
|
||||
// Query params: mode=all|failed|inactive, concurrency=10
|
||||
router.post('/test-all', requireRole('superadmin', 'admin'), async (req, res) => {
|
||||
try {
|
||||
const jobId = await createProxyTestJob();
|
||||
res.json({ jobId, message: 'Proxy test job started' });
|
||||
} catch (error) {
|
||||
const mode = (req.query.mode as ProxyTestMode) || 'all';
|
||||
const concurrency = parseInt(req.query.concurrency as string) || 10;
|
||||
|
||||
// Validate mode
|
||||
if (!['all', 'failed', 'inactive'].includes(mode)) {
|
||||
return res.status(400).json({ error: 'Invalid mode. Use: all, failed, or inactive' });
|
||||
}
|
||||
|
||||
// Validate concurrency (1-50)
|
||||
if (concurrency < 1 || concurrency > 50) {
|
||||
return res.status(400).json({ error: 'Concurrency must be between 1 and 50' });
|
||||
}
|
||||
|
||||
const jobId = await createProxyTestJob(mode, concurrency);
|
||||
res.json({ jobId, mode, concurrency, message: `Proxy test job started (mode: ${mode}, concurrency: ${concurrency})` });
|
||||
} catch (error: any) {
|
||||
console.error('Error starting proxy test job:', error);
|
||||
res.status(500).json({ error: 'Failed to start proxy test job' });
|
||||
res.status(500).json({ error: error.message || 'Failed to start proxy test job' });
|
||||
}
|
||||
});
|
||||
|
||||
// Convenience endpoint: Test only failed proxies
|
||||
router.post('/test-failed', requireRole('superadmin', 'admin'), async (req, res) => {
|
||||
try {
|
||||
const concurrency = parseInt(req.query.concurrency as string) || 10;
|
||||
const jobId = await createProxyTestJob('failed', concurrency);
|
||||
res.json({ jobId, mode: 'failed', concurrency, message: 'Retesting failed proxies...' });
|
||||
} catch (error: any) {
|
||||
console.error('Error starting failed proxy test:', error);
|
||||
res.status(500).json({ error: error.message || 'Failed to start proxy test job' });
|
||||
}
|
||||
});
|
||||
|
||||
@@ -197,8 +224,8 @@ router.post('/test-job/:jobId/cancel', requireRole('superadmin', 'admin'), async
|
||||
router.put('/:id', requireRole('superadmin', 'admin'), async (req, res) => {
|
||||
try {
|
||||
const { id } = req.params;
|
||||
const { host, port, protocol, username, password, active } = req.body;
|
||||
|
||||
const { host, port, protocol, username, password, active, max_connections } = req.body;
|
||||
|
||||
const result = await pool.query(`
|
||||
UPDATE proxies
|
||||
SET host = COALESCE($1, host),
|
||||
@@ -207,10 +234,11 @@ router.put('/:id', requireRole('superadmin', 'admin'), async (req, res) => {
|
||||
username = COALESCE($4, username),
|
||||
password = COALESCE($5, password),
|
||||
active = COALESCE($6, active),
|
||||
max_connections = COALESCE($7, max_connections),
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = $7
|
||||
WHERE id = $8
|
||||
RETURNING *
|
||||
`, [host, port, protocol, username, password, active, id]);
|
||||
`, [host, port, protocol, username, password, active, max_connections, id]);
|
||||
|
||||
if (result.rows.length === 0) {
|
||||
return res.status(404).json({ error: 'Proxy not found' });
|
||||
|
||||
@@ -61,6 +61,13 @@ export interface Proxy {
|
||||
failureCount: number;
|
||||
successCount: number;
|
||||
avgResponseTimeMs: number | null;
|
||||
maxConnections: number; // Number of concurrent connections allowed (for rotating proxies)
|
||||
// Location info (if known)
|
||||
city?: string;
|
||||
state?: string;
|
||||
country?: string;
|
||||
countryCode?: string;
|
||||
timezone?: string;
|
||||
}
|
||||
|
||||
export interface ProxyStats {
|
||||
@@ -113,14 +120,23 @@ export class ProxyRotator {
|
||||
last_tested_at as "lastUsedAt",
|
||||
failure_count as "failureCount",
|
||||
0 as "successCount",
|
||||
response_time_ms as "avgResponseTimeMs"
|
||||
response_time_ms as "avgResponseTimeMs",
|
||||
COALESCE(max_connections, 1) as "maxConnections",
|
||||
city,
|
||||
state,
|
||||
country,
|
||||
country_code as "countryCode",
|
||||
timezone
|
||||
FROM proxies
|
||||
WHERE active = true
|
||||
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
|
||||
`);
|
||||
|
||||
this.proxies = result.rows;
|
||||
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies`);
|
||||
|
||||
// Calculate total concurrent capacity
|
||||
const totalCapacity = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0);
|
||||
console.log(`[ProxyRotator] Loaded ${this.proxies.length} active proxies (${totalCapacity} max concurrent connections)`);
|
||||
} catch (error) {
|
||||
// Table might not exist - that's okay
|
||||
console.warn(`[ProxyRotator] Could not load proxies: ${error}`);
|
||||
@@ -256,7 +272,7 @@ export class ProxyRotator {
|
||||
*/
|
||||
getStats(): ProxyStats {
|
||||
const totalProxies = this.proxies.length;
|
||||
const activeProxies = this.proxies.filter(p => p.isActive).length;
|
||||
const activeProxies = this.proxies.reduce((sum, p) => sum + p.maxConnections, 0); // Total concurrent capacity
|
||||
const blockedProxies = this.proxies.filter(p => p.failureCount >= 5).length;
|
||||
|
||||
const successRates = this.proxies
|
||||
@@ -269,7 +285,7 @@ export class ProxyRotator {
|
||||
|
||||
return {
|
||||
totalProxies,
|
||||
activeProxies,
|
||||
activeProxies, // Total concurrent capacity across all proxies
|
||||
blockedProxies,
|
||||
avgSuccessRate,
|
||||
};
|
||||
@@ -403,6 +419,26 @@ export class CrawlRotator {
|
||||
await this.proxy.markFailed(current.id, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current proxy location info (for reporting)
|
||||
* Note: For rotating proxies (like IPRoyal), the actual exit location varies per request
|
||||
*/
|
||||
getProxyLocation(): { city?: string; state?: string; country?: string; timezone?: string; isRotating: boolean } | null {
|
||||
const current = this.proxy.getCurrent();
|
||||
if (!current) return null;
|
||||
|
||||
// Check if this is a rotating proxy (max_connections > 1 usually indicates rotating)
|
||||
const isRotating = current.maxConnections > 1;
|
||||
|
||||
return {
|
||||
city: current.city,
|
||||
state: current.state,
|
||||
country: current.country,
|
||||
timezone: current.timezone,
|
||||
isRotating
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================
|
||||
|
||||
@@ -276,7 +276,6 @@ export async function addProxiesFromList(proxies: Array<{
|
||||
await pool.query(`
|
||||
INSERT INTO proxies (host, port, protocol, username, password, active)
|
||||
VALUES ($1, $2, $3, $4, $5, false)
|
||||
ON CONFLICT (host, port, protocol) DO NOTHING
|
||||
`, [
|
||||
proxy.host,
|
||||
proxy.port,
|
||||
@@ -285,27 +284,9 @@ export async function addProxiesFromList(proxies: Array<{
|
||||
proxy.password
|
||||
]);
|
||||
|
||||
// Check if it was actually inserted
|
||||
const result = await pool.query(`
|
||||
SELECT id FROM proxies
|
||||
WHERE host = $1 AND port = $2 AND protocol = $3
|
||||
`, [proxy.host, proxy.port, proxy.protocol]);
|
||||
|
||||
if (result.rows.length > 0) {
|
||||
// Check if it was just inserted (no last_tested_at means new)
|
||||
const checkResult = await pool.query(`
|
||||
SELECT last_tested_at FROM proxies
|
||||
WHERE host = $1 AND port = $2 AND protocol = $3
|
||||
`, [proxy.host, proxy.port, proxy.protocol]);
|
||||
|
||||
if (checkResult.rows[0].last_tested_at === null) {
|
||||
added++;
|
||||
if (added % 100 === 0) {
|
||||
console.log(`📥 Imported ${added} proxies...`);
|
||||
}
|
||||
} else {
|
||||
duplicates++;
|
||||
}
|
||||
added++;
|
||||
if (added % 100 === 0) {
|
||||
console.log(`📥 Imported ${added} proxies...`);
|
||||
}
|
||||
} catch (error: any) {
|
||||
failed++;
|
||||
|
||||
@@ -8,8 +8,12 @@ interface ProxyTestJob {
|
||||
tested_proxies: number;
|
||||
passed_proxies: number;
|
||||
failed_proxies: number;
|
||||
mode?: string; // 'all' | 'failed' | 'inactive'
|
||||
}
|
||||
|
||||
// Concurrency settings
|
||||
const DEFAULT_CONCURRENCY = 10; // Test 10 proxies at a time
|
||||
|
||||
// Simple in-memory queue - could be replaced with Bull/Bee-Queue for production
|
||||
const activeJobs = new Map<number, { cancelled: boolean }>();
|
||||
|
||||
@@ -33,18 +37,35 @@ export async function cleanupOrphanedJobs(): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
export async function createProxyTestJob(): Promise<number> {
|
||||
export type ProxyTestMode = 'all' | 'failed' | 'inactive';
|
||||
|
||||
export async function createProxyTestJob(mode: ProxyTestMode = 'all', concurrency: number = DEFAULT_CONCURRENCY): Promise<number> {
|
||||
// Check for existing running jobs first
|
||||
const existingJob = await getActiveProxyTestJob();
|
||||
if (existingJob) {
|
||||
throw new Error('A proxy test job is already running. Please cancel it first.');
|
||||
}
|
||||
const result = await pool.query(`
|
||||
SELECT COUNT(*) as count FROM proxies
|
||||
`);
|
||||
|
||||
// Get count based on mode
|
||||
let countQuery: string;
|
||||
switch (mode) {
|
||||
case 'failed':
|
||||
countQuery = `SELECT COUNT(*) as count FROM proxies WHERE test_result = 'failed' OR active = false`;
|
||||
break;
|
||||
case 'inactive':
|
||||
countQuery = `SELECT COUNT(*) as count FROM proxies WHERE active = false`;
|
||||
break;
|
||||
default:
|
||||
countQuery = `SELECT COUNT(*) as count FROM proxies`;
|
||||
}
|
||||
|
||||
const result = await pool.query(countQuery);
|
||||
const totalProxies = parseInt(result.rows[0].count);
|
||||
|
||||
if (totalProxies === 0) {
|
||||
throw new Error(`No proxies to test with mode '${mode}'`);
|
||||
}
|
||||
|
||||
const jobResult = await pool.query(`
|
||||
INSERT INTO proxy_test_jobs (status, total_proxies)
|
||||
VALUES ('pending', $1)
|
||||
@@ -53,8 +74,8 @@ export async function createProxyTestJob(): Promise<number> {
|
||||
|
||||
const jobId = jobResult.rows[0].id;
|
||||
|
||||
// Start job in background
|
||||
runProxyTestJob(jobId).catch(err => {
|
||||
// Start job in background with mode and concurrency
|
||||
runProxyTestJob(jobId, mode, concurrency).catch(err => {
|
||||
console.error(`❌ Proxy test job ${jobId} failed:`, err);
|
||||
});
|
||||
|
||||
@@ -111,7 +132,7 @@ export async function cancelProxyTestJob(jobId: number): Promise<boolean> {
|
||||
return result.rows.length > 0;
|
||||
}
|
||||
|
||||
async function runProxyTestJob(jobId: number): Promise<void> {
|
||||
async function runProxyTestJob(jobId: number, mode: ProxyTestMode = 'all', concurrency: number = DEFAULT_CONCURRENCY): Promise<void> {
|
||||
// Register job as active
|
||||
activeJobs.set(jobId, { cancelled: false });
|
||||
|
||||
@@ -125,20 +146,30 @@ async function runProxyTestJob(jobId: number): Promise<void> {
|
||||
WHERE id = $1
|
||||
`, [jobId]);
|
||||
|
||||
console.log(`🔍 Starting proxy test job ${jobId}...`);
|
||||
console.log(`🔍 Starting proxy test job ${jobId} (mode: ${mode}, concurrency: ${concurrency})...`);
|
||||
|
||||
// Get all proxies
|
||||
const result = await pool.query(`
|
||||
SELECT id, host, port, protocol, username, password
|
||||
FROM proxies
|
||||
ORDER BY id
|
||||
`);
|
||||
// Get proxies based on mode
|
||||
let query: string;
|
||||
switch (mode) {
|
||||
case 'failed':
|
||||
query = `SELECT id, host, port, protocol, username, password FROM proxies WHERE test_result = 'failed' OR active = false ORDER BY id`;
|
||||
break;
|
||||
case 'inactive':
|
||||
query = `SELECT id, host, port, protocol, username, password FROM proxies WHERE active = false ORDER BY id`;
|
||||
break;
|
||||
default:
|
||||
query = `SELECT id, host, port, protocol, username, password FROM proxies ORDER BY id`;
|
||||
}
|
||||
|
||||
const result = await pool.query(query);
|
||||
const proxies = result.rows;
|
||||
|
||||
let tested = 0;
|
||||
let passed = 0;
|
||||
let failed = 0;
|
||||
|
||||
for (const proxy of result.rows) {
|
||||
// Process proxies in batches for parallel testing
|
||||
for (let i = 0; i < proxies.length; i += concurrency) {
|
||||
// Check if job was cancelled
|
||||
const jobControl = activeJobs.get(jobId);
|
||||
if (jobControl?.cancelled) {
|
||||
@@ -146,23 +177,34 @@ async function runProxyTestJob(jobId: number): Promise<void> {
|
||||
break;
|
||||
}
|
||||
|
||||
// Test the proxy
|
||||
const testResult = await testProxy(
|
||||
proxy.host,
|
||||
proxy.port,
|
||||
proxy.protocol,
|
||||
proxy.username,
|
||||
proxy.password
|
||||
const batch = proxies.slice(i, i + concurrency);
|
||||
|
||||
// Test batch in parallel
|
||||
const batchResults = await Promise.all(
|
||||
batch.map(async (proxy) => {
|
||||
const testResult = await testProxy(
|
||||
proxy.host,
|
||||
proxy.port,
|
||||
proxy.protocol,
|
||||
proxy.username,
|
||||
proxy.password
|
||||
);
|
||||
|
||||
// Save result
|
||||
await saveProxyTestResult(proxy.id, testResult);
|
||||
|
||||
return testResult.success;
|
||||
})
|
||||
);
|
||||
|
||||
// Save result
|
||||
await saveProxyTestResult(proxy.id, testResult);
|
||||
|
||||
tested++;
|
||||
if (testResult.success) {
|
||||
passed++;
|
||||
} else {
|
||||
failed++;
|
||||
// Count results
|
||||
for (const success of batchResults) {
|
||||
tested++;
|
||||
if (success) {
|
||||
passed++;
|
||||
} else {
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
// Update job progress
|
||||
@@ -175,10 +217,8 @@ async function runProxyTestJob(jobId: number): Promise<void> {
|
||||
WHERE id = $4
|
||||
`, [tested, passed, failed, jobId]);
|
||||
|
||||
// Log progress every 10 proxies
|
||||
if (tested % 10 === 0) {
|
||||
console.log(`📊 Job ${jobId}: ${tested}/${result.rows.length} proxies tested (${passed} passed, ${failed} failed)`);
|
||||
}
|
||||
// Log progress
|
||||
console.log(`📊 Job ${jobId}: ${tested}/${proxies.length} proxies tested (${passed} passed, ${failed} failed)`);
|
||||
}
|
||||
|
||||
// Mark job as completed
|
||||
|
||||
@@ -182,12 +182,13 @@ export class TaskWorker {
|
||||
}
|
||||
|
||||
/**
|
||||
* Send heartbeat to registry with resource usage
|
||||
* Send heartbeat to registry with resource usage and proxy location
|
||||
*/
|
||||
private async sendRegistryHeartbeat(): Promise<void> {
|
||||
try {
|
||||
const memUsage = process.memoryUsage();
|
||||
const cpuUsage = process.cpuUsage();
|
||||
const proxyLocation = this.crawlRotator.getProxyLocation();
|
||||
|
||||
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
|
||||
method: 'POST',
|
||||
@@ -202,6 +203,7 @@ export class TaskWorker {
|
||||
memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024),
|
||||
cpu_user_ms: Math.round(cpuUsage.user / 1000),
|
||||
cpu_system_ms: Math.round(cpuUsage.system / 1000),
|
||||
proxy_location: proxyLocation,
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user