Compare commits

..

3 Commits

Author SHA1 Message Date
Kelly
3f958fbff3 fix(ci): Fix buildx cache_from syntax for array format
Plugin was splitting comma-separated values incorrectly.
Use array format with quoted strings instead.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:10:26 -07:00
Kelly
c84ef0396b feat(tasks): Add proxy_test task handler and discovery run tracking
- Add proxy_test task handler that fetches IP via proxy to verify connectivity
- Add discovery_runs migration (083) for tracking store discovery progress
- Register proxy_test in task service and worker

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:07:58 -07:00
Kelly
6cd1f55119 fix(workers): Preserve fantasy names on pod restart
- Re-registration no longer overwrites pod_name with K8s name
- New workers get fantasy name (Aethelgard, Xylos, etc.) as pod_name
- Document worker naming convention in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 20:35:25 -07:00
8 changed files with 172 additions and 21 deletions

View File

@@ -86,8 +86,10 @@ steps:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/dispensary-scraper:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/dispensary-scraper:cache,mode=max
cache_from:
- "type=registry,ref=code.cannabrands.app/creationshop/dispensary-scraper:cache"
cache_to:
- "type=registry,ref=code.cannabrands.app/creationshop/dispensary-scraper:cache,mode=max"
build_args:
APP_BUILD_VERSION: ${CI_COMMIT_SHA:0:8}
APP_GIT_SHA: ${CI_COMMIT_SHA}
@@ -114,8 +116,10 @@ steps:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/cannaiq-frontend:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/cannaiq-frontend:cache,mode=max
cache_from:
- "type=registry,ref=code.cannabrands.app/creationshop/cannaiq-frontend:cache"
cache_to:
- "type=registry,ref=code.cannabrands.app/creationshop/cannaiq-frontend:cache,mode=max"
depends_on: []
when:
branch: master
@@ -137,8 +141,10 @@ steps:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/findadispo-frontend:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/findadispo-frontend:cache,mode=max
cache_from:
- "type=registry,ref=code.cannabrands.app/creationshop/findadispo-frontend:cache"
cache_to:
- "type=registry,ref=code.cannabrands.app/creationshop/findadispo-frontend:cache,mode=max"
depends_on: []
when:
branch: master
@@ -160,8 +166,10 @@ steps:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/findagram-frontend:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/findagram-frontend:cache,mode=max
cache_from:
- "type=registry,ref=code.cannabrands.app/creationshop/findagram-frontend:cache"
cache_to:
- "type=registry,ref=code.cannabrands.app/creationshop/findagram-frontend:cache,mode=max"
depends_on: []
when:
branch: master

View File

@@ -939,7 +939,8 @@ export default defineConfig({
20) **Crawler Architecture**
- **Scraper pod (1 replica)**: Runs the Express API server + scheduler.
- **Scraper-worker pods (5 replicas)**: Each worker runs `dist/dutchie-az/services/worker.js`, polling the job queue.
- **Scraper-worker pods (25 replicas)**: Each runs `dist/tasks/task-worker.js`, polling the job queue.
- **Worker naming**: Pods use fantasy names (Aethelgard, Xylos, Kryll, Coriolis, etc.) - see `k8s/scraper-worker.yaml` ConfigMap. Worker IDs: `{PodName}-worker-{n}`
- **Job types**: `menu_detection`, `menu_detection_single`, `dutchie_product_crawl`
- **Job schedules** (managed in `job_schedules` table):
- `dutchie_az_menu_detection`: Runs daily with 60-min jitter

View File

@@ -0,0 +1,88 @@
-- Migration 083: Discovery Run Tracking
-- Tracks progress of store discovery runs step-by-step
-- Main discovery runs table
CREATE TABLE IF NOT EXISTS discovery_runs (
id SERIAL PRIMARY KEY,
platform VARCHAR(50) NOT NULL DEFAULT 'dutchie',
status VARCHAR(20) NOT NULL DEFAULT 'running', -- running, completed, failed
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
task_id INTEGER REFERENCES worker_task_queue(id),
-- Totals
states_total INTEGER DEFAULT 0,
states_completed INTEGER DEFAULT 0,
locations_discovered INTEGER DEFAULT 0,
locations_promoted INTEGER DEFAULT 0,
new_store_ids INTEGER[] DEFAULT '{}',
-- Error info
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Per-state progress within a run
CREATE TABLE IF NOT EXISTS discovery_run_states (
id SERIAL PRIMARY KEY,
run_id INTEGER NOT NULL REFERENCES discovery_runs(id) ON DELETE CASCADE,
state_code VARCHAR(2) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, running, completed, failed
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
-- Results
cities_found INTEGER DEFAULT 0,
locations_found INTEGER DEFAULT 0,
locations_upserted INTEGER DEFAULT 0,
new_dispensary_ids INTEGER[] DEFAULT '{}',
-- Error info
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(run_id, state_code)
);
-- Step-by-step log for detailed progress tracking
CREATE TABLE IF NOT EXISTS discovery_run_steps (
id SERIAL PRIMARY KEY,
run_id INTEGER NOT NULL REFERENCES discovery_runs(id) ON DELETE CASCADE,
state_code VARCHAR(2),
step_name VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'started', -- started, completed, failed
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
-- Details (JSON for flexibility)
details JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes for querying
CREATE INDEX IF NOT EXISTS idx_discovery_runs_status ON discovery_runs(status);
CREATE INDEX IF NOT EXISTS idx_discovery_runs_platform ON discovery_runs(platform);
CREATE INDEX IF NOT EXISTS idx_discovery_runs_started_at ON discovery_runs(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_discovery_run_states_run_id ON discovery_run_states(run_id);
CREATE INDEX IF NOT EXISTS idx_discovery_run_steps_run_id ON discovery_run_steps(run_id);
-- View for latest run status per platform
CREATE OR REPLACE VIEW v_latest_discovery_runs AS
SELECT DISTINCT ON (platform)
id,
platform,
status,
started_at,
finished_at,
states_total,
states_completed,
locations_discovered,
locations_promoted,
array_length(new_store_ids, 1) as new_stores_count,
error_message,
EXTRACT(EPOCH FROM (COALESCE(finished_at, NOW()) - started_at)) as duration_seconds
FROM discovery_runs
ORDER BY platform, started_at DESC;

View File

@@ -70,21 +70,20 @@ router.post('/register', async (req: Request, res: Response) => {
);
if (existing.rows.length > 0) {
// Re-activate existing worker
// Re-activate existing worker - keep existing pod_name (fantasy name), don't overwrite with K8s name
const { rows } = await pool.query(`
UPDATE worker_registry
SET status = 'active',
role = $1,
pod_name = $2,
hostname = $3,
ip_address = $4,
hostname = $2,
ip_address = $3,
last_heartbeat_at = NOW(),
started_at = NOW(),
metadata = $5,
metadata = $4,
updated_at = NOW()
WHERE worker_id = $6
RETURNING id, worker_id, friendly_name, role
`, [role, pod_name, finalHostname, clientIp, metadata, finalWorkerId]);
WHERE worker_id = $5
RETURNING id, worker_id, friendly_name, pod_name, role
`, [role, finalHostname, clientIp, metadata, finalWorkerId]);
const worker = rows[0];
const roleMsg = role ? `for ${role}` : 'as role-agnostic';
@@ -105,13 +104,13 @@ router.post('/register', async (req: Request, res: Response) => {
const nameResult = await pool.query('SELECT assign_worker_name($1) as name', [finalWorkerId]);
const friendlyName = nameResult.rows[0].name;
// Register the worker
// Register the worker - use friendlyName as pod_name (not K8s name)
const { rows } = await pool.query(`
INSERT INTO worker_registry (
worker_id, friendly_name, role, pod_name, hostname, ip_address, status, metadata
) VALUES ($1, $2, $3, $4, $5, $6, 'active', $7)
RETURNING id, worker_id, friendly_name, role
`, [finalWorkerId, friendlyName, role, pod_name, finalHostname, clientIp, metadata]);
RETURNING id, worker_id, friendly_name, pod_name, role
`, [finalWorkerId, friendlyName, role, friendlyName, finalHostname, clientIp, metadata]);
const worker = rows[0];
const roleMsg = role ? `for ${role}` : 'as role-agnostic';

View File

@@ -9,3 +9,4 @@ export { handleProductDiscovery } from './product-discovery';
export { handleStoreDiscovery } from './store-discovery';
export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleProxyTest } from './proxy-test';

View File

@@ -0,0 +1,51 @@
/**
* Proxy Test Handler
* Tests proxy connectivity by fetching public IP via ipify
*/
import { TaskContext, TaskResult } from '../task-worker';
import { execSync } from 'child_process';
export async function handleProxyTest(ctx: TaskContext): Promise<TaskResult> {
const { pool } = ctx;
console.log('[ProxyTest] Testing proxy connection...');
try {
// Get active proxy from DB
const proxyResult = await pool.query(`
SELECT host, port, username, password
FROM proxies
WHERE is_active = true
LIMIT 1
`);
if (proxyResult.rows.length === 0) {
return { success: false, error: 'No active proxy configured' };
}
const p = proxyResult.rows[0];
const proxyUrl = p.username
? `http://${p.username}:${p.password}@${p.host}:${p.port}`
: `http://${p.host}:${p.port}`;
console.log(`[ProxyTest] Using proxy: ${p.host}:${p.port}`);
// Fetch IP via proxy
const cmd = `curl -s --proxy '${proxyUrl}' 'https://api.ipify.org?format=json'`;
const output = execSync(cmd, { timeout: 30000 }).toString().trim();
const data = JSON.parse(output);
console.log(`[ProxyTest] Proxy IP: ${data.ip}`);
return {
success: true,
proxyIp: data.ip,
proxyHost: p.host,
proxyPort: p.port,
};
} catch (error: any) {
console.error('[ProxyTest] Error:', error.message);
return { success: false, error: error.message };
}
}

View File

@@ -31,7 +31,8 @@ export type TaskRole =
| 'product_discovery'
| 'payload_fetch' // NEW: Fetches from API, saves to disk
| 'product_refresh' // CHANGED: Now reads from local payload
| 'analytics_refresh';
| 'analytics_refresh'
| 'proxy_test'; // Tests proxy connectivity via ipify
export type TaskStatus =
| 'pending'

View File

@@ -59,6 +59,7 @@ import { handleProductDiscovery } from './handlers/product-discovery';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleProxyTest } from './handlers/proxy-test';
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000');
const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000');
@@ -133,6 +134,7 @@ const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
store_discovery: handleStoreDiscovery,
entry_point_discovery: handleEntryPointDiscovery,
analytics_refresh: handleAnalyticsRefresh,
proxy_test: handleProxyTest, // Tests proxy via ipify
};
/**