feat: Add MinIO/S3 support for payload storage
- Update payload-storage.ts to use MinIO when configured
- Payloads stored at: cannaiq/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
- Falls back to local filesystem when MINIO_* env vars not set
- Enables shared storage across all worker pods
- Fixes ephemeral storage issue where payloads were lost on pod restart
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -5,10 +5,13 @@
|
|||||||
*
|
*
|
||||||
* Design Pattern: Metadata/Payload Separation
|
* Design Pattern: Metadata/Payload Separation
|
||||||
* - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable
|
* - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable
|
||||||
* - Payload on filesystem: Gzipped JSON at storage_path
|
* - Payload stored in MinIO/S3 (or local filesystem as fallback): Gzipped JSON
|
||||||
*
|
*
|
||||||
* Storage structure:
|
* Storage structure (MinIO):
|
||||||
* /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
|
* cannaiq/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
|
||||||
|
*
|
||||||
|
* Storage structure (Local fallback):
|
||||||
|
* ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
|
||||||
*
|
*
|
||||||
* Benefits:
|
* Benefits:
|
||||||
* - Compare any two crawls to see what changed
|
* - Compare any two crawls to see what changed
|
||||||
@@ -16,6 +19,7 @@
|
|||||||
* - Debug issues by seeing exactly what the API returned
|
* - Debug issues by seeing exactly what the API returned
|
||||||
* - DB stays small, backups stay fast
|
* - DB stays small, backups stay fast
|
||||||
* - ~90% compression (1.5MB -> 150KB per crawl)
|
* - ~90% compression (1.5MB -> 150KB per crawl)
|
||||||
|
* - Shared storage accessible by all worker pods (MinIO)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import * as fs from 'fs';
|
import * as fs from 'fs';
|
||||||
@@ -24,13 +28,47 @@ import * as zlib from 'zlib';
|
|||||||
import { promisify } from 'util';
|
import { promisify } from 'util';
|
||||||
import { Pool } from 'pg';
|
import { Pool } from 'pg';
|
||||||
import * as crypto from 'crypto';
|
import * as crypto from 'crypto';
|
||||||
|
import * as Minio from 'minio';
|
||||||
|
|
||||||
const gzip = promisify(zlib.gzip);
|
const gzip = promisify(zlib.gzip);
|
||||||
const gunzip = promisify(zlib.gunzip);
|
const gunzip = promisify(zlib.gunzip);
|
||||||
|
|
||||||
// Base path for payload storage (matches image storage pattern)
|
// Base path for payload storage (used for local fallback and as key prefix in MinIO)
|
||||||
const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads';
|
const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads';
|
||||||
|
|
||||||
|
// MinIO configuration
|
||||||
|
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT;
|
||||||
|
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '443');
|
||||||
|
const MINIO_USE_SSL = process.env.MINIO_USE_SSL === 'true';
|
||||||
|
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY;
|
||||||
|
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY;
|
||||||
|
const MINIO_BUCKET = process.env.MINIO_BUCKET || 'cannaiq';
|
||||||
|
|
||||||
|
// Check if MinIO is configured
|
||||||
|
const useMinIO = !!(MINIO_ENDPOINT && MINIO_ACCESS_KEY && MINIO_SECRET_KEY);
|
||||||
|
|
||||||
|
let minioClient: Minio.Client | null = null;
|
||||||
|
|
||||||
|
function getMinioClient(): Minio.Client {
|
||||||
|
if (!minioClient && useMinIO) {
|
||||||
|
minioClient = new Minio.Client({
|
||||||
|
endPoint: MINIO_ENDPOINT!,
|
||||||
|
port: MINIO_PORT,
|
||||||
|
useSSL: MINIO_USE_SSL,
|
||||||
|
accessKey: MINIO_ACCESS_KEY!,
|
||||||
|
secretKey: MINIO_SECRET_KEY!,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return minioClient!;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log which storage backend we're using
|
||||||
|
if (useMinIO) {
|
||||||
|
console.log(`[PayloadStorage] Using MinIO storage: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`);
|
||||||
|
} else {
|
||||||
|
console.log(`[PayloadStorage] Using local filesystem storage: ${PAYLOAD_BASE_PATH}`);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Result from saving a payload
|
* Result from saving a payload
|
||||||
*/
|
*/
|
||||||
@@ -58,9 +96,10 @@ export interface LoadPayloadResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate storage path for a payload
|
* Generate storage path/key for a payload
|
||||||
*
|
*
|
||||||
* Format: /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
|
* MinIO format: payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
|
||||||
|
* Local format: ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
|
||||||
*/
|
*/
|
||||||
function generateStoragePath(dispensaryId: number, timestamp: Date): string {
|
function generateStoragePath(dispensaryId: number, timestamp: Date): string {
|
||||||
const year = timestamp.getFullYear();
|
const year = timestamp.getFullYear();
|
||||||
@@ -68,13 +107,15 @@ function generateStoragePath(dispensaryId: number, timestamp: Date): string {
|
|||||||
const day = String(timestamp.getDate()).padStart(2, '0');
|
const day = String(timestamp.getDate()).padStart(2, '0');
|
||||||
const ts = timestamp.getTime();
|
const ts = timestamp.getTime();
|
||||||
|
|
||||||
return path.join(
|
const relativePath = `payloads/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`;
|
||||||
PAYLOAD_BASE_PATH,
|
|
||||||
String(year),
|
if (useMinIO) {
|
||||||
month,
|
// MinIO uses forward slashes, no leading slash
|
||||||
day,
|
return relativePath;
|
||||||
`store_${dispensaryId}_${ts}.json.gz`
|
} else {
|
||||||
);
|
// Local filesystem uses OS-specific path
|
||||||
|
return path.join(PAYLOAD_BASE_PATH, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -93,7 +134,7 @@ function calculateChecksum(data: Buffer): string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Save a raw crawl payload to filesystem and record metadata in DB
|
* Save a raw crawl payload to MinIO/S3 (or filesystem) and record metadata in DB
|
||||||
*
|
*
|
||||||
* @param pool - Database connection pool
|
* @param pool - Database connection pool
|
||||||
* @param dispensaryId - ID of the dispensary
|
* @param dispensaryId - ID of the dispensary
|
||||||
@@ -119,9 +160,19 @@ export async function saveRawPayload(
|
|||||||
const compressedSize = compressed.length;
|
const compressedSize = compressed.length;
|
||||||
const checksum = calculateChecksum(compressed);
|
const checksum = calculateChecksum(compressed);
|
||||||
|
|
||||||
// Write to filesystem
|
// Write to storage backend
|
||||||
|
if (useMinIO) {
|
||||||
|
// Upload to MinIO
|
||||||
|
const client = getMinioClient();
|
||||||
|
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
|
||||||
|
'Content-Type': 'application/gzip',
|
||||||
|
'Content-Encoding': 'gzip',
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Write to local filesystem
|
||||||
await ensureDir(storagePath);
|
await ensureDir(storagePath);
|
||||||
await fs.promises.writeFile(storagePath, compressed);
|
await fs.promises.writeFile(storagePath, compressed);
|
||||||
|
}
|
||||||
|
|
||||||
// Record metadata in DB
|
// Record metadata in DB
|
||||||
const result = await pool.query(`
|
const result = await pool.query(`
|
||||||
@@ -147,7 +198,8 @@ export async function saveRawPayload(
|
|||||||
checksum
|
checksum
|
||||||
]);
|
]);
|
||||||
|
|
||||||
console.log(`[PayloadStorage] Saved payload for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
|
const backend = useMinIO ? 'MinIO' : 'local';
|
||||||
|
console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
id: result.rows[0].id,
|
id: result.rows[0].id,
|
||||||
@@ -196,13 +248,32 @@ export async function loadRawPayloadById(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load a raw payload directly from filesystem path
|
* Load a raw payload directly from storage path (MinIO or filesystem)
|
||||||
*
|
*
|
||||||
* @param storagePath - Path to gzipped JSON file
|
* @param storagePath - Path/key to gzipped JSON file
|
||||||
* @returns Parsed JSON payload
|
* @returns Parsed JSON payload
|
||||||
*/
|
*/
|
||||||
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
|
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
|
||||||
const compressed = await fs.promises.readFile(storagePath);
|
let compressed: Buffer;
|
||||||
|
|
||||||
|
// Determine if path looks like MinIO key (starts with payloads/) or local path
|
||||||
|
const isMinIOPath = storagePath.startsWith('payloads/') && useMinIO;
|
||||||
|
|
||||||
|
if (isMinIOPath) {
|
||||||
|
// Download from MinIO
|
||||||
|
const client = getMinioClient();
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
const stream = await client.getObject(MINIO_BUCKET, storagePath);
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
chunks.push(chunk as Buffer);
|
||||||
|
}
|
||||||
|
compressed = Buffer.concat(chunks);
|
||||||
|
} else {
|
||||||
|
// Read from local filesystem
|
||||||
|
compressed = await fs.promises.readFile(storagePath);
|
||||||
|
}
|
||||||
|
|
||||||
const decompressed = await gunzip(compressed);
|
const decompressed = await gunzip(compressed);
|
||||||
return JSON.parse(decompressed.toString('utf8'));
|
return JSON.parse(decompressed.toString('utf8'));
|
||||||
}
|
}
|
||||||
@@ -378,9 +449,10 @@ export interface SaveDiscoveryPayloadResult {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate storage path for a discovery payload
|
* Generate storage path/key for a discovery payload
|
||||||
*
|
*
|
||||||
* Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
|
* MinIO format: payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
|
||||||
|
* Local format: ./storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
|
||||||
*/
|
*/
|
||||||
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
|
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
|
||||||
const year = timestamp.getFullYear();
|
const year = timestamp.getFullYear();
|
||||||
@@ -388,18 +460,17 @@ function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): strin
|
|||||||
const day = String(timestamp.getDate()).padStart(2, '0');
|
const day = String(timestamp.getDate()).padStart(2, '0');
|
||||||
const ts = timestamp.getTime();
|
const ts = timestamp.getTime();
|
||||||
|
|
||||||
return path.join(
|
const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json.gz`;
|
||||||
PAYLOAD_BASE_PATH,
|
|
||||||
'discovery',
|
if (useMinIO) {
|
||||||
String(year),
|
return relativePath;
|
||||||
month,
|
} else {
|
||||||
day,
|
return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json.gz`);
|
||||||
`state_${stateCode.toLowerCase()}_${ts}.json.gz`
|
}
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Save a raw store discovery payload to filesystem and record metadata in DB
|
* Save a raw store discovery payload to MinIO/S3 (or filesystem) and record metadata in DB
|
||||||
*
|
*
|
||||||
* @param pool - Database connection pool
|
* @param pool - Database connection pool
|
||||||
* @param stateCode - State code (e.g., 'AZ', 'MI')
|
* @param stateCode - State code (e.g., 'AZ', 'MI')
|
||||||
@@ -423,9 +494,19 @@ export async function saveDiscoveryPayload(
|
|||||||
const compressedSize = compressed.length;
|
const compressedSize = compressed.length;
|
||||||
const checksum = calculateChecksum(compressed);
|
const checksum = calculateChecksum(compressed);
|
||||||
|
|
||||||
// Write to filesystem
|
// Write to storage backend
|
||||||
|
if (useMinIO) {
|
||||||
|
// Upload to MinIO
|
||||||
|
const client = getMinioClient();
|
||||||
|
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
|
||||||
|
'Content-Type': 'application/gzip',
|
||||||
|
'Content-Encoding': 'gzip',
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
// Write to local filesystem
|
||||||
await ensureDir(storagePath);
|
await ensureDir(storagePath);
|
||||||
await fs.promises.writeFile(storagePath, compressed);
|
await fs.promises.writeFile(storagePath, compressed);
|
||||||
|
}
|
||||||
|
|
||||||
// Record metadata in DB
|
// Record metadata in DB
|
||||||
const result = await pool.query(`
|
const result = await pool.query(`
|
||||||
@@ -451,7 +532,8 @@ export async function saveDiscoveryPayload(
|
|||||||
checksum
|
checksum
|
||||||
]);
|
]);
|
||||||
|
|
||||||
console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
|
const backend = useMinIO ? 'MinIO' : 'local';
|
||||||
|
console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
id: result.rows[0].id,
|
id: result.rows[0].id,
|
||||||
|
|||||||
Reference in New Issue
Block a user