/**
 * Payload Storage Utility
 *
 * Per TASK_WORKFLOW_2024-12-10.md: Store raw API payloads for historical analysis.
 *
 * Design Pattern: Metadata/Payload Separation
 * - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable
 * - Payload stored in MinIO/S3 (or local filesystem as fallback): Gzipped JSON
 *
 * Storage structure (MinIO):
 *   cannaiq/payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
 *
 * Storage structure (Local fallback):
 *   ./storage/payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
 *
 * Platform values: 'dutchie', 'jane'
 *
 * Examples:
 *   payloads/dutchie/2024/12/13/store_456_1734105600000.json.gz
 *   payloads/jane/2024/12/13/store_2788_1734105600000.json.gz
 *
 * Benefits:
 * - Compare any two crawls to see what changed
 * - Replay/re-normalize historical data if logic changes
 * - Debug issues by seeing exactly what the API returned
 * - DB stays small, backups stay fast
 * - ~90% compression (1.5MB -> 150KB per crawl)
 * - Shared storage accessible by all worker pods (MinIO)
 * - Platform separation for different retention/management policies
 */

import * as fs from 'fs';
import * as path from 'path';
import * as zlib from 'zlib';
import { promisify } from 'util';
import { Pool } from 'pg';
import * as crypto from 'crypto';
import * as Minio from 'minio';

const gzip = promisify(zlib.gzip);
const gunzip = promisify(zlib.gunzip);

// Base path for payload storage (used for the local filesystem fallback)
const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads';

// MinIO configuration
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT;
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '443', 10);
const MINIO_USE_SSL = process.env.MINIO_USE_SSL === 'true';
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY;
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY;
const MINIO_BUCKET = process.env.MINIO_BUCKET || 'cannaiq';

// MinIO is considered configured only when endpoint and both credentials are set
const useMinIO = !!(MINIO_ENDPOINT && MINIO_ACCESS_KEY && MINIO_SECRET_KEY);

let minioClient: Minio.Client | null = null;

/**
 * Return the lazily-created, module-wide MinIO client.
 *
 * @returns The shared MinIO client instance
 * @throws Error if MinIO is not configured (endpoint or credentials missing)
 */
function getMinioClient(): Minio.Client {
  if (minioClient) {
    return minioClient;
  }
  if (!MINIO_ENDPOINT || !MINIO_ACCESS_KEY || !MINIO_SECRET_KEY) {
    throw new Error(
      '[PayloadStorage] MinIO is not configured (MINIO_ENDPOINT, MINIO_ACCESS_KEY and MINIO_SECRET_KEY are required)'
    );
  }
  minioClient = new Minio.Client({
    endPoint: MINIO_ENDPOINT,
    port: MINIO_PORT,
    useSSL: MINIO_USE_SSL,
    accessKey: MINIO_ACCESS_KEY,
    secretKey: MINIO_SECRET_KEY,
  });
  return minioClient;
}

// Log which storage backend we're using
if (useMinIO) {
  console.log(`[PayloadStorage] Using MinIO storage: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`);
} else {
  console.log(`[PayloadStorage] Using local filesystem storage: ${PAYLOAD_BASE_PATH}`);
}

/**
 * Result from saving a payload
 */
export interface SavePayloadResult {
  id: number;
  storagePath: string;
  sizeBytes: number;
  sizeBytesRaw: number;
  checksum: string;
}

/**
 * Result from loading a payload
 */
export interface LoadPayloadResult {
  payload: any;
  metadata: {
    id: number;
    dispensaryId: number;
    crawlRunId: number | null;
    productCount: number;
    fetchedAt: Date;
    storagePath: string;
  };
}

/**
 * Columns selected from raw_crawl_payloads by the crawl-payload loaders.
 */
interface CrawlPayloadRow {
  id: number;
  dispensary_id: number;
  crawl_run_id: number | null;
  storage_path: string;
  product_count: number;
  fetched_at: Date;
}

/**
 * Output of serializing + gzipping a payload.
 */
interface CompressedPayload {
  compressed: Buffer;
  rawSize: number;
  compressedSize: number;
  checksum: string;
}

/**
 * Split a timestamp into zero-padded path segments.
 * Uses the process-local time zone (getFullYear/getMonth/getDate).
 */
function datePathParts(timestamp: Date): { year: string; month: string; day: string } {
  return {
    year: String(timestamp.getFullYear()),
    month: String(timestamp.getMonth() + 1).padStart(2, '0'),
    day: String(timestamp.getDate()).padStart(2, '0'),
  };
}

/**
 * Decide whether a stored path refers to a MinIO object key.
 * A path is treated as a MinIO key when MinIO is configured and the path
 * starts with `payloads/`; anything else is read as a local file path.
 */
function isMinioKey(storagePath: string): boolean {
  return useMinIO && storagePath.startsWith('payloads/');
}

/**
 * Generate storage path/key for a payload
 *
 * MinIO format: payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
 * Local format: {PAYLOAD_BASE_PATH}/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
 *
 * Platform defaults to 'dutchie' for backward compatibility
 */
function generateStoragePath(dispensaryId: number, timestamp: Date, platform: string = 'dutchie'): string {
  const { year, month, day } = datePathParts(timestamp);
  const fileName = `store_${dispensaryId}_${timestamp.getTime()}.json.gz`;

  if (useMinIO) {
    // MinIO uses forward slashes, no leading slash
    return `payloads/${platform}/${year}/${month}/${day}/${fileName}`;
  }
  // Local filesystem uses OS-specific path
  return path.join(PAYLOAD_BASE_PATH, platform, year, month, day, fileName);
}

/**
 * Ensure directory exists for a file path
 */
async function ensureDir(filePath: string): Promise<void> {
  await fs.promises.mkdir(path.dirname(filePath), { recursive: true });
}

/**
 * Calculate SHA256 checksum of data
 */
function calculateChecksum(data: Buffer): string {
  return crypto.createHash('sha256').update(data).digest('hex');
}

/**
 * Serialize a payload to JSON, gzip it, and compute sizes plus the SHA256
 * checksum of the compressed bytes.
 */
async function compressPayload(payload: any): Promise<CompressedPayload> {
  const jsonStr = JSON.stringify(payload);
  const rawSize = Buffer.byteLength(jsonStr, 'utf8');
  const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
  return {
    compressed,
    rawSize,
    compressedSize: compressed.length,
    checksum: calculateChecksum(compressed),
  };
}

/**
 * Write gzipped bytes to the active storage backend (MinIO when configured,
 * otherwise the local filesystem, creating parent directories as needed).
 */
async function writeCompressedPayload(storagePath: string, compressed: Buffer): Promise<void> {
  if (useMinIO) {
    await getMinioClient().putObject(MINIO_BUCKET, storagePath, compressed, compressed.length, {
      'Content-Type': 'application/gzip',
      'Content-Encoding': 'gzip',
    });
    return;
  }
  await ensureDir(storagePath);
  await fs.promises.writeFile(storagePath, compressed);
}

/**
 * Load the payload referenced by a metadata row and pair it with its metadata.
 */
async function loadResultFromRow(row: CrawlPayloadRow): Promise<LoadPayloadResult> {
  const payload = await loadPayloadFromPath(row.storage_path);
  return {
    payload,
    metadata: {
      id: row.id,
      dispensaryId: row.dispensary_id,
      crawlRunId: row.crawl_run_id,
      productCount: row.product_count,
      fetchedAt: row.fetched_at,
      storagePath: row.storage_path,
    },
  };
}

/**
 * Extract a string `code` property from a thrown value, if it has one.
 */
function errorCode(err: unknown): string | undefined {
  if (typeof err === 'object' && err !== null && 'code' in err) {
    const code = (err as { code?: unknown }).code;
    return typeof code === 'string' ? code : undefined;
  }
  return undefined;
}

/**
 * Remove one stored payload from whichever backend holds it.
 *
 * @returns true when the payload is gone (removed, or was already missing);
 *          false when removal failed (a warning is logged)
 */
async function deleteStoredPayload(storagePath: string): Promise<boolean> {
  try {
    if (isMinioKey(storagePath)) {
      await getMinioClient().removeObject(MINIO_BUCKET, storagePath);
    } else {
      await fs.promises.unlink(storagePath);
    }
    return true;
  } catch (err: unknown) {
    const code = errorCode(err);
    // Already missing counts as deleted
    if (code === 'ENOENT' || code === 'NoSuchKey') {
      return true;
    }
    const message = err instanceof Error ? err.message : String(err);
    console.warn(`[PayloadStorage] Failed to delete ${storagePath}: ${message}`);
    return false;
  }
}

/**
 * Save a raw crawl payload to MinIO/S3 (or filesystem) and record metadata in DB
 *
 * @param pool - Database connection pool
 * @param dispensaryId - ID of the dispensary
 * @param payload - Raw JSON payload from GraphQL/API
 * @param crawlRunId - Optional crawl_run ID for linking
 * @param productCount - Number of products in payload
 * @param platform - Platform identifier ('dutchie' | 'jane'), defaults to 'dutchie'
 * @returns SavePayloadResult with file info and DB record ID
 */
export async function saveRawPayload(
  pool: Pool,
  dispensaryId: number,
  payload: any,
  crawlRunId: number | null = null,
  productCount: number = 0,
  platform: string = 'dutchie'
): Promise<SavePayloadResult> {
  const timestamp = new Date();
  const storagePath = generateStoragePath(dispensaryId, timestamp, platform);

  // Serialize and compress
  const { compressed, rawSize, compressedSize, checksum } = await compressPayload(payload);

  // Write to storage backend
  await writeCompressedPayload(storagePath, compressed);

  // Record metadata in DB
  const result = await pool.query(`
    INSERT INTO raw_crawl_payloads (
      crawl_run_id,
      dispensary_id,
      storage_path,
      product_count,
      size_bytes,
      size_bytes_raw,
      fetched_at,
      checksum_sha256
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
    RETURNING id
  `, [
    crawlRunId,
    dispensaryId,
    storagePath,
    productCount,
    compressedSize,
    rawSize,
    timestamp,
    checksum
  ]);

  // Update dispensary timestamp
  await pool.query(`
    UPDATE dispensaries
    SET last_payload_at = $2
    WHERE id = $1
  `, [dispensaryId, timestamp]);

  const backend = useMinIO ? 'MinIO' : 'local';
  console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);

  return {
    id: result.rows[0].id,
    storagePath,
    sizeBytes: compressedSize,
    sizeBytesRaw: rawSize,
    checksum
  };
}

/**
 * Load a raw payload from storage (MinIO or filesystem) by metadata ID
 *
 * @param pool - Database connection pool
 * @param payloadId - ID from raw_crawl_payloads table
 * @returns LoadPayloadResult with parsed payload and metadata, or null if no such row exists
 */
export async function loadRawPayloadById(
  pool: Pool,
  payloadId: number
): Promise<LoadPayloadResult | null> {
  const result = await pool.query(`
    SELECT id, dispensary_id, crawl_run_id, storage_path, product_count, fetched_at
    FROM raw_crawl_payloads
    WHERE id = $1
  `, [payloadId]);

  if (result.rows.length === 0) {
    return null;
  }

  return loadResultFromRow(result.rows[0]);
}

/**
 * Load a raw payload directly from storage path (MinIO or filesystem)
 *
 * @param storagePath - Path/key to gzipped JSON file
 * @returns Parsed JSON payload
 */
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
  let compressed: Buffer;

  if (isMinioKey(storagePath)) {
    // Download from MinIO, buffering the object stream in memory
    const stream = await getMinioClient().getObject(MINIO_BUCKET, storagePath);
    const chunks: Buffer[] = [];
    for await (const chunk of stream) {
      chunks.push(chunk as Buffer);
    }
    compressed = Buffer.concat(chunks);
  } else {
    // Read from local filesystem
    compressed = await fs.promises.readFile(storagePath);
  }

  const decompressed = await gunzip(compressed);
  return JSON.parse(decompressed.toString('utf8'));
}

/**
 * Get the latest payload for a dispensary
 *
 * @param pool - Database connection pool
 * @param dispensaryId - ID of the dispensary
 * @returns LoadPayloadResult or null if none exists
 */
export async function getLatestPayload(
  pool: Pool,
  dispensaryId: number
): Promise<LoadPayloadResult | null> {
  const result = await pool.query(`
    SELECT id, dispensary_id, crawl_run_id, storage_path, product_count, fetched_at
    FROM raw_crawl_payloads
    WHERE dispensary_id = $1
    ORDER BY fetched_at DESC
    LIMIT 1
  `, [dispensaryId]);

  if (result.rows.length === 0) {
    return null;
  }

  return loadResultFromRow(result.rows[0]);
}

/**
 * Get the most recent payloads for a dispensary (e.g. latest and previous, for comparison)
 *
 * @param pool - Database connection pool
 * @param dispensaryId - ID of the dispensary
 * @param limit - Number of recent payloads to retrieve (default 2)
 * @returns Array of LoadPayloadResult, most recent first
 */
export async function getRecentPayloads(
  pool: Pool,
  dispensaryId: number,
  limit: number = 2
): Promise<LoadPayloadResult[]> {
  const result = await pool.query(`
    SELECT id, dispensary_id, crawl_run_id, storage_path, product_count, fetched_at
    FROM raw_crawl_payloads
    WHERE dispensary_id = $1
    ORDER BY fetched_at DESC
    LIMIT $2
  `, [dispensaryId, limit]);

  // Loaded one at a time so a large `limit` does not open many concurrent reads
  const payloads: LoadPayloadResult[] = [];
  for (const row of result.rows) {
    payloads.push(await loadResultFromRow(row));
  }
  return payloads;
}

/**
 * List payload metadata without loading files (for browsing/pagination)
 *
 * @param pool - Database connection pool
 * @param options - Query options: optional dispensary/date filters, `limit`
 *                  (default 50) and `offset` (default 0)
 * @returns Array of metadata rows, most recent first
 */
export async function listPayloadMetadata(
  pool: Pool,
  options: {
    dispensaryId?: number;
    startDate?: Date;
    endDate?: Date;
    limit?: number;
    offset?: number;
  } = {}
): Promise<Array<{
  id: number;
  dispensaryId: number;
  crawlRunId: number | null;
  storagePath: string;
  productCount: number;
  sizeBytes: number;
  sizeBytesRaw: number;
  fetchedAt: Date;
}>> {
  const conditions: string[] = [];
  const params: unknown[] = [];

  // Placeholder numbers are derived from params.length right after each push,
  // so they can never drift from the parameter order.
  if (options.dispensaryId != null) {
    params.push(options.dispensaryId);
    conditions.push(`dispensary_id = $${params.length}`);
  }

  if (options.startDate) {
    params.push(options.startDate);
    conditions.push(`fetched_at >= $${params.length}`);
  }

  if (options.endDate) {
    params.push(options.endDate);
    conditions.push(`fetched_at <= $${params.length}`);
  }

  const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';

  params.push(options.limit || 50);
  const limitParam = params.length;
  params.push(options.offset || 0);
  const offsetParam = params.length;

  const result = await pool.query(`
    SELECT
      id,
      dispensary_id,
      crawl_run_id,
      storage_path,
      product_count,
      size_bytes,
      size_bytes_raw,
      fetched_at
    FROM raw_crawl_payloads
    ${whereClause}
    ORDER BY fetched_at DESC
    LIMIT $${limitParam} OFFSET $${offsetParam}
  `, params);

  return result.rows.map(row => ({
    id: row.id,
    dispensaryId: row.dispensary_id,
    crawlRunId: row.crawl_run_id,
    storagePath: row.storage_path,
    productCount: row.product_count,
    sizeBytes: row.size_bytes,
    sizeBytesRaw: row.size_bytes_raw,
    fetchedAt: row.fetched_at
  }));
}

/**
 * Result from saving a discovery payload
 */
export interface SaveDiscoveryPayloadResult {
  id: number;
  storagePath: string;
  sizeBytes: number;
  sizeBytesRaw: number;
  checksum: string;
}

/**
 * Generate storage path/key for a discovery payload
 *
 * MinIO format: payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
 * Local format: {PAYLOAD_BASE_PATH}/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
 *
 * The state code is lower-cased in the file name.
 */
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
  const { year, month, day } = datePathParts(timestamp);
  const fileName = `state_${stateCode.toLowerCase()}_${timestamp.getTime()}.json.gz`;

  if (useMinIO) {
    return `payloads/discovery/${year}/${month}/${day}/${fileName}`;
  }
  return path.join(PAYLOAD_BASE_PATH, 'discovery', year, month, day, fileName);
}

/**
 * Save a raw store discovery payload to MinIO/S3 (or filesystem) and record metadata in DB
 *
 * @param pool - Database connection pool
 * @param stateCode - State code (e.g., 'AZ', 'MI'); stored upper-cased in the DB
 * @param payload - Raw JSON payload from discovery GraphQL
 * @param storeCount - Number of stores in payload
 * @returns SaveDiscoveryPayloadResult with file info and DB record ID
 */
export async function saveDiscoveryPayload(
  pool: Pool,
  stateCode: string,
  payload: any,
  storeCount: number = 0
): Promise<SaveDiscoveryPayloadResult> {
  const timestamp = new Date();
  const storagePath = generateDiscoveryStoragePath(stateCode, timestamp);

  // Serialize and compress
  const { compressed, rawSize, compressedSize, checksum } = await compressPayload(payload);

  // Write to storage backend
  await writeCompressedPayload(storagePath, compressed);

  // Record metadata in DB
  const result = await pool.query(`
    INSERT INTO raw_crawl_payloads (
      payload_type,
      state_code,
      storage_path,
      store_count,
      size_bytes,
      size_bytes_raw,
      fetched_at,
      checksum_sha256
    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
    RETURNING id
  `, [
    'store_discovery',
    stateCode.toUpperCase(),
    storagePath,
    storeCount,
    compressedSize,
    rawSize,
    timestamp,
    checksum
  ]);

  const backend = useMinIO ? 'MinIO' : 'local';
  console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);

  return {
    id: result.rows[0].id,
    storagePath,
    sizeBytes: compressedSize,
    sizeBytesRaw: rawSize,
    checksum
  };
}

/**
 * Get the latest discovery payload for a state
 *
 * @param pool - Database connection pool
 * @param stateCode - State code (e.g., 'AZ', 'MI'); matched upper-cased
 * @returns Parsed payload and metadata, or null if none exists
 */
export async function getLatestDiscoveryPayload(
  pool: Pool,
  stateCode: string
): Promise<{ payload: any; metadata: any } | null> {
  const result = await pool.query(`
    SELECT id, state_code, storage_path, store_count, fetched_at
    FROM raw_crawl_payloads
    WHERE payload_type = 'store_discovery'
      AND state_code = $1
    ORDER BY fetched_at DESC
    LIMIT 1
  `, [stateCode.toUpperCase()]);

  if (result.rows.length === 0) {
    return null;
  }

  const row = result.rows[0];
  const payload = await loadPayloadFromPath(row.storage_path);

  return {
    payload,
    metadata: {
      id: row.id,
      stateCode: row.state_code,
      storeCount: row.store_count,
      fetchedAt: row.fetched_at,
      storagePath: row.storage_path
    }
  };
}

/**
 * Delete old payloads (for retention policy)
 *
 * Each payload is removed from the backend that holds it (MinIO object or
 * local file). A DB record is deleted only when its stored payload is
 * confirmed gone (removed, or already missing); records whose storage
 * removal failed are kept so a later run can retry them.
 *
 * @param pool - Database connection pool
 * @param olderThan - Delete payloads older than this date
 * @returns Number of payloads deleted
 */
export async function deleteOldPayloads(
  pool: Pool,
  olderThan: Date
): Promise<number> {
  // Get paths first
  const result = await pool.query(`
    SELECT id, storage_path FROM raw_crawl_payloads
    WHERE fetched_at < $1
  `, [olderThan]);

  // Delete stored payloads, remembering which rows are now safe to drop
  const deletedIds: Array<number | string> = [];
  for (const row of result.rows) {
    if (await deleteStoredPayload(row.storage_path)) {
      deletedIds.push(row.id);
    }
  }

  // Delete DB records by id so exactly the rows handled above are removed
  if (deletedIds.length > 0) {
    await pool.query(`
      DELETE FROM raw_crawl_payloads
      WHERE id = ANY($1)
    `, [deletedIds]);
  }

  console.log(`[PayloadStorage] Deleted ${deletedIds.length} payloads older than ${olderThan.toISOString()}`);

  return deletedIds.length;
}