Files
cannaiq/backend/src/system/services/sync-orchestrator.ts
Kelly 2f483b3084 feat: SEO template library, discovery pipeline, and orchestrator enhancements
## SEO Template Library
- Add complete template library with 7 page types (state, city, category, brand, product, search, regeneration)
- Add Template Library tab in SEO Orchestrator with accordion-based editors
- Add template preview, validation, and variable injection engine
- Add API endpoints: /api/seo/templates, preview, validate, generate, regenerate

## Discovery Pipeline
- Add promotion.ts for discovery location validation and promotion
- Add discover-all-states.ts script for multi-state discovery
- Add promotion log migration (067)
- Enhance discovery routes and types

## Orchestrator & Admin
- Add crawl_enabled filter to stores page
- Add API permissions page
- Add job queue management
- Add price analytics routes
- Add markets and intelligence routes
- Enhance dashboard and worker monitoring

## Infrastructure
- Add migrations for worker definitions, SEO settings, field alignment
- Add canonical pipeline for scraper v2
- Update hydration and sync orchestrator
- Enhance multi-state query service

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 00:05:34 -07:00

959 lines
29 KiB
TypeScript

/**
* Production Sync Orchestrator
*
* Central controller responsible for:
* - Detecting new raw payloads
* - Running hydration jobs
* - Verifying upserts
* - Calculating diffs (before/after snapshot change detection)
* - Triggering analytics pre-compute updates
* - Scheduling catch-up runs
* - Ensuring no double hydration runs (distributed lock)
*
* Phase 5: Full Production Sync + Monitoring
*/
import { Pool } from 'pg';
import { MetricsService } from './metrics';
import { DLQService } from './dlq';
import { AlertService } from './alerts';
import { DutchieNormalizer, hydrateToCanonical } from '../../hydration';
export type OrchestratorStatus = 'RUNNING' | 'SLEEPING' | 'LOCKED' | 'PAUSED' | 'ERROR';
export interface OrchestratorConfig {
batchSize: number;
pollIntervalMs: number;
maxRetries: number;
lockTimeoutMs: number;
enableAnalyticsPrecompute: boolean;
enableIntegrityChecks: boolean;
}
export interface SyncRunMetrics {
payloadsQueued: number;
payloadsProcessed: number;
payloadsSkipped: number;
payloadsFailed: number;
payloadsDlq: number;
productsUpserted: number;
productsInserted: number;
productsUpdated: number;
productsDiscontinued: number;
snapshotsCreated: number;
}
export interface SyncStatus {
orchestratorStatus: OrchestratorStatus;
currentWorkerId: string | null;
lastHeartbeatAt: Date | null;
isPaused: boolean;
pauseReason: string | null;
consecutiveFailures: number;
lastRunStartedAt: Date | null;
lastRunCompletedAt: Date | null;
lastRunDurationMs: number | null;
lastRunPayloadsProcessed: number;
lastRunErrors: number;
config: OrchestratorConfig;
unprocessedPayloads: number;
dlqPending: number;
activeAlerts: number;
runs24h: {
total: number;
completed: number;
failed: number;
};
}
export interface QueueDepth {
unprocessed: number;
byState: Record<string, number>;
byPlatform: Record<string, number>;
oldestPayloadAge: number | null; // milliseconds
estimatedProcessingTime: number | null; // milliseconds
}
const DEFAULT_CONFIG: OrchestratorConfig = {
batchSize: 50,
pollIntervalMs: 5000,
maxRetries: 3,
lockTimeoutMs: 300000, // 5 minutes
enableAnalyticsPrecompute: true,
enableIntegrityChecks: true,
};
export class SyncOrchestrator {
private pool: Pool;
private metrics: MetricsService;
private dlq: DLQService;
private alerts: AlertService;
private workerId: string;
private isRunning: boolean = false;
private pollInterval: NodeJS.Timeout | null = null;
private normalizer: DutchieNormalizer;
constructor(
pool: Pool,
metrics: MetricsService,
dlq: DLQService,
alerts: AlertService,
workerId?: string
) {
this.pool = pool;
this.metrics = metrics;
this.dlq = dlq;
this.alerts = alerts;
this.workerId = workerId || `orchestrator-${process.env.HOSTNAME || process.pid}`;
this.normalizer = new DutchieNormalizer();
}
/**
* Get current sync status
*/
async getStatus(): Promise<SyncStatus> {
const result = await this.pool.query(`SELECT * FROM v_sync_status`);
if (result.rows.length === 0) {
return {
orchestratorStatus: 'SLEEPING',
currentWorkerId: null,
lastHeartbeatAt: null,
isPaused: false,
pauseReason: null,
consecutiveFailures: 0,
lastRunStartedAt: null,
lastRunCompletedAt: null,
lastRunDurationMs: null,
lastRunPayloadsProcessed: 0,
lastRunErrors: 0,
config: DEFAULT_CONFIG,
unprocessedPayloads: 0,
dlqPending: 0,
activeAlerts: 0,
runs24h: { total: 0, completed: 0, failed: 0 },
};
}
const row = result.rows[0];
return {
orchestratorStatus: row.orchestrator_status as OrchestratorStatus,
currentWorkerId: row.current_worker_id,
lastHeartbeatAt: row.last_heartbeat_at,
isPaused: row.is_paused,
pauseReason: row.pause_reason,
consecutiveFailures: row.consecutive_failures,
lastRunStartedAt: row.last_run_started_at,
lastRunCompletedAt: row.last_run_completed_at,
lastRunDurationMs: row.last_run_duration_ms,
lastRunPayloadsProcessed: row.last_run_payloads_processed,
lastRunErrors: row.last_run_errors,
config: row.config || DEFAULT_CONFIG,
unprocessedPayloads: parseInt(row.unprocessed_payloads) || 0,
dlqPending: parseInt(row.dlq_pending) || 0,
activeAlerts: parseInt(row.active_alerts) || 0,
runs24h: row.runs_24h || { total: 0, completed: 0, failed: 0 },
};
}
/**
* Get queue depth information
*/
async getQueueDepth(): Promise<QueueDepth> {
const [countResult, byStateResult, byPlatformResult, oldestResult] = await Promise.all([
this.pool.query(`
SELECT COUNT(*) as count FROM raw_payloads WHERE processed = FALSE
`),
this.pool.query(`
SELECT
COALESCE(d.state, 'unknown') as state,
COUNT(*) as count
FROM raw_payloads rp
LEFT JOIN dispensaries d ON rp.dispensary_id = d.id
WHERE rp.processed = FALSE
GROUP BY d.state
`),
this.pool.query(`
SELECT platform, COUNT(*) as count
FROM raw_payloads
WHERE processed = FALSE
GROUP BY platform
`),
this.pool.query(`
SELECT fetched_at FROM raw_payloads
WHERE processed = FALSE
ORDER BY fetched_at ASC
LIMIT 1
`),
]);
const unprocessed = parseInt(countResult.rows[0]?.count) || 0;
const byState: Record<string, number> = {};
byStateResult.rows.forEach(r => {
byState[r.state] = parseInt(r.count);
});
const byPlatform: Record<string, number> = {};
byPlatformResult.rows.forEach(r => {
byPlatform[r.platform] = parseInt(r.count);
});
const oldestPayloadAge = oldestResult.rows.length > 0
? Date.now() - new Date(oldestResult.rows[0].fetched_at).getTime()
: null;
// Estimate processing time based on recent throughput
const throughputResult = await this.pool.query(`
SELECT
COALESCE(AVG(payloads_processed::float / NULLIF(duration_ms, 0) * 1000), 10) as payloads_per_sec
FROM sync_runs
WHERE status = 'completed'
AND started_at >= NOW() - INTERVAL '1 hour'
AND duration_ms > 0
`);
const payloadsPerSec = parseFloat(throughputResult.rows[0]?.payloads_per_sec) || 10;
const estimatedProcessingTime = unprocessed > 0
? Math.round((unprocessed / payloadsPerSec) * 1000)
: null;
return {
unprocessed,
byState,
byPlatform,
oldestPayloadAge,
estimatedProcessingTime,
};
}
/**
* Acquire distributed lock
*/
private async acquireLock(): Promise<boolean> {
const lockName = 'sync_orchestrator';
const lockTimeout = DEFAULT_CONFIG.lockTimeoutMs;
const result = await this.pool.query(`
INSERT INTO hydration_locks (lock_name, worker_id, acquired_at, expires_at, heartbeat_at)
VALUES ($1, $2, NOW(), NOW() + ($3 || ' milliseconds')::INTERVAL, NOW())
ON CONFLICT (lock_name) DO UPDATE SET
worker_id = EXCLUDED.worker_id,
acquired_at = EXCLUDED.acquired_at,
expires_at = EXCLUDED.expires_at,
heartbeat_at = EXCLUDED.heartbeat_at
WHERE hydration_locks.expires_at < NOW()
OR hydration_locks.worker_id = $2
RETURNING id
`, [lockName, this.workerId, lockTimeout]);
return result.rows.length > 0;
}
/**
* Release distributed lock
*/
private async releaseLock(): Promise<void> {
await this.pool.query(`
DELETE FROM hydration_locks
WHERE lock_name = 'sync_orchestrator' AND worker_id = $1
`, [this.workerId]);
}
/**
* Update lock heartbeat
*/
private async refreshLock(): Promise<boolean> {
const result = await this.pool.query(`
UPDATE hydration_locks
SET heartbeat_at = NOW(),
expires_at = NOW() + ($2 || ' milliseconds')::INTERVAL
WHERE lock_name = 'sync_orchestrator' AND worker_id = $1
RETURNING id
`, [this.workerId, DEFAULT_CONFIG.lockTimeoutMs]);
return result.rows.length > 0;
}
/**
* Update orchestrator state
*/
private async updateState(status: OrchestratorStatus, metrics?: Partial<SyncRunMetrics>): Promise<void> {
await this.pool.query(`
UPDATE sync_orchestrator_state
SET status = $1,
current_worker_id = $2,
last_heartbeat_at = NOW(),
updated_at = NOW()
${metrics?.payloadsProcessed !== undefined ? ', last_run_payloads_processed = $3' : ''}
${metrics?.payloadsFailed !== undefined ? ', last_run_errors = $4' : ''}
WHERE id = 1
`, [
status,
this.workerId,
metrics?.payloadsProcessed,
metrics?.payloadsFailed,
].filter(v => v !== undefined));
}
/**
* Run a single sync cycle
*/
async runSync(): Promise<SyncRunMetrics> {
const startTime = Date.now();
const runId = crypto.randomUUID();
// Check if paused
const status = await this.getStatus();
if (status.isPaused) {
throw new Error(`Orchestrator is paused: ${status.pauseReason}`);
}
// Try to acquire lock
const hasLock = await this.acquireLock();
if (!hasLock) {
throw new Error('Could not acquire orchestrator lock - another instance is running');
}
const metrics: SyncRunMetrics = {
payloadsQueued: 0,
payloadsProcessed: 0,
payloadsSkipped: 0,
payloadsFailed: 0,
payloadsDlq: 0,
productsUpserted: 0,
productsInserted: 0,
productsUpdated: 0,
productsDiscontinued: 0,
snapshotsCreated: 0,
};
try {
await this.updateState('RUNNING');
// Create sync run record
await this.pool.query(`
INSERT INTO sync_runs (run_id, worker_id, status)
VALUES ($1, $2, 'running')
`, [runId, this.workerId]);
// Get unprocessed payloads
const queueDepth = await this.getQueueDepth();
metrics.payloadsQueued = queueDepth.unprocessed;
// Process in batches
const config = status.config;
let hasMore = true;
let batchCount = 0;
while (hasMore && batchCount < 100) { // Safety limit
batchCount++;
// Refresh lock
await this.refreshLock();
// Get batch of payloads
const payloadsResult = await this.pool.query(`
SELECT
rp.id, rp.dispensary_id, rp.raw_json, rp.platform,
rp.product_count, rp.pricing_type, rp.crawl_mode,
rp.hydration_attempts, rp.fetched_at,
d.state, d.name as dispensary_name
FROM raw_payloads rp
LEFT JOIN dispensaries d ON rp.dispensary_id = d.id
WHERE rp.processed = FALSE
ORDER BY rp.fetched_at ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [config.batchSize]);
if (payloadsResult.rows.length === 0) {
hasMore = false;
break;
}
// Process each payload
for (const payload of payloadsResult.rows) {
try {
const result = await this.processPayload(payload, config);
metrics.payloadsProcessed++;
metrics.productsUpserted += result.productsUpserted;
metrics.productsInserted += result.productsInserted;
metrics.productsUpdated += result.productsUpdated;
metrics.snapshotsCreated += result.snapshotsCreated;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Check if should move to DLQ
if (payload.hydration_attempts >= config.maxRetries - 1) {
await this.dlq.movePayloadToDlq(
payload.id,
this.classifyError(error),
errorMessage
);
metrics.payloadsDlq++;
} else {
// Increment attempts and record error
await this.pool.query(`
UPDATE raw_payloads
SET hydration_attempts = hydration_attempts + 1,
hydration_error = $2
WHERE id = $1
`, [payload.id, errorMessage]);
}
metrics.payloadsFailed++;
await this.metrics.recordError(
this.classifyError(error),
errorMessage,
'raw_payloads',
payload.id,
payload.dispensary_id
);
}
}
// Update metrics after each batch
await this.metrics.recordMetric('payloads_processed_today', metrics.payloadsProcessed);
}
// Update metrics
await this.metrics.recordMetric('payloads_unprocessed', metrics.payloadsQueued - metrics.payloadsProcessed);
await this.metrics.recordMetric('canonical_rows_inserted', metrics.productsInserted);
await this.metrics.recordMetric('canonical_rows_updated', metrics.productsUpdated);
await this.metrics.recordMetric('snapshot_volume', metrics.snapshotsCreated);
// Calculate success rate
const successRate = metrics.payloadsProcessed > 0
? ((metrics.payloadsProcessed - metrics.payloadsFailed) / metrics.payloadsProcessed) * 100
: 100;
await this.metrics.recordMetric('hydration_success_rate', successRate);
// Trigger analytics precompute if enabled
if (config.enableAnalyticsPrecompute && metrics.payloadsProcessed > 0) {
await this.triggerAnalyticsUpdate();
}
// Complete sync run
const duration = Date.now() - startTime;
await this.pool.query(`
UPDATE sync_runs
SET status = 'completed',
finished_at = NOW(),
duration_ms = $2,
payloads_queued = $3,
payloads_processed = $4,
payloads_failed = $5,
payloads_dlq = $6,
products_upserted = $7,
products_inserted = $8,
products_updated = $9,
snapshots_created = $10
WHERE run_id = $1
`, [
runId, duration,
metrics.payloadsQueued, metrics.payloadsProcessed,
metrics.payloadsFailed, metrics.payloadsDlq,
metrics.productsUpserted, metrics.productsInserted,
metrics.productsUpdated, metrics.snapshotsCreated,
]);
// Update orchestrator state
await this.pool.query(`
UPDATE sync_orchestrator_state
SET status = 'SLEEPING',
last_run_started_at = $1,
last_run_completed_at = NOW(),
last_run_duration_ms = $2,
last_run_payloads_processed = $3,
last_run_errors = $4,
consecutive_failures = 0,
updated_at = NOW()
WHERE id = 1
`, [new Date(startTime), duration, metrics.payloadsProcessed, metrics.payloadsFailed]);
return metrics;
} catch (error) {
// Record failure
const errorMessage = error instanceof Error ? error.message : String(error);
await this.pool.query(`
UPDATE sync_runs
SET status = 'failed',
finished_at = NOW(),
error_summary = $2
WHERE run_id = $1
`, [runId, errorMessage]);
await this.pool.query(`
UPDATE sync_orchestrator_state
SET status = 'ERROR',
consecutive_failures = consecutive_failures + 1,
updated_at = NOW()
WHERE id = 1
`);
await this.alerts.createAlert(
'SYNC_FAILURE',
'error',
'Sync run failed',
errorMessage,
'sync-orchestrator'
);
throw error;
} finally {
await this.releaseLock();
}
}
/**
* Process a single payload - now uses canonical tables via hydration pipeline
*/
private async processPayload(
payload: any,
_config: OrchestratorConfig
): Promise<{
productsUpserted: number;
productsInserted: number;
productsUpdated: number;
snapshotsCreated: number;
}> {
const startTime = Date.now();
// Parse products from raw JSON
const rawData = payload.raw_json;
// Validate the payload using normalizer
const validation = this.normalizer.validatePayload(rawData);
if (!validation.valid) {
// Mark as processed with warning
await this.pool.query(`
UPDATE raw_payloads
SET processed = TRUE,
normalized_at = NOW(),
hydration_error = $2
WHERE id = $1
`, [payload.id, validation.errors.join('; ')]);
return { productsUpserted: 0, productsInserted: 0, productsUpdated: 0, snapshotsCreated: 0 };
}
// Normalize the payload using the hydration normalizer
const normResult = this.normalizer.normalize(rawData);
if (normResult.products.length === 0) {
// Mark as processed with warning
await this.pool.query(`
UPDATE raw_payloads
SET processed = TRUE,
normalized_at = NOW(),
hydration_error = 'No products found in payload after normalization'
WHERE id = $1
`, [payload.id]);
return { productsUpserted: 0, productsInserted: 0, productsUpdated: 0, snapshotsCreated: 0 };
}
// Get or create crawl_run for this payload
const crawlRunId = await this.getOrCreateCrawlRun(payload.dispensary_id, payload.id);
// Use canonical hydration to write to store_products, product_variants, etc.
const hydrateResult = await hydrateToCanonical(
this.pool,
payload.dispensary_id,
normResult,
crawlRunId
);
// Also write to legacy tables for backwards compatibility
const products = this.extractProducts(rawData);
await this.upsertProducts(payload.dispensary_id, products);
const snapshotsCreated = await this.createSnapshots(payload.dispensary_id, products, payload.id);
// Calculate latency
const latencyMs = Date.now() - new Date(payload.fetched_at).getTime();
await this.metrics.recordMetric('ingestion_latency_avg_ms', latencyMs);
// Mark payload as processed
await this.pool.query(`
UPDATE raw_payloads
SET processed = TRUE,
normalized_at = NOW()
WHERE id = $1
`, [payload.id]);
// Return combined metrics (canonical + legacy)
return {
productsUpserted: hydrateResult.productsUpserted,
productsInserted: hydrateResult.productsNew,
productsUpdated: hydrateResult.productsUpdated,
snapshotsCreated: hydrateResult.snapshotsCreated + snapshotsCreated,
};
}
/**
* Get or create a crawl_run record for tracking
*/
private async getOrCreateCrawlRun(dispensaryId: number, payloadId: string): Promise<number | null> {
try {
const result = await this.pool.query(`
INSERT INTO crawl_runs (dispensary_id, provider, started_at, status, trigger_type, metadata)
VALUES ($1, 'dutchie', NOW(), 'running', 'hydration', jsonb_build_object('payload_id', $2))
RETURNING id
`, [dispensaryId, payloadId]);
return result.rows[0].id;
} catch (error) {
console.warn('[SyncOrchestrator] Could not create crawl_run:', error);
return null;
}
}
/**
* Extract products from raw payload
*/
private extractProducts(rawData: any): any[] {
// Handle different payload formats
if (Array.isArray(rawData)) {
return rawData;
}
// Dutchie format
if (rawData.products) {
return rawData.products;
}
// Nested data format
if (rawData.data?.products) {
return rawData.data.products;
}
if (rawData.data?.filteredProducts?.products) {
return rawData.data.filteredProducts.products;
}
return [];
}
/**
* Upsert products to canonical table
*/
private async upsertProducts(
dispensaryId: number,
products: any[]
): Promise<{ upserted: number; inserted: number; updated: number }> {
let inserted = 0;
let updated = 0;
// Process in chunks
const chunkSize = 100;
for (let i = 0; i < products.length; i += chunkSize) {
const chunk = products.slice(i, i + chunkSize);
for (const product of chunk) {
const externalId = product.id || product.externalId || product.product_id;
if (!externalId) continue;
const result = await this.pool.query(`
INSERT INTO dutchie_products (
dispensary_id, external_product_id, name, brand_name, type,
latest_raw_payload, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
ON CONFLICT (dispensary_id, external_product_id)
DO UPDATE SET
name = EXCLUDED.name,
brand_name = EXCLUDED.brand_name,
type = EXCLUDED.type,
latest_raw_payload = EXCLUDED.latest_raw_payload,
updated_at = NOW()
RETURNING (xmax = 0) as is_insert
`, [
dispensaryId,
externalId,
product.name || product.Name,
product.brand || product.Brand || product.brandName,
product.type || product.Type || product.category,
JSON.stringify(product),
]);
if (result.rows[0]?.is_insert) {
inserted++;
} else {
updated++;
}
}
}
return { upserted: inserted + updated, inserted, updated };
}
/**
* Create product snapshots
*/
private async createSnapshots(
dispensaryId: number,
products: any[],
payloadId: string
): Promise<number> {
let created = 0;
// Get product IDs
const externalIds = products
.map(p => p.id || p.externalId || p.product_id)
.filter(Boolean);
if (externalIds.length === 0) return 0;
const productsResult = await this.pool.query(`
SELECT id, external_product_id FROM dutchie_products
WHERE dispensary_id = $1 AND external_product_id = ANY($2)
`, [dispensaryId, externalIds]);
const productIdMap = new Map<string, number>();
productsResult.rows.forEach(r => {
productIdMap.set(r.external_product_id, r.id);
});
// Insert snapshots in chunks
const chunkSize = 100;
for (let i = 0; i < products.length; i += chunkSize) {
const chunk = products.slice(i, i + chunkSize);
const values: any[] = [];
const placeholders: string[] = [];
let paramIndex = 1;
for (const product of chunk) {
const externalId = product.id || product.externalId || product.product_id;
const productId = productIdMap.get(externalId);
if (!productId) continue;
// Extract pricing
const prices = product.Prices || product.prices || [];
const recPrice = prices.find((p: any) => p.pricingType === 'rec' || !p.pricingType);
values.push(
productId,
dispensaryId,
payloadId,
recPrice?.price ? Math.round(recPrice.price * 100) : null,
product.potencyCBD?.formatted || null,
product.potencyTHC?.formatted || null,
product.Status === 'Active' ? 'in_stock' : 'out_of_stock'
);
placeholders.push(`($${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, NOW())`);
}
if (placeholders.length > 0) {
await this.pool.query(`
INSERT INTO dutchie_product_snapshots (
dutchie_product_id, dispensary_id, crawl_run_id,
rec_min_price_cents, cbd_content, thc_content, stock_status, crawled_at
)
VALUES ${placeholders.join(', ')}
`, values);
created += placeholders.length;
}
}
return created;
}
/**
* Classify error type
*/
private classifyError(error: unknown): string {
const message = error instanceof Error ? error.message.toLowerCase() : String(error).toLowerCase();
if (message.includes('parse') || message.includes('json')) {
return 'INGESTION_PARSE_ERROR';
}
if (message.includes('normalize') || message.includes('transform')) {
return 'NORMALIZATION_ERROR';
}
if (message.includes('upsert') || message.includes('insert') || message.includes('duplicate')) {
return 'HYDRATION_UPSERT_ERROR';
}
if (message.includes('brand')) {
return 'MISSING_BRAND_MAP';
}
if (message.includes('category')) {
return 'MISSING_CATEGORY_MAP';
}
if (message.includes('state')) {
return 'STATE_MISMATCH';
}
if (message.includes('external_id') || message.includes('external_product_id')) {
return 'DUPLICATE_EXTERNAL_ID';
}
return 'HYDRATION_ERROR';
}
/**
* Trigger analytics precompute
*/
private async triggerAnalyticsUpdate(): Promise<void> {
try {
// Capture brand snapshots
await this.pool.query(`SELECT capture_brand_snapshots()`);
// Capture category snapshots
await this.pool.query(`SELECT capture_category_snapshots()`);
// Refresh materialized views if they exist
try {
await this.pool.query(`REFRESH MATERIALIZED VIEW CONCURRENTLY v_brand_summary`);
} catch {
// View might not exist, ignore
}
console.log('[SyncOrchestrator] Analytics precompute completed');
} catch (error) {
console.warn('[SyncOrchestrator] Analytics precompute failed:', error);
}
}
/**
* Pause orchestrator
*/
async pause(reason: string): Promise<void> {
await this.pool.query(`
UPDATE sync_orchestrator_state
SET is_paused = TRUE,
pause_reason = $1,
updated_at = NOW()
WHERE id = 1
`, [reason]);
await this.alerts.createAlert(
'ORCHESTRATOR_PAUSED',
'warning',
'Sync orchestrator paused',
reason,
'sync-orchestrator'
);
}
/**
* Resume orchestrator
*/
async resume(): Promise<void> {
await this.pool.query(`
UPDATE sync_orchestrator_state
SET is_paused = FALSE,
pause_reason = NULL,
updated_at = NOW()
WHERE id = 1
`);
await this.alerts.resolveAlert('ORCHESTRATOR_PAUSED');
}
/**
* Get health status
*/
async getHealth(): Promise<{
healthy: boolean;
checks: Record<string, { status: 'ok' | 'warning' | 'error'; message: string }>;
}> {
const checks: Record<string, { status: 'ok' | 'warning' | 'error'; message: string }> = {};
// Check database connection
try {
await this.pool.query('SELECT 1');
checks.database = { status: 'ok', message: 'Database connection healthy' };
} catch (error) {
checks.database = { status: 'error', message: `Database error: ${error}` };
}
// Check orchestrator state
const status = await this.getStatus();
if (status.isPaused) {
checks.orchestrator = { status: 'warning', message: `Paused: ${status.pauseReason}` };
} else if (status.consecutiveFailures > 5) {
checks.orchestrator = { status: 'error', message: `${status.consecutiveFailures} consecutive failures` };
} else {
checks.orchestrator = { status: 'ok', message: `Status: ${status.orchestratorStatus}` };
}
// Check queue depth
const queue = await this.getQueueDepth();
if (queue.unprocessed > 1000) {
checks.queue = { status: 'warning', message: `${queue.unprocessed} unprocessed payloads` };
} else {
checks.queue = { status: 'ok', message: `${queue.unprocessed} unprocessed payloads` };
}
// Check DLQ
const dlqStats = await this.dlq.getStats();
if (dlqStats.pending > 100) {
checks.dlq = { status: 'warning', message: `${dlqStats.pending} payloads in DLQ` };
} else if (dlqStats.pending > 0) {
checks.dlq = { status: 'ok', message: `${dlqStats.pending} payloads in DLQ` };
} else {
checks.dlq = { status: 'ok', message: 'DLQ empty' };
}
// Check latency
const latencyResult = await this.pool.query(`
SELECT metric_value FROM system_metrics_current
WHERE metric_name = 'ingestion_latency_avg_ms'
`);
const latency = parseFloat(latencyResult.rows[0]?.metric_value) || 0;
if (latency > 300000) { // 5 minutes
checks.latency = { status: 'error', message: `Ingestion latency: ${Math.round(latency / 1000)}s` };
} else if (latency > 60000) { // 1 minute
checks.latency = { status: 'warning', message: `Ingestion latency: ${Math.round(latency / 1000)}s` };
} else {
checks.latency = { status: 'ok', message: `Ingestion latency: ${Math.round(latency / 1000)}s` };
}
const healthy = Object.values(checks).every(c => c.status !== 'error');
return { healthy, checks };
}
/**
* Start continuous sync loop
*/
start(): void {
if (this.isRunning) return;
this.isRunning = true;
console.log(`[SyncOrchestrator] Starting with worker ID: ${this.workerId}`);
const poll = async () => {
if (!this.isRunning) return;
try {
const status = await this.getStatus();
if (!status.isPaused && status.unprocessedPayloads > 0) {
await this.runSync();
}
} catch (error) {
console.error('[SyncOrchestrator] Sync error:', error);
}
if (this.isRunning) {
this.pollInterval = setTimeout(poll, DEFAULT_CONFIG.pollIntervalMs);
}
};
poll();
}
/**
* Stop continuous sync loop
*/
stop(): void {
this.isRunning = false;
if (this.pollInterval) {
clearTimeout(this.pollInterval);
this.pollInterval = null;
}
console.log('[SyncOrchestrator] Stopped');
}
}