Canonical Hydration Pipeline - Runbook
Overview
The Canonical Hydration Pipeline transforms data from the dutchie_* source tables into the provider-agnostic canonical tables (store_products, store_product_snapshots, crawl_runs). This enables:
- Unified analytics across multiple data providers
- Historical price/inventory tracking
- Provider-agnostic API endpoints
Architecture
Source Tables (read-only):
dutchie_products → StoreProductNormalizer → store_products
dutchie_product_snapshots → SnapshotWriter → store_product_snapshots
dispensary_crawl_jobs → CrawlRunRecorder → crawl_runs
Orchestration:
CanonicalHydrationService coordinates all transformations
Table Mappings
dutchie_products → store_products
| Source Column | Target Column | Notes |
|---|---|---|
| dispensary_id | dispensary_id | Direct mapping |
| external_product_id | provider_product_id | Canonical key |
| platform | provider | 'dutchie' |
| name | name_raw | Raw product name |
| brand_name | brand_name_raw | Raw brand name |
| type/subcategory | category_raw | Category info |
| price_rec (JSONB) | price_rec (DECIMAL) | Extracted from JSONB |
| price_med (JSONB) | price_med (DECIMAL) | Extracted from JSONB |
| thc | thc_percent | Parsed percentage |
| cbd | cbd_percent | Parsed percentage |
| stock_status | is_in_stock | Boolean conversion |
| total_quantity_available | stock_quantity | Direct mapping |
| primary_image_url | image_url | Direct mapping |
| created_at | first_seen_at | First seen timestamp |
| updated_at | last_seen_at | Last seen timestamp |
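The mapping above can be illustrated with a minimal sketch of a row normalizer. It is not the actual StoreProductNormalizer: the interfaces, the helpers `parsePercent` and `toInStock`, and the `"in_stock"` literal are all assumptions made for illustration, and only a subset of the columns is shown.

```typescript
// Hypothetical row shapes; the real tables have more columns.
interface SourceProduct {
  dispensary_id: number;
  external_product_id: string;
  name: string;
  brand_name: string | null;
  thc: string | number | null;
  stock_status: string | null;
}

interface CanonicalProduct {
  dispensary_id: number;
  provider: "dutchie";
  provider_product_id: string;
  name_raw: string;
  brand_name_raw: string | null;
  thc_percent: number | null;
  is_in_stock: boolean;
}

// Parse values such as "24.5%", "24.5", or 24.5; anything else becomes null.
function parsePercent(value: string | number | null): number | null {
  if (value === null) return null;
  if (typeof value === "number") return Number.isFinite(value) ? value : null;
  const match = value.trim().match(/^(\d+(?:\.\d+)?)\s*%?$/);
  return match ? Number(match[1]) : null;
}

// Assumption: the source marks available items with the literal "in_stock".
function toInStock(status: string | null): boolean {
  return status === "in_stock";
}

// Map one source row onto the canonical column names from the table above.
function normalizeProduct(row: SourceProduct): CanonicalProduct {
  return {
    dispensary_id: row.dispensary_id,
    provider: "dutchie",
    provider_product_id: row.external_product_id,
    name_raw: row.name,
    brand_name_raw: row.brand_name,
    thc_percent: parsePercent(row.thc),
    is_in_stock: toInStock(row.stock_status),
  };
}
```

Returning null for unparseable percentages keeps a malformed source value from failing the whole batch, at the cost of losing that field for the row.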
Canonical Keys
- store_products: (dispensary_id, provider, provider_product_id)
- store_product_snapshots: (store_product_id, crawl_run_id)
- crawl_runs: (source_job_type, source_job_id)
CLI Commands
Check Hydration Status
# Overall status
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/backfill.ts --status
# Single dispensary status
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/backfill.ts --status --dispensary-id 112
Products-Only Hydration
Use when source data has products but no historical snapshots/job records.
# Dry run (see what would be done)
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/products-only.ts --dry-run
# Hydrate single dispensary
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/products-only.ts --dispensary-id 112
# Hydrate all dispensaries
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/products-only.ts
Backfill Hydration
Use when source data has historical job records in dispensary_crawl_jobs.
# Dry run
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/backfill.ts --dry-run
# Backfill with date range
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/backfill.ts --start-date 2024-01-01 --end-date 2024-12-31
# Backfill single dispensary
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/backfill.ts --dispensary-id 112
Incremental Hydration
Use for ongoing hydration of new data.
# Single run
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/incremental.ts
# Continuous loop (runs every 60 seconds)
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/incremental.ts --loop
# Continuous loop with custom interval
DATABASE_URL="..." npx tsx src/canonical-hydration/cli/incremental.ts --loop --interval 300
Migration
Apply the schema migration before first use:
# Apply migration 050 (psql does not read DATABASE_URL from the environment, so pass it as the connection string)
psql "$DATABASE_URL" -f src/migrations/050_canonical_hydration_schema.sql
This migration adds:
- source_job_type and source_job_id columns to crawl_runs
- Unique index on crawl_runs (source_job_type, source_job_id)
- Unique index on store_product_snapshots (store_product_id, crawl_run_id)
- Performance indexes for hydration queries
Idempotency
All hydration operations are idempotent:
- crawl_runs: ON CONFLICT updates existing records
- store_products: ON CONFLICT updates mutable fields
- store_product_snapshots: ON CONFLICT DO NOTHING
Re-running hydration is safe and will not create duplicates.
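The conflict handling listed above relies on the unique canonical keys. As a rough sketch of how such statements could be assembled, the helper below builds a single-row upsert; `buildUpsertSql` and its parameters are hypothetical names, not part of the pipeline, and the column lists in the usage example are illustrative.

```typescript
// Build an idempotent single-row upsert statement.
// Table and column names are interpolated directly, so they must be
// trusted constants, never user input.
function buildUpsertSql(
  table: string,
  columns: string[],
  conflictKey: string[],
  mutableColumns: string[],
): string {
  const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
  // No mutable columns means existing rows are left untouched.
  const action =
    mutableColumns.length === 0
      ? "DO NOTHING"
      : "DO UPDATE SET " +
        mutableColumns.map((c) => `${c} = EXCLUDED.${c}`).join(", ");
  return (
    `INSERT INTO ${table} (${columns.join(", ")}) VALUES (${placeholders}) ` +
    `ON CONFLICT (${conflictKey.join(", ")}) ${action}`
  );
}

// Snapshots are never rewritten once they exist.
const snapshotSql = buildUpsertSql(
  "store_product_snapshots",
  ["store_product_id", "crawl_run_id"],
  ["store_product_id", "crawl_run_id"],
  [],
);

// Products refresh their mutable fields when the key already exists.
const productSql = buildUpsertSql(
  "store_products",
  ["dispensary_id", "provider", "provider_product_id", "name_raw"],
  ["dispensary_id", "provider", "provider_product_id"],
  ["name_raw"],
);
```

Because the conflict target matches a unique index, running the same statement twice with the same values leaves the table in the same state as running it once.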
Monitoring
Check Canonical Data
-- Count canonical records
SELECT
(SELECT COUNT(*) FROM crawl_runs WHERE provider = 'dutchie') as crawl_runs,
(SELECT COUNT(*) FROM store_products WHERE provider = 'dutchie') as products,
(SELECT COUNT(*) FROM store_product_snapshots) as snapshots;
-- Products by dispensary
SELECT dispensary_id, COUNT(*) as products
FROM store_products
WHERE provider = 'dutchie'
GROUP BY dispensary_id
ORDER BY products DESC;
-- Recent crawl runs
SELECT id, dispensary_id, started_at, products_found, snapshots_written
FROM crawl_runs
ORDER BY started_at DESC
LIMIT 10;
Verify Hydration Completeness
-- Compare source vs canonical product counts
SELECT
dp.dispensary_id,
COUNT(DISTINCT dp.id) as source_products,
COUNT(DISTINCT sp.id) as canonical_products
FROM dutchie_products dp
LEFT JOIN store_products sp
ON sp.dispensary_id = dp.dispensary_id
AND sp.provider = 'dutchie'
AND sp.provider_product_id = dp.external_product_id
GROUP BY dp.dispensary_id
ORDER BY dp.dispensary_id;
Troubleshooting
"invalid input syntax for type integer"
This usually means a type mismatch between source and target columns. The most common case is brand_id: the source holds UUID strings but the target column expects integers. The normalizer sets brand_id = null to avoid the mismatch.
"could not determine data type of parameter $1"
This indicates a batch insert issue with parameter indexing. Ensure each batch has its own parameter indexing starting from $1.
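One way to keep placeholder numbering local to each batch is to derive every `$n` from the length of that batch's own values array. The sketch below assumes a hypothetical helper, `buildBatchInsert`, and is not taken from the pipeline's code.

```typescript
interface BatchQuery {
  text: string;
  values: unknown[];
}

// Build one multi-row INSERT. Placeholders are numbered from the values
// array created inside this call, so every batch starts again at $1.
function buildBatchInsert(
  table: string,
  columns: string[],
  rows: unknown[][],
): BatchQuery {
  if (rows.length === 0) {
    throw new Error("cannot build an INSERT for an empty batch");
  }
  const values: unknown[] = [];
  const tuples = rows.map((row) => {
    if (row.length !== columns.length) {
      throw new Error("row length does not match column count");
    }
    const slots = row.map((value) => {
      values.push(value);
      return `$${values.length}`;
    });
    return `(${slots.join(", ")})`;
  });
  return {
    text: `INSERT INTO ${table} (${columns.join(", ")}) VALUES ${tuples.join(", ")}`,
    values,
  };
}
```

A counter shared across batches is the usual way this goes wrong: the second batch would begin at a number higher than `$1` while its values array starts from index zero.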
Empty Snapshots
If snapshotsWritten is 0 but products were upserted:
- Check if snapshots already exist for the crawl run (ON CONFLICT DO NOTHING)
- Verify store_products exist with the correct dispensary_id and provider
Performance
Typical performance metrics:
- ~1000 products/second for upsert
- ~2000 snapshots/second for insert
- 39 dispensaries with 37K products: ~17 seconds
For large backfills, use --batch-size to control memory usage.
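How the CLI applies --batch-size internally is not described here. As a general illustration of why a batch size bounds memory, the sketch below splits a list into fixed-size slices so only one slice is processed at a time; `chunk` is a hypothetical helper.

```typescript
// Yield consecutive slices of at most batchSize items.
function* chunk<T>(items: readonly T[], batchSize: number): Generator<T[]> {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  for (let i = 0; i < items.length; i += batchSize) {
    yield items.slice(i, i + batchSize);
  }
}

// Example: five product ids processed two at a time.
const batches = Array.from(chunk([1, 2, 3, 4, 5], 2));
// batches is [[1, 2], [3, 4], [5]]
```

Smaller batches lower peak memory and statement size but add round trips, so the right value depends on row width and available memory.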
Known Limitations
- brand_id not mapped: Source brand_id is UUID, target expects integer. Currently set to null.
- No historical snapshots: If source has no dutchie_product_snapshots, use products-only mode which creates initial snapshots from current product state.
- Source jobs empty: If dispensary_crawl_jobs is empty, use products-only mode.