feat: Remove Run Now, add source tracking, optimize dashboard

- Remove /run-now endpoint (use task priority instead)
- Add source tracking to worker_tasks (source, source_schedule_id, source_metadata)
- Parallelize dashboard API calls (Promise.all)
- Add 1-5 min caching to /markets/dashboard and /national/summary
- Add performance indexes for dashboard queries

Migrations:
- 104: Task source tracking columns
- 105: Dashboard performance indexes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: Kelly
Date: 2025-12-13 13:23:35 -07:00
Parent: a8fec97bcb
Commit: 8b3ae40089
13 changed files with 271 additions and 460 deletions


@@ -1,5 +1,8 @@
# Claude Guidelines for CannaiQ
## CURRENT ENVIRONMENT: PRODUCTION
**We are working in PRODUCTION only.** All database queries and API calls should target the remote production environment, not localhost. Use kubectl port-forward or remote DB connections as needed.
## PERMANENT RULES (NEVER VIOLATE)
### 1. NO DELETE
@@ -247,14 +250,14 @@ These binaries mimic real browser TLS fingerprints to avoid detection.
---
## Bulk Task Workflow (Updated 2025-12-13)
### Overview
Tasks are created with `scheduled_for = NOW()` by default. Worker-level controls handle pacing - no task-level staggering needed.
### How It Works
```
1. Task created with scheduled_for = NOW()
2. Worker claims task only when scheduled_for <= NOW()
3. Worker runs preflight on EVERY task claim (proxy health check)
4. If preflight passes, worker executes task
@@ -263,57 +266,51 @@ When creating many tasks at once (e.g., product refresh for all AZ stores), stag
7. Repeat - preflight runs on each new task claim
```
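The claim rule in step 2, combined with the commit's "use task priority instead" note (`ORDER BY priority DESC`), can be sketched as a pure function. This is illustrative only — `selectNextTask` and `PendingTask` are not names from the codebase, where the claim happens in SQL:

```typescript
interface PendingTask {
  id: number;
  priority: number;
  scheduledFor: Date; // mirrors the scheduled_for column
}

// Return the next claimable task: scheduled_for <= NOW(),
// highest priority first, oldest id as tiebreaker.
function selectNextTask(tasks: PendingTask[], now: Date): PendingTask | null {
  const eligible = tasks.filter((t) => t.scheduledFor.getTime() <= now.getTime());
  eligible.sort((a, b) => b.priority - a.priority || a.id - b.id);
  return eligible[0] ?? null;
}
```

With all tasks scheduled for NOW(), priority alone decides ordering — which is why an urgent crawl no longer needs a "Run Now" endpoint.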
### Worker-Level Throttling
These controls pace task execution - no staggering at task creation time:
| Control | Purpose |
|---------|---------|
| `MAX_CONCURRENT_TASKS` | Limits concurrent tasks per pod (default: 3) |
| Working hours | Restricts when tasks run (configurable per schedule) |
| Preflight checks | Ensures proxy health before each task |
| Per-store locking | Only one active task per dispensary |
### Key Points
- **Preflight is per-task, not per-startup**: Each task claim triggers a new preflight check
- **Worker controls pacing**: Tasks scheduled for NOW() but claimed based on worker capacity
- **Optional staggering**: Pass `stagger_seconds > 0` if you need explicit delays
### API Endpoints
```bash
# Create bulk tasks for specific dispensary IDs
POST /api/tasks/batch/staggered
{
  "dispensary_ids": [1, 2, 3, 4],
  "role": "product_refresh",   # or "product_discovery"
  "stagger_seconds": 0,        # default: 0 (all NOW)
  "platform": "dutchie",       # default: "dutchie"
  "method": null               # "curl" | "http" | null
}

# Create bulk tasks for all stores in a state
POST /api/tasks/crawl-state/:stateCode
{
  "stagger_seconds": 0,        # default: 0 (all NOW)
  "method": "http"             # default: "http"
}
```
### Example: Tasks for AZ Stores
```bash
curl -X POST http://localhost:3010/api/tasks/crawl-state/AZ \
  -H "Content-Type: application/json"
```
### Related Files
| File | Purpose |
|------|---------|
| `src/tasks/task-service.ts` | `createStaggeredTasks()` method |
| `src/routes/tasks.ts` | API endpoints for batch task creation |
| `src/tasks/task-worker.ts` | Worker task claiming and preflight logic |


@@ -0,0 +1,25 @@
-- Migration 104: Add source tracking to worker_tasks
-- Purpose: Track WHERE tasks are created from (schedule vs API endpoint)
--
-- All automated task creation should be visible in task_schedules.
-- This column helps identify "phantom" tasks created outside the schedule system.
-- Add source column to worker_tasks
ALTER TABLE worker_tasks
ADD COLUMN IF NOT EXISTS source VARCHAR(100);
-- Add source_id column (references schedule_id if from a schedule)
ALTER TABLE worker_tasks
ADD COLUMN IF NOT EXISTS source_schedule_id INTEGER REFERENCES task_schedules(id);
-- Add request metadata (IP, user agent) for debugging
ALTER TABLE worker_tasks
ADD COLUMN IF NOT EXISTS source_metadata JSONB;
-- Create index for querying by source
CREATE INDEX IF NOT EXISTS idx_worker_tasks_source ON worker_tasks(source);
-- Comment explaining source values
COMMENT ON COLUMN worker_tasks.source IS 'Task creation source: schedule, api_run_now, api_crawl_state, api_batch_staggered, api_batch_az_stores, task_chain, manual';
COMMENT ON COLUMN worker_tasks.source_schedule_id IS 'ID of the schedule that created this task (if source=schedule or source=api_run_now)';
COMMENT ON COLUMN worker_tasks.source_metadata IS 'Request metadata: {ip, user_agent, endpoint, timestamp}';


@@ -0,0 +1,25 @@
-- Migration 105: Add indexes for dashboard performance
-- Purpose: Speed up the /dashboard and /national/summary endpoints
--
-- These queries were identified as slow:
-- 1. COUNT(*) FROM store_product_snapshots WHERE captured_at >= NOW() - INTERVAL '24 hours'
-- 2. National summary aggregate queries
-- Index for snapshot counts by time (used in dashboard)
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_store_product_snapshots_captured_at
ON store_product_snapshots(captured_at DESC);
-- Index for crawl traces by time and success (used in dashboard)
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_crawl_traces_started_success
ON crawl_orchestration_traces(started_at DESC, success);
-- Partial index for recent failed crawls (faster for dashboard alerts)
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_crawl_traces_recent_failures
ON crawl_orchestration_traces(started_at DESC)
WHERE success = false;
-- Composite index for store_products aggregations by dispensary
-- Helps with national summary state metrics query
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_store_products_dispensary_brand
ON store_products(dispensary_id, brand_name_raw)
WHERE brand_name_raw IS NOT NULL;


@@ -151,6 +151,19 @@ function generateSlug(name: string, city: string, state: string): string {
return base;
}
/**
* Derive menu_type from platform_menu_url pattern
*/
function deriveMenuType(url: string | null): string {
if (!url) return 'unknown';
if (url.includes('/dispensary/')) return 'standalone';
if (url.includes('/embedded-menu/')) return 'embedded';
if (url.includes('/stores/')) return 'standalone';
// Custom domain = embedded widget on store's site
if (!url.includes('dutchie.com')) return 'embedded';
return 'unknown';
}
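The URL patterns above can be exercised directly. This reproduces `deriveMenuType` from the diff so it runs standalone, with sample inputs (the example URLs are made up for illustration):

```typescript
function deriveMenuType(url: string | null): string {
  if (!url) return 'unknown';
  if (url.includes('/dispensary/')) return 'standalone';
  if (url.includes('/embedded-menu/')) return 'embedded';
  if (url.includes('/stores/')) return 'standalone';
  // Custom domain = embedded widget on store's site
  if (!url.includes('dutchie.com')) return 'embedded';
  return 'unknown';
}

// Sample classifications:
deriveMenuType('https://dutchie.com/dispensary/green-leaf');    // 'standalone'
deriveMenuType('https://dutchie.com/embedded-menu/green-leaf'); // 'embedded'
deriveMenuType('https://www.greenleafaz.com/menu');             // 'embedded' (custom domain)
deriveMenuType(null);                                           // 'unknown'
```

Note the ordering matters: the path-based checks run before the custom-domain fallback, so a dutchie.com URL with an unrecognized path still yields `'unknown'`.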
/**
 * Log a promotion action to dutchie_promotion_log
 */
@@ -399,7 +412,7 @@ async function promoteLocation(
loc.timezone, // $15 timezone
loc.platform_location_id, // $16 platform_dispensary_id
loc.platform_menu_url, // $17 menu_url
deriveMenuType(loc.platform_menu_url), // $18 menu_type
loc.description, // $19 description
loc.logo_image, // $20 logo_image
loc.banner_image, // $21 banner_image


@@ -5,6 +5,29 @@ import { pool } from '../db/pool';
const router = Router();
router.use(authMiddleware);
// In-memory cache for expensive queries
interface CacheEntry {
data: any;
expiresAt: number;
}
const cache: Map<string, CacheEntry> = new Map();
function getCached<T>(key: string): T | null {
const entry = cache.get(key);
if (entry && entry.expiresAt > Date.now()) {
return entry.data as T;
}
cache.delete(key);
return null;
}
function setCache(key: string, data: any, ttlSeconds: number): void {
cache.set(key, {
data,
expiresAt: Date.now() + ttlSeconds * 1000,
});
}
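The helpers above behave as a plain TTL map. A minimal sketch of their behavior — the helpers are reproduced from the diff so the example runs standalone, and the 300-second TTL mirrors the summary endpoint:

```typescript
interface CacheEntry {
  data: any;
  expiresAt: number;
}

const cache: Map<string, CacheEntry> = new Map();

function getCached<T>(key: string): T | null {
  const entry = cache.get(key);
  if (entry && entry.expiresAt > Date.now()) {
    return entry.data as T;
  }
  // Expired or missing: evict lazily on read.
  cache.delete(key);
  return null;
}

function setCache(key: string, data: any, ttlSeconds: number): void {
  cache.set(key, { data, expiresAt: Date.now() + ttlSeconds * 1000 });
}

// A fresh entry is served until its TTL elapses...
setCache('national_summary', { totalStates: 12 }, 300);
// ...and an already-expired entry (negative TTL for illustration) misses.
setCache('stale', { totalStates: 0 }, -1);
```

Because expiry is checked lazily on read, the map never needs a background sweeper — a reasonable trade-off for a handful of dashboard keys, though entries that are never read again do linger in memory.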
// Get analytics overview
router.get('/overview', async (req, res) => {
try {
@@ -96,10 +119,17 @@ router.get('/products/:id', async (req, res) => {
/**
 * GET /api/analytics/national/summary
 * National dashboard summary with state-by-state metrics
 * OPTIMIZED: Cached for 5 minutes, uses approximate counts
 */
router.get('/national/summary', async (req, res) => {
try {
// Check cache first (5 minute TTL)
const CACHE_KEY = 'national_summary';
const cached = getCached<any>(CACHE_KEY);
if (cached) {
return res.json(cached);
}
// Single optimized query for all state metrics
const { rows: stateMetrics } = await pool.query(`
SELECT
@@ -144,7 +174,7 @@ router.get('/national/summary', async (req, res) => {
) b
`);
const response = {
success: true,
data: {
totalStates: stateMetrics.length,
@@ -165,7 +195,12 @@ router.get('/national/summary', async (req, res) => {
onSpecialProducts: parseInt(s.on_special_products || '0'),
})),
},
};
// Cache for 5 minutes
setCache(CACHE_KEY, response, 300);
res.json(response);
} catch (error: any) {
console.error('[Analytics] Error fetching national summary:', error.message);
res.status(500).json({ success: false, error: error.message });


@@ -11,13 +11,21 @@ import { pool } from '../db/pool';
const router = Router();
router.use(authMiddleware);
// In-memory cache for dashboard (1 minute TTL)
let dashboardCache: { data: any; expiresAt: number } | null = null;
/**
 * GET /api/markets/dashboard
 * Dashboard summary with counts for dispensaries, products, brands, etc.
 * Optimized: Cached for 1 minute, uses approximate counts for large tables
 */
router.get('/dashboard', async (req: Request, res: Response) => {
try {
// Check cache first (1 minute TTL)
if (dashboardCache && dashboardCache.expiresAt > Date.now()) {
return res.json(dashboardCache.data);
}
// Single optimized query for all counts
const { rows } = await pool.query(`
SELECT
@@ -31,7 +39,7 @@ router.get('/dashboard', async (req: Request, res: Response) => {
`);
const r = rows[0];
const data = {
dispensaryCount: parseInt(r?.dispensary_count || '0', 10),
productCount: parseInt(r?.product_count || '0', 10),
brandCount: parseInt(r?.brand_count || '0', 10),
@@ -39,7 +47,12 @@ router.get('/dashboard', async (req: Request, res: Response) => {
snapshotCount24h: parseInt(r?.snapshot_count_24h || '0', 10),
lastCrawlTime: r?.last_crawl || null,
failedJobCount: parseInt(r?.failed_count || '0', 10),
};
// Cache for 1 minute
dashboardCache = { data, expiresAt: Date.now() + 60 * 1000 };
res.json(data);
} catch (error: any) {
console.error('[Markets] Error fetching dashboard:', error.message);
res.status(500).json({ error: error.message });


@@ -16,9 +16,11 @@
 * PUT /api/tasks/schedules/:id - Update schedule
 * DELETE /api/tasks/schedules/:id - Delete schedule
 * DELETE /api/tasks/schedules - Bulk delete schedules
 * POST /api/tasks/schedules/:id/toggle - Toggle schedule enabled/disabled
 *
* Note: "Run Now" was removed - use task priority instead.
* Higher priority tasks get picked up first (ORDER BY priority DESC).
*
 * Note: Schedule routes are defined BEFORE /:id to avoid route conflicts
 * (Express matches routes in order, and "schedules" would match /:id otherwise)
 */
@@ -29,7 +31,21 @@ import {
TaskRole,
TaskStatus,
TaskFilter,
TaskSource,
} from '../tasks/task-service';
/**
* Extract request metadata for source tracking
*/
function getRequestMetadata(req: Request): Record<string, unknown> {
return {
ip: req.ip || req.socket?.remoteAddress || 'unknown',
userAgent: req.get('user-agent') || 'unknown',
endpoint: req.originalUrl,
method: req.method,
timestamp: new Date().toISOString(),
};
}
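The metadata helper above depends on an Express `Request`. The sketch below adapts it to a minimal structural type so it runs standalone — `RequestLike` and the sample values are assumptions for illustration, not part of the diff:

```typescript
// Minimal stand-in for the parts of Express's Request that the helper reads.
interface RequestLike {
  ip?: string;
  socket?: { remoteAddress?: string };
  originalUrl: string;
  method: string;
  get(header: string): string | undefined;
}

function getRequestMetadata(req: RequestLike): Record<string, unknown> {
  return {
    ip: req.ip || req.socket?.remoteAddress || 'unknown',
    userAgent: req.get('user-agent') || 'unknown',
    endpoint: req.originalUrl,
    method: req.method,
    timestamp: new Date().toISOString(),
  };
}

// Example: a request with no `ip` falls back to the socket address.
const meta = getRequestMetadata({
  socket: { remoteAddress: '10.0.0.5' },
  originalUrl: '/api/tasks/crawl-state/AZ',
  method: 'POST',
  get: () => 'curl/8.4.0',
});
```

The resulting object is what lands in the new `source_metadata` JSONB column, making "phantom" tasks traceable to an IP and endpoint.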
import { pool } from '../db/pool';
import {
isTaskPoolPaused,
@@ -524,146 +540,6 @@ router.delete('/schedules/:id', async (req: Request, res: Response) => {
}
});
/**
* POST /api/tasks/schedules/:id/run-now
* Manually trigger a scheduled task to run immediately
*
* For product_discovery schedules with state_code, this creates individual
* tasks for each store in that state (fans out properly).
*/
router.post('/schedules/:id/run-now', async (req: Request, res: Response) => {
try {
const scheduleId = parseInt(req.params.id, 10);
// Get the full schedule
const scheduleResult = await pool.query(`
SELECT id, name, role, state_code, dispensary_id, platform, priority, interval_hours, method
FROM task_schedules WHERE id = $1
`, [scheduleId]);
if (scheduleResult.rows.length === 0) {
return res.status(404).json({ error: 'Schedule not found' });
}
const schedule = scheduleResult.rows[0];
let tasksCreated = 0;
const isCrawlRole = ['product_discovery', 'product_refresh', 'payload_fetch'].includes(schedule.role);
// Single-dispensary schedule (e.g., "Deeply Rooted Hourly")
if (isCrawlRole && schedule.dispensary_id) {
// Check if this specific store can be refreshed (no pending task)
const storeResult = await pool.query(`
SELECT d.id, d.name
FROM dispensaries d
WHERE d.id = $1
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch')
AND t.status IN ('pending', 'claimed', 'running')
)
`, [schedule.dispensary_id]);
if (storeResult.rows.length > 0) {
await taskService.createTask({
role: 'product_discovery',
dispensary_id: schedule.dispensary_id,
platform: schedule.platform || 'dutchie',
priority: schedule.priority + 10,
method: schedule.method || 'http',
});
tasksCreated = 1;
} else {
return res.json({
success: true,
message: `Store ${schedule.dispensary_id} has a pending task or is disabled`,
tasksCreated: 0,
dispensaryId: schedule.dispensary_id,
});
}
}
// Per-state schedule (e.g., "AZ Product Refresh")
else if (isCrawlRole && schedule.state_code) {
// Find stores in this state needing refresh
const storeResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND s.code = $1
-- No pending/running crawl task already
AND NOT EXISTS (
SELECT 1 FROM worker_tasks t
WHERE t.dispensary_id = d.id
AND t.role IN ('product_discovery', 'product_refresh', 'payload_fetch')
AND t.status IN ('pending', 'claimed', 'running')
)
ORDER BY d.last_fetch_at NULLS FIRST, d.id
`, [schedule.state_code]);
const dispensaryIds = storeResult.rows.map((r: { id: number }) => r.id);
if (dispensaryIds.length > 0) {
// Create staggered tasks for all stores (always use product_discovery role)
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery', // Normalize to product_discovery
15, // 15 seconds stagger
schedule.platform || 'dutchie',
schedule.method || 'http'
);
tasksCreated = result.created;
} else {
// No stores need refresh - return early with message
return res.json({
success: true,
message: `No ${schedule.state_code} stores need refresh at this time`,
tasksCreated: 0,
stateCode: schedule.state_code,
});
}
} else if (!isCrawlRole) {
// For other schedules (store_discovery, analytics_refresh), create a single task
await taskService.createTask({
role: schedule.role,
platform: schedule.platform,
priority: schedule.priority + 10,
method: schedule.method,
});
tasksCreated = 1;
} else {
// Crawl role without dispensary_id or state_code - reject
return res.status(400).json({
error: `${schedule.role} schedules require a dispensary_id or state_code`,
});
}
// Update last_run_at on the schedule
await pool.query(`
UPDATE task_schedules
SET last_run_at = NOW(),
next_run_at = NOW() + (interval_hours || ' hours')::interval,
last_task_count = $2,
updated_at = NOW()
WHERE id = $1
`, [scheduleId, tasksCreated]);
res.json({
success: true,
message: `Schedule "${schedule.name}" triggered`,
tasksCreated,
stateCode: schedule.state_code,
});
} catch (error: unknown) {
console.error('Error running schedule:', error);
res.status(500).json({ error: 'Failed to run schedule' });
}
});
/**
 * POST /api/tasks/schedules/:id/toggle
 * Toggle a schedule's enabled status
@@ -1275,10 +1151,15 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
 */
router.post('/batch/staggered', async (req: Request, res: Response) => {
try {
const requestMetadata = getRequestMetadata(req);
// Log the request for tracking phantom tasks
console.log(`[TaskAPI] POST /batch/staggered from ${requestMetadata.ip} (${requestMetadata.userAgent})`);
const {
dispensary_ids,
role,
stagger_seconds = 0, // Default to 0 (no stagger) - worker controls pacing
platform = 'dutchie',
method = null,
} = req.body;
@@ -1291,12 +1172,18 @@ router.post('/batch/staggered', async (req: Request, res: Response) => {
return res.status(400).json({ error: 'role is required' });
}
console.log(`[TaskAPI] Creating ${dispensary_ids.length} ${role} tasks for dispensaries: ${dispensary_ids.slice(0, 5).join(',')}...`);
const result = await taskService.createStaggeredTasks(
dispensary_ids,
role as TaskRole,
stagger_seconds,
platform,
method,
{
source: 'api_batch_staggered',
source_metadata: requestMetadata,
}
);
const totalDuration = (result.created - 1) * stagger_seconds;
@@ -1320,49 +1207,6 @@ router.post('/batch/staggered', async (req: Request, res: Response) => {
}
});
/**
* POST /api/tasks/batch/az-stores
* Convenience endpoint to create staggered tasks for Arizona stores
*
* Body:
* - total_tasks: number (default: 24) - Total tasks to create
* - stagger_seconds: number (default: 15) - Seconds between each task
* - split_roles: boolean (default: true) - Split between product_refresh and product_discovery
*/
router.post('/batch/az-stores', async (req: Request, res: Response) => {
try {
const {
total_tasks = 24,
stagger_seconds = 15,
split_roles = true,
} = req.body;
const result = await taskService.createAZStoreTasks(
total_tasks,
stagger_seconds,
split_roles
);
const totalDuration = (result.total - 1) * stagger_seconds;
const estimatedEndTime = new Date(Date.now() + totalDuration * 1000);
res.status(201).json({
success: true,
total: result.total,
product_refresh: result.product_refresh,
product_discovery: result.product_discovery,
task_ids: result.taskIds,
stagger_seconds,
total_duration_seconds: totalDuration,
estimated_completion: estimatedEndTime.toISOString(),
message: `Created ${result.total} staggered tasks for AZ stores (${result.product_refresh} refresh, ${result.product_discovery} discovery)`,
});
} catch (error: unknown) {
console.error('Error creating AZ store tasks:', error);
res.status(500).json({ error: 'Failed to create AZ store tasks' });
}
});
/**
 * POST /api/tasks/batch/entry-point-discovery
 * Create entry_point_discovery tasks for stores missing platform_dispensary_id
@@ -1556,13 +1400,13 @@ router.post('/batch/store-discovery', async (req: Request, res: Response) => {
 * Create product_discovery tasks for all stores in a state
 *
 * This is the primary endpoint for triggering crawls by state.
 * Creates tasks for all crawl-enabled stores in the specified state.
 *
 * Params:
 * - stateCode: State code (e.g., 'AZ', 'CA', 'CO')
 *
 * Body (optional):
 * - stagger_seconds: number (default: 0) - Seconds between each task (0 = worker controls pacing)
 * - priority: number (default: 10) - Task priority
 * - method: 'curl' | 'http' | null (default: 'http')
 *
@@ -1574,8 +1418,13 @@ router.post('/batch/store-discovery', async (req: Request, res: Response) => {
router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
try {
const stateCode = req.params.stateCode.toUpperCase();
const requestMetadata = getRequestMetadata(req);
// Log the request for tracking phantom tasks
console.log(`[TaskAPI] POST /crawl-state/${stateCode} from ${requestMetadata.ip} (${requestMetadata.userAgent})`);
const {
stagger_seconds = 0, // Default to 0 (no stagger) - worker controls pacing
priority = 10,
method = 'http',
} = req.body;
@@ -1617,13 +1466,19 @@ router.post('/crawl-state/:stateCode', async (req: Request, res: Response) => {
const dispensaryIds = dispensariesResult.rows.map((d: { id: number }) => d.id);
console.log(`[TaskAPI] Creating ${dispensaryIds.length} product_discovery tasks for ${stateCode}`);
// Create tasks with source tracking
const result = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
stagger_seconds,
'dutchie',
method,
{
source: 'api_crawl_state',
source_metadata: { ...requestMetadata, stateCode },
}
);
const totalDuration = (result.created - 1) * stagger_seconds;


@@ -316,13 +316,17 @@ class TaskScheduler {
}
// Create product_discovery tasks with HTTP transport
// No stagger - worker controls pacing
const { created } = await taskService.createStaggeredTasks(
dispensaryIds,
'product_discovery',
0, // No stagger - worker controls pacing
schedule.platform || 'dutchie',
'http', // Force HTTP transport
{
source: 'schedule',
source_schedule_id: schedule.id,
}
);
return created;
@@ -350,6 +354,8 @@ class TaskScheduler {
platform: schedule.platform || 'dutchie',
priority: schedule.priority,
method: 'http', // Force HTTP transport for browser-based discovery
source: 'schedule',
source_schedule_id: schedule.id,
});
return 1;
@@ -375,6 +381,8 @@ class TaskScheduler {
await taskService.createTask({
role: 'analytics_refresh',
priority: schedule.priority,
source: 'schedule',
source_schedule_id: schedule.id,
});
return 1;


@@ -77,8 +77,21 @@ export interface CreateTaskParams {
method?: 'curl' | 'http'; // Transport method: curl=axios/proxy, http=Puppeteer/browser
scheduled_for?: Date;
payload?: Record<string, unknown>; // Per TASK_WORKFLOW_2024-12-10.md: For task chaining data
// Source tracking - helps identify where tasks come from
source?: TaskSource;
source_schedule_id?: number;
source_metadata?: Record<string, unknown>;
}
// Task creation sources - all automated tasks should be traceable
// Note: "Run Now" was removed - use task priority instead
export type TaskSource =
| 'schedule' // Created by task-scheduler.ts from task_schedules
| 'api_crawl_state' // POST /api/tasks/crawl-state/:stateCode
| 'api_batch_staggered' // POST /api/tasks/batch/staggered
| 'task_chain' // Created by task chaining (e.g., store_discovery -> product_discovery)
| 'manual'; // Created via admin UI or direct API call
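Because `source` arrives as an untyped string on API requests, a runtime guard is useful before it reaches the INSERT. This sketch reproduces the union from the diff; the `isTaskSource` guard itself is illustrative and not part of the commit:

```typescript
type TaskSource =
  | 'schedule'
  | 'api_crawl_state'
  | 'api_batch_staggered'
  | 'task_chain'
  | 'manual';

const TASK_SOURCES: readonly TaskSource[] = [
  'schedule',
  'api_crawl_state',
  'api_batch_staggered',
  'task_chain',
  'manual',
];

// Narrow an untrusted value (e.g., from a request body) to TaskSource.
function isTaskSource(value: unknown): value is TaskSource {
  return typeof value === 'string' && (TASK_SOURCES as readonly string[]).includes(value);
}
```

Note that `'api_run_now'` would fail this check — consistent with the commit removing the Run Now endpoint, even though the migration 104 column comment still lists it among historical values.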
export interface CapacityMetrics {
role: string;
pending_tasks: number;
@@ -108,8 +121,8 @@ class TaskService {
 */
async createTask(params: CreateTaskParams): Promise<WorkerTask> {
const result = await pool.query(
`INSERT INTO worker_tasks (role, dispensary_id, platform, priority, method, scheduled_for, payload, source, source_schedule_id, source_metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
RETURNING *`,
[
params.role,
@@ -119,6 +132,9 @@ class TaskService {
params.method ?? null, // null = any worker can pick up, 'http' = http-capable workers only, 'curl' = curl workers only
params.scheduled_for ?? null,
params.payload ? JSON.stringify(params.payload) : null,
params.source ?? null,
params.source_schedule_id ?? null,
params.source_metadata ? JSON.stringify(params.source_metadata) : null,
] ]
); );
return result.rows[0] as WorkerTask; return result.rows[0] as WorkerTask;
@@ -712,16 +728,15 @@ class TaskService {
}
/**
 * Create multiple tasks for bulk processing.
 *
 * BULK TASK WORKFLOW:
 * ===================
 * Creates tasks with scheduled_for = NOW() by default. Worker-level controls
 * handle pacing (MAX_CONCURRENT_TASKS, working hours, preflight checks).
 *
 * Workflow:
 * 1. Task is created with scheduled_for = NOW() (or staggered if specified)
 * 2. Worker claims task only when scheduled_for <= NOW()
 * 3. Worker runs preflight check on EVERY task claim
 * 4. If preflight passes, worker executes task
@@ -729,15 +744,14 @@ class TaskService {
 * 6. Worker finishes task, polls for next available task
 * 7. Repeat - preflight runs again on next task claim
 *
 * Worker-Level Throttling:
 * - MAX_CONCURRENT_TASKS env var limits concurrent tasks per worker
 * - Working hours configuration restricts when tasks run
 * - Preflight checks ensure proxy health before each task
 *
 * @param dispensaryIds - Array of dispensary IDs to create tasks for
 * @param role - Task role (e.g., 'product_refresh', 'product_discovery')
 * @param staggerSeconds - Seconds between each task's scheduled_for time (default: 0 = all NOW())
 * @param platform - Platform identifier (default: 'dutchie')
 * @param method - Transport method: 'curl' or 'http' (default: null for any)
 * @returns Number of tasks created
@@ -745,16 +759,26 @@ class TaskService {
async createStaggeredTasks( async createStaggeredTasks(
dispensaryIds: number[], dispensaryIds: number[],
role: TaskRole, role: TaskRole,
staggerSeconds: number = 15, staggerSeconds: number = 0,
platform: string = 'dutchie', platform: string = 'dutchie',
method: 'curl' | 'http' | null = null, method: 'curl' | 'http' | null = null,
options: { skipRecentHours?: number } = {} options: {
skipRecentHours?: number;
source?: TaskSource;
source_schedule_id?: number;
source_metadata?: Record<string, unknown>;
} = {}
): Promise<{ created: number; skipped: number; taskIds: number[] }> { ): Promise<{ created: number; skipped: number; taskIds: number[] }> {
if (dispensaryIds.length === 0) { if (dispensaryIds.length === 0) {
return { created: 0, skipped: 0, taskIds: [] }; return { created: 0, skipped: 0, taskIds: [] };
} }
const { skipRecentHours = 4 } = options; // Skip if completed within last 4 hours const {
skipRecentHours = 4,
source = null,
source_schedule_id = null,
source_metadata = null,
} = options;
// Filter out dispensaries that: // Filter out dispensaries that:
// 1. Already have a pending/claimed/running task for this role // 1. Already have a pending/claimed/running task for this role
@@ -786,17 +810,20 @@ class TaskService {
SELECT dispensary_id, ROW_NUMBER() OVER (ORDER BY dispensary_id) - 1 as idx SELECT dispensary_id, ROW_NUMBER() OVER (ORDER BY dispensary_id) - 1 as idx
FROM eligible_ids FROM eligible_ids
) )
INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status) INSERT INTO worker_tasks (role, dispensary_id, platform, method, scheduled_for, status, source, source_schedule_id, source_metadata)
SELECT SELECT
$2::varchar as role, $2::varchar as role,
n.dispensary_id, n.dispensary_id,
$3::varchar as platform, $3::varchar as platform,
$4::varchar as method, $4::varchar as method,
NOW() + (n.idx * $5::int * INTERVAL '1 second') as scheduled_for, NOW() + (n.idx * $5::int * INTERVAL '1 second') as scheduled_for,
'pending' as status 'pending' as status,
$7::varchar as source,
$8::int as source_schedule_id,
$9::jsonb as source_metadata
FROM numbered n FROM numbered n
RETURNING id RETURNING id
`, [dispensaryIds, role, platform, method, staggerSeconds, skipRecentHours]); `, [dispensaryIds, role, platform, method, staggerSeconds, skipRecentHours, source, source_schedule_id, source_metadata ? JSON.stringify(source_metadata) : null]);
const taskIds = result.rows.map((r: { id: number }) => r.id); const taskIds = result.rows.map((r: { id: number }) => r.id);
const skipped = dispensaryIds.length - taskIds.length; const skipped = dispensaryIds.length - taskIds.length;
@@ -810,112 +837,6 @@ class TaskService {
return { created: taskIds.length, skipped, taskIds }; return { created: taskIds.length, skipped, taskIds };
} }
/**
* Create a batch of AZ store tasks with automatic distribution.
*
* This is a convenience method for creating tasks for Arizona stores with:
* - Automatic staggering to prevent resource contention
* - Even distribution across both refresh and discovery roles
*
* @param totalTasks - Total number of tasks to create
* @param staggerSeconds - Seconds between each task's start time
* @param splitRoles - If true, split between product_refresh and product_discovery
* @returns Summary of created tasks
*/
async createAZStoreTasks(
totalTasks: number = 24,
staggerSeconds: number = 15,
splitRoles: boolean = true
): Promise<{
total: number;
product_refresh: number;
product_discovery: number;
taskIds: number[];
}> {
// Get AZ stores with platform_id and menu_url
const storesResult = await pool.query(`
SELECT d.id
FROM dispensaries d
JOIN states s ON d.state_id = s.id
WHERE s.code = 'AZ'
AND d.crawl_enabled = true
AND d.platform_dispensary_id IS NOT NULL
AND d.menu_url IS NOT NULL
ORDER BY d.id
`);
const storeIds = storesResult.rows.map((r: { id: number }) => r.id);
if (storeIds.length === 0) {
console.log('[TaskService] No AZ stores found with platform_id and menu_url');
return { total: 0, product_refresh: 0, product_discovery: 0, taskIds: [] };
}
// Limit tasks to available stores
const maxTasks = Math.min(totalTasks, storeIds.length * 2); // 2x for both roles
const allTaskIds: number[] = [];
if (splitRoles) {
// Split between refresh and discovery
const tasksPerRole = Math.floor(maxTasks / 2);
const refreshStores = storeIds.slice(0, tasksPerRole);
const discoveryStores = storeIds.slice(0, tasksPerRole);
// Create refresh tasks first
const refreshResult = await this.createStaggeredTasks(
refreshStores,
'product_refresh',
staggerSeconds,
'dutchie'
);
allTaskIds.push(...refreshResult.taskIds);
// Create discovery tasks starting after refresh tasks are scheduled
const discoveryStartOffset = tasksPerRole * staggerSeconds;
const discoveryResult = await pool.query(`
WITH task_data AS (
SELECT
unnest($1::int[]) as dispensary_id,
generate_series(0, array_length($1::int[], 1) - 1) as idx
)
INSERT INTO worker_tasks (role, dispensary_id, platform, scheduled_for, status)
SELECT
'product_discovery'::varchar as role,
td.dispensary_id,
'dutchie'::varchar as platform,
NOW() + ($2::int * INTERVAL '1 second') + (td.idx * $3::int * INTERVAL '1 second') as scheduled_for,
'pending' as status
FROM task_data td
ON CONFLICT DO NOTHING
RETURNING id
`, [discoveryStores, discoveryStartOffset, staggerSeconds]);
allTaskIds.push(...discoveryResult.rows.map((r: { id: number }) => r.id));
return {
total: allTaskIds.length,
product_refresh: refreshResult.taskIds.length,
product_discovery: discoveryResult.rowCount ?? 0,
taskIds: allTaskIds
};
}
// Single role mode - all product_discovery
const result = await this.createStaggeredTasks(
storeIds.slice(0, totalTasks),
'product_discovery',
staggerSeconds,
'dutchie'
);
return {
total: result.taskIds.length,
product_refresh: 0,
product_discovery: result.taskIds.length,
taskIds: result.taskIds
};
}
/** /**
* Cleanup stale tasks that are stuck in 'claimed' or 'running' status. * Cleanup stale tasks that are stuck in 'claimed' or 'running' status.
* *

View File
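The new optional `source`, `source_schedule_id`, and `source_metadata` options above are serialized before being bound as the `$7`/`$8`/`$9` SQL parameters, with the JSONB column receiving either a stringified object or SQL NULL. A minimal sketch of that serialization, using a hypothetical `buildSourceParams` helper that is not part of the codebase:

```typescript
// Hypothetical helper mirroring how createStaggeredTasks() turns the optional
// source-tracking options into the $7, $8, $9 query parameters.
type SourceOptions = {
  source?: string | null;
  source_schedule_id?: number | null;
  source_metadata?: Record<string, unknown> | null;
};

function buildSourceParams(
  options: SourceOptions = {}
): [string | null, number | null, string | null] {
  // All three default to null, matching the destructuring defaults in the diff.
  const { source = null, source_schedule_id = null, source_metadata = null } = options;
  return [
    source,                                                   // $7::varchar
    source_schedule_id,                                       // $8::int
    source_metadata ? JSON.stringify(source_metadata) : null, // $9::jsonb
  ];
}
```

Calling it with no options yields three nulls, so ad-hoc tasks created outside a schedule simply carry no source attribution.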

@@ -2683,8 +2683,6 @@ class ApiClient {
   updateSchedule = this.updateDutchieAZSchedule.bind(this);
   /** @deprecated Use deleteTaskSchedule() - queries task_schedules table */
   deleteSchedule = this.deleteDutchieAZSchedule.bind(this);
-  /** @deprecated Use runTaskScheduleNow() - queries task_schedules table */
-  triggerSchedule = this.triggerDutchieAZSchedule.bind(this);
   /** @deprecated - job_schedules init not needed for task_schedules */
   initSchedules = this.initDutchieAZSchedules.bind(this);
   getScheduleLogs = this.getCrawlScheduleLogs.bind(this);
@@ -3061,11 +3059,7 @@ class ApiClient {
     );
   }

-  async runTaskScheduleNow(id: number) {
-    return this.request<{ success: boolean; message: string; tasksCreated?: number; stateCode?: string }>(`/api/tasks/schedules/${id}/run-now`, {
-      method: 'POST',
-    });
-  }
+  // Note: runTaskScheduleNow() was removed - use task priority instead

   async toggleTaskSchedule(id: number) {
     return this.request<{ success: boolean; schedule: { id: number; name: string; enabled: boolean }; message: string }>(

View File
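The alias lines that survive this hunk keep deprecated method names working by binding them to the new implementations. A standalone illustration of that class-field aliasing pattern (the class and method names here are invented, not from the codebase):

```typescript
// A deprecated alias kept as a bound class field: old call sites keep
// working, even when the alias is detached from the instance, because
// bind() fixes `this` at construction time.
class Client {
  updateTaskSchedule(id: number): string {
    return `updated ${id}`;
  }

  /** @deprecated Use updateTaskSchedule() */
  updateSchedule = this.updateTaskSchedule.bind(this);
}
```

Because class fields are initialized in the constructor, `this.updateTaskSchedule` resolves through the prototype before the alias is assigned, so the ordering above is safe.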

@@ -130,15 +130,6 @@ export function CrawlSchedulePage() {
     }
   };

-  const handleTriggerSchedule = async (id: number) => {
-    try {
-      await api.triggerSchedule(id);
-      await loadData();
-    } catch (error) {
-      console.error('Failed to trigger schedule:', error);
-    }
-  };
-
   const handleToggleEnabled = async (schedule: JobSchedule) => {
     try {
       await api.updateSchedule(schedule.id, { enabled: !schedule.enabled });
@@ -538,21 +529,6 @@ export function CrawlSchedulePage() {
                 </td>
                 <td style={{ padding: '15px', textAlign: 'center' }}>
                   <div style={{ display: 'flex', gap: '8px', justifyContent: 'center' }}>
-                    <button
-                      onClick={() => handleTriggerSchedule(schedule.id)}
-                      disabled={schedule.lastStatus === 'running'}
-                      style={{
-                        padding: '6px 12px',
-                        background: schedule.lastStatus === 'running' ? '#94a3b8' : '#2563eb',
-                        color: 'white',
-                        border: 'none',
-                        borderRadius: '4px',
-                        cursor: schedule.lastStatus === 'running' ? 'not-allowed' : 'pointer',
-                        fontSize: '13px'
-                      }}
-                    >
-                      Run Now
-                    </button>
                     <button
                       onClick={() => setEditingSchedule(schedule)}
                       style={{

View File

@@ -48,9 +48,12 @@ export function Dashboard() {
   const [showDroppedAlert, setShowDroppedAlert] = useState(false);

   useEffect(() => {
-    loadData();
-    checkNotificationStatus();
-    checkDroppedStores();
+    // Run all initial data fetches in parallel
+    Promise.all([
+      loadData(),
+      checkNotificationStatus(),
+      checkDroppedStores(),
+    ]);
   }, []);

   const checkDroppedStores = async () => {
@@ -93,8 +96,13 @@
   const loadData = async () => {
     try {
-      // Fetch dashboard data (primary data source)
-      const dashboard = await api.getMarketDashboard();
+      // Fetch all dashboard data IN PARALLEL for faster loading
+      const [dashboard, activityData, nationalResponse, counts] = await Promise.all([
+        api.getMarketDashboard(),
+        api.getDashboardActivity().catch(() => null), // Activity may require auth
+        api.get('/api/analytics/national/summary').catch(() => null),
+        api.getTaskCounts().catch(() => null),
+      ]);

       // Map dashboard data to the expected stats format
       setStats({
@@ -121,34 +129,13 @@
         lastCrawlTime: dashboard.lastCrawlTime
       });

-      // Try to fetch activity data (may fail if not authenticated)
-      try {
-        const activityData = await api.getDashboardActivity();
-        setActivity(activityData);
-      } catch {
-        // Activity data requires auth, just skip it
-        setActivity(null);
-      }
+      setActivity(activityData);

-      // Fetch national analytics summary
-      try {
-        const response = await api.get('/api/analytics/national/summary');
-        if (response.data?.success && response.data?.data) {
-          setNationalStats(response.data.data);
-        }
-      } catch {
-        // National stats not critical, just skip
-        setNationalStats(null);
-      }
+      if (nationalResponse?.data?.success && nationalResponse?.data?.data) {
+        setNationalStats(nationalResponse.data.data);
+      }

-      // Fetch task queue counts
-      try {
-        const counts = await api.getTaskCounts();
-        setTaskCounts(counts);
-      } catch {
-        // Task counts not critical, just skip
-        setTaskCounts(null);
-      }
+      setTaskCounts(counts);
     } catch (error) {
       console.error('Failed to load dashboard:', error);
     } finally {
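The dashboard change relies on `Promise.all` where each non-critical fetch carries its own `.catch(() => null)`, so one failed optional call yields `null` instead of rejecting the whole batch. A self-contained sketch of that pattern (the function name and call shape are illustrative, not from the codebase):

```typescript
// Load one required promise plus optional ones in parallel. Optional
// promises are individually caught, so Promise.all only rejects if the
// primary promise fails - mirroring the dashboard's loadData() approach.
async function loadTolerant<T>(
  primary: Promise<T>,
  optional: Array<Promise<unknown>>
): Promise<[T, ...unknown[]]> {
  const [first, ...rest] = await Promise.all([
    primary, // intentionally uncaught: a primary failure should surface
    ...optional.map((p) => p.catch(() => null)),
  ]);
  return [first as T, ...rest];
}
```

The key trade-off is that the caller can no longer distinguish "endpoint returned null" from "endpoint failed"; that is acceptable here because the optional fetches only populate nice-to-have widgets.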

View File

@@ -180,15 +180,7 @@ function CreateTaskModal({ isOpen, onClose, onTaskCreated }: CreateTaskModalProp
         platform: 'dutchie',
       });

-      // Run the schedule immediately so tasks appear right away
-      if (result?.id) {
-        try {
-          await api.runTaskScheduleNow(result.id);
-        } catch (runErr) {
-          console.warn('Schedule created but failed to run immediately:', runErr);
-        }
-      }
+      // Schedule will be picked up by scheduler on its next iteration
       onTaskCreated();
       onClose();
       resetForm();
@@ -880,7 +872,6 @@ export default function TasksDashboard() {
   const [selectedSchedules, setSelectedSchedules] = useState<Set<number>>(new Set());
   const [editingSchedule, setEditingSchedule] = useState<TaskSchedule | null>(null);
   const [showScheduleModal, setShowScheduleModal] = useState(false);
-  const [runningScheduleId, setRunningScheduleId] = useState<number | null>(null);

   // Pagination
   const [page, setPage] = useState(0);
@@ -976,21 +967,6 @@ export default function TasksDashboard() {
     }
   };

-  const handleRunScheduleNow = async (scheduleId: number) => {
-    if (runningScheduleId !== null) return; // Prevent duplicate clicks
-    setRunningScheduleId(scheduleId);
-    try {
-      const result = await api.runTaskScheduleNow(scheduleId) as { success: boolean; message: string; tasksCreated?: number };
-      alert(result.message + (result.tasksCreated ? ` (${result.tasksCreated} tasks created)` : ''));
-      fetchData();
-    } catch (err: any) {
-      console.error('Run schedule error:', err);
-      alert(err.response?.data?.error || 'Failed to run schedule');
-    } finally {
-      setRunningScheduleId(null);
-    }
-  };
-
   const toggleSelectSchedule = (id: number) => {
     setSelectedSchedules(prev => {
       const next = new Set(prev);
@@ -1452,20 +1428,6 @@ export default function TasksDashboard() {
                       </td>
                       <td className="px-4 py-3">
                         <div className="flex items-center gap-1">
-                          <button
-                            onClick={() => handleRunScheduleNow(schedule.id)}
-                            disabled={runningScheduleId !== null}
-                            className={`p-1.5 rounded transition-colors ${
-                              runningScheduleId === schedule.id
-                                ? 'text-emerald-600 bg-emerald-50 cursor-wait'
-                                : runningScheduleId !== null
-                                ? 'text-gray-300 cursor-not-allowed'
-                                : 'text-gray-400 hover:text-emerald-600 hover:bg-emerald-50'
-                            }`}
-                            title={runningScheduleId === schedule.id ? 'Running...' : 'Run now'}
-                          >
-                            <PlayCircle className={`w-4 h-4 ${runningScheduleId === schedule.id ? 'animate-pulse' : ''}`} />
-                          </button>
                           <button
                             onClick={() => handleToggleSchedule(schedule.id)}
                             className={`p-1.5 rounded transition-colors ${
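With the Run Now endpoint removed, the commit message points to task priority as the replacement mechanism. One way that claim ordering could work is sketched below; it assumes a numeric `priority` column on `worker_tasks` (higher wins, ties broken by earliest `scheduled_for`), which is an assumption not shown anywhere in this diff:

```typescript
// Hypothetical claim ordering for an "urgent" task in place of Run Now:
// equivalent to ORDER BY priority DESC, scheduled_for ASC in the claim query.
interface ClaimableTask {
  id: number;
  priority: number;      // assumed column - not present in this diff
  scheduled_for: Date;
}

function claimOrder(tasks: ClaimableTask[]): ClaimableTask[] {
  // Copy before sorting so the caller's array is untouched.
  return [...tasks].sort(
    (a, b) =>
      b.priority - a.priority ||
      a.scheduled_for.getTime() - b.scheduled_for.getTime()
  );
}
```

Under this scheme, bumping a task's priority makes it the next claim without bypassing the worker's preflight checks, which was the stated problem with the old `/run-now` endpoint.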