Compare commits

..

6 Commits

Author SHA1 Message Date
Kelly
6bcadd9e71 fix(preflight): Correct parameter order and add IP/fingerprint reporting
- Fix update_worker_preflight call to use correct parameter order:
  (worker_id, transport, status, ip, response_ms, error, fingerprint)
- Add proxyIp to both curl and http preflight reports
- Add fingerprint JSONB with timezone, location, and bot detection data
- Log HTTP IP and timezone after preflight completes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:32:45 -07:00
kelly
a77bf8611a Merge pull request 'feat(workers): Preflight phase 1 - Schema, StatefulSet, and timezone matching' (#55) from feat/preflight-phase1-schema into master 2025-12-12 07:30:53 +00:00
Kelly
33feca3138 fix(antidetect): Match browser timezone to proxy IP location
- Add IP geolocation lookup via ip-api.com to get timezone from proxy IP
- Use ipify.org API for reliable proxy IP detection (replaces unreliable fingerprint.com scraping)
- Set browser timezone via CDP Emulation.setTimezoneOverride to match proxy location
- Add detectedTimezone and detectedLocation to preflight result
- Add /api/worker-registry/preflight-test endpoint for smoke testing

Fixes timezone mismatch where browser showed America/Phoenix while proxy was in America/New_York

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 00:25:39 -07:00
kelly
7d85a97b63 Merge pull request 'feat: Preflight schema and StatefulSet' (#54) from feat/preflight-phase1-schema into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/54
2025-12-12 07:14:40 +00:00
Kelly
ce081effd4 feat(workers): Add preflight schema and StatefulSet
- Migration 085: Add curl_ip, http_ip, fingerprint_data, preflight_status,
  preflight_at columns to worker_registry
- StatefulSet manifest for 8 persistent workers with OnDelete update strategy

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:45:04 -07:00
kelly
2ed088b4d8 Merge pull request 'feat(api): Add preflight columns to worker registry API response' (#50) from feat/preflight-api-fields into master 2025-12-12 06:22:06 +00:00
5 changed files with 404 additions and 3 deletions

View File

@@ -0,0 +1,77 @@
apiVersion: v1
kind: Service
metadata:
name: scraper-worker
namespace: dispensary-scraper
labels:
app: scraper-worker
spec:
clusterIP: None # Headless service required for StatefulSet
selector:
app: scraper-worker
ports:
- port: 3010
name: http
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: scraper-worker
namespace: dispensary-scraper
spec:
serviceName: scraper-worker
replicas: 8
podManagementPolicy: Parallel # Start all pods at once
updateStrategy:
type: OnDelete # Pods only update when manually deleted - no automatic restarts
selector:
matchLabels:
app: scraper-worker
template:
metadata:
labels:
app: scraper-worker
spec:
terminationGracePeriodSeconds: 60
imagePullSecrets:
- name: regcred
containers:
- name: worker
image: code.cannabrands.app/creationshop/dispensary-scraper:latest
imagePullPolicy: Always
command: ["node"]
args: ["dist/tasks/task-worker.js"]
env:
- name: WORKER_MODE
value: "true"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MAX_CONCURRENT_TASKS
value: "50"
- name: API_BASE_URL
value: http://scraper
- name: NODE_OPTIONS
value: --max-old-space-size=1500
envFrom:
- configMapRef:
name: scraper-config
- secretRef:
name: scraper-secrets
resources:
requests:
cpu: 100m
memory: 1Gi
limits:
cpu: 500m
memory: 2Gi
livenessProbe:
exec:
command:
- /bin/sh
- -c
- pgrep -f 'task-worker' > /dev/null
initialDelaySeconds: 10
periodSeconds: 30
failureThreshold: 3

View File

@@ -0,0 +1,168 @@
-- Migration 085: Add IP and fingerprint columns for preflight reporting
-- These columns were missing from migration 084
-- ===================================================================
-- PART 1: Add IP address columns to worker_registry
-- ===================================================================
-- IP address detected during curl/axios preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS curl_ip VARCHAR(45);
-- IP address detected during http/Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS http_ip VARCHAR(45);
-- ===================================================================
-- PART 2: Add fingerprint data column
-- ===================================================================
-- Browser fingerprint data captured during Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS fingerprint_data JSONB;
-- ===================================================================
-- PART 3: Add combined preflight status/timestamp for convenience
-- ===================================================================
-- Overall preflight status (computed from both transports)
-- Values: 'pending', 'passed', 'partial', 'failed'
-- - 'pending': neither transport tested
-- - 'passed': both transports passed (or http passed for browser-only)
-- - 'partial': at least one passed
-- - 'failed': no transport passed
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_status VARCHAR(20) DEFAULT 'pending';
-- Most recent preflight completion timestamp
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_at TIMESTAMPTZ;
-- ===================================================================
-- PART 4: Update function to set preflight status
-- ===================================================================
CREATE OR REPLACE FUNCTION update_worker_preflight(
p_worker_id VARCHAR(100),
p_transport VARCHAR(10), -- 'curl' or 'http'
p_status VARCHAR(20), -- 'passed', 'failed', 'skipped'
p_ip VARCHAR(45) DEFAULT NULL,
p_response_ms INTEGER DEFAULT NULL,
p_error TEXT DEFAULT NULL,
p_fingerprint JSONB DEFAULT NULL
) RETURNS VOID AS $$
DECLARE
v_curl_status VARCHAR(20);
v_http_status VARCHAR(20);
v_overall_status VARCHAR(20);
BEGIN
IF p_transport = 'curl' THEN
UPDATE worker_registry
SET
preflight_curl_status = p_status,
preflight_curl_at = NOW(),
preflight_curl_ms = p_response_ms,
preflight_curl_error = p_error,
curl_ip = p_ip,
updated_at = NOW()
WHERE worker_id = p_worker_id;
ELSIF p_transport = 'http' THEN
UPDATE worker_registry
SET
preflight_http_status = p_status,
preflight_http_at = NOW(),
preflight_http_ms = p_response_ms,
preflight_http_error = p_error,
http_ip = p_ip,
fingerprint_data = COALESCE(p_fingerprint, fingerprint_data),
updated_at = NOW()
WHERE worker_id = p_worker_id;
END IF;
-- Update overall preflight status
SELECT preflight_curl_status, preflight_http_status
INTO v_curl_status, v_http_status
FROM worker_registry
WHERE worker_id = p_worker_id;
-- Compute overall status
IF v_curl_status = 'passed' AND v_http_status = 'passed' THEN
v_overall_status := 'passed';
ELSIF v_curl_status = 'passed' OR v_http_status = 'passed' THEN
v_overall_status := 'partial';
ELSIF v_curl_status = 'failed' OR v_http_status = 'failed' THEN
v_overall_status := 'failed';
ELSE
v_overall_status := 'pending';
END IF;
UPDATE worker_registry
SET
preflight_status = v_overall_status,
preflight_at = NOW()
WHERE worker_id = p_worker_id;
END;
$$ LANGUAGE plpgsql;
-- ===================================================================
-- PART 5: Update v_active_workers view
-- ===================================================================
DROP VIEW IF EXISTS v_active_workers;
CREATE VIEW v_active_workers AS
SELECT
wr.id,
wr.worker_id,
wr.friendly_name,
wr.role,
wr.status,
wr.pod_name,
wr.hostname,
wr.started_at,
wr.last_heartbeat_at,
wr.last_task_at,
wr.tasks_completed,
wr.tasks_failed,
wr.current_task_id,
-- IP addresses from preflights
wr.curl_ip,
wr.http_ip,
-- Combined preflight status
wr.preflight_status,
wr.preflight_at,
-- Detailed preflight status per transport
wr.preflight_curl_status,
wr.preflight_http_status,
wr.preflight_curl_at,
wr.preflight_http_at,
wr.preflight_curl_error,
wr.preflight_http_error,
wr.preflight_curl_ms,
wr.preflight_http_ms,
-- Fingerprint data
wr.fingerprint_data,
-- Computed fields
EXTRACT(EPOCH FROM (NOW() - wr.last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN wr.status = 'offline' THEN 'offline'
WHEN wr.last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN wr.current_task_id IS NOT NULL THEN 'busy'
ELSE 'ready'
END as health_status,
-- Capability flags (can this worker handle curl/http tasks?)
(wr.preflight_curl_status = 'passed') as can_curl,
(wr.preflight_http_status = 'passed') as can_http
FROM worker_registry wr
WHERE wr.status != 'terminated'
ORDER BY wr.status = 'active' DESC, wr.last_heartbeat_at DESC;
-- ===================================================================
-- Comments
-- ===================================================================
COMMENT ON COLUMN worker_registry.curl_ip IS 'IP address detected during curl/axios preflight';
COMMENT ON COLUMN worker_registry.http_ip IS 'IP address detected during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.fingerprint_data IS 'Browser fingerprint captured during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.preflight_status IS 'Overall preflight status: pending, passed, partial, failed';
COMMENT ON COLUMN worker_registry.preflight_at IS 'Most recent preflight completion timestamp';

View File

@@ -23,6 +23,8 @@
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import os from 'os';
import { runPuppeteerPreflightWithRetry } from '../services/puppeteer-preflight';
import { CrawlRotator } from '../services/crawl-rotator';
const router = Router();
@@ -864,4 +866,58 @@ router.get('/pods', async (_req: Request, res: Response) => {
}
});
// ============================================================
// PREFLIGHT SMOKE TEST
// ============================================================
/**
* POST /api/worker-registry/preflight-test
* Run an HTTP (Puppeteer) preflight test and return results
*
* This is a smoke test endpoint to verify the preflight system works.
* Returns IP, fingerprint data, bot detection results, and products fetched.
*/
router.post('/preflight-test', async (_req: Request, res: Response) => {
try {
console.log('[PreflightTest] Starting HTTP preflight smoke test...');
// Create a temporary CrawlRotator for the test
const crawlRotator = new CrawlRotator();
// Run the Puppeteer preflight (with 1 retry)
const startTime = Date.now();
const result = await runPuppeteerPreflightWithRetry(crawlRotator, 1);
const duration = Date.now() - startTime;
console.log(`[PreflightTest] Completed in ${duration}ms - passed: ${result.passed}`);
res.json({
success: true,
test: 'http_preflight',
duration_ms: duration,
result: {
passed: result.passed,
proxy_ip: result.proxyIp,
fingerprint: result.fingerprint,
bot_detection: result.botDetection,
products_returned: result.productsReturned,
browser_user_agent: result.browserUserAgent,
ip_verified: result.ipVerified,
proxy_available: result.proxyAvailable,
proxy_connected: result.proxyConnected,
antidetect_ready: result.antidetectReady,
response_time_ms: result.responseTimeMs,
error: result.error
}
});
} catch (error: any) {
console.error('[PreflightTest] Error:', error.message);
res.status(500).json({
success: false,
test: 'http_preflight',
error: error.message
});
}
});
export default router;

View File

@@ -26,6 +26,34 @@ const TEST_PLATFORM_ID = '6405ef617056e8014d79101b';
const FINGERPRINT_DEMO_URL = 'https://demo.fingerprint.com/';
const AMIUNIQUE_URL = 'https://amiunique.org/fingerprint';
// IP geolocation API for timezone lookup (free, no key required)
const IP_API_URL = 'http://ip-api.com/json';
/**
* Look up timezone from IP address using ip-api.com
* Returns IANA timezone (e.g., 'America/New_York') or null on failure
*/
async function getTimezoneFromIp(ip: string): Promise<{ timezone: string; city?: string; region?: string } | null> {
try {
const axios = require('axios');
const response = await axios.get(`${IP_API_URL}/${ip}?fields=status,timezone,city,regionName`, {
timeout: 5000,
});
if (response.data?.status === 'success' && response.data?.timezone) {
return {
timezone: response.data.timezone,
city: response.data.city,
region: response.data.regionName,
};
}
return null;
} catch (err: any) {
console.log(`[PuppeteerPreflight] IP geolocation lookup failed: ${err.message}`);
return null;
}
}
export interface PuppeteerPreflightResult extends PreflightResult {
method: 'http';
/** Number of products returned (proves API access) */
@@ -42,6 +70,13 @@ export interface PuppeteerPreflightResult extends PreflightResult {
expectedProxyIp?: string;
/** Whether IP verification passed (detected IP matches proxy) */
ipVerified?: boolean;
/** Detected timezone from IP geolocation */
detectedTimezone?: string;
/** Detected location from IP geolocation */
detectedLocation?: {
city?: string;
region?: string;
};
}
/**
@@ -136,7 +171,52 @@ export async function runPuppeteerPreflight(
};
// =========================================================================
// STEP 1: Visit fingerprint.com demo to verify anti-detect and get IP
// STEP 1a: Get IP address directly via simple API (more reliable than scraping)
// =========================================================================
console.log(`[PuppeteerPreflight] Getting proxy IP address...`);
try {
const ipApiResponse = await page.evaluate(async () => {
try {
const response = await fetch('https://api.ipify.org?format=json');
const data = await response.json();
return { ip: data.ip, error: null };
} catch (err: any) {
return { ip: null, error: err.message };
}
});
if (ipApiResponse.ip) {
result.proxyIp = ipApiResponse.ip;
result.proxyConnected = true;
console.log(`[PuppeteerPreflight] Detected proxy IP: ${ipApiResponse.ip}`);
// Look up timezone from IP
const geoData = await getTimezoneFromIp(ipApiResponse.ip);
if (geoData) {
result.detectedTimezone = geoData.timezone;
result.detectedLocation = { city: geoData.city, region: geoData.region };
console.log(`[PuppeteerPreflight] IP Geolocation: ${geoData.city}, ${geoData.region} (${geoData.timezone})`);
// Set browser timezone to match proxy location via CDP
try {
const client = await page.target().createCDPSession();
await client.send('Emulation.setTimezoneOverride', { timezoneId: geoData.timezone });
console.log(`[PuppeteerPreflight] Browser timezone set to: ${geoData.timezone}`);
} catch (tzErr: any) {
console.log(`[PuppeteerPreflight] Failed to set browser timezone: ${tzErr.message}`);
}
} else {
console.log(`[PuppeteerPreflight] WARNING: Could not determine timezone from IP - timezone mismatch possible`);
}
} else {
console.log(`[PuppeteerPreflight] IP lookup failed: ${ipApiResponse.error || 'unknown error'}`);
}
} catch (ipErr: any) {
console.log(`[PuppeteerPreflight] IP API error: ${ipErr.message}`);
}
// =========================================================================
// STEP 1b: Visit fingerprint.com demo to verify anti-detect
// =========================================================================
console.log(`[PuppeteerPreflight] Testing anti-detect at ${FINGERPRINT_DEMO_URL}...`);
@@ -199,6 +279,8 @@ export async function runPuppeteerPreflight(
// Don't fail - residential proxies often show different egress IPs
}
}
// Note: Timezone already set earlier via ipify.org IP lookup
}
if (fingerprintData.visitorId) {

View File

@@ -435,29 +435,47 @@ export class TaskWorker {
/**
* Report preflight status to worker_registry
* Function signature: update_worker_preflight(worker_id, transport, status, ip, response_ms, error, fingerprint)
*/
private async reportPreflightStatus(): Promise<void> {
try {
// Update worker_registry directly via SQL (more reliable than API)
// CURL preflight - includes IP address
await this.pool.query(`
SELECT update_worker_preflight($1, 'curl', $2, $3, $4)
SELECT update_worker_preflight($1, 'curl', $2, $3, $4, $5, $6)
`, [
this.workerId,
this.preflightCurlPassed ? 'passed' : 'failed',
this.preflightCurlResult?.proxyIp || null,
this.preflightCurlResult?.responseTimeMs || null,
this.preflightCurlResult?.error || null,
null, // No fingerprint for curl
]);
// HTTP preflight - includes IP, fingerprint, and timezone data
const httpFingerprint = this.preflightHttpResult ? {
...this.preflightHttpResult.fingerprint,
detectedTimezone: (this.preflightHttpResult as any).detectedTimezone,
detectedLocation: (this.preflightHttpResult as any).detectedLocation,
productsReturned: this.preflightHttpResult.productsReturned,
botDetection: (this.preflightHttpResult as any).botDetection,
} : null;
await this.pool.query(`
SELECT update_worker_preflight($1, 'http', $2, $3, $4)
SELECT update_worker_preflight($1, 'http', $2, $3, $4, $5, $6)
`, [
this.workerId,
this.preflightHttpPassed ? 'passed' : 'failed',
this.preflightHttpResult?.proxyIp || null,
this.preflightHttpResult?.responseTimeMs || null,
this.preflightHttpResult?.error || null,
httpFingerprint ? JSON.stringify(httpFingerprint) : null,
]);
console.log(`[TaskWorker] Preflight status reported to worker_registry`);
if (this.preflightHttpResult?.proxyIp) {
console.log(`[TaskWorker] HTTP IP: ${this.preflightHttpResult.proxyIp}, Timezone: ${(this.preflightHttpResult as any).detectedTimezone || 'unknown'}`);
}
} catch (err: any) {
// Non-fatal - worker can still function
console.warn(`[TaskWorker] Could not report preflight status: ${err.message}`);