Files
cannaiq/backend/src/scraper-v2/pipelines.ts
Kelly 199b6a8a23 Remove incorrect migration 029, add snapshot architecture, improve scraper
- Delete migration 029 that was incorrectly creating duplicate dispensaries
- Add migration 028 for snapshot architecture
- Improve downloader with proxy/UA rotation
- Update scraper monitor and tools pages
- Various scraper improvements

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-01 08:52:54 -07:00

354 lines
10 KiB
TypeScript

import { ItemPipeline, Product } from './types';
import { logger } from '../services/logger';
import { pool } from '../db/migrate';
import { uploadImageFromUrl } from '../utils/minio';
/**
 * Validation Pipeline - ensures data quality
 *
 * Drops items whose trimmed name is shorter than 2 characters or that have no
 * dutchieUrl. Out-of-range numeric fields are cleared to undefined (with a
 * warning) instead of dropping the whole item:
 *   - price: [0, 10000]
 *   - thcPercentage / cbdPercentage: [0, 100]
 */
export class ValidationPipeline implements ItemPipeline<Product> {
  name = 'ValidationPipeline';
  priority = 100;

  /**
   * True when `value` is present but NaN or outside [min, max].
   *
   * NaN compares false against every bound, so a plain range check would let
   * it through (e.g. the result of parsing a non-numeric string). It is
   * therefore tested explicitly. null/undefined mean "no value" and are
   * accepted as-is.
   */
  private static isInvalidNumber(
    value: number | null | undefined,
    min: number,
    max: number
  ): boolean {
    if (value === undefined || value === null) {
      return false;
    }
    return Number.isNaN(value) || value < min || value > max;
  }

  /**
   * @returns the (possibly corrected) item, or null when it must be dropped.
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Required fields
    if (!item.name || item.name.trim().length < 2) {
      logger.warn('pipeline', `Dropping product: invalid name`);
      return null;
    }
    if (!item.dutchieUrl) {
      logger.warn('pipeline', `Dropping product ${item.name}: no URL`);
      return null;
    }

    // Validate numeric fields: clear bad values rather than rejecting the item.
    if (ValidationPipeline.isInvalidNumber(item.price, 0, 10000)) {
      logger.warn('pipeline', `Invalid price for ${item.name}: ${item.price}`);
      item.price = undefined;
    }
    if (ValidationPipeline.isInvalidNumber(item.thcPercentage, 0, 100)) {
      logger.warn('pipeline', `Invalid THC for ${item.name}: ${item.thcPercentage}`);
      item.thcPercentage = undefined;
    }
    if (ValidationPipeline.isInvalidNumber(item.cbdPercentage, 0, 100)) {
      logger.warn('pipeline', `Invalid CBD for ${item.name}: ${item.cbdPercentage}`);
      item.cbdPercentage = undefined;
    }
    return item;
  }
}
/**
 * Sanitization Pipeline - cleans and normalizes data
 *
 * - Trims and length-caps name (500), description (5000), brand (255) and
 *   weight (100).
 * - Maps strainType onto 'Indica' | 'Sativa' | 'Hybrid' by case-insensitive
 *   substring match, checked in that order, so "Indica-dominant hybrid"
 *   becomes 'Indica'. Any other value is cleared to undefined.
 * - Removes metadata entries whose value is an empty array.
 */
export class SanitizationPipeline implements ItemPipeline<Product> {
  name = 'SanitizationPipeline';
  priority = 90;

  /**
   * Trims `value` and caps it at `maxLength` UTF-16 code units.
   *
   * Trimming happens before the cut, so leading whitespace does not consume
   * the budget. It happens again after the cut, so the cut cannot expose
   * trailing whitespace. If the cut would split a surrogate pair (e.g. an
   * emoji), the dangling high surrogate is dropped instead of being kept as a
   * malformed character.
   */
  private static clip(value: string, maxLength: number): string {
    const trimmed = value.trim();
    if (trimmed.length <= maxLength) {
      return trimmed;
    }
    let cut = trimmed.substring(0, maxLength);
    const lastCode = cut.charCodeAt(cut.length - 1);
    if (lastCode >= 0xd800 && lastCode <= 0xdbff) {
      cut = cut.substring(0, cut.length - 1);
    }
    return cut.trim();
  }

  async process(item: Product, spider: string): Promise<Product> {
    // Truncate long strings
    if (item.name) {
      item.name = SanitizationPipeline.clip(item.name, 500);
    }
    if (item.description) {
      item.description = SanitizationPipeline.clip(item.description, 5000);
    }
    if (item.brand) {
      item.brand = SanitizationPipeline.clip(item.brand, 255);
    }
    if (item.weight) {
      item.weight = SanitizationPipeline.clip(item.weight, 100);
    }

    // Normalize strain type (order matters: 'indica' wins over 'hybrid').
    if (item.strainType) {
      const normalized = item.strainType.toLowerCase();
      if (normalized.includes('indica')) {
        item.strainType = 'Indica';
      } else if (normalized.includes('sativa')) {
        item.strainType = 'Sativa';
      } else if (normalized.includes('hybrid')) {
        item.strainType = 'Hybrid';
      } else {
        item.strainType = undefined;
      }
    }

    // Clean up metadata: drop keys whose value is an empty array.
    // Object.keys returns a snapshot, so deleting while iterating is safe.
    const metadata = item.metadata;
    if (metadata) {
      for (const key of Object.keys(metadata)) {
        const value = metadata[key];
        if (Array.isArray(value) && value.length === 0) {
          delete metadata[key];
        }
      }
    }
    return item;
  }
}
/**
 * Deduplication Pipeline - prevents duplicate items
 *
 * Remembers a fingerprint for every item it passes and drops later items with
 * the same fingerprint. The memory lasts until clear() is called.
 */
export class DeduplicationPipeline implements ItemPipeline<Product> {
  name = 'DeduplicationPipeline';
  priority = 80;
  private seen: Set<string> = new Set();

  /**
   * Identity key for an item.
   *
   * Prefers the Dutchie product id. Items without one fall back to URL + name,
   * so they are not all collapsed under one shared "undefined" key (which
   * would drop every id-less item after the first). The `id:` / `url:`
   * prefixes keep the two key spaces from colliding.
   */
  private fingerprint(item: Product): string {
    const id = item.dutchieProductId;
    if (id !== undefined && id !== null && `${id}` !== '') {
      return `id:${id}`;
    }
    return `url:${item.dutchieUrl ?? ''}|name:${item.name}`;
  }

  /** @returns the item the first time its fingerprint is seen, otherwise null. */
  async process(item: Product, spider: string): Promise<Product | null> {
    const key = this.fingerprint(item);
    if (this.seen.has(key)) {
      logger.debug('pipeline', `Duplicate product detected: ${item.name}`);
      return null;
    }
    this.seen.add(key);
    return item;
  }

  /** Forgets all fingerprints seen so far. */
  clear(): void {
    this.seen.clear();
  }
}
/**
 * Image Processing Pipeline - upgrades Dutchie image URLs to full size
 *
 * If an item's imageUrl contains an `images.dutchie.com/<hex id>` segment, the
 * URL is replaced by a canonical one requesting a max 2000x2000 rendition at
 * quality 95. URLs without that segment are left as they are. This stage only
 * rewrites the URL; it does not download anything.
 */
export class ImagePipeline implements ItemPipeline<Product> {
  name = 'ImagePipeline';
  priority = 70;

  /** Captures the hex image id that follows the Dutchie image host. */
  private static readonly DUTCHIE_IMAGE_ID = /images\.dutchie\.com\/([a-f0-9]+)/i;

  /** Returns the Dutchie image id embedded in `url`, or null if there is none. */
  private extractImageId(url: string): string | null {
    try {
      const found = url.match(ImagePipeline.DUTCHIE_IMAGE_ID);
      return found === null ? null : found[1];
    } catch (e) {
      // Defensive: a non-string value at runtime would make .match throw.
      return null;
    }
  }

  /** Canonical full-size URL for a Dutchie image, or the input unchanged. */
  private getFullSizeImageUrl(imageUrl: string): string {
    const imageId = this.extractImageId(imageUrl);
    return imageId
      ? `https://images.dutchie.com/${imageId}?auto=format&fit=max&q=95&w=2000&h=2000`
      : imageUrl;
  }

  async process(item: Product, spider: string): Promise<Product> {
    const { imageUrl } = item;
    if (imageUrl) {
      item.imageUrl = this.getFullSizeImageUrl(imageUrl);
    }
    return item;
  }
}
/**
 * Generate a URL-safe slug from a product name.
 *
 * Steps:
 * 1. Decompose with NFKD, lowercase, and drop combining marks, so accented
 *    Latin letters fold to their base letter ("Café" -> "cafe") instead of
 *    becoming hyphens.
 * 2. Collapse every run of remaining non `[a-z0-9]` characters into one hyphen.
 * 3. Strip leading/trailing hyphens.
 * 4. Cap at `maxLength` characters, then strip trailing hyphens again so the
 *    cut cannot leave a dangling "-".
 *
 * @param name - product name to slugify
 * @param maxLength - maximum slug length (default 400)
 * @returns the slug; may be empty if the name has no ASCII letters/digits
 */
function generateSlug(name: string, maxLength = 400): string {
  return name
    .normalize('NFKD')
    .toLowerCase()
    .replace(/[\u0300-\u036f]/g, '')
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-+|-+$/g, '')
    .substring(0, maxLength)
    .replace(/-+$/, '');
}
/**
 * Database Pipeline - saves items to database
 *
 * Upserts the product into `products`, matching an existing row on
 * (store_id, name, category_id); both the update and insert paths set
 * in_stock = true. When the item has an imageUrl and the row has no
 * local_image_path yet, the image is stored via uploadImageFromUrl and the
 * returned thumbnail path is recorded. Image failures are logged and do not
 * drop the item.
 *
 * The pooled connection is held only for the upsert queries. It is released
 * before the image transfer, so a slow image download cannot keep a database
 * connection checked out.
 */
export class DatabasePipeline implements ItemPipeline<Product> {
  name = 'DatabasePipeline';
  priority = 10; // Low priority - runs last

  /**
   * @returns the item once its row is saved (even if the image step fails),
   *   or null when storeId/categoryId are missing or the save fails.
   */
  async process(item: Product, spider: string): Promise<Product | null> {
    // Store, category and dispensary ids are attached to the item by the
    // spider; they are not declared on Product.
    const { storeId, categoryId, dispensaryId } = item as any;
    if (!storeId || !categoryId) {
      logger.error('pipeline', `Missing storeId or categoryId for ${item.name}`);
      return null;
    }

    const saved = await this.upsertProduct(item, storeId, categoryId, dispensaryId);
    if (!saved) {
      return null;
    }

    // Download image if needed: skip when a local copy is already recorded.
    if (item.imageUrl && !saved.localImagePath) {
      await this.storeImage(item, item.imageUrl, saved.productId, storeId);
    }
    return item;
  }

  /**
   * Updates the matching product row, or inserts a new one.
   *
   * @returns the row id and its current local_image_path (null for new rows),
   *   or null if a query failed (the error is logged).
   */
  private async upsertProduct(
    item: Product,
    storeId: unknown,
    categoryId: unknown,
    dispensaryId: unknown
  ): Promise<{ productId: number; localImagePath: string | null } | null> {
    // Generate slug from name
    const slug = generateSlug(item.name);
    const client = await pool.connect();
    try {
      // Check if product exists
      const existingResult = await client.query(`
SELECT id, image_url, local_image_path
FROM products
WHERE store_id = $1 AND name = $2 AND category_id = $3
`, [storeId, item.name, categoryId]);

      if (existingResult.rows.length > 0) {
        // Update existing product. Slug is only filled in when still NULL.
        // NOTE(review): dispensary_id is overwritten unconditionally, so a
        // missing dispensaryId presumably nulls it out — confirm intended.
        const productId: number = existingResult.rows[0].id;
        await client.query(`
UPDATE products
SET name = $1, description = $2, price = $3,
strain_type = $4, thc_percentage = $5, cbd_percentage = $6,
brand = $7, weight = $8, image_url = $9, dutchie_url = $10,
in_stock = true, metadata = $11, last_seen_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP, dispensary_id = $13, slug = COALESCE(slug, $14)
WHERE id = $12
`, [
          item.name, item.description, item.price,
          item.strainType, item.thcPercentage, item.cbdPercentage,
          item.brand, item.weight, item.imageUrl, item.dutchieUrl,
          JSON.stringify(item.metadata || {}), productId, dispensaryId, slug
        ]);
        logger.debug('pipeline', `Updated product: ${item.name}`);
        return { productId, localImagePath: existingResult.rows[0].local_image_path };
      }

      // Insert new product
      const insertResult = await client.query(`
INSERT INTO products (
store_id, category_id, dispensary_id, dutchie_product_id, slug, name, description,
price, strain_type, thc_percentage, cbd_percentage,
brand, weight, image_url, dutchie_url, in_stock, metadata
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, true, $16)
RETURNING id
`, [
        storeId, categoryId, dispensaryId, item.dutchieProductId, slug, item.name, item.description,
        item.price, item.strainType, item.thcPercentage, item.cbdPercentage,
        item.brand, item.weight, item.imageUrl, item.dutchieUrl,
        JSON.stringify(item.metadata || {})
      ]);
      logger.debug('pipeline', `Inserted new product: ${item.name}`);
      return { productId: insertResult.rows[0].id, localImagePath: null };
    } catch (error) {
      logger.error('pipeline', `Failed to save product ${item.name}: ${error}`);
      return null;
    } finally {
      client.release();
    }
  }

  /**
   * Best-effort image storage for a saved product: uploads the image
   * (organized by store slug when available) and records the thumbnail path
   * as local_image_path. Errors are logged, never thrown.
   *
   * Runs after the upsert connection has been released; each query borrows a
   * pooled connection only for its own duration.
   */
  private async storeImage(
    item: Product,
    imageUrl: string,
    productId: number,
    storeId: unknown
  ): Promise<void> {
    try {
      // Get store slug for organized image storage
      const storeResult = await pool.query(
        'SELECT slug FROM stores WHERE id = $1',
        [storeId]
      );
      const storeSlug = storeResult.rows[0]?.slug || undefined;
      const imageSizes = await uploadImageFromUrl(imageUrl, productId, storeSlug);
      // Use thumbnail path for local_image_path
      await pool.query(`
UPDATE products SET local_image_path = $1 WHERE id = $2
`, [imageSizes.thumbnail, productId]);
      logger.debug('pipeline', `Downloaded image for: ${item.name}`);
    } catch (error) {
      logger.error('pipeline', `Failed to download image for ${item.name}: ${error}`);
    }
  }
}
/**
 * Stats Pipeline - tracks statistics
 *
 * Counts every item it sees, plus how many carry an image URL, a THC value,
 * a CBD value and a description. Items pass through unchanged.
 */
export class StatsPipeline implements ItemPipeline<Product> {
  name = 'StatsPipeline';
  priority = 50;
  private stats = StatsPipeline.emptyStats();

  /** Fresh zeroed counters; shared by the field initializer and clear(). */
  private static emptyStats() {
    return {
      total: 0,
      withImages: 0,
      withThc: 0,
      withCbd: 0,
      withDescription: 0
    };
  }

  /**
   * True when a potency value was provided.
   *
   * A numeric 0 is a real measurement (e.g. a CBD-only product has 0% THC), so
   * it counts; NaN does not. Non-number values keep the plain truthiness test.
   */
  private static hasMeasurement(value: unknown): boolean {
    return typeof value === 'number' ? !Number.isNaN(value) : Boolean(value);
  }

  async process(item: Product, spider: string): Promise<Product> {
    this.stats.total++;
    if (item.imageUrl) this.stats.withImages++;
    if (StatsPipeline.hasMeasurement(item.thcPercentage)) this.stats.withThc++;
    if (StatsPipeline.hasMeasurement(item.cbdPercentage)) this.stats.withCbd++;
    if (item.description) this.stats.withDescription++;
    return item;
  }

  /** Snapshot copy of the counters (mutating it does not affect the pipeline). */
  getStats() {
    return { ...this.stats };
  }

  /** Resets every counter to zero. */
  clear(): void {
    this.stats = StatsPipeline.emptyStats();
  }
}
/**
 * Pipeline Engine - orchestrates all pipelines
 *
 * Registered pipelines run in descending `priority` order. Pipelines with
 * equal priority keep their registration order, because Array.prototype.sort
 * is stable. An item is dropped as soon as a pipeline yields a falsy result.
 * A pipeline that throws is logged and skipped, and the item continues with
 * its previous value.
 */
export class PipelineEngine {
  private pipelines: ItemPipeline<any>[] = [];

  /** Registers a pipeline and re-sorts so higher priorities run first. */
  use(pipeline: ItemPipeline<any>): void {
    const byPriorityDesc = (left: ItemPipeline<any>, right: ItemPipeline<any>) =>
      right.priority - left.priority;
    this.pipelines.push(pipeline);
    this.pipelines.sort(byPriorityDesc);
  }

  /**
   * Feeds `item` through every registered pipeline in order.
   *
   * @returns the final item, or null if some pipeline filtered it out.
   */
  async processItem(item: any, spider: string): Promise<any | null> {
    let carried = item;
    for (const stage of this.pipelines) {
      try {
        carried = await stage.process(carried, spider);
        if (carried) {
          continue;
        }
        // A falsy result means the stage dropped the item.
        logger.debug('pipeline', `Item filtered by ${stage.name}`);
        return null;
      } catch (error) {
        // A failing stage is logged and skipped; later stages still run.
        logger.error('pipeline', `Error in ${stage.name}: ${error}`);
      }
    }
    return carried;
  }

  /** First registered pipeline whose `name` matches, if any. */
  getPipeline<T extends ItemPipeline<any>>(name: string): T | undefined {
    for (const candidate of this.pipelines) {
      if (candidate.name === name) {
        return candidate as T;
      }
    }
    return undefined;
  }
}