-- ============================================================================ -- Migration 050: CannaiQ Canonical Schema v2 -- ============================================================================ -- -- Purpose: Add canonical tables for multi-state analytics, pricing engine, -- promotions, intelligence, and brand/buyer portals. -- -- RULES: -- - STRICTLY ADDITIVE (no DROP, DELETE, TRUNCATE, or ALTER column type) -- - All new tables use IF NOT EXISTS -- - All new columns use ADD COLUMN IF NOT EXISTS -- - All indexes use IF NOT EXISTS -- - Compatible with existing dutchie_products, dispensaries, etc. -- -- Run with: -- psql $CANNAIQ_DB_URL -f migrations/050_cannaiq_canonical_v2.sql -- -- ============================================================================ -- ============================================================================ -- SECTION 1: STATES TABLE -- ============================================================================ -- Reference table for US states. Already may exist from 041/043. -- This is idempotent. CREATE TABLE IF NOT EXISTS states ( id SERIAL PRIMARY KEY, code VARCHAR(2) NOT NULL UNIQUE, name VARCHAR(100) NOT NULL, timezone VARCHAR(50) DEFAULT 'America/Phoenix', is_active BOOLEAN DEFAULT TRUE, crawl_enabled BOOLEAN DEFAULT TRUE, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); -- Insert states if not present INSERT INTO states (code, name, timezone) VALUES ('AZ', 'Arizona', 'America/Phoenix'), ('CA', 'California', 'America/Los_Angeles'), ('CO', 'Colorado', 'America/Denver'), ('FL', 'Florida', 'America/New_York'), ('IL', 'Illinois', 'America/Chicago'), ('MA', 'Massachusetts', 'America/New_York'), ('MD', 'Maryland', 'America/New_York'), ('MI', 'Michigan', 'America/Detroit'), ('MO', 'Missouri', 'America/Chicago'), ('NV', 'Nevada', 'America/Los_Angeles'), ('NJ', 'New Jersey', 'America/New_York'), ('NY', 'New York', 'America/New_York'), ('OH', 'Ohio', 'America/New_York'), ('OK', 'Oklahoma', 'America/Chicago'), ('OR', 'Oregon', 'America/Los_Angeles'), ('PA', 'Pennsylvania', 'America/New_York'), ('WA', 'Washington', 'America/Los_Angeles') ON CONFLICT (code) DO UPDATE SET timezone = EXCLUDED.timezone, updated_at = NOW(); CREATE INDEX IF NOT EXISTS idx_states_code ON states(code); CREATE INDEX IF NOT EXISTS idx_states_active ON states(is_active) WHERE is_active = TRUE; COMMENT ON TABLE states IS 'US states where CannaiQ operates. Single source of truth for state configuration.'; -- ============================================================================ -- SECTION 2: CHAINS TABLE (Retail Groups) -- ============================================================================ -- Chains are multi-location operators like Curaleaf, Trulieve, Harvest, etc. CREATE TABLE IF NOT EXISTS chains ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, slug VARCHAR(255) NOT NULL UNIQUE, -- Branding website_url TEXT, logo_url TEXT, description TEXT, -- Business info headquarters_city VARCHAR(100), headquarters_state_id INTEGER REFERENCES states(id), founded_year INTEGER, -- Status is_active BOOLEAN DEFAULT TRUE, is_public BOOLEAN DEFAULT FALSE, -- Publicly traded? stock_ticker VARCHAR(10), -- Metadata created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_chains_slug ON chains(slug); CREATE INDEX IF NOT EXISTS idx_chains_active ON chains(is_active) WHERE is_active = TRUE; COMMENT ON TABLE chains IS 'Retail chains/groups that own multiple dispensary locations.'; -- ============================================================================ -- SECTION 3: CANONICAL BRANDS TABLE -- ============================================================================ -- This is the master brand catalog across all providers and states. -- Distinct from the per-store `brands` table which tracks store-level brand presence. CREATE TABLE IF NOT EXISTS canonical_brands ( id SERIAL PRIMARY KEY, name VARCHAR(255) NOT NULL, slug VARCHAR(255) NOT NULL UNIQUE, -- External IDs from various platforms dutchie_brand_id VARCHAR(100), jane_brand_id VARCHAR(100), treez_brand_id VARCHAR(100), weedmaps_brand_id VARCHAR(100), -- Branding logo_url TEXT, local_logo_path TEXT, -- Local storage path website_url TEXT, instagram_handle VARCHAR(100), description TEXT, -- Classification is_portfolio_brand BOOLEAN DEFAULT FALSE, -- TRUE if brand we represent is_house_brand BOOLEAN DEFAULT FALSE, -- TRUE if dispensary house brand parent_company VARCHAR(255), -- Parent company name if subsidiary -- State presence states_available TEXT[], -- Array of state codes where brand is present -- Status is_active BOOLEAN DEFAULT TRUE, is_verified BOOLEAN DEFAULT FALSE, -- Manually verified brand info verified_at TIMESTAMPTZ, -- Metadata created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_canonical_brands_slug ON canonical_brands(slug); CREATE INDEX IF NOT EXISTS idx_canonical_brands_dutchie ON canonical_brands(dutchie_brand_id) WHERE dutchie_brand_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_canonical_brands_portfolio ON canonical_brands(is_portfolio_brand) WHERE is_portfolio_brand = TRUE; CREATE INDEX IF NOT EXISTS idx_canonical_brands_states ON canonical_brands USING GIN(states_available); COMMENT ON TABLE canonical_brands IS 'Canonical brand catalog across all providers. Master brand reference.'; COMMENT ON COLUMN canonical_brands.is_portfolio_brand IS 'TRUE if this is a brand CannaiQ represents/manages.'; -- ============================================================================ -- SECTION 4: CRAWL_RUNS TABLE -- ============================================================================ -- One record per crawl execution. Links to snapshots. CREATE TABLE IF NOT EXISTS crawl_runs ( id SERIAL PRIMARY KEY, dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, state_id INTEGER REFERENCES states(id), -- Provider info provider VARCHAR(50) NOT NULL DEFAULT 'dutchie', -- Timing started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), finished_at TIMESTAMPTZ, duration_ms INTEGER, -- Status status VARCHAR(20) NOT NULL DEFAULT 'running', -- running, success, failed, partial error_code VARCHAR(50), error_message TEXT, http_status INTEGER, -- Results products_found INTEGER DEFAULT 0, products_new INTEGER DEFAULT 0, products_updated INTEGER DEFAULT 0, products_missing INTEGER DEFAULT 0, -- Products gone from feed snapshots_written INTEGER DEFAULT 0, -- Infrastructure worker_id VARCHAR(100), worker_hostname VARCHAR(100), proxy_used TEXT, trigger_type VARCHAR(50) DEFAULT 'scheduled', -- scheduled, manual, api -- Metadata metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_crawl_runs_dispensary ON crawl_runs(dispensary_id); CREATE INDEX IF NOT EXISTS idx_crawl_runs_state ON crawl_runs(state_id) WHERE state_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_crawl_runs_status ON crawl_runs(status); CREATE INDEX IF NOT EXISTS idx_crawl_runs_started ON crawl_runs(started_at DESC); CREATE INDEX IF NOT EXISTS idx_crawl_runs_dispensary_started ON crawl_runs(dispensary_id, started_at DESC); COMMENT ON TABLE crawl_runs IS 'Each crawl execution. Links to snapshots and traces.'; -- ============================================================================ -- SECTION 5: STORE_PRODUCTS TABLE (Current Menu State) -- ============================================================================ -- Canonical representation of what's currently on the menu. -- Provider-agnostic structure for analytics. CREATE TABLE IF NOT EXISTS store_products ( id SERIAL PRIMARY KEY, dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, state_id INTEGER REFERENCES states(id), -- Links to canonical entities canonical_brand_id INTEGER REFERENCES canonical_brands(id) ON DELETE SET NULL, category_id INTEGER REFERENCES categories(id) ON DELETE SET NULL, -- Provider-specific identifiers provider VARCHAR(50) NOT NULL DEFAULT 'dutchie', provider_product_id VARCHAR(100) NOT NULL, -- Platform product ID provider_brand_id VARCHAR(100), -- Platform brand ID enterprise_product_id VARCHAR(100), -- Cross-store product ID -- Raw data from platform (not normalized) name VARCHAR(500) NOT NULL, brand_name VARCHAR(255), category VARCHAR(100), subcategory VARCHAR(100), strain_type VARCHAR(50), description TEXT, -- Pricing (current) price_rec NUMERIC(10,2), price_med NUMERIC(10,2), price_rec_special NUMERIC(10,2), price_med_special NUMERIC(10,2), is_on_special BOOLEAN DEFAULT FALSE, special_name TEXT, discount_percent NUMERIC(5,2), price_unit VARCHAR(20) DEFAULT 'each', -- gram, ounce, each, mg -- Inventory is_in_stock BOOLEAN DEFAULT TRUE, stock_quantity INTEGER, stock_status VARCHAR(50) DEFAULT 'in_stock', -- in_stock, out_of_stock, low_stock, missing_from_feed -- Potency thc_percent NUMERIC(5,2), cbd_percent NUMERIC(5,2), thc_mg NUMERIC(10,2), cbd_mg NUMERIC(10,2), -- Weight/Size weight_value NUMERIC(10,2), weight_unit VARCHAR(20), -- g, oz, mg -- Images image_url TEXT, local_image_path TEXT, thumbnail_url TEXT, -- Flags is_featured BOOLEAN DEFAULT FALSE, medical_only BOOLEAN DEFAULT FALSE, rec_only BOOLEAN DEFAULT FALSE, -- Menu position (for tracking prominence) menu_position INTEGER, -- Timestamps first_seen_at TIMESTAMPTZ DEFAULT NOW(), last_seen_at TIMESTAMPTZ DEFAULT NOW(), last_price_change_at TIMESTAMPTZ, last_stock_change_at TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE(dispensary_id, provider, provider_product_id) ); CREATE INDEX IF NOT EXISTS idx_store_products_dispensary ON store_products(dispensary_id); CREATE INDEX IF NOT EXISTS idx_store_products_state ON store_products(state_id) WHERE state_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_store_products_brand ON store_products(canonical_brand_id) WHERE canonical_brand_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_store_products_category ON store_products(category) WHERE category IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_store_products_in_stock ON store_products(dispensary_id, is_in_stock); CREATE INDEX IF NOT EXISTS idx_store_products_special ON store_products(dispensary_id, is_on_special) WHERE is_on_special = TRUE; CREATE INDEX IF NOT EXISTS idx_store_products_last_seen ON store_products(last_seen_at DESC); CREATE INDEX IF NOT EXISTS idx_store_products_provider ON store_products(provider); CREATE INDEX IF NOT EXISTS idx_store_products_enterprise ON store_products(enterprise_product_id) WHERE enterprise_product_id IS NOT NULL; COMMENT ON TABLE store_products IS 'Current state of products on each dispensary menu. Provider-agnostic.'; -- ============================================================================ -- SECTION 6: STORE_PRODUCT_SNAPSHOTS TABLE (Historical Data) -- ============================================================================ -- Time-series data for analytics. One row per product per crawl. -- CRITICAL: NEVER DELETE from this table. CREATE TABLE IF NOT EXISTS store_product_snapshots ( id SERIAL PRIMARY KEY, dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, store_product_id INTEGER REFERENCES store_products(id) ON DELETE SET NULL, state_id INTEGER REFERENCES states(id), -- Provider info provider VARCHAR(50) NOT NULL DEFAULT 'dutchie', provider_product_id VARCHAR(100), -- Link to crawl run crawl_run_id INTEGER REFERENCES crawl_runs(id) ON DELETE SET NULL, -- Capture timestamp captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- Raw data from platform name VARCHAR(500), brand_name VARCHAR(255), category VARCHAR(100), subcategory VARCHAR(100), -- Pricing at time of capture price_rec NUMERIC(10,2), price_med NUMERIC(10,2), price_rec_special NUMERIC(10,2), price_med_special NUMERIC(10,2), is_on_special BOOLEAN DEFAULT FALSE, discount_percent NUMERIC(5,2), -- Inventory at time of capture is_in_stock BOOLEAN DEFAULT TRUE, stock_quantity INTEGER, stock_status VARCHAR(50) DEFAULT 'in_stock', is_present_in_feed BOOLEAN DEFAULT TRUE, -- FALSE = missing from feed -- Potency at time of capture thc_percent NUMERIC(5,2), cbd_percent NUMERIC(5,2), -- Menu position (for tracking prominence changes) menu_position INTEGER, -- Image URL at time of capture image_url TEXT, -- Full raw response for debugging raw_data JSONB, created_at TIMESTAMPTZ DEFAULT NOW() ); -- Partitioning-ready indexes (for future table partitioning by month) CREATE INDEX IF NOT EXISTS idx_snapshots_dispensary_captured ON store_product_snapshots(dispensary_id, captured_at DESC); CREATE INDEX IF NOT EXISTS idx_snapshots_state_captured ON store_product_snapshots(state_id, captured_at DESC) WHERE state_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_snapshots_product_captured ON store_product_snapshots(store_product_id, captured_at DESC) WHERE store_product_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_snapshots_crawl_run ON store_product_snapshots(crawl_run_id) WHERE crawl_run_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_snapshots_captured_at ON store_product_snapshots(captured_at DESC); CREATE INDEX IF NOT EXISTS idx_snapshots_brand ON store_product_snapshots(brand_name) WHERE brand_name IS NOT NULL; COMMENT ON TABLE store_product_snapshots IS 'Historical crawl data. One row per product per crawl. NEVER DELETE.'; -- ============================================================================ -- SECTION 7: ADD state_id AND chain_id TO DISPENSARIES -- ============================================================================ -- Link dispensaries to states and chains tables. ALTER TABLE dispensaries ADD COLUMN IF NOT EXISTS state_id INTEGER REFERENCES states(id); ALTER TABLE dispensaries ADD COLUMN IF NOT EXISTS chain_id INTEGER REFERENCES chains(id); CREATE INDEX IF NOT EXISTS idx_dispensaries_state_id ON dispensaries(state_id); CREATE INDEX IF NOT EXISTS idx_dispensaries_chain_id ON dispensaries(chain_id) WHERE chain_id IS NOT NULL; -- Backfill state_id from existing state column UPDATE dispensaries d SET state_id = s.id FROM states s WHERE d.state = s.code AND d.state_id IS NULL; COMMENT ON COLUMN dispensaries.state_id IS 'FK to states table. Canonical state reference.'; COMMENT ON COLUMN dispensaries.chain_id IS 'FK to chains table. NULL if independent dispensary.'; -- ============================================================================ -- SECTION 8: BRAND PENETRATION TABLE -- ============================================================================ -- Pre-computed brand presence across stores for analytics dashboards. CREATE TABLE IF NOT EXISTS brand_penetration ( id SERIAL PRIMARY KEY, canonical_brand_id INTEGER NOT NULL REFERENCES canonical_brands(id) ON DELETE CASCADE, state_id INTEGER NOT NULL REFERENCES states(id) ON DELETE CASCADE, -- Metrics stores_carrying INTEGER DEFAULT 0, stores_total INTEGER DEFAULT 0, penetration_pct NUMERIC(5,2) DEFAULT 0, -- Product breakdown products_count INTEGER DEFAULT 0, products_in_stock INTEGER DEFAULT 0, products_on_special INTEGER DEFAULT 0, -- Pricing avg_price NUMERIC(10,2), min_price NUMERIC(10,2), max_price NUMERIC(10,2), -- Time range calculated_at TIMESTAMPTZ DEFAULT NOW(), period_start TIMESTAMPTZ, period_end TIMESTAMPTZ, UNIQUE(canonical_brand_id, state_id, calculated_at) ); CREATE INDEX IF NOT EXISTS idx_brand_penetration_brand ON brand_penetration(canonical_brand_id); CREATE INDEX IF NOT EXISTS idx_brand_penetration_state ON brand_penetration(state_id); CREATE INDEX IF NOT EXISTS idx_brand_penetration_calculated ON brand_penetration(calculated_at DESC); COMMENT ON TABLE brand_penetration IS 'Pre-computed brand penetration metrics by state.'; -- ============================================================================ -- SECTION 9: PRICE_ALERTS TABLE -- ============================================================================ -- Track significant price changes for intelligence/alerts. CREATE TABLE IF NOT EXISTS price_alerts ( id SERIAL PRIMARY KEY, store_product_id INTEGER REFERENCES store_products(id) ON DELETE CASCADE, dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, state_id INTEGER REFERENCES states(id), -- What changed alert_type VARCHAR(50) NOT NULL, -- price_drop, price_increase, new_special, special_ended -- Values old_price NUMERIC(10,2), new_price NUMERIC(10,2), change_amount NUMERIC(10,2), change_percent NUMERIC(5,2), -- Context product_name VARCHAR(500), brand_name VARCHAR(255), category VARCHAR(100), -- Status is_processed BOOLEAN DEFAULT FALSE, processed_at TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_price_alerts_dispensary ON price_alerts(dispensary_id); CREATE INDEX IF NOT EXISTS idx_price_alerts_state ON price_alerts(state_id) WHERE state_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_price_alerts_type ON price_alerts(alert_type); CREATE INDEX IF NOT EXISTS idx_price_alerts_unprocessed ON price_alerts(is_processed) WHERE is_processed = FALSE; CREATE INDEX IF NOT EXISTS idx_price_alerts_created ON price_alerts(created_at DESC); COMMENT ON TABLE price_alerts IS 'Significant price changes for intelligence/alerting.'; -- ============================================================================ -- SECTION 10: RAW_PAYLOADS TABLE -- ============================================================================ -- Store raw API responses for replay/debugging. Separate from snapshots. CREATE TABLE IF NOT EXISTS raw_payloads ( id SERIAL PRIMARY KEY, dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, crawl_run_id INTEGER REFERENCES crawl_runs(id) ON DELETE SET NULL, -- Payload info provider VARCHAR(50) NOT NULL DEFAULT 'dutchie', payload_type VARCHAR(50) NOT NULL DEFAULT 'products', -- products, brands, specials -- The raw data payload JSONB NOT NULL, payload_size_bytes INTEGER, -- Deduplication payload_hash VARCHAR(64), -- SHA256 for deduplication -- Processing status is_processed BOOLEAN DEFAULT FALSE, processed_at TIMESTAMPTZ, captured_at TIMESTAMPTZ DEFAULT NOW() ); CREATE INDEX IF NOT EXISTS idx_raw_payloads_dispensary ON raw_payloads(dispensary_id, captured_at DESC); CREATE INDEX IF NOT EXISTS idx_raw_payloads_crawl_run ON raw_payloads(crawl_run_id) WHERE crawl_run_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_raw_payloads_unprocessed ON raw_payloads(is_processed) WHERE is_processed = FALSE; CREATE INDEX IF NOT EXISTS idx_raw_payloads_hash ON raw_payloads(payload_hash) WHERE payload_hash IS NOT NULL; COMMENT ON TABLE raw_payloads IS 'Raw API responses for replay/debugging. Enables re-hydration.'; -- ============================================================================ -- SECTION 11: ANALYTICS CACHE TABLES -- ============================================================================ -- Pre-computed analytics for dashboard performance. -- Daily store metrics CREATE TABLE IF NOT EXISTS analytics_store_daily ( id SERIAL PRIMARY KEY, dispensary_id INTEGER NOT NULL REFERENCES dispensaries(id) ON DELETE CASCADE, state_id INTEGER REFERENCES states(id), date DATE NOT NULL, -- Product counts total_products INTEGER DEFAULT 0, in_stock_products INTEGER DEFAULT 0, out_of_stock_products INTEGER DEFAULT 0, on_special_products INTEGER DEFAULT 0, -- Brand/category diversity unique_brands INTEGER DEFAULT 0, unique_categories INTEGER DEFAULT 0, -- Pricing avg_price NUMERIC(10,2), median_price NUMERIC(10,2), -- Crawl health crawl_count INTEGER DEFAULT 0, successful_crawls INTEGER DEFAULT 0, created_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE(dispensary_id, date) ); CREATE INDEX IF NOT EXISTS idx_analytics_store_daily_dispensary ON analytics_store_daily(dispensary_id, date DESC); CREATE INDEX IF NOT EXISTS idx_analytics_store_daily_state ON analytics_store_daily(state_id, date DESC) WHERE state_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_analytics_store_daily_date ON analytics_store_daily(date DESC); -- Daily brand metrics CREATE TABLE IF NOT EXISTS analytics_brand_daily ( id SERIAL PRIMARY KEY, canonical_brand_id INTEGER NOT NULL REFERENCES canonical_brands(id) ON DELETE CASCADE, state_id INTEGER REFERENCES states(id), date DATE NOT NULL, -- Presence stores_carrying INTEGER DEFAULT 0, products_count INTEGER DEFAULT 0, -- Stock in_stock_count INTEGER DEFAULT 0, out_of_stock_count INTEGER DEFAULT 0, -- Pricing avg_price NUMERIC(10,2), min_price NUMERIC(10,2), max_price NUMERIC(10,2), on_special_count INTEGER DEFAULT 0, created_at TIMESTAMPTZ DEFAULT NOW(), UNIQUE(canonical_brand_id, state_id, date) ); CREATE INDEX IF NOT EXISTS idx_analytics_brand_daily_brand ON analytics_brand_daily(canonical_brand_id, date DESC); CREATE INDEX IF NOT EXISTS idx_analytics_brand_daily_state ON analytics_brand_daily(state_id, date DESC) WHERE state_id IS NOT NULL; -- ============================================================================ -- SECTION 12: VIEWS FOR COMPATIBILITY -- ============================================================================ -- View: Latest snapshot per store product CREATE OR REPLACE VIEW v_latest_store_snapshots AS SELECT DISTINCT ON (dispensary_id, provider_product_id) sps.* FROM store_product_snapshots sps ORDER BY dispensary_id, provider_product_id, captured_at DESC; -- View: Crawl run summary per dispensary CREATE OR REPLACE VIEW v_dispensary_crawl_summary AS SELECT d.id AS dispensary_id, COALESCE(d.dba_name, d.name) AS dispensary_name, d.city, d.state, d.state_id, s.name AS state_name, COUNT(DISTINCT sp.id) AS current_product_count, COUNT(DISTINCT sp.id) FILTER (WHERE sp.is_in_stock) AS in_stock_count, COUNT(DISTINCT sp.id) FILTER (WHERE sp.is_on_special) AS on_special_count, MAX(cr.finished_at) AS last_crawl_at, (SELECT status FROM crawl_runs WHERE dispensary_id = d.id ORDER BY started_at DESC LIMIT 1) AS last_crawl_status FROM dispensaries d LEFT JOIN states s ON s.id = d.state_id LEFT JOIN store_products sp ON sp.dispensary_id = d.id LEFT JOIN crawl_runs cr ON cr.dispensary_id = d.id GROUP BY d.id, d.dba_name, d.name, d.city, d.state, d.state_id, s.name; -- View: Brand presence across stores CREATE OR REPLACE VIEW v_brand_store_presence AS SELECT cb.id AS brand_id, cb.name AS brand_name, cb.slug AS brand_slug, s.id AS state_id, s.code AS state_code, COUNT(DISTINCT sp.dispensary_id) AS store_count, COUNT(sp.id) AS product_count, COUNT(sp.id) FILTER (WHERE sp.is_in_stock) AS in_stock_count, AVG(sp.price_rec) AS avg_price, MIN(sp.price_rec) AS min_price, MAX(sp.price_rec) AS max_price FROM canonical_brands cb JOIN store_products sp ON sp.canonical_brand_id = cb.id LEFT JOIN states s ON s.id = sp.state_id GROUP BY cb.id, cb.name, cb.slug, s.id, s.code; -- ============================================================================ -- SECTION 13: ADD FK FROM store_product_snapshots TO crawl_runs -- ============================================================================ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.table_constraints WHERE constraint_name = 'store_product_snapshots_crawl_run_id_fkey' ) THEN ALTER TABLE store_product_snapshots ADD CONSTRAINT store_product_snapshots_crawl_run_id_fkey FOREIGN KEY (crawl_run_id) REFERENCES crawl_runs(id) ON DELETE SET NULL; END IF; END $$; -- ============================================================================ -- SECTION 14: ADD crawl_run_id TO crawl_orchestration_traces -- ============================================================================ ALTER TABLE crawl_orchestration_traces ADD COLUMN IF NOT EXISTS crawl_run_id INTEGER REFERENCES crawl_runs(id) ON DELETE SET NULL; CREATE INDEX IF NOT EXISTS idx_traces_crawl_run ON crawl_orchestration_traces(crawl_run_id) WHERE crawl_run_id IS NOT NULL; -- ============================================================================ -- SECTION 15: UPDATE dispensary_crawler_profiles -- ============================================================================ -- Add status columns for profile lifecycle. ALTER TABLE dispensary_crawler_profiles ADD COLUMN IF NOT EXISTS status VARCHAR(50) DEFAULT 'sandbox'; ALTER TABLE dispensary_crawler_profiles ADD COLUMN IF NOT EXISTS allow_autopromote BOOLEAN DEFAULT FALSE; ALTER TABLE dispensary_crawler_profiles ADD COLUMN IF NOT EXISTS validated_at TIMESTAMPTZ; CREATE INDEX IF NOT EXISTS idx_profiles_status ON dispensary_crawler_profiles(status); COMMENT ON COLUMN dispensary_crawler_profiles.status IS 'Profile status: sandbox, production, needs_manual, disabled'; -- ============================================================================ -- SECTION 16: UPDATE dispensary_crawl_jobs WITH ADDITIONAL COLUMNS -- ============================================================================ -- Add columns needed for enhanced job tracking. ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS worker_id VARCHAR(100); ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS worker_hostname VARCHAR(100); ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS claimed_by VARCHAR(100); ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS claimed_at TIMESTAMPTZ; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS locked_until TIMESTAMPTZ; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS last_heartbeat_at TIMESTAMPTZ; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS max_retries INTEGER DEFAULT 3; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS products_upserted INTEGER DEFAULT 0; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS snapshots_created INTEGER DEFAULT 0; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS current_page INTEGER DEFAULT 0; ALTER TABLE dispensary_crawl_jobs ADD COLUMN IF NOT EXISTS total_pages INTEGER; CREATE INDEX IF NOT EXISTS idx_crawl_jobs_status_pending ON dispensary_crawl_jobs(status) WHERE status = 'pending'; CREATE INDEX IF NOT EXISTS idx_crawl_jobs_claimed_by ON dispensary_crawl_jobs(claimed_by) WHERE claimed_by IS NOT NULL; -- ============================================================================ -- SECTION 17: QUEUE MONITORING VIEWS -- ============================================================================ CREATE OR REPLACE VIEW v_queue_stats AS SELECT (SELECT COUNT(*) FROM dispensary_crawl_jobs WHERE status = 'pending') AS pending_jobs, (SELECT COUNT(*) FROM dispensary_crawl_jobs WHERE status = 'running') AS running_jobs, (SELECT COUNT(*) FROM dispensary_crawl_jobs WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour') AS completed_1h, (SELECT COUNT(*) FROM dispensary_crawl_jobs WHERE status = 'failed' AND completed_at > NOW() - INTERVAL '1 hour') AS failed_1h, (SELECT COUNT(DISTINCT worker_id) FROM dispensary_crawl_jobs WHERE status = 'running' AND worker_id IS NOT NULL) AS active_workers, (SELECT AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) FROM dispensary_crawl_jobs WHERE status = 'completed' AND completed_at > NOW() - INTERVAL '1 hour') AS avg_duration_seconds; CREATE OR REPLACE VIEW v_active_workers AS SELECT worker_id, worker_hostname, COUNT(*) AS current_jobs, SUM(products_found) AS total_products_found, SUM(products_upserted) AS total_products_upserted, SUM(snapshots_created) AS total_snapshots, MIN(claimed_at) AS first_claimed_at, MAX(last_heartbeat_at) AS last_heartbeat FROM dispensary_crawl_jobs WHERE status = 'running' AND worker_id IS NOT NULL GROUP BY worker_id, worker_hostname; -- ============================================================================ -- DONE -- ============================================================================ SELECT 'Migration 050 completed successfully. Canonical schema v2 is ready.' AS status;