Compare commits

...

5 Commits

Author SHA1 Message Date
Kelly
822d2b0609 feat: Idempotent entry_point_discovery with bulk endpoint
- Track id_resolution_status, attempts, and errors in handler
- Add POST /api/tasks/batch/entry-point-discovery endpoint
- Skip already-resolved stores, retry failed with force flag
2025-12-12 20:27:36 -07:00
Kelly
dfd36dacf8 fix: Show next run time correctly for schedules 2025-12-12 20:21:15 -07:00
Kelly
4ea7139ed5 feat: Add step reporting to all task handlers
Added updateStep() calls to:
- payload-fetch-curl: loading → preflight → fetching → saving
- product-refresh: loading → normalizing → upserting
- store-discovery-http: starting → preflight → navigating → fetching

This enables real-time visibility of worker progress in the dashboard.
2025-12-12 20:14:00 -07:00
Kelly
63023a4061 feat: Worker improvements and Run Now duplicate prevention
- Fix Run Now to prevent duplicate task creation
- Add loading state to Run Now button in UI
- Return early when no stores need refresh
- Worker dashboard improvements
- Browser pooling architecture updates
- K8s worker config updates (8 replicas, 3 concurrent tasks)
2025-12-12 20:11:31 -07:00
Kelly
c98c409f59 feat: Add MinIO/S3 support for payload storage
- Update payload-storage.ts to use MinIO when configured
- Payloads stored at: cannaiq/payloads/{year}/{month}/{day}/store_{id}_{ts}.json.gz
- Falls back to local filesystem when MINIO_* env vars not set
- Enables shared storage across all worker pods
- Fixes ephemeral storage issue where payloads were lost on pod restart

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-12 11:30:57 -07:00
16 changed files with 1067 additions and 281 deletions

View File

@@ -25,13 +25,26 @@ Never import `src/db/migrate.ts` at runtime. Use `src/db/pool.ts` for DB access.
- **Worker** = Concurrent task runner INSIDE a pod (controlled by `MAX_CONCURRENT_TASKS` env var)
- Formula: `8 pods × MAX_CONCURRENT_TASKS = total concurrent workers`
-**To increase workers:** Change `MAX_CONCURRENT_TASKS` env var, NOT replicas.
-```bash
-# CORRECT - increase workers per pod
-kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=5
-# WRONG - never scale above 8 replicas
-kubectl scale deployment/scraper-worker --replicas=20  # NEVER DO THIS
-```
+**Browser Task Memory Limits:**
+- Each Puppeteer/Chrome browser uses ~400 MB RAM
+- Pod memory limit is 2 GB
+- **MAX_CONCURRENT_TASKS=3** is the safe maximum for browser tasks
+- More than 3 concurrent browsers per pod = OOM crash
+
+| Browsers | RAM Used | Status |
+|----------|----------|--------|
+| 3 | ~1.3 GB | Safe (recommended) |
+| 4 | ~1.7 GB | Risky |
+| 5+ | >2 GB | OOM crash |
+
+**To increase throughput:** Add more pods (up to 8), NOT more concurrent tasks per pod.
+```bash
+# CORRECT - scale pods (up to 8)
+kubectl scale deployment/scraper-worker -n dispensary-scraper --replicas=8
+
+# WRONG - will cause OOM crashes
+kubectl set env deployment/scraper-worker -n dispensary-scraper MAX_CONCURRENT_TASKS=10
+```
**If K8s API returns ServiceUnavailable:** STOP IMMEDIATELY. Do not retry. The cluster is overloaded.

View File

@@ -504,6 +504,103 @@ The Workers Dashboard shows:
| `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling |
| `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources |
## Browser Task Memory Limits (Updated 2025-12)
Browser-based tasks (Puppeteer/Chrome) have strict memory constraints that limit concurrency.
### Why Browser Tasks Are Different
Each browser task launches a Chrome process. Unlike I/O-bound API calls, browsers consume significant RAM:
| Component | RAM Usage |
|-----------|-----------|
| Node.js runtime | ~150 MB |
| Chrome browser (base) | ~200-250 MB |
| Dutchie menu page (loaded) | ~100-150 MB |
| **Per browser total** | **~350-450 MB** |
### Memory Math for Pod Limits
```
Pod memory limit: 2 GB (2000 MB)
Node.js runtime: -150 MB
Safety buffer: -100 MB
────────────────────────────────
Available for browsers: 1750 MB
Per browser + page: ~400 MB
Max browsers: 1750 ÷ 400 = ~4 browsers
Recommended: 3 browsers (leaves headroom for spikes)
```
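The memory math above can be expressed as a small helper. This is an illustrative sketch, not a function from the codebase; the constants mirror the numbers in the table.

```typescript
// Sketch of the memory math above. Names are illustrative, not from the codebase.
function maxSafeBrowsers(podLimitMb: number, perBrowserMb: number = 400): number {
  const nodeRuntimeMb = 150;  // Node.js runtime overhead
  const safetyBufferMb = 100; // headroom for spikes
  const availableMb = podLimitMb - nodeRuntimeMb - safetyBufferMb;
  return Math.floor(availableMb / perBrowserMb);
}

// With the 2 GB pod limit: floor(1750 / 400) = 4, so 3 is the recommended
// setting once you leave one browser's worth of headroom.
```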
### MAX_CONCURRENT_TASKS for Browser Tasks
| Browsers per Pod | RAM Used | Risk Level |
|------------------|----------|------------|
| 1 | ~500 MB | Very safe |
| 2 | ~900 MB | Safe |
| **3** | **~1.3 GB** | **Recommended** |
| 4 | ~1.7 GB | Tight (may OOM) |
| 5+ | >2 GB | Will OOM crash |
**CRITICAL**: `MAX_CONCURRENT_TASKS=3` is the maximum safe value for browser tasks with current pod limits.
### Scaling Strategy
Scale **horizontally** (more pods) rather than vertically (more concurrency per pod):
```
┌─────────────────────────────────────────────────────────────────────────┐
│ Cluster: 8 pods × 3 browsers = 24 concurrent tasks │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Pod 0 │ │ Pod 1 │ │ Pod 2 │ │ Pod 3 │ │
│ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Pod 4 │ │ Pod 5 │ │ Pod 6 │ │ Pod 7 │ │
│ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │ 3 browsers │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
```
### Browser Lifecycle Per Task
Each task gets a fresh browser with fresh IP/identity:
```
1. Claim task from queue
2. Get fresh proxy from pool
3. Launch browser with proxy
4. Run preflight (verify IP)
5. Execute scrape
6. Close browser
7. Repeat
```
This ensures:
- Fresh IP per task (proxy rotation)
- Fresh fingerprint per task (UA rotation)
- No cookie/session bleed between tasks
- Predictable memory usage
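The lifecycle above can be sketched as a generic wrapper. `Proxy` and `Browser` here are stand-in interfaces, not the real Puppeteer or pool APIs; the point is the try/finally shape that guarantees the browser closes after every task.

```typescript
// Stand-in types: NOT the real Puppeteer/pool APIs, just the shape needed here.
interface Proxy { url: string }
interface Browser { close(): Promise<void> }

// One fresh browser + proxy per task, always torn down afterwards.
async function runTask<T>(
  getProxy: () => Promise<Proxy>,              // step 2: fresh proxy from pool
  launch: (proxy: Proxy) => Promise<Browser>,  // step 3: launch browser with proxy
  scrape: (browser: Browser) => Promise<T>,    // steps 4-5: preflight + scrape
): Promise<T> {
  const proxy = await getProxy();
  const browser = await launch(proxy);
  try {
    return await scrape(browser);
  } finally {
    await browser.close(); // step 6: no cookie/session bleed, predictable memory
  }
}
```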
### Increasing Capacity
To handle more concurrent tasks:
1. **Add more pods** (up to 8 per CLAUDE.md limit)
2. **Increase pod memory** (allows 4 browsers per pod):
```yaml
resources:
limits:
memory: "2.5Gi" # from 2Gi
```
**DO NOT** simply increase `MAX_CONCURRENT_TASKS` without also increasing pod memory limits.
## Monitoring
### Logs

View File

@@ -526,14 +526,17 @@ router.delete('/schedules/:id', async (req: Request, res: Response) => {
/**
* POST /api/tasks/schedules/:id/run-now
* Manually trigger a scheduled task to run immediately
*
* For product_discovery schedules with state_code, this creates individual
* tasks for each store in that state (fans out properly).
*/
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
-    // Get the schedule
+    // Get the full schedule
const scheduleResult = await pool.query(`
-      SELECT id, name, role, state_code, platform, priority
+      SELECT id, name, role, state_code, platform, priority, interval_hours, method
FROM task_schedules WHERE id = $1
`, [scheduleId]);
@@ -542,27 +545,80 @@ router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
}
const schedule = scheduleResult.rows[0];
+    let tasksCreated = 0;
-    // Create a task based on the schedule
-    const task = await taskService.createTask({
-      role: schedule.role,
-      platform: schedule.platform,
-      priority: schedule.priority + 10, // Boost priority for manual runs
-    });
// For product_discovery with state_code, fan out to individual stores
if (schedule.role === 'product_discovery' && schedule.state_code) {
// Find stores in this state needing refresh
const storeResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND s.code = $1
-- No pending/running product_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code]);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length > 0) {
// Create staggered tasks for all stores
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
15, // 15 seconds stagger
schedule.platform || 'dutchie',
schedule.method || 'http'
);
tasksCreated = result.created;
} else {
// No stores need refresh - return early with message
return res.json({
success: true,
message: `No ${schedule.state_code} stores need refresh at this time`,
tasksCreated: 0,
stateCode: schedule.state_code,
});
}
} else if (schedule.role !== 'product_discovery') {
// For other schedules (store_discovery, analytics_refresh), create a single task
await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10,
method: schedule.method,
});
tasksCreated = 1;
} else {
// product_discovery without state_code - shouldn't happen, reject
return res.status(400).json({
error: 'product_discovery schedules require a state_code',
});
}
// Update last_run_at on the schedule
await pool.query(`
UPDATE task_schedules
SET last_run_at = NOW(),
next_run_at = NOW() + (interval_hours || ' hours')::interval,
last_task_count = $2,
updated_at = NOW()
WHERE id = $1
-    `, [scheduleId]);
+    `, [scheduleId, tasksCreated]);
res.json({
success: true,
message: `Schedule "${schedule.name}" triggered`,
-      task,
+      tasksCreated,
stateCode: schedule.state_code,
});
} catch (error: unknown) {
console.error('Error running schedule:', error);
@@ -1187,6 +1243,225 @@ router.post('/batch/az-stores', async (req: Request, res: Response) => {
}
});
/**
* POST /api/tasks/batch/entry-point-discovery
* Create entry_point_discovery tasks for stores missing platform_dispensary_id
*
* This is idempotent - stores that already have platform_dispensary_id are skipped.
* Only creates tasks for stores with menu_url set and crawl_enabled = true.
*
* Body (optional):
* - state_code: string (optional) - Filter by state code
* - stagger_seconds: number (default: 5) - Seconds between tasks
* - force: boolean (default: false) - Re-run even for previously failed stores
*/
router.post('/batch/entry-point-discovery', async (req: Request, res: Response) => {
try {
const {
state_code,
stagger_seconds = 5,
force = false,
} = req.body;
// Find stores that need entry point discovery
const storeResult = await pool.query(`
SELECT d.id, d.name, d.menu_url
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.menu_url IS NOT NULL
AND d.platform_dispensary_id IS NULL
${state_code ? 'AND s.code = $1' : ''}
${!force ? "AND (d.id_resolution_status IS NULL OR d.id_resolution_status = 'pending')" : ''}
-- No pending/running entry_point_discovery task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role = 'entry_point_discovery'
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.id
`, state_code ? [state_code.toUpperCase()] : []);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length === 0) {
return res.json({
success: true,
message: state_code
? `No ${state_code.toUpperCase()} stores need entry point discovery`
: 'No stores need entry point discovery',
tasks_created: 0,
});
}
// Create staggered tasks
const taskIds: number[] = [];
for (let i = 0; i < dispensaryIds.length; i++) {
const scheduledFor = new Date(Date.now() + i * stagger_seconds * 1000);
const result = await pool.query(`
INSERT INTO worker_tasks (role, dispensary_id, priority, scheduled_for, method)
VALUES ('entry_point_discovery', $1, 10, $2, 'http')
RETURNING id
`, [dispensaryIds[i], scheduledFor]);
taskIds.push(result.rows[0].id);
}
const totalDuration = dispensaryIds.length * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.json({
success: true,
tasks_created: taskIds.length,
task_ids: taskIds,
stores: storeResult.rows.map((r: { id: number; name: string }) => ({ id: r.id, name: r.name })),
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${taskIds.length} entry_point_discovery tasks${state_code ? ` for ${state_code.toUpperCase()}` : ''}`,
});
} catch (error: unknown) {
console.error('Error creating entry point discovery tasks:', error);
res.status(500).json({ error: 'Failed to create entry point discovery tasks' });
}
});
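The stagger arithmetic in the handler above (the `scheduled_for` offsets, total duration, and estimated completion) amounts to the following. This is an illustrative helper, not code from the repo:

```typescript
// Sketch of the stagger math used when creating batch tasks.
// Each task i is scheduled i * staggerSeconds after "now".
function staggerSchedule(count: number, staggerSeconds: number, now: number = Date.now()) {
  const scheduledFor = Array.from({ length: count }, (_, i) =>
    new Date(now + i * staggerSeconds * 1000),
  );
  const totalDurationSeconds = count * staggerSeconds;
  return {
    scheduledFor,
    totalDurationSeconds,
    estimatedCompletion: new Date(now + totalDurationSeconds * 1000),
  };
}
```

With 3 stores and a 5-second stagger, tasks fire at +0 s, +5 s, and +10 s, and the endpoint reports a 15-second total window.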
// ============================================================
// STATE-BASED CRAWL ENDPOINTS
// ============================================================
/**
* POST /api/tasks/crawl-state/:stateCode
* Create product_discovery tasks for all stores in a state
*
* This is the primary endpoint for triggering crawls by state.
* Creates staggered tasks for all crawl-enabled stores in the specified state.
*
* Params:
* - stateCode: State code (e.g., 'AZ', 'CA', 'CO')
*
* Body (optional):
* - stagger_seconds: number (default: 15) - Seconds between each task
* - priority: number (default: 10) - Task priority
* - method: 'curl' | 'http' | null (default: 'http')
*
* Returns:
* - tasks_created: Number of tasks created
* - stores_in_state: Total stores found for the state
* - skipped: Number skipped (already have active tasks)
*/
router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
try {
const stateCode = req.params.stateCode.toUpperCase();
const {
stagger_seconds = 15,
priority = 10,
method = 'http',
} = req.body;
// Verify state exists
const stateResult = await pool.query(`
SELECT id, code, name FROM states WHERE code = $1
`, [stateCode]);
if (stateResult.rows.length === 0) {
return res.status(404).json({
error: 'State not found',
state_code: stateCode,
});
}
const state = stateResult.rows[0];
// Get all crawl-enabled dispensaries in this state
const dispensariesResult = await pool.query(`
SELECT d.id, d.name
FROM dispensaries d
WHERE d.state_id = $1
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [state.id]);
if (dispensariesResult.rows.length === 0) {
return res.status(200).json({
success: true,
message: `No crawl-enabled stores found in ${state.name}`,
state_code: stateCode,
state_name: state.name,
tasks_created: 0,
stores_in_state: 0,
});
}
const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id);
// Create staggered tasks
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
stagger_seconds,
'dutchie',
method
);
const totalDuration = (result.created - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
state_code: stateCode,
state_name: state.name,
tasks_created: result.created,
stores_in_state: dispensariesResult.rows.length,
skipped: dispensariesResult.rows.length - result.created,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.created} product_discovery tasks for ${state.name} (${stagger_seconds}s apart, ~${Math.ceil(totalDuration / 60)} min total)`,
});
} catch (error: unknown) {
console.error('Error creating state crawl tasks:', error);
res.status(500).json({ error: 'Failed to create state crawl tasks' });
}
});
/**
* GET /api/tasks/states
* List all states with their store counts and crawl status
*/
router.get('/states', async (_req: Request, res: Response) => {
try {
const result = await pool.query(`
SELECT
s.code,
s.name,
COUNT(d.id)::int as total_stores,
COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NOT NULL)::int as crawl_enabled_stores,
COUNT(d.id) FILTER (WHERE d.crawl_enabled = true AND d.platform_dispensary_id IS NULL)::int as missing_platform_id,
MAX(d.last_fetch_at) as last_crawl_at,
(SELECT COUNT(*) FROM worker_tasks t
JOIN dispensaries d2 ON t.dispensary_id = d2.id
WHERE d2.state_id = s.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'claimed', 'running'))::int as active_tasks
FROM states s
LEFT JOIN dispensaries d ON d.state_id = s.id
GROUP BY s.id, s.code, s.name
HAVING COUNT(d.id) > 0
ORDER BY COUNT(d.id) DESC
`);
res.json({
states: result.rows,
total_states: result.rows.length,
});
} catch (error: unknown) {
console.error('Error listing states:', error);
res.status(500).json({ error: 'Failed to list states' });
}
});
// ============================================================
// TASK POOL MANAGEMENT
// ============================================================

View File

@@ -155,7 +155,12 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
active_task_count,
max_concurrent_tasks,
status = 'active',
-      resources
+      resources,
// Step tracking fields
current_step,
current_step_detail,
current_step_started_at,
task_steps,
} = req.body;
if (!worker_id) {
@@ -168,6 +173,11 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
if (current_task_ids) metadata.current_task_ids = current_task_ids;
if (active_task_count !== undefined) metadata.active_task_count = active_task_count;
if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks;
// Step tracking
if (current_step) metadata.current_step = current_step;
if (current_step_detail) metadata.current_step_detail = current_step_detail;
if (current_step_started_at) metadata.current_step_started_at = current_step_started_at;
if (task_steps) metadata.task_steps = task_steps;
// Store resources in metadata jsonb column
const { rows } = await pool.query(`

View File

@@ -41,9 +41,16 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
const dispensary = dispResult.rows[0];
-    // If already has platform_dispensary_id, we're done
+    // If already has platform_dispensary_id, we're done (idempotent)
if (dispensary.platform_dispensary_id) {
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID: ${dispensary.platform_dispensary_id}`);
// Update last_id_resolution_at to show we checked it
await pool.query(`
UPDATE dispensaries
SET last_id_resolution_at = NOW(),
id_resolution_status = 'resolved'
WHERE id = $1
`, [dispensaryId]);
return {
success: true,
alreadyResolved: true,
@@ -51,6 +58,15 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
};
}
// Increment attempt counter
await pool.query(`
UPDATE dispensaries
SET id_resolution_attempts = COALESCE(id_resolution_attempts, 0) + 1,
last_id_resolution_at = NOW(),
id_resolution_status = 'pending'
WHERE id = $1
`, [dispensaryId]);
const menuUrl = dispensary.menu_url;
if (!menuUrl) {
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
@@ -114,7 +130,7 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
console.log(`[EntryPointDiscovery] Failed to resolve ${slug}: ${reason}`);
-    // Mark as failed resolution but keep menu_type as dutchie
+    // Mark as failed resolution
await pool.query(`
UPDATE dispensaries
SET
@@ -123,9 +139,11 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
WHEN $2 = 403 THEN 'blocked'
ELSE 'dutchie'
END,
id_resolution_status = 'failed',
id_resolution_error = $3,
updated_at = NOW()
WHERE id = $1
-    `, [dispensaryId, result.httpStatus || 0]);
+    `, [dispensaryId, result.httpStatus || 0, reason]);
return {
success: false,
@@ -149,6 +167,8 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
platform_dispensary_id = $2,
menu_type = 'dutchie',
crawl_enabled = true,
id_resolution_status = 'resolved',
id_resolution_error = NULL,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, platformId]);

View File

@@ -28,7 +28,7 @@ import { saveRawPayload } from '../../utils/payload-storage';
import { taskService } from '../task-service';
export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult> {
-  const { pool, task } = ctx;
+  const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -39,6 +39,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -67,6 +68,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 2: Start stealth session
// ============================================================
updateStep('preflight', 'Starting stealth session');
const session = startSession();
console.log(`[PayloadFetch] Session started: ${session.sessionId}`);
@@ -75,6 +77,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// ============================================================
// STEP 3: Fetch products via GraphQL (Status: 'All')
// ============================================================
updateStep('fetching', 'Executing GraphQL query');
const allProducts: any[] = [];
let page = 0;
let totalCount = 0;
@@ -162,6 +165,7 @@ export async function handlePayloadFetch(ctx: TaskContext): Promise<TaskResult>
// STEP 4: Save raw payload to filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Metadata/Payload separation
// ============================================================
updateStep('saving', `Saving ${allProducts.length} products`);
const rawPayload = {
dispensaryId,
platformId,

View File

@@ -27,7 +27,7 @@ import { taskService } from '../task-service';
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
-  const { pool, task, crawlRotator } = ctx;
+  const { pool, task, crawlRotator, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -40,6 +40,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -70,6 +71,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 2: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', `Launching browser for ${dispensary.name}`);
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
@@ -114,6 +116,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 3: Establish session by visiting embedded menu
// ============================================================
updateStep('navigating', `Loading menu page`);
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[ProductDiscoveryHTTP] Establishing session at ${embedUrl}...`);
@@ -178,6 +181,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 4: Fetch ALL products via GraphQL from browser context
// ============================================================
updateStep('fetching', `Executing GraphQL query`);
const result = await page.evaluate(async (platformId: string, graphqlHash: string) => {
const allProducts: any[] = [];
const logs: string[] = [];
@@ -301,6 +305,7 @@ export async function handleProductDiscoveryHttp(ctx: TaskContext): Promise<Task
// ============================================================
// STEP 5: Save raw payload to filesystem
// ============================================================
updateStep('saving', `Saving ${result.products.length} products`);
const rawPayload = {
dispensaryId,
platformId,

View File

@@ -32,7 +32,7 @@ import { taskService } from '../task-service';
const normalizer = new DutchieNormalizer();
export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult> {
-  const { pool, task } = ctx;
+  const { pool, task, updateStep } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
@@ -43,6 +43,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
updateStep('loading', 'Loading dispensary info');
const dispResult = await pool.query(`
SELECT
id, name, platform_dispensary_id, menu_url, menu_type, city, state
@@ -68,6 +69,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// STEP 2: Load payload from filesystem
// Per TASK_WORKFLOW_2024-12-10.md: Read local payload, not API
// ============================================================
updateStep('loading', 'Loading payload from storage');
let payloadData: any;
let payloadId: number;
@@ -142,6 +144,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 3: Normalize data
// ============================================================
updateStep('normalizing', `Normalizing ${allProducts.length} products`);
console.log(`[ProductRefresh] Normalizing ${allProducts.length} products...`);
// Build RawPayload for the normalizer
@@ -185,6 +188,7 @@ export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult
// ============================================================
// STEP 4: Upsert to canonical tables
// ============================================================
updateStep('upserting', `Saving ${normalizationResult.products.length} products to DB`);
console.log(`[ProductRefresh] Upserting to store_products...`);
const upsertResult = await upsertStoreProducts(

View File

@@ -71,17 +71,19 @@ interface DiscoveredLocation {
}
export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskResult> {
-  const { pool, task, crawlRotator } = ctx;
+  const { pool, task, crawlRotator, updateStep } = ctx;
const platform = task.platform || 'dutchie';
let browser: any = null;
try {
updateStep('starting', 'Initializing store discovery');
console.log(`[StoreDiscoveryHTTP] Starting discovery for platform: ${platform}`);
// ============================================================
// STEP 1: Setup Puppeteer with proxy
// ============================================================
updateStep('preflight', 'Launching browser');
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
@@ -126,6 +128,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// ============================================================
// STEP 2: Establish session by visiting dispensaries page
// ============================================================
updateStep('navigating', 'Loading session page');
const sessionUrl = 'https://dutchie.com/dispensaries';
console.log(`[StoreDiscoveryHTTP] Establishing session at ${sessionUrl}...`);
@@ -174,6 +177,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
// ============================================================
// STEP 4: Fetch cities for each state via GraphQL
// ============================================================
updateStep('fetching', `Fetching cities for ${stateCodesToDiscover.length} states`);
const statesWithCities = await page.evaluate(async (hash: string) => {
const logs: string[] = [];
try {

View File

@@ -271,52 +271,69 @@ class TaskService {
}
/**
- * Mark a task as failed, with auto-retry if under max_retries
- * Returns true if task was re-queued for retry, false if permanently failed
+ * Determine if an error is a "soft failure" (transient) that should be requeued
+ * Soft failures: timeouts, connection issues, browser launch issues
+ * Hard failures: business logic errors like "No products returned"
*/
private isSoftFailure(errorMessage: string): boolean {
const softFailurePatterns = [
/timeout/i,
/timed out/i,
/connection.*terminated/i,
/connection.*refused/i,
/ECONNRESET/i,
/ECONNREFUSED/i,
/ETIMEDOUT/i,
/socket hang up/i,
/WS endpoint/i,
/browser process/i,
/Failed to launch/i,
/Navigation.*exceeded/i,
/net::ERR_/i,
/ENOENT.*storage/i, // Storage path issues (transient)
/ENOENT.*payload/i, // Payload path issues (transient)
];
return softFailurePatterns.some(pattern => pattern.test(errorMessage));
}
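A quick illustration of how the classifier behaves, using a subset of the patterns above (this standalone version is for demonstration; the real method lives on the class):

```typescript
// Subset of the soft-failure patterns from the diff above, shown standalone.
const softFailurePatterns: RegExp[] = [
  /timeout/i,
  /ECONNRESET/i,
  /Failed to launch/i,
  /net::ERR_/i,
];

const isSoftFailure = (errorMessage: string): boolean =>
  softFailurePatterns.some(pattern => pattern.test(errorMessage));

isSoftFailure('Navigation timeout of 30000 ms exceeded'); // transient -> requeued
isSoftFailure('No products returned');                    // business error -> hard fail
```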
/**
* Mark a task as failed
*
* Soft failures (timeouts, connection issues): Requeue back to pending for later pickup
* Hard failures (business logic errors): Mark as failed permanently
*/
  async failTask(taskId: number, errorMessage: string): Promise<boolean> {
    const result = await pool.query(
      `SELECT retry_count, max_retries FROM worker_tasks WHERE id = $1`,
      [taskId]
    );
+    const isSoft = this.isSoftFailure(errorMessage);
    if (result.rows.length === 0) {
      return false;
    }
-    const { retry_count, max_retries } = result.rows[0];
-    const newRetryCount = (retry_count || 0) + 1;
-    if (newRetryCount < (max_retries || 3)) {
-      // Re-queue for retry - reset to pending with incremented retry_count
+    if (isSoft) {
+      // Soft failure: put back in queue immediately for another worker
      await pool.query(
        `UPDATE worker_tasks
         SET status = 'pending',
             worker_id = NULL,
             claimed_at = NULL,
             started_at = NULL,
-             retry_count = $2,
-             error_message = $3,
+             error_message = $2,
             scheduled_for = NULL,
             updated_at = NOW()
         WHERE id = $1`,
-        [taskId, newRetryCount, `Retry ${newRetryCount}: ${errorMessage}`]
+        [taskId, `Requeued: ${errorMessage}`]
      );
-      console.log(`[TaskService] Task ${taskId} queued for retry ${newRetryCount}/${max_retries || 3}`);
+      console.log(`[TaskService] Task ${taskId} requeued for another worker`);
      return true;
    }
-    // Max retries exceeded - mark as permanently failed
+    // Hard failure: mark as permanently failed
    await pool.query(
      `UPDATE worker_tasks
       SET status = 'failed',
           completed_at = NOW(),
-           retry_count = $2,
-           error_message = $3
+           error_message = $2
       WHERE id = $1`,
-      [taskId, newRetryCount, `Failed after ${newRetryCount} attempts: ${errorMessage}`]
+      [taskId, `Hard failure: ${errorMessage}`]
    );
-    console.log(`[TaskService] Task ${taskId} permanently failed after ${newRetryCount} attempts`);
+    console.log(`[TaskService] Task ${taskId} hard failed: ${errorMessage}`);
return false;
}

View File

@@ -97,8 +97,12 @@ const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// =============================================================================
// Maximum number of tasks this worker will run concurrently
-// Tune based on workload: I/O-bound tasks benefit from higher concurrency
-const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '15');
+// Browser tasks (Puppeteer) use ~400MB RAM each. With 2GB pod limit:
+// - 3 browsers = ~1.3GB = SAFE
+// - 4 browsers = ~1.7GB = RISKY
+// - 5+ browsers = OOM CRASH
+// See: docs/WORKER_TASK_ARCHITECTURE.md#browser-task-memory-limits
+const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
@@ -131,6 +135,8 @@ export interface TaskContext {
task: WorkerTask;
heartbeat: () => Promise<void>;
crawlRotator?: CrawlRotator;
/** Update the current step being executed (shown in dashboard) */
updateStep: (step: string, detail?: string) => void;
}
export interface TaskResult {
@@ -264,6 +270,18 @@ export class TaskWorker {
private preflightsCompleted: boolean = false;
private initializingPromise: Promise<void> | null = null;
// ==========================================================================
// STEP TRACKING FOR DASHBOARD VISIBILITY
// ==========================================================================
// Workers report their current step in heartbeats so the dashboard can show
// real-time progress like "preflight", "loading page", "processing products"
// ==========================================================================
private currentStep: string = 'idle';
private currentStepDetail: string | null = null;
private currentStepStartedAt: Date | null = null;
/** Map of task ID -> step info for concurrent tasks */
private taskSteps: Map<number, { step: string; detail: string | null; startedAt: Date }> = new Map();
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
@@ -346,6 +364,65 @@ export class TaskWorker {
return this.activeTasks.size < this.maxConcurrentTasks;
}
// ==========================================================================
// STEP TRACKING METHODS
// ==========================================================================
/**
* Update the current step for a task (for dashboard visibility)
* @param taskId - The task ID to update
* @param step - Short step name (e.g., "preflight", "loading", "processing")
* @param detail - Optional detail (e.g., "Verifying IP 1.2.3.4")
*/
public updateTaskStep(taskId: number, step: string, detail?: string): void {
this.taskSteps.set(taskId, {
step,
detail: detail || null,
startedAt: new Date(),
});
// Also update the "primary" step for single-task backwards compat
if (this.activeTasks.size === 1 || taskId === Array.from(this.activeTasks.keys())[0]) {
this.currentStep = step;
this.currentStepDetail = detail || null;
this.currentStepStartedAt = new Date();
}
console.log(`[TaskWorker] Step: ${step}${detail ? ` - ${detail}` : ''} (task #${taskId})`);
}
/**
* Clear step tracking for a task (when task completes)
*/
private clearTaskStep(taskId: number): void {
this.taskSteps.delete(taskId);
// Reset primary step if no more active tasks
if (this.activeTasks.size === 0) {
this.currentStep = 'idle';
this.currentStepDetail = null;
this.currentStepStartedAt = null;
}
}
/**
* Get current step info for all active tasks (for heartbeat)
*/
private getTaskStepsInfo(): Array<{
task_id: number;
step: string;
detail: string | null;
elapsed_ms: number;
}> {
const now = Date.now();
return Array.from(this.taskSteps.entries()).map(([taskId, info]) => ({
task_id: taskId,
step: info.step,
detail: info.detail,
elapsed_ms: now - info.startedAt.getTime(),
}));
}
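From a handler's point of view, the step-tracking methods above are reached through `TaskContext.updateStep`. The sketch below mirrors the loading → fetching → saving sequence mentioned in the commit message for payload-fetch-curl; the handler body is a stub, not real code from the repo.

```typescript
// Illustrative handler showing how a task reports progress via updateStep.
// Step names follow payload-fetch-curl's sequence from the commit message;
// the handler itself and its return shape are assumptions for this sketch.
type StepFn = (step: string, detail?: string) => void;

async function examplePayloadFetchHandler(updateStep: StepFn): Promise<{ success: boolean }> {
  updateStep('loading', 'Loading store configuration');
  // ... load store config here ...
  updateStep('fetching', 'Requesting menu payload');
  // ... fetch the payload here ...
  updateStep('saving', 'Writing gzipped payload');
  // ... persist the payload here ...
  return { success: true };
}
```

Each call lands in `updateTaskStep`, so the dashboard sees the current step in the next heartbeat.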
/**
* Initialize stealth systems (proxy rotation, fingerprints)
* Called LAZILY on first task claim attempt (NOT at worker startup).
@@ -635,7 +712,7 @@ export class TaskWorker {
}
/**
* Send heartbeat to registry with resource usage and proxy location
* Send heartbeat to registry with resource usage, proxy location, and step info
*/
private async sendRegistryHeartbeat(): Promise<void> {
try {
@@ -647,6 +724,9 @@ export class TaskWorker {
// Get array of active task IDs
const activeTaskIds = Array.from(this.activeTasks.keys());
// Get step info for all active tasks
const taskSteps = this.getTaskStepsInfo();
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -657,6 +737,11 @@ export class TaskWorker {
active_task_count: this.activeTasks.size,
max_concurrent_tasks: this.maxConcurrentTasks,
status: this.activeTasks.size > 0 ? 'active' : 'idle',
// Step tracking for dashboard visibility
current_step: this.currentStep,
current_step_detail: this.currentStepDetail,
current_step_started_at: this.currentStepStartedAt?.toISOString() || null,
task_steps: taskSteps, // Per-task step info for concurrent workers
resources: {
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
@@ -915,7 +1000,7 @@ export class TaskWorker {
throw new Error(`No handler registered for role: ${task.role}`);
}
// Create context
// Create context with step tracking
const ctx: TaskContext = {
pool: this.pool,
workerId: this.workerId,
@@ -924,12 +1009,21 @@ export class TaskWorker {
await taskService.heartbeat(task.id);
},
crawlRotator: this.crawlRotator,
updateStep: (step: string, detail?: string) => {
this.updateTaskStep(task.id, step, detail);
},
};
// Initialize step tracking for this task
this.updateTaskStep(task.id, 'starting', `Initializing ${task.role}`);
// Execute the task
const result = await handler(ctx);
if (result.success) {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as completed
await taskService.completeTask(task.id, result);
await this.reportTaskCompletion(true);
@@ -945,12 +1039,18 @@ export class TaskWorker {
console.log(`[TaskWorker] Chained new task ${chainedTask.id} (${chainedTask.role})`);
}
} else {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as failed
await taskService.failTask(task.id, result.error || 'Unknown error');
await this.reportTaskCompletion(false);
console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`);
}
} catch (error: any) {
// Clear step tracking
this.clearTaskStep(task.id);
// Mark as failed
await taskService.failTask(task.id, error.message);
await this.reportTaskCompletion(false);

View File

@@ -5,10 +5,13 @@
*
* Design Pattern: Metadata/Payload Separation
* - Metadata in PostgreSQL (raw_crawl_payloads table): Small, indexed, queryable
* - Payload on filesystem: Gzipped JSON at storage_path
* - Payload stored in MinIO/S3 (or local filesystem as fallback): Gzipped JSON
*
* Storage structure:
* /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* Storage structure (MinIO):
* cannaiq/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*
* Storage structure (Local fallback):
* ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*
* Benefits:
* - Compare any two crawls to see what changed
@@ -16,6 +19,7 @@
* - Debug issues by seeing exactly what the API returned
* - DB stays small, backups stay fast
* - ~90% compression (1.5MB -> 150KB per crawl)
* - Shared storage accessible by all worker pods (MinIO)
*/
import * as fs from 'fs';
@@ -24,13 +28,47 @@ import * as zlib from 'zlib';
import { promisify } from 'util';
import { Pool } from 'pg';
import * as crypto from 'crypto';
import * as Minio from 'minio';
const gzip = promisify(zlib.gzip);
const gunzip = promisify(zlib.gunzip);
// Base path for payload storage (matches image storage pattern)
// Base path for payload storage (used for local fallback and as key prefix in MinIO)
const PAYLOAD_BASE_PATH = process.env.PAYLOAD_STORAGE_PATH || './storage/payloads';
// MinIO configuration
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT;
const MINIO_PORT = parseInt(process.env.MINIO_PORT || '443');
const MINIO_USE_SSL = process.env.MINIO_USE_SSL === 'true';
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY;
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY;
const MINIO_BUCKET = process.env.MINIO_BUCKET || 'cannaiq';
// Check if MinIO is configured
const useMinIO = !!(MINIO_ENDPOINT && MINIO_ACCESS_KEY && MINIO_SECRET_KEY);
let minioClient: Minio.Client | null = null;
function getMinioClient(): Minio.Client {
if (!useMinIO) {
throw new Error('[PayloadStorage] MinIO is not configured (set MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY)');
}
if (!minioClient) {
minioClient = new Minio.Client({
endPoint: MINIO_ENDPOINT!,
port: MINIO_PORT,
useSSL: MINIO_USE_SSL,
accessKey: MINIO_ACCESS_KEY!,
secretKey: MINIO_SECRET_KEY!,
});
}
return minioClient;
}
// Log which storage backend we're using
if (useMinIO) {
console.log(`[PayloadStorage] Using MinIO storage: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`);
} else {
console.log(`[PayloadStorage] Using local filesystem storage: ${PAYLOAD_BASE_PATH}`);
}
/**
* Result from saving a payload
*/
@@ -58,9 +96,10 @@ export interface LoadPayloadResult {
}
/**
* Generate storage path for a payload
* Generate storage path/key for a payload
*
* Format: /storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* MinIO format: payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
* Local format: ./storage/payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
*/
function generateStoragePath(dispensaryId: number, timestamp: Date): string {
const year = timestamp.getFullYear();
@@ -68,13 +107,15 @@ function generateStoragePath(dispensaryId: number, timestamp: Date): string {
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
String(year),
month,
day,
`store_${dispensaryId}_${ts}.json.gz`
);
const relativePath = `payloads/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`;
if (useMinIO) {
// MinIO uses forward slashes, no leading slash
return relativePath;
} else {
// Local filesystem uses OS-specific path
return path.join(PAYLOAD_BASE_PATH, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`);
}
}
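The MinIO key layout documented above can be shown as a standalone helper; `minioKeyFor` is a hypothetical name for this sketch, and like `generateStoragePath` it uses local time.

```typescript
// Standalone sketch of the MinIO object-key layout:
// payloads/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz
function minioKeyFor(dispensaryId: number, timestamp: Date): string {
  const year = timestamp.getFullYear();
  const month = String(timestamp.getMonth() + 1).padStart(2, '0');
  const day = String(timestamp.getDate()).padStart(2, '0');
  return `payloads/${year}/${month}/${day}/store_${dispensaryId}_${timestamp.getTime()}.json.gz`;
}
```

Note the key has no leading slash and always uses forward slashes, which is what object storage expects regardless of the worker's OS.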
/**
@@ -93,7 +134,7 @@ function calculateChecksum(data: Buffer): string {
}
/**
* Save a raw crawl payload to filesystem and record metadata in DB
* Save a raw crawl payload to MinIO/S3 (or filesystem) and record metadata in DB
*
* @param pool - Database connection pool
* @param dispensaryId - ID of the dispensary
@@ -119,9 +160,19 @@ export async function saveRawPayload(
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
// Write to storage backend
if (useMinIO) {
// Upload to MinIO
const client = getMinioClient();
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
'Content-Type': 'application/gzip',
'Content-Encoding': 'gzip',
});
} else {
// Write to local filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
}
// Record metadata in DB
const result = await pool.query(`
@@ -147,7 +198,8 @@ export async function saveRawPayload(
checksum
]);
console.log(`[PayloadStorage] Saved payload for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
const backend = useMinIO ? 'MinIO' : 'local';
console.log(`[PayloadStorage] Saved payload to ${backend} for store ${dispensaryId}: ${storagePath} (${(compressedSize / 1024).toFixed(1)}KB compressed, ${(rawSize / 1024).toFixed(1)}KB raw)`);
return {
id: result.rows[0].id,
@@ -196,13 +248,32 @@ export async function loadRawPayloadById(
}
/**
* Load a raw payload directly from filesystem path
* Load a raw payload directly from storage path (MinIO or filesystem)
*
* @param storagePath - Path to gzipped JSON file
* @param storagePath - Path/key to gzipped JSON file
* @returns Parsed JSON payload
*/
export async function loadPayloadFromPath(storagePath: string): Promise<any> {
const compressed = await fs.promises.readFile(storagePath);
let compressed: Buffer;
// Determine if path looks like MinIO key (starts with payloads/) or local path
const isMinIOPath = storagePath.startsWith('payloads/') && useMinIO;
if (isMinIOPath) {
// Download from MinIO
const client = getMinioClient();
const chunks: Buffer[] = [];
const stream = await client.getObject(MINIO_BUCKET, storagePath);
for await (const chunk of stream) {
chunks.push(chunk as Buffer);
}
compressed = Buffer.concat(chunks);
} else {
// Read from local filesystem
compressed = await fs.promises.readFile(storagePath);
}
const decompressed = await gunzip(compressed);
return JSON.parse(decompressed.toString('utf8'));
}
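The save and load paths above are symmetric: gzip(JSON) on write, gunzip then parse on read. A minimal round-trip using only Node's stdlib (no MinIO or DB) looks like this; `payloadRoundTrip` is an illustrative name.

```typescript
// Minimal round-trip of the gzip + JSON encoding used by saveRawPayload and
// loadPayloadFromPath, using only Node's stdlib.
import { promisify } from 'util';
import * as zlib from 'zlib';

const gzipAsync = promisify(zlib.gzip);
const gunzipAsync = promisify(zlib.gunzip);

async function payloadRoundTrip(payload: unknown): Promise<unknown> {
  const compressed = (await gzipAsync(Buffer.from(JSON.stringify(payload), 'utf8'))) as Buffer;
  const decompressed = (await gunzipAsync(compressed)) as Buffer;
  return JSON.parse(decompressed.toString('utf8'));
}
```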
@@ -378,9 +449,10 @@ export interface SaveDiscoveryPayloadResult {
}
/**
* Generate storage path for a discovery payload
* Generate storage path/key for a discovery payload
*
* Format: /storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
* MinIO format: payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
* Local format: ./storage/payloads/discovery/{year}/{month}/{day}/state_{state_code}_{timestamp}.json.gz
*/
function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): string {
const year = timestamp.getFullYear();
@@ -388,18 +460,17 @@ function generateDiscoveryStoragePath(stateCode: string, timestamp: Date): strin
const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime();
return path.join(
PAYLOAD_BASE_PATH,
'discovery',
String(year),
month,
day,
`state_${stateCode.toLowerCase()}_${ts}.json.gz`
);
const relativePath = `payloads/discovery/${year}/${month}/${day}/state_${stateCode.toLowerCase()}_${ts}.json.gz`;
if (useMinIO) {
return relativePath;
} else {
return path.join(PAYLOAD_BASE_PATH, 'discovery', String(year), month, day, `state_${stateCode.toLowerCase()}_${ts}.json.gz`);
}
}
/**
* Save a raw store discovery payload to filesystem and record metadata in DB
* Save a raw store discovery payload to MinIO/S3 (or filesystem) and record metadata in DB
*
* @param pool - Database connection pool
* @param stateCode - State code (e.g., 'AZ', 'MI')
@@ -423,9 +494,19 @@ export async function saveDiscoveryPayload(
const compressedSize = compressed.length;
const checksum = calculateChecksum(compressed);
// Write to filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
// Write to storage backend
if (useMinIO) {
// Upload to MinIO
const client = getMinioClient();
await client.putObject(MINIO_BUCKET, storagePath, compressed, compressedSize, {
'Content-Type': 'application/gzip',
'Content-Encoding': 'gzip',
});
} else {
// Write to local filesystem
await ensureDir(storagePath);
await fs.promises.writeFile(storagePath, compressed);
}
// Record metadata in DB
const result = await pool.query(`
@@ -451,7 +532,8 @@ export async function saveDiscoveryPayload(
checksum
]);
console.log(`[PayloadStorage] Saved discovery payload for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
const backend = useMinIO ? 'MinIO' : 'local';
console.log(`[PayloadStorage] Saved discovery payload to ${backend} for ${stateCode}: ${storagePath} (${storeCount} stores, ${(compressedSize / 1024).toFixed(1)}KB compressed)`);
return {
id: result.rows[0].id,

View File

@@ -3051,7 +3051,7 @@ class ApiClient {
}
async runTaskScheduleNow(id: number) {
return this.request<{ success: boolean; message: string; task: any }>(`/api/tasks/schedules/${id}/run-now`, {
return this.request<{ success: boolean; message: string; tasksCreated?: number; stateCode?: string }>(`/api/tasks/schedules/${id}/run-now`, {
method: 'POST',
});
}

View File

@@ -703,6 +703,28 @@ function formatTimeAgo(dateStr: string | null): string {
return `${Math.floor(diff / 86400)}d ago`;
}
function formatNextRun(dateStr: string | null): string {
if (!dateStr) return '-';
const date = new Date(dateStr);
const now = new Date();
const diff = (date.getTime() - now.getTime()) / 1000;
// If in the past, show "overdue"
if (diff < 0) {
const absDiff = Math.abs(diff);
if (absDiff < 60) return 'overdue';
if (absDiff < 3600) return `${Math.floor(absDiff / 60)}m overdue`;
if (absDiff < 86400) return `${Math.floor(absDiff / 3600)}h overdue`;
return `${Math.floor(absDiff / 86400)}d overdue`;
}
// Future time
if (diff < 60) return `in ${Math.round(diff)}s`;
if (diff < 3600) return `in ${Math.floor(diff / 60)}m`;
if (diff < 86400) return `in ${Math.floor(diff / 3600)}h ${Math.floor((diff % 3600) / 60)}m`;
return `in ${Math.floor(diff / 86400)}d ${Math.floor((diff % 86400) / 3600)}h`;
}
export default function TasksDashboard() {
const [tasks, setTasks] = useState<Task[]>([]);
const [counts, setCounts] = useState<TaskCounts | null>(null);
@@ -718,6 +740,7 @@ export default function TasksDashboard() {
const [selectedSchedules, setSelectedSchedules] = useState<Set<number>>(new Set());
const [editingSchedule, setEditingSchedule] = useState<TaskSchedule | null>(null);
const [showScheduleModal, setShowScheduleModal] = useState(false);
const [runningScheduleId, setRunningScheduleId] = useState<number | null>(null);
// Pagination
const [page, setPage] = useState(0);
@@ -812,13 +835,17 @@ export default function TasksDashboard() {
};
const handleRunScheduleNow = async (scheduleId: number) => {
if (runningScheduleId !== null) return; // Prevent duplicate clicks
setRunningScheduleId(scheduleId);
try {
const result = await api.runTaskScheduleNow(scheduleId);
alert(result.message);
const result = await api.runTaskScheduleNow(scheduleId) as { success: boolean; message: string; tasksCreated?: number };
alert(result.message + (result.tasksCreated ? ` (${result.tasksCreated} tasks created)` : ''));
fetchData();
} catch (err: any) {
console.error('Run schedule error:', err);
alert(err.response?.data?.error || 'Failed to run schedule');
} finally {
setRunningScheduleId(null);
}
};
@@ -1046,18 +1073,18 @@ export default function TasksDashboard() {
{formatDuration(metric.avg_duration_sec)}
</td>
<td className="px-4 py-3 text-sm text-right text-gray-600">
{metric.tasks_per_worker_hour?.toFixed(1) || '-'}
{metric.tasks_per_worker_hour ? Number(metric.tasks_per_worker_hour).toFixed(1) : '-'}
</td>
<td className="px-4 py-3 text-sm text-right">
{metric.estimated_hours_to_drain ? (
<span
className={
metric.estimated_hours_to_drain > 4
Number(metric.estimated_hours_to_drain) > 4
? 'text-red-600 font-medium'
: 'text-gray-600'
}
>
{metric.estimated_hours_to_drain.toFixed(1)}h
{Number(metric.estimated_hours_to_drain).toFixed(1)}h
</span>
) : (
'-'
@@ -1230,7 +1257,7 @@ export default function TasksDashboard() {
{schedule.last_run_at ? formatTimeAgo(schedule.last_run_at) : '-'}
</td>
<td className="px-4 py-3 text-sm text-gray-600">
{schedule.next_run_at ? formatTimeAgo(schedule.next_run_at) : '-'}
{schedule.next_run_at ? formatNextRun(schedule.next_run_at) : '-'}
</td>
<td className="px-4 py-3">
<span
@@ -1257,10 +1284,17 @@ export default function TasksDashboard() {
<div className="flex items-center gap-1">
<button
onClick={() => handleRunScheduleNow(schedule.id)}
className="p-1.5 text-gray-400 hover:text-emerald-600 hover:bg-emerald-50 rounded transition-colors"
title="Run now"
disabled={runningScheduleId !== null}
className={`p-1.5 rounded transition-colors ${
runningScheduleId === schedule.id
? 'text-emerald-600 bg-emerald-50 cursor-wait'
: runningScheduleId !== null
? 'text-gray-300 cursor-not-allowed'
: 'text-gray-400 hover:text-emerald-600 hover:bg-emerald-50'
}`}
title={runningScheduleId === schedule.id ? 'Running...' : 'Run now'}
>
<PlayCircle className="w-4 h-4" />
<PlayCircle className={`w-4 h-4 ${runningScheduleId === schedule.id ? 'animate-pulse' : ''}`} />
</button>
<button
onClick={() => handleToggleSchedule(schedule.id)}

View File

@@ -78,6 +78,16 @@ interface Worker {
timezone?: string;
isRotating?: boolean;
};
// Step tracking
current_step?: string;
current_step_detail?: string;
current_step_started_at?: string;
task_steps?: Array<{
task_id: number;
step: string;
detail: string | null;
elapsed_ms: number;
}>;
} | null;
}
@@ -87,11 +97,24 @@ interface Task {
role: string;
dispensary_id: number | null;
dispensary_name?: string;
dispensary_slug?: string;
status: string;
priority: number;
started_at: string | null;
completed_at: string | null;
claimed_by: string | null;
worker_id: string | null;
error_message?: string | null;
result?: {
success?: boolean;
productsProcessed?: number;
snapshotsCreated?: number;
newProducts?: number;
updatedProducts?: number;
storesDiscovered?: number;
markedOos?: number;
error?: string;
} | null;
}
function formatRelativeTime(dateStr: string | null): string {
@@ -349,7 +372,59 @@ function TransportBadge({ worker }: { worker: Worker }) {
);
}
// Task count badge showing active/max concurrent tasks
// Step badge showing current step with detail
function StepBadge({ worker }: { worker: Worker }) {
const step = worker.metadata?.current_step;
const detail = worker.metadata?.current_step_detail;
const startedAt = worker.metadata?.current_step_started_at;
const taskSteps = worker.metadata?.task_steps;
if (!step || step === 'idle') {
return null;
}
// Calculate elapsed time
let elapsedStr = '';
if (startedAt) {
const elapsed = Date.now() - new Date(startedAt).getTime();
if (elapsed < 60000) {
elapsedStr = `${Math.round(elapsed / 1000)}s`;
} else {
elapsedStr = `${Math.round(elapsed / 60000)}m`;
}
}
// Step colors
const getStepColor = (s: string) => {
if (s.includes('preflight')) return 'text-yellow-600 bg-yellow-50';
if (s.includes('loading') || s.includes('navigating')) return 'text-blue-600 bg-blue-50';
if (s.includes('processing') || s.includes('normalizing')) return 'text-purple-600 bg-purple-50';
if (s.includes('saving') || s.includes('upserting')) return 'text-emerald-600 bg-emerald-50';
if (s.includes('error') || s.includes('failed')) return 'text-red-600 bg-red-50';
return 'text-gray-600 bg-gray-50';
};
const colorClass = getStepColor(step);
// Build tooltip with all task steps if concurrent
const tooltipLines = taskSteps?.map(ts =>
`Task #${ts.task_id}: ${ts.step}${ts.detail ? ` - ${ts.detail}` : ''} (${Math.round(ts.elapsed_ms / 1000)}s)`
) || [];
return (
<div
className={`inline-flex items-center gap-1.5 px-2 py-1 rounded text-xs font-medium ${colorClass}`}
title={tooltipLines.length > 0 ? tooltipLines.join('\n') : undefined}
>
<span className="animate-pulse">●</span>
<span className="font-semibold">{step}</span>
{detail && <span className="text-gray-500 truncate max-w-[120px]">- {detail}</span>}
{elapsedStr && <span className="text-gray-400">({elapsedStr})</span>}
</div>
);
}
// Task count badge showing active/max concurrent tasks with task details
function TaskCountBadge({ worker, tasks }: { worker: Worker; tasks: Task[] }) {
const activeCount = worker.active_task_count ?? (worker.current_task_id ? 1 : 0);
const maxCount = worker.max_concurrent_tasks ?? 1;
@@ -359,20 +434,34 @@ function TaskCountBadge({ worker, tasks }: { worker: Worker; tasks: Task[] }) {
return <span className="text-gray-400 text-sm">Idle</span>;
}
// Get task names for tooltip
const taskNames = taskIds.map(id => {
const task = tasks.find(t => t.id === id);
return task ? `#${id}: ${task.role}${task.dispensary_name ? ` (${task.dispensary_name})` : ''}` : `#${id}`;
}).join('\n');
// Get task details for display
const activeTasks = taskIds.map(id => tasks.find(t => t.id === id)).filter(Boolean) as Task[];
// Build tooltip with full details
const tooltipLines = activeTasks.map(task =>
`#${task.id}: ${task.role}${task.dispensary_name ? ` - ${task.dispensary_name}` : ''}`
);
// Show first task details inline
const firstTask = activeTasks[0];
const roleLabel = firstTask?.role?.replace(/_/g, ' ') || 'task';
const storeName = firstTask?.dispensary_name;
return (
<div className="flex items-center gap-2" title={taskNames}>
<div className="flex flex-col gap-0.5" title={tooltipLines.join('\n')}>
<span className="text-sm font-medium text-blue-600">
{activeCount}/{maxCount} tasks
{activeCount}/{maxCount} active
</span>
{taskIds.length === 1 && (
<span className="text-xs text-gray-500">#{taskIds[0]}</span>
{firstTask && (
<span className="text-xs text-gray-500 truncate max-w-[140px]">
{roleLabel}{storeName ? `: ${storeName}` : ''}
</span>
)}
{activeTasks.length > 1 && (
<span className="text-xs text-gray-400">+{activeTasks.length - 1} more</span>
)}
{/* Show current step */}
<StepBadge worker={worker} />
</div>
);
}
@@ -507,6 +596,175 @@ function groupWorkersByPod(workers: Worker[]): Map<string, Worker[]> {
return pods;
}
// Calculate task duration in seconds
function getTaskDuration(task: Task): number | null {
if (!task.started_at) return null;
const start = new Date(task.started_at);
const end = task.completed_at ? new Date(task.completed_at) : new Date();
return Math.round((end.getTime() - start.getTime()) / 1000);
}
// Format duration for display
function formatTaskDuration(seconds: number | null): string {
if (seconds === null) return '-';
if (seconds < 60) return `${seconds}s`;
const mins = Math.floor(seconds / 60);
const secs = seconds % 60;
if (mins < 60) return `${mins}m ${secs}s`;
const hrs = Math.floor(mins / 60);
return `${hrs}h ${mins % 60}m`;
}
// Get friendly worker name from worker_id
function getWorkerShortName(workerId: string | null): string {
if (!workerId) return 'Unknown';
// Extract last part after the hash (e.g., "scraper-worker-75b8b9b5c9-46p4j" -> "46p4j")
const parts = workerId.split('-');
return parts[parts.length - 1] || workerId.slice(-8);
}
// Live Activity Panel - shows recent task completions and failures
function LiveActivityPanel({
recentTasks,
runningTasks
}: {
recentTasks: Task[];
runningTasks: Task[];
}) {
// Combine running and recent completed/failed, sort by most recent activity
const allActivity = [
...runningTasks.map(t => ({ ...t, activityType: 'running' as const })),
...recentTasks.map(t => ({ ...t, activityType: t.status as 'completed' | 'failed' })),
].sort((a, b) => {
const aTime = a.activityType === 'running' ? a.started_at : a.completed_at;
const bTime = b.activityType === 'running' ? b.started_at : b.completed_at;
if (!aTime || !bTime) return 0;
return new Date(bTime).getTime() - new Date(aTime).getTime();
}).slice(0, 15); // Show max 15 items
const getRoleIcon = (role: string) => {
switch (role) {
case 'product_refresh': return '🔄';
case 'product_discovery': return '🔍';
case 'store_discovery': return '🏪';
case 'entry_point_discovery': return '🎯';
case 'analytics_refresh': return '📊';
default: return '📋';
}
};
const getStatusConfig = (status: string) => {
switch (status) {
case 'running':
return { bg: 'bg-blue-50', border: 'border-blue-200', icon: '🔵', text: 'text-blue-700' };
case 'completed':
return { bg: 'bg-emerald-50', border: 'border-emerald-200', icon: '🟢', text: 'text-emerald-700' };
case 'failed':
return { bg: 'bg-red-50', border: 'border-red-200', icon: '🔴', text: 'text-red-700' };
default:
return { bg: 'bg-gray-50', border: 'border-gray-200', icon: '⚪', text: 'text-gray-700' };
}
};
const getResultSummary = (task: Task): string => {
if (!task.result) return '';
const parts: string[] = [];
if (task.result.productsProcessed) parts.push(`${task.result.productsProcessed} products`);
if (task.result.newProducts) parts.push(`${task.result.newProducts} new`);
if (task.result.storesDiscovered) parts.push(`${task.result.storesDiscovered} stores`);
if (task.result.markedOos && task.result.markedOos > 0) parts.push(`${task.result.markedOos} OOS`);
return parts.length > 0 ? ` - ${parts.join(', ')}` : '';
};
return (
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">
<div className="px-4 py-3 border-b border-gray-200 bg-gray-50">
<div className="flex items-center justify-between">
<h3 className="text-sm font-semibold text-gray-900 flex items-center gap-2">
<Activity className="w-4 h-4 text-blue-500" />
Live Activity
</h3>
<span className="text-xs text-gray-500">
{runningTasks.length} running, {recentTasks.length} recent
</span>
</div>
</div>
<div className="divide-y divide-gray-100 max-h-[400px] overflow-y-auto">
{allActivity.length === 0 ? (
<div className="px-4 py-8 text-center text-gray-500">
<Activity className="w-8 h-8 mx-auto mb-2 text-gray-300" />
<p className="text-sm">No recent activity</p>
</div>
) : (
allActivity.map((task) => {
const config = getStatusConfig(task.activityType);
const duration = getTaskDuration(task);
const workerName = getWorkerShortName(task.worker_id);
return (
<div
key={`${task.id}-${task.activityType}`}
className={`px-4 py-3 ${config.bg} ${task.activityType === 'running' ? 'animate-pulse' : ''}`}
>
<div className="flex items-start gap-3">
<span className="text-lg flex-shrink-0">{config.icon}</span>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2 flex-wrap">
<span className="text-xs font-medium text-gray-500 bg-gray-100 px-1.5 py-0.5 rounded">
{workerName}
</span>
<span className={`text-sm font-medium ${config.text}`}>
{task.activityType === 'running' ? 'working on' : task.activityType}
</span>
<span className="text-sm text-gray-700">
{getRoleIcon(task.role)} {task.role.replace(/_/g, ' ')}
</span>
</div>
{task.dispensary_name && (
<p className="text-sm text-gray-900 font-medium mt-1 truncate">
{task.dispensary_name}
</p>
)}
<div className="flex items-center gap-3 mt-1 text-xs text-gray-500">
{duration !== null && (
<span className="flex items-center gap-1">
<Timer className="w-3 h-3" />
{formatTaskDuration(duration)}
</span>
)}
{task.activityType === 'completed' && task.result && (
<span className="text-emerald-600 font-medium">
{getResultSummary(task)}
</span>
)}
{task.activityType === 'failed' && task.error_message && (
<span className="text-red-600 truncate max-w-[200px]" title={task.error_message}>
{task.error_message.length > 50 ? task.error_message.slice(0, 50) + '...' : task.error_message}
</span>
)}
{task.completed_at && (
<span className="text-gray-400">
{formatRelativeTime(task.completed_at)}
</span>
)}
</div>
</div>
<span className="text-xs text-gray-400 flex-shrink-0">
#{task.id}
</span>
</div>
</div>
);
})
)}
</div>
</div>
);
}
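The merge-and-sort at the top of `LiveActivityPanel` can be isolated as a small pure function: running tasks order by `started_at`, finished tasks by `completed_at`, newest first. `ActivityItem` is a pared-down stand-in for the `Task` type for this sketch.

```typescript
// Sketch of LiveActivityPanel's activity ordering, extracted for clarity.
interface ActivityItem {
  id: number;
  activityType: 'running' | 'completed' | 'failed';
  started_at: string | null;
  completed_at: string | null;
}

function sortActivity(items: ActivityItem[]): ActivityItem[] {
  return [...items].sort((a, b) => {
    // Running tasks have no completed_at yet, so sort them by start time
    const aTime = a.activityType === 'running' ? a.started_at : a.completed_at;
    const bTime = b.activityType === 'running' ? b.started_at : b.completed_at;
    if (!aTime || !bTime) return 0;
    return new Date(bTime).getTime() - new Date(aTime).getTime();
  });
}
```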
// Format estimated time remaining
function formatEstimatedTime(hours: number): string {
if (hours < 1) {
@@ -524,7 +782,8 @@ function formatEstimatedTime(hours: number): string {
export function WorkersDashboard() {
const [workers, setWorkers] = useState<Worker[]>([]);
const [tasks, setTasks] = useState<Task[]>([]);
const [tasks, setTasks] = useState<Task[]>([]); // Running tasks
const [recentTasks, setRecentTasks] = useState<Task[]>([]); // Recent completed/failed
const [pendingTaskCount, setPendingTaskCount] = useState<number>(0);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
@@ -538,15 +797,28 @@ export function WorkersDashboard() {
const fetchData = useCallback(async () => {
try {
// Fetch workers from registry, running tasks, and task counts
const [workersRes, tasksRes, countsRes] = await Promise.all([
// Fetch workers from registry, running tasks, recent tasks, and task counts
const [workersRes, tasksRes, recentCompletedRes, recentFailedRes, countsRes] = await Promise.all([
api.get('/api/worker-registry/workers'),
api.get('/api/tasks?status=running&limit=100'),
api.get('/api/tasks?status=completed&limit=10'),
api.get('/api/tasks?status=failed&limit=5'),
api.get('/api/tasks/counts'),
]);
setWorkers(workersRes.data.workers || []);
setTasks(tasksRes.data.tasks || []);
// Combine recent completed and failed, sort by completion time
const recentCompleted = recentCompletedRes.data.tasks || [];
const recentFailed = recentFailedRes.data.tasks || [];
const combined = [...recentCompleted, ...recentFailed].sort((a, b) => {
const aTime = a.completed_at ? new Date(a.completed_at).getTime() : 0;
const bTime = b.completed_at ? new Date(b.completed_at).getTime() : 0;
return bTime - aTime;
});
setRecentTasks(combined.slice(0, 15));
setPendingTaskCount(countsRes.data?.pending || 0);
setError(null);
} catch (err: any) {
@@ -764,8 +1036,15 @@ export function WorkersDashboard() {
);
})()}
{/* Worker Pods Visualization */}
<div className="bg-white rounded-lg border border-gray-200 overflow-hidden">
{/* Two Column Layout: Live Activity + Worker Pods */}
<div className="grid grid-cols-1 lg:grid-cols-3 gap-6">
{/* Live Activity Panel - Takes 1/3 width on large screens */}
<div className="lg:col-span-1">
<LiveActivityPanel recentTasks={recentTasks} runningTasks={tasks} />
</div>
{/* Worker Pods Visualization - Takes 2/3 width on large screens */}
<div className="lg:col-span-2 bg-white rounded-lg border border-gray-200 overflow-hidden">
<div className="px-4 py-3 border-b border-gray-200 bg-gray-50">
<div className="flex items-center justify-between">
<div>
@@ -911,6 +1190,7 @@ export function WorkersDashboard() {
})()}
</div>
)}
</div>
</div>
{/* Workers Table */}

View File

@@ -12,7 +12,10 @@ metadata:
name: scraper-worker
namespace: dispensary-scraper
spec:
# MAX 8 PODS - See CLAUDE.md rule #6
# Each pod runs up to MAX_CONCURRENT_TASKS browsers (~400MB each)
# Scale pods for throughput, not concurrent tasks per pod
replicas: 8
selector:
matchLabels:
app: scraper-worker
@@ -44,6 +47,10 @@ spec:
value: "http://scraper"
- name: NODE_OPTIONS
value: "--max-old-space-size=1500"
# Browser memory limits - see docs/WORKER_TASK_ARCHITECTURE.md
# 3 browsers × ~400MB = ~1.2GB (safe for 2GB pod limit)
- name: MAX_CONCURRENT_TASKS
value: "3"
resources:
requests:
memory: "1Gi"
@@ -61,169 +68,3 @@ spec:
periodSeconds: 30
failureThreshold: 3
terminationGracePeriodSeconds: 60
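The `MAX_CONCURRENT_TASKS` env var above caps in-flight tasks per pod, so total cluster concurrency is `8 pods × 3 = 24` workers. A minimal sketch of how a worker process could enforce that cap (the `Semaphore` class and `runTask` helper are illustrative, not the repo's actual implementation):

```typescript
// Hypothetical sketch: cap in-flight tasks at MAX_CONCURRENT_TASKS.
// 8 pods × 3 tasks each = 24 concurrent workers cluster-wide.
const MAX_CONCURRENT_TASKS = Number(process.env.MAX_CONCURRENT_TASKS ?? "3");

class Semaphore {
  private queue: Array<() => void> = [];
  private inFlight = 0;
  constructor(private readonly limit: number) {}

  async acquire(): Promise<void> {
    if (this.inFlight < this.limit) {
      this.inFlight++;
      return;
    }
    // Limit reached: wait until a running task releases its slot.
    await new Promise<void>((resolve) => this.queue.push(resolve));
    this.inFlight++;
  }

  release(): void {
    this.inFlight--;
    const next = this.queue.shift();
    if (next) next();
  }
}

const slots = new Semaphore(MAX_CONCURRENT_TASKS);

// Wrap each task so a browser slot is always returned, even on failure.
async function runTask(fn: () => Promise<void>): Promise<void> {
  await slots.acquire();
  try {
    await fn();
  } finally {
    slots.release();
  }
}
```

With `~400MB` per browser, this keeps a pod at roughly 1.2GB, under the 2GB limit.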
---
# =============================================================================
# ALTERNATIVE: StatefulSet with multiple workers per pod (not currently used)
# =============================================================================
# Task Worker Pods (StatefulSet)
#   Each pod runs 5 role-agnostic workers that pull tasks from the worker_tasks queue.
#
# Architecture:
# - Pods are named from a predefined list (Aethelgard, Xylos, etc.)
# - Each pod spawns 5 worker processes
# - Workers register with API and show their pod name
# - HPA scales pods 5-15 based on pending task count
# - Workers use DB-level locking (FOR UPDATE SKIP LOCKED) to prevent conflicts
#
# Pod Names (up to 25):
# Aethelgard, Xylos, Kryll, Coriolis, Dimidium, Veridia, Zetani, Talos IV,
# Onyx, Celestia, Gormand, Betha, Ragnar, Syphon, Axiom, Nadir, Terra Nova,
# Acheron, Nexus, Vespera, Helios Prime, Oasis, Mordina, Cygnus, Umbra
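The `FOR UPDATE SKIP LOCKED` claim mentioned above lets many workers poll the same `worker_tasks` table without double-claiming a row. A sketch of that pattern, assuming `status`, `claimed_by`, `started_at`, and `created_at` columns (column names are assumptions, not confirmed by the repo):

```typescript
// Minimal client interface so the sketch is not tied to a specific driver
// (node-postgres exposes a compatible query() method).
interface DbClient {
  query(sql: string, params?: unknown[]): Promise<{ rows: any[] }>;
}

// SKIP LOCKED makes concurrent workers skip rows another transaction has
// already locked, so each pending task is claimed by exactly one worker.
const CLAIM_SQL = `
  UPDATE worker_tasks
  SET status = 'running', claimed_by = $1, started_at = now()
  WHERE id = (
    SELECT id FROM worker_tasks
    WHERE status = 'pending'
    ORDER BY created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
  )
  RETURNING *`;

async function claimNextTask(db: DbClient, workerId: string) {
  const { rows } = await db.query(CLAIM_SQL, [workerId]);
  return rows[0] ?? null; // null when the queue is empty
}
```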
---
apiVersion: v1
kind: ConfigMap
metadata:
name: pod-names
namespace: dispensary-scraper
data:
names: |
Aethelgard
Xylos
Kryll
Coriolis
Dimidium
Veridia
Zetani
Talos IV
Onyx
Celestia
Gormand
Betha
Ragnar
Syphon
Axiom
Nadir
Terra Nova
Acheron
Nexus
Vespera
Helios Prime
Oasis
Mordina
Cygnus
Umbra
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: worker-pod
namespace: dispensary-scraper
spec:
serviceName: worker-pods
replicas: 5
podManagementPolicy: Parallel
selector:
matchLabels:
app: worker-pod
template:
metadata:
labels:
app: worker-pod
spec:
imagePullSecrets:
- name: regcred
containers:
- name: workers
image: code.cannabrands.app/creationshop/dispensary-scraper:latest
# Run 5 workers per pod
command: ["/bin/sh", "-c"]
args:
- |
# Get pod ordinal (0, 1, 2, etc.)
ORDINAL=$(echo $HOSTNAME | rev | cut -d'-' -f1 | rev)
# Get pod name from configmap
POD_NAME=$(sed -n "$((ORDINAL + 1))p" /etc/pod-names/names)
echo "Starting pod: $POD_NAME (ordinal: $ORDINAL)"
# Start 5 workers in this pod
for i in 1 2 3 4 5; do
WORKER_ID="${POD_NAME}-worker-${i}" \
POD_NAME="$POD_NAME" \
node dist/tasks/task-worker.js &
done
# Wait for all workers
wait
envFrom:
- configMapRef:
name: scraper-config
- secretRef:
name: scraper-secrets
env:
- name: API_BASE_URL
value: "http://scraper:3010"
- name: WORKERS_PER_POD
value: "5"
volumeMounts:
- name: pod-names
mountPath: /etc/pod-names
resources:
requests:
memory: "512Mi"
cpu: "200m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
exec:
command:
- /bin/sh
- -c
- "pgrep -f 'task-worker' > /dev/null"
initialDelaySeconds: 15
periodSeconds: 30
failureThreshold: 3
volumes:
- name: pod-names
configMap:
name: pod-names
terminationGracePeriodSeconds: 60
---
# Headless service for StatefulSet
apiVersion: v1
kind: Service
metadata:
name: worker-pods
namespace: dispensary-scraper
spec:
clusterIP: None
selector:
app: worker-pod
ports:
- port: 80
name: placeholder
---
# HPA to scale pods based on pending tasks
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: worker-pod-hpa
namespace: dispensary-scraper
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: StatefulSet
name: worker-pod
minReplicas: 5
maxReplicas: 15
metrics:
- type: External
external:
metric:
name: pending_tasks
selector:
matchLabels:
queue: worker_tasks
target:
type: AverageValue
averageValue: "10"
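For an External metric with an `AverageValue` target, the HPA controller sizes the StatefulSet as roughly `ceil(totalPendingTasks / 10)`, clamped to the 5–15 replica range above. A small illustration of that math:

```typescript
// Illustration of the HPA sizing above: target AverageValue "10" means
// ~10 pending tasks per pod, clamped to [minReplicas, maxReplicas].
function desiredReplicas(
  pendingTasks: number,
  min = 5,
  max = 15,
  target = 10
): number {
  const raw = Math.ceil(pendingTasks / target);
  return Math.min(max, Math.max(min, raw));
}
```

So an empty queue holds the pool at 5 pods, 80 pending tasks scales to 8, and anything past 150 pins the pool at the 15-pod ceiling. Note the `pending_tasks` External metric also requires a metrics adapter (e.g. Prometheus plus an adapter) to be exposed to the HPA; the manifest alone does not provide it.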