Files
cannaiq/backend/src/canonical-hydration/crawl-run-recorder.ts
Kelly b4a2fb7d03 feat: Add v2 architecture with multi-state support and orchestrator services
Major additions:
- Multi-state expansion: states table, StateSelector, NationalDashboard, StateHeatmap, CrossStateCompare
- Orchestrator services: trace service, error taxonomy, retry manager, proxy rotator
- Discovery system: dutchie discovery service, geo validation, city seeding scripts
- Analytics infrastructure: analytics v2 routes, brand/pricing/stores intelligence pages
- Local development: setup-local.sh starts all 5 services (postgres, backend, cannaiq, findadispo, findagram)
- Migrations 037-056: crawler profiles, states, analytics indexes, worker metadata

Frontend pages added:
- Discovery, ChainsDashboard, IntelligenceBrands, IntelligencePricing, IntelligenceStores
- StateHeatmap, CrossStateCompare, SyncInfoPanel

Components added:
- StateSelector, OrchestratorTraceModal, WorkflowStepper

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-07 11:30:57 -07:00

227 lines
6.5 KiB
TypeScript

/**
* CrawlRunRecorder
* Records crawl runs from source job tables (dispensary_crawl_jobs) to canonical crawl_runs table
*/
import { Pool, PoolClient } from 'pg';
import { SourceJob, CrawlRun, ServiceContext, SourceJobType } from './types';
/**
 * Hydrates the canonical `crawl_runs` table from source job tables
 * (currently `dispensary_crawl_jobs`).
 *
 * Writes are idempotent: inserts use ON CONFLICT on
 * (source_job_type, source_job_id), so re-recording the same source job
 * updates the existing canonical row instead of duplicating it.
 */
export class CrawlRunRecorder {
  private pool: Pool;
  // Injected logger with console.log fallback. NOTE(review): currently
  // unused inside this class — kept for API compatibility with ServiceContext.
  private log: (message: string) => void;

  constructor(ctx: ServiceContext) {
    this.pool = ctx.pool;
    this.log = ctx.logger || console.log;
  }

  /**
   * Record a single crawl run from a source job.
   *
   * Only jobs with status `'completed'` are recorded; everything else is
   * skipped. Idempotent via ON CONFLICT (source_job_type, source_job_id).
   *
   * @param sourceJob - source job row to hydrate from
   * @param sourceJobType - which source table the job came from
   * @returns the `crawl_runs.id` for the (inserted or updated) row, or
   *          `null` when the job was skipped
   */
  async recordCrawlRun(
    sourceJob: SourceJob,
    sourceJobType: SourceJobType = 'dispensary_crawl_jobs'
  ): Promise<number | null> {
    // Skip jobs that aren't completed successfully
    if (sourceJob.status !== 'completed') {
      return null;
    }

    const crawlRun: Partial<CrawlRun> = {
      dispensary_id: sourceJob.dispensary_id,
      provider: 'dutchie', // Source is always dutchie for now
      started_at: sourceJob.started_at || new Date(),
      finished_at: sourceJob.completed_at,
      duration_ms: sourceJob.duration_ms,
      status: this.mapStatus(sourceJob.status),
      error_message: sourceJob.error_message,
      products_found: sourceJob.products_found,
      products_new: sourceJob.products_new,
      products_updated: sourceJob.products_updated,
      snapshots_written: null, // Filled in later via updateSnapshotsWritten()
      worker_id: null,
      // 'dutchie_product_crawl' jobs come from the scheduler; anything else
      // is treated as a manual trigger.
      trigger_type: sourceJob.job_type === 'dutchie_product_crawl' ? 'scheduled' : 'manual',
      metadata: { sourceJobType, originalJobType: sourceJob.job_type },
      source_job_type: sourceJobType,
      source_job_id: sourceJob.id,
    };

    const result = await this.pool.query(
      `INSERT INTO crawl_runs (
dispensary_id, provider, started_at, finished_at, duration_ms,
status, error_message, products_found, products_new, products_updated,
snapshots_written, worker_id, trigger_type, metadata,
source_job_type, source_job_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (source_job_type, source_job_id) WHERE source_job_id IS NOT NULL
DO UPDATE SET
finished_at = EXCLUDED.finished_at,
duration_ms = EXCLUDED.duration_ms,
status = EXCLUDED.status,
error_message = EXCLUDED.error_message,
products_found = EXCLUDED.products_found,
products_new = EXCLUDED.products_new,
products_updated = EXCLUDED.products_updated
RETURNING id`,
      [
        crawlRun.dispensary_id,
        crawlRun.provider,
        crawlRun.started_at,
        crawlRun.finished_at,
        crawlRun.duration_ms,
        crawlRun.status,
        crawlRun.error_message,
        crawlRun.products_found,
        crawlRun.products_new,
        crawlRun.products_updated,
        crawlRun.snapshots_written,
        crawlRun.worker_id,
        crawlRun.trigger_type,
        JSON.stringify(crawlRun.metadata),
        crawlRun.source_job_type,
        crawlRun.source_job_id,
      ]
    );

    // `??` (not `||`) so only a genuinely missing id maps to null.
    return result.rows[0]?.id ?? null;
  }

  /**
   * Record multiple crawl runs sequentially.
   *
   * @param sourceJobs - source job rows to hydrate
   * @param sourceJobType - which source table the jobs came from
   * @returns counts of created/skipped rows plus a sourceJobId -> crawlRunId map
   */
  async recordCrawlRunsBatch(
    sourceJobs: SourceJob[],
    sourceJobType: SourceJobType = 'dispensary_crawl_jobs'
  ): Promise<{ created: number; skipped: number; crawlRunIds: Map<number, number> }> {
    let created = 0;
    let skipped = 0;
    const crawlRunIds = new Map<number, number>(); // sourceJobId -> crawlRunId
    for (const job of sourceJobs) {
      const crawlRunId = await this.recordCrawlRun(job, sourceJobType);
      // Explicit null check: an id of 0 (however unlikely) must not be
      // silently counted as "skipped".
      if (crawlRunId != null) {
        created++;
        crawlRunIds.set(job.id, crawlRunId);
      } else {
        skipped++;
      }
    }
    return { created, skipped, crawlRunIds };
  }

  /**
   * Update the snapshots_written count for an existing crawl run.
   *
   * @param crawlRunId - id of the crawl_runs row to update
   * @param snapshotsWritten - number of snapshots persisted for this run
   */
  async updateSnapshotsWritten(crawlRunId: number, snapshotsWritten: number): Promise<void> {
    await this.pool.query(
      'UPDATE crawl_runs SET snapshots_written = $1 WHERE id = $2',
      [snapshotsWritten, crawlRunId]
    );
  }

  /**
   * Look up the crawl run id recorded for a given source job.
   *
   * @returns the `crawl_runs.id`, or `null` if the job has not been recorded
   */
  async getCrawlRunIdBySourceJob(
    sourceJobType: SourceJobType,
    sourceJobId: number
  ): Promise<number | null> {
    const result = await this.pool.query(
      'SELECT id FROM crawl_runs WHERE source_job_type = $1 AND source_job_id = $2',
      [sourceJobType, sourceJobId]
    );
    return result.rows[0]?.id ?? null;
  }

  /**
   * Get completed dutchie product-crawl jobs not yet recorded in crawl_runs.
   *
   * @param dispensaryId - optional filter on a single dispensary
   * @param startDate - optional lower bound on completed_at
   * @param limit - max rows returned (oldest first)
   */
  async getUnhydratedJobs(
    dispensaryId?: number,
    startDate?: Date,
    limit: number = 100
  ): Promise<SourceJob[]> {
    // Anti-join: keep only jobs with no matching canonical crawl_runs row.
    let query = `
SELECT j.*
FROM dispensary_crawl_jobs j
LEFT JOIN crawl_runs cr ON cr.source_job_type = 'dispensary_crawl_jobs' AND cr.source_job_id = j.id
WHERE cr.id IS NULL
AND j.status = 'completed'
AND j.job_type = 'dutchie_product_crawl'
`;
    const params: unknown[] = [];
    let paramIndex = 1;
    // `!= null` rather than truthiness so a dispensaryId of 0 still filters.
    if (dispensaryId != null) {
      query += ` AND j.dispensary_id = $${paramIndex++}`;
      params.push(dispensaryId);
    }
    if (startDate != null) {
      query += ` AND j.completed_at >= $${paramIndex++}`;
      params.push(startDate);
    }
    query += ` ORDER BY j.completed_at ASC LIMIT $${paramIndex}`;
    params.push(limit);
    const result = await this.pool.query(query, params);
    return result.rows;
  }

  /**
   * Get all completed dutchie product-crawl jobs for backfill, optionally
   * bounded by a completed_at date range and/or a single dispensary.
   *
   * @param startDate - optional inclusive lower bound on completed_at
   * @param endDate - optional inclusive upper bound on completed_at
   * @param dispensaryId - optional filter on a single dispensary
   * @param limit - max rows returned (oldest first)
   */
  async getSourceJobsForBackfill(
    startDate?: Date,
    endDate?: Date,
    dispensaryId?: number,
    limit: number = 1000
  ): Promise<SourceJob[]> {
    let query = `
SELECT *
FROM dispensary_crawl_jobs
WHERE status = 'completed'
AND job_type = 'dutchie_product_crawl'
`;
    const params: unknown[] = [];
    let paramIndex = 1;
    if (startDate != null) {
      query += ` AND completed_at >= $${paramIndex++}`;
      params.push(startDate);
    }
    if (endDate != null) {
      query += ` AND completed_at <= $${paramIndex++}`;
      params.push(endDate);
    }
    // `!= null` rather than truthiness so a dispensaryId of 0 still filters.
    if (dispensaryId != null) {
      query += ` AND dispensary_id = $${paramIndex++}`;
      params.push(dispensaryId);
    }
    query += ` ORDER BY completed_at ASC LIMIT $${paramIndex}`;
    params.push(limit);
    const result = await this.pool.query(query, params);
    return result.rows;
  }

  /**
   * Map a source-job status to the canonical crawl_runs status vocabulary.
   * Unknown statuses pass through unchanged.
   */
  private mapStatus(sourceStatus: string): string {
    switch (sourceStatus) {
      case 'completed':
        return 'success';
      case 'failed':
        return 'failed';
      case 'running':
        return 'running';
      default:
        return sourceStatus;
    }
  }
}