From c84ef0396b53b860d8169ab41ca151b8c5501a35 Mon Sep 17 00:00:00 2001
From: Kelly
Date: Thu, 11 Dec 2025 21:07:58 -0700
Subject: [PATCH] feat(tasks): Add proxy_test task handler and discovery run tracking
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add proxy_test task handler that fetches IP via proxy to verify connectivity
- Add discovery_runs migration (083) for tracking store discovery progress
- Register proxy_test in task service and worker

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5
---
 backend/migrations/083_discovery_runs.sql | 88 +++++++++++++++++++++++
 backend/src/tasks/handlers/index.ts       |  1 +
 backend/src/tasks/handlers/proxy-test.ts  | 51 +++++++++++++
 backend/src/tasks/task-service.ts         |  3 +-
 backend/src/tasks/task-worker.ts          |  2 +
 5 files changed, 144 insertions(+), 1 deletion(-)
 create mode 100644 backend/migrations/083_discovery_runs.sql
 create mode 100644 backend/src/tasks/handlers/proxy-test.ts

diff --git a/backend/migrations/083_discovery_runs.sql b/backend/migrations/083_discovery_runs.sql
new file mode 100644
index 00000000..d9c97ef2
--- /dev/null
+++ b/backend/migrations/083_discovery_runs.sql
@@ -0,0 +1,88 @@
+-- Migration 083: Discovery Run Tracking
+-- Tracks progress of store discovery runs step-by-step
+
+-- Main discovery runs table
+CREATE TABLE IF NOT EXISTS discovery_runs (
+  id SERIAL PRIMARY KEY,
+  platform VARCHAR(50) NOT NULL DEFAULT 'dutchie',
+  status VARCHAR(20) NOT NULL DEFAULT 'running', -- running, completed, failed
+  started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  finished_at TIMESTAMPTZ,
+  task_id INTEGER REFERENCES worker_task_queue(id),
+
+  -- Totals
+  states_total INTEGER DEFAULT 0,
+  states_completed INTEGER DEFAULT 0,
+  locations_discovered INTEGER DEFAULT 0,
+  locations_promoted INTEGER DEFAULT 0,
+  new_store_ids INTEGER[] DEFAULT '{}',
+
+  -- Error info
+  error_message TEXT,
+
+  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+-- Per-state progress within a run
+CREATE TABLE IF NOT EXISTS discovery_run_states (
+  id SERIAL PRIMARY KEY,
+  run_id INTEGER NOT NULL REFERENCES discovery_runs(id) ON DELETE CASCADE,
+  state_code VARCHAR(2) NOT NULL,
+  status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, running, completed, failed
+  started_at TIMESTAMPTZ,
+  finished_at TIMESTAMPTZ,
+
+  -- Results
+  cities_found INTEGER DEFAULT 0,
+  locations_found INTEGER DEFAULT 0,
+  locations_upserted INTEGER DEFAULT 0,
+  new_dispensary_ids INTEGER[] DEFAULT '{}',
+
+  -- Error info
+  error_message TEXT,
+
+  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+
+  UNIQUE(run_id, state_code)
+);
+
+-- Step-by-step log for detailed progress tracking
+CREATE TABLE IF NOT EXISTS discovery_run_steps (
+  id SERIAL PRIMARY KEY,
+  run_id INTEGER NOT NULL REFERENCES discovery_runs(id) ON DELETE CASCADE,
+  state_code VARCHAR(2),
+  step_name VARCHAR(100) NOT NULL,
+  status VARCHAR(20) NOT NULL DEFAULT 'started', -- started, completed, failed
+  started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  finished_at TIMESTAMPTZ,
+
+  -- Details (JSON for flexibility)
+  details JSONB DEFAULT '{}',
+
+  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+-- Indexes for querying
+CREATE INDEX IF NOT EXISTS idx_discovery_runs_status ON discovery_runs(status);
+CREATE INDEX IF NOT EXISTS idx_discovery_runs_platform ON discovery_runs(platform);
+CREATE INDEX IF NOT EXISTS idx_discovery_runs_started_at ON discovery_runs(started_at DESC);
+CREATE INDEX IF NOT EXISTS idx_discovery_run_states_run_id ON discovery_run_states(run_id);
+CREATE INDEX IF NOT EXISTS idx_discovery_run_steps_run_id ON discovery_run_steps(run_id);
+
+-- View for latest run status per platform
+CREATE OR REPLACE VIEW v_latest_discovery_runs AS
+SELECT DISTINCT ON (platform)
+  id,
+  platform,
+  status,
+  started_at,
+  finished_at,
+  states_total,
+  states_completed,
+  locations_discovered,
+  locations_promoted,
+  array_length(new_store_ids, 1) as new_stores_count,
+  error_message,
+  EXTRACT(EPOCH FROM (COALESCE(finished_at, NOW()) - started_at)) as duration_seconds
+FROM discovery_runs
+ORDER BY platform, started_at DESC;
diff --git a/backend/src/tasks/handlers/index.ts b/backend/src/tasks/handlers/index.ts
index c5763d8e..7ac17dcd 100644
--- a/backend/src/tasks/handlers/index.ts
+++ b/backend/src/tasks/handlers/index.ts
@@ -9,3 +9,4 @@ export { handleProductDiscovery } from './product-discovery';
 export { handleStoreDiscovery } from './store-discovery';
 export { handleEntryPointDiscovery } from './entry-point-discovery';
 export { handleAnalyticsRefresh } from './analytics-refresh';
+export { handleProxyTest } from './proxy-test';
diff --git a/backend/src/tasks/handlers/proxy-test.ts b/backend/src/tasks/handlers/proxy-test.ts
new file mode 100644
index 00000000..8f54d490
--- /dev/null
+++ b/backend/src/tasks/handlers/proxy-test.ts
@@ -0,0 +1,51 @@
+/**
+ * Proxy Test Handler
+ * Tests proxy connectivity by fetching public IP via ipify
+ */
+
+import { TaskContext, TaskResult } from '../task-worker';
+import { execSync } from 'child_process';
+
+export async function handleProxyTest(ctx: TaskContext): Promise<TaskResult> {
+  const { pool } = ctx;
+
+  console.log('[ProxyTest] Testing proxy connection...');
+
+  try {
+    // Get active proxy from DB
+    const proxyResult = await pool.query(`
+      SELECT host, port, username, password
+      FROM proxies
+      WHERE is_active = true
+      LIMIT 1
+    `);
+
+    if (proxyResult.rows.length === 0) {
+      return { success: false, error: 'No active proxy configured' };
+    }
+
+    const p = proxyResult.rows[0];
+    const proxyUrl = p.username
+      ? `http://${p.username}:${p.password}@${p.host}:${p.port}`
+      : `http://${p.host}:${p.port}`;
+
+    console.log(`[ProxyTest] Using proxy: ${p.host}:${p.port}`);
+
+    // Fetch IP via proxy
+    const cmd = `curl -s --proxy '${proxyUrl}' 'https://api.ipify.org?format=json'`;
+    const output = execSync(cmd, { timeout: 30000 }).toString().trim();
+    const data = JSON.parse(output);
+
+    console.log(`[ProxyTest] Proxy IP: ${data.ip}`);
+
+    return {
+      success: true,
+      proxyIp: data.ip,
+      proxyHost: p.host,
+      proxyPort: p.port,
+    };
+  } catch (error: any) {
+    console.error('[ProxyTest] Error:', error.message);
+    return { success: false, error: error.message };
+  }
+}
diff --git a/backend/src/tasks/task-service.ts b/backend/src/tasks/task-service.ts
index 8392443c..322690d3 100644
--- a/backend/src/tasks/task-service.ts
+++ b/backend/src/tasks/task-service.ts
@@ -31,7 +31,8 @@ export type TaskRole =
   | 'product_discovery'
   | 'payload_fetch' // NEW: Fetches from API, saves to disk
   | 'product_refresh' // CHANGED: Now reads from local payload
-  | 'analytics_refresh';
+  | 'analytics_refresh'
+  | 'proxy_test'; // Tests proxy connectivity via ipify
 
 export type TaskStatus =
   | 'pending'
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index baee5773..efadf027 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -59,6 +59,7 @@ import { handleProductDiscovery } from './product-discovery';
 import { handleStoreDiscovery } from './store-discovery';
 import { handleEntryPointDiscovery } from './entry-point-discovery';
 import { handleAnalyticsRefresh } from './analytics-refresh';
+import { handleProxyTest } from './proxy-test';
 
 const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000');
 const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000');
@@ -133,6 +134,7 @@ const TASK_HANDLERS: Record = {
   store_discovery: handleStoreDiscovery,
   entry_point_discovery: handleEntryPointDiscovery,
   analytics_refresh: handleAnalyticsRefresh,
+  proxy_test: handleProxyTest, // Tests proxy via ipify
 };
 
 /**
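
Note (not part of the patch): a minimal sketch of how the tracking tables introduced by migration 083 might be queried to monitor discovery progress. Table, column, and view names come from the migration above; the specific run id is a hypothetical placeholder.

-- Latest run per platform, via the new v_latest_discovery_runs view
SELECT platform, status, states_completed, states_total,
       locations_discovered, locations_promoted, duration_seconds
FROM v_latest_discovery_runs;

-- Per-state progress for one run (run_id = 42 is a placeholder)
SELECT state_code, status, locations_found, locations_upserted, error_message
FROM discovery_run_states
WHERE run_id = 42
ORDER BY state_code;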