Major additions: - Multi-state expansion: states table, StateSelector, NationalDashboard, StateHeatmap, CrossStateCompare - Orchestrator services: trace service, error taxonomy, retry manager, proxy rotator - Discovery system: dutchie discovery service, geo validation, city seeding scripts - Analytics infrastructure: analytics v2 routes, brand/pricing/stores intelligence pages - Local development: setup-local.sh starts all 5 services (postgres, backend, cannaiq, findadispo, findagram) - Migrations 037-056: crawler profiles, states, analytics indexes, worker metadata Frontend pages added: - Discovery, ChainsDashboard, IntelligenceBrands, IntelligencePricing, IntelligenceStores - StateHeatmap, CrossStateCompare, SyncInfoPanel Components added: - StateSelector, OrchestratorTraceModal, WorkflowStepper 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
131 lines
4.6 KiB
SQL
131 lines
4.6 KiB
SQL
-- Migration 046: Raw Payloads Table
|
|
--
|
|
-- Immutable event stream for raw crawler responses.
|
|
-- NEVER delete or overwrite historical payloads.
|
|
--
|
|
-- Run with:
|
|
-- export DATABASE_URL="postgresql://..."
-- psql "$DATABASE_URL" -f migrations/046_raw_payloads_table.sql
|
|
|
|
-- =====================================================
|
|
-- 1) RAW_PAYLOADS TABLE
|
|
-- =====================================================
|
|
-- Log of raw crawler responses: one row per fetched payload.
-- `processed` defaults to FALSE and is documented as TRUE once the payload has
-- been hydrated to the canonical tables.
--
-- NOTE(review): dispensary_id uses ON DELETE CASCADE, so deleting a dispensary
-- also deletes its payloads. That conflicts with the "NEVER delete" rule stated
-- for this table -- confirm whether ON DELETE RESTRICT is what is intended.
CREATE TABLE IF NOT EXISTS raw_payloads (
    -- gen_random_uuid() is built in from PostgreSQL 13; older servers need pgcrypto.
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Store reference
    dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE,

    -- Crawl run reference (nullable for backfilled data); the link is cleared,
    -- not the payload removed, when the crawl run row is deleted.
    crawl_run_id INTEGER REFERENCES crawl_runs(id) ON DELETE SET NULL,

    -- Platform identification
    platform VARCHAR(50) NOT NULL DEFAULT 'dutchie',

    -- Versioning for schema evolution
    payload_version INTEGER NOT NULL DEFAULT 1,

    -- The raw JSON response from the crawler (immutable)
    raw_json JSONB NOT NULL,

    -- Metadata (optional; NULL when not recorded)
    product_count INTEGER,        -- Number of products in payload
    pricing_type VARCHAR(20),     -- 'rec', 'med', or 'both'
    crawl_mode VARCHAR(20),       -- 'mode_a', 'mode_b', 'dual'

    -- Timestamps
    fetched_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Hydration status
    processed BOOLEAN NOT NULL DEFAULT FALSE,
    normalized_at TIMESTAMPTZ,    -- NULL until hydration succeeds
    hydration_error TEXT,         -- NULL when no error has been recorded
    -- NOT NULL so comparisons and increments on the attempt counter never see NULL.
    hydration_attempts INTEGER NOT NULL DEFAULT 0,

    -- Audit
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
|
|
|
|
-- =====================================================
|
|
-- 2) INDEXES FOR EFFICIENT QUERYING
|
|
-- =====================================================
|
|
|
|
-- Partial index covering only rows with processed = FALSE, keyed by fetch
-- time so the oldest unprocessed payload can be read first (FIFO).
CREATE INDEX IF NOT EXISTS idx_raw_payloads_unprocessed
    ON raw_payloads (fetched_at)
    WHERE processed = FALSE;
|
|
|
|
-- Per-dispensary lookups, newest payload first within each dispensary.
CREATE INDEX IF NOT EXISTS idx_raw_payloads_dispensary
    ON raw_payloads (dispensary_id, fetched_at DESC);
|
|
|
|
-- Supports filtering payloads by source platform.
CREATE INDEX IF NOT EXISTS idx_raw_payloads_platform
    ON raw_payloads (platform);
|
|
|
|
-- Links payloads back to their crawl run. Partial: rows without a crawl run
-- (crawl_run_id IS NULL) are left out of the index.
CREATE INDEX IF NOT EXISTS idx_raw_payloads_crawl_run
    ON raw_payloads (crawl_run_id)
    WHERE crawl_run_id IS NOT NULL;
|
|
|
|
-- Error tracking: indexes only rows that have a recorded hydration_error,
-- keyed by attempt count and processed flag.
CREATE INDEX IF NOT EXISTS idx_raw_payloads_errors
    ON raw_payloads (hydration_attempts, processed)
    WHERE hydration_error IS NOT NULL;
|
|
|
|
-- =====================================================
|
|
-- 3) HYDRATION LOCKS TABLE (distributed locking)
|
|
-- =====================================================
|
|
-- One row per named lock. The UNIQUE constraint on lock_name means at most one
-- row (and therefore one worker_id) can exist for a given lock at a time.
CREATE TABLE IF NOT EXISTS hydration_locks (
    id           SERIAL,
    lock_name    VARCHAR(100) NOT NULL,
    worker_id    VARCHAR(100) NOT NULL,
    acquired_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    expires_at   TIMESTAMPTZ  NOT NULL,               -- no default: caller must supply it
    heartbeat_at TIMESTAMPTZ  NOT NULL DEFAULT NOW(),

    PRIMARY KEY (id),
    UNIQUE (lock_name)
);
|
|
|
|
-- Index on lock expiry time (presumably used to find expired locks -- confirm
-- against the worker code).
CREATE INDEX IF NOT EXISTS idx_hydration_locks_expires
    ON hydration_locks (expires_at);
|
|
|
|
-- =====================================================
|
|
-- 4) HYDRATION_RUNS TABLE (audit trail)
|
|
-- =====================================================
|
|
-- One row per hydration job execution. A row starts with status 'running' and
-- started_at = NOW(); finished_at and error_message stay NULL until set.
CREATE TABLE IF NOT EXISTS hydration_runs (
    id SERIAL PRIMARY KEY,
    worker_id VARCHAR(100) NOT NULL,
    started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    finished_at TIMESTAMPTZ,
    status VARCHAR(20) NOT NULL DEFAULT 'running',  -- running, completed, failed

    -- Metrics. NOT NULL so increments (col = col + n) and sums never meet NULL.
    payloads_processed INTEGER NOT NULL DEFAULT 0,
    products_upserted INTEGER NOT NULL DEFAULT 0,
    snapshots_created INTEGER NOT NULL DEFAULT 0,
    brands_created INTEGER NOT NULL DEFAULT 0,
    errors_count INTEGER NOT NULL DEFAULT 0,

    -- Error details
    error_message TEXT,

    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
|
|
|
|
-- Lookup of runs by status, most recently started first within each status.
CREATE INDEX IF NOT EXISTS idx_hydration_runs_status
    ON hydration_runs (status, started_at DESC);
|
|
|
|
-- =====================================================
|
|
-- 5) COMMENTS
|
|
-- =====================================================
|
|
-- Catalog descriptions for raw_payloads and its key columns.
COMMENT ON TABLE raw_payloads
    IS 'Immutable event stream of raw crawler responses. NEVER DELETE.';
COMMENT ON COLUMN raw_payloads.raw_json
    IS 'Complete raw JSON from GraphQL/API response. Immutable.';
COMMENT ON COLUMN raw_payloads.payload_version
    IS 'Schema version for normalization compatibility.';
COMMENT ON COLUMN raw_payloads.processed
    IS 'TRUE when payload has been hydrated to canonical tables.';
COMMENT ON COLUMN raw_payloads.normalized_at
    IS 'When the payload was successfully hydrated.';

-- Catalog descriptions for the hydration bookkeeping tables.
COMMENT ON TABLE hydration_locks
    IS 'Distributed locks for hydration workers to prevent double-processing.';
COMMENT ON TABLE hydration_runs
    IS 'Audit trail of hydration job executions.';
|
|
|
|
-- =====================================================
|
|
-- MIGRATION COMPLETE
|
|
-- =====================================================
|