Files
cannaiq/backend/migrations/051_worker_definitions.sql
Kelly 2f483b3084 feat: SEO template library, discovery pipeline, and orchestrator enhancements
## SEO Template Library
- Add complete template library with 7 page types (state, city, category, brand, product, search, regeneration)
- Add Template Library tab in SEO Orchestrator with accordion-based editors
- Add template preview, validation, and variable injection engine
- Add API endpoints: /api/seo/templates, preview, validate, generate, regenerate

## Discovery Pipeline
- Add promotion.ts for discovery location validation and promotion
- Add discover-all-states.ts script for multi-state discovery
- Add promotion log migration (067)
- Enhance discovery routes and types

## Orchestrator & Admin
- Add crawl_enabled filter to stores page
- Add API permissions page
- Add job queue management
- Add price analytics routes
- Add markets and intelligence routes
- Enhance dashboard and worker monitoring

## Infrastructure
- Add migrations for worker definitions, SEO settings, field alignment
- Add canonical pipeline for scraper v2
- Update hydration and sync orchestrator
- Enhance multi-state query service

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-09 00:05:34 -07:00

120 lines
4.0 KiB
SQL

-- Migration 051: Worker Definitions
-- Introduces a dedicated table of named workers, each carrying a role, a
-- schedule, an assignment scope, and run-status bookkeeping.

-- workers: one row per named worker.
CREATE TABLE IF NOT EXISTS workers (
    id                      SERIAL,
    name                    VARCHAR(100) NOT NULL,      -- unique worker name (see workers_name_key)
    role                    VARCHAR(100) NOT NULL,
    description             TEXT,
    enabled                 BOOLEAN      DEFAULT TRUE,

    -- Schedule configuration (for dedicated crawl workers)
    schedule_type           VARCHAR(50)  DEFAULT 'interval',  -- one of: 'interval', 'cron', 'manual'
    interval_minutes        INTEGER      DEFAULT 240,
    cron_expression         VARCHAR(100),                     -- e.g. '0 */4 * * *'
    jitter_minutes          INTEGER      DEFAULT 30,

    -- Assignment scope
    assignment_type         VARCHAR(50)  DEFAULT 'all',       -- one of: 'all', 'state', 'dispensary', 'chain'
    assigned_state_codes    TEXT[],                           -- e.g. ARRAY['AZ', 'CA']
    assigned_dispensary_ids INTEGER[],
    assigned_chain_ids      INTEGER[],

    -- Job configuration
    job_type                VARCHAR(50)  NOT NULL DEFAULT 'dutchie_product_crawl',
    job_config              JSONB        DEFAULT '{}',
    priority                INTEGER      DEFAULT 0,
    max_concurrent          INTEGER      DEFAULT 1,

    -- Status tracking
    status                  VARCHAR(50)  DEFAULT 'idle',      -- one of: 'idle', 'running', 'paused', 'error'
    last_run_at             TIMESTAMPTZ,
    last_status             VARCHAR(50),
    last_error              TEXT,
    last_duration_ms        INTEGER,
    next_run_at             TIMESTAMPTZ,
    current_job_id          INTEGER,                          -- NOTE(review): no foreign key is declared for this id

    -- Metrics
    total_runs              INTEGER      DEFAULT 0,
    successful_runs         INTEGER      DEFAULT 0,
    failed_runs             INTEGER      DEFAULT 0,
    avg_duration_ms         INTEGER,

    created_at              TIMESTAMPTZ  DEFAULT NOW(),
    updated_at              TIMESTAMPTZ  DEFAULT NOW(),

    -- Constraint names match what PostgreSQL generates for the inline forms.
    CONSTRAINT workers_pkey     PRIMARY KEY (id),
    CONSTRAINT workers_name_key UNIQUE (name)
);
-- worker_runs: history of individual worker executions.
-- Rows are removed automatically when the owning worker is deleted
-- (ON DELETE CASCADE on worker_id).
CREATE TABLE IF NOT EXISTS worker_runs (
    id                   SERIAL,
    worker_id            INTEGER     NOT NULL,
    started_at           TIMESTAMPTZ DEFAULT NOW(),
    completed_at         TIMESTAMPTZ,
    status               VARCHAR(50) DEFAULT 'running',  -- one of: 'running', 'success', 'error', 'cancelled'
    duration_ms          INTEGER,

    -- What was processed
    jobs_created         INTEGER     DEFAULT 0,
    jobs_completed       INTEGER     DEFAULT 0,
    jobs_failed          INTEGER     DEFAULT 0,
    dispensaries_crawled INTEGER     DEFAULT 0,
    products_found       INTEGER     DEFAULT 0,

    error_message        TEXT,
    metadata             JSONB       DEFAULT '{}',
    created_at           TIMESTAMPTZ DEFAULT NOW(),

    -- Constraint names match what PostgreSQL generates for the inline forms.
    CONSTRAINT worker_runs_pkey PRIMARY KEY (id),
    CONSTRAINT worker_runs_worker_id_fkey
        FOREIGN KEY (worker_id) REFERENCES workers (id) ON DELETE CASCADE
);
-- Lookup indexes for workers and worker_runs.

-- Partial index covering only enabled workers.
CREATE INDEX IF NOT EXISTS idx_workers_enabled
    ON workers (enabled)
    WHERE enabled = TRUE;

-- Next scheduled run time, restricted to enabled workers.
CREATE INDEX IF NOT EXISTS idx_workers_next_run
    ON workers (next_run_at)
    WHERE enabled = TRUE;

-- Filter workers by current status.
CREATE INDEX IF NOT EXISTS idx_workers_status
    ON workers (status);

-- Fetch the run history of a single worker.
CREATE INDEX IF NOT EXISTS idx_worker_runs_worker_id
    ON worker_runs (worker_id);

-- Run history ordered newest first.
CREATE INDEX IF NOT EXISTS idx_worker_runs_started_at
    ON worker_runs (started_at DESC);
-- Add assigned_worker_id to dispensary_crawl_jobs if it does not exist yet.
--
-- The existence check reads pg_attribute for the relation that the unqualified
-- name resolves to via search_path, i.e. exactly the table the ALTER TABLE below
-- operates on. A lookup in information_schema.columns filtered only by
-- table_name can match a same-named table/column in a different schema and
-- wrongly skip the ALTER.
--
-- The ::regclass cast raises an error if dispensary_crawl_jobs does not exist,
-- so a missing table still fails the migration loudly.
--
-- NOTE(review): the foreign key uses the default ON DELETE behaviour (NO ACTION),
-- so a worker that is referenced by a crawl job cannot be deleted — confirm that
-- this is intended.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_attribute
        WHERE attrelid = 'dispensary_crawl_jobs'::regclass
          AND attname = 'assigned_worker_id'
          AND NOT attisdropped  -- ignore columns that were dropped earlier
    ) THEN
        ALTER TABLE dispensary_crawl_jobs
            ADD COLUMN assigned_worker_id INTEGER REFERENCES workers (id);
    END IF;
END $$;
-- Migrate existing job_schedules workers to the new workers table.
--
-- One workers row is produced per distinct worker_name. DISTINCT ON guards
-- against several job_schedules rows sharing a worker_name: without it the
-- statement aborts with "ON CONFLICT DO UPDATE command cannot affect row a
-- second time". When duplicates exist, the most recently run schedule wins
-- (ties broken by job_name so the choice is deterministic).
--
-- Workers that already exist (same name) keep all of their current values;
-- only updated_at is bumped.
--
-- NOTE(review): workers.role and workers.job_type are NOT NULL, so a NULL
-- worker_role or job_name in job_schedules aborts this statement — confirm the
-- source columns are always populated.
INSERT INTO workers (
    name,
    role,
    description,
    enabled,
    interval_minutes,
    jitter_minutes,
    job_type,
    job_config,
    last_run_at,
    last_status,
    last_error,
    last_duration_ms,
    next_run_at
)
SELECT DISTINCT ON (js.worker_name)
    js.worker_name,
    js.worker_role,
    js.description,
    js.enabled,
    js.base_interval_minutes,
    js.jitter_minutes,
    js.job_name,
    js.job_config,
    js.last_run_at,
    js.last_status,
    js.last_error_message,
    js.last_duration_ms,
    js.next_run_at
FROM job_schedules AS js
WHERE js.worker_name IS NOT NULL
ORDER BY
    js.worker_name,
    js.last_run_at DESC NULLS LAST,
    js.job_name
ON CONFLICT (name) DO UPDATE SET
    updated_at = NOW();
-- Table comment for workers: records the available worker roles for reference.
-- Written as a dollar-quoted literal so the text needs no quote escaping.
COMMENT ON TABLE workers IS $$Named workers with specific roles and assignments. Roles include:
- product_sync: Crawls products from dispensary menus
- store_discovery: Discovers new dispensary locations
- entry_point_finder: Detects menu providers and resolves platform IDs
- analytics_refresh: Refreshes materialized views and analytics
- price_monitor: Monitors price changes and triggers alerts
- inventory_sync: Syncs inventory levels
- image_processor: Downloads and processes product images
- data_validator: Validates data integrity$$;