301 lines
10 KiB
JavaScript
301 lines
10 KiB
JavaScript
"use strict";
|
|
Object.defineProperty(exports, "__esModule", { value: true });
|
|
exports.PipelineEngine = exports.StatsPipeline = exports.DatabasePipeline = exports.ImagePipeline = exports.DeduplicationPipeline = exports.SanitizationPipeline = exports.ValidationPipeline = void 0;
|
|
const logger_1 = require("../services/logger");
|
|
const migrate_1 = require("../db/migrate");
|
|
const minio_1 = require("../utils/minio");
|
|
/**
 * Validation Pipeline - ensures data quality.
 *
 * Drops items without a usable name or source URL, and clears numeric fields
 * whose values are out of range or not numeric at all. Its priority (100) is
 * the highest of the pipelines defined in this file.
 */
class ValidationPipeline {
    name = 'ValidationPipeline';
    priority = 100;

    /**
     * Validate one scraped product.
     *
     * @param {object} item - Scraped product; invalid numeric fields are reset
     *   to `undefined` in place.
     * @param {object} [spider] - Originating spider (not used here).
     * @returns {Promise<object|null>} The same item, or `null` when the item
     *   must be dropped (bad name or missing URL).
     */
    async process(item, spider) {
        // Required fields. A non-string name is dropped rather than allowed to
        // throw on `.trim()`.
        if (typeof item.name !== 'string' || item.name.trim().length < 2) {
            logger_1.logger.warn('pipeline', `Dropping product: invalid name`);
            return null;
        }
        if (!item.dutchieUrl) {
            logger_1.logger.warn('pipeline', `Dropping product ${item.name}: no URL`);
            return null;
        }

        // Numeric fields: inclusive range [0, max].
        const numericRules = [
            { field: 'price', label: 'price', max: 10000 },
            { field: 'thcPercentage', label: 'THC', max: 100 },
            { field: 'cbdPercentage', label: 'CBD', max: 100 },
        ];
        for (const { field, label, max } of numericRules) {
            const value = item[field];
            // Absent values are not validated; null is left untouched.
            if (value === undefined || value === null) {
                continue;
            }
            // NaN (including non-numeric strings) fails every `<` / `>` test,
            // so it has to be rejected explicitly.
            const numeric = Number(value);
            if (Number.isNaN(numeric) || numeric < 0 || numeric > max) {
                logger_1.logger.warn('pipeline', `Invalid ${label} for ${item.name}: ${value}`);
                item[field] = undefined;
            }
        }
        return item;
    }
}
|
|
exports.ValidationPipeline = ValidationPipeline;
|
|
/**
 * Sanitization Pipeline - cleans and normalizes data.
 *
 * Truncates and trims the free-text fields, maps the strain type onto one of
 * 'Indica' / 'Sativa' / 'Hybrid', and removes empty arrays from the metadata.
 */
class SanitizationPipeline {
    name = 'SanitizationPipeline';
    priority = 90;

    /**
     * Clean one product in place.
     *
     * @param {object} item - Scraped product; mutated in place.
     * @param {object} [spider] - Originating spider (not used here).
     * @returns {Promise<object>} The same item, never `null`.
     */
    async process(item, spider) {
        // Maximum length per text field; the value is cut first, then trimmed.
        const maxLengths = { name: 500, description: 5000, brand: 255, weight: 100 };
        for (const [field, maxLength] of Object.entries(maxLengths)) {
            const value = item[field];
            // Only strings are truncated. A non-string value (e.g. a numeric
            // weight) is left as is instead of throwing on `.substring`.
            if (typeof value === 'string' && value) {
                item[field] = value.substring(0, maxLength).trim();
            }
        }

        // Normalize strain type. The first keyword found wins, in this order.
        if (item.strainType) {
            const strainLabels = [
                ['indica', 'Indica'],
                ['sativa', 'Sativa'],
                ['hybrid', 'Hybrid'],
            ];
            const lowered = typeof item.strainType === 'string' ? item.strainType.toLowerCase() : '';
            const hit = strainLabels.find(([keyword]) => lowered.includes(keyword));
            // Unrecognized (or non-string) strain types are cleared.
            item.strainType = hit ? hit[1] : undefined;
        }

        // Clean up metadata: remove keys whose value is an empty array.
        if (item.metadata) {
            for (const key of Object.keys(item.metadata)) {
                const entry = item.metadata[key];
                if (Array.isArray(entry) && entry.length === 0) {
                    delete item.metadata[key];
                }
            }
        }
        return item;
    }
}
|
|
exports.SanitizationPipeline = SanitizationPipeline;
|
|
/**
 * Deduplication Pipeline - prevents duplicate items.
 *
 * Remembers a fingerprint for every item it has let through and drops any
 * later item with the same fingerprint, until `clear()` is called.
 */
class DeduplicationPipeline {
    name = 'DeduplicationPipeline';
    priority = 80;
    seen = new Set();

    /**
     * Drop the item if an equivalent one was already processed.
     *
     * The fingerprint is the Dutchie product ID when present. Items without
     * an ID fall back to their `dutchieUrl`, so they are no longer all
     * collapsed onto the single fingerprint "undefined". Items with neither
     * cannot be compared and are passed through.
     *
     * @param {object} item - Scraped product.
     * @param {object} [spider] - Originating spider (not used here).
     * @returns {Promise<object|null>} The item, or `null` for a duplicate.
     */
    async process(item, spider) {
        let fingerprint = null;
        if (item.dutchieProductId !== undefined && item.dutchieProductId !== null) {
            fingerprint = `${item.dutchieProductId}`;
        }
        else if (item.dutchieUrl) {
            // Prefixed so a URL can never collide with a product ID.
            fingerprint = `url:${item.dutchieUrl}`;
        }
        if (fingerprint === null) {
            return item;
        }
        if (this.seen.has(fingerprint)) {
            logger_1.logger.debug('pipeline', `Duplicate product detected: ${item.name}`);
            return null;
        }
        this.seen.add(fingerprint);
        return item;
    }

    /** Forget every fingerprint seen so far. */
    clear() {
        this.seen.clear();
    }
}
|
|
exports.DeduplicationPipeline = DeduplicationPipeline;
|
|
/**
 * Image Processing Pipeline - rewrites Dutchie image URLs to a full-size variant.
 */
class ImagePipeline {
    name = 'ImagePipeline';
    priority = 70;

    /**
     * Extract the hexadecimal image ID from an images.dutchie.com URL.
     *
     * The ID must be the whole path segment: the hex run has to be followed
     * by `?`, `#` or the end of the string. Without that, a path such as
     * `/flower-stock.jpg` would yield the bogus ID `f`.
     *
     * @param {string} url - Image URL.
     * @returns {string|null} The ID, or `null` when the URL has none or is
     *   not a string.
     */
    extractImageId(url) {
        if (typeof url !== 'string') {
            return null;
        }
        const match = url.match(/images\.dutchie\.com\/([a-f0-9]+)(?=[?#]|$)/i);
        return match ? match[1] : null;
    }

    /**
     * Build the full-size URL for an image.
     *
     * @param {string} imageUrl - Image URL as scraped.
     * @returns {string} The full-size URL, or `imageUrl` unchanged when no
     *   image ID can be extracted from it.
     */
    getFullSizeImageUrl(imageUrl) {
        const imageId = this.extractImageId(imageUrl);
        if (!imageId) {
            return imageUrl;
        }
        return `https://images.dutchie.com/${imageId}?auto=format&fit=max&q=95&w=2000&h=2000`;
    }

    /**
     * Replace `item.imageUrl`, when set, with its full-size variant.
     *
     * @param {object} item - Scraped product; mutated in place.
     * @param {object} [spider] - Originating spider (not used here).
     * @returns {Promise<object>} The same item.
     */
    async process(item, spider) {
        if (item.imageUrl) {
            item.imageUrl = this.getFullSizeImageUrl(item.imageUrl);
        }
        return item;
    }
}
|
|
exports.ImagePipeline = ImagePipeline;
|
|
/**
 * Database Pipeline - saves items to the `products` table.
 *
 * A product is matched on (store_id, name, category_id): an existing row is
 * updated, otherwise a new row is inserted. When the item has an image URL
 * and the row has no `local_image_path` yet, the image is uploaded through
 * `uploadImageFromUrl` and the returned path is stored on the row.
 */
class DatabasePipeline {
    name = 'DatabasePipeline';
    priority = 10; // Low priority - runs last

    /**
     * Upsert one product.
     *
     * @param {object} item - Product to save; must carry `storeId` and
     *   `categoryId` directly on the item.
     * @param {object} [spider] - Originating spider (not used here).
     * @returns {Promise<object|null>} The item once saved, or `null` when the
     *   IDs are missing or the save failed. Failures are logged, not thrown;
     *   that includes failing to obtain a connection. An image upload failure
     *   is logged but does not fail the save.
     */
    async process(item, spider) {
        const storeId = item.storeId;
        const categoryId = item.categoryId;
        // Checked before touching the pool so an unusable item never costs a
        // connection.
        if (!storeId || !categoryId) {
            logger_1.logger.error('pipeline', `Missing storeId or categoryId for ${item.name}`);
            return null;
        }

        let client;
        try {
            client = await migrate_1.pool.connect();

            // Check if product exists
            const existingResult = await client.query(`
                SELECT id, image_url, local_image_path
                FROM products
                WHERE store_id = $1 AND name = $2 AND category_id = $3
            `, [storeId, item.name, categoryId]);

            let localImagePath = null;
            let productId;
            if (existingResult.rows.length > 0) {
                // Update existing product
                productId = existingResult.rows[0].id;
                localImagePath = existingResult.rows[0].local_image_path;
                await client.query(`
                    UPDATE products
                    SET name = $1, description = $2, price = $3,
                        strain_type = $4, thc_percentage = $5, cbd_percentage = $6,
                        brand = $7, weight = $8, image_url = $9, dutchie_url = $10,
                        in_stock = true, metadata = $11, last_seen_at = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = $12
                `, [
                    item.name, item.description, item.price,
                    item.strainType, item.thcPercentage, item.cbdPercentage,
                    item.brand, item.weight, item.imageUrl, item.dutchieUrl,
                    JSON.stringify(item.metadata || {}), productId
                ]);
                logger_1.logger.debug('pipeline', `Updated product: ${item.name}`);
            }
            else {
                // Insert new product
                const insertResult = await client.query(`
                    INSERT INTO products (
                        store_id, category_id, dutchie_product_id, name, description,
                        price, strain_type, thc_percentage, cbd_percentage,
                        brand, weight, image_url, dutchie_url, in_stock, metadata
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, true, $14)
                    RETURNING id
                `, [
                    storeId, categoryId, item.dutchieProductId, item.name, item.description,
                    item.price, item.strainType, item.thcPercentage, item.cbdPercentage,
                    item.brand, item.weight, item.imageUrl, item.dutchieUrl,
                    JSON.stringify(item.metadata || {})
                ]);
                productId = insertResult.rows[0].id;
                logger_1.logger.debug('pipeline', `Inserted new product: ${item.name}`);
            }

            // Download image if needed: only when the row has no stored path yet.
            if (item.imageUrl && !localImagePath) {
                try {
                    localImagePath = await (0, minio_1.uploadImageFromUrl)(item.imageUrl, productId);
                    await client.query(`
                        UPDATE products
                        SET local_image_path = $1
                        WHERE id = $2
                    `, [localImagePath, productId]);
                    logger_1.logger.debug('pipeline', `Downloaded image for: ${item.name}`);
                }
                catch (error) {
                    // Best effort: the product row is already saved.
                    logger_1.logger.error('pipeline', `Failed to download image for ${item.name}: ${error}`);
                }
            }
            return item;
        }
        catch (error) {
            logger_1.logger.error('pipeline', `Failed to save product ${item.name}: ${error}`);
            return null;
        }
        finally {
            // `client` is unset when connect() itself failed.
            if (client) {
                client.release();
            }
        }
    }
}
|
|
exports.DatabasePipeline = DatabasePipeline;
|
|
/**
 * Stats Pipeline - tracks statistics.
 *
 * Counts the items passing through and how many of them carry an image URL,
 * a THC value, a CBD value and a description. Items are never modified.
 */
class StatsPipeline {
    name = 'StatsPipeline';
    priority = 50;
    stats = StatsPipeline.emptyStats();

    /**
     * Build a fresh, zeroed counter object. It is the single source for the
     * initial state and for `clear()`, so the two cannot drift apart.
     *
     * @returns {{total: number, withImages: number, withThc: number, withCbd: number, withDescription: number}}
     */
    static emptyStats() {
        return {
            total: 0,
            withImages: 0,
            withThc: 0,
            withCbd: 0,
            withDescription: 0
        };
    }

    /**
     * Record one item.
     *
     * Presence is tested by truthiness, so a THC or CBD value of 0 and an
     * empty string are not counted.
     *
     * @param {object} item - Scraped product (read only).
     * @param {object} [spider] - Originating spider (not used here).
     * @returns {Promise<object>} The same item.
     */
    async process(item, spider) {
        const counters = this.stats;
        counters.total++;
        if (item.imageUrl) {
            counters.withImages++;
        }
        if (item.thcPercentage) {
            counters.withThc++;
        }
        if (item.cbdPercentage) {
            counters.withCbd++;
        }
        if (item.description) {
            counters.withDescription++;
        }
        return item;
    }

    /**
     * @returns {object} A shallow copy of the current counters; changing it
     *   does not affect the pipeline.
     */
    getStats() {
        return { ...this.stats };
    }

    /** Reset every counter to zero. */
    clear() {
        this.stats = StatsPipeline.emptyStats();
    }
}
|
|
exports.StatsPipeline = StatsPipeline;
|
|
/**
 * Pipeline Engine - orchestrates all pipelines.
 *
 * Holds the registered pipelines sorted by descending `priority` and feeds
 * each item through them in that order.
 */
class PipelineEngine {
    pipelines = [];

    /**
     * Register a pipeline and keep the list sorted, highest priority first.
     *
     * A pipeline whose `priority` is missing or not numeric is ranked as 0.
     * Subtracting it directly would produce NaN and leave the order
     * unspecified.
     *
     * @param {{name: string, priority?: number, process: Function}} pipeline
     * @returns {PipelineEngine} This engine, so calls can be chained.
     */
    use(pipeline) {
        const rank = (entry) => {
            const value = Number(entry.priority);
            return Number.isNaN(value) ? 0 : value;
        };
        this.pipelines.push(pipeline);
        // Sort by priority (higher first)
        this.pipelines.sort((a, b) => rank(b) - rank(a));
        return this;
    }

    /**
     * Run an item through every registered pipeline in priority order.
     *
     * A pipeline that returns a falsy value filters the item out and stops
     * the run. A pipeline that throws is logged and skipped; the next
     * pipeline receives the last value that was produced successfully.
     *
     * @param {object} item - Item to process.
     * @param {object} [spider] - Passed through to every pipeline.
     * @returns {Promise<object|null>} The processed item, or `null` when a
     *   pipeline filtered it out.
     */
    async processItem(item, spider) {
        let current = item;
        for (const pipeline of this.pipelines) {
            try {
                current = await pipeline.process(current, spider);
            }
            catch (error) {
                logger_1.logger.error('pipeline', `Error in ${pipeline.name}: ${error}`);
                // Continue with other pipelines
                continue;
            }
            if (!current) {
                // Item was filtered out
                logger_1.logger.debug('pipeline', `Item filtered by ${pipeline.name}`);
                return null;
            }
        }
        return current;
    }

    /**
     * Look up a registered pipeline by its `name`.
     *
     * @param {string} name - Pipeline name.
     * @returns {object|undefined} The pipeline, or `undefined` when absent.
     */
    getPipeline(name) {
        return this.pipelines.find(p => p.name === name);
    }
}
|
|
exports.PipelineEngine = PipelineEngine;
|