Files
cannaiq/backend/dist/dutchie-az/services/scheduler.js
Kelly 66e07b2009 fix(monitor): remove non-existent worker columns from job_run_logs query
The job_run_logs table tracks scheduled job orchestration, not individual
worker jobs. Worker info (worker_id, worker_hostname) belongs on
dispensary_crawl_jobs, not job_run_logs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 18:45:05 -07:00

596 lines
21 KiB
JavaScript

"use strict";
/**
* Dutchie AZ Scheduler Service
*
* Handles scheduled crawling with JITTER - no fixed intervals!
* Each job re-schedules itself with a NEW random offset after each run.
* This makes timing "wander" around the clock, avoiding detectable patterns.
*
* Jitter Logic:
* nextRunAt = lastRunAt + baseIntervalMinutes + random(-jitterMinutes, +jitterMinutes)
*
* Example: 4-hour base with ±30min jitter = runs anywhere from 3h30m to 4h30m apart
*/
// CommonJS export table. This appears to be TypeScript-compiled output
// (dist/ path, `(0, fn)()` call style, `void 0` pre-declaration) — if a .ts
// source exists, edit that instead of this file.
// Assigning function exports before their definitions works because function
// declarations are hoisted.
Object.defineProperty(exports, "__esModule", { value: true });
// Placeholder only: redefined near the end of this file as a getter that
// forwards to ./product-crawler's crawlDispensaryProducts.
exports.crawlSingleDispensary = void 0;
exports.getAllSchedules = getAllSchedules;
exports.getScheduleById = getScheduleById;
exports.createSchedule = createSchedule;
exports.updateSchedule = updateSchedule;
exports.deleteSchedule = deleteSchedule;
exports.getRunLogs = getRunLogs;
exports.startScheduler = startScheduler;
exports.stopScheduler = stopScheduler;
exports.getSchedulerStatus = getSchedulerStatus;
exports.triggerScheduleNow = triggerScheduleNow;
exports.initializeDefaultSchedules = initializeDefaultSchedules;
exports.triggerImmediateCrawl = triggerImmediateCrawl;
const connection_1 = require("../db/connection");
const menu_detection_1 = require("./menu-detection");
const job_queue_1 = require("./job-queue");
// Scheduler poll interval (how often we check for due jobs)
const SCHEDULER_POLL_INTERVAL_MS = 60 * 1000; // 1 minute
// Track running state
// These are per-process, in-memory module values: they record whether THIS
// process has started its poll loop, not whether any job is executing (job
// execution is tracked in job_schedules.last_status).
let isSchedulerRunning = false;
// Handle returned by setInterval() in startScheduler(); reset to null by stopScheduler().
let schedulerInterval = null;
// ============================================================
// JITTER CALCULATION
// ============================================================
/**
 * Draw a uniformly distributed jitter offset, in minutes.
 *
 * @param {number} jitterMinutes - maximum absolute deviation
 * @returns {number} a value in [-jitterMinutes, +jitterMinutes)
 */
function getRandomJitterMinutes(jitterMinutes) {
    // Stretch Math.random()'s [0, 1) onto the symmetric range [-1, 1), then scale.
    const unit = Math.random();
    return jitterMinutes * (2 * unit - 1);
}
/**
 * Compute the next run time: one base interval after `baseTime`, shifted by a
 * freshly drawn random jitter.
 *
 *   nextRunAt = baseTime + baseIntervalMinutes + random(-jitter, +jitter)
 *
 * If jitterMinutes exceeds baseIntervalMinutes the result can land at or
 * before baseTime.
 *
 * @param {Date} baseTime - reference instant (callers pass "now")
 * @param {number} baseIntervalMinutes - nominal spacing between runs
 * @param {number} jitterMinutes - maximum random deviation in either direction
 * @returns {Date} a new Date; baseTime itself is not modified
 */
function calculateNextRunAt(baseTime, baseIntervalMinutes, jitterMinutes) {
    const offsetMinutes = baseIntervalMinutes + getRandomJitterMinutes(jitterMinutes);
    const startMs = baseTime.getTime();
    return new Date(startMs + offsetMinutes * 60 * 1000);
}
// ============================================================
// DATABASE OPERATIONS
// ============================================================
/**
 * Fetch every job schedule, ordered by job_name, converting each snake_case
 * database row into a camelCase schedule object.
 *
 * @returns {Promise<object[]>} all schedules (empty array when there are none)
 */
async function getAllSchedules() {
    const result = await (0, connection_1.query)(`
SELECT
id, job_name, description, enabled,
base_interval_minutes, jitter_minutes,
last_run_at, last_status, last_error_message, last_duration_ms,
next_run_at, job_config, created_at, updated_at
FROM job_schedules
ORDER BY job_name
`);
    const schedules = [];
    for (const r of result.rows) {
        schedules.push({
            id: r.id,
            jobName: r.job_name,
            description: r.description,
            enabled: r.enabled,
            baseIntervalMinutes: r.base_interval_minutes,
            jitterMinutes: r.jitter_minutes,
            lastRunAt: r.last_run_at,
            lastStatus: r.last_status,
            lastErrorMessage: r.last_error_message,
            lastDurationMs: r.last_duration_ms,
            nextRunAt: r.next_run_at,
            jobConfig: r.job_config,
            createdAt: r.created_at,
            updatedAt: r.updated_at,
        });
    }
    return schedules;
}
/**
 * Look up one job schedule by primary key.
 *
 * @param {number} id - job_schedules.id
 * @returns {Promise<object|null>} the camelCase schedule, or null when no row matches
 */
async function getScheduleById(id) {
    const result = await (0, connection_1.query)(`SELECT * FROM job_schedules WHERE id = $1`, [id]);
    if (result.rows.length === 0) {
        return null;
    }
    const [r] = result.rows;
    return {
        id: r.id,
        jobName: r.job_name,
        description: r.description,
        enabled: r.enabled,
        baseIntervalMinutes: r.base_interval_minutes,
        jitterMinutes: r.jitter_minutes,
        lastRunAt: r.last_run_at,
        lastStatus: r.last_status,
        lastErrorMessage: r.last_error_message,
        lastDurationMs: r.last_duration_ms,
        nextRunAt: r.next_run_at,
        jobConfig: r.job_config,
        createdAt: r.created_at,
        updatedAt: r.updated_at,
    };
}
/**
 * Insert a new job schedule and return it in camelCase form.
 *
 * The first next_run_at is "now" when `startImmediately` is truthy; otherwise
 * it is one jittered interval from now (via calculateNextRunAt).
 *
 * @param {object} schedule - { jobName, description?, enabled? (defaults to true),
 *        baseIntervalMinutes, jitterMinutes, jobConfig?, startImmediately? }
 * @returns {Promise<object>} the inserted schedule as returned by the database
 */
async function createSchedule(schedule) {
    let nextRunAt;
    if (schedule.startImmediately) {
        nextRunAt = new Date();
    }
    else {
        nextRunAt = calculateNextRunAt(new Date(), schedule.baseIntervalMinutes, schedule.jitterMinutes);
    }
    const insertValues = [
        schedule.jobName,
        schedule.description || null,
        schedule.enabled ?? true,
        schedule.baseIntervalMinutes,
        schedule.jitterMinutes,
        nextRunAt,
        // A falsy config is stored as SQL NULL rather than as JSON.
        schedule.jobConfig ? JSON.stringify(schedule.jobConfig) : null,
    ];
    const { rows: [inserted] } = await (0, connection_1.query)(`
INSERT INTO job_schedules (
job_name, description, enabled,
base_interval_minutes, jitter_minutes,
next_run_at, job_config
) VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *
`, insertValues);
    console.log(`[Scheduler] Created schedule "${schedule.jobName}" - next run at ${nextRunAt.toISOString()}`);
    return {
        id: inserted.id,
        jobName: inserted.job_name,
        description: inserted.description,
        enabled: inserted.enabled,
        baseIntervalMinutes: inserted.base_interval_minutes,
        jitterMinutes: inserted.jitter_minutes,
        lastRunAt: inserted.last_run_at,
        lastStatus: inserted.last_status,
        lastErrorMessage: inserted.last_error_message,
        lastDurationMs: inserted.last_duration_ms,
        nextRunAt: inserted.next_run_at,
        jobConfig: inserted.job_config,
        createdAt: inserted.created_at,
        updatedAt: inserted.updated_at,
    };
}
/**
 * Apply a partial update to a job schedule.
 *
 * Only fields that are present (not `undefined`) in `updates` are written, and
 * `updated_at` is bumped whenever at least one field changes. Changing the
 * interval or jitter does not recompute `next_run_at`; the new values are
 * picked up the next time the schedule completes a run.
 *
 * @param {number} id - job_schedules.id
 * @param {object} updates - any subset of
 *        { description, enabled, baseIntervalMinutes, jitterMinutes, jobConfig }.
 *        Passing `jobConfig: null` clears the config (stored as SQL NULL).
 * @returns {Promise<object|null>} the updated schedule, or null if no row has
 *          this id. With no fields to update, the current schedule is returned.
 */
async function updateSchedule(id, updates) {
    const setClauses = [];
    const params = [];
    let paramIndex = 1;
    // Append "column = $n" and its bound value, keeping placeholders in sync.
    const addClause = (column, value) => {
        setClauses.push(`${column} = $${paramIndex++}`);
        params.push(value);
    };
    if (updates.description !== undefined) {
        addClause('description', updates.description);
    }
    if (updates.enabled !== undefined) {
        addClause('enabled', updates.enabled);
    }
    if (updates.baseIntervalMinutes !== undefined) {
        addClause('base_interval_minutes', updates.baseIntervalMinutes);
    }
    if (updates.jitterMinutes !== undefined) {
        addClause('jitter_minutes', updates.jitterMinutes);
    }
    if (updates.jobConfig !== undefined) {
        // Store SQL NULL (not the JSON literal 'null') when clearing the config,
        // matching how createSchedule persists a missing config.
        addClause('job_config', updates.jobConfig === null ? null : JSON.stringify(updates.jobConfig));
    }
    if (setClauses.length === 0) {
        return getScheduleById(id);
    }
    setClauses.push(`updated_at = NOW()`);
    // The id takes the placeholder right after the last SET value.
    params.push(id);
    const { rows } = await (0, connection_1.query)(`UPDATE job_schedules SET ${setClauses.join(', ')} WHERE id = $${paramIndex} RETURNING *`, params);
    if (rows.length === 0)
        return null;
    const row = rows[0];
    return {
        id: row.id,
        jobName: row.job_name,
        description: row.description,
        enabled: row.enabled,
        baseIntervalMinutes: row.base_interval_minutes,
        jitterMinutes: row.jitter_minutes,
        lastRunAt: row.last_run_at,
        lastStatus: row.last_status,
        lastErrorMessage: row.last_error_message,
        lastDurationMs: row.last_duration_ms,
        nextRunAt: row.next_run_at,
        jobConfig: row.job_config,
        createdAt: row.created_at,
        updatedAt: row.updated_at,
    };
}
/**
 * Delete a job schedule by id.
 *
 * @param {number} id - job_schedules.id
 * @returns {Promise<boolean>} true when at least one row was deleted
 */
async function deleteSchedule(id) {
    const { rowCount } = await (0, connection_1.query)(`DELETE FROM job_schedules WHERE id = $1`, [id]);
    // A missing/null rowCount compares as not-greater-than-zero, i.e. false.
    return rowCount > 0;
}
/**
 * Flag a schedule as in progress by setting last_status = 'running' and
 * bumping updated_at. checkAndRunDueJobs skips schedules in this state and
 * triggerScheduleNow refuses to start them.
 *
 * @param {number} id - job_schedules.id
 * @returns {Promise<void>}
 */
async function markScheduleRunning(id) {
    const sql = `UPDATE job_schedules SET last_status = 'running', updated_at = NOW() WHERE id = $1`;
    await (0, connection_1.query)(sql, [id]);
}
/**
 * Record a finished run on its schedule and pick a NEW jittered next_run_at.
 *
 * last_run_at is set to the completion time (now), and next_run_at to
 * now + baseIntervalMinutes ± jitterMinutes, using the schedule's current
 * settings re-read from the database. This returns silently if the schedule
 * no longer exists.
 *
 * @param {number} id - job_schedules.id
 * @param {string} status - outcome stored in last_status
 * @param {number} durationMs - run duration stored in last_duration_ms
 * @param {string} [errorMessage] - stored in last_error_message (NULL when falsy)
 * @returns {Promise<void>}
 */
async function updateScheduleAfterRun(id, status, durationMs, errorMessage) {
    const current = await getScheduleById(id);
    if (!current) {
        return;
    }
    const finishedAt = new Date();
    const { jobName, baseIntervalMinutes, jitterMinutes } = current;
    const nextRunAt = calculateNextRunAt(finishedAt, baseIntervalMinutes, jitterMinutes);
    console.log(`[Scheduler] Schedule "${jobName}" completed (${status}). Next run: ${nextRunAt.toISOString()}`);
    const values = [id, finishedAt, status, errorMessage || null, durationMs, nextRunAt];
    await (0, connection_1.query)(`
UPDATE job_schedules SET
last_run_at = $2,
last_status = $3,
last_error_message = $4,
last_duration_ms = $5,
next_run_at = $6,
updated_at = NOW()
WHERE id = $1
`, values);
}
/**
 * Open a job_run_logs entry for a run that is starting now.
 *
 * @param {number} scheduleId - owning job_schedules.id
 * @param {string} jobName - job name copied onto the log row
 * @param {string} status - initial status (callers pass 'running')
 * @returns {Promise<number>} the new log row's id
 */
async function createRunLog(scheduleId, jobName, status) {
    const insertResult = await (0, connection_1.query)(`
INSERT INTO job_run_logs (schedule_id, job_name, status, started_at)
VALUES ($1, $2, $3, NOW())
RETURNING id
`, [scheduleId, jobName, status]);
    const [inserted] = insertResult.rows;
    return inserted.id;
}
/**
 * Close a job_run_logs entry with its final status and counters.
 *
 * completed_at is set to NOW(). Missing counters are stored as 0, a missing
 * error message as NULL, and metadata (when truthy) as a JSON string.
 *
 * @param {number} runLogId - job_run_logs.id
 * @param {string} status - final status
 * @param {object} results - { durationMs, errorMessage?, itemsProcessed?,
 *        itemsSucceeded?, itemsFailed?, metadata? }
 * @returns {Promise<void>}
 */
async function updateRunLog(runLogId, status, results) {
    const { durationMs, errorMessage, itemsProcessed, itemsSucceeded, itemsFailed, metadata } = results;
    const values = [
        runLogId,
        status,
        durationMs,
        errorMessage || null,
        itemsProcessed || 0,
        itemsSucceeded || 0,
        itemsFailed || 0,
        metadata ? JSON.stringify(metadata) : null,
    ];
    await (0, connection_1.query)(`
UPDATE job_run_logs SET
status = $2,
completed_at = NOW(),
duration_ms = $3,
error_message = $4,
items_processed = $5,
items_succeeded = $6,
items_failed = $7,
metadata = $8
WHERE id = $1
`, values);
}
/**
 * Page through job_run_logs, newest first, optionally filtered.
 *
 * @param {object} [options] - may be omitted or null; every field is optional
 * @param {number} [options.scheduleId] - only logs for this schedule (ignored when falsy)
 * @param {string} [options.jobName] - only logs for this job name (ignored when falsy)
 * @param {number} [options.limit=50] - page size
 * @param {number} [options.offset=0] - number of rows to skip
 * @returns {Promise<{logs: object[], total: number}>} the requested page as raw
 *          rows, plus the total number of rows matching the filters
 */
async function getRunLogs(options) {
    // Accept a missing/null options bag so getRunLogs() with no argument works.
    const { scheduleId, jobName, limit = 50, offset = 0 } = options ?? {};
    let whereClause = 'WHERE 1=1';
    const filterParams = [];
    let paramIndex = 1;
    if (scheduleId) {
        whereClause += ` AND schedule_id = $${paramIndex++}`;
        filterParams.push(scheduleId);
    }
    if (jobName) {
        whereClause += ` AND job_name = $${paramIndex++}`;
        filterParams.push(jobName);
    }
    // LIMIT/OFFSET occupy the two placeholder slots after the filter values.
    const { rows } = await (0, connection_1.query)(`
SELECT * FROM job_run_logs
${whereClause}
ORDER BY created_at DESC
LIMIT $${paramIndex} OFFSET $${paramIndex + 1}
`, [...filterParams, limit, offset]);
    // The count query shares the WHERE clause but binds only the filter values.
    const { rows: countRows } = await (0, connection_1.query)(`SELECT COUNT(*) as total FROM job_run_logs ${whereClause}`, filterParams);
    return {
        logs: rows,
        total: parseInt(countRows[0]?.total || '0', 10),
    };
}
// ============================================================
// JOB EXECUTION
// ============================================================
/**
 * Dispatch a schedule to the job implementation registered under its jobName.
 *
 * A missing or falsy jobConfig is replaced by an empty object before dispatch.
 *
 * @param {object} schedule - camelCase schedule (needs jobName, jobConfig)
 * @returns {Promise<object>} whatever the selected job resolves to
 * @throws {Error} "Unknown job type: <name>" for unrecognized job names
 */
async function executeJob(schedule) {
    const { jobName } = schedule;
    const jobConfig = schedule.jobConfig || {};
    if (jobName === 'dutchie_az_product_crawl') {
        return executeProductCrawl(jobConfig);
    }
    if (jobName === 'dutchie_az_discovery') {
        return executeDiscovery(jobConfig);
    }
    if (jobName === 'dutchie_az_menu_detection') {
        return (0, menu_detection_1.executeMenuDetectionJob)(jobConfig);
    }
    throw new Error(`Unknown job type: ${jobName}`);
}
/**
 * Execute the AZ Dutchie product crawl job by ENQUEUEING work, not crawling.
 *
 * Selects every AZ dispensary that is ready for a Dutchie crawl
 * (menu_type = 'dutchie', a platform_dispensary_id, no failed_at), ordered
 * oldest last_crawl_at first (never-crawled first). It then bulk-enqueues one
 * 'dutchie_product_crawl' job per dispensary through ./job-queue; per the
 * original design notes, separate worker replicas pick these jobs up, which
 * allows parallel processing, avoids double-crawls and gives live progress.
 * Menu detection is handled separately by the dutchie_az_menu_detection
 * schedule.
 *
 * @param {object} config - { pricingType? (default 'rec'), useBothModes? (default true) }
 * @returns {Promise<object>} run summary: itemsProcessed = ready dispensaries,
 *          itemsSucceeded = newly enqueued jobs, plus enqueue/queue metadata
 */
async function executeProductCrawl(config) {
    const pricingType = config.pricingType || 'rec';
    // Both pricing modes stay on unless explicitly disabled with `false`.
    const useBothModes = config.useBothModes !== false;
    const readyResult = await (0, connection_1.query)(`
SELECT id FROM dispensaries
WHERE state = 'AZ'
AND menu_type = 'dutchie'
AND platform_dispensary_id IS NOT NULL
AND failed_at IS NULL
ORDER BY last_crawl_at ASC NULLS FIRST
`);
    const dispensaryIds = [];
    for (const { id } of readyResult.rows) {
        dispensaryIds.push(id);
    }
    const total = dispensaryIds.length;
    if (total === 0) {
        const emptyMetadata = { message: 'No ready dispensaries to crawl. Run menu detection to discover more.' };
        return {
            status: 'success',
            itemsProcessed: 0,
            itemsSucceeded: 0,
            itemsFailed: 0,
            metadata: emptyMetadata,
        };
    }
    console.log(`[Scheduler] Enqueueing crawl jobs for ${total} dispensaries...`);
    // The queue skips dispensaries that already have pending/running jobs.
    const enqueueOptions = {
        priority: 0,
        metadata: { pricingType, useBothModes },
    };
    const enqueueResult = await (0, job_queue_1.bulkEnqueueJobs)('dutchie_product_crawl', dispensaryIds, enqueueOptions);
    const { enqueued, skipped } = enqueueResult;
    console.log(`[Scheduler] Enqueued ${enqueued} jobs, skipped ${skipped} (already queued)`);
    const queueStats = await (0, job_queue_1.getQueueStats)();
    const message = `Enqueued ${enqueued} jobs. Workers will process them. Check /scraper-monitor for progress.`;
    return {
        status: 'success',
        itemsProcessed: total,
        itemsSucceeded: enqueued,
        itemsFailed: 0, // Enqueue itself doesn't fail
        metadata: {
            enqueued,
            skipped,
            queueStats,
            pricingType,
            useBothModes,
            message,
        },
    };
}
/**
 * Execute the AZ Dutchie discovery job.
 *
 * Placeholder: performs no work and always reports success with zero items.
 * TODO: implement discovery logic.
 *
 * @param {object} _config - unused
 * @returns {Promise<object>} a zero-item success summary
 */
async function executeDiscovery(_config) {
    const metadata = { message: 'Discovery not yet implemented' };
    const summary = {
        status: 'success',
        itemsProcessed: 0,
        itemsSucceeded: 0,
        itemsFailed: 0,
        metadata,
    };
    return summary;
}
// ============================================================
// SCHEDULER RUNNER
// ============================================================
/**
 * Poll once for due schedules and run each of them, one after another.
 *
 * A schedule is due when it is enabled, has a next_run_at at or before NOW(),
 * and is not currently marked 'running'. Due schedules run sequentially in
 * next_run_at order. Errors are logged rather than thrown, so this is safe to
 * call from setInterval. A failure in one job is logged and does not stop the
 * remaining due jobs from running in the same poll.
 *
 * Note: setInterval does not wait for this promise, so polls can overlap. The
 * `last_status != 'running'` filter is what prevents double-starting a
 * schedule, but it is not atomic with markScheduleRunning.
 *
 * @returns {Promise<void>} never rejects
 */
async function checkAndRunDueJobs() {
    try {
        // Get enabled schedules where nextRunAt <= now
        const { rows } = await (0, connection_1.query)(`
SELECT * FROM job_schedules
WHERE enabled = true
AND next_run_at IS NOT NULL
AND next_run_at <= NOW()
AND (last_status IS NULL OR last_status != 'running')
ORDER BY next_run_at ASC
`);
        if (rows.length === 0)
            return;
        console.log(`[Scheduler] Found ${rows.length} due job(s)`);
        for (const row of rows) {
            const schedule = {
                id: row.id,
                jobName: row.job_name,
                description: row.description,
                enabled: row.enabled,
                baseIntervalMinutes: row.base_interval_minutes,
                jitterMinutes: row.jitter_minutes,
                lastRunAt: row.last_run_at,
                lastStatus: row.last_status,
                lastErrorMessage: row.last_error_message,
                lastDurationMs: row.last_duration_ms,
                nextRunAt: row.next_run_at,
                jobConfig: row.job_config,
                createdAt: row.created_at,
                updatedAt: row.updated_at,
            };
            try {
                await runScheduledJob(schedule);
            }
            catch (error) {
                // Isolate failures so one broken job does not starve the others.
                console.error(`[Scheduler] Error running job "${schedule.jobName}":`, error);
            }
        }
    }
    catch (error) {
        console.error('[Scheduler] Error checking for due jobs:', error);
    }
}
/**
 * Run one schedule end to end in six steps:
 *   1. mark it 'running';
 *   2. open a job_run_logs row;
 *   3. execute the job;
 *   4. close the run log;
 *   5. store the outcome;
 *   6. store a fresh jittered next_run_at.
 *
 * The job's status is normalized before it is persisted: a status that is
 * null, undefined or 'running' is recorded as 'success'. This normalized value
 * is written to BOTH the run log and the schedule. Writing 'running' back to
 * the schedule would make checkAndRunDueJobs skip it, and triggerScheduleNow
 * refuse it, forever.
 *
 * updateScheduleAfterRun runs in a `finally`. A failure while creating or
 * updating the run log therefore still clears the 'running' marker. A failure
 * in that bookkeeping is not misrecorded as a job error.
 *
 * @param {object} schedule - camelCase schedule (needs id, jobName, jobConfig)
 * @returns {Promise<void>}
 * @throws if markScheduleRunning, updateRunLog or updateScheduleAfterRun fail;
 *         errors thrown by the job itself are caught and recorded as 'error'
 */
async function runScheduledJob(schedule) {
    const startTime = Date.now();
    console.log(`[Scheduler] Starting job "${schedule.jobName}"...`);
    // Mark as running
    await markScheduleRunning(schedule.id);
    let runLogId = null;
    let jobThrew = false;
    let finalStatus;
    let errorMessage;
    let runLogResults;
    try {
        // Created inside the try so a failure here is recorded as an error and
        // the 'running' marker is still cleared below.
        runLogId = await createRunLog(schedule.id, schedule.jobName, 'running');
        const result = await executeJob(schedule);
        // Determine final status (exclude 'running', null and undefined)
        finalStatus = result.status == null || result.status === 'running'
            ? 'success'
            : result.status;
        errorMessage = result.errorMessage;
        runLogResults = {
            itemsProcessed: result.itemsProcessed,
            itemsSucceeded: result.itemsSucceeded,
            itemsFailed: result.itemsFailed,
            metadata: result.metadata,
        };
    }
    catch (error) {
        jobThrew = true;
        finalStatus = 'error';
        errorMessage = error instanceof Error ? error.message : String(error);
        console.error(`[Scheduler] Job "${schedule.jobName}" failed:`, errorMessage);
        runLogResults = {
            itemsProcessed: 0,
            itemsSucceeded: 0,
            itemsFailed: 0,
        };
    }
    const durationMs = Date.now() - startTime;
    try {
        if (runLogId !== null) {
            await updateRunLog(runLogId, finalStatus, { durationMs, errorMessage, ...runLogResults });
        }
    }
    finally {
        // Update schedule with NEW jittered next_run_at; always runs so the
        // schedule never stays stuck in 'running'.
        await updateScheduleAfterRun(schedule.id, finalStatus, durationMs, errorMessage);
    }
    if (!jobThrew) {
        console.log(`[Scheduler] Job "${schedule.jobName}" completed in ${Math.round(durationMs / 1000)}s (${finalStatus})`);
    }
}
// ============================================================
// PUBLIC API
// ============================================================
/**
 * Start this process's poll loop.
 *
 * Runs one check right away (fire-and-forget; checkAndRunDueJobs logs its
 * own errors) and then every SCHEDULER_POLL_INTERVAL_MS. A second call while
 * already started only logs a message.
 */
function startScheduler() {
    if (!isSchedulerRunning) {
        isSchedulerRunning = true;
        console.log(`[Scheduler] Starting scheduler (polling every ${SCHEDULER_POLL_INTERVAL_MS / 1000}s)...`);
        // Intentionally not awaited: the first poll should not block startup.
        void checkAndRunDueJobs();
        schedulerInterval = setInterval(checkAndRunDueJobs, SCHEDULER_POLL_INTERVAL_MS);
        return;
    }
    console.log('[Scheduler] Scheduler is already running');
}
/**
 * Stop this process's poll loop.
 *
 * Clears the interval timer and the running flag. A job that is already
 * executing is not interrupted. Calling this while stopped only logs a message.
 */
function stopScheduler() {
    if (isSchedulerRunning) {
        isSchedulerRunning = false;
        if (schedulerInterval) {
            clearInterval(schedulerInterval);
            schedulerInterval = null;
        }
        console.log('[Scheduler] Scheduler stopped');
        return;
    }
    console.log('[Scheduler] Scheduler is not running');
}
/**
 * Report whether this process's poll loop is started, and its poll period.
 *
 * @returns {{running: boolean, pollIntervalMs: number}}
 */
function getSchedulerStatus() {
    const running = isSchedulerRunning;
    const pollIntervalMs = SCHEDULER_POLL_INTERVAL_MS;
    return { running, pollIntervalMs };
}
/**
 * Run a schedule immediately, outside its normal timing.
 *
 * The job runs inline: the returned promise settles only after runScheduledJob
 * finishes. The `enabled` flag is not checked. A schedule whose last_status is
 * 'running' is refused.
 *
 * @param {number} scheduleId - job_schedules.id
 * @returns {Promise<{success: boolean, message: string}>}
 */
async function triggerScheduleNow(scheduleId) {
    const target = await getScheduleById(scheduleId);
    let refusal = null;
    if (!target) {
        refusal = 'Schedule not found';
    }
    else if (target.lastStatus === 'running') {
        refusal = 'Job is already running';
    }
    if (refusal !== null) {
        return { success: false, message: refusal };
    }
    await runScheduledJob(target);
    return { success: true, message: 'Job triggered successfully' };
}
/**
 * Create the built-in schedules that do not exist yet, matched by jobName.
 *
 * The defaults are the product crawl (every 240 ± 30 minutes) and menu
 * detection (every 1440 ± 60 minutes). Existing schedules are never modified.
 * The existence check uses one snapshot taken at the start.
 *
 * @returns {Promise<void>}
 */
async function initializeDefaultSchedules() {
    const existing = await getAllSchedules();
    const defaults = [
        {
            spec: {
                jobName: 'dutchie_az_product_crawl',
                description: 'Crawl all AZ Dutchie dispensary products',
                enabled: true,
                baseIntervalMinutes: 240, // 4 hours
                jitterMinutes: 30, // ±30 minutes
                jobConfig: { pricingType: 'rec', useBothModes: true },
                startImmediately: false,
            },
            logMessage: '[Scheduler] Created default product crawl schedule',
        },
        {
            spec: {
                jobName: 'dutchie_az_menu_detection',
                description: 'Detect menu providers and resolve platform IDs for AZ dispensaries',
                enabled: true,
                baseIntervalMinutes: 1440, // 24 hours
                jitterMinutes: 60, // ±1 hour
                jobConfig: { state: 'AZ', onlyUnknown: true },
                startImmediately: false,
            },
            logMessage: '[Scheduler] Created default menu detection schedule',
        },
    ];
    for (const { spec, logMessage } of defaults) {
        const alreadyPresent = existing.some((s) => s.jobName === spec.jobName);
        if (alreadyPresent) {
            continue;
        }
        await createSchedule(spec);
        console.log(logMessage);
    }
}
// Re-export for backward compatibility
// Exposes ./product-crawler's crawlDispensaryProducts under the public name
// `crawlSingleDispensary` as a live, enumerable getter. This overrides the
// `void 0` placeholder assigned in the export table at the top of the file.
// NOTE(review): the names differ; presumably crawlSingleDispensary is the
// older public name kept for existing callers — confirm before renaming.
var product_crawler_1 = require("./product-crawler");
Object.defineProperty(exports, "crawlSingleDispensary", { enumerable: true, get: function () { return product_crawler_1.crawlDispensaryProducts; } });
/**
 * Immediately run the 'dutchie_az_product_crawl' schedule, if it exists.
 *
 * @returns {Promise<{success: boolean, message: string}>} the result of
 *          triggerScheduleNow, or a failure result when no such schedule exists
 */
async function triggerImmediateCrawl() {
    const all = await getAllSchedules();
    const match = all.find((s) => s.jobName === 'dutchie_az_product_crawl');
    return match
        ? triggerScheduleNow(match.id)
        : { success: false, message: 'Product crawl schedule not found' };
}