Files
cannaiq/backend/dist/services/crawler-jobs.js
Kelly 66e07b2009 fix(monitor): remove non-existent worker columns from job_run_logs query
The job_run_logs table tracks scheduled job orchestration, not individual
worker jobs. Worker info (worker_id, worker_hostname) belongs on
dispensary_crawl_jobs, not job_run_logs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 18:45:05 -07:00

477 lines
22 KiB
JavaScript

"use strict";
/**
* Crawler Jobs Service
*
* Handles three types of jobs:
* 1. DetectMenuProviderJob - Detect menu provider for a dispensary
* 2. DutchieMenuCrawlJob - Production Dutchie crawl
* 3. SandboxCrawlJob - Learning/testing crawl for unknown providers
*/
// CommonJS interop helper emitted by the TypeScript compiler: wraps a
// non-ES-module export in `{ default: ... }` so default-style imports work.
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
// Public entry points: one runner per job type, plus the queue processor.
exports.runDetectMenuProviderJob = runDetectMenuProviderJob;
exports.runDutchieMenuCrawlJob = runDutchieMenuCrawlJob;
exports.runSandboxCrawlJob = runSandboxCrawlJob;
exports.processSandboxJobs = processSandboxJobs;
const migrate_1 = require("../db/migrate");
const logger_1 = require("./logger");
const menu_provider_detector_1 = require("./menu-provider-detector");
const scraper_v2_1 = require("../scraper-v2");
const puppeteer_1 = __importDefault(require("puppeteer"));
const fs_1 = require("fs");
const path_1 = __importDefault(require("path"));
const availability_1 = require("./availability");
// Unique id for this worker process; recorded on sandbox jobs this process claims.
const WORKER_ID = `crawler-${process.pid}-${Date.now()}`;
// ========================================
// Helper Functions
// ========================================
/**
 * Fetch the crawler-related columns for a single dispensary row.
 * Returns the row object, or null when no dispensary matches the id.
 */
async function getDispensary(dispensaryId) {
    const { rows } = await migrate_1.pool.query(`SELECT id, name, website, menu_url, menu_provider, menu_provider_confidence,
       crawler_mode, crawler_status, scraper_template
     FROM dispensaries WHERE id = $1`, [dispensaryId]);
    return rows.length > 0 ? rows[0] : null;
}
/**
 * Apply a partial column update to a dispensaries row and bump updated_at.
 * Keys of `updates` are interpolated verbatim as column names — callers are
 * internal and pass hard-coded keys only (never external input).
 */
async function updateDispensary(dispensaryId, updates) {
    const entries = Object.entries(updates);
    const setClauses = entries.map(([column], i) => `${column} = $${i + 1}`);
    const values = entries.map(([, value]) => value);
    setClauses.push(`updated_at = NOW()`);
    values.push(dispensaryId);
    await migrate_1.pool.query(`UPDATE dispensaries SET ${setClauses.join(', ')} WHERE id = $${entries.length + 1}`, values);
}
/**
 * Upsert a crawler_sandboxes row for a dispensary.
 * Reuses any sandbox that is still in flight (not moved_to_production/failed),
 * refreshing its provider/mode/signals; otherwise inserts a fresh 'pending'
 * one. Returns the sandbox id either way.
 */
async function createSandboxEntry(dispensaryId, suspectedProvider, mode, detectionSignals) {
    const signalsJson = detectionSignals ? JSON.stringify(detectionSignals) : null;
    // First, check if there's an existing active sandbox
    const existing = await migrate_1.pool.query(`SELECT id FROM crawler_sandboxes
     WHERE dispensary_id = $1 AND status NOT IN ('moved_to_production', 'failed')`, [dispensaryId]);
    const activeRow = existing.rows[0];
    if (activeRow) {
        // Update existing; COALESCE keeps prior signals when none were supplied.
        await migrate_1.pool.query(`UPDATE crawler_sandboxes
       SET suspected_menu_provider = $2, mode = $3, detection_signals = COALESCE($4, detection_signals), updated_at = NOW()
       WHERE id = $1`, [activeRow.id, suspectedProvider, mode, signalsJson]);
        return activeRow.id;
    }
    // Create new; detection_signals defaults to an empty JSON object.
    const inserted = await migrate_1.pool.query(`INSERT INTO crawler_sandboxes (dispensary_id, suspected_menu_provider, mode, detection_signals, status)
     VALUES ($1, $2, $3, $4, 'pending')
     RETURNING id`, [dispensaryId, suspectedProvider, mode, signalsJson ?? '{}']);
    return inserted.rows[0].id;
}
/**
 * Insert a 'pending' sandbox_crawl_jobs row and return its id.
 * Higher `priority` values are claimed first by processSandboxJobs.
 */
async function createSandboxJob(dispensaryId, sandboxId, jobType, priority = 0) {
    const { rows } = await migrate_1.pool.query(`INSERT INTO sandbox_crawl_jobs (dispensary_id, sandbox_id, job_type, status, priority)
     VALUES ($1, $2, $3, 'pending', $4)
     RETURNING id`, [dispensaryId, sandboxId, jobType, priority]);
    return rows[0].id;
}
// Get linked store ID for a dispensary (for using existing scraper)
// Get linked store ID for a dispensary (for using existing scraper).
// Tries two heuristics in order and returns the first store id found, or null.
async function getStoreIdForDispensary(dispensaryId) {
    // Primary match: menu_url equals the store's Dutchie URL, or the
    // dispensary name contains the store name.
    const byUrlOrName = await migrate_1.pool.query(`SELECT s.id FROM stores s
     JOIN dispensaries d ON d.menu_url = s.dutchie_url OR d.name ILIKE '%' || s.name || '%'
     WHERE d.id = $1
     LIMIT 1`, [dispensaryId]);
    const primary = byUrlOrName.rows[0];
    if (primary) {
        return primary.id;
    }
    // Fallback: dispensary website contains the store slug.
    const bySlug = await migrate_1.pool.query(`SELECT s.id FROM stores s
     JOIN dispensaries d ON d.website ILIKE '%' || s.slug || '%'
     WHERE d.id = $1
     LIMIT 1`, [dispensaryId]);
    return bySlug.rows[0]?.id || null;
}
// ========================================
// Job 1: Detect Menu Provider
// ========================================
/**
 * Job 1: detect the menu provider for a dispensary and route it.
 *
 * High-confidence Dutchie (>= 70%) goes straight to production mode;
 * anything else is placed in sandbox mode with a queued sandbox crawl for
 * deeper analysis. Detection results are persisted on the dispensary row.
 *
 * @param {number} dispensaryId
 * @returns {Promise<{success: boolean, message: string, data?: object}>}
 */
async function runDetectMenuProviderJob(dispensaryId) {
    logger_1.logger.info('crawler-jobs', `Starting menu provider detection for dispensary ${dispensaryId}`);
    const disp = await getDispensary(dispensaryId);
    if (!disp) {
        return { success: false, message: `Dispensary ${dispensaryId} not found` };
    }
    // Detection needs some URL to probe: prefer the website, fall back to menu_url.
    const targetUrl = disp.website || disp.menu_url;
    if (!targetUrl) {
        await updateDispensary(dispensaryId, {
            crawler_status: 'error_needs_review',
            last_menu_error_at: new Date(),
            last_error_message: 'No website URL available for detection',
        });
        return { success: false, message: 'No website URL available' };
    }
    try {
        // Run detection
        const detection = await (0, menu_provider_detector_1.detectMenuProvider)(targetUrl, {
            checkMenuPaths: true,
            timeout: 30000,
        });
        const { provider, confidence } = detection;
        // Persist the raw detection outcome alongside the provider verdict.
        const updates = {
            menu_provider: provider,
            menu_provider_confidence: confidence,
            provider_detection_data: JSON.stringify({
                signals: detection.signals,
                urlsTested: detection.urlsTested,
                menuEntryPoints: detection.menuEntryPoints,
                rawSignals: detection.rawSignals,
                detectedAt: new Date().toISOString(),
            }),
            crawler_status: 'idle',
        };
        const isProductionDutchie = provider === 'dutchie' && confidence >= 70;
        if (isProductionDutchie) {
            // Dutchie with high confidence -> production
            updates.crawler_mode = 'production';
            logger_1.logger.info('crawler-jobs', `Dispensary ${dispensaryId} detected as Dutchie (${confidence}%), setting to production`);
        }
        else {
            // Unknown or non-Dutchie -> sandbox; queue a deeper sandbox analysis.
            updates.crawler_mode = 'sandbox';
            const sandboxId = await createSandboxEntry(dispensaryId, provider, 'detection', {
                signals: detection.signals,
                rawSignals: detection.rawSignals,
            });
            await createSandboxJob(dispensaryId, sandboxId, 'detection');
            logger_1.logger.info('crawler-jobs', `Dispensary ${dispensaryId} detected as ${provider} (${confidence}%), setting to sandbox`);
        }
        // Backfill menu_url from the first discovered entry point if we had none.
        if (detection.menuEntryPoints.length > 0 && !disp.menu_url) {
            updates.menu_url = detection.menuEntryPoints[0];
        }
        await updateDispensary(dispensaryId, updates);
        return {
            success: true,
            message: `Detected provider: ${provider} (${confidence}%)`,
            data: {
                provider,
                confidence,
                mode: updates.crawler_mode,
                menuEntryPoints: detection.menuEntryPoints,
            },
        };
    }
    catch (error) {
        logger_1.logger.error('crawler-jobs', `Detection failed for dispensary ${dispensaryId}: ${error.message}`);
        await updateDispensary(dispensaryId, {
            crawler_status: 'error_needs_review',
            last_menu_error_at: new Date(),
            last_error_message: `Detection failed: ${error.message}`,
        });
        return { success: false, message: error.message };
    }
}
// ========================================
// Job 2: Dutchie Menu Crawl (Production)
// ========================================
/**
 * Job 2: production menu crawl for a Dutchie dispensary.
 *
 * Delegates the actual scrape to scraper-v2 via the linked stores row.
 * On scrape failure, probes the live site to check whether the menu provider
 * has changed away from Dutchie; if so, demotes the dispensary to sandbox
 * mode and queues a fresh detection job.
 *
 * @param {number} dispensaryId
 * @returns {Promise<{success: boolean, message: string, data?: object}>}
 */
async function runDutchieMenuCrawlJob(dispensaryId) {
    logger_1.logger.info('crawler-jobs', `Starting Dutchie production crawl for dispensary ${dispensaryId}`);
    const dispensary = await getDispensary(dispensaryId);
    if (!dispensary) {
        return { success: false, message: `Dispensary ${dispensaryId} not found` };
    }
    // Verify it's a Dutchie production dispensary
    if (dispensary.menu_provider !== 'dutchie') {
        logger_1.logger.warn('crawler-jobs', `Dispensary ${dispensaryId} is not Dutchie, skipping production crawl`);
        return { success: false, message: 'Not a Dutchie dispensary' };
    }
    if (dispensary.crawler_mode !== 'production') {
        logger_1.logger.warn('crawler-jobs', `Dispensary ${dispensaryId} is not in production mode, skipping`);
        return { success: false, message: 'Not in production mode' };
    }
    // Find linked store ID (the scraper operates on stores rows, not dispensaries)
    const storeId = await getStoreIdForDispensary(dispensaryId);
    if (!storeId) {
        // Need to create a store entry or handle differently
        logger_1.logger.warn('crawler-jobs', `No linked store found for dispensary ${dispensaryId}`);
        return { success: false, message: 'No linked store found - needs setup' };
    }
    try {
        // Update status to running
        await updateDispensary(dispensaryId, { crawler_status: 'running' });
        // Run the existing Dutchie scraper
        await (0, scraper_v2_1.scrapeStore)(storeId, 3); // 3 parallel workers
        // Update success status
        await updateDispensary(dispensaryId, {
            crawler_status: 'ok',
            last_menu_scrape: new Date(),
            menu_scrape_status: 'active',
        });
        logger_1.logger.info('crawler-jobs', `Dutchie crawl completed for dispensary ${dispensaryId}`);
        return {
            success: true,
            message: 'Dutchie crawl completed successfully',
            data: { storeId },
        };
    }
    catch (error) {
        logger_1.logger.error('crawler-jobs', `Dutchie crawl failed for dispensary ${dispensaryId}: ${error.message}`);
        // Check if this might be a provider change
        let providerChanged = false;
        let probeBrowser = null;
        try {
            probeBrowser = await puppeteer_1.default.launch({ headless: true, args: ['--no-sandbox'] });
            const page = await probeBrowser.newPage();
            const url = dispensary.menu_url || dispensary.website;
            if (url) {
                await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
                const changeResult = await (0, menu_provider_detector_1.detectProviderChange)(page, 'dutchie');
                providerChanged = changeResult.changed;
                if (providerChanged) {
                    // Provider changed - move to sandbox
                    await updateDispensary(dispensaryId, {
                        crawler_mode: 'sandbox',
                        crawler_status: 'error_needs_review',
                        last_menu_error_at: new Date(),
                        last_error_message: `Provider appears to have changed from Dutchie to ${changeResult.newProvider}`,
                    });
                    const sandboxId = await createSandboxEntry(dispensaryId, changeResult.newProvider || 'unknown', 'detection', { providerChangeDetected: true, previousProvider: 'dutchie' });
                    await createSandboxJob(dispensaryId, sandboxId, 'detection');
                    logger_1.logger.warn('crawler-jobs', `Provider change detected for dispensary ${dispensaryId}: Dutchie -> ${changeResult.newProvider}`);
                }
            }
        }
        catch {
            // Ignore detection errors during failure handling
        }
        finally {
            // BUG FIX: the browser was previously only closed on the happy path;
            // if page.goto or detectProviderChange threw, the bare catch swallowed
            // the error and the Chromium process leaked. Always close here.
            if (probeBrowser) {
                try {
                    await probeBrowser.close();
                }
                catch {
                    // Ignore close errors during failure handling
                }
            }
        }
        if (!providerChanged) {
            await updateDispensary(dispensaryId, {
                crawler_status: 'error_needs_review',
                last_menu_error_at: new Date(),
                last_error_message: error.message,
            });
        }
        return { success: false, message: error.message };
    }
}
// ========================================
// Job 3: Sandbox Crawl (Learning Mode)
// ========================================
/**
 * Job 3: sandbox (learning-mode) crawl for a dispensary.
 *
 * Visits the dispensary site plus common menu paths, captures HTML for pages
 * that look like menus, extracts selector/price/availability signals, saves
 * the HTML locally, and writes an analysis summary to the sandbox row for
 * human review.
 *
 * @param {number} dispensaryId
 * @param {number} [sandboxId] - explicit sandbox row to use; otherwise the most
 *   recent active sandbox is reused (or a new one is created).
 * @returns {Promise<{success: boolean, message: string, data?: object}>}
 */
async function runSandboxCrawlJob(dispensaryId, sandboxId) {
logger_1.logger.info('crawler-jobs', `Starting sandbox crawl for dispensary ${dispensaryId}`);
const dispensary = await getDispensary(dispensaryId);
if (!dispensary) {
return { success: false, message: `Dispensary ${dispensaryId} not found` };
}
// Get or create sandbox entry
let sandbox;
if (sandboxId) {
const result = await migrate_1.pool.query('SELECT * FROM crawler_sandboxes WHERE id = $1', [sandboxId]);
sandbox = result.rows[0];
}
else {
// No explicit id: reuse the newest sandbox that hasn't finished or failed.
const result = await migrate_1.pool.query(`SELECT * FROM crawler_sandboxes
WHERE dispensary_id = $1 AND status NOT IN ('moved_to_production', 'failed')
ORDER BY created_at DESC LIMIT 1`, [dispensaryId]);
sandbox = result.rows[0];
if (!sandbox) {
const newSandboxId = await createSandboxEntry(dispensaryId, dispensary.menu_provider, 'template_learning');
const result = await migrate_1.pool.query('SELECT * FROM crawler_sandboxes WHERE id = $1', [newSandboxId]);
sandbox = result.rows[0];
}
}
const websiteUrl = dispensary.menu_url || dispensary.website;
if (!websiteUrl) {
await migrate_1.pool.query(`UPDATE crawler_sandboxes SET status = 'failed', failure_reason = 'No website URL' WHERE id = $1`, [sandbox.id]);
return { success: false, message: 'No website URL available' };
}
let browser = null;
try {
// Update status
await migrate_1.pool.query(`UPDATE crawler_sandboxes SET status = 'analyzing', updated_at = NOW() WHERE id = $1`, [sandbox.id]);
await updateDispensary(dispensaryId, { crawler_status: 'running' });
// Launch browser
browser = await puppeteer_1.default.launch({
headless: true,
args: ['--no-sandbox', '--disable-setuid-sandbox'],
});
const page = await browser.newPage();
// Spoof a desktop Chrome UA so the site serves its normal markup.
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36');
// URLs to crawl (limited depth for sandbox): the base URL plus common menu paths.
const urlsToVisit = [websiteUrl];
const menuPaths = ['/menu', '/shop', '/products', '/order'];
for (const path of menuPaths) {
const baseUrl = new URL(websiteUrl).origin;
urlsToVisit.push(`${baseUrl}${path}`);
}
const urlsTested = [];
const menuEntryPoints = [];
const capturedHtml = [];
const analysisData = {
provider_signals: {},
selector_candidates: [],
page_structures: [],
};
// Crawl each URL; per-URL failures are logged and skipped, not fatal.
for (const url of urlsToVisit) {
try {
urlsTested.push(url);
await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
await new Promise(r => setTimeout(r, 2000)); // Wait for dynamic content
// Get page HTML
const html = await page.content();
// Check if this looks like a menu page (cart/cannabis keywords in body text)
const hasMenuContent = await page.evaluate(() => {
const text = document.body.innerText.toLowerCase();
return (text.includes('add to cart') ||
text.includes('thc') ||
text.includes('indica') ||
text.includes('sativa'));
});
if (hasMenuContent) {
menuEntryPoints.push(url);
capturedHtml.push({ url, html });
// Analyze page structure for selector candidates
const structure = await page.evaluate(() => {
const candidates = [];
// Look for product-like containers
const productSelectors = [
'.product', '.product-card', '.menu-item', '.item-card',
'[data-product]', '[data-item]', '.strain', '.listing',
];
for (const selector of productSelectors) {
const els = document.querySelectorAll(selector);
if (els.length > 3) { // Likely a list
candidates.push({
selector,
count: els.length,
type: 'product_container',
});
}
}
// Look for price patterns
const pricePattern = /\$\d+(\.\d{2})?/;
const textNodes = document.body.innerText;
const priceMatches = textNodes.match(/\$\d+(\.\d{2})?/g);
return {
candidates,
priceCount: priceMatches?.length || 0,
hasAddToCart: textNodes.toLowerCase().includes('add to cart'),
};
});
// Extract availability hints from page content
const availabilityHints = (0, availability_1.extractAvailabilityHints)(html);
analysisData.page_structures.push({
url,
...structure,
availabilityHints,
});
}
}
catch (pageError) {
// 404s on the guessed menu paths are expected; only log other errors.
if (!pageError.message.includes('404')) {
logger_1.logger.warn('crawler-jobs', `Sandbox crawl error for ${url}: ${pageError.message}`);
}
}
}
// Save HTML to storage (local for now, S3 later)
let rawHtmlLocation = null;
if (capturedHtml.length > 0) {
const htmlDir = path_1.default.join(process.cwd(), 'sandbox-data', `dispensary-${dispensaryId}`);
await fs_1.promises.mkdir(htmlDir, { recursive: true });
for (const { url, html } of capturedHtml) {
// Timestamp prefix keeps repeated crawls from overwriting each other.
const filename = `${Date.now()}-${url.replace(/[^a-z0-9]/gi, '_')}.html`;
await fs_1.promises.writeFile(path_1.default.join(htmlDir, filename), html);
}
rawHtmlLocation = htmlDir;
}
// Update sandbox with results. Finding any menu page moves the sandbox to
// human review (confidence 50); otherwise it stays pending (confidence 20).
await migrate_1.pool.query(`UPDATE crawler_sandboxes SET
status = $1,
urls_tested = $2,
menu_entry_points = $3,
raw_html_location = $4,
analysis_json = $5,
confidence_score = $6,
analyzed_at = NOW(),
updated_at = NOW()
WHERE id = $7`, [
menuEntryPoints.length > 0 ? 'needs_human_review' : 'pending',
JSON.stringify(urlsTested),
JSON.stringify(menuEntryPoints),
rawHtmlLocation,
JSON.stringify(analysisData),
menuEntryPoints.length > 0 ? 50 : 20,
sandbox.id,
]);
// Update dispensary status
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review', // Sandbox results need review
});
logger_1.logger.info('crawler-jobs', `Sandbox crawl completed for dispensary ${dispensaryId}: ${menuEntryPoints.length} menu pages found`);
return {
success: true,
message: `Sandbox crawl completed. Found ${menuEntryPoints.length} menu entry points.`,
data: {
sandboxId: sandbox.id,
urlsTested: urlsTested.length,
menuEntryPoints,
analysisData,
},
};
}
catch (error) {
logger_1.logger.error('crawler-jobs', `Sandbox crawl failed for dispensary ${dispensaryId}: ${error.message}`);
await migrate_1.pool.query(`UPDATE crawler_sandboxes SET status = 'failed', failure_reason = $1 WHERE id = $2`, [error.message, sandbox.id]);
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review',
last_menu_error_at: new Date(),
last_error_message: `Sandbox crawl failed: ${error.message}`,
});
return { success: false, message: error.message };
}
finally {
// Always release the Chromium process, success or failure.
if (browser) {
await browser.close();
}
}
}
// ========================================
// Queue Processing Functions
// ========================================
/**
* Process pending sandbox jobs
*/
/**
 * Claim up to `limit` due sandbox jobs atomically (FOR UPDATE SKIP LOCKED,
 * so concurrent workers never double-claim) and run them sequentially,
 * recording a completed/failed outcome on each job row.
 *
 * @param {number} [limit=5] - maximum number of jobs to claim in one pass.
 */
async function processSandboxJobs(limit = 5) {
    // Claim pending jobs
    const claimed = await migrate_1.pool.query(`UPDATE sandbox_crawl_jobs
     SET status = 'running', worker_id = $1, started_at = NOW()
     WHERE id IN (
       SELECT id FROM sandbox_crawl_jobs
       WHERE status = 'pending' AND scheduled_at <= NOW()
       ORDER BY priority DESC, scheduled_at ASC
       LIMIT $2
       FOR UPDATE SKIP LOCKED
     )
     RETURNING *`, [WORKER_ID, limit]);
    for (const job of claimed.rows) {
        try {
            // 'detection' jobs re-run provider detection; everything else is a sandbox crawl.
            const outcome = job.job_type === 'detection'
                ? await runDetectMenuProviderJob(job.dispensary_id)
                : await runSandboxCrawlJob(job.dispensary_id, job.sandbox_id);
            await migrate_1.pool.query(`UPDATE sandbox_crawl_jobs
       SET status = $1, completed_at = NOW(), result_summary = $2, error_message = $3
       WHERE id = $4`, [
                outcome.success ? 'completed' : 'failed',
                JSON.stringify(outcome.data || {}),
                outcome.success ? null : outcome.message,
                job.id,
            ]);
        }
        catch (error) {
            // Unexpected throw from a runner: mark the job failed with the error text.
            await migrate_1.pool.query(`UPDATE sandbox_crawl_jobs SET status = 'failed', error_message = $1 WHERE id = $2`, [error.message, job.id]);
        }
    }
}