Files
cannaiq/backend/dist/services/dispensary-orchestrator.js
Kelly d91c55a344 feat: Add stale process monitor, users route, landing page, archive old scripts
- Add backend stale process monitoring API (/api/stale-processes)
- Add users management route
- Add frontend landing page and stale process monitor UI on /scraper-tools
- Move old development scripts to backend/archive/
- Update frontend build with new features

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-05 04:07:31 -07:00

395 lines
18 KiB
JavaScript

"use strict";
/**
* Dispensary Crawl Orchestrator
*
* Orchestrates the complete crawl workflow for a dispensary:
* 1. Load dispensary data
* 2. Check if provider detection is needed
* 3. Run provider detection if needed
* 4. Queue appropriate crawl jobs based on provider/mode
* 5. Update dispensary_crawl_schedule with meaningful status
*
* This works DIRECTLY with dispensaries (not through stores table).
*/
Object.defineProperty(exports, "__esModule", { value: true });
exports.runDispensaryOrchestrator = runDispensaryOrchestrator;
exports.runBatchDispensaryOrchestrator = runBatchDispensaryOrchestrator;
exports.getDispensariesDueForOrchestration = getDispensariesDueForOrchestration;
exports.ensureAllDispensariesHaveSchedules = ensureAllDispensariesHaveSchedules;
exports.processDispensaryScheduler = processDispensaryScheduler;
const uuid_1 = require("uuid");
const migrate_1 = require("../db/migrate");
const crawler_logger_1 = require("./crawler-logger");
const intelligence_detector_1 = require("./intelligence-detector");
const category_crawler_jobs_1 = require("./category-crawler-jobs");
// ========================================
// Main Orchestrator Function
// ========================================
/**
 * Run the complete crawl orchestration for a dispensary
 *
 * Behavior:
 * 1. Load the dispensary info
 * 2. If checkNeedsDetection says so (missing/unknown provider, low confidence,
 *    or last scan older than 7 days), run provider detection
 * 3. After detection:
 *    - If product_provider = 'dutchie' and product_crawler_mode = 'production',
 *      or menu_type = 'dutchie' with a platform_dispensary_id: run production crawl
 *    - Else if a provider other than 'unknown' is known: run sandbox crawl
 *    - Otherwise: report detection-only (if detection ran) or an error
 * 4. Update dispensary_crawl_schedule with status/summary and write a job record
 *
 * Errors thrown while orchestrating are captured into the returned result instead
 * of being rethrown. Non-Error throw values are normalised to a message so the
 * final status write always happens and the schedule row is moved out of
 * 'running'. Failures of the final bookkeeping writes themselves still reject.
 *
 * @param {number} dispensaryId - dispensaries.id to orchestrate
 * @param {number} [scheduleId] - optional schedule id stored on the job record
 * @returns {Promise<object>} result with status, summary, runId, duration and
 *   detection/crawl details
 */
async function runDispensaryOrchestrator(dispensaryId, scheduleId) {
    const startTime = Date.now();
    const runId = (0, uuid_1.v4)();
    const result = {
        status: 'pending',
        summary: '',
        runId,
        dispensaryId,
        dispensaryName: '',
        detectionRan: false,
        crawlRan: false,
        durationMs: 0,
    };
    try {
        // Mark schedule as running
        await updateScheduleStatus(dispensaryId, 'running', 'Starting orchestrator...', null, runId);
        // 1. Load dispensary info
        const dispensary = await getDispensaryInfo(dispensaryId);
        if (!dispensary) {
            throw new Error(`Dispensary ${dispensaryId} not found`);
        }
        result.dispensaryName = dispensary.name;
        // 2. Check if provider detection is needed
        const needsDetection = await checkNeedsDetection(dispensary);
        if (needsDetection) {
            // Run provider detection
            const websiteUrl = dispensary.menu_url || dispensary.website;
            if (!websiteUrl) {
                // Nothing to probe: record the failure and stop early.
                result.status = 'error';
                result.summary = 'No website URL available for detection';
                result.error = 'Dispensary has no menu_url or website configured';
                await updateScheduleStatus(dispensaryId, 'error', result.summary, result.error, runId);
                result.durationMs = Date.now() - startTime;
                await createJobRecord(dispensaryId, scheduleId, result);
                return result;
            }
            await updateScheduleStatus(dispensaryId, 'running', 'Running provider detection...', null, runId);
            const detectionResult = await (0, intelligence_detector_1.detectMultiCategoryProviders)(websiteUrl);
            result.detectionRan = true;
            result.detectionResult = detectionResult;
            // Save detection results to dispensary
            await (0, intelligence_detector_1.updateAllCategoryProviders)(dispensaryId, detectionResult);
            crawler_logger_1.crawlerLogger.providerDetected({
                dispensary_id: dispensaryId,
                dispensary_name: dispensary.name,
                detected_provider: detectionResult.product.provider,
                confidence: detectionResult.product.confidence,
                detection_method: 'dispensary_orchestrator',
                menu_url: websiteUrl,
                category: 'product',
            });
            // Refresh dispensary info after detection so the crawl decision below
            // sees the provider/mode that detection just stored.
            const updatedDispensary = await getDispensaryInfo(dispensaryId);
            if (updatedDispensary) {
                Object.assign(dispensary, updatedDispensary);
            }
        }
        // 3. Determine crawl type and run
        // Use product_provider if available, otherwise fall back to menu_type
        const provider = dispensary.product_provider || dispensary.menu_type;
        const mode = dispensary.product_crawler_mode;
        // Run production Dutchie crawl if:
        // 1. product_provider is 'dutchie' with production mode, OR
        // 2. menu_type is 'dutchie' with platform_dispensary_id (known Dutchie store)
        const isDutchieProduction = (provider === 'dutchie' && mode === 'production') ||
            (dispensary.menu_type === 'dutchie' && dispensary.platform_dispensary_id);
        if (isDutchieProduction) {
            // Production Dutchie crawl
            await updateScheduleStatus(dispensaryId, 'running', 'Running Dutchie production crawl...', null, runId);
            try {
                // Run the category-specific crawl job
                const crawlResult = await (0, category_crawler_jobs_1.runCrawlProductsJob)(dispensaryId);
                result.crawlRan = true;
                result.crawlType = 'production';
                if (crawlResult.success) {
                    result.productsFound = crawlResult.data?.productsFound || 0;
                    const detectionPart = result.detectionRan ? 'Detection + ' : '';
                    result.summary = `${detectionPart}Dutchie products crawl completed`;
                    result.status = 'success';
                    crawler_logger_1.crawlerLogger.jobCompleted({
                        job_id: 0,
                        store_id: 0,
                        store_name: dispensary.name,
                        duration_ms: Date.now() - startTime,
                        products_found: result.productsFound || 0,
                        products_new: 0,
                        products_updated: 0,
                        provider: 'dutchie',
                    });
                }
                else {
                    result.status = 'error';
                    result.error = crawlResult.message;
                    result.summary = `Dutchie crawl failed: ${crawlResult.message.slice(0, 100)}`;
                }
            }
            catch (crawlError) {
                const message = toErrorMessage(crawlError);
                result.status = 'error';
                result.error = message;
                result.summary = `Dutchie crawl failed: ${message.slice(0, 100)}`;
                result.crawlRan = true;
                result.crawlType = 'production';
                crawler_logger_1.crawlerLogger.jobFailed({
                    job_id: 0,
                    store_id: 0,
                    store_name: dispensary.name,
                    duration_ms: Date.now() - startTime,
                    error_message: message,
                    provider: 'dutchie',
                });
            }
        }
        else if (provider && provider !== 'unknown') {
            // Sandbox crawl for non-Dutchie or sandbox mode
            await updateScheduleStatus(dispensaryId, 'running', `Running ${provider} sandbox crawl...`, null, runId);
            try {
                const sandboxResult = await (0, category_crawler_jobs_1.runSandboxProductsJob)(dispensaryId);
                result.crawlRan = true;
                result.crawlType = 'sandbox';
                result.productsFound = sandboxResult.data?.productsExtracted || 0;
                const detectionPart = result.detectionRan ? 'Detection + ' : '';
                if (sandboxResult.success) {
                    result.summary = `${detectionPart}${provider} sandbox crawl (${result.productsFound} items, quality ${sandboxResult.data?.qualityScore || 0}%)`;
                    result.status = 'sandbox_only';
                }
                else {
                    result.summary = `${detectionPart}${provider} sandbox failed: ${sandboxResult.message}`;
                    result.status = 'error';
                    result.error = sandboxResult.message;
                }
            }
            catch (sandboxError) {
                const message = toErrorMessage(sandboxError);
                result.status = 'error';
                result.error = message;
                result.summary = `Sandbox crawl failed: ${message.slice(0, 100)}`;
                result.crawlRan = true;
                result.crawlType = 'sandbox';
            }
        }
        else {
            // No provider detected - detection only
            if (result.detectionRan) {
                result.summary = `Detection complete: provider=${dispensary.product_provider || 'unknown'}, confidence=${dispensary.product_confidence || 0}%`;
                result.status = 'detection_only';
            }
            else {
                result.summary = 'No provider detected and no crawl possible';
                result.status = 'error';
                result.error = 'Could not determine menu provider';
            }
        }
    }
    catch (error) {
        // Must not throw: otherwise the status write below is skipped and the
        // schedule row stays 'running', which the due-query never selects again.
        const message = toErrorMessage(error);
        result.status = 'error';
        result.error = message;
        result.summary = `Orchestrator error: ${message.slice(0, 100)}`;
        crawler_logger_1.crawlerLogger.queueFailure({
            queue_type: 'dispensary_orchestrator',
            error_message: message,
        });
    }
    result.durationMs = Date.now() - startTime;
    // Update final schedule status
    await updateScheduleStatus(dispensaryId, result.status, result.summary, result.error || null, runId);
    // Create job record
    await createJobRecord(dispensaryId, scheduleId, result);
    return result;
}
/**
 * Turn any thrown value into a message string.
 *
 * Error instances (and objects carrying a string `message`) yield that message;
 * anything else (strings, undefined, plain objects) is passed through String().
 *
 * @param {unknown} error - value caught by a catch clause
 * @returns {string} printable error message
 */
function toErrorMessage(error) {
    if (error != null && typeof error.message === 'string') {
        return error.message;
    }
    return String(error);
}
// ========================================
// Helper Functions
// ========================================
/**
 * Load the dispensary columns the orchestrator relies on.
 *
 * @param {number} dispensaryId - dispensaries.id to look up
 * @returns {Promise<object|null>} the dispensary row, or null when none matches
 */
async function getDispensaryInfo(dispensaryId) {
    const queryResult = await migrate_1.pool.query(`SELECT id, name, city, website, menu_url, menu_type, platform_dispensary_id,
product_provider, product_confidence, product_crawler_mode, last_product_scan_at
FROM dispensaries
WHERE id = $1`, [dispensaryId]);
    const [firstRow] = queryResult.rows;
    return firstRow || null;
}
/**
 * Decide whether provider detection must run before crawling a dispensary.
 *
 * Known Dutchie stores (menu_type 'dutchie' with a platform_dispensary_id) never
 * need detection; skipping it avoids wasteful detection timeouts. Otherwise
 * detection is needed when:
 * - product_provider is missing or 'unknown', or
 * - product_confidence is non-null and below 50, or
 * - last_product_scan_at is more than 7 days in the past.
 *
 * @param {object} dispensary - dispensary row (see getDispensaryInfo)
 * @returns {Promise<boolean>} true when detection should run
 */
async function checkNeedsDetection(dispensary) {
    const {
        menu_type: menuType,
        platform_dispensary_id: platformId,
        product_provider: productProvider,
        product_confidence: confidence,
        last_product_scan_at: lastScanAt,
    } = dispensary;
    if (menuType === 'dutchie' && platformId) {
        return false;
    }
    const providerUnresolved = !productProvider || productProvider === 'unknown';
    const confidenceTooLow = confidence !== null && confidence < 50;
    if (providerUnresolved || confidenceTooLow) {
        return true;
    }
    if (!lastScanAt) {
        return false;
    }
    const msPerDay = 1000 * 60 * 60 * 24;
    const ageDays = (Date.now() - new Date(lastScanAt).getTime()) / msPerDay;
    return ageDays > 7;
}
/**
 * Upsert the latest run status for a dispensary into dispensary_crawl_schedule.
 *
 * Writes last_status, last_summary and last_error, and stamps last_run_at and
 * updated_at with NOW(). The row is created when none exists for the dispensary.
 *
 * @param {number} dispensaryId - dispensary the schedule row belongs to
 * @param {string} status - stored in last_status
 * @param {string} summary - stored in last_summary
 * @param {string|null} error - stored in last_error
 * @param {string} runId - accepted from callers but not written by this query
 *   (NOTE(review): confirm whether persisting the run id was intended)
 * @returns {Promise<void>}
 */
async function updateScheduleStatus(dispensaryId, status, summary, error, runId) {
    const values = [dispensaryId, status, summary, error];
    await migrate_1.pool.query(`INSERT INTO dispensary_crawl_schedule (dispensary_id, last_status, last_summary, last_error, last_run_at, updated_at)
VALUES ($1, $2, $3, $4, NOW(), NOW())
ON CONFLICT (dispensary_id) DO UPDATE SET
last_status = $2,
last_summary = $3,
last_error = $4,
last_run_at = NOW(),
updated_at = NOW()`, values);
}
/**
 * Persist one orchestrator run to dispensary_crawl_jobs and fold its outcome
 * into the dispensary's dispensary_crawl_schedule statistics.
 *
 * Job status: result.status 'error' is stored as 'failed'; every other status
 * is stored as 'completed'.
 * Schedule stats:
 * - 'success' / 'sandbox_only' / 'detection_only': counts as a successful run
 *   and resets consecutive_failures.
 * - 'error': increments consecutive_failures.
 * - any other status: schedule stats are left untouched.
 * Both updating branches push next_run_at forward by interval_minutes.
 *
 * @param {number} dispensaryId - dispensary the job belongs to
 * @param {number|null|undefined} scheduleId - stored as NULL when absent
 * @param {object} result - result object produced by runDispensaryOrchestrator
 * @returns {Promise<void>}
 */
async function createJobRecord(dispensaryId, scheduleId, result) {
    const jobStatus = result.status === 'error' ? 'failed' : 'completed';
    const detectedProduct = result.detectionResult?.product;
    // NOTE(review): trigger_type is always 'manual', including runs started by
    // processDispensaryScheduler; confirm whether a 'scheduled' value is wanted.
    await migrate_1.pool.query(`INSERT INTO dispensary_crawl_jobs (
dispensary_id, schedule_id, job_type, trigger_type, status, priority,
scheduled_at, started_at, completed_at, duration_ms,
detection_ran, crawl_ran, crawl_type,
products_found, products_new, products_updated,
detected_provider, detected_confidence, detected_mode,
error_message, run_id
) VALUES (
$1, $2, 'orchestrator', 'manual', $3, 100,
NOW(), NOW(), NOW(), $4,
$5, $6, $7,
$8, $9, $10,
$11, $12, $13,
$14, $15
)`, [
        dispensaryId,
        scheduleId || null,
        jobStatus,
        result.durationMs,
        result.detectionRan,
        result.crawlRan,
        result.crawlType || null,
        // Numeric columns use `??` so a real 0 (no products found, zero
        // confidence) is stored as 0 rather than being turned into NULL.
        result.productsFound ?? null,
        result.productsNew ?? null,
        result.productsUpdated ?? null,
        detectedProduct?.provider || null,
        detectedProduct?.confidence ?? null,
        detectedProduct?.mode || null,
        result.error || null,
        result.runId,
    ]);
    // Update schedule stats.
    // NOTE(review): if interval_minutes is NULL, next_run_at becomes NULL and the
    // due-query treats the dispensary as immediately due again — confirm the
    // column always has a value.
    const successStatuses = ['success', 'sandbox_only', 'detection_only'];
    if (successStatuses.includes(result.status)) {
        await migrate_1.pool.query(`UPDATE dispensary_crawl_schedule SET
total_runs = COALESCE(total_runs, 0) + 1,
successful_runs = COALESCE(successful_runs, 0) + 1,
consecutive_failures = 0,
next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL,
last_duration_ms = $2
WHERE dispensary_id = $1`, [dispensaryId, result.durationMs]);
    }
    else if (result.status === 'error') {
        await migrate_1.pool.query(`UPDATE dispensary_crawl_schedule SET
total_runs = COALESCE(total_runs, 0) + 1,
consecutive_failures = COALESCE(consecutive_failures, 0) + 1,
next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL,
last_duration_ms = $2
WHERE dispensary_id = $1`, [dispensaryId, result.durationMs]);
    }
}
// ========================================
// Batch Processing
// ========================================
/**
 * Run the orchestrator for many dispensaries, a fixed number at a time.
 *
 * Ids are split into consecutive groups of `concurrency`. Each group runs in
 * parallel, and the next group starts after a 1-second pause. Results come back
 * in the same order as `dispensaryIds`.
 *
 * @param {number[]} dispensaryIds - dispensaries to orchestrate
 * @param {number} [concurrency=3] - group size. It is floored to an integer;
 *   anything below 1 (including NaN) is treated as 1.
 * @returns {Promise<object[]>} one orchestrator result per id
 * @throws rejects as soon as any runDispensaryOrchestrator call in a group rejects
 */
async function runBatchDispensaryOrchestrator(dispensaryIds, concurrency = 3) {
    // Guard the loop step: a step of 0 or less never advanced `i` (infinite
    // loop), and NaN made the loop exit without processing anything.
    const requested = Math.floor(Number(concurrency));
    const batchSize = requested >= 1 ? requested : 1;
    const results = [];
    for (let i = 0; i < dispensaryIds.length; i += batchSize) {
        const batch = dispensaryIds.slice(i, i + batchSize);
        console.log(`Processing batch ${Math.floor(i / batchSize) + 1}: dispensaries ${batch.join(', ')}`);
        const batchResults = await Promise.all(batch.map((id) => runDispensaryOrchestrator(id)));
        results.push(...batchResults);
        // Small delay between batches to avoid overwhelming the system
        if (i + batchSize < dispensaryIds.length) {
            await new Promise((resolve) => setTimeout(resolve, 1000));
        }
    }
    return results;
}
/**
 * List the ids of dispensaries whose crawl schedule is due.
 *
 * A dispensary qualifies when:
 * - its schedule row is active, or it has no schedule row at all, and
 * - next_run_at is unset or not in the future, and
 * - last_status is unset or is neither 'running' nor 'pending'.
 *
 * Results are ordered by schedule priority (highest first), then by
 * last_run_at, with never-run dispensaries first.
 *
 * NOTE(review): a row left in 'running' (for example if the process dies before
 * the final status write) is never selected here until something resets it.
 *
 * @param {number} [limit=10] - maximum number of ids to return
 * @returns {Promise<Array>} dispensary ids in scheduling order
 */
async function getDispensariesDueForOrchestration(limit = 10) {
    const dueQuery = `SELECT d.id
FROM dispensaries d
LEFT JOIN dispensary_crawl_schedule dcs ON dcs.dispensary_id = d.id
WHERE COALESCE(dcs.is_active, TRUE) = TRUE
AND (
dcs.next_run_at IS NULL
OR dcs.next_run_at <= NOW()
)
AND (dcs.last_status IS NULL OR dcs.last_status NOT IN ('running', 'pending'))
ORDER BY COALESCE(dcs.priority, 0) DESC, dcs.last_run_at ASC NULLS FIRST
LIMIT $1`;
    const { rows } = await migrate_1.pool.query(dueQuery, [limit]);
    return Array.from(rows, (row) => row.id);
}
/**
 * Ensure every dispensary has a dispensary_crawl_schedule row.
 *
 * Creates the missing rows as active, priority 0, with `intervalMinutes` as
 * their interval_minutes.
 *
 * @param {number} [intervalMinutes=240] - interval_minutes for newly created rows
 * @returns {Promise<{created: number, existing: number}>} rows inserted by this
 *   call, and the schedule-row count minus those insertions
 */
async function ensureAllDispensariesHaveSchedules(intervalMinutes = 240) {
    // Insert a schedule row for each dispensary that lacks one. ON CONFLICT
    // DO NOTHING covers a concurrent caller inserting the same dispensary
    // between the NOT EXISTS check and this insert. dispensary_id is unique:
    // it is the upsert conflict target in updateScheduleStatus.
    const inserted = await migrate_1.pool.query(`INSERT INTO dispensary_crawl_schedule (dispensary_id, is_active, interval_minutes, priority)
SELECT d.id, TRUE, $1, 0
FROM dispensaries d
WHERE NOT EXISTS (
SELECT 1 FROM dispensary_crawl_schedule dcs WHERE dcs.dispensary_id = d.id
)
ON CONFLICT (dispensary_id) DO NOTHING
RETURNING id`, [intervalMinutes]);
    const created = inserted.rowCount || 0;
    const totalResult = await migrate_1.pool.query('SELECT COUNT(*) FROM dispensary_crawl_schedule');
    // COUNT(*) comes back as a string from the driver; parse it explicitly in base 10.
    const total = Number.parseInt(totalResult.rows[0].count, 10);
    return {
        created,
        existing: total - created,
    };
}
// ========================================
// Scheduler Integration
// ========================================
// Module-level guard: true while processDispensaryScheduler is mid-run, so an
// overlapping call in this process returns immediately instead of running again.
let dispensarySchedulerRunning = false;
/**
 * One scheduler tick: orchestrate up to 3 dispensaries that are due for a crawl.
 *
 * Skips (with a log line) if a previous tick is still in progress. Dispensaries
 * are run one after another; a failure is logged and the remaining dispensaries
 * are still processed. The guard flag is always cleared when the tick ends.
 *
 * @returns {Promise<void>}
 */
async function processDispensaryScheduler() {
    if (dispensarySchedulerRunning) {
        console.log('Dispensary scheduler already running, skipping...');
        return;
    }
    dispensarySchedulerRunning = true;
    try {
        const dueIds = await getDispensariesDueForOrchestration(3);
        if (dueIds.length > 0) {
            const total = dueIds.length;
            console.log(`Dispensary Scheduler: Processing ${total} dispensaries due for crawl`);
            for (let idx = 0; idx < total; idx += 1) {
                const currentId = dueIds[idx];
                try {
                    console.log(`Dispensary Scheduler: Starting crawl for dispensary ${currentId}`);
                    const outcome = await runDispensaryOrchestrator(currentId);
                    console.log(`Dispensary Scheduler: Dispensary ${currentId} completed - ${outcome.summary}`);
                }
                catch (failure) {
                    console.error(`Dispensary Scheduler: Dispensary ${currentId} failed - ${failure.message}`);
                }
            }
            console.log(`Dispensary Scheduler: Finished processing ${total} dispensaries`);
        }
    }
    finally {
        dispensarySchedulerRunning = false;
    }
}