Merge pull request 'feat: Add MinIO/S3 support for payload storage' (#63) from feat/minio-payload-storage into master

Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/63
This commit is contained in:
kelly
2025-12-12 19:00:29 +00:00

View File

@@ -5,10 +5,13 @@
*
* Design Pattern: Metadata/Payload Separation
* - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable
* - Payload on filesystem: Gzipped JSON at storage_path
* - Payload stored in MinIO/S3 (or local filesystem as fallback): Gzipped JSON
*
* Storage structure:
* /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* Storage structure (MinIO):
* cannaiq/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*
* Storage structure (Local fallback):
* ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*
* Benefits:
* - Compare any two crawls to see what changed
@@ -16,6 +19,7 @@
* - Debug issues by seeing exactly what the API returned
* - DB stays small, backups stay fast
* - ~90% compression (1.5MB -> 150KB per crawl)
* - Shared storage accessible by all worker pods (MinIO)
*/
import * as fs from 'fs';
@@ -24,13 +28,47 @@ import * as zlib from 'zlib';
import { promisify } from 'util';
import { Pool } from 'pg';
import * as crypto from 'crypto';
import * as Minio from 'minio';
const gzip = promisify(zlib.gzip);
const gunzip = promisify(zlib.gunzip);
// Base path for payload storage (matches image storage pattern)
// Base path for payload storage (used for local fallback and as key prefix in MinIO)
const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads';
// MinIO configuration
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT;
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '443');
const MINIO_USE_SSL = process.env.MINIO_USE_SSL === 'true';
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY;
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY;
const MINIO_BUCKET = process.env.MINIO_BUCKET || 'cannaiq';
// Check if MinIO is configured
const useMinIO = !!(MINIO_ENDPOINT && MINIO_ACCESS_KEY && MINIO_SECRET_KEY);
let minioClient: Minio.Client | null = null;
function getMinioClient(): Minio.Client {
if (!minioClient && useMinIO) {
minioClient = new Minio.Client({
endPoint: MINIO_ENDPOINT!,
port: MINIO_PORT,
useSSL: MINIO_USE_SSL,
accessKey: MINIO_ACCESS_KEY!,
secretKey: MINIO_SECRET_KEY!,
});
}
return minioClient!;
}
// Log which storage backend we're using
if (useMinIO) {
console.log(`[PayloadStorage] Using MinIO storage: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`);
} else {
console.log(`[PayloadStorage] Using local filesystem storage: ${PAYLOAD_BASE_PATH}`);
}
/**
* Result from saving a payload
*/
@@ -58,9 +96,10 @@ export interface LoadPayloadResult {
}
/**
* Generate storage path for a payload
* Generate storage path/key for a payload
*
* Format: /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* MinIO format: payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* Local format: ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*/
function generateStoragePath(dispensaryId: number, timestamp: Date): string {
const year = timestamp.getFullYear();
@@ -68,13 +107,15 @@ function generateStoragePath(dispensaryId: number, timestamp: Date): string {
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
String(year),
month,
day,
`store_${dispensaryId}_${ts}.json.gz`
);
const relativePath = `payloads/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`;
if (useMinIO) {
// MinIO uses forward slashes, no leading slash
return relativePath;
} else {
// Local filesystem uses OS-specific path
return path.join(PAYLOAD_BASE_PATH, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`);
}
}
/**
@@ -93,7 +134,7 @@ function calculateChecksum(data: Buffer): string {
}
/**
* Save a raw crawl payload to filesystem and record metadata in DB
* Save a raw crawl payload to MinIO/S3 (or filesystem) and record metadata in DB
*
* @param pool - Database connection pool
* @param dispensaryId - ID of the dispensary
@@ -119,9 +160,19 @@ export async function saveRawPayload(
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
// Write to storage backend
if (useMinIO) {
// Upload to MinIO
const client = getMinioClient();
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
'Content-Type': 'application/gzip',
'Content-Encoding': 'gzip',
});
} else {
// Write to local filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
}
// Record metadata in DB
const result = await pool.query(`
@@ -147,7 +198,8 @@ export async function saveRawPayload(
checksum
]);
console.log(`[PayloadStorage] Saved payload for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
const backend = useMinIO ? 'MinIO' : 'local';
console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
return {
id: result.rows[0].id,
@@ -196,13 +248,32 @@ export async function loadRawPayloadById(
}
/**
* Load a raw payload directly from filesystem path
* Load a raw payload directly from storage path (MinIO or filesystem)
*
* @param storagePath - Path to gzipped JSON file
* @param storagePath - Path/key to gzipped JSON file
* @returns Parsed JSON payload
*/
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
const compressed = await fs.promises.readFile(storagePath);
let compressed: Buffer;
// Determine if path looks like MinIO key (starts with payloads/) or local path
const isMinIOPath = storagePath.startsWith('payloads/') && useMinIO;
if (isMinIOPath) {
// Download from MinIO
const client = getMinioClient();
const chunks: Buffer[] = [];
const stream = await client.getObject(MINIO_BUCKET, storagePath);
for await (const chunk of stream) {
chunks.push(chunk as Buffer);
}
compressed = Buffer.concat(chunks);
} else {
// Read from local filesystem
compressed = await fs.promises.readFile(storagePath);
}
const decompressed = await gunzip(compressed);
return JSON.parse(decompressed.toString('utf8'));
}
@@ -378,9 +449,10 @@ export interface SaveDiscoveryPayloadResult {
}
/**
* Generate storage path for a discovery payload
* Generate storage path/key for a discovery payload
*
* Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
* MinIO format: payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
* Local format: ./storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
*/
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
const year = timestamp.getFullYear();
@@ -388,18 +460,17 @@ function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): strin
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
'discovery',
String(year),
month,
day,
`state_${stateCode.toLowerCase()}_${ts}.json.gz`
);
const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json.gz`;
if (useMinIO) {
return relativePath;
} else {
return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json.gz`);
}
}
/**
* Save a raw store discovery payload to filesystem and record metadata in DB
* Save a raw store discovery payload to MinIO/S3 (or filesystem) and record metadata in DB
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
@@ -423,9 +494,19 @@ export async function saveDiscoveryPayload(
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
// Write to storage backend
if (useMinIO) {
// Upload to MinIO
const client = getMinioClient();
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
'Content-Type': 'application/gzip',
'Content-Encoding': 'gzip',
});
} else {
// Write to local filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
}
// Record metadata in DB
const result = await pool.query(`
@@ -451,7 +532,8 @@ export async function saveDiscoveryPayload(
checksum
]);
console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
const backend = useMinIO ? 'MinIO' : 'local';
console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
return {
id: result.rows[0].id,