diff --git a/backend/src/utils/payload-storage.ts b/backend/src/utils/payload-storage.ts index b30254ee..7c60872b 100644 --- a/backend/src/utils/payload-storage.ts +++ b/backend/src/utils/payload-storage.ts @@ -5,10 +5,13 @@ * * Design Pattern: Metadata/Payload Separation * - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable - * - Payload on filesystem: Gzipped JSON at storage_path + * - Payload stored in MinIO/S3 (or local filesystem as fallback): Gzipped JSON * - * Storage structure: - * /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz + * Storage structure (MinIO): + * cannaiq/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz + * + * Storage structure (Local fallback): + * ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz * * Benefits: * - Compare any two crawls to see what changed @@ -16,6 +19,7 @@ * - Debug issues by seeing exactly what the API returned * - DB stays small, backups stay fast * - ~90% compression (1.5MB -> 150KB per crawl) + * - Shared storage accessible by all worker pods (MinIO) */ import * as fs from 'fs'; @@ -24,13 +28,47 @@ import * as zlib from 'zlib'; import { promisify } from 'util'; import { Pool } from 'pg'; import * as crypto from 'crypto'; +import * as Minio from 'minio'; const gzip = promisify(zlib.gzip); const gunzip = promisify(zlib.gunzip); -// Base path for payload storage (matches image storage pattern) +// Base path for payload storage (used for local fallback and as key prefix in MinIO) const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads'; +// MinIO configuration +const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT; +const MINIO_PORT = parseInt(process.env.MINIO_PORT || '443'); +const MINIO_USE_SSL = process.env.MINIO_USE_SSL === 'true'; +const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY; +const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY; +const MINIO_BUCKET = process.env.MINIO_BUCKET || 'cannaiq'; + +// Check if MinIO is configured +const useMinIO = !!(MINIO_ENDPOINT && MINIO_ACCESS_KEY && MINIO_SECRET_KEY); + +let minioClient: Minio.Client | null = null; + +function getMinioClient(): Minio.Client { + if (!minioClient && useMinIO) { + minioClient = new Minio.Client({ + endPoint: MINIO_ENDPOINT!, + port: MINIO_PORT, + useSSL: MINIO_USE_SSL, + accessKey: MINIO_ACCESS_KEY!, + secretKey: MINIO_SECRET_KEY!, + }); + } + return minioClient!; +} + +// Log which storage backend we're using +if (useMinIO) { + console.log(`[PayloadStorage] Using MinIO storage: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`); +} else { + console.log(`[PayloadStorage] Using local filesystem storage: ${PAYLOAD_BASE_PATH}`); +} + /** * Result from saving a payload */ @@ -58,9 +96,10 @@ export interface LoadPayloadResult { } /** - * Generate storage path for a payload + * Generate storage path/key for a payload * - * Format: /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz + * MinIO format: payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz + * Local format: ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz */ function generateStoragePath(dispensaryId: number, timestamp: Date): string { const year = timestamp.getFullYear(); @@ -68,13 +107,15 @@ function generateStoragePath(dispensaryId: number, timestamp: Date): string { const day = String(timestamp.getDate()).padStart(2, '0'); const ts = timestamp.getTime(); - return path.join( - PAYLOAD_BASE_PATH, - String(year), - month, - day, - `store_${dispensaryId}_${ts}.json.gz` - ); + const relativePath = `payloads/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`; + + if (useMinIO) { + // MinIO uses forward slashes, no leading slash + return relativePath; + } else { + // Local filesystem uses OS-specific path + return path.join(PAYLOAD_BASE_PATH, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`); + } } /** @@ -93,7 +134,7 @@ function calculateChecksum(data: Buffer): string { } /** - * Save a raw crawl payload to filesystem and record metadata in DB + * Save a raw crawl payload to MinIO/S3 (or filesystem) and record metadata in DB * * @param pool - Database connection pool * @param dispensaryId - ID of the dispensary @@ -119,9 +160,19 @@ export async function saveRawPayload( const compressedSize = compressed.length; const checksum = calculateChecksum(compressed); - // Write to filesystem - await ensureDir(storagePath); - await fs.promises.writeFile(storagePath, compressed); + // Write to storage backend + if (useMinIO) { + // Upload to MinIO + const client = getMinioClient(); + await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, { + 'Content-Type': 'application/gzip', + 'Content-Encoding': 'gzip', + }); + } else { + // Write to local filesystem + await ensureDir(storagePath); + await fs.promises.writeFile(storagePath, compressed); + } // Record metadata in DB const result = await pool.query(` @@ -147,7 +198,8 @@ export async function saveRawPayload( checksum ]); - console.log(`[PayloadStorage] Saved payload for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`); + const backend = useMinIO ? 'MinIO' : 'local'; + console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`); return { id: result.rows[0].id, @@ -196,13 +248,32 @@ export async function loadRawPayloadById( } /** - * Load a raw payload directly from filesystem path + * Load a raw payload directly from storage path (MinIO or filesystem) * - * @param storagePath - Path to gzipped JSON file + * @param storagePath - Path/key to gzipped JSON file * @returns Parsed JSON payload */ export async function loadPayloadFromPath(storagePath: string): Promise { - const compressed = await fs.promises.readFile(storagePath); + let compressed: Buffer; + + // Determine if path looks like MinIO key (starts with payloads/) or local path + const isMinIOPath = storagePath.startsWith('payloads/') && useMinIO; + + if (isMinIOPath) { + // Download from MinIO + const client = getMinioClient(); + const chunks: Buffer[] = []; + const stream = await client.getObject(MINIO_BUCKET, storagePath); + + for await (const chunk of stream) { + chunks.push(chunk as Buffer); + } + compressed = Buffer.concat(chunks); + } else { + // Read from local filesystem + compressed = await fs.promises.readFile(storagePath); + } + const decompressed = await gunzip(compressed); return JSON.parse(decompressed.toString('utf8')); } @@ -378,9 +449,10 @@ export interface SaveDiscoveryPayloadResult { } /** - * Generate storage path for a discovery payload + * Generate storage path/key for a discovery payload * - * Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz + * MinIO format: payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz + * Local format: ./storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz */ function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string { const year = timestamp.getFullYear(); @@ -388,18 +460,17 @@ function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): strin const day = String(timestamp.getDate()).padStart(2, '0'); const ts = timestamp.getTime(); - return path.join( - PAYLOAD_BASE_PATH, - 'discovery', - String(year), - month, - day, - `state_${stateCode.toLowerCase()}_${ts}.json.gz` - ); + const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json.gz`; + + if (useMinIO) { + return relativePath; + } else { + return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json.gz`); + } } /** - * Save a raw store discovery payload to filesystem and record metadata in DB + * Save a raw store discovery payload to MinIO/S3 (or filesystem) and record metadata in DB * * @param pool - Database connection pool * @param stateCode - State code (e.g., 'AZ', 'MI') @@ -423,9 +494,19 @@ export async function saveDiscoveryPayload( const compressedSize = compressed.length; const checksum = calculateChecksum(compressed); - // Write to filesystem - await ensureDir(storagePath); - await fs.promises.writeFile(storagePath, compressed); + // Write to storage backend + if (useMinIO) { + // Upload to MinIO + const client = getMinioClient(); + await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, { + 'Content-Type': 'application/gzip', + 'Content-Encoding': 'gzip', + }); + } else { + // Write to local filesystem + await ensureDir(storagePath); + await fs.promises.writeFile(storagePath, compressed); + } // Record metadata in DB const result = await pool.query(` @@ -451,7 +532,8 @@ export async function saveDiscoveryPayload( checksum ]); - console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`); + const backend = useMinIO ? 'MinIO' : 'local'; + console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`); return { id: result.rows[0].id,