feat(tasks): Task tracking, IP-per-store, and schedule edit fixes

- Add task completion verification with DB and output layers
- Add reconciliation loop to sync worker memory with DB state
- Implement IP-per-store-per-platform conflict detection
- Add task ID hash to MinIO payload filenames for traceability
- Fix schedule edit modal with dispensary info in API responses
- Add task ID display after dispensary name in worker dashboard
- Add migrations for proxy_ip and source tracking columns

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-14 10:49:21 -07:00
parent 3e9667571f
commit 9518ca48a5
12 changed files with 546 additions and 29 deletions

View File

@@ -0,0 +1,17 @@
-- Migration: Add proxy_ip tracking to worker_tasks
-- Purpose: Prevent same IP from hitting multiple stores on same platform simultaneously
--
-- Anti-detection measure: Dutchie/Jane may flag if same IP makes requests
-- for multiple different stores. This column lets us track and prevent that.
-- Add proxy_ip column to track which proxy IP is being used for each task
ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS proxy_ip VARCHAR(45);
-- Index for quick lookup of active tasks by proxy IP
-- Used to check: "Is this IP already hitting another store?"
CREATE INDEX IF NOT EXISTS idx_worker_tasks_proxy_ip_active
ON worker_tasks (proxy_ip, platform)
WHERE status IN ('claimed', 'running') AND proxy_ip IS NOT NULL;
-- Comment
COMMENT ON COLUMN worker_tasks.proxy_ip IS 'Proxy IP assigned to this task. Used to prevent same IP hitting multiple stores on same platform.';

View File

@@ -0,0 +1,16 @@
-- Migration: Add source tracking columns to worker_tasks
-- Purpose: Track where tasks originated from (schedule, API, manual)
-- Add source tracking columns
ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS source VARCHAR(50);
ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS source_schedule_id INTEGER REFERENCES task_schedules(id);
ALTER TABLE worker_tasks ADD COLUMN IF NOT EXISTS source_metadata JSONB;
-- Index for tracking tasks by schedule
CREATE INDEX IF NOT EXISTS idx_worker_tasks_source_schedule
ON worker_tasks (source_schedule_id) WHERE source_schedule_id IS NOT NULL;
-- Comments
COMMENT ON COLUMN worker_tasks.source IS 'Origin of task: schedule, api, manual, chain';
COMMENT ON COLUMN worker_tasks.source_schedule_id IS 'ID of schedule that created this task';
COMMENT ON COLUMN worker_tasks.source_metadata IS 'Additional metadata about task origin';

View File

@@ -225,23 +225,27 @@ router.get('/schedules', async (req: Request, res: Response) => {
query = ` query = `
SELECT ts.id, ts.name, ts.role, ts.description, ts.enabled, ts.interval_hours, SELECT ts.id, ts.name, ts.role, ts.description, ts.enabled, ts.interval_hours,
ts.priority, ts.state_code, ts.pool_id, tp.display_name as pool_name, ts.priority, ts.state_code, ts.pool_id, tp.display_name as pool_name,
ts.dispensary_id, d.name as dispensary_name,
ts.platform, ts.method, ts.platform, ts.method,
COALESCE(ts.is_immutable, false) as is_immutable, COALESCE(ts.is_immutable, false) as is_immutable,
ts.last_run_at, ts.next_run_at, ts.last_run_at, ts.next_run_at,
ts.last_task_count, ts.last_error, ts.created_at, ts.updated_at ts.last_task_count, ts.last_error, ts.created_at, ts.updated_at
FROM task_schedules ts FROM task_schedules ts
LEFT JOIN task_pools tp ON tp.id = ts.pool_id LEFT JOIN task_pools tp ON tp.id = ts.pool_id
LEFT JOIN dispensaries d ON d.id = ts.dispensary_id
`; `;
} else { } else {
// Fallback query without pool_id (migration 114 not yet run) // Fallback query without pool_id (migration 114 not yet run)
query = ` query = `
SELECT ts.id, ts.name, ts.role, ts.description, ts.enabled, ts.interval_hours, SELECT ts.id, ts.name, ts.role, ts.description, ts.enabled, ts.interval_hours,
ts.priority, ts.state_code, NULL::integer as pool_id, NULL::text as pool_name, ts.priority, ts.state_code, NULL::integer as pool_id, NULL::text as pool_name,
ts.dispensary_id, d.name as dispensary_name,
ts.platform, ts.method, ts.platform, ts.method,
COALESCE(ts.is_immutable, false) as is_immutable, COALESCE(ts.is_immutable, false) as is_immutable,
ts.last_run_at, ts.next_run_at, ts.last_run_at, ts.next_run_at,
ts.last_task_count, ts.last_error, ts.created_at, ts.updated_at ts.last_task_count, ts.last_error, ts.created_at, ts.updated_at
FROM task_schedules ts FROM task_schedules ts
LEFT JOIN dispensaries d ON d.id = ts.dispensary_id
`; `;
} }
@@ -534,13 +538,23 @@ router.put('/schedules/:id', async (req: Request, res: Response) => {
SET ${updates.join(', ')} SET ${updates.join(', ')}
WHERE id = $${paramIndex} WHERE id = $${paramIndex}
RETURNING id, name, role, description, enabled, interval_hours, RETURNING id, name, role, description, enabled, interval_hours,
priority, state_code, platform, method, priority, state_code, platform, method, dispensary_id, pool_id,
COALESCE(is_immutable, false) as is_immutable, COALESCE(is_immutable, false) as is_immutable,
last_run_at, next_run_at, last_run_at, next_run_at,
last_task_count, last_error, created_at, updated_at last_task_count, last_error, created_at, updated_at
`, values); `, values);
res.json(result.rows[0]); // Add dispensary_name if dispensary_id is set
const updatedSchedule = result.rows[0];
if (updatedSchedule.dispensary_id) {
const dispResult = await pool.query(
'SELECT name FROM dispensaries WHERE id = $1',
[updatedSchedule.dispensary_id]
);
updatedSchedule.dispensary_name = dispResult.rows[0]?.name || null;
}
res.json(updatedSchedule);
} catch (error: any) { } catch (error: any) {
if (error.code === '23505') { if (error.code === '23505') {
return res.status(409).json({ error: 'A schedule with this name already exists' }); return res.status(409).json({ error: 'A schedule with this name already exists' });

View File

@@ -382,7 +382,9 @@ export async function handleProductDiscoveryDutchie(ctx: TaskContext): Promise<T
dispensaryId, dispensaryId,
rawPayload, rawPayload,
null, // crawl_run_id - not using crawl_runs in new system null, // crawl_run_id - not using crawl_runs in new system
result.products.length result.products.length,
'dutchie',
task.id // task ID for traceability
); );
console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`); console.log(`[ProductDiscoveryHTTP] Saved payload #${payloadResult.id} (${(payloadResult.sizeBytes / 1024).toFixed(1)}KB)`);

View File

@@ -104,7 +104,8 @@ export async function handleProductDiscoveryJane(ctx: TaskContext): Promise<Task
rawPayload, rawPayload,
null, // crawl_run_id null, // crawl_run_id
result.products.length, result.products.length,
'jane' // platform 'jane', // platform
task.id // task ID for traceability
); );
console.log(`[JaneProductDiscovery] Saved payload ${payloadId} (${Math.round(sizeBytes / 1024)}KB)`); console.log(`[JaneProductDiscovery] Saved payload ${payloadId} (${Math.round(sizeBytes / 1024)}KB)`);

View File

@@ -121,7 +121,8 @@ export async function handleProductDiscoveryTreez(ctx: TaskContext): Promise<Tas
rawPayload, rawPayload,
null, // crawl_run_id null, // crawl_run_id
result.products.length, result.products.length,
'treez' // platform 'treez', // platform
task.id // task ID for traceability
); );
console.log(`[TreezProductDiscovery] Saved payload ${payloadId} (${Math.round(sizeBytes / 1024)}KB)`); console.log(`[TreezProductDiscovery] Saved payload ${payloadId} (${Math.round(sizeBytes / 1024)}KB)`);

View File

@@ -259,15 +259,29 @@ class TaskService {
} }
/** /**
* Mark a task as completed * Mark a task as completed with verification
* Returns true if completion was verified in DB, false otherwise
*/ */
async completeTask(taskId: number, result?: Record<string, unknown>): Promise<void> { async completeTask(taskId: number, result?: Record<string, unknown>): Promise<boolean> {
await pool.query( await pool.query(
`UPDATE worker_tasks `UPDATE worker_tasks
SET status = 'completed', completed_at = NOW(), result = $2 SET status = 'completed', completed_at = NOW(), result = $2
WHERE id = $1`, WHERE id = $1`,
[taskId, result ? JSON.stringify(result) : null] [taskId, result ? JSON.stringify(result) : null]
); );
// Verify completion was recorded
const verify = await pool.query(
`SELECT status FROM worker_tasks WHERE id = $1`,
[taskId]
);
if (verify.rows[0]?.status !== 'completed') {
console.error(`[TaskService] Task ${taskId} completion NOT VERIFIED - DB shows status: ${verify.rows[0]?.status}`);
return false;
}
return true;
} }
/** /**
@@ -288,6 +302,19 @@ class TaskService {
console.log(`[TaskService] Task ${taskId} released back to pending`); console.log(`[TaskService] Task ${taskId} released back to pending`);
} }
/**
* Verify a task claim by checking DB state matches expected
* Returns true if task is claimed by the specified worker
*/
async verifyTaskClaim(taskId: number, workerId: string): Promise<boolean> {
const result = await pool.query(
`SELECT id FROM worker_tasks
WHERE id = $1 AND worker_id = $2 AND status = 'claimed'`,
[taskId, workerId]
);
return result.rows.length > 0;
}
/** /**
* Determine if an error is a "soft failure" (transient) that should be requeued * Determine if an error is a "soft failure" (transient) that should be requeued
* Soft failures: timeouts, connection issues, browser launch issues * Soft failures: timeouts, connection issues, browser launch issues
@@ -871,7 +898,7 @@ class TaskService {
* @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset * @param staleMinutes - Tasks older than this (based on last_heartbeat_at or claimed_at) are reset
* @returns Object with cleanup stats * @returns Object with cleanup stats
*/ */
async cleanupStaleTasks(staleMinutes: number = 30): Promise<{ async cleanupStaleTasks(staleMinutes: number = 3): Promise<{
cleaned: number; cleaned: number;
byStatus: { claimed: number; running: number }; byStatus: { claimed: number; running: number };
byRole: Record<string, number>; byRole: Record<string, number>;
@@ -941,6 +968,214 @@ class TaskService {
return Math.ceil(totalTaskCapacityNeeded / tasksPerWorkerHour); return Math.ceil(totalTaskCapacityNeeded / tasksPerWorkerHour);
} }
/**
* Get all active tasks (claimed/running) for a specific worker
* Used for reconciliation between worker memory and DB state
*/
async getWorkerActiveTasks(workerId: string): Promise<{ id: number; role: TaskRole; dispensary_id: number | null; claimed_at: Date }[]> {
const result = await pool.query(
`SELECT id, role, dispensary_id, claimed_at
FROM worker_tasks
WHERE worker_id = $1 AND status IN ('claimed', 'running')`,
[workerId]
);
return result.rows;
}
/**
* Verify task output actually exists for critical task types
* Checks that the task produced expected artifacts (MinIO payload, DB updates, etc.)
*
* @param task - The completed task to verify
* @returns true if output verified, false if output verification failed
*/
async verifyTaskOutput(task: WorkerTask): Promise<{ verified: boolean; reason?: string }> {
if (!task.dispensary_id) {
// Non-dispensary tasks don't need output verification
return { verified: true };
}
const claimedAt = task.claimed_at || task.created_at;
switch (task.role) {
case 'product_refresh':
case 'product_discovery': {
// Verify payload was saved to raw_crawl_payloads after task was claimed
const payloadResult = await pool.query(
`SELECT id, product_count, fetched_at
FROM raw_crawl_payloads
WHERE dispensary_id = $1
AND fetched_at > $2
ORDER BY fetched_at DESC
LIMIT 1`,
[task.dispensary_id, claimedAt]
);
if (payloadResult.rows.length === 0) {
return {
verified: false,
reason: `No payload found in raw_crawl_payloads for dispensary ${task.dispensary_id} after ${claimedAt}`
};
}
const payload = payloadResult.rows[0];
// Verify products were actually updated
const productResult = await pool.query(
`SELECT COUNT(*)::int as count
FROM store_products
WHERE dispensary_id = $1
AND updated_at > $2`,
[task.dispensary_id, claimedAt]
);
const productsUpdated = productResult.rows[0]?.count || 0;
// If payload has products but none were updated, something went wrong
if (payload.product_count > 0 && productsUpdated === 0) {
return {
verified: false,
reason: `Payload has ${payload.product_count} products but 0 store_products were updated`
};
}
return { verified: true };
}
case 'payload_fetch': {
// Verify payload was saved
const result = await pool.query(
`SELECT id FROM raw_crawl_payloads
WHERE dispensary_id = $1 AND fetched_at > $2
LIMIT 1`,
[task.dispensary_id, claimedAt]
);
if (result.rows.length === 0) {
return {
verified: false,
reason: `No payload found for dispensary ${task.dispensary_id} after ${claimedAt}`
};
}
return { verified: true };
}
default:
// Other task types - trust the handler
return { verified: true };
}
}
/**
* Release a task with a reason (for orphan reconciliation)
*/
async releaseTaskWithReason(taskId: number, reason: string): Promise<void> {
await pool.query(
`UPDATE worker_tasks
SET status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
proxy_ip = NULL,
error_message = CONCAT(COALESCE(error_message, ''), ' [', $2, ']'),
updated_at = NOW()
WHERE id = $1 AND status IN ('claimed', 'running')`,
[taskId, reason]
);
console.log(`[TaskService] Task ${taskId} released: ${reason}`);
}
// ==========================================================================
// IP-PER-STORE TRACKING
// ==========================================================================
// Prevents same proxy IP from hitting multiple stores on same platform.
// Anti-detection measure: platforms may flag same IP accessing multiple stores.
// ==========================================================================
/**
* Check if a proxy IP is already being used for another store on the same platform
*
* @param proxyIp - The proxy IP to check
* @param platform - The platform (dutchie, jane, treez)
* @param dispensaryId - The store we want to use this IP for
* @returns Object with inUse flag and details if conflicting
*/
async checkProxyIpConflict(
proxyIp: string,
platform: string,
dispensaryId: number
): Promise<{ inUse: boolean; conflictingTask?: { id: number; dispensary_id: number; dispensary_name?: string } }> {
const result = await pool.query(
`SELECT t.id, t.dispensary_id, d.name as dispensary_name
FROM worker_tasks t
LEFT JOIN dispensaries d ON d.id = t.dispensary_id
WHERE t.proxy_ip = $1
AND t.platform = $2
AND t.dispensary_id != $3
AND t.status IN ('claimed', 'running')
LIMIT 1`,
[proxyIp, platform, dispensaryId]
);
if (result.rows.length > 0) {
return {
inUse: true,
conflictingTask: result.rows[0]
};
}
return { inUse: false };
}
/**
* Assign a proxy IP to a task
* Should be called after claiming task and selecting proxy, before executing
*
* @param taskId - The task to assign IP to
* @param proxyIp - The proxy IP being used
* @returns true if assigned successfully
*/
async assignProxyIp(taskId: number, proxyIp: string): Promise<boolean> {
const result = await pool.query(
`UPDATE worker_tasks
SET proxy_ip = $2, updated_at = NOW()
WHERE id = $1 AND status IN ('claimed', 'running')
RETURNING id`,
[taskId, proxyIp]
);
return result.rowCount !== null && result.rowCount > 0;
}
/**
* Clear proxy IP from a task (on completion or release)
*/
async clearProxyIp(taskId: number): Promise<void> {
await pool.query(
`UPDATE worker_tasks SET proxy_ip = NULL WHERE id = $1`,
[taskId]
);
}
/**
* Get list of proxy IPs currently in use for a platform
* Useful for proxy selection to avoid conflicts
*
* @param platform - The platform to check
* @returns Array of proxy IPs currently in use
*/
async getActiveProxyIps(platform: string): Promise<string[]> {
const result = await pool.query(
`SELECT DISTINCT proxy_ip
FROM worker_tasks
WHERE platform = $1
AND status IN ('claimed', 'running')
AND proxy_ip IS NOT NULL`,
[platform]
);
return result.rows.map(r => r.proxy_ip);
}
} }
export const taskService = new TaskService(); export const taskService = new TaskService();

View File

@@ -292,6 +292,7 @@ export class TaskWorker {
private heartbeatInterval: NodeJS.Timeout | null = null; private heartbeatInterval: NodeJS.Timeout | null = null;
private registryHeartbeatInterval: NodeJS.Timeout | null = null; private registryHeartbeatInterval: NodeJS.Timeout | null = null;
private staleCleanupInterval: NodeJS.Timeout | null = null; private staleCleanupInterval: NodeJS.Timeout | null = null;
private reconciliationInterval: NodeJS.Timeout | null = null;
private crawlRotator: CrawlRotator; private crawlRotator: CrawlRotator;
// ========================================================================== // ==========================================================================
@@ -1379,7 +1380,7 @@ export class TaskWorker {
private async runStaleTaskCleanup(): Promise<void> { private async runStaleTaskCleanup(): Promise<void> {
try { try {
console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`); console.log(`[TaskWorker] ${this.friendlyName} running stale task cleanup...`);
const cleanupResult = await taskService.cleanupStaleTasks(30); // 30 minute threshold const cleanupResult = await taskService.cleanupStaleTasks(3); // 3 minute threshold - fast release of orphaned tasks
if (cleanupResult.cleaned > 0) { if (cleanupResult.cleaned > 0) {
console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`); console.log(`[TaskWorker] Cleaned up ${cleanupResult.cleaned} stale tasks`);
} }
@@ -1410,6 +1411,78 @@ export class TaskWorker {
} }
} }
// ==========================================================================
// TASK RECONCILIATION
// ==========================================================================
// Periodically reconcile worker's in-memory activeTasks with DB state.
// This catches orphaned tasks where DB says worker has task but worker
// doesn't know about it (memory lost, crash recovery, etc.)
// ==========================================================================
/**
* Start periodic task reconciliation
*/
private startPeriodicReconciliation(): void {
const RECONCILIATION_INTERVAL_MS = 30 * 1000; // 30 seconds
this.reconciliationInterval = setInterval(async () => {
await this.reconcileActiveTasks();
}, RECONCILIATION_INTERVAL_MS);
console.log(`[TaskWorker] ${this.friendlyName} started periodic task reconciliation (every 30s)`);
}
/**
* Stop periodic task reconciliation
*/
private stopPeriodicReconciliation(): void {
if (this.reconciliationInterval) {
clearInterval(this.reconciliationInterval);
this.reconciliationInterval = null;
}
}
/**
* Reconcile in-memory activeTasks with DB state
*
* This ensures worker's view matches DB truth:
* - Tasks in memory but not in DB → remove from memory (someone else took it)
* - Tasks in DB but not in memory → ORPHAN - release back to pool
*/
private async reconcileActiveTasks(): Promise<void> {
try {
// Get all tasks DB says this worker has claimed/running
const dbTasks = await taskService.getWorkerActiveTasks(this.workerId);
const dbTaskIds = new Set(dbTasks.map(t => t.id));
const memoryTaskIds = new Set(this.activeTasks.keys());
// Tasks in memory but not in DB → remove from tracking
for (const taskId of memoryTaskIds) {
if (!dbTaskIds.has(taskId)) {
console.warn(`[TaskWorker] ${this.friendlyName} RECONCILIATION: Task ${taskId} in memory but not in DB - removing from tracking`);
this.activeTasks.delete(taskId);
this.taskPromises.delete(taskId);
this.clearTaskStep(taskId);
}
}
// Tasks in DB but not in memory → ORPHAN - release back to pool
for (const dbTask of dbTasks) {
if (!memoryTaskIds.has(dbTask.id)) {
console.error(`[TaskWorker] ${this.friendlyName} RECONCILIATION: Task ${dbTask.id} claimed in DB but not tracked - releasing as orphan`);
await taskService.releaseTaskWithReason(dbTask.id, `Orphan: worker ${this.workerId} has no memory of task`);
}
}
// Log reconciliation result if any issues found
const orphansFound = dbTasks.filter(t => !memoryTaskIds.has(t.id)).length;
const staleMemory = Array.from(memoryTaskIds).filter(id => !dbTaskIds.has(id)).length;
if (orphansFound > 0 || staleMemory > 0) {
console.log(`[TaskWorker] ${this.friendlyName} RECONCILIATION complete: ${orphansFound} orphans released, ${staleMemory} stale memory entries removed`);
}
} catch (error: any) {
console.error(`[TaskWorker] ${this.friendlyName} reconciliation error:`, error.message);
}
}
/** /**
* Start the worker loop * Start the worker loop
* *
@@ -1435,6 +1508,9 @@ export class TaskWorker {
// Start periodic cleanup every 10 minutes // Start periodic cleanup every 10 minutes
this.startPeriodicStaleCleanup(); this.startPeriodicStaleCleanup();
// Start periodic reconciliation every 30 seconds
this.startPeriodicReconciliation();
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)'; const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`); console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (stealth=lazy, max ${this.maxConcurrentTasks} concurrent tasks)`);
@@ -1617,6 +1693,16 @@ export class TaskWorker {
if (task) { if (task) {
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`); console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
// =================================================================
// CLAIM VERIFICATION - Ensure DB state matches before tracking
// Prevents orphaned tasks when claim succeeds in DB but worker loses track
// =================================================================
const claimVerified = await taskService.verifyTaskClaim(task.id, this.workerId);
if (!claimVerified) {
console.error(`[TaskWorker] ${this.friendlyName} CLAIM VERIFICATION FAILED for task ${task.id} - DB state mismatch, skipping`);
return; // Don't track this task - something went wrong
}
// ================================================================= // =================================================================
// PREFLIGHT CHECK - Use stored preflight results based on task method // PREFLIGHT CHECK - Use stored preflight results based on task method
// We already ran dual-transport preflights at startup, so just verify // We already ran dual-transport preflights at startup, so just verify
@@ -1655,16 +1741,41 @@ export class TaskWorker {
console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`); console.log(`[TaskWorker] ${this.friendlyName} preflight verified for task ${task.id}: ${preflightMsg}`);
// =================================================================
// IP-PER-STORE CHECK - Prevent same IP hitting multiple stores
// =================================================================
const currentProxyIp = this.preflightHttpResult?.proxyIp || this.preflightCurlResult?.proxyIp;
if (currentProxyIp && task.dispensary_id && task.platform) {
const ipConflict = await taskService.checkProxyIpConflict(
currentProxyIp,
task.platform,
task.dispensary_id
);
if (ipConflict.inUse) {
console.log(`[TaskWorker] ${this.friendlyName} IP CONFLICT: ${currentProxyIp} already hitting ${ipConflict.conflictingTask?.dispensary_name || ipConflict.conflictingTask?.dispensary_id} on ${task.platform}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - will retry with different timing`);
await taskService.releaseTask(task.id);
await this.sleep(5000); // Short wait before trying another task
return;
}
// No conflict - assign IP to this task
await taskService.assignProxyIp(task.id, currentProxyIp);
}
this.activeTasks.set(task.id, task); this.activeTasks.set(task.id, task);
// Start task in background (don't await) // Start task in background (don't await)
const taskPromise = this.executeTask(task); const taskPromise = this.executeTask(task);
this.taskPromises.set(task.id, taskPromise); this.taskPromises.set(task.id, taskPromise);
// Clean up when done // Clean up when done (including proxy IP)
taskPromise.finally(() => { taskPromise.finally(async () => {
this.activeTasks.delete(task.id); this.activeTasks.delete(task.id);
this.taskPromises.delete(task.id); this.taskPromises.delete(task.id);
// Clear proxy IP so other tasks can use it
await taskService.clearProxyIp(task.id).catch(() => {});
}); });
// Immediately try to claim more tasks (don't wait for poll interval) // Immediately try to claim more tasks (don't wait for poll interval)
@@ -1826,6 +1937,28 @@ export class TaskWorker {
// Mark task as running // Mark task as running
await WorkerSession.startTask(nextTask.task_id, this.workerId); await WorkerSession.startTask(nextTask.task_id, this.workerId);
// =================================================================
// IP-PER-STORE CHECK - Prevent same IP hitting multiple stores
// =================================================================
const currentProxyIp = this.preflightHttpResult?.proxyIp || this.currentSession?.ip_address;
if (currentProxyIp && workerTask.dispensary_id && workerTask.platform) {
const ipConflict = await taskService.checkProxyIpConflict(
currentProxyIp,
workerTask.platform,
workerTask.dispensary_id
);
if (ipConflict.inUse) {
console.log(`[TaskWorker] ${this.friendlyName} IP CONFLICT: ${currentProxyIp} already hitting ${ipConflict.conflictingTask?.dispensary_name} on ${workerTask.platform}`);
console.log(`[TaskWorker] Skipping task ${nextTask.task_id} - will retry later`);
await this.sleep(5000);
return;
}
// No conflict - assign IP to this task
await taskService.assignProxyIp(nextTask.task_id, currentProxyIp);
}
console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`); console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);
this.activeTasks.set(nextTask.task_id, workerTask); this.activeTasks.set(nextTask.task_id, workerTask);
@@ -1834,10 +1967,11 @@ export class TaskWorker {
const taskPromise = this.executeSessionTask(workerTask); const taskPromise = this.executeSessionTask(workerTask);
this.taskPromises.set(nextTask.task_id, taskPromise); this.taskPromises.set(nextTask.task_id, taskPromise);
// Cleanup when done // Cleanup when done (including proxy IP)
taskPromise.finally(() => { taskPromise.finally(async () => {
this.activeTasks.delete(nextTask.task_id); this.activeTasks.delete(nextTask.task_id);
this.taskPromises.delete(nextTask.task_id); this.taskPromises.delete(nextTask.task_id);
await taskService.clearProxyIp(nextTask.task_id).catch(() => {});
}); });
} }
@@ -2020,6 +2154,28 @@ export class TaskWorker {
updated_at: new Date(), updated_at: new Date(),
}; };
// =================================================================
// IP-PER-STORE CHECK - Prevent same IP hitting multiple stores
// =================================================================
const currentProxyIp = this.preflightHttpResult?.proxyIp;
if (currentProxyIp && workerTask.dispensary_id && workerTask.platform) {
const ipConflict = await taskService.checkProxyIpConflict(
currentProxyIp,
workerTask.platform,
workerTask.dispensary_id
);
if (ipConflict.inUse) {
console.log(`[TaskWorker] ${this.friendlyName} IP CONFLICT: ${currentProxyIp} already hitting ${ipConflict.conflictingTask?.dispensary_name} on ${workerTask.platform}`);
console.log(`[TaskWorker] Skipping task ${nextTask.task_id} - will retry later`);
await this.sleep(5000);
return;
}
// No conflict - assign IP to this task
await taskService.assignProxyIp(nextTask.task_id, currentProxyIp);
}
console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`); console.log(`[TaskWorker] ${this.friendlyName} starting task ${nextTask.task_id} (${nextTask.role}) for ${nextTask.dispensary_name}`);
this.activeTasks.set(nextTask.task_id, workerTask); this.activeTasks.set(nextTask.task_id, workerTask);
@@ -2028,12 +2184,13 @@ export class TaskWorker {
const taskPromise = this.executePoolTask(workerTask); const taskPromise = this.executePoolTask(workerTask);
this.taskPromises.set(nextTask.task_id, taskPromise); this.taskPromises.set(nextTask.task_id, taskPromise);
// Cleanup when done // Cleanup when done (including proxy IP)
taskPromise.finally(() => { taskPromise.finally(async () => {
this.activeTasks.delete(nextTask.task_id); this.activeTasks.delete(nextTask.task_id);
this.taskPromises.delete(nextTask.task_id); this.taskPromises.delete(nextTask.task_id);
// Remove from poolTasks // Remove from poolTasks
this.poolTasks = this.poolTasks.filter(t => t.task_id !== nextTask.task_id); this.poolTasks = this.poolTasks.filter(t => t.task_id !== nextTask.task_id);
await taskService.clearProxyIp(nextTask.task_id).catch(() => {});
}); });
} }
@@ -2091,8 +2248,23 @@ export class TaskWorker {
this.clearTaskStep(task.id); this.clearTaskStep(task.id);
if (result.success) { if (result.success) {
await taskService.completeTask(task.id, { result: result.data }); // Layer 1: Verify DB completion was recorded
console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`); const dbVerified = await taskService.completeTask(task.id, { result: result.data });
if (!dbVerified) {
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} DB COMPLETION VERIFICATION FAILED`);
await taskService.failTask(task.id, 'DB completion verification failed');
return;
}
// Layer 2: Verify output exists for payload-creating tasks
const outputVerification = await taskService.verifyTaskOutput(task);
if (!outputVerification.verified) {
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} OUTPUT VERIFICATION FAILED: ${outputVerification.reason}`);
await taskService.failTask(task.id, `Output verification failed: ${outputVerification.reason}`);
return;
}
console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED (verified)`);
} else { } else {
await taskService.failTask(task.id, result.error || 'Unknown error'); await taskService.failTask(task.id, result.error || 'Unknown error');
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`); console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
@@ -2140,8 +2312,18 @@ export class TaskWorker {
this.clearTaskStep(task.id); this.clearTaskStep(task.id);
if (result.success) { if (result.success) {
// Use WorkerSession completion but also verify with taskService
await WorkerSession.completeTask(task.id, this.workerId, { result: result.data }); await WorkerSession.completeTask(task.id, this.workerId, { result: result.data });
console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED`);
// Verify output exists for payload-creating tasks
const outputVerification = await taskService.verifyTaskOutput(task);
if (!outputVerification.verified) {
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} OUTPUT VERIFICATION FAILED: ${outputVerification.reason}`);
await WorkerSession.failTask(task.id, this.workerId, `Output verification failed: ${outputVerification.reason}`);
return;
}
console.log(`[TaskWorker] ${this.friendlyName} task ${task.id} COMPLETED (verified)`);
} else { } else {
await WorkerSession.failTask(task.id, this.workerId, result.error); await WorkerSession.failTask(task.id, this.workerId, result.error);
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`); console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} FAILED: ${result.error}`);
@@ -2161,6 +2343,7 @@ export class TaskWorker {
this.stopHeartbeat(); this.stopHeartbeat();
this.stopRegistryHeartbeat(); this.stopRegistryHeartbeat();
this.stopPeriodicStaleCleanup(); this.stopPeriodicStaleCleanup();
this.stopPeriodicReconciliation();
// Clean up session pool state if using new system // Clean up session pool state if using new system
if (USE_SESSION_POOL && this.currentSession) { if (USE_SESSION_POOL && this.currentSession) {
@@ -2217,10 +2400,30 @@ export class TaskWorker {
// Clear step tracking // Clear step tracking
this.clearTaskStep(task.id); this.clearTaskStep(task.id);
// Mark as completed // =================================================================
await taskService.completeTask(task.id, result); // COMPLETION VERIFICATION - Ensure task actually completed
// =================================================================
// Layer 1: Verify DB completion was recorded
const dbVerified = await taskService.completeTask(task.id, result);
if (!dbVerified) {
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} DB COMPLETION VERIFICATION FAILED`);
await taskService.failTask(task.id, 'DB completion verification failed');
await this.reportTaskCompletion(false);
return;
}
// Layer 2: Verify output exists for payload-creating tasks
const outputVerification = await taskService.verifyTaskOutput(task);
if (!outputVerification.verified) {
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} OUTPUT VERIFICATION FAILED: ${outputVerification.reason}`);
await taskService.failTask(task.id, `Output verification failed: ${outputVerification.reason}`);
await this.reportTaskCompletion(false);
return;
}
await this.reportTaskCompletion(true); await this.reportTaskCompletion(true);
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`); console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} (verified) [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);
// Track identity task count (for identity pool rotation) // Track identity task count (for identity pool rotation)
if (USE_IDENTITY_POOL) { if (USE_IDENTITY_POOL) {

View File

@@ -102,28 +102,40 @@ export interface LoadPayloadResult {
}; };
} }
/**
* Generate a short hash from task ID for filename inclusion
* Uses last 5 chars of base36 representation for brevity
*/
function taskIdHash(taskId: number | null): string {
if (!taskId) return '';
// Convert to base36 and take last 5 chars for short but unique identifier
return taskId.toString(36).slice(-5).padStart(5, '0');
}
/** /**
* Generate storage path/key for a payload * Generate storage path/key for a payload
* *
* MinIO format: payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz * MinIO format: payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_t{task_hash}_{timestamp}.json.gz
* Local format: ./storage/payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_{timestamp}.json.gz * Local format: ./storage/payloads/{platform}/{year}/{month}/{day}/store_{dispensary_id}_t{task_hash}_{timestamp}.json.gz
* *
* Platform defaults to 'dutchie' for backward compatibility * Platform defaults to 'dutchie' for backward compatibility
* Task ID is encoded as a 5-char base36 hash for traceability
*/ */
function generateStoragePath(dispensaryId: number, timestamp: Date, platform: string = 'dutchie'): string { function generateStoragePath(dispensaryId: number, timestamp: Date, platform: string = 'dutchie', taskId: number | null = null): string {
const year = timestamp.getFullYear(); const year = timestamp.getFullYear();
const month = String(timestamp.getMonth() + 1).padStart(2, '0'); const month = String(timestamp.getMonth() + 1).padStart(2, '0');
const day = String(timestamp.getDate()).padStart(2, '0'); const day = String(timestamp.getDate()).padStart(2, '0');
const ts = timestamp.getTime(); const ts = timestamp.getTime();
const taskHash = taskId ? `_t${taskIdHash(taskId)}` : '';
const relativePath = `payloads/${platform}/${year}/${month}/${day}/store_${dispensaryId}_${ts}.json.gz`; const relativePath = `payloads/${platform}/${year}/${month}/${day}/store_${dispensaryId}${taskHash}_${ts}.json.gz`;
if (useMinIO) { if (useMinIO) {
// MinIO uses forward slashes, no leading slash // MinIO uses forward slashes, no leading slash
return relativePath; return relativePath;
} else { } else {
// Local filesystem uses OS-specific path // Local filesystem uses OS-specific path
return path.join(PAYLOAD_BASE_PATH, platform, String(year), month, day, `store_${dispensaryId}_${ts}.json.gz`); return path.join(PAYLOAD_BASE_PATH, platform, String(year), month, day, `store_${dispensaryId}${taskHash}_${ts}.json.gz`);
} }
} }
@@ -151,6 +163,7 @@ function calculateChecksum(data: Buffer): string {
* @param crawlRunId - Optional crawl_run ID for linking * @param crawlRunId - Optional crawl_run ID for linking
* @param productCount - Number of products in payload * @param productCount - Number of products in payload
* @param platform - Platform identifier ('dutchie' | 'jane'), defaults to 'dutchie' * @param platform - Platform identifier ('dutchie' | 'jane'), defaults to 'dutchie'
* @param taskId - Optional task ID for traceability in filename
* @returns SavePayloadResult with file info and DB record ID * @returns SavePayloadResult with file info and DB record ID
*/ */
export async function saveRawPayload( export async function saveRawPayload(
@@ -159,10 +172,11 @@ export async function saveRawPayload(
payload: any, payload: any,
crawlRunId: number | null = null, crawlRunId: number | null = null,
productCount: number = 0, productCount: number = 0,
platform: string = 'dutchie' platform: string = 'dutchie',
taskId: number | null = null
): Promise<SavePayloadResult> { ): Promise<SavePayloadResult> {
const timestamp = new Date(); const timestamp = new Date();
const storagePath = generateStoragePath(dispensaryId, timestamp, platform); const storagePath = generateStoragePath(dispensaryId, timestamp, platform, taskId);
// Serialize and compress // Serialize and compress
const jsonStr = JSON.stringify(payload); const jsonStr = JSON.stringify(payload);

View File

@@ -3140,6 +3140,8 @@ export interface TaskSchedule {
interval_hours: number; interval_hours: number;
priority: number; priority: number;
state_code: string | null; state_code: string | null;
dispensary_id: number | null;
dispensary_name: string | null;
pool_id: number | null; pool_id: number | null;
pool_name: string | null; pool_name: string | null;
platform: string | null; platform: string | null;

View File

@@ -708,6 +708,15 @@ function ScheduleEditModal({ isOpen, schedule, onClose, onSave }: ScheduleEditMo
</div> </div>
)} )}
{/* Show dispensary if this is a per-store schedule */}
{schedule?.dispensary_id && (
<div className="bg-blue-50 border border-blue-200 rounded-lg p-3">
<div className="text-sm text-blue-800">
<span className="font-medium">Store:</span> {schedule.dispensary_name || `ID ${schedule.dispensary_id}`}
</div>
</div>
)}
<div> <div>
<label className="block text-sm font-medium text-gray-700 mb-1">Name *</label> <label className="block text-sm font-medium text-gray-700 mb-1">Name *</label>
<input <input

View File

@@ -634,9 +634,12 @@ function TaskCountBadge({ worker, tasks }: { worker: Worker; tasks: Task[] }) {
title={`Task #${task.task_id}: ${task.role}\n${task.dispensary?.name || 'Unknown'}\nRunning: ${formatSecondsToTime(task.running_seconds)}`} title={`Task #${task.task_id}: ${task.role}\n${task.dispensary?.name || 'Unknown'}\nRunning: ${formatSecondsToTime(task.running_seconds)}`}
> >
<div className="w-1.5 h-1.5 rounded-full bg-blue-500 animate-pulse" /> <div className="w-1.5 h-1.5 rounded-full bg-blue-500 animate-pulse" />
<span className="text-gray-600 truncate max-w-[120px]"> <span className="text-gray-600 truncate max-w-[140px]">
{task.dispensary?.name?.split(' ').slice(0, 2).join(' ') || task.role.replace(/_/g, ' ')} {task.dispensary?.name?.split(' ').slice(0, 2).join(' ') || task.role.replace(/_/g, ' ')}
</span> </span>
<span className="text-gray-400 text-[10px]">
#{task.task_id}
</span>
<span className="text-gray-400 ml-auto whitespace-nowrap"> <span className="text-gray-400 ml-auto whitespace-nowrap">
{formatSecondsToTime(task.running_seconds)} {formatSecondsToTime(task.running_seconds)}
</span> </span>