- Add backend stale process monitoring API (/api/stale-processes) - Add users management route - Add frontend landing page and stale process monitor UI on /scraper-tools - Move old development scripts to backend/archive/ - Update frontend build with new features 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
395 lines
18 KiB
JavaScript
395 lines
18 KiB
JavaScript
"use strict";
|
|
/**
 * Dispensary Crawl Orchestrator
 *
 * Orchestrates the complete crawl workflow for a dispensary:
 * 1. Load dispensary data
 * 2. Check if provider detection is needed
 * 3. Run provider detection if needed
 * 4. Run the appropriate crawl job based on provider/mode
 * 5. Update dispensary_crawl_schedule with a meaningful status
 *
 * This works DIRECTLY with dispensaries (not through the stores table).
 */
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.runDispensaryOrchestrator = runDispensaryOrchestrator;
|
|
exports.runBatchDispensaryOrchestrator = runBatchDispensaryOrchestrator;
|
|
exports.getDispensariesDueForOrchestration = getDispensariesDueForOrchestration;
|
|
exports.ensureAllDispensariesHaveSchedules = ensureAllDispensariesHaveSchedules;
|
|
exports.processDispensaryScheduler = processDispensaryScheduler;
|
|
const uuid_1 = require("uuid");
|
|
const migrate_1 = require("../db/migrate");
|
|
const crawler_logger_1 = require("./crawler-logger");
|
|
const intelligence_detector_1 = require("./intelligence-detector");
|
|
const category_crawler_jobs_1 = require("./category-crawler-jobs");
|
|
// ========================================
|
|
// Main Orchestrator Function
|
|
// ========================================
|
|
/**
 * Run the complete crawl orchestration for a single dispensary.
 *
 * Steps:
 *   1. Mark the schedule row as 'running' and load the dispensary.
 *   2. When checkNeedsDetection() says so, run provider detection, persist the
 *      detection result and reload the dispensary row.
 *   3. Pick a crawl:
 *        - Dutchie production crawl when the provider is 'dutchie' with
 *          product_crawler_mode 'production', or when menu_type is 'dutchie'
 *          and a platform_dispensary_id is present;
 *        - sandbox crawl for any other provider that is not 'unknown';
 *        - no crawl otherwise.
 *   4. Write the final status to the schedule and insert a job record.
 *
 * Anything thrown during steps 1-3 (Error or not) is converted into a result
 * with status 'error', so the schedule row is always moved out of 'running'.
 * Only a failure of the final bookkeeping in step 4 rejects the promise.
 *
 * @param {number|string} dispensaryId - Value matched against dispensaries.id.
 * @param {number|string} [scheduleId] - Forwarded unchanged to createJobRecord().
 * @returns {Promise<object>} Result object; `status` ends up as one of
 *   'success', 'sandbox_only', 'detection_only' or 'error'.
 */
async function runDispensaryOrchestrator(dispensaryId, scheduleId) {
    const startTime = Date.now();
    const runId = (0, uuid_1.v4)();
    const result = {
        status: 'pending',
        summary: '',
        runId,
        dispensaryId,
        dispensaryName: '',
        detectionRan: false,
        crawlRan: false,
        durationMs: 0,
    };
    try {
        // Mark schedule as running
        await updateScheduleStatus(dispensaryId, 'running', 'Starting orchestrator...', null, runId);
        // 1. Load dispensary info
        const dispensary = await getDispensaryInfo(dispensaryId);
        if (!dispensary) {
            throw new Error(`Dispensary ${dispensaryId} not found`);
        }
        result.dispensaryName = dispensary.name;
        // 2. Check if provider detection is needed
        const needsDetection = await checkNeedsDetection(dispensary);
        const websiteUrl = dispensary.menu_url || dispensary.website;
        if (needsDetection && !websiteUrl) {
            // Detection is required but there is nothing to probe; the shared
            // bookkeeping after the try/catch records this outcome.
            result.status = 'error';
            result.summary = 'No website URL available for detection';
            result.error = 'Dispensary has no menu_url or website configured';
        }
        else {
            if (needsDetection) {
                await updateScheduleStatus(dispensaryId, 'running', 'Running provider detection...', null, runId);
                const detectionResult = await (0, intelligence_detector_1.detectMultiCategoryProviders)(websiteUrl);
                result.detectionRan = true;
                result.detectionResult = detectionResult;
                // Save detection results to dispensary
                await (0, intelligence_detector_1.updateAllCategoryProviders)(dispensaryId, detectionResult);
                crawler_logger_1.crawlerLogger.providerDetected({
                    dispensary_id: dispensaryId,
                    dispensary_name: dispensary.name,
                    detected_provider: detectionResult.product.provider,
                    confidence: detectionResult.product.confidence,
                    detection_method: 'dispensary_orchestrator',
                    menu_url: websiteUrl,
                    category: 'product',
                });
                // Refresh dispensary info so the crawl decision sees what detection stored
                const updatedDispensary = await getDispensaryInfo(dispensaryId);
                if (updatedDispensary) {
                    Object.assign(dispensary, updatedDispensary);
                }
            }
            // 3. Determine crawl type and run
            // Use product_provider if available, otherwise fall back to menu_type
            const provider = dispensary.product_provider || dispensary.menu_type;
            const mode = dispensary.product_crawler_mode;
            const isDutchieProduction = (provider === 'dutchie' && mode === 'production') ||
                (dispensary.menu_type === 'dutchie' && Boolean(dispensary.platform_dispensary_id));
            const detectionPart = result.detectionRan ? 'Detection + ' : '';
            if (isDutchieProduction) {
                await updateScheduleStatus(dispensaryId, 'running', 'Running Dutchie production crawl...', null, runId);
                Object.assign(result, await runProductionDutchieCrawl(dispensaryId, dispensary.name, detectionPart, startTime));
            }
            else if (provider && provider !== 'unknown') {
                // Sandbox crawl for non-Dutchie or sandbox mode
                await updateScheduleStatus(dispensaryId, 'running', `Running ${provider} sandbox crawl...`, null, runId);
                Object.assign(result, await runSandboxProviderCrawl(dispensaryId, provider, detectionPart));
            }
            else if (result.detectionRan) {
                // No usable provider - detection only
                result.summary = `Detection complete: provider=${dispensary.product_provider || 'unknown'}, confidence=${dispensary.product_confidence || 0}%`;
                result.status = 'detection_only';
            }
            else {
                result.summary = 'No provider detected and no crawl possible';
                result.status = 'error';
                result.error = 'Could not determine menu provider';
            }
        }
    }
    catch (error) {
        // Normalise first: a thrown string/null has no `.message`, and failing
        // here would skip the final status write below.
        const message = describeThrown(error);
        result.status = 'error';
        result.error = message;
        result.summary = `Orchestrator error: ${message.slice(0, 100)}`;
        crawler_logger_1.crawlerLogger.queueFailure({
            queue_type: 'dispensary_orchestrator',
            error_message: message,
        });
    }
    result.durationMs = Date.now() - startTime;
    // Update final schedule status
    await updateScheduleStatus(dispensaryId, result.status, result.summary, result.error || null, runId);
    // Create job record
    await createJobRecord(dispensaryId, scheduleId, result);
    return result;
}
/**
 * Turn any thrown value into a string message.
 * Uses `.message` when it is a string, otherwise the value's string form.
 */
function describeThrown(thrown) {
    const message = thrown?.message;
    if (typeof message === 'string') {
        return message;
    }
    try {
        return String(thrown);
    }
    catch {
        // String() itself can throw for exotic objects (e.g. no toString)
        return 'Unknown error';
    }
}
/**
 * Message reported by a crawl job result whose `success` flag is falsy.
 * Falls back to a fixed text when the job supplied no string message.
 */
function jobFailureMessage(jobResult) {
    return typeof jobResult.message === 'string'
        ? jobResult.message
        : 'No failure message provided';
}
/**
 * Run the production Dutchie products crawl.
 * Errors thrown by the crawl job are captured in the returned fields.
 *
 * @returns {Promise<object>} Fields to merge into the orchestrator result.
 */
async function runProductionDutchieCrawl(dispensaryId, dispensaryName, detectionPart, startTime) {
    const outcome = { crawlRan: true, crawlType: 'production' };
    try {
        // Run the category-specific crawl job
        const crawlResult = await (0, category_crawler_jobs_1.runCrawlProductsJob)(dispensaryId);
        if (crawlResult.success) {
            outcome.productsFound = crawlResult.data?.productsFound || 0;
            outcome.summary = `${detectionPart}Dutchie products crawl completed`;
            outcome.status = 'success';
            crawler_logger_1.crawlerLogger.jobCompleted({
                job_id: 0,
                store_id: 0,
                store_name: dispensaryName,
                duration_ms: Date.now() - startTime,
                products_found: outcome.productsFound,
                products_new: 0,
                products_updated: 0,
                provider: 'dutchie',
            });
        }
        else {
            const message = jobFailureMessage(crawlResult);
            outcome.status = 'error';
            outcome.error = message;
            outcome.summary = `Dutchie crawl failed: ${message.slice(0, 100)}`;
        }
    }
    catch (crawlError) {
        const message = describeThrown(crawlError);
        outcome.status = 'error';
        outcome.error = message;
        outcome.summary = `Dutchie crawl failed: ${message.slice(0, 100)}`;
        crawler_logger_1.crawlerLogger.jobFailed({
            job_id: 0,
            store_id: 0,
            store_name: dispensaryName,
            duration_ms: Date.now() - startTime,
            error_message: message,
            provider: 'dutchie',
        });
    }
    return outcome;
}
/**
 * Run the sandbox products crawl for a non-production provider.
 * Errors thrown by the sandbox job are captured in the returned fields.
 *
 * @returns {Promise<object>} Fields to merge into the orchestrator result.
 */
async function runSandboxProviderCrawl(dispensaryId, provider, detectionPart) {
    const outcome = { crawlRan: true, crawlType: 'sandbox' };
    try {
        const sandboxResult = await (0, category_crawler_jobs_1.runSandboxProductsJob)(dispensaryId);
        outcome.productsFound = sandboxResult.data?.productsExtracted || 0;
        if (sandboxResult.success) {
            outcome.summary = `${detectionPart}${provider} sandbox crawl (${outcome.productsFound} items, quality ${sandboxResult.data?.qualityScore || 0}%)`;
            outcome.status = 'sandbox_only';
        }
        else {
            const message = jobFailureMessage(sandboxResult);
            outcome.summary = `${detectionPart}${provider} sandbox failed: ${message}`;
            outcome.status = 'error';
            outcome.error = message;
        }
    }
    catch (sandboxError) {
        const message = describeThrown(sandboxError);
        outcome.status = 'error';
        outcome.error = message;
        outcome.summary = `Sandbox crawl failed: ${message.slice(0, 100)}`;
    }
    return outcome;
}
|
|
// ========================================
|
|
// Helper Functions
|
|
// ========================================
|
|
/**
 * Load the dispensary columns used by the orchestrator.
 *
 * @param {number|string} dispensaryId - Value matched against dispensaries.id.
 * @returns {Promise<object|null>} The first matching row, or null when the
 *   query returns no row.
 */
async function getDispensaryInfo(dispensaryId) {
    const selectSql = `SELECT id, name, city, website, menu_url, menu_type, platform_dispensary_id,
            product_provider, product_confidence, product_crawler_mode, last_product_scan_at
     FROM dispensaries
     WHERE id = $1`;
    const { rows } = await migrate_1.pool.query(selectSql, [dispensaryId]);
    const firstRow = rows[0];
    if (!firstRow) {
        return null;
    }
    return firstRow;
}
|
|
/**
 * Decide whether provider detection should run for a dispensary row.
 *
 * Rules, evaluated in order:
 *   1. menu_type 'dutchie' with a platform_dispensary_id -> never detect
 *      (avoids wasteful detection timeouts for known Dutchie stores).
 *   2. product_provider missing or 'unknown' -> detect.
 *   3. product_confidence is not null and below `minConfidence` -> detect.
 *   4. last_product_scan_at is set and older than `maxAgeDays` -> detect.
 *   5. Otherwise -> no detection.
 *
 * @param {object} dispensary - Row as returned by getDispensaryInfo().
 * @param {object} [options]
 * @param {number} [options.minConfidence=50] - Confidence below this triggers re-detection.
 * @param {number} [options.maxAgeDays=7] - Scan age (in days) above this counts as stale.
 * @param {number} [options.now=Date.now()] - Reference time in epoch milliseconds.
 * @returns {Promise<boolean>} true when detection should run.
 */
async function checkNeedsDetection(dispensary, { minConfidence = 50, maxAgeDays = 7, now = Date.now() } = {}) {
    const MS_PER_DAY = 1000 * 60 * 60 * 24;
    if (dispensary.menu_type === 'dutchie' && dispensary.platform_dispensary_id) {
        return false;
    }
    // No provider or an explicit 'unknown' = definitely needs detection
    if (!dispensary.product_provider || dispensary.product_provider === 'unknown') {
        return true;
    }
    // Low confidence = needs re-detection
    if (dispensary.product_confidence !== null && dispensary.product_confidence < minConfidence) {
        return true;
    }
    // Stale detection = needs refresh; a row without a scan timestamp is not treated as stale
    if (dispensary.last_product_scan_at) {
        const scannedAt = new Date(dispensary.last_product_scan_at).getTime();
        const daysSince = (now - scannedAt) / MS_PER_DAY;
        if (daysSince > maxAgeDays) {
            return true;
        }
    }
    return false;
}
|
|
/**
 * Upsert the latest run status for a dispensary into dispensary_crawl_schedule.
 *
 * Inserts a row keyed by dispensary_id, or on conflict overwrites last_status,
 * last_summary and last_error and stamps last_run_at / updated_at with NOW().
 *
 * @param {number|string} dispensaryId - Schedule key (dispensary_id).
 * @param {string} status - Stored in last_status.
 * @param {string} summary - Stored in last_summary.
 * @param {string|null} error - Stored in last_error.
 * @param {string} runId - Accepted for call-site symmetry; not written by this query.
 * @returns {Promise<void>}
 */
async function updateScheduleStatus(dispensaryId, status, summary, error, runId) {
    const upsertSql = `INSERT INTO dispensary_crawl_schedule (dispensary_id, last_status, last_summary, last_error, last_run_at, updated_at)
     VALUES ($1, $2, $3, $4, NOW(), NOW())
     ON CONFLICT (dispensary_id) DO UPDATE SET
       last_status = $2,
       last_summary = $3,
       last_error = $4,
       last_run_at = NOW(),
       updated_at = NOW()`;
    const upsertParams = [dispensaryId, status, summary, error];
    await migrate_1.pool.query(upsertSql, upsertParams);
}
|
|
/**
 * Persist one orchestrator run and roll its outcome into the schedule stats.
 *
 * Inserts a row into dispensary_crawl_jobs (job_type 'orchestrator',
 * trigger_type 'manual', priority 100, all three timestamps set to NOW()),
 * then updates dispensary_crawl_schedule:
 *   - 'success' / 'sandbox_only' / 'detection_only': bumps total_runs and
 *     successful_runs and resets consecutive_failures;
 *   - 'error': bumps total_runs and consecutive_failures;
 *   - any other status: the schedule counters are left untouched.
 * In both updating cases next_run_at is pushed out by interval_minutes.
 *
 * Numeric fields use `??` so that a real 0 (e.g. a crawl that found no
 * products, or a detection with 0 confidence) is stored as 0, not NULL.
 *
 * @param {number|string} dispensaryId - Stored in dispensary_id.
 * @param {number|string} [scheduleId] - Stored in schedule_id; any falsy value becomes NULL.
 * @param {object} result - Result object built by runDispensaryOrchestrator().
 * @returns {Promise<void>}
 */
async function createJobRecord(dispensaryId, scheduleId, result) {
    // Only an orchestrator 'error' is a failed job; every other outcome completed.
    const jobStatus = result.status === 'error' ? 'failed' : 'completed';
    const detectedProduct = result.detectionResult?.product;
    await migrate_1.pool.query(`INSERT INTO dispensary_crawl_jobs (
       dispensary_id, schedule_id, job_type, trigger_type, status, priority,
       scheduled_at, started_at, completed_at, duration_ms,
       detection_ran, crawl_ran, crawl_type,
       products_found, products_new, products_updated,
       detected_provider, detected_confidence, detected_mode,
       error_message, run_id
     ) VALUES (
       $1, $2, 'orchestrator', 'manual', $3, 100,
       NOW(), NOW(), NOW(), $4,
       $5, $6, $7,
       $8, $9, $10,
       $11, $12, $13,
       $14, $15
     )`, [
        dispensaryId,
        scheduleId || null,
        jobStatus,
        result.durationMs,
        result.detectionRan,
        result.crawlRan,
        result.crawlType || null,
        result.productsFound ?? null,
        result.productsNew ?? null,
        result.productsUpdated ?? null,
        detectedProduct?.provider || null,
        detectedProduct?.confidence ?? null,
        detectedProduct?.mode || null,
        result.error || null,
        result.runId,
    ]);
    // Update schedule stats
    const countsAsSuccess = ['success', 'sandbox_only', 'detection_only'].includes(result.status);
    if (countsAsSuccess) {
        await migrate_1.pool.query(`UPDATE dispensary_crawl_schedule SET
         total_runs = COALESCE(total_runs, 0) + 1,
         successful_runs = COALESCE(successful_runs, 0) + 1,
         consecutive_failures = 0,
         next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL,
         last_duration_ms = $2
       WHERE dispensary_id = $1`, [dispensaryId, result.durationMs]);
    }
    else if (result.status === 'error') {
        await migrate_1.pool.query(`UPDATE dispensary_crawl_schedule SET
         total_runs = COALESCE(total_runs, 0) + 1,
         consecutive_failures = COALESCE(consecutive_failures, 0) + 1,
         next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL,
         last_duration_ms = $2
       WHERE dispensary_id = $1`, [dispensaryId, result.durationMs]);
    }
}
|
|
// ========================================
|
|
// Batch Processing
|
|
// ========================================
|
|
/**
 * Run the orchestrator for multiple dispensaries in fixed-size batches.
 *
 * Each batch runs its dispensaries in parallel via Promise.all; batches run
 * one after another with a pause in between so the system is not overwhelmed.
 * If any single run rejects, the whole call rejects.
 *
 * @param {Array<number|string>} dispensaryIds - IDs to process, in order.
 * @param {number} [concurrency=3] - Batch size. Values below 1 (or NaN) are
 *   treated as 1 so the loop always advances; fractions are rounded down.
 * @param {number} [batchDelayMs=1000] - Pause between batches in milliseconds;
 *   0 or less disables the pause.
 * @returns {Promise<object[]>} Orchestrator results in the same order as `dispensaryIds`.
 */
async function runBatchDispensaryOrchestrator(dispensaryIds, concurrency = 3, batchDelayMs = 1000) {
    // A step of 0/negative/NaN would never move `i` forward (endless loop).
    const batchSize = concurrency >= 1 ? Math.floor(concurrency) : 1;
    const results = [];
    // Process in batches
    for (let i = 0; i < dispensaryIds.length; i += batchSize) {
        const batch = dispensaryIds.slice(i, i + batchSize);
        console.log(`Processing batch ${Math.floor(i / batchSize) + 1}: dispensaries ${batch.join(', ')}`);
        const batchResults = await Promise.all(batch.map((id) => runDispensaryOrchestrator(id)));
        results.push(...batchResults);
        // Small delay between batches to avoid overwhelming the system
        const hasMoreBatches = i + batchSize < dispensaryIds.length;
        if (hasMoreBatches && batchDelayMs > 0) {
            await new Promise((resolve) => setTimeout(resolve, batchDelayMs));
        }
    }
    return results;
}
|
|
/**
 * List the IDs of dispensaries whose orchestration is due.
 *
 * A dispensary qualifies when its schedule is active (or it has no schedule
 * row), its next_run_at is unset or in the past, and its last_status is not
 * 'running' or 'pending'. Higher priority comes first, then the least
 * recently run (never-run rows first).
 *
 * @param {number} [limit=10] - Maximum number of IDs to return.
 * @returns {Promise<Array>} Values of dispensaries.id, in query order.
 */
async function getDispensariesDueForOrchestration(limit = 10) {
    const dueSql = `SELECT d.id
     FROM dispensaries d
     LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id
     WHERE COALESCE(dcs.is_active, TRUE) = TRUE
       AND (
         dcs.next_run_at IS NULL
         OR dcs.next_run_at <= NOW()
       )
       AND (dcs.last_status IS NULL OR dcs.last_status NOT IN ('running', 'pending'))
     ORDER BY COALESCE(dcs.priority, 0) DESC, dcs.last_run_at ASC NULLS FIRST
     LIMIT $1`;
    const { rows } = await migrate_1.pool.query(dueSql, [limit]);
    const dueIds = [];
    for (const row of rows) {
        dueIds.push(row.id);
    }
    return dueIds;
}
|
|
/**
 * Ensure every dispensary has a row in dispensary_crawl_schedule.
 *
 * Inserts an active, priority-0 schedule row for each dispensary that has
 * none. `ON CONFLICT (dispensary_id) DO NOTHING` keeps the statement from
 * failing when another writer (e.g. the status upsert issued by a running
 * orchestrator) creates the same row concurrently.
 *
 * @param {number} [intervalMinutes=240] - interval_minutes for newly created rows.
 * @returns {Promise<{created: number, existing: number}>} `created` is the
 *   number of rows inserted by this call; `existing` is the table's total row
 *   count after the insert minus `created`.
 */
async function ensureAllDispensariesHaveSchedules(intervalMinutes = 240) {
    // Create schedule rows for all dispensaries that do not have one yet
    const inserted = await migrate_1.pool.query(`INSERT INTO dispensary_crawl_schedule (dispensary_id, is_active, interval_minutes, priority)
     SELECT d.id, TRUE, $1, 0
     FROM dispensaries d
     WHERE NOT EXISTS (
       SELECT 1 FROM dispensary_crawl_schedule dcs WHERE dcs.dispensary_id = d.id
     )
     ON CONFLICT (dispensary_id) DO NOTHING
     RETURNING id`, [intervalMinutes]);
    const created = inserted.rowCount ?? 0;
    const totals = await migrate_1.pool.query('SELECT COUNT(*) FROM dispensary_crawl_schedule');
    // COUNT(*) arrives as a decimal string; parse it with an explicit radix
    const totalRows = Number.parseInt(totals.rows[0].count, 10);
    return {
        created,
        existing: totalRows - created,
    };
}
|
|
// ========================================
|
|
// Scheduler Integration
|
|
// ========================================
|
|
// Module-level re-entrancy flag: true while a scheduler pass is in progress.
let dispensarySchedulerRunning = false;
/**
 * Scheduler tick: orchestrate the dispensaries that are currently due.
 *
 * Skips the tick when a previous pass is still in progress. Otherwise fetches
 * up to 3 due dispensary IDs and runs the orchestrator for them one at a
 * time; a failure for one dispensary is logged and does not stop the rest.
 * The in-progress flag is always cleared when the pass ends.
 *
 * @returns {Promise<void>}
 */
async function processDispensaryScheduler() {
    if (dispensarySchedulerRunning) {
        console.log('Dispensary scheduler already running, skipping...');
        return;
    }
    dispensarySchedulerRunning = true;
    // Runs a single dispensary and logs its outcome; never rejects on a run failure.
    const crawlOne = async (dispensaryId) => {
        try {
            console.log(`Dispensary Scheduler: Starting crawl for dispensary ${dispensaryId}`);
            const outcome = await runDispensaryOrchestrator(dispensaryId);
            console.log(`Dispensary Scheduler: Dispensary ${dispensaryId} completed - ${outcome.summary}`);
        }
        catch (error) {
            console.error(`Dispensary Scheduler: Dispensary ${dispensaryId} failed - ${error.message}`);
        }
    };
    try {
        const dueIds = await getDispensariesDueForOrchestration(3);
        if (dueIds.length > 0) {
            console.log(`Dispensary Scheduler: Processing ${dueIds.length} dispensaries due for crawl`);
            // Sequential on purpose: one orchestrator run at a time per tick
            for (const dueId of dueIds) {
                await crawlOne(dueId);
            }
            console.log(`Dispensary Scheduler: Finished processing ${dueIds.length} dispensaries`);
        }
    }
    finally {
        dispensarySchedulerRunning = false;
    }
}
|