Files
cannaiq/backend/src/platforms/dutchie/client.ts
Kelly 4949b22457 feat(tasks): Refactor task workflow with payload/refresh separation
Major changes:
- Split crawl into payload_fetch (API → disk) and product_refresh (disk → DB)
- Add task chaining: store_discovery → product_discovery → payload_fetch → product_refresh
- Add payload storage utilities for gzipped JSON on filesystem
- Add /api/payloads endpoints for payload access and diffing
- Add DB-driven TaskScheduler with schedule persistence
- Track newDispensaryIds through discovery promotion for chaining
- Add stealth improvements: HTTP fingerprinting, proxy rotation enhancements
- Add Workers dashboard K8s scaling controls

New files:
- src/tasks/handlers/payload-fetch.ts - Fetches from API, saves to disk
- src/services/task-scheduler.ts - DB-driven schedule management
- src/utils/payload-storage.ts - Payload save/load utilities
- src/routes/payloads.ts - Payload API endpoints
- src/services/http-fingerprint.ts - Browser fingerprint generation
- docs/TASK_WORKFLOW_2024-12-10.md - Complete workflow documentation

Migrations:
- 078: Proxy consecutive 403 tracking
- 079: task_schedules table
- 080: raw_crawl_payloads table
- 081: payload column and last_fetch_at

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 22:15:35 -07:00

658 lines
21 KiB
TypeScript

/**
* ============================================================
* DUTCHIE PLATFORM CLIENT - LOCKED MODULE
* ============================================================
*
* DO NOT MODIFY THIS FILE WITHOUT EXPLICIT AUTHORIZATION.
*
* Updated: 2025-12-10 per workflow-12102025.md
*
* KEY BEHAVIORS (per workflow-12102025.md):
* 1. startSession() gets identity from PROXY LOCATION, not task params
* 2. On 403: immediately get new IP + new fingerprint, then retry
* 3. After 3 consecutive 403s on same proxy → disable it (burned)
* 4. Language is always English (en-US)
*
* IMPLEMENTATION:
* - Uses curl via child_process.execSync (bypasses TLS fingerprinting)
* - NO Puppeteer, NO axios, NO fetch
* - Uses intoli/user-agents via CrawlRotator for realistic fingerprints
* - Residential IP compatible
*
* USAGE:
* import { curlPost, curlGet, executeGraphQL, startSession } from '@dutchie/client';
*
* ============================================================
*/
import { execSync } from 'child_process';
import {
buildOrderedHeaders,
buildRefererFromMenuUrl,
getCurlBinary,
isCurlImpersonateAvailable,
HeaderContext,
BrowserType,
} from '../../services/http-fingerprint';
// ============================================================
// TYPES
// ============================================================
/**
* Result of a single curl invocation made by curlPost / curlGet.
*/
export interface CurlResponse {
// HTTP status code parsed from curl's `-w %{http_code}` output; 0 when the curl command itself failed.
status: number;
// Response body: the JSON.parse result when the body is valid JSON, otherwise the raw text; null when the curl command failed.
data: any;
// Failure message taken from the thrown execSync error; only set when the curl command failed.
error?: string;
}
// Per workflow-12102025.md: fingerprint comes from CrawlRotator's BrowserFingerprint
// We keep a simplified interface here for header building
// NOTE(review): nothing in the visible part of this file references this interface;
// sessions carry the full BrowserFingerprint instead — confirm whether it is still used elsewhere.
export interface Fingerprint {
// Value for the User-Agent header.
userAgent: string;
// Value for the Accept-Language header.
acceptLanguage: string;
// Presumably the sec-ch-ua client-hint header values (absent for browsers that do not send them) — TODO confirm.
secChUa?: string;
secChUaPlatform?: string;
secChUaMobile?: string;
}
// ============================================================
// CONFIGURATION
// ============================================================
/**
* Static settings for talking to Dutchie.
* In the code visible here only graphqlEndpoint, baseUrl and timeout are read;
* the remaining fields are presumably consumed by callers of this module.
*/
export const DUTCHIE_CONFIG = {
// GraphQL endpoint that executeGraphQL POSTs persisted queries to.
graphqlEndpoint: 'https://dutchie.com/api-3/graphql',
// Origin that fetchPage prefixes to the requested path.
baseUrl: 'https://dutchie.com',
// Per-request timeout in milliseconds (curlPost/curlGet convert it to whole seconds for --max-time).
timeout: 30000,
// NOTE(review): executeGraphQL/fetchPage default to their own maxRetries of 3 and do not read this — confirm who consumes it.
maxRetries: 3,
// Presumably the page size and page cap for paginated product queries — TODO confirm against callers.
perPage: 100,
maxPages: 200,
// Presumably delays in milliseconds between pages and between crawl modes — TODO confirm against callers.
pageDelayMs: 500,
modeDelayMs: 2000,
};
// ============================================================
// PROXY SUPPORT
// Per workflow-12102025.md:
// - On 403: recordBlock() → increment consecutive_403_count
// - After 3 consecutive 403s → proxy disabled
// - Immediately rotate to new IP + new fingerprint on 403
// ============================================================
import type { CrawlRotator, BrowserFingerprint } from '../../services/crawl-rotator';
// Proxy URL handed to curl via --proxy; null means requests go out directly.
let currentProxy: string | null = null;
// Rotator used for proxy/fingerprint rotation and block/success bookkeeping; null until setCrawlRotator() attaches one.
let crawlRotator: CrawlRotator | null = null;
/**
 * Replace the proxy used by all subsequent Dutchie requests.
 *
 * The password part of the URL is masked before it is logged.
 *
 * @param proxy - Proxy URL such as http://user:pass@host:port or
 *   socks5://host:port; pass null to switch to a direct connection.
 */
export function setProxy(proxy: string | null): void {
  currentProxy = proxy;

  if (!proxy) {
    console.log('[Dutchie Client] Proxy disabled (direct connection)');
    return;
  }

  const maskedProxy = proxy.replace(/:[^:@]+@/, ':***@');
  console.log(`[Dutchie Client] Proxy set: ${maskedProxy}`);
}
/**
 * Read the proxy URL currently in effect.
 *
 * @returns The active proxy URL, or null when requests go out directly.
 */
export function getProxy(): string | null {
  const activeProxy = currentProxy;
  return activeProxy;
}
/**
 * Attach (or detach, with null) the CrawlRotator used for rotation on 403s.
 *
 * Per workflow-12102025.md: attaching a rotator enables automatic rotation
 * when blocked. If the rotator already has a current proxy, that proxy
 * becomes the active one for this client.
 *
 * @param rotator - Rotator to attach, or null to detach.
 */
export function setCrawlRotator(rotator: CrawlRotator | null): void {
  crawlRotator = rotator;
  if (!rotator) {
    return;
  }

  console.log('[Dutchie Client] CrawlRotator attached - proxy rotation enabled');

  const initialProxy = rotator.proxy.getCurrent();
  if (!initialProxy) {
    return;
  }

  const initialUrl = rotator.proxy.getProxyUrl(initialProxy);
  currentProxy = initialUrl;
  console.log(`[Dutchie Client] Initial proxy: ${initialUrl.replace(/:[^:@]+@/, ':***@')}`);
}
/**
 * Read the CrawlRotator currently attached to this client.
 *
 * @returns The attached rotator, or null when none has been set.
 */
export function getCrawlRotator(): CrawlRotator | null {
  const attachedRotator = crawlRotator;
  return attachedRotator;
}
/**
 * Handle a 403 block - per workflow-12102025.md:
 * 1. Record the block on the current proxy (increments consecutive_403_count)
 * 2. Immediately rotate to a new proxy (new IP) and a new fingerprint
 * 3. Push the new proxy + fingerprint into the active session, so the retry
 *    is actually sent with the rotated identity
 *
 * @returns true when a replacement proxy is now active; false when no
 *   CrawlRotator is attached or the rotator has no proxy left to offer.
 */
async function handle403Block(): Promise<boolean> {
  if (!crawlRotator) {
    console.warn('[Dutchie Client] No CrawlRotator - cannot handle 403');
    return false;
  }

  // Per workflow-12102025.md: record block (tracks consecutive 403s)
  const wasDisabled = await crawlRotator.recordBlock();
  if (wasDisabled) {
    console.log('[Dutchie Client] Current proxy was disabled (3 consecutive 403s)');
  }

  // Per workflow-12102025.md: immediately get new IP + new fingerprint
  const { proxy: nextProxy, fingerprint } = crawlRotator.rotateBoth();
  if (!nextProxy) {
    console.error('[Dutchie Client] No more proxies available!');
    return false;
  }

  currentProxy = crawlRotator.proxy.getProxyUrl(nextProxy);

  // buildHeaders() and getCurlBinaryForSession() read the fingerprint from the
  // session, not from the rotator. Without this sync the retry would reuse the
  // fingerprint that was just blocked.
  if (currentSession) {
    const proxyLocation = crawlRotator.getProxyLocation();
    currentSession.fingerprint = fingerprint;
    currentSession.proxyUrl = currentProxy;
    currentSession.proxyTimezone = proxyLocation?.timezone;
    currentSession.proxyState = proxyLocation?.state;
  }

  console.log(`[Dutchie Client] Rotated to new proxy: ${currentProxy.replace(/:[^:@]+@/, ':***@')}`);
  console.log(`[Dutchie Client] New fingerprint: ${fingerprint.userAgent.slice(0, 50)}...`);
  return true;
}
/**
 * Report a successful request to the attached CrawlRotator.
 * Per workflow-12102025.md: this resets consecutive_403_count.
 * Does nothing when no rotator is attached.
 *
 * @param responseTimeMs - Optional request duration forwarded to the rotator.
 */
async function recordProxySuccess(responseTimeMs?: number): Promise<void> {
  if (!crawlRotator) {
    return;
  }
  await crawlRotator.recordSuccess(responseTimeMs);
}
/**
 * Build the curl `--proxy` argument for the current proxy.
 *
 * The URL is embedded in a single-quoted shell word, so any single quote in
 * it (e.g. inside the credentials) is escaped as `'\''`; otherwise it would
 * terminate the quoting and corrupt the command line.
 *
 * @returns `--proxy '<url>'`, or an empty string when no proxy is set.
 */
function getProxyArg(): string {
  if (!currentProxy) return '';
  const quotedProxy = currentProxy.replace(/'/g, "'\\''");
  return `--proxy '${quotedProxy}'`;
}
/**
* SHA-256 hashes keyed by GraphQL operation name.
* Presumably passed as the `hash` argument of executeGraphQL, which sends it as
* extensions.persistedQuery.sha256Hash — confirm against callers.
*/
export const GRAPHQL_HASHES = {
FilteredProducts: 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0',
// NOTE(review): same hash value as DispensaryInfo below — confirm this is intentional.
GetAddressBasedDispensaryData: '13461f73abf7268770dfd05fe7e10c523084b2bb916a929c08efe3d87531977b',
ConsumerDispensaries: '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b',
DispensaryInfo: '13461f73abf7268770dfd05fe7e10c523084b2bb916a929c08efe3d87531977b',
GetAllCitiesByState: 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6',
};
// ============================================================
// SESSION MANAGEMENT
// Per workflow-12102025.md:
// - Session identity comes from PROXY LOCATION
// - NOT from task params (no stateCode/timezone params)
// - Language is always English
// ============================================================
/**
* State of the active crawl session, created by startSession().
*/
export interface CrawlSession {
// Identifier of the form session_<epoch ms>_<random base-36 suffix>.
sessionId: string;
// Browser fingerprint used by buildHeaders() and to pick the curl binary.
fingerprint: BrowserFingerprint;
// Proxy URL that was active when the session was started; null for a direct connection.
proxyUrl: string | null;
// Timezone and state reported by the rotator for the proxy's location, when known.
proxyTimezone?: string;
proxyState?: string;
// Time the session was started; endSession() uses it to log the duration.
startedAt: Date;
// Per workflow-12102025.md: Dynamic Referer per dispensary
// menuUrl is the dispensary menu URL given to startSession(); referer is derived from it via buildRefererFromMenuUrl().
menuUrl?: string;
referer: string;
}
// The single active session for this module; set by startSession(), cleared by endSession().
let currentSession: CrawlSession | null = null;
/**
 * Begin a new crawl session and make it the active one.
 *
 * Per workflow-12102025.md:
 * - No state/timezone parameters: identity comes from the proxy location
 * - The fingerprint is taken from the CrawlRotator (intoli/user-agents)
 * - Language is always English (en-US)
 * - The Referer is derived per dispensary from menuUrl
 *
 * @param menuUrl - The dispensary's menu URL, used to build the Referer header.
 * @returns The newly created session.
 * @throws Error when no CrawlRotator has been attached.
 */
export function startSession(menuUrl?: string): CrawlSession {
  if (!crawlRotator) {
    throw new Error('[Dutchie Client] Cannot start session without CrawlRotator');
  }

  // Identity comes from the proxy's location and the rotator's current fingerprint.
  const proxyGeo = crawlRotator.getProxyLocation();
  const fingerprint = crawlRotator.userAgent.getCurrent();
  const referer = buildRefererFromMenuUrl(menuUrl);

  const session: CrawlSession = {
    sessionId: `session_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`,
    fingerprint,
    proxyUrl: currentProxy,
    proxyTimezone: proxyGeo?.timezone,
    proxyState: proxyGeo?.state,
    startedAt: new Date(),
    menuUrl,
    referer,
  };
  currentSession = session;

  const { httpFingerprint } = fingerprint;
  console.log(`[Dutchie Client] Started session ${session.sessionId}`);
  console.log(`[Dutchie Client] Browser: ${fingerprint.browserName} (${fingerprint.deviceCategory})`);
  console.log(`[Dutchie Client] DNT: ${httpFingerprint.hasDNT ? 'enabled' : 'disabled'}`);
  console.log(`[Dutchie Client] TLS: ${httpFingerprint.curlImpersonateBinary}`);
  console.log(`[Dutchie Client] Referer: ${referer}`);

  const timezone = proxyGeo?.timezone;
  if (timezone) {
    console.log(`[Dutchie Client] Proxy: ${proxyGeo?.state || 'unknown'} (${timezone})`);
  }

  return session;
}
/**
 * Close the active crawl session, logging how long it lasted.
 * Does nothing when no session is active.
 */
export function endSession(): void {
  const session = currentSession;
  if (!session) {
    return;
  }

  const elapsedMs = Date.now() - session.startedAt.getTime();
  const duration = Math.round(elapsedMs / 1000);
  console.log(`[Dutchie Client] Ended session ${session.sessionId} (${duration}s)`);
  currentSession = null;
}
/**
 * Read the active crawl session.
 *
 * @returns The session created by startSession(), or null when none is active.
 */
export function getCurrentSession(): CrawlSession | null {
  const activeSession = currentSession;
  return activeSession;
}
// ============================================================
// CURL HTTP CLIENT
// ============================================================
/**
 * Per workflow-12102025.md: build request headers through the HTTP
 * fingerprint system, using the active session's fingerprint and Referer.
 *
 * @param isPost - Whether the headers are for a POST request.
 * @param contentLength - Body length, forwarded to the header builder.
 * @returns The header map plus the header names in browser-specific order.
 * @throws Error when there is no active session or no CrawlRotator attached.
 */
export function buildHeaders(isPost: boolean, contentLength?: number): { headers: Record<string, string>; orderedHeaders: string[] } {
  const session = currentSession;
  if (!session || !crawlRotator) {
    throw new Error('[Dutchie Client] Cannot build headers without active session');
  }

  const { fingerprint } = session;
  const httpFingerprint = fingerprint.httpFingerprint;

  // Everything the fingerprint service needs to produce the ordered header set.
  const headerContext: HeaderContext = {
    userAgent: fingerprint.userAgent,
    secChUa: fingerprint.secChUa,
    secChUaPlatform: fingerprint.secChUaPlatform,
    secChUaMobile: fingerprint.secChUaMobile,
    referer: session.referer,
    isPost,
    contentLength,
  };

  return buildOrderedHeaders(httpFingerprint, headerContext);
}
/**
 * Per workflow-12102025.md: choose the curl binary for the active session.
 *
 * Prefers the curl-impersonate binary matching the session's browser so the
 * TLS fingerprint lines up with the headers. Falls back to plain `curl` when
 * there is no session, or (with a warning) when no impersonation binary is
 * available for that browser.
 *
 * @returns The curl executable name/path to invoke.
 */
function getCurlBinaryForSession(): string {
  const session = currentSession;
  if (!session) {
    return 'curl';
  }

  const browserType = session.fingerprint.browserName as BrowserType;
  if (!isCurlImpersonateAvailable(browserType)) {
    console.warn(`[Dutchie Client] curl-impersonate not available for ${browserType}, using standard curl`);
    return 'curl';
  }

  return getCurlBinary(browserType);
}
/**
 * Per workflow-12102025.md: Execute HTTP POST using curl/curl-impersonate
 * - Uses browser-specific TLS fingerprint via curl-impersonate
 * - Headers sent in browser-specific order
 * - Dynamic Referer per dispensary
 *
 * The request runs synchronously through a shell command. Every
 * caller-influenced value (header values, body, URL) is embedded as a
 * single-quoted shell word with embedded quotes escaped.
 *
 * @param url - Absolute URL to POST to.
 * @param body - Value serialized with JSON.stringify and sent as the request body.
 * @param timeout - Request timeout in milliseconds (rounded up to whole seconds for curl).
 * @returns status/data on completion; `{ status: 0, data: null, error }` when
 *   the curl command itself fails. `data` is the parsed JSON when the body is
 *   valid JSON, otherwise the raw text.
 * @throws Error (from buildHeaders) when there is no active session.
 */
export function curlPost(url: string, body: any, timeout = 30000): CurlResponse {
  // POSIX single-quote escaping: close the quote, emit an escaped quote, reopen.
  const shellQuote = (value: string): string => `'${value.replace(/'/g, "'\\''")}'`;
  // execSync error messages echo the whole command line, proxy credentials included.
  const redactCredentials = (text: string): string =>
    text.replace(/(\/\/[^\s:@\/']+):[^\s@\/']+@/g, '$1:***@');
  const parseBody = (raw: string): any => {
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  };

  const bodyJson = JSON.stringify(body);
  // Content length is a byte count; string length undercounts non-ASCII payloads.
  const { headers, orderedHeaders } = buildHeaders(true, Buffer.byteLength(bodyJson, 'utf8'));

  // Per workflow-12102025.md: header args in browser-specific order
  const headerArgs = orderedHeaders
    .filter((name) => name !== 'Host' && name !== 'Content-Length') // curl handles these
    .map((name) => `-H ${shellQuote(`${name}: ${headers[name]}`)}`)
    .join(' ');

  const timeoutSec = Math.ceil(timeout / 1000);
  const separator = '___HTTP_STATUS___';
  const proxyArg = getProxyArg();
  // Per workflow-12102025.md: Use curl-impersonate for TLS fingerprint matching
  const curlBinary = getCurlBinaryForSession();
  const cmd = `${curlBinary} -s --compressed ${proxyArg} -w '${separator}%{http_code}' --max-time ${timeoutSec} ${headerArgs} -d ${shellQuote(bodyJson)} ${shellQuote(url)}`;

  try {
    const output = execSync(cmd, {
      encoding: 'utf-8',
      maxBuffer: 10 * 1024 * 1024,
      timeout: timeout + 5000, // let curl's own --max-time fire first
    });

    const separatorIndex = output.lastIndexOf(separator);
    if (separatorIndex === -1) {
      // Marker missing: treat the last line as the status and the rest as the body.
      const lines = output.trim().split('\n');
      const statusCode = parseInt(lines.pop() || '0', 10);
      return { status: statusCode, data: parseBody(lines.join('\n')) };
    }

    const statusCode = parseInt(output.slice(separatorIndex + separator.length).trim(), 10);
    return { status: statusCode, data: parseBody(output.slice(0, separatorIndex)) };
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : '';
    return {
      status: 0,
      data: null,
      error: message ? redactCredentials(message) : 'curl request failed',
    };
  }
}
/**
 * Per workflow-12102025.md: Execute HTTP GET using curl/curl-impersonate
 * - Uses browser-specific TLS fingerprint via curl-impersonate
 * - Headers sent in browser-specific order
 * - Dynamic Referer per dispensary
 *
 * The request runs synchronously through a shell command. Header values and
 * the URL are embedded as single-quoted shell words with embedded quotes
 * escaped.
 *
 * @param url - Absolute URL to fetch.
 * @param timeout - Request timeout in milliseconds (rounded up to whole seconds for curl).
 * @returns status/data on completion; `{ status: 0, data: null, error }` when
 *   the curl command itself fails. `data` is the parsed JSON when the body is
 *   valid JSON, otherwise the raw text.
 * @throws Error (from buildHeaders) when there is no active session.
 */
export function curlGet(url: string, timeout = 30000): CurlResponse {
  // POSIX single-quote escaping: close the quote, emit an escaped quote, reopen.
  const shellQuote = (value: string): string => `'${value.replace(/'/g, "'\\''")}'`;
  // execSync error messages echo the whole command line, proxy credentials included.
  const redactCredentials = (text: string): string =>
    text.replace(/(\/\/[^\s:@\/']+):[^\s@\/']+@/g, '$1:***@');

  // Per workflow-12102025.md: ordered headers for a GET request
  const { headers, orderedHeaders } = buildHeaders(false);
  const headerArgs = orderedHeaders
    .filter((name) => name !== 'Host' && name !== 'Content-Length') // curl handles these
    .map((name) => `-H ${shellQuote(`${name}: ${headers[name]}`)}`)
    .join(' ');

  const timeoutSec = Math.ceil(timeout / 1000);
  const separator = '___HTTP_STATUS___';
  const proxyArg = getProxyArg();
  // Per workflow-12102025.md: Use curl-impersonate for TLS fingerprint matching
  const curlBinary = getCurlBinaryForSession();
  const cmd = `${curlBinary} -s --compressed ${proxyArg} -w '${separator}%{http_code}' --max-time ${timeoutSec} ${headerArgs} ${shellQuote(url)}`;

  try {
    const output = execSync(cmd, {
      encoding: 'utf-8',
      maxBuffer: 10 * 1024 * 1024,
      timeout: timeout + 5000, // let curl's own --max-time fire first
    });

    const separatorIndex = output.lastIndexOf(separator);
    if (separatorIndex === -1) {
      // Marker missing: treat the last line as the status and the rest as raw body text.
      const lines = output.trim().split('\n');
      const statusCode = parseInt(lines.pop() || '0', 10);
      return { status: statusCode, data: lines.join('\n') };
    }

    const responseBody = output.slice(0, separatorIndex);
    const statusCode = parseInt(output.slice(separatorIndex + separator.length).trim(), 10);
    try {
      return { status: statusCode, data: JSON.parse(responseBody) };
    } catch {
      // Not JSON (e.g. an HTML page): hand back the text as-is.
      return { status: statusCode, data: responseBody };
    }
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : '';
    return {
      status: 0,
      data: null,
      error: message ? redactCredentials(message) : 'curl request failed',
    };
  }
}
// ============================================================
// GRAPHQL EXECUTION
// Per workflow-12102025.md:
// - On 403: immediately rotate IP + fingerprint (no delay first)
// - Then retry
// ============================================================
/**
* Optional settings for executeGraphQL.
*/
export interface ExecuteGraphQLOptions {
// Number of retries after the first attempt; executeGraphQL defaults this to 3.
maxRetries?: number;
// Whether a 403 triggers proxy + fingerprint rotation and a retry; executeGraphQL defaults this to true.
retryOn403?: boolean;
// NOTE(review): not read by executeGraphQL in the code visible here — confirm whether it is still needed.
cName?: string;
}
/**
 * Per workflow-12102025.md: Execute GraphQL query with curl/curl-impersonate
 * - Uses browser-specific TLS fingerprint
 * - Headers in browser-specific order
 * - On 403: immediately rotate IP + fingerprint, then retry
 *
 * Sends a persisted-query request (operation name, variables, sha256 hash) to
 * DUTCHIE_CONFIG.graphqlEndpoint. curl failures and non-200 statuses are
 * retried with a linearly growing delay.
 *
 * @param operationName - GraphQL operation name.
 * @param variables - Variables object for the operation.
 * @param hash - sha256 hash of the persisted query.
 * @param options - Retry settings; all fields are optional.
 * @returns The response body of the first HTTP 200 response. GraphQL-level
 *   errors in that body are logged as a warning but still returned.
 * @throws Error when no session is active, when every proxy is exhausted
 *   during 403 handling, or when all attempts fail (the last failure is thrown).
 */
export async function executeGraphQL(
  operationName: string,
  variables: any,
  hash: string,
  options: ExecuteGraphQLOptions = {}
): Promise<any> {
  const { maxRetries = 3, retryOn403 = true } = options;

  // Per workflow-12102025.md: Session must be active for requests
  if (!currentSession) {
    throw new Error('[Dutchie Client] Cannot execute GraphQL without active session - call startSession() first');
  }

  const body = {
    operationName,
    variables,
    extensions: {
      persistedQuery: { version: 1, sha256Hash: hash },
    },
  };

  let lastError: Error | null = null;
  let attempt = 0;

  while (attempt <= maxRetries) {
    console.log(`[Dutchie Client] curl POST ${operationName} (attempt ${attempt + 1}/${maxRetries + 1})`);
    const startTime = Date.now();
    // Per workflow-12102025.md: curlPost uses ordered headers and curl-impersonate
    const response = curlPost(DUTCHIE_CONFIG.graphqlEndpoint, body, DUTCHIE_CONFIG.timeout);
    const responseTime = Date.now() - startTime;
    console.log(`[Dutchie Client] Response status: ${response.status} (${responseTime}ms)`);

    if (response.error) {
      console.error(`[Dutchie Client] curl error: ${response.error}`);
      lastError = new Error(response.error);
      attempt++;
      if (attempt <= maxRetries) {
        await sleep(1000 * attempt);
      }
      continue;
    }

    if (response.status === 200) {
      // Per workflow-12102025.md: success resets consecutive 403 count
      await recordProxySuccess(responseTime);
      if (response.data?.errors?.length > 0) {
        console.warn(`[Dutchie Client] GraphQL errors: ${JSON.stringify(response.data.errors[0])}`);
      }
      return response.data;
    }

    if (response.status === 403 && retryOn403) {
      // Per workflow-12102025.md: immediately rotate IP + fingerprint
      console.warn(`[Dutchie Client] 403 blocked - immediately rotating proxy + fingerprint...`);
      const hasMoreProxies = await handle403Block();
      if (!hasMoreProxies) {
        throw new Error('All proxies exhausted - no more IPs available');
      }
      // Per workflow-12102025.md: Update session referer after rotation
      currentSession.referer = buildRefererFromMenuUrl(currentSession.menuUrl);
      // Remember the block so exhausting the retries reports the real cause.
      lastError = new Error('HTTP 403');
      attempt++;
      // Per workflow-12102025.md: small backoff after rotation (pointless once no attempt remains)
      if (attempt <= maxRetries) {
        await sleep(500);
      }
      continue;
    }

    const bodyPreview = typeof response.data === 'string'
      ? response.data.slice(0, 200)
      : JSON.stringify(response.data).slice(0, 200);
    console.error(`[Dutchie Client] HTTP ${response.status}: ${bodyPreview}`);
    lastError = new Error(`HTTP ${response.status}`);
    attempt++;
    if (attempt <= maxRetries) {
      await sleep(1000 * attempt);
    }
  }

  throw lastError || new Error('Max retries exceeded');
}
// ============================================================
// HTML PAGE FETCHING
// ============================================================
/**
* Optional settings for fetchPage.
*/
export interface FetchPageOptions {
// Number of retries after the first attempt; fetchPage defaults this to 3.
maxRetries?: number;
// Whether a 403 triggers proxy + fingerprint rotation and a retry; fetchPage defaults this to true.
retryOn403?: boolean;
}
/**
 * Per workflow-12102025.md: Fetch HTML page from Dutchie
 * - Uses browser-specific TLS fingerprint
 * - Headers in browser-specific order
 * - Same 403 handling as GraphQL
 *
 * @param path - Path appended to DUTCHIE_CONFIG.baseUrl.
 * @param options - Retry settings; all fields are optional.
 * @returns The page body and status of the first HTTP 200 response, or null
 *   when every attempt failed.
 * @throws Error when no session is active, or when every proxy is exhausted
 *   during 403 handling.
 */
export async function fetchPage(
  path: string,
  options: FetchPageOptions = {}
): Promise<{ html: string; status: number } | null> {
  const { maxRetries = 3, retryOn403 = true } = options;
  const url = `${DUTCHIE_CONFIG.baseUrl}${path}`;

  // Per workflow-12102025.md: Session must be active for requests
  if (!currentSession) {
    throw new Error('[Dutchie Client] Cannot fetch page without active session - call startSession() first');
  }

  let attempt = 0;
  while (attempt <= maxRetries) {
    // Per workflow-12102025.md: curlGet uses ordered headers and curl-impersonate
    console.log(`[Dutchie Client] curl GET ${path} (attempt ${attempt + 1}/${maxRetries + 1})`);
    const startTime = Date.now();
    const response = curlGet(url, DUTCHIE_CONFIG.timeout);
    const responseTime = Date.now() - startTime;
    console.log(`[Dutchie Client] Response status: ${response.status} (${responseTime}ms)`);

    if (response.error) {
      console.error(`[Dutchie Client] curl error: ${response.error}`);
      attempt++;
      if (attempt <= maxRetries) {
        await sleep(1000 * attempt);
      }
      continue;
    }

    if (response.status === 200) {
      // Per workflow-12102025.md: success resets consecutive 403 count
      await recordProxySuccess(responseTime);
      // curlGet JSON-parses bodies that happen to be valid JSON, so the data
      // may not be a string; re-serialize to honor the declared `html: string`.
      const html = typeof response.data === 'string'
        ? response.data
        : JSON.stringify(response.data);
      return { html, status: response.status };
    }

    if (response.status === 403 && retryOn403) {
      // Per workflow-12102025.md: immediately rotate IP + fingerprint
      console.warn(`[Dutchie Client] 403 blocked - immediately rotating proxy + fingerprint...`);
      const hasMoreProxies = await handle403Block();
      if (!hasMoreProxies) {
        throw new Error('All proxies exhausted - no more IPs available');
      }
      // Per workflow-12102025.md: Update session after rotation
      currentSession.referer = buildRefererFromMenuUrl(currentSession.menuUrl);
      attempt++;
      // Per workflow-12102025.md: small backoff after rotation (pointless once no attempt remains)
      if (attempt <= maxRetries) {
        await sleep(500);
      }
      continue;
    }

    console.error(`[Dutchie Client] HTTP ${response.status}`);
    attempt++;
    if (attempt <= maxRetries) {
      await sleep(1000 * attempt);
    }
  }

  return null;
}
/**
 * Extract and parse the `__NEXT_DATA__` JSON payload from an HTML page.
 *
 * The script tag is located by its id alone, so additional attributes
 * (nonce, crossorigin, ...), a different attribute order, or single-quoted
 * attribute values do not prevent a match. The payload runs up to the first
 * closing script tag and may itself contain `<` characters.
 *
 * @param html - Full HTML document text.
 * @returns The parsed payload, or null when the tag is absent, empty, or
 *   does not contain valid JSON (a parse failure is logged).
 */
export function extractNextData(html: string): any | null {
  if (typeof html !== 'string' || html.length === 0) {
    return null;
  }

  const match = html.match(
    /<script\s+(?:[^>]*?\s)?id=["']__NEXT_DATA__["'][^>]*>([\s\S]*?)<\/script>/i
  );
  const payload = match?.[1]?.trim();
  if (!payload) {
    return null;
  }

  try {
    return JSON.parse(payload);
  } catch (e) {
    console.error('[Dutchie Client] Failed to parse __NEXT_DATA__:', e);
    return null;
  }
}
// ============================================================
// UTILITY
// ============================================================
/**
 * Pause for the given number of milliseconds.
 *
 * @param ms - Delay before the returned promise resolves.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, ms);
  });
}