feat: Add payloads dashboard, disable snapshots, fix scheduler
Frontend:
- Add PayloadsDashboard page with search, filter, view, and diff
- Update TasksDashboard default sort: pending → claimed → completed
- Add payload API methods to api.ts

Backend:
- Disable snapshot creation in product-refresh handler
- Remove product_refresh from schedule role options
- Disable compression in payload-storage (plain JSON for debugging)
- Fix task-scheduler: map 'embedded' menu_type to 'dutchie' platform
- Fix task-scheduler: use schedule.interval_hours as skipRecentHours

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -323,9 +323,13 @@ class TaskScheduler {
|
||||
}
|
||||
|
||||
// Group dispensaries by platform (menu_type)
|
||||
// Map 'embedded' to 'dutchie' since embedded is just a menu display type, not a platform
|
||||
const byPlatform: Record<string, number[]> = {};
|
||||
for (const d of dispensaries) {
|
||||
const platform = d.menu_type || 'dutchie';
|
||||
let platform = d.menu_type || 'dutchie';
|
||||
if (platform === 'embedded') {
|
||||
platform = 'dutchie';
|
||||
}
|
||||
if (!byPlatform[platform]) {
|
||||
byPlatform[platform] = [];
|
||||
}
|
||||
@@ -333,6 +337,7 @@ class TaskScheduler {
|
||||
}
|
||||
|
||||
// Create tasks per platform so each task routes to the correct handler
|
||||
// Use schedule.interval_hours as skipRecentHours to allow re-runs at schedule frequency
|
||||
let totalCreated = 0;
|
||||
for (const [platform, ids] of Object.entries(byPlatform)) {
|
||||
if (ids.length > 0) {
|
||||
@@ -346,6 +351,7 @@ class TaskScheduler {
|
||||
{
|
||||
source: 'schedule',
|
||||
source_schedule_id: schedule.id,
|
||||
skipRecentHours: schedule.interval_hours, // Match schedule frequency
|
||||
}
|
||||
);
|
||||
totalCreated += created;
|
||||
|
||||
@@ -241,19 +241,9 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
// Create snapshots
|
||||
console.log(`[ProductRefresh] Creating snapshots...`);
|
||||
|
||||
const snapshotsResult = await createStoreProductSnapshots(
|
||||
pool,
|
||||
dispensaryId,
|
||||
normalizationResult.products,
|
||||
normalizationResult.pricing,
|
||||
normalizationResult.availability,
|
||||
null // No crawl_run_id in new system
|
||||
);
|
||||
|
||||
console.log(`[ProductRefresh] Created ${snapshotsResult.created} snapshots`);
|
||||
// Snapshots disabled - logic needs review
|
||||
// TODO: Re-enable when snapshot strategy is finalized
|
||||
const snapshotsResult = { created: 0 };
|
||||
|
||||
await ctx.heartbeat();
|
||||
|
||||
|
||||
@@ -128,14 +128,15 @@ function generateStoragePath(dispensaryId: number, timestamp: Date, platform: st
|
||||
const ts = timestamp.getTime();
|
||||
const taskHash = taskId ? `_t${taskIdHash(taskId)}` : '';
|
||||
|
||||
const relativePath = `payloads/${platform}/${year}/${month}/${day}/store_${dispensaryId}${taskHash}_${ts}.json.gz`;
|
||||
// Compression disabled - store as plain JSON for easier debugging
|
||||
const relativePath = `payloads/${platform}/${year}/${month}/${day}/store_${dispensaryId}${taskHash}_${ts}.json`;
|
||||
|
||||
if (useMinIO) {
|
||||
// MinIO uses forward slashes, no leading slash
|
||||
return relativePath;
|
||||
} else {
|
||||
// Local filesystem uses OS-specific path
|
||||
return path.join(PAYLOAD_BASE_PATH, platform, String(year), month, day, `store_${dispensaryId}${taskHash}_${ts}.json.gz`);
|
||||
return path.join(PAYLOAD_BASE_PATH, platform, String(year), month, day, `store_${dispensaryId}${taskHash}_${ts}.json`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -178,28 +179,26 @@ export async function saveRawPayload(
|
||||
const timestamp = new Date();
|
||||
const storagePath = generateStoragePath(dispensaryId, timestamp, platform, taskId);
|
||||
|
||||
// Serialize and compress
|
||||
const jsonStr = JSON.stringify(payload);
|
||||
// Serialize as plain JSON (compression disabled for easier debugging)
|
||||
const jsonStr = JSON.stringify(payload, null, 2);
|
||||
const rawSize = Buffer.byteLength(jsonStr, 'utf8');
|
||||
const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
|
||||
const compressedSize = compressed.length;
|
||||
const checksum = calculateChecksum(compressed);
|
||||
const jsonBuffer = Buffer.from(jsonStr, 'utf8');
|
||||
const checksum = calculateChecksum(jsonBuffer);
|
||||
|
||||
// Write to storage backend
|
||||
if (useMinIO) {
|
||||
// Upload to MinIO
|
||||
const client = getMinioClient();
|
||||
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
|
||||
'Content-Type': 'application/gzip',
|
||||
'Content-Encoding': 'gzip',
|
||||
await client.putObject(MINIO_BUCKET, storagePath, jsonBuffer, rawSize, {
|
||||
'Content-Type': 'application/json',
|
||||
});
|
||||
} else {
|
||||
// Write to local filesystem
|
||||
await ensureDir(storagePath);
|
||||
await fs.promises.writeFile(storagePath, compressed);
|
||||
await fs.promises.writeFile(storagePath, jsonBuffer);
|
||||
}
|
||||
|
||||
// Record metadata in DB
|
||||
// Record metadata in DB (size_bytes = size_bytes_raw since no compression)
|
||||
const result = await pool.query(`
|
||||
INSERT INTO raw_crawl_payloads (
|
||||
crawl_run_id,
|
||||
@@ -217,7 +216,7 @@ export async function saveRawPayload(
|
||||
dispensaryId,
|
||||
storagePath,
|
||||
productCount,
|
||||
compressedSize,
|
||||
rawSize, // No compression, same as raw
|
||||
rawSize,
|
||||
timestamp,
|
||||
checksum
|
||||
@@ -229,12 +228,12 @@ export async function saveRawPayload(
|
||||
`, [dispensaryId, timestamp]);
|
||||
|
||||
const backend = useMinIO ? 'MinIO' : 'local';
|
||||
console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
|
||||
console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(rawSize / 1024).toFixed(1)}KB)`);
|
||||
|
||||
return {
|
||||
id: result.rows[0].id,
|
||||
storagePath,
|
||||
sizeBytes: compressedSize,
|
||||
sizeBytes: rawSize,
|
||||
sizeBytesRaw: rawSize,
|
||||
checksum
|
||||
};
|
||||
@@ -284,7 +283,7 @@ export async function loadRawPayloadById(
|
||||
* @returns Parsed JSON payload
|
||||
*/
|
||||
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
|
||||
let compressed: Buffer;
|
||||
let fileBuffer: Buffer;
|
||||
|
||||
// Determine if path looks like MinIO key (starts with payloads/) or local path
|
||||
const isMinIOPath = storagePath.startsWith('payloads/') && useMinIO;
|
||||
@@ -298,14 +297,19 @@ export async function loadPayloadFromPath(storagePath: string): Promise<any> {
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk as Buffer);
|
||||
}
|
||||
compressed = Buffer.concat(chunks);
|
||||
fileBuffer = Buffer.concat(chunks);
|
||||
} else {
|
||||
// Read from local filesystem
|
||||
compressed = await fs.promises.readFile(storagePath);
|
||||
fileBuffer = await fs.promises.readFile(storagePath);
|
||||
}
|
||||
|
||||
const decompressed = await gunzip(compressed);
|
||||
return JSON.parse(decompressed.toString('utf8'));
|
||||
// Handle both compressed (.gz) and uncompressed (.json) files
|
||||
if (storagePath.endsWith('.gz')) {
|
||||
const decompressed = await gunzip(fileBuffer);
|
||||
return JSON.parse(decompressed.toString('utf8'));
|
||||
} else {
|
||||
return JSON.parse(fileBuffer.toString('utf8'));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -490,12 +494,13 @@ function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): strin
|
||||
const day = String(timestamp.getDate()).padStart(2, '0');
|
||||
const ts = timestamp.getTime();
|
||||
|
||||
const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json.gz`;
|
||||
// Compression disabled - store as plain JSON
|
||||
const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json`;
|
||||
|
||||
if (useMinIO) {
|
||||
return relativePath;
|
||||
} else {
|
||||
return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json.gz`);
|
||||
return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -517,28 +522,26 @@ export async function saveDiscoveryPayload(
|
||||
const timestamp = new Date();
|
||||
const storagePath = generateDiscoveryStoragePath(stateCode, timestamp);
|
||||
|
||||
// Serialize and compress
|
||||
const jsonStr = JSON.stringify(payload);
|
||||
// Serialize as plain JSON (compression disabled for easier debugging)
|
||||
const jsonStr = JSON.stringify(payload, null, 2);
|
||||
const rawSize = Buffer.byteLength(jsonStr, 'utf8');
|
||||
const compressed = await gzip(Buffer.from(jsonStr, 'utf8'));
|
||||
const compressedSize = compressed.length;
|
||||
const checksum = calculateChecksum(compressed);
|
||||
const jsonBuffer = Buffer.from(jsonStr, 'utf8');
|
||||
const checksum = calculateChecksum(jsonBuffer);
|
||||
|
||||
// Write to storage backend
|
||||
if (useMinIO) {
|
||||
// Upload to MinIO
|
||||
const client = getMinioClient();
|
||||
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
|
||||
'Content-Type': 'application/gzip',
|
||||
'Content-Encoding': 'gzip',
|
||||
await client.putObject(MINIO_BUCKET, storagePath, jsonBuffer, rawSize, {
|
||||
'Content-Type': 'application/json',
|
||||
});
|
||||
} else {
|
||||
// Write to local filesystem
|
||||
await ensureDir(storagePath);
|
||||
await fs.promises.writeFile(storagePath, compressed);
|
||||
await fs.promises.writeFile(storagePath, jsonBuffer);
|
||||
}
|
||||
|
||||
// Record metadata in DB
|
||||
// Record metadata in DB (size_bytes = size_bytes_raw since no compression)
|
||||
const result = await pool.query(`
|
||||
INSERT INTO raw_crawl_payloads (
|
||||
payload_type,
|
||||
@@ -556,19 +559,19 @@ export async function saveDiscoveryPayload(
|
||||
stateCode.toUpperCase(),
|
||||
storagePath,
|
||||
storeCount,
|
||||
compressedSize,
|
||||
rawSize,
|
||||
rawSize,
|
||||
timestamp,
|
||||
checksum
|
||||
]);
|
||||
|
||||
const backend = useMinIO ? 'MinIO' : 'local';
|
||||
console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
|
||||
console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(rawSize / 1024).toFixed(1)}KB)`);
|
||||
|
||||
return {
|
||||
id: result.rows[0].id,
|
||||
storagePath,
|
||||
sizeBytes: compressedSize,
|
||||
sizeBytes: rawSize,
|
||||
sizeBytesRaw: rawSize,
|
||||
checksum
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user