diff --git a/backend/src/tasks/handlers/index.ts b/backend/src/tasks/handlers/index.ts index e4583c0c..ecdeac16 100644 --- a/backend/src/tasks/handlers/index.ts +++ b/backend/src/tasks/handlers/index.ts @@ -2,9 +2,15 @@ * Task Handlers Index * * Exports all task handlers for the task worker. + * + * Product Discovery: + * - handleProductDiscoveryCurl: curl/axios based (for curl transport) + * - handleProductDiscoveryHttp: Puppeteer browser-based (for http transport) */ -export { handleProductDiscovery } from './product-discovery'; +export { handleProductDiscovery as handleProductDiscoveryCurl } from './product-discovery-curl'; +export { handleProductDiscoveryHttp } from './product-discovery-http'; +export { handlePayloadFetch as handlePayloadFetchCurl } from './payload-fetch-curl'; export { handleProductRefresh } from './product-refresh'; export { handleStoreDiscovery } from './store-discovery'; export { handleEntryPointDiscovery } from './entry-point-discovery'; diff --git a/backend/src/tasks/handlers/payload-fetch.ts b/backend/src/tasks/handlers/payload-fetch-curl.ts similarity index 100% rename from backend/src/tasks/handlers/payload-fetch.ts rename to backend/src/tasks/handlers/payload-fetch-curl.ts diff --git a/backend/src/tasks/handlers/product-discovery.ts b/backend/src/tasks/handlers/product-discovery-curl.ts similarity index 94% rename from backend/src/tasks/handlers/product-discovery.ts rename to backend/src/tasks/handlers/product-discovery-curl.ts index 6599ddfc..187e458d 100644 --- a/backend/src/tasks/handlers/product-discovery.ts +++ b/backend/src/tasks/handlers/product-discovery-curl.ts @@ -13,7 +13,7 @@ */ import { TaskContext, TaskResult } from '../task-worker'; -import { handlePayloadFetch } from './payload-fetch'; +import { handlePayloadFetch } from './payload-fetch-curl'; export async function handleProductDiscovery(ctx: TaskContext): Promise { const { task } = ctx; diff --git a/backend/src/tasks/handlers/product-discovery-http.ts 
/**
 * Product Discovery HTTP Handler (Browser-based)
 *
 * Uses Puppeteer + StealthPlugin to fetch products via browser context.
 * Based on test-intercept.js pattern from ORGANIC_SCRAPING_GUIDE.md.
 *
 * This handler:
 * 1. Loads dispensary info
 * 2. Launches headless browser with proxy (if provided)
 * 3. Establishes session by visiting embedded menu
 * 4. Fetches ALL products via GraphQL from browser context
 * 5. Saves raw payload via saveRawPayload
 * 6. Updates dispensaries.last_fetch_at
 * 7. Queues product_refresh task to process the payload
 *
 * Why browser-based:
 * - Works with session-based residential proxies (Evomi)
 * - Lower detection risk than curl/axios
 * - Real Chrome TLS fingerprint
 */

import { TaskContext, TaskResult } from '../task-worker';
import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';

// GraphQL hash for FilteredProducts query - MUST match CLAUDE.md
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';

/** Shape returned from the in-page pagination loop. */
interface BrowserFetchResult {
  /** Raw product objects exactly as returned by the GraphQL endpoint. */
  products: unknown[];
  /** Total reported by the API on the first page (0 when not reported). */
  totalCount: number;
  /** Human-readable progress lines collected inside the page. */
  logs: string[];
  /** Set when pagination stopped because of an HTTP/GraphQL/runtime error. */
  error: string | null;
}

// The stealth plugin is registered on the shared puppeteer-extra module
// instance, so it only needs to be added once per process.
let stealthRegistered = false;

/**
 * Lazily loads puppeteer-extra and registers the stealth plugin exactly once.
 *
 * Kept as a lazy `require` (as in the original handler) so a missing browser
 * dependency surfaces as a failed task instead of breaking module load.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- loaded via require, untyped
function loadPuppeteer(): any {
  const puppeteer = require('puppeteer-extra');
  if (!stealthRegistered) {
    const StealthPlugin = require('puppeteer-extra-plugin-stealth');
    puppeteer.use(StealthPlugin());
    stealthRegistered = true;
  }
  return puppeteer;
}

/**
 * Pages through the FilteredProducts persisted query from INSIDE the page.
 *
 * This function is passed to `page.evaluate`, so it must stay fully
 * self-contained: it may not reference anything from module scope.
 *
 * @param platformId  Platform dispensary id used in the products filter.
 * @param graphqlHash sha256 hash of the persisted FilteredProducts query.
 * @returns Collected products, the API-reported total, log lines, and the
 *          error (if any) that stopped pagination early.
 */
async function fetchAllProductsInBrowser(
  platformId: string,
  graphqlHash: string
): Promise<BrowserFetchResult> {
  const perPage = 100;
  const maxPages = 30; // Max 30 pages = 3000 products
  const sessionId = 'browser-session-' + Date.now();
  const allProducts: unknown[] = [];
  const logs: string[] = [];
  let totalCount = 0;
  let error: string | null = null;

  try {
    for (let pageNum = 0; pageNum < maxPages; pageNum++) {
      const variables = {
        includeEnterpriseSpecials: false,
        productsFilter: {
          dispensaryId: platformId,
          pricingType: 'rec',
          Status: 'Active', // CRITICAL: Must be 'Active', not null
          types: [],
          useCache: true,
          isDefaultSort: true,
          sortBy: 'popularSortIdx',
          sortDirection: 1,
          bypassOnlineThresholds: true,
          isKioskMenu: false,
          removeProductsBelowOptionThresholds: false,
        },
        page: pageNum,
        perPage: perPage,
      };

      const extensions = {
        persistedQuery: {
          version: 1,
          sha256Hash: graphqlHash,
        },
      };

      // Build GET URL like the browser does
      const qs = new URLSearchParams({
        operationName: 'FilteredProducts',
        variables: JSON.stringify(variables),
        extensions: JSON.stringify(extensions),
      });
      const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;

      const response = await fetch(url, {
        method: 'GET',
        headers: {
          'Accept': 'application/json',
          'content-type': 'application/json',
          'x-dutchie-session': sessionId,
          'apollographql-client-name': 'Marketplace (production)',
        },
        credentials: 'include',
      });

      logs.push(`Page ${pageNum}: HTTP ${response.status}`);

      if (!response.ok) {
        const text = await response.text();
        error = `HTTP error: ${response.status} - ${text.slice(0, 200)}`;
        logs.push(error);
        break;
      }

      const json = await response.json();

      if (json.errors) {
        error = `GraphQL error: ${JSON.stringify(json.errors).slice(0, 200)}`;
        logs.push(error);
        break;
      }

      const data = json?.data?.filteredProducts;
      if (!data || !data.products) {
        logs.push('No products in response');
        break;
      }

      const products = data.products;
      allProducts.push(...products);

      if (pageNum === 0) {
        totalCount = data.queryInfo?.totalCount || 0;
        logs.push(`Total reported: ${totalCount}`);
      }

      logs.push(`Got ${products.length} products (total: ${allProducts.length}/${totalCount})`);

      // Only trust totalCount as a stop condition when the API actually
      // reported one; a missing total (0) previously ended pagination after
      // the first page even when more pages existed.
      const reachedTotal = totalCount > 0 && allProducts.length >= totalCount;
      if (reachedTotal || products.length < perPage) {
        break;
      }

      // Small delay between pages to be polite
      await new Promise((r) => setTimeout(r, 200));
    }
  } catch (err: unknown) {
    error = `Error: ${err instanceof Error ? err.message : String(err)}`;
    logs.push(error);
  }

  return { products: allProducts, totalCount, logs, error };
}

/**
 * Browser-based product discovery for a single dispensary.
 *
 * Loads the dispensary row, opens a (optionally proxied) stealth browser,
 * visits the embedded menu to establish a session, pages through the
 * FilteredProducts query from inside the page, stores the raw payload and
 * queues a `product_refresh` task that points at it.
 *
 * @param ctx Task context; reads `pool`, `task`, `crawlRotator` and calls
 *            `heartbeat()` between the long-running steps.
 * @returns `{ success: true, payloadId, productCount, sizeBytes }` on success,
 *          otherwise `{ success: false, error }`. Never throws: every failure
 *          is converted into a failed result, and the browser is always closed.
 */
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
  const { pool, task, crawlRotator } = ctx;
  const dispensaryId = task.dispensary_id;

  if (!dispensaryId) {
    return { success: false, error: 'No dispensary_id specified for product_discovery task' };
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- puppeteer is loaded untyped
  let browser: any = null;

  try {
    // ============================================================
    // STEP 1: Load dispensary info
    // ============================================================
    const dispResult = await pool.query(`
      SELECT
        id, name, platform_dispensary_id, menu_url, menu_type, city, state
      FROM dispensaries
      WHERE id = $1 AND crawl_enabled = true
    `, [dispensaryId]);

    if (dispResult.rows.length === 0) {
      return { success: false, error: `Dispensary ${dispensaryId} not found or not crawl_enabled` };
    }

    const dispensary = dispResult.rows[0];
    const platformId = dispensary.platform_dispensary_id;

    if (!platformId) {
      return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` };
    }

    // Extract cName from menu_url; falls back to the literal 'dispensary'
    // when the URL is missing or has an unexpected shape.
    const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/);
    const cName = cNameMatch ? cNameMatch[1] : 'dispensary';

    console.log(`[ProductDiscoveryHTTP] Starting for ${dispensary.name} (ID: ${dispensaryId})`);
    console.log(`[ProductDiscoveryHTTP] Platform ID: ${platformId}, cName: ${cName}`);

    await ctx.heartbeat();

    // ============================================================
    // STEP 2: Setup Puppeteer with proxy
    // ============================================================
    const puppeteer = loadPuppeteer();

    // Get proxy from CrawlRotator if available
    let proxyUrl: string | null = null;
    if (crawlRotator) {
      const currentProxy = crawlRotator.proxy.getCurrent();
      if (currentProxy) {
        proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
        console.log(`[ProductDiscoveryHTTP] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
      }
    }

    // Parse once; reused for both the launch flag and the credentials.
    const proxy = proxyUrl ? new URL(proxyUrl) : null;

    // Build browser args
    const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
    if (proxy) {
      browserArgs.push(`--proxy-server=${proxy.host}`);
    }

    browser = await puppeteer.launch({
      headless: 'new',
      args: browserArgs,
    });

    const page = await browser.newPage();

    // Setup proxy auth if needed (credentials cannot go in --proxy-server)
    if (proxy && proxy.username && proxy.password) {
      await page.authenticate({
        username: decodeURIComponent(proxy.username),
        password: decodeURIComponent(proxy.password),
      });
    }

    await ctx.heartbeat();

    // ============================================================
    // STEP 3: Establish session by visiting embedded menu
    // ============================================================
    const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
    console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);

    await page.goto(embedUrl, {
      waitUntil: 'networkidle2',
      timeout: 60000,
    });

    console.log(`[ProductDiscoveryHTTP] Session established, fetching products...`);

    await ctx.heartbeat();

    // ============================================================
    // STEP 4: Fetch ALL products via GraphQL from browser context
    // ============================================================
    const result: BrowserFetchResult = await page.evaluate(
      fetchAllProductsInBrowser,
      platformId,
      FILTERED_PRODUCTS_HASH
    );

    // Print logs from browser context
    for (const log of result.logs) {
      console.log(`[Browser] ${log}`);
    }

    console.log(`[ProductDiscoveryHTTP] Fetched ${result.products.length} products (API reported ${result.totalCount})`);

    await browser.close();
    browser = null;

    if (result.products.length === 0) {
      return {
        success: false,
        error: 'No products returned from GraphQL',
        productsProcessed: 0,
      };
    }

    // Pagination errors used to be visible only in the browser logs; make it
    // explicit that the payload about to be saved may be truncated.
    if (result.error) {
      console.warn(
        `[ProductDiscoveryHTTP] Pagination stopped early for dispensary ${dispensaryId} ` +
          `(${result.error}); payload may be incomplete (${result.products.length}/${result.totalCount})`
      );
    }

    await ctx.heartbeat();

    // ============================================================
    // STEP 5: Save raw payload to filesystem
    // ============================================================
    const rawPayload = {
      dispensaryId,
      platformId,
      cName,
      fetchedAt: new Date().toISOString(),
      productCount: result.products.length,
      products: result.products,
    };

    const payloadResult = await saveRawPayload(
      pool,
      dispensaryId,
      rawPayload,
      null, // crawl_run_id - not using crawl_runs in new system
      result.products.length
    );

    console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);

    // ============================================================
    // STEP 6: Update dispensary last_fetch_at
    // ============================================================
    await pool.query(`
      UPDATE dispensaries
      SET last_fetch_at = NOW()
      WHERE id = $1
    `, [dispensaryId]);

    // ============================================================
    // STEP 7: Queue product_refresh task to process the payload
    // ============================================================
    await taskService.createTask({
      role: 'product_refresh',
      dispensary_id: dispensaryId,
      priority: task.priority ?? 0,
      payload: { payload_id: payloadResult.id },
    });

    console.log(`[ProductDiscoveryHTTP] Queued product_refresh task for payload #${payloadResult.id}`);

    return {
      success: true,
      payloadId: payloadResult.id,
      productCount: result.products.length,
      sizeBytes: payloadResult.sizeBytes,
    };

  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error';
    console.error(`[ProductDiscoveryHTTP] Error for dispensary ${dispensaryId}:`, errorMessage);
    return {
      success: false,
      error: errorMessage,
    };
  } finally {
    // Best-effort cleanup: the browser is still set only if we failed before
    // the explicit close above; a close failure must not mask the task result.
    if (browser) {
      await browser.close().catch(() => {});
    }
  }
}
JSON.stringify(params.payload) : null, ] @@ -128,8 +130,8 @@ class TaskService { if (tasks.length === 0) return 0; const values = tasks.map((t, i) => { - const base = i * 5; - return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5})`; + const base = i * 6; + return `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, $${base + 5}, $${base + 6})`; }); const params = tasks.flatMap((t) => [ @@ -137,11 +139,12 @@ class TaskService { t.dispensary_id ?? null, t.platform ?? null, t.priority ?? 0, + t.method ?? null, t.scheduled_for ?? null, ]); const result = await pool.query( - `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, scheduled_for) + `INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for) VALUES ${values.join(', ')} ON CONFLICT DO NOTHING`, params diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts index 7ad4b883..0c7271f5 100644 --- a/backend/src/tasks/task-worker.ts +++ b/backend/src/tasks/task-worker.ts @@ -69,9 +69,11 @@ import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../ser // Task handlers by role // Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate -import { handlePayloadFetch } from './handlers/payload-fetch'; +// Dual-transport: curl vs http (browser-based) handlers +import { handlePayloadFetch } from './handlers/payload-fetch-curl'; import { handleProductRefresh } from './handlers/product-refresh'; -import { handleProductDiscovery } from './handlers/product-discovery'; +import { handleProductDiscovery } from './handlers/product-discovery-curl'; +import { handleProductDiscoveryHttp } from './handlers/product-discovery-http'; import { handleStoreDiscovery } from './handlers/store-discovery'; import { handleEntryPointDiscovery } from './handlers/entry-point-discovery'; import { handleAnalyticsRefresh } from './handlers/analytics-refresh'; @@ -144,17 +146,38 @@ type TaskHandler = (ctx: 
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main handler for product crawling (has curl and http variants)
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
  payload_fetch: handlePayloadFetch, // API fetch -> disk (curl)
  product_refresh: handleProductRefresh, // disk -> DB
  product_discovery: handleProductDiscovery, // Default: curl (see getHandlerForTask for http override)
  store_discovery: handleStoreDiscovery,
  entry_point_discovery: handleEntryPointDiscovery,
  analytics_refresh: handleAnalyticsRefresh,
  whoami: handleWhoami, // Tests proxy + anti-detect
};

/**
 * Get the appropriate handler for a task, considering both role and method.
 *
 * For product_discovery:
 * - method='http' -> handleProductDiscoveryHttp (browser-based)
 * - any other or unspecified method -> handleProductDiscovery (curl-based)
 *
 * Every other role ignores `method` and resolves through TASK_HANDLERS.
 *
 * NOTE(review): the worker's preflight check treats an unspecified method as
 * 'http', whereas an unspecified method here selects the curl handler. A task
 * with no method can therefore pass on the HTTP preflight alone and still run
 * the curl handler - confirm which default is intended.
 *
 * @param task Claimed task; only `role` and `method` are read.
 * @returns The handler to run, or undefined when the role has no registry entry.
 */
function getHandlerForTask(task: WorkerTask): TaskHandler | undefined {
  const role = task.role as TaskRole;

  // Special handling for product_discovery with method='http'
  if (role === 'product_discovery' && task.method === 'http') {
    console.log(`[TaskWorker] Using HTTP handler for product_discovery (method=${task.method})`);
    return handleProductDiscoveryHttp;
  }

  // Default: use the static handler registry
  return TASK_HANDLERS[role];
}
@@ -783,13 +806,32 @@ export class TaskWorker { console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`); // ================================================================= - // PREFLIGHT CHECK - CRITICAL: Worker MUST pass before task execution - // Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready + // PREFLIGHT CHECK - Use stored preflight results based on task method + // We already ran dual-transport preflights at startup, so just verify + // the correct preflight passed for this task's required method. // ================================================================= - const preflight = await this.crawlRotator.preflight(); - if (!preflight.passed) { - console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${preflight.error}`); - console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without proxy/anti-detect`); + const taskMethod = task.method || 'http'; // Default to http if not specified + let preflightPassed = false; + let preflightMsg = ''; + + if (taskMethod === 'http' && this.preflightHttpPassed) { + preflightPassed = true; + preflightMsg = `HTTP preflight passed (IP: ${this.preflightHttpResult?.proxyIp || 'unknown'})`; + } else if (taskMethod === 'curl' && this.preflightCurlPassed) { + preflightPassed = true; + preflightMsg = `CURL preflight passed (IP: ${this.preflightCurlResult?.proxyIp || 'unknown'})`; + } else if (!task.method && (this.preflightHttpPassed || this.preflightCurlPassed)) { + // No method preference - either transport works + preflightPassed = true; + preflightMsg = this.preflightHttpPassed ? 'HTTP preflight passed' : 'CURL preflight passed'; + } + + if (!preflightPassed) { + const errorMsg = taskMethod === 'http' + ? 
'HTTP preflight not passed - cannot execute http tasks' + : 'CURL preflight not passed - cannot execute curl tasks'; + console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${errorMsg}`); + console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without preflight`); // Release task back to pending so another worker can pick it up await taskService.releaseTask(task.id); @@ -799,7 +841,7 @@ export class TaskWorker { return; } - console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${task.id} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`); + console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`); this.activeTasks.set(task.id, task); @@ -843,8 +885,8 @@ export class TaskWorker { // Mark as running await taskService.startTask(task.id); - // Get handler for this role - const handler = TASK_HANDLERS[task.role]; + // Get handler for this role (considers method for dual-transport) + const handler = getHandlerForTask(task); if (!handler) { throw new Error(`No handler registered for role: ${task.role}`); }