Files
cannaiq/backend/dist/dutchie-az/services/job-queue.js
Kelly 66e07b2009 fix(monitor): remove non-existent worker columns from job_run_logs query
The job_run_logs table tracks scheduled job orchestration, not individual
worker jobs. Worker info (worker_id, worker_hostname) belongs on
dispensary_crawl_jobs, not job_run_logs.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 18:45:05 -07:00

415 lines
16 KiB
JavaScript

"use strict";
/**
* Job Queue Service
*
* DB-backed job queue with claiming/locking for distributed workers.
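* Each pending job is claimed atomically (UPDATE ... WHERE id = (SELECT ... FOR
* UPDATE SKIP LOCKED)), so it is handed to at most one worker. Enqueueing skips a
* dispensary (store) that already has a pending or running job.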
*/
// ------------------------------------------------------------
// MODULE INTEROP HELPERS
// NOTE(review): these appear to be compiler-emitted CommonJS interop helpers
// (the shape TypeScript emits). Each `(this && this.__x) || ...` reuses a
// helper already present on `this` when there is one; otherwise the local
// implementation is used. In this file they are only used by the
// `__importStar(require("os"))` call below.
// ------------------------------------------------------------
// __createBinding(o, m, k, k2): expose m[k] on object o under key k2
// (k2 defaults to k).
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
// Start from m's own descriptor for k. Unless the condition below allows the
// original descriptor to be reused as-is, replace it with an enumerable
// getter that reads m[k] on every access.
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
// Fallback when Object.create is unavailable: copy the current value once.
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
// __setModuleDefault(o, v): set o.default = v. When Object.create exists this
// uses defineProperty (enumerable; not writable/configurable). Otherwise it is
// a plain assignment.
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
Object.defineProperty(o, "default", { enumerable: true, value: v });
}) : function(o, v) {
o["default"] = v;
});
// __importStar(mod): namespace-style import of a module.
// A module already flagged __esModule is returned unchanged. Otherwise a new
// object receives a binding for every own property name of mod except
// "default", and its "default" is set to mod itself.
var __importStar = (this && this.__importStar) || (function () {
// ownKeys replaces itself on first call with Object.getOwnPropertyNames, or
// with a for-in + hasOwnProperty fallback when that is unavailable.
var ownKeys = function(o) {
ownKeys = Object.getOwnPropertyNames || function (o) {
var ar = [];
for (var k in o) if (Object.prototype.hasOwnProperty.call(o, k)) ar[ar.length] = k;
return ar;
};
return ownKeys(o);
};
return function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k = ownKeys(mod), i = 0; i < k.length; i++) if (k[i] !== "default") __createBinding(result, mod, k[i]);
__setModuleDefault(result, mod);
return result;
};
})();
// Set the __esModule flag on this CommonJS module (the flag interop helpers
// such as __importStar check), then publish the public API. These assignments
// run before the function declarations further down the file; that works
// because function declarations are hoisted.
Object.defineProperty(exports, "__esModule", { value: true });
exports.getWorkerId = getWorkerId;
exports.getWorkerHostname = getWorkerHostname;
exports.enqueueJob = enqueueJob;
exports.bulkEnqueueJobs = bulkEnqueueJobs;
exports.claimNextJob = claimNextJob;
exports.updateJobProgress = updateJobProgress;
exports.heartbeat = heartbeat;
exports.completeJob = completeJob;
exports.failJob = failJob;
exports.getQueueStats = getQueueStats;
exports.getActiveWorkers = getActiveWorkers;
exports.getRunningJobs = getRunningJobs;
exports.recoverStaleJobs = recoverStaleJobs;
exports.cleanupOldJobs = cleanupOldJobs;
const connection_1 = require("../db/connection");
const uuid_1 = require("uuid");
const os = __importStar(require("os"));
// ============================================================
// WORKER IDENTITY
// ============================================================
let _workerId = null;
/**
 * Return this process's worker ID, computing it on the first call and
 * caching it in a module-level variable for every later call.
 *
 * The ID is the POD_NAME environment variable when it is set and non-empty.
 * Otherwise it is `<os.hostname()>-<process.pid>-<first 8 chars of a v4 UUID>`.
 *
 * @returns {string} the cached worker ID
 */
function getWorkerId() {
    if (_workerId) {
        return _workerId;
    }
    const podName = process.env.POD_NAME;
    _workerId = podName
        ? podName
        : [os.hostname(), process.pid, (0, uuid_1.v4)().slice(0, 8)].join('-');
    return _workerId;
}
/**
 * Hostname recorded on claimed jobs (the worker_hostname column).
 *
 * Returns POD_NAME when it is set and non-empty; otherwise os.hostname().
 *
 * @returns {string}
 */
function getWorkerHostname() {
    const podName = process.env.POD_NAME;
    if (podName) {
        return podName;
    }
    return os.hostname();
}
// ============================================================
// JOB ENQUEUEING
// ============================================================
/**
 * Enqueue a single job in dispensary_crawl_jobs with status 'pending'.
 *
 * When a (truthy) dispensaryId is given and that dispensary already has a
 * job in 'pending' or 'running' status, nothing is inserted and null is
 * returned.
 * NOTE(review): the existence check and the INSERT are separate statements,
 * so two concurrent callers can both pass the check and insert duplicates.
 *
 * @param {object} options
 * @param {string} options.jobType - stored in job_type
 * @param {*} [options.dispensaryId] - stored in dispensary_id (falsy -> NULL)
 * @param {number} [options.priority=0]
 * @param {object} [options.metadata] - JSON-serialised into metadata
 * @param {number} [options.maxRetries=3]
 * @returns {Promise<*|null>} the new row's id (from RETURNING id), or null when skipped
 */
async function enqueueJob(options) {
    const { jobType, dispensaryId, priority = 0, metadata, maxRetries = 3, } = options;
    if (dispensaryId) {
        const duplicate = await (0, connection_1.query)(`SELECT id FROM dispensary_crawl_jobs
WHERE dispensary_id = $1 AND status IN ('pending', 'running')
LIMIT 1`, [dispensaryId]);
        if (duplicate.rows.length > 0) {
            console.log(`[JobQueue] Skipping enqueue - job already exists for dispensary ${dispensaryId}`);
            return null;
        }
    }
    const serializedMetadata = metadata ? JSON.stringify(metadata) : null;
    const insertParams = [jobType, dispensaryId || null, priority, maxRetries, serializedMetadata];
    const inserted = await (0, connection_1.query)(`INSERT INTO dispensary_crawl_jobs (job_type, dispensary_id, status, priority, max_retries, metadata, created_at)
VALUES ($1, $2, 'pending', $3, $4, $5, NOW())
RETURNING id`, insertParams);
    const [{ id: jobId }] = inserted.rows;
    console.log(`[JobQueue] Enqueued job ${jobId} (type=${jobType}, dispensary=${dispensaryId})`);
    return jobId;
}
/**
 * Bulk enqueue one job of `jobType` for each dispensary in `dispensaryIds`.
 *
 * Duplicate IDs in the input are collapsed. Dispensaries that already have
 * a 'pending' or 'running' job are skipped. All new rows are written by a
 * single INSERT statement, so they are committed together.
 *
 * @param {string} jobType - stored in job_type for every row
 * @param {Array} dispensaryIds - ids to enqueue; null/undefined is treated as empty
 * @param {object} [options]
 * @param {number} [options.priority=0]
 * @param {object} [options.metadata] - JSON-serialised into every row's metadata
 * @param {number} [options.maxRetries=3] - max_retries for every row (same default as enqueueJob)
 * @returns {Promise<{enqueued: number, skipped: number}>} `skipped` counts every
 *   input entry that did not produce a row (already queued, or repeated in the
 *   input), so enqueued + skipped === dispensaryIds.length.
 */
async function bulkEnqueueJobs(jobType, dispensaryIds, options = {}) {
    const { priority = 0, metadata, maxRetries = 3 } = options;
    const requested = dispensaryIds || [];
    // Collapse repeated ids so one call can never create two jobs for the same dispensary.
    const uniqueIds = [...new Set(requested)];
    if (uniqueIds.length === 0) {
        return { enqueued: 0, skipped: requested.length };
    }
    const { rows: existing } = await (0, connection_1.query)(`SELECT DISTINCT dispensary_id FROM dispensary_crawl_jobs
WHERE dispensary_id = ANY($1) AND status IN ('pending', 'running')`, [uniqueIds]);
    const existingSet = new Set(existing.map((r) => r.dispensary_id));
    const toEnqueue = uniqueIds.filter((id) => !existingSet.has(id));
    const skipped = requested.length - toEnqueue.length;
    if (toEnqueue.length === 0) {
        return { enqueued: 0, skipped };
    }
    // Values shared by every row are bound once as $1-$4 and reused in each
    // VALUES tuple; only dispensary_id is per-row ($5, $6, ...). The statement
    // therefore needs 4 + N bind parameters instead of 4 * N, which keeps large
    // batches under the protocol's 16-bit bind-parameter count.
    const metadataJson = metadata ? JSON.stringify(metadata) : null;
    const values = toEnqueue
        .map((_, i) => `($1, $${i + 5}, 'pending', $2, $3, $4, NOW())`)
        .join(', ');
    const params = [jobType, priority, maxRetries, metadataJson, ...toEnqueue];
    await (0, connection_1.query)(`INSERT INTO dispensary_crawl_jobs (job_type, dispensary_id, status, priority, max_retries, metadata, created_at)
VALUES ${values}`, params);
    console.log(`[JobQueue] Bulk enqueued ${toEnqueue.length} jobs, skipped ${skipped}`);
    return { enqueued: toEnqueue.length, skipped };
}
// ============================================================
// JOB CLAIMING (with locking)
// ============================================================
/**
 * Claim the next available job from the queue.
 *
 * A single UPDATE moves the highest-priority, oldest 'pending' job (optionally
 * restricted to `jobTypes`) to 'running'. It also stamps the job with the
 * worker identity, lock expiry and heartbeat. The inner SELECT uses
 * FOR UPDATE SKIP LOCKED, so concurrent claimers skip rows another
 * transaction has already locked instead of waiting on them.
 *
 * @param {object} options
 * @param {string} options.workerId - written to claimed_by and worker_id
 * @param {string[]} [options.jobTypes] - only claim jobs whose job_type is in this list
 * @param {number} [options.lockDurationMinutes=30] - initial locked_until window
 * @returns {Promise<object|null>} the claimed job (via mapDbRowToJob), or null if none is pending
 * @throws the original error from BEGIN/UPDATE/COMMIT; a ROLLBACK failure during
 *   cleanup is logged instead of replacing that error
 */
async function claimNextJob(options) {
    const { workerId, jobTypes, lockDurationMinutes = 30 } = options;
    const hostname = getWorkerHostname();
    const params = [workerId, hostname, lockDurationMinutes];
    let typeFilter = '';
    if (jobTypes && jobTypes.length > 0) {
        params.push(jobTypes);
        // The placeholder index is the position of the array just pushed ($4).
        typeFilter = `AND job_type = ANY($${params.length})`;
    }
    const client = await (0, connection_1.getClient)();
    try {
        await client.query('BEGIN');
        const { rows } = await client.query(`UPDATE dispensary_crawl_jobs
SET
status = 'running',
claimed_by = $1,
claimed_at = NOW(),
worker_id = $1,
worker_hostname = $2,
started_at = NOW(),
locked_until = NOW() + ($3 || ' minutes')::INTERVAL,
last_heartbeat_at = NOW(),
updated_at = NOW()
WHERE id = (
SELECT id FROM dispensary_crawl_jobs
WHERE status = 'pending'
${typeFilter}
ORDER BY priority DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *`, params);
        await client.query('COMMIT');
        if (rows.length === 0) {
            return null;
        }
        const job = mapDbRowToJob(rows[0]);
        console.log(`[JobQueue] Worker ${workerId} claimed job ${job.id} (type=${job.jobType}, dispensary=${job.dispensaryId})`);
        return job;
    }
    catch (error) {
        // If ROLLBACK itself fails (e.g. the connection dropped), report it but
        // rethrow the error that actually caused the failure.
        try {
            await client.query('ROLLBACK');
        }
        catch (rollbackError) {
            console.error('[JobQueue] ROLLBACK failed while claiming job:', rollbackError);
        }
        throw error;
    }
    finally {
        client.release();
    }
}
// ============================================================
// JOB PROGRESS & COMPLETION
// ============================================================
/**
 * Record live progress counters for a job (used for monitoring).
 *
 * Always refreshes last_heartbeat_at and updated_at. Each progress field
 * that is not `undefined` is written to its column; fields are applied in a
 * fixed order and bound as $1, $2, ..., with the job id bound last.
 *
 * @param {*} jobId
 * @param {{productsFound?: number, productsUpserted?: number, snapshotsCreated?: number,
 *          currentPage?: number, totalPages?: number}} progress
 * @returns {Promise<void>}
 */
async function updateJobProgress(jobId, progress) {
    const fieldColumns = [
        ['productsFound', 'products_found'],
        ['productsUpserted', 'products_upserted'],
        ['snapshotsCreated', 'snapshots_created'],
        ['currentPage', 'current_page'],
        ['totalPages', 'total_pages'],
    ];
    const assignments = ['last_heartbeat_at = NOW()', 'updated_at = NOW()'];
    const values = [];
    for (const [field, column] of fieldColumns) {
        const value = progress[field];
        if (value === undefined) {
            continue;
        }
        values.push(value);
        assignments.push(`${column} = $${values.length}`);
    }
    values.push(jobId);
    await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs SET ${assignments.join(', ')} WHERE id = $${values.length}`, values);
}
/**
 * Refresh a running job's heartbeat and extend its lock.
 *
 * Only rows whose status is 'running' are updated; for any other status this
 * is a no-op.
 *
 * @param {*} jobId
 * @param {number} [lockDurationMinutes=30] - how far past NOW() to push
 *   locked_until. Pass the same value given to claimNextJob so that a
 *   heartbeat does not shorten a longer claim window (a fixed 30 minutes did).
 * @returns {Promise<void>}
 */
async function heartbeat(jobId, lockDurationMinutes = 30) {
    await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs
SET last_heartbeat_at = NOW(), locked_until = NOW() + ($2 || ' minutes')::INTERVAL
WHERE id = $1 AND status = 'running'`, [jobId, lockDurationMinutes]);
}
/**
 * Mark a job as completed.
 *
 * Sets status 'completed', completed_at and updated_at. Each counter in
 * `result` overwrites the stored column only when provided. Omitted counters
 * are bound as NULL, so COALESCE keeps the existing column value.
 *
 * @param {*} jobId
 * @param {{productsFound?: number, productsUpserted?: number, snapshotsCreated?: number}} [result]
 *   final counters; may be omitted or null, in which case all counters keep their stored values
 * @returns {Promise<void>}
 */
async function completeJob(jobId, result) {
    // Default to an empty object so a missing `result` no longer throws a TypeError.
    const { productsFound = null, productsUpserted = null, snapshotsCreated = null, } = result || {};
    await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs
SET
status = 'completed',
completed_at = NOW(),
products_found = COALESCE($2, products_found),
products_upserted = COALESCE($3, products_upserted),
snapshots_created = COALESCE($4, snapshots_created),
updated_at = NOW()
WHERE id = $1`, [jobId, productsFound, productsUpserted, snapshotsCreated]);
    console.log(`[JobQueue] Job ${jobId} completed`);
}
/**
 * Record a job failure and decide whether it gets another attempt.
 *
 * Reads retry_count and max_retries for the job.
 * - While retry_count < max_retries, the job goes back to 'pending' with
 *   retry_count incremented and its claim, worker, start, lock and heartbeat
 *   columns cleared.
 * - Otherwise it is marked 'failed' with completed_at set.
 * error_message is stored in both cases.
 * NOTE(review): the read and the update are separate statements, not one
 * atomic operation.
 *
 * @param {*} jobId
 * @param {string} errorMessage
 * @returns {Promise<boolean>} true if re-queued; false if the job was not
 *   found or has now failed permanently
 */
async function failJob(jobId, errorMessage) {
    const lookup = await (0, connection_1.query)(`SELECT retry_count, max_retries FROM dispensary_crawl_jobs WHERE id = $1`, [jobId]);
    const current = lookup.rows[0];
    if (!current) {
        return false;
    }
    const { retry_count: attempts, max_retries: limit } = current;
    const updateParams = [jobId, errorMessage];
    if (attempts < limit) {
        await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs
SET
status = 'pending',
retry_count = retry_count + 1,
claimed_by = NULL,
claimed_at = NULL,
worker_id = NULL,
worker_hostname = NULL,
started_at = NULL,
locked_until = NULL,
last_heartbeat_at = NULL,
error_message = $2,
updated_at = NOW()
WHERE id = $1`, updateParams);
        console.log(`[JobQueue] Job ${jobId} failed, re-queued for retry (${attempts + 1}/${limit})`);
        return true;
    }
    await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs
SET
status = 'failed',
completed_at = NOW(),
error_message = $2,
updated_at = NOW()
WHERE id = $1`, updateParams);
    console.log(`[JobQueue] Job ${jobId} failed permanently after ${attempts} retries`);
    return false;
}
// ============================================================
// QUEUE MONITORING
// ============================================================
/**
 * Read aggregate queue statistics from the v_queue_stats view.
 *
 * Only the first row is used. Missing or falsy counters become 0. The
 * average duration is parsed as a float, or null when it is falsy.
 *
 * @returns {Promise<{pending: number, running: number, completed1h: number,
 *   failed1h: number, activeWorkers: number, avgDurationSeconds: number|null}>}
 */
async function getQueueStats() {
    const result = await (0, connection_1.query)(`SELECT * FROM v_queue_stats`);
    const stats = result.rows[0] || {};
    const count = (raw) => parseInt(raw || '0', 10);
    const avg = stats.avg_duration_seconds;
    return {
        pending: count(stats.pending_jobs),
        running: count(stats.running_jobs),
        completed1h: count(stats.completed_1h),
        failed1h: count(stats.failed_1h),
        activeWorkers: count(stats.active_workers),
        avgDurationSeconds: avg ? parseFloat(avg) : null,
    };
}
/**
 * List workers from the v_active_workers view.
 *
 * Count-like columns are parsed as base-10 integers (falsy -> 0).
 * first_claimed_at is always wrapped in a Date. last_heartbeat becomes a
 * Date, or null when falsy.
 *
 * @returns {Promise<object[]>}
 */
async function getActiveWorkers() {
    const { rows: workerRows } = await (0, connection_1.query)(`SELECT * FROM v_active_workers`);
    const asCount = (value) => parseInt(value || '0', 10);
    const workers = [];
    for (const row of workerRows) {
        workers.push({
            workerId: row.worker_id,
            hostname: row.worker_hostname,
            currentJobs: asCount(row.current_jobs),
            totalProductsFound: asCount(row.total_products_found),
            totalProductsUpserted: asCount(row.total_products_upserted),
            totalSnapshots: asCount(row.total_snapshots),
            firstClaimedAt: new Date(row.first_claimed_at),
            lastHeartbeat: row.last_heartbeat ? new Date(row.last_heartbeat) : null,
        });
    }
    return workers;
}
/**
 * List every job whose status is 'running', most recently started first.
 *
 * Rows are LEFT JOINed to dispensaries, so the mapped job can also carry
 * dispensaryName / city when the join supplies them.
 *
 * @returns {Promise<object[]>} jobs as produced by mapDbRowToJob
 */
async function getRunningJobs() {
    const result = await (0, connection_1.query)(`SELECT cj.*, d.name as dispensary_name, d.city
FROM dispensary_crawl_jobs cj
LEFT JOIN dispensaries d ON cj.dispensary_id = d.id
WHERE cj.status = 'running'
ORDER BY cj.started_at DESC`);
    const jobs = [];
    for (const row of result.rows) {
        jobs.push(mapDbRowToJob(row));
    }
    return jobs;
}
/**
 * Recover jobs whose worker appears to have died without finishing.
 *
 * A job is stale when its status is 'running' and its last_heartbeat_at is
 * more than `staleMinutes` minutes old.
 * - Stale jobs with retry budget left (retry_count < max_retries) are reset
 *   to 'pending', with retry_count incremented and the claim, worker, start
 *   and lock columns cleared.
 * - Stale jobs with no budget left are marked 'failed' with completed_at set,
 *   mirroring failJob's permanent-failure branch. Otherwise they would stay
 *   'running' indefinitely. That blocks enqueueJob/bulkEnqueueJobs for their
 *   dispensary (both skip when a pending/running job exists), and
 *   cleanupOldJobs (which only deletes completed/failed rows) never removes them.
 *
 * @param {number} [staleMinutes=15]
 * @returns {Promise<number>} number of jobs re-queued; jobs failed for
 *   exhausted retries are logged but not included in this count
 */
async function recoverStaleJobs(staleMinutes = 15) {
    const { rowCount } = await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs
SET
status = 'pending',
claimed_by = NULL,
claimed_at = NULL,
worker_id = NULL,
worker_hostname = NULL,
started_at = NULL,
locked_until = NULL,
error_message = 'Recovered from stale worker',
retry_count = retry_count + 1,
updated_at = NOW()
WHERE status = 'running'
AND last_heartbeat_at < NOW() - ($1 || ' minutes')::INTERVAL
AND retry_count < max_retries`, [staleMinutes]);
    if (rowCount && rowCount > 0) {
        console.log(`[JobQueue] Recovered ${rowCount} stale jobs`);
    }
    // Stale jobs that have used up their retries: fail them so they stop
    // counting as 'running' and become eligible for cleanupOldJobs.
    const { rowCount: exhaustedCount } = await (0, connection_1.query)(`UPDATE dispensary_crawl_jobs
SET
status = 'failed',
completed_at = NOW(),
error_message = 'Stale worker: retries exhausted',
updated_at = NOW()
WHERE status = 'running'
AND last_heartbeat_at < NOW() - ($1 || ' minutes')::INTERVAL
AND retry_count >= max_retries`, [staleMinutes]);
    if (exhaustedCount && exhaustedCount > 0) {
        console.log(`[JobQueue] Marked ${exhaustedCount} stale jobs as failed (retries exhausted)`);
    }
    return rowCount || 0;
}
/**
 * Delete 'completed' and 'failed' jobs whose completed_at is older than
 * `olderThanDays` days.
 *
 * @param {number} [olderThanDays=7]
 * @returns {Promise<number>} number of rows deleted (0 when the driver reports no count)
 */
async function cleanupOldJobs(olderThanDays = 7) {
    const result = await (0, connection_1.query)(`DELETE FROM dispensary_crawl_jobs
WHERE status IN ('completed', 'failed')
AND completed_at < NOW() - ($1 || ' days')::INTERVAL`, [olderThanDays]);
    const removed = result.rowCount || 0;
    if (removed > 0) {
        console.log(`[JobQueue] Cleaned up ${removed} old jobs`);
    }
    return removed;
}
// ============================================================
// HELPERS
// ============================================================
/**
 * Convert a dispensary_crawl_jobs row (snake_case columns) into the
 * camelCase job object returned by this module.
 *
 * Timestamp columns become Date objects, or null when absent. Except for
 * created_at, which is always wrapped. Numeric counters default to 0.
 * dispensaryName / city are added only when the row carries truthy values
 * from a join.
 *
 * @param {object} row - a database row
 * @returns {object} the mapped job
 */
function mapDbRowToJob(row) {
    return {
        id: row.id,
        jobType: row.job_type,
        dispensaryId: row.dispensary_id,
        status: row.status,
        priority: row.priority || 0,
        retryCount: row.retry_count || 0,
        // Only fall back to 3 when the column is missing: a stored 0 ("no
        // retries") must stay 0 rather than being treated as falsy.
        maxRetries: row.max_retries != null ? row.max_retries : 3,
        claimedBy: row.claimed_by,
        claimedAt: row.claimed_at ? new Date(row.claimed_at) : null,
        workerHostname: row.worker_hostname,
        startedAt: row.started_at ? new Date(row.started_at) : null,
        completedAt: row.completed_at ? new Date(row.completed_at) : null,
        errorMessage: row.error_message,
        productsFound: row.products_found || 0,
        productsUpserted: row.products_upserted || 0,
        snapshotsCreated: row.snapshots_created || 0,
        currentPage: row.current_page || 0,
        totalPages: row.total_pages,
        lastHeartbeatAt: row.last_heartbeat_at ? new Date(row.last_heartbeat_at) : null,
        metadata: row.metadata,
        createdAt: new Date(row.created_at),
        // Extra fields from the dispensaries join, if present
        ...(row.dispensary_name && { dispensaryName: row.dispensary_name }),
        ...(row.city && { city: row.city }),
    };
}