cannaiq/backend/src/scraper-v2/canonical-pipeline.ts
Kelly 2f483b3084 feat: SEO template library, discovery pipeline, and orchestrator enhancements
## SEO Template Library
- Add complete template library with 7 page types (state, city, category, brand, product, search, regeneration)
- Add Template Library tab in SEO Orchestrator with accordion-based editors
- Add template preview, validation, and variable injection engine
- Add API endpoints: /api/seo/templates, preview, validate, generate, regenerate

## Discovery Pipeline
- Add promotion.ts for discovery location validation and promotion
- Add discover-all-states.ts script for multi-state discovery
- Add promotion log migration (067)
- Enhance discovery routes and types

## Orchestrator & Admin
- Add crawl_enabled filter to stores page
- Add API permissions page
- Add job queue management
- Add price analytics routes
- Add markets and intelligence routes
- Enhance dashboard and worker monitoring

## Infrastructure
- Add migrations for worker definitions, SEO settings, field alignment
- Add canonical pipeline for scraper v2
- Update hydration and sync orchestrator
- Enhance multi-state query service

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 00:05:34 -07:00

354 lines · 11 KiB · TypeScript

/**
* Canonical Database Pipeline
*
* Writes scraped products to the canonical tables:
* - store_products (current state)
* - store_product_snapshots (historical)
* - product_variants (per-weight pricing)
* - product_variant_snapshots (variant history)
*
* This replaces the legacy DatabasePipeline that wrote to the `products` table.
*/
import { ItemPipeline, Product } from './types';
import { logger } from '../services/logger';
import { pool } from '../db/pool';
import { v4 as uuidv4 } from 'uuid';
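
/**
* Per-weight variant data attached to each scraped item by the spider
* (rec = recreational pricing, med = medical pricing).
*/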
interface VariantData {
  option: string;
  priceRec: number | null;
  priceMed: number | null;
  priceRecSpecial: number | null;
  priceMedSpecial: number | null;
  quantity: number | null;
  inStock: boolean;
  isOnSpecial: boolean;
}

/**
* Parse weight string like "1g", "3.5g", "1/8oz" into value and unit
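* e.g. "3.5g" -> { value: 3.5, unit: "g" }, "1/8oz" -> { value: 0.125, unit: "oz" }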
*/
function parseWeight(option: string): { value: number | null; unit: string | null } {
  if (!option) return { value: null, unit: null };

  // Match patterns like "1g", "3.5g", "1/8oz", "100mg"
  const match = option.match(/^([\d.\/]+)\s*(g|oz|mg|ml|each|pk|ct)?$/i);
  if (!match) return { value: null, unit: null };

  let value: number | null = null;
  const rawValue = match[1];
  const unit = match[2]?.toLowerCase() || null;

  // Handle fractions like "1/8"
  if (rawValue.includes('/')) {
    const [num, denom] = rawValue.split('/');
    value = parseFloat(num) / parseFloat(denom);
  } else {
    value = parseFloat(rawValue);
  }

  if (isNaN(value)) value = null;
  return { value, unit };
}

/**
* Canonical Database Pipeline - saves items to canonical tables
*
* TABLES:
* - store_products: Current product state per store
* - store_product_snapshots: Historical snapshot per crawl
* - product_variants: Current variant state (per-weight pricing)
* - product_variant_snapshots: Historical variant snapshots
*/
export class CanonicalDatabasePipeline implements ItemPipeline<Product> {
  name = 'CanonicalDatabasePipeline';
  priority = 10; // Low priority - runs last

  private crawlRunId: number | null = null;

  setCrawlRunId(id: number): void {
    this.crawlRunId = id;
  }

  async process(item: Product, spider: string): Promise<Product | null> {
    const client = await pool.connect();
    try {
      // Extract metadata set by spider
      const dispensaryId = (item as any).dispensaryId;
      const categoryName = (item as any).categoryName;
      const variants: VariantData[] = (item as any).variants || [];

      if (!dispensaryId) {
        logger.error('pipeline', `Missing dispensaryId for ${item.name}`);
        return null;
      }

      const externalProductId = item.dutchieProductId || null;
      const provider = 'dutchie';

      // Determine stock status
      const isInStock = (item as any).inStock !== false;
      const stockQuantity = (item as any).stockQuantity || null;

      // Extract pricing
      const priceRec = item.price || null;
      const priceMed = (item as any).priceMed || null;

      let storeProductId: number | null = null;
      let isNewProduct = false;

      // ============================================================
      // UPSERT store_products
      // ============================================================
      const upsertResult = await client.query(`
        INSERT INTO store_products (
          dispensary_id, provider, provider_product_id,
          name_raw, brand_name_raw, category_raw,
          price_rec, price_med,
          thc_percent, cbd_percent,
          is_in_stock, stock_quantity,
          image_url, source_url,
          raw_data,
          first_seen_at, last_seen_at,
          created_at, updated_at
        ) VALUES (
          $1, $2, $3,
          $4, $5, $6,
          $7, $8,
          $9, $10,
          $11, $12,
          $13, $14,
          $15,
          NOW(), NOW(),
          NOW(), NOW()
        )
        ON CONFLICT (dispensary_id, provider, provider_product_id)
        DO UPDATE SET
          name_raw = EXCLUDED.name_raw,
          brand_name_raw = EXCLUDED.brand_name_raw,
          category_raw = EXCLUDED.category_raw,
          price_rec = EXCLUDED.price_rec,
          price_med = EXCLUDED.price_med,
          thc_percent = EXCLUDED.thc_percent,
          cbd_percent = EXCLUDED.cbd_percent,
          is_in_stock = EXCLUDED.is_in_stock,
          stock_quantity = EXCLUDED.stock_quantity,
          image_url = COALESCE(EXCLUDED.image_url, store_products.image_url),
          source_url = EXCLUDED.source_url,
          raw_data = EXCLUDED.raw_data,
          last_seen_at = NOW(),
          updated_at = NOW()
        RETURNING id, (xmax = 0) as is_new
      `, [
        dispensaryId, provider, externalProductId,
        item.name, item.brand || null, categoryName || null,
        priceRec, priceMed,
        item.thcPercentage || null, item.cbdPercentage || null,
        isInStock, stockQuantity,
        item.imageUrl || null, item.dutchieUrl || null,
        JSON.stringify(item.metadata || {}),
      ]);

      storeProductId = upsertResult.rows[0].id;
      // (xmax = 0) is the standard PostgreSQL upsert idiom: it is true when the
      // row was inserted by this statement and false when ON CONFLICT updated it.
      isNewProduct = upsertResult.rows[0].is_new;

      logger.debug('pipeline', `${isNewProduct ? 'Inserted' : 'Updated'} canonical product: ${item.name} (ID: ${storeProductId})`);

      // ============================================================
      // INSERT store_product_snapshots
      // ============================================================
      await client.query(`
        INSERT INTO store_product_snapshots (
          store_product_id, dispensary_id, crawl_run_id,
          price_rec, price_med,
          is_in_stock, stock_quantity,
          is_present_in_feed,
          captured_at, created_at
        ) VALUES (
          $1, $2, $3,
          $4, $5,
          $6, $7,
          TRUE,
          NOW(), NOW()
        )
        ON CONFLICT (store_product_id, crawl_run_id) WHERE crawl_run_id IS NOT NULL
        DO UPDATE SET
          price_rec = EXCLUDED.price_rec,
          price_med = EXCLUDED.price_med,
          is_in_stock = EXCLUDED.is_in_stock,
          stock_quantity = EXCLUDED.stock_quantity
      `, [
        storeProductId, dispensaryId, this.crawlRunId,
        priceRec, priceMed,
        isInStock, stockQuantity,
      ]);

      // ============================================================
      // UPSERT product_variants (if variants exist)
      // ============================================================
      if (variants.length > 0) {
        for (const variant of variants) {
          const { value: weightValue, unit: weightUnit } = parseWeight(variant.option);

          const variantResult = await client.query(`
            INSERT INTO product_variants (
              store_product_id, dispensary_id,
              option,
              price_rec, price_med, price_rec_special, price_med_special,
              quantity, quantity_available, in_stock, is_on_special,
              weight_value, weight_unit,
              first_seen_at, last_seen_at,
              created_at, updated_at
            ) VALUES (
              $1, $2,
              $3,
              $4, $5, $6, $7,
              $8, $8, $9, $10,
              $11, $12,
              NOW(), NOW(),
              NOW(), NOW()
            )
            ON CONFLICT (store_product_id, option)
            DO UPDATE SET
              price_rec = EXCLUDED.price_rec,
              price_med = EXCLUDED.price_med,
              price_rec_special = EXCLUDED.price_rec_special,
              price_med_special = EXCLUDED.price_med_special,
              quantity = EXCLUDED.quantity,
              quantity_available = EXCLUDED.quantity_available,
              in_stock = EXCLUDED.in_stock,
              is_on_special = EXCLUDED.is_on_special,
              weight_value = EXCLUDED.weight_value,
              weight_unit = EXCLUDED.weight_unit,
              last_seen_at = NOW(),
              last_price_change_at = CASE
                WHEN product_variants.price_rec IS DISTINCT FROM EXCLUDED.price_rec
                  OR product_variants.price_rec_special IS DISTINCT FROM EXCLUDED.price_rec_special
                THEN NOW()
                ELSE product_variants.last_price_change_at
              END,
              last_stock_change_at = CASE
                WHEN product_variants.in_stock IS DISTINCT FROM EXCLUDED.in_stock
                THEN NOW()
                ELSE product_variants.last_stock_change_at
              END,
              updated_at = NOW()
            RETURNING id
          `, [
            storeProductId, dispensaryId,
            variant.option,
            variant.priceRec, variant.priceMed, variant.priceRecSpecial, variant.priceMedSpecial,
            variant.quantity, variant.inStock, variant.isOnSpecial,
            weightValue, weightUnit,
          ]);

          const variantId = variantResult.rows[0].id;

          // Insert variant snapshot
          await client.query(`
            INSERT INTO product_variant_snapshots (
              product_variant_id, store_product_id, dispensary_id, crawl_run_id,
              option,
              price_rec, price_med, price_rec_special, price_med_special,
              quantity, in_stock, is_on_special,
              is_present_in_feed,
              captured_at, created_at
            ) VALUES (
              $1, $2, $3, $4,
              $5,
              $6, $7, $8, $9,
              $10, $11, $12,
              TRUE,
              NOW(), NOW()
            )
          `, [
            variantId, storeProductId, dispensaryId, this.crawlRunId,
            variant.option,
            variant.priceRec, variant.priceMed, variant.priceRecSpecial, variant.priceMedSpecial,
            variant.quantity, variant.inStock, variant.isOnSpecial,
          ]);
        }

        logger.debug('pipeline', `Upserted ${variants.length} variants for ${item.name}`);
      }

      // Attach metadata for stats tracking
      (item as any).isNewProduct = isNewProduct;
      (item as any).storeProductId = storeProductId;

      return item;
    } catch (error) {
      logger.error('pipeline', `Failed to save canonical product ${item.name}: ${error}`);
      return null;
    } finally {
      client.release();
    }
  }
}

/**
* Create a crawl run record before starting crawl
*/
export async function createCrawlRun(
  dispensaryId: number,
  provider: string = 'dutchie',
  triggerType: string = 'manual'
): Promise<number> {
  const result = await pool.query(`
    INSERT INTO crawl_runs (
      dispensary_id, provider,
      started_at, status, trigger_type
    ) VALUES ($1, $2, NOW(), 'running', $3)
    RETURNING id
  `, [dispensaryId, provider, triggerType]);

  return result.rows[0].id;
}

/**
* Complete a crawl run with stats
*/
export async function completeCrawlRun(
  crawlRunId: number,
  stats: {
    productsFound: number;
    productsNew: number;
    productsUpdated: number;
    snapshotsWritten: number;
    variantsUpserted?: number;
    status?: 'completed' | 'failed' | 'partial';
    error?: string;
  }
): Promise<void> {
  await pool.query(`
    UPDATE crawl_runs SET
      finished_at = NOW(),
      status = $2,
      products_found = $3,
      products_new = $4,
      products_updated = $5,
      snapshots_written = $6,
      metadata = jsonb_build_object(
        'variants_upserted', $7,
        'error', $8
      )
    WHERE id = $1
  `, [
    crawlRunId,
    stats.status || 'completed',
    stats.productsFound,
    stats.productsNew,
    stats.productsUpdated,
    stats.snapshotsWritten,
    stats.variantsUpserted || 0,
    stats.error || null,
  ]);
}
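
/**
* Example wiring (illustrative sketch, not part of this module): the spider /
* engine integration is assumed, and `dispensaryId`, `item`, `spiderName`, and
* `counts` are placeholders for values the caller already has.
*
*   const crawlRunId = await createCrawlRun(dispensaryId, 'dutchie', 'scheduled');
*   const pipeline = new CanonicalDatabasePipeline();
*   pipeline.setCrawlRunId(crawlRunId);
*
*   // For each scraped item emitted by the spider:
*   await pipeline.process(item, spiderName);
*
*   await completeCrawlRun(crawlRunId, {
*     productsFound: counts.found,
*     productsNew: counts.inserted,
*     productsUpdated: counts.updated,
*     snapshotsWritten: counts.snapshots,
*   });
*/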