import { randomUUID } from 'crypto';

import { ItemPipeline, Product } from './types';
import { logger } from '../services/logger';
import { pool } from '../db/pool';
import { uploadImageFromUrl } from '../utils/minio';
import { normalizeProductName, normalizeBrandName } from '../utils/product-normalizer';

// NOTE(review): the generic type arguments in this file (`ItemPipeline<Product>`,
// `Promise<Product | null>`, `ItemPipeline<any>`) were reconstructed after being
// stripped from the source - confirm them against the declarations in ./types.

/**
 * Returns true when `value` is present but is NaN or falls outside `[min, max]`.
 * `undefined`/`null` are treated as "not provided" and therefore never out of range.
 */
function isOutOfRange(value: number | null | undefined, min: number, max: number): boolean {
  return value != null && (Number.isNaN(value) || value < min || value > max);
}

/**
 * Validation Pipeline - ensures data quality
 *
 * Drops items without a usable name or URL, and clears (rather than drops on)
 * numeric fields whose values are implausible.
 */
export class ValidationPipeline implements ItemPipeline<Product> {
  name = 'ValidationPipeline';
  priority = 100;

  /**
   * @param item - product to validate; numeric fields may be reset to `undefined` in place
   * @param spider - name of the originating spider (unused here)
   * @returns the item, or `null` when a required field is missing
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Required fields
    if (!item.name || item.name.trim().length < 2) {
      logger.warn('pipeline', `Dropping product: invalid name`);
      return null;
    }

    if (!item.dutchieUrl) {
      logger.warn('pipeline', `Dropping product ${item.name}: no URL`);
      return null;
    }

    // Validate numeric fields: an implausible value is discarded, the item is kept
    if (isOutOfRange(item.price, 0, 10000)) {
      logger.warn('pipeline', `Invalid price for ${item.name}: ${item.price}`);
      item.price = undefined;
    }

    if (isOutOfRange(item.thcPercentage, 0, 100)) {
      logger.warn('pipeline', `Invalid THC for ${item.name}: ${item.thcPercentage}`);
      item.thcPercentage = undefined;
    }

    if (isOutOfRange(item.cbdPercentage, 0, 100)) {
      logger.warn('pipeline', `Invalid CBD for ${item.name}: ${item.cbdPercentage}`);
      item.cbdPercentage = undefined;
    }

    return item;
  }
}

/**
 * Sanitization Pipeline - cleans and normalizes data
 *
 * Truncates long strings, canonicalizes the strain type and removes empty
 * arrays from the metadata. Never drops an item.
 */
export class SanitizationPipeline implements ItemPipeline<Product> {
  name = 'SanitizationPipeline';
  priority = 90;

  /**
   * @param item - product to clean; mutated in place
   * @param spider - name of the originating spider (unused here)
   * @returns the same item
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Truncate long strings (cut first, then trim, so the result never exceeds the limit)
    if (item.name) {
      item.name = item.name.substring(0, 500).trim();
    }
    if (item.description) {
      item.description = item.description.substring(0, 5000).trim();
    }
    if (item.brand) {
      item.brand = item.brand.substring(0, 255).trim();
    }
    if (item.weight) {
      item.weight = item.weight.substring(0, 100).trim();
    }

    // Normalize strain type; anything unrecognized is cleared
    if (item.strainType) {
      const normalized = item.strainType.toLowerCase();
      if (normalized.includes('indica')) {
        item.strainType = 'Indica';
      } else if (normalized.includes('sativa')) {
        item.strainType = 'Sativa';
      } else if (normalized.includes('hybrid')) {
        item.strainType = 'Hybrid';
      } else {
        item.strainType = undefined;
      }
    }

    // Clean up metadata: remove empty arrays
    const metadata = item.metadata;
    if (metadata) {
      for (const key of Object.keys(metadata)) {
        const value = metadata[key];
        if (Array.isArray(value) && value.length === 0) {
          delete metadata[key];
        }
      }
    }

    return item;
  }
}

/**
 * Deduplication Pipeline - prevents duplicate items
 *
 * Remembers a fingerprint for every item seen since the last `clear()` and
 * drops any later item with the same fingerprint.
 */
export class DeduplicationPipeline implements ItemPipeline<Product> {
  name = 'DeduplicationPipeline';
  priority = 80;

  private seen: Set<string> = new Set();

  /**
   * @param item - product to check
   * @param spider - name of the originating spider (unused here)
   * @returns the item, or `null` when its fingerprint was already seen
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Prefer the platform product id. Without one, fall back to URL + name so
    // that id-less products are not all collapsed onto the single key "undefined".
    const fingerprint = item.dutchieProductId
      ? `id:${item.dutchieProductId}`
      : `url:${item.dutchieUrl}|name:${item.name}`;

    if (this.seen.has(fingerprint)) {
      logger.debug('pipeline', `Duplicate product detected: ${item.name}`);
      return null;
    }

    this.seen.add(fingerprint);
    return item;
  }

  /** Forgets every fingerprint recorded so far. */
  clear(): void {
    this.seen.clear();
  }
}

/**
 * Image Processing Pipeline - rewrites image URLs to their full-size variant
 *
 * This pipeline only rewrites `item.imageUrl`; the download itself is
 * performed by DatabasePipeline.
 */
export class ImagePipeline implements ItemPipeline<Product> {
  name = 'ImagePipeline';
  priority = 70;

  /** Extracts the hexadecimal image id from an images.dutchie.com URL, or `null` if absent. */
  private extractImageId(url: string): string | null {
    const match = /images\.dutchie\.com\/([a-f0-9]+)/i.exec(url);
    return match ? match[1] : null;
  }

  /** Builds the full-size URL for a recognized image URL; other URLs are returned unchanged. */
  private getFullSizeImageUrl(imageUrl: string): string {
    const imageId = this.extractImageId(imageUrl);
    if (!imageId) return imageUrl;

    return `https://images.dutchie.com/${imageId}?auto=format&fit=max&q=95&w=2000&h=2000`;
  }

  /**
   * @param item - product whose `imageUrl` is rewritten in place when present
   * @param spider - name of the originating spider (unused here)
   * @returns the same item
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    if (item.imageUrl) {
      // Convert to full-size URL
      item.imageUrl = this.getFullSizeImageUrl(item.imageUrl);
    }

    return item;
  }
}

/**
 * Generate a URL-safe slug from a product name
 *
 * Lowercases the name, collapses every run of characters outside `a-z0-9`
 * into a single dash, strips leading/trailing dashes and caps the result at
 * 400 characters.
 */
function generateSlug(name: string): string {
  return name
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .substring(0, 400)
    // Truncation can cut right after a separator; do not leave it dangling.
    .replace(/-+$/, '');
}

/** Identity and stored image path of a product row that matched an incoming item. */
interface ProductMatch {
  id: number;
  localImagePath: string | null;
}

/**
 * Database Pipeline - saves items to database with improved matching
 *
 * MATCHING PRIORITY:
 * 1. external_id (dutchie_product_id) - exact match
 * 2. normalized name + brand + category - strong match
 * 3. normalized name + category - weak match (same product, different/missing brand)
 * 4. exact name + category - legacy compatibility
 *
 * ALWAYS creates a snapshot after upsert for historical tracking.
 */
export class DatabasePipeline implements ItemPipeline<Product> {
  name = 'DatabasePipeline';
  priority = 10; // Low priority - runs last

  private crawlId: string | null = null;

  // Only a positive result is cached, so a table created while the process is
  // running is still picked up on the next item.
  private snapshotTableExists = false;

  /** Sets the crawl id recorded on every snapshot written from now on. */
  setCrawlId(id: string): void {
    this.crawlId = id;
  }

  /**
   * Upserts the product, records a snapshot and downloads its image if the
   * row has no local image yet.
   *
   * @param item - product to persist; `storeId`, `categoryId`, `dispensaryId` and
   *   `categoryName` are read from the item as extra properties set by the spider.
   *   On success `isNewProduct` and `productId` are attached to it.
   * @param spider - name of the originating spider (unused here)
   * @returns the item, or `null` when store/category are missing or saving failed
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Extract store and category from metadata (set by spider)
    const { storeId, categoryId, dispensaryId, categoryName } = item as any;

    // Validate before taking a connection from the pool
    if (!storeId || !categoryId) {
      logger.error('pipeline', `Missing storeId or categoryId for ${item.name}`);
      return null;
    }

    const client = await pool.connect();

    try {
      // Generate normalized values for matching
      const nameNormalized = normalizeProductName(item.name);
      const brandNormalized = normalizeBrandName(item.brand);
      const slug = generateSlug(item.name);
      const externalId = item.dutchieProductId || null;

      const existing = await this.findExistingProduct(client, item, {
        storeId,
        categoryId,
        externalId,
        nameNormalized,
        brandNormalized
      });
      const isNewProduct = existing === null;

      // UPDATE or INSERT
      let productId: number;
      if (existing) {
        productId = existing.id;
        await this.updateProduct(client, item, {
          productId,
          dispensaryId,
          slug,
          nameNormalized,
          brandNormalized,
          externalId
        });
        logger.debug('pipeline', `Updated product: ${item.name}`);
      } else {
        productId = await this.insertProduct(client, item, {
          storeId,
          categoryId,
          dispensaryId,
          externalId,
          slug,
          nameNormalized,
          brandNormalized
        });
        logger.debug('pipeline', `Inserted NEW product: ${item.name}`);
      }

      // ALWAYS create a snapshot for historical tracking
      await this.createSnapshot(client, {
        productId,
        dispensaryId,
        externalId,
        slug,
        item,
        categoryName
      });

      // Download image if needed (only for new products or missing local image)
      if (item.imageUrl && !existing?.localImagePath && productId) {
        await this.storeImageLocally(client, item, storeId, productId);
      }

      // Attach metadata for stats tracking
      (item as any).isNewProduct = isNewProduct;
      (item as any).productId = productId;

      return item;
    } catch (error) {
      logger.error('pipeline', `Failed to save product ${item.name}: ${error}`);
      return null;
    } finally {
      client.release();
    }
  }

  /**
   * Looks for an existing product row, trying each matching strategy in
   * priority order and returning the first hit.
   *
   * @returns the matched row's id and local image path, or `null` when nothing matched
   */
  private async findExistingProduct(
    client: any,
    item: Product,
    keys: {
      storeId: unknown;
      categoryId: unknown;
      externalId: string | null;
      nameNormalized: unknown;
      brandNormalized: unknown;
    }
  ): Promise<ProductMatch | null> {
    const { storeId, categoryId, externalId, nameNormalized, brandNormalized } = keys;

    const strategies: Array<{ label: string; enabled: boolean; sql: string; values: unknown[] }> = [
      {
        // STEP 1: Try to match by external_id (most reliable)
        label: 'external_id',
        enabled: Boolean(externalId),
        sql: `
          SELECT id, image_url, local_image_path
          FROM products
          WHERE store_id = $1 AND (external_id = $2 OR dutchie_product_id = $2)
        `,
        values: [storeId, externalId]
      },
      {
        // STEP 2: Try to match by normalized name + brand + category
        label: 'normalized name+brand+category',
        enabled: true,
        sql: `
          SELECT id, image_url, local_image_path
          FROM products
          WHERE store_id = $1
            AND name_normalized = $2
            AND brand_normalized = $3
            AND category_id = $4
        `,
        values: [storeId, nameNormalized, brandNormalized, categoryId]
      },
      {
        // STEP 3: Fallback to normalized name + category only (weaker match)
        label: 'normalized name+category',
        enabled: true,
        sql: `
          SELECT id, image_url, local_image_path
          FROM products
          WHERE store_id = $1
            AND name_normalized = $2
            AND category_id = $3
          LIMIT 1
        `,
        values: [storeId, nameNormalized, categoryId]
      },
      {
        // STEP 4: Final fallback - exact name match (legacy compatibility)
        label: 'exact name',
        enabled: true,
        sql: `
          SELECT id, image_url, local_image_path
          FROM products
          WHERE store_id = $1 AND name = $2 AND category_id = $3
        `,
        values: [storeId, item.name, categoryId]
      }
    ];

    for (const strategy of strategies) {
      if (!strategy.enabled) continue;

      const result = await client.query(strategy.sql, strategy.values);
      if (result.rows.length > 0) {
        logger.debug('pipeline', `Matched by ${strategy.label}: ${item.name}`);
        return {
          id: result.rows[0].id,
          localImagePath: result.rows[0].local_image_path
        };
      }
    }

    return null;
  }

  /**
   * Refreshes an existing product row with the scraped values. `image_url`,
   * `slug`, `external_id` and `source_platform` are only filled in when the
   * COALESCE in the statement allows it.
   */
  private async updateProduct(
    client: any,
    item: Product,
    row: {
      productId: number;
      dispensaryId: unknown;
      slug: string;
      nameNormalized: unknown;
      brandNormalized: unknown;
      externalId: string | null;
    }
  ): Promise<void> {
    await client.query(`
      UPDATE products
      SET name = $1, description = $2, price = $3,
          strain_type = $4, thc_percentage = $5, cbd_percentage = $6,
          brand = $7, weight = $8, image_url = COALESCE($9, image_url), dutchie_url = $10,
          in_stock = true, metadata = $11, last_seen_at = CURRENT_TIMESTAMP,
          updated_at = CURRENT_TIMESTAMP, dispensary_id = $13,
          slug = COALESCE(slug, $14),
          name_normalized = $15,
          brand_normalized = $16,
          external_id = COALESCE(external_id, $17),
          source_platform = COALESCE(source_platform, 'dutchie')
      WHERE id = $12
    `, [
      item.name, item.description, item.price,
      item.strainType, item.thcPercentage, item.cbdPercentage,
      item.brand, item.weight, item.imageUrl, item.dutchieUrl,
      JSON.stringify(item.metadata || {}), row.productId, row.dispensaryId,
      row.slug, row.nameNormalized, row.brandNormalized, row.externalId
    ]);
  }

  /**
   * Inserts a new product row.
   *
   * @returns the id of the inserted row
   */
  private async insertProduct(
    client: any,
    item: Product,
    row: {
      storeId: unknown;
      categoryId: unknown;
      dispensaryId: unknown;
      externalId: string | null;
      slug: string;
      nameNormalized: unknown;
      brandNormalized: unknown;
    }
  ): Promise<number> {
    const insertResult = await client.query(`
      INSERT INTO products (
        store_id, category_id, dispensary_id, dutchie_product_id, external_id,
        slug, name, name_normalized, description,
        price, strain_type, thc_percentage, cbd_percentage,
        brand, brand_normalized, weight, image_url, dutchie_url, in_stock, metadata,
        source_platform
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, true, $19, 'dutchie')
      RETURNING id
    `, [
      // externalId is stored twice: as dutchie_product_id and as external_id
      row.storeId, row.categoryId, row.dispensaryId, row.externalId, row.externalId,
      row.slug, item.name, row.nameNormalized, item.description,
      item.price, item.strainType, item.thcPercentage, item.cbdPercentage,
      item.brand, row.brandNormalized, item.weight, item.imageUrl, item.dutchieUrl,
      JSON.stringify(item.metadata || {})
    ]);

    return insertResult.rows[0].id;
  }

  /**
   * Uploads the product image and stores the thumbnail path on the product
   * row. Best effort: a failure is logged and does not fail the item.
   */
  private async storeImageLocally(
    client: any,
    item: Product,
    storeId: unknown,
    productId: number
  ): Promise<void> {
    try {
      const storeResult = await client.query(
        'SELECT slug FROM stores WHERE id = $1',
        [storeId]
      );
      const storeSlug = storeResult.rows[0]?.slug || undefined;

      const imageSizes = await uploadImageFromUrl(item.imageUrl, productId, storeSlug);

      await client.query(`
        UPDATE products SET local_image_path = $1 WHERE id = $2
      `, [imageSizes.thumbnail, productId]);

      logger.debug('pipeline', `Downloaded image for: ${item.name}`);
    } catch (error) {
      logger.error('pipeline', `Failed to download image for ${item.name}: ${error}`);
    }
  }

  /**
   * Create a snapshot record for historical tracking
   *
   * Does nothing when the `product_snapshots` table does not exist. Any
   * failure is logged as a warning and swallowed so the item still succeeds.
   */
  private async createSnapshot(
    client: any,
    params: {
      productId: number;
      dispensaryId: number | null;
      externalId: string | null;
      slug: string;
      item: Product;
      categoryName?: string;
    }
  ): Promise<void> {
    try {
      // Only create snapshots if the table exists (graceful degradation)
      if (!this.snapshotTableExists) {
        const tableCheck = await client.query(`
          SELECT EXISTS (
            SELECT FROM information_schema.tables
            WHERE table_name = 'product_snapshots'
          )
        `);

        if (!tableCheck.rows[0].exists) {
          return; // Snapshot table not yet created
        }
        this.snapshotTableExists = true;
      }

      // Without an explicit crawl id every snapshot gets its own random one
      const crawlId = this.crawlId || randomUUID();
      const { dispensaryId, externalId, slug, item, categoryName } = params;

      await client.query(`
        INSERT INTO product_snapshots (
          crawl_id, dispensary_id, external_product_id, product_slug,
          name, brand, category, price, original_price, sale_price,
          discount_type, discount_value, availability_status, stock_quantity,
          thc_percentage, cbd_percentage, strain_type, weight, variant,
          description, image_url, effects, terpenes, captured_at
        ) VALUES (
          $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
          $11, $12, $13, $14, $15, $16, $17, $18, $19,
          $20, $21, $22, $23, NOW()
        )
      `, [
        crawlId,
        dispensaryId,
        externalId,
        slug,
        item.name,
        item.brand || null,
        categoryName || null,
        // `??` (not `||`) for numeric product fields so a legitimate 0 is stored as 0
        item.price ?? null,
        item.originalPrice ?? null,
        // NOTE(review): metadata value types are not visible here, so the
        // falsy-to-null coercion is kept for them - confirm whether 0 should survive.
        item.metadata?.salePrice || null,
        item.metadata?.discountType || null,
        item.metadata?.discountValue || null,
        'in_stock', // availability_status - if we scraped it, it's in stock
        item.metadata?.stockQuantity || null,
        item.thcPercentage ?? null,
        item.cbdPercentage ?? null,
        item.strainType || null,
        item.weight || null,
        item.metadata?.variant || null,
        item.description || null,
        item.imageUrl || null,
        item.metadata?.effects || null,
        item.metadata?.terpenes || null
      ]);
    } catch (error) {
      // Don't fail the whole pipeline if snapshot creation fails
      logger.warn('pipeline', `Failed to create snapshot for ${params.item.name}: ${error}`);
    }
  }
}

/** Builds a zeroed counter set for StatsPipeline. */
function emptyStats() {
  return {
    total: 0,
    withImages: 0,
    withThc: 0,
    withCbd: 0,
    withDescription: 0
  };
}

/**
 * Stats Pipeline - tracks statistics
 *
 * Counts how many items passed through and how many of them carried a truthy
 * image URL, THC value, CBD value and description. Never drops an item.
 */
export class StatsPipeline implements ItemPipeline<Product> {
  name = 'StatsPipeline';
  priority = 50;

  private stats = emptyStats();

  /**
   * @param item - product to count
   * @param spider - name of the originating spider (unused here)
   * @returns the same item
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    this.stats.total++;
    if (item.imageUrl) this.stats.withImages++;
    if (item.thcPercentage) this.stats.withThc++;
    if (item.cbdPercentage) this.stats.withCbd++;
    if (item.description) this.stats.withDescription++;

    return item;
  }

  /** Returns a copy of the current counters. */
  getStats() {
    return { ...this.stats };
  }

  /** Resets every counter to zero. */
  clear(): void {
    this.stats = emptyStats();
  }
}

/**
 * Pipeline Engine - orchestrates all pipelines
 *
 * Pipelines run in descending `priority` order.
 */
export class PipelineEngine {
  private pipelines: ItemPipeline<any>[] = [];

  /** Registers a pipeline and keeps the list sorted by priority. */
  use(pipeline: ItemPipeline<any>): void {
    this.pipelines.push(pipeline);
    // Sort by priority (higher first)
    this.pipelines.sort((a, b) => b.priority - a.priority);
  }

  /**
   * Runs the item through every registered pipeline.
   *
   * A pipeline that returns a falsy value filters the item out and stops
   * processing. A pipeline that throws is logged and skipped; the item carries
   * on to the next pipeline with the value it had before the failure.
   *
   * @returns the processed item, or `null` when a pipeline filtered it out
   */
  async processItem(item: any, spider: string): Promise<any> {
    let current = item;

    for (const pipeline of this.pipelines) {
      try {
        current = await pipeline.process(current, spider);

        if (!current) {
          // Item was filtered out
          logger.debug('pipeline', `Item filtered by ${pipeline.name}`);
          return null;
        }
      } catch (error) {
        logger.error('pipeline', `Error in ${pipeline.name}: ${error}`);
        // Continue with other pipelines
      }
    }

    return current;
  }

  /**
   * Looks up a registered pipeline by its `name`.
   *
   * @returns the pipeline cast to `T`, or `undefined` when none has that name
   */
  getPipeline<T extends ItemPipeline<any>>(name: string): T | undefined {
    return this.pipelines.find(p => p.name === name) as T | undefined;
  }
}