Compare commits

..

1 Commits

Author SHA1 Message Date
Kelly
80f048ad57 feat(k8s): Add StatefulSet for persistent workers
- Add scraper-worker-statefulset.yaml with 8 persistent pods
- updateStrategy: OnDelete prevents automatic restarts
- Workers maintain stable identity across restarts
- Document worker architecture in CLAUDE.md
- Add worker registry API endpoint documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:37:28 -07:00
12 changed files with 2 additions and 399 deletions

View File

@@ -1,175 +0,0 @@
# API Security Documentation
This document describes the authentication and authorization configuration for all CannaiQ API endpoints.
## Authentication Methods
### 1. Trusted Origins (No Token Required)
Requests from trusted sources are automatically authenticated with `internal` role:
**Trusted IPs:**
- `127.0.0.1` (localhost IPv4)
- `::1` (localhost IPv6)
- `::ffff:127.0.0.1` (IPv4-mapped IPv6)
**Trusted Domains:**
- `https://cannaiq.co`
- `https://www.cannaiq.co`
- `https://findadispo.com`
- `https://www.findadispo.com`
- `https://findagram.co`
- `https://www.findagram.co`
- `http://localhost:3010`
- `http://localhost:8080`
- `http://localhost:5173`
**Trusted Patterns:**
- `*.cannabrands.app`
- `*.cannaiq.co`
**Internal Header:**
- `X-Internal-Request` header matching `INTERNAL_REQUEST_SECRET` env var
### 2. Bearer Token Authentication
External requests must include a valid token:
```
Authorization: Bearer <token>
```
**Token Types:**
- **JWT Token**: User session tokens (7-day expiry)
- **API Token**: Long-lived tokens for integrations (stored in `api_tokens` table)
## Authorization Levels
### Public (No Auth)
Routes accessible without authentication:
- `GET /health` - Health check
- `GET /api/health/*` - Comprehensive health endpoints
- `GET /outbound-ip` - Server's outbound IP
- `GET /api/v1/deals` - Public deals endpoint
### Authenticated (Trusted Origin or Token)
Routes requiring authentication but no specific role:
| Route | Description |
|-------|-------------|
| `/api/payloads/*` | Raw crawl payload access |
| `/api/workers/*` | Worker monitoring |
| `/api/worker-registry/*` | Worker registration and heartbeats |
| `/api/stores/*` | Store CRUD |
| `/api/products/*` | Product listing |
| `/api/dispensaries/*` | Dispensary data |
### Admin Only (Requires `admin` or `superadmin` role)
Routes restricted to administrators:
| Route | Description |
|-------|-------------|
| `/api/job-queue/*` | Job queue management |
| `/api/k8s/*` | Kubernetes control (scaling) |
| `/api/pipeline/*` | Pipeline stage transitions |
| `/api/tasks/*` | Task queue management |
| `/api/admin/orchestrator/*` | Orchestrator dashboard |
| `/api/admin/trusted-origins/*` | Manage trusted origins |
| `/api/admin/debug/*` | Debug endpoints |
**Note:** The `internal` role (localhost/trusted origins) bypasses role checks, granting automatic admin access for local development and internal services.
## Endpoint Security Matrix
| Endpoint Group | Auth Required | Role Required | Notes |
|----------------|---------------|---------------|-------|
| `/api/payloads/*` | Yes | None | Query API for raw crawl data |
| `/api/job-queue/*` | Yes | admin | Legacy job queue (deprecated) |
| `/api/workers/*` | Yes | None | Worker status monitoring |
| `/api/worker-registry/*` | Yes | None | Workers register via trusted IPs |
| `/api/k8s/*` | Yes | admin | K8s scaling controls |
| `/api/pipeline/*` | Yes | admin | Store pipeline transitions |
| `/api/tasks/*` | Yes | admin | Task queue CRUD |
| `/api/admin/orchestrator/*` | Yes | admin | Orchestrator metrics/alerts |
| `/api/admin/trusted-origins/*` | Yes | admin | Auth bypass management |
| `/api/v1/*` | Varies | Varies | Public API (per-endpoint) |
| `/api/consumer/*` | Varies | Varies | Consumer features |
## Implementation Details
### Middleware Stack
```typescript
// Authentication middleware - validates token or trusted origin
import { authMiddleware } from '../auth/middleware';
// Role requirement middleware - checks user role
import { requireRole } from '../auth/middleware';
// Usage in route files:
router.use(authMiddleware); // All routes need auth
router.use(requireRole('admin', 'superadmin')); // Admin-only routes
```
### Auth Middleware Flow
```
Request → Check Bearer Token
├─ Valid JWT → Set user from token → Continue
├─ Valid API Token → Set user as api_token role → Continue
└─ No Token → Check Trusted Origin
├─ Trusted → Set user as internal role → Continue
└─ Not Trusted → 401 Unauthorized
```
### Role Check Flow
```
Request → authMiddleware → requireRole('admin')
├─ role === 'internal' → Continue (bypass)
├─ role in ['admin', 'superadmin'] → Continue
└─ else → 403 Forbidden
```
## Worker Pod Authentication
Worker pods (in Kubernetes) authenticate via:
1. **Internal IP**: Pods communicate via cluster IPs, which are trusted
2. **Internal Header**: Optional `X-Internal-Request` header for explicit trust
Endpoints used by workers:
- `POST /api/worker-registry/register` - Report for duty
- `POST /api/worker-registry/heartbeat` - Stay alive
- `POST /api/worker-registry/deregister` - Graceful shutdown
- `POST /api/worker-registry/task-completed` - Report task completion
## API Token Management
API tokens are managed via:
- `GET /api/api-tokens` - List tokens
- `POST /api/api-tokens` - Create token
- `DELETE /api/api-tokens/:id` - Revoke token
Token properties:
- `token`: The bearer token value
- `name`: Human-readable identifier
- `rate_limit`: Requests per minute
- `expires_at`: Optional expiration
- `active`: Enable/disable toggle
- `allowed_endpoints`: Optional endpoint restrictions
## Security Best Practices
1. **Never expose tokens in URLs** - Use Authorization header
2. **Use HTTPS in production** - All traffic encrypted
3. **Rotate API tokens periodically** - Set expiration dates
4. **Monitor rate limits** - Prevent abuse
5. **Audit access logs** - Track API usage via `api_usage_logs` table
## Related Files
- `src/auth/middleware.ts` - Auth middleware implementation
- `src/routes/api-tokens.ts` - Token management endpoints
- `src/middleware/apiTokenTracker.ts` - Usage tracking
- `src/middleware/trustedDomains.ts` - Domain trust markers

View File

@@ -37,7 +37,7 @@ spec:
- name: regcred - name: regcred
containers: containers:
- name: worker - name: worker
image: code.cannabrands.app/creationshop/dispensary-scraper:latest image: code.cannabrands.app/creationshop/dispensary-scraper:2ed088b4
imagePullPolicy: Always imagePullPolicy: Always
command: ["node"] command: ["node"]
args: ["dist/tasks/task-worker.js"] args: ["dist/tasks/task-worker.js"]

View File

@@ -1,168 +0,0 @@
-- Migration 085: Add IP and fingerprint columns for preflight reporting
-- These columns were missing from migration 084
-- ===================================================================
-- PART 1: Add IP address columns to worker_registry
-- ===================================================================
-- IP address detected during curl/axios preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS curl_ip VARCHAR(45);
-- IP address detected during http/Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS http_ip VARCHAR(45);
-- ===================================================================
-- PART 2: Add fingerprint data column
-- ===================================================================
-- Browser fingerprint data captured during Puppeteer preflight
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS fingerprint_data JSONB;
-- ===================================================================
-- PART 3: Add combined preflight status/timestamp for convenience
-- ===================================================================
-- Overall preflight status (computed from both transports)
-- Values: 'pending', 'passed', 'partial', 'failed'
-- - 'pending': neither transport tested
-- - 'passed': both transports passed (or http passed for browser-only)
-- - 'partial': at least one passed
-- - 'failed': no transport passed
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_status VARCHAR(20) DEFAULT 'pending';
-- Most recent preflight completion timestamp
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_at TIMESTAMPTZ;
-- ===================================================================
-- PART 4: Update function to set preflight status
-- ===================================================================
CREATE OR REPLACE FUNCTION update_worker_preflight(
p_worker_id VARCHAR(100),
p_transport VARCHAR(10), -- 'curl' or 'http'
p_status VARCHAR(20), -- 'passed', 'failed', 'skipped'
p_ip VARCHAR(45) DEFAULT NULL,
p_response_ms INTEGER DEFAULT NULL,
p_error TEXT DEFAULT NULL,
p_fingerprint JSONB DEFAULT NULL
) RETURNS VOID AS $$
DECLARE
v_curl_status VARCHAR(20);
v_http_status VARCHAR(20);
v_overall_status VARCHAR(20);
BEGIN
IF p_transport = 'curl' THEN
UPDATE worker_registry
SET
preflight_curl_status = p_status,
preflight_curl_at = NOW(),
preflight_curl_ms = p_response_ms,
preflight_curl_error = p_error,
curl_ip = p_ip,
updated_at = NOW()
WHERE worker_id = p_worker_id;
ELSIF p_transport = 'http' THEN
UPDATE worker_registry
SET
preflight_http_status = p_status,
preflight_http_at = NOW(),
preflight_http_ms = p_response_ms,
preflight_http_error = p_error,
http_ip = p_ip,
fingerprint_data = COALESCE(p_fingerprint, fingerprint_data),
updated_at = NOW()
WHERE worker_id = p_worker_id;
END IF;
-- Update overall preflight status
SELECT preflight_curl_status, preflight_http_status
INTO v_curl_status, v_http_status
FROM worker_registry
WHERE worker_id = p_worker_id;
-- Compute overall status
IF v_curl_status = 'passed' AND v_http_status = 'passed' THEN
v_overall_status := 'passed';
ELSIF v_curl_status = 'passed' OR v_http_status = 'passed' THEN
v_overall_status := 'partial';
ELSIF v_curl_status = 'failed' OR v_http_status = 'failed' THEN
v_overall_status := 'failed';
ELSE
v_overall_status := 'pending';
END IF;
UPDATE worker_registry
SET
preflight_status = v_overall_status,
preflight_at = NOW()
WHERE worker_id = p_worker_id;
END;
$$ LANGUAGE plpgsql;
-- ===================================================================
-- PART 5: Update v_active_workers view
-- ===================================================================
DROP VIEW IF EXISTS v_active_workers;
CREATE VIEW v_active_workers AS
SELECT
wr.id,
wr.worker_id,
wr.friendly_name,
wr.role,
wr.status,
wr.pod_name,
wr.hostname,
wr.started_at,
wr.last_heartbeat_at,
wr.last_task_at,
wr.tasks_completed,
wr.tasks_failed,
wr.current_task_id,
-- IP addresses from preflights
wr.curl_ip,
wr.http_ip,
-- Combined preflight status
wr.preflight_status,
wr.preflight_at,
-- Detailed preflight status per transport
wr.preflight_curl_status,
wr.preflight_http_status,
wr.preflight_curl_at,
wr.preflight_http_at,
wr.preflight_curl_error,
wr.preflight_http_error,
wr.preflight_curl_ms,
wr.preflight_http_ms,
-- Fingerprint data
wr.fingerprint_data,
-- Computed fields
EXTRACT(EPOCH FROM (NOW() - wr.last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN wr.status = 'offline' THEN 'offline'
WHEN wr.last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN wr.current_task_id IS NOT NULL THEN 'busy'
ELSE 'ready'
END as health_status,
-- Capability flags (can this worker handle curl/http tasks?)
(wr.preflight_curl_status = 'passed') as can_curl,
(wr.preflight_http_status = 'passed') as can_http
FROM worker_registry wr
WHERE wr.status != 'terminated'
ORDER BY wr.status = 'active' DESC, wr.last_heartbeat_at DESC;
-- ===================================================================
-- Comments
-- ===================================================================
COMMENT ON COLUMN worker_registry.curl_ip IS 'IP address detected during curl/axios preflight';
COMMENT ON COLUMN worker_registry.http_ip IS 'IP address detected during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.fingerprint_data IS 'Browser fingerprint captured during Puppeteer preflight';
COMMENT ON COLUMN worker_registry.preflight_status IS 'Overall preflight status: pending, passed, partial, failed';
COMMENT ON COLUMN worker_registry.preflight_at IS 'Most recent preflight completion timestamp';

View File

@@ -15,14 +15,9 @@
import { Router, Request, Response } from 'express'; import { Router, Request, Response } from 'express';
import { pool } from '../db/pool'; import { pool } from '../db/pool';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router(); const router = Router();
// All job-queue routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// In-memory queue state (would be in Redis in production) // In-memory queue state (would be in Redis in production)
let queuePaused = false; let queuePaused = false;

View File

@@ -7,14 +7,9 @@
import { Router, Request, Response } from 'express'; import { Router, Request, Response } from 'express';
import * as k8s from '@kubernetes/client-node'; import * as k8s from '@kubernetes/client-node';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router(); const router = Router();
// K8s control routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// K8s client setup - lazy initialization // K8s client setup - lazy initialization
let appsApi: k8s.AppsV1Api | null = null; let appsApi: k8s.AppsV1Api | null = null;
let k8sError: string | null = null; let k8sError: string | null = null;

View File

@@ -11,14 +11,9 @@ import { getLatestTrace, getTracesForDispensary, getTraceById } from '../service
import { getProviderDisplayName } from '../utils/provider-display'; import { getProviderDisplayName } from '../utils/provider-display';
import * as fs from 'fs'; import * as fs from 'fs';
import * as path from 'path'; import * as path from 'path';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router(); const router = Router();
// Orchestrator admin routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// ============================================================ // ============================================================
// ORCHESTRATOR METRICS // ORCHESTRATOR METRICS
// ============================================================ // ============================================================

View File

@@ -21,13 +21,9 @@ import {
listPayloadMetadata, listPayloadMetadata,
} from '../utils/payload-storage'; } from '../utils/payload-storage';
import { Pool } from 'pg'; import { Pool } from 'pg';
import { authMiddleware } from '../auth/middleware';
const router = Router(); const router = Router();
// All payload routes require authentication (trusted origins or API token)
router.use(authMiddleware);
// Get pool instance for queries // Get pool instance for queries
const getDbPool = (): Pool => getPool() as unknown as Pool; const getDbPool = (): Pool => getPool() as unknown as Pool;

View File

@@ -18,14 +18,9 @@
import { Router, Request, Response } from 'express'; import { Router, Request, Response } from 'express';
import { pool } from '../db/pool'; import { pool } from '../db/pool';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router(); const router = Router();
// Pipeline routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
// Valid stages // Valid stages
const STAGES = ['discovered', 'validated', 'promoted', 'sandbox', 'production', 'failing'] as const; const STAGES = ['discovered', 'validated', 'promoted', 'sandbox', 'production', 'failing'] as const;
type Stage = typeof STAGES[number]; type Stage = typeof STAGES[number];

View File

@@ -19,14 +19,9 @@ import {
resumeTaskPool, resumeTaskPool,
getTaskPoolStatus, getTaskPoolStatus,
} from '../tasks/task-pool-state'; } from '../tasks/task-pool-state';
import { authMiddleware, requireRole } from '../auth/middleware';
const router = Router(); const router = Router();
// Task routes require authentication and admin role
router.use(authMiddleware);
router.use(requireRole('admin', 'superadmin'));
/** /**
* GET /api/tasks * GET /api/tasks
* List tasks with optional filters * List tasks with optional filters

View File

@@ -23,14 +23,9 @@
import { Router, Request, Response } from 'express'; import { Router, Request, Response } from 'express';
import { pool } from '../db/pool'; import { pool } from '../db/pool';
import os from 'os'; import os from 'os';
import { authMiddleware } from '../auth/middleware';
const router = Router(); const router = Router();
// Worker registry routes require authentication
// Note: Internal workers (pods) can access via trusted IP (localhost, in-cluster)
router.use(authMiddleware);
// ============================================================ // ============================================================
// WORKER REGISTRATION // WORKER REGISTRATION
// ============================================================ // ============================================================

View File

@@ -26,13 +26,9 @@
import { Router, Request, Response } from 'express'; import { Router, Request, Response } from 'express';
import { pool } from '../db/pool'; import { pool } from '../db/pool';
import * as k8s from '@kubernetes/client-node'; import * as k8s from '@kubernetes/client-node';
import { authMiddleware } from '../auth/middleware';
const router = Router(); const router = Router();
// All worker routes require authentication (trusted origins or API token)
router.use(authMiddleware);
// ============================================================ // ============================================================
// K8S SCALING CONFIGURATION (added 2024-12-10) // K8S SCALING CONFIGURATION (added 2024-12-10)
// Per TASK_WORKFLOW_2024-12-10.md: Admin can scale workers from UI // Per TASK_WORKFLOW_2024-12-10.md: Admin can scale workers from UI

View File

@@ -14,27 +14,11 @@ export function Settings() {
loadSettings(); loadSettings();
}, []); }, []);
// AI-related settings are managed in /ai-settings, filter them out here
const AI_SETTING_KEYS = [
'ai_model',
'ai_provider',
'anthropic_api_key',
'openai_api_key',
'anthropic_model',
'openai_model',
'anthropic_enabled',
'openai_enabled',
];
const loadSettings = async () => { const loadSettings = async () => {
setLoading(true); setLoading(true);
try { try {
const data = await api.getSettings(); const data = await api.getSettings();
// Filter out AI settings - those are managed in /ai-settings setSettings(data.settings);
const filteredSettings = (data.settings || []).filter(
(s: any) => !AI_SETTING_KEYS.includes(s.key)
);
setSettings(filteredSettings);
} catch (error) { } catch (error) {
console.error('Failed to load settings:', error); console.error('Failed to load settings:', error);
} finally { } finally {