From ec6843dfd6d0323e4ff02f96604e83df11090169 Mon Sep 17 00:00:00 2001
From: Kelly
Date: Sat, 13 Dec 2025 03:24:42 -0700
Subject: [PATCH] feat: Add working hours for natural traffic patterns
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Workers check their timezone (from preflight IP geolocation) and current
hour's weight probability to determine availability. This creates natural
traffic patterns - more workers active during peak hours, fewer during
off-peak. Tasks queue up at night and drain during the day.

Migrations:
- 099: working_hours table with hourly weights by profile
- 100: Add timezone column to worker_registry
- 101: Store timezone from preflight IP geolocation
- 102: check_working_hours() function with probability roll

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5
---
 backend/migrations/099_working_hours.sql      |  68 +++++++++++
 backend/migrations/100_worker_timezone.sql    |  19 +++
 .../101_worker_preflight_timezone.sql         |  78 ++++++++++++
 .../migrations/102_check_working_hours.sql    | 114 ++++++++++++++++++
 backend/src/tasks/task-worker.ts              |  70 +++++++++++
 5 files changed, 349 insertions(+)
 create mode 100644 backend/migrations/099_working_hours.sql
 create mode 100644 backend/migrations/100_worker_timezone.sql
 create mode 100644 backend/migrations/101_worker_preflight_timezone.sql
 create mode 100644 backend/migrations/102_check_working_hours.sql

diff --git a/backend/migrations/099_working_hours.sql b/backend/migrations/099_working_hours.sql
new file mode 100644
index 00000000..8a7dd0f0
--- /dev/null
+++ b/backend/migrations/099_working_hours.sql
@@ -0,0 +1,68 @@
+-- Migration: 099_working_hours.sql
+-- Description: Working hours profiles for natural traffic pattern simulation
+-- Created: 2025-12-13
+
+-- Working hours table: defines hourly activity weights to mimic natural traffic
+CREATE TABLE IF NOT EXISTS working_hours (
+  id SERIAL PRIMARY KEY,
+  name VARCHAR(50) UNIQUE NOT NULL,
+  description TEXT,
+
+  -- Hour weights: {"0": 15, "1": 5, ..., "18": 100, ...}
+  -- Value = percent chance to trigger activity that hour (0-100)
+  hour_weights JSONB NOT NULL,
+
+  -- Day-of-week multipliers (0=Sunday, 6=Saturday)
+  -- Optional adjustment for weekend vs weekday patterns
+  dow_weights JSONB DEFAULT '{"0": 90, "1": 100, "2": 100, "3": 100, "4": 100, "5": 110, "6": 95}',
+
+  timezone VARCHAR(50) DEFAULT 'America/Phoenix',
+  enabled BOOLEAN DEFAULT true,
+  created_at TIMESTAMPTZ DEFAULT NOW(),
+  updated_at TIMESTAMPTZ DEFAULT NOW()
+);
+
+-- Seed: Natural traffic pattern based on internet usage research
+-- Optimized for cannabis dispensary browsing (lunch + after-work peaks)
+INSERT INTO working_hours (name, description, timezone, hour_weights) VALUES (
+  'natural_traffic',
+  'Mimics natural user browsing patterns - peaks at lunch and 5-7 PM',
+  'America/Phoenix',
+  '{
+    "0": 15,
+    "1": 5,
+    "2": 5,
+    "3": 5,
+    "4": 5,
+    "5": 10,
+    "6": 20,
+    "7": 30,
+    "8": 35,
+    "9": 45,
+    "10": 50,
+    "11": 60,
+    "12": 75,
+    "13": 65,
+    "14": 60,
+    "15": 70,
+    "16": 80,
+    "17": 95,
+    "18": 100,
+    "19": 100,
+    "20": 90,
+    "21": 70,
+    "22": 45,
+    "23": 25
+  }'::jsonb
+) ON CONFLICT (name) DO UPDATE SET
+  hour_weights = EXCLUDED.hour_weights,
+  description = EXCLUDED.description,
+  updated_at = NOW();
+
+-- Index for quick lookups
+CREATE INDEX IF NOT EXISTS idx_working_hours_name ON working_hours(name);
+CREATE INDEX IF NOT EXISTS idx_working_hours_enabled ON working_hours(enabled);
+
+COMMENT ON TABLE working_hours IS 'Activity profiles for natural traffic simulation. Hour weights are percent chance (0-100) to trigger activity.';
+COMMENT ON COLUMN working_hours.hour_weights IS 'JSON object mapping hour (0-23) to percent chance (0-100). 100 = always run, 0 = never run.';
+COMMENT ON COLUMN working_hours.dow_weights IS 'Optional day-of-week multipliers. 0=Sunday. Applied as (hour_weight * dow_weight / 100).';
diff --git a/backend/migrations/100_worker_timezone.sql b/backend/migrations/100_worker_timezone.sql
new file mode 100644
index 00000000..eaa87152
--- /dev/null
+++ b/backend/migrations/100_worker_timezone.sql
@@ -0,0 +1,19 @@
+-- Migration: 100_worker_timezone.sql
+-- Description: Add timezone column to worker_registry for working hours support
+-- Created: 2025-12-13
+
+-- Add timezone column to worker_registry
+-- Populated from preflight IP geolocation (e.g., 'America/New_York')
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS timezone VARCHAR(50);
+
+-- Add working_hours_id to link worker to a specific working hours profile
+-- NULL means use default 'natural_traffic' profile
+ALTER TABLE worker_registry
+ADD COLUMN IF NOT EXISTS working_hours_id INTEGER REFERENCES working_hours(id);
+
+-- Index for workers by timezone (useful for capacity planning)
+CREATE INDEX IF NOT EXISTS idx_worker_registry_timezone ON worker_registry(timezone);
+
+COMMENT ON COLUMN worker_registry.timezone IS 'IANA timezone from preflight IP geolocation (e.g., America/New_York)';
+COMMENT ON COLUMN worker_registry.working_hours_id IS 'Reference to working_hours profile. NULL uses default natural_traffic.';
diff --git a/backend/migrations/101_worker_preflight_timezone.sql b/backend/migrations/101_worker_preflight_timezone.sql
new file mode 100644
index 00000000..3d08cce1
--- /dev/null
+++ b/backend/migrations/101_worker_preflight_timezone.sql
@@ -0,0 +1,78 @@
+-- Migration: 101_worker_preflight_timezone.sql
+-- Description: Update update_worker_preflight to extract timezone from fingerprint
+-- Created: 2025-12-13
+
+CREATE OR REPLACE FUNCTION public.update_worker_preflight(
+  p_worker_id character varying,
+  p_transport character varying,
+  p_status character varying,
+  p_ip character varying DEFAULT NULL,
+  p_response_ms integer DEFAULT NULL,
+  p_error text DEFAULT NULL,
+  p_fingerprint jsonb DEFAULT NULL
+)
+RETURNS void
+LANGUAGE plpgsql
+AS $function$
+DECLARE
+  v_curl_status VARCHAR(20);
+  v_http_status VARCHAR(20);
+  v_overall_status VARCHAR(20);
+  v_timezone VARCHAR(50);
+BEGIN
+  IF p_transport = 'curl' THEN
+    UPDATE worker_registry
+    SET
+      preflight_curl_status = p_status,
+      preflight_curl_at = NOW(),
+      preflight_curl_ms = p_response_ms,
+      preflight_curl_error = p_error,
+      curl_ip = p_ip,
+      updated_at = NOW()
+    WHERE worker_id = p_worker_id;
+
+  ELSIF p_transport = 'http' THEN
+    -- Extract timezone from fingerprint JSON if present
+    v_timezone := p_fingerprint->>'detectedTimezone';
+
+    UPDATE worker_registry
+    SET
+      preflight_http_status = p_status,
+      preflight_http_at = NOW(),
+      preflight_http_ms = p_response_ms,
+      preflight_http_error = p_error,
+      http_ip = p_ip,
+      fingerprint_data = COALESCE(p_fingerprint, fingerprint_data),
+      -- Save extracted timezone
+      timezone = COALESCE(v_timezone, timezone),
+      updated_at = NOW()
+    WHERE worker_id = p_worker_id;
+  END IF;
+
+  -- Update overall preflight status
+  SELECT preflight_curl_status, preflight_http_status
+  INTO v_curl_status, v_http_status
+  FROM worker_registry
+  WHERE worker_id = p_worker_id;
+
+  -- Compute overall status
+  IF v_curl_status = 'passed' AND v_http_status = 'passed' THEN
+    v_overall_status := 'passed';
+  ELSIF v_curl_status = 'passed' OR v_http_status = 'passed' THEN
+    v_overall_status := 'partial';
+  ELSIF v_curl_status = 'failed' OR v_http_status = 'failed' THEN
+    v_overall_status := 'failed';
+  ELSE
+    v_overall_status := 'pending';
+  END IF;
+
+  UPDATE worker_registry
+  SET
+    preflight_status = v_overall_status,
+    preflight_at = NOW()
+  WHERE worker_id = p_worker_id;
+END;
+$function$;
+
+COMMENT ON FUNCTION update_worker_preflight(varchar, varchar, varchar, varchar, integer, text, jsonb)
+  IS 'Updates worker preflight status and extracts timezone from fingerprint for working hours';
diff --git a/backend/migrations/102_check_working_hours.sql b/backend/migrations/102_check_working_hours.sql
new file mode 100644
index 00000000..044d5725
--- /dev/null
+++ b/backend/migrations/102_check_working_hours.sql
@@ -0,0 +1,114 @@
+-- Migration: 102_check_working_hours.sql
+-- Description: Function to check if worker should be available based on working hours
+-- Created: 2025-12-13
+
+-- Function to check if a worker should be available for work
+-- Returns TRUE if worker passes the probability check for current hour
+-- Returns FALSE if worker should sleep/skip this cycle
+CREATE OR REPLACE FUNCTION check_working_hours(
+  p_worker_id VARCHAR,
+  p_profile_name VARCHAR DEFAULT 'natural_traffic'
+)
+RETURNS TABLE (
+  is_available BOOLEAN,
+  current_hour INTEGER,
+  hour_weight INTEGER,
+  worker_timezone VARCHAR,
+  roll INTEGER,
+  reason TEXT
+)
+LANGUAGE plpgsql
+AS $function$
+DECLARE
+  v_timezone VARCHAR(50);
+  v_hour INTEGER;
+  v_weight INTEGER;
+  v_dow INTEGER;
+  v_dow_weight INTEGER;
+  v_final_weight INTEGER;
+  v_roll INTEGER;
+  v_hour_weights JSONB;
+  v_dow_weights JSONB;
+  v_profile_enabled BOOLEAN;
+BEGIN
+  -- Get worker's timezone (from preflight)
+  SELECT wr.timezone INTO v_timezone
+  FROM worker_registry wr
+  WHERE wr.worker_id = p_worker_id;
+
+  -- Default to America/Phoenix if no timezone set
+  v_timezone := COALESCE(v_timezone, 'America/Phoenix');
+
+  -- Get current hour in worker's timezone
+  v_hour := EXTRACT(HOUR FROM NOW() AT TIME ZONE v_timezone)::INTEGER;
+
+  -- Get day of week (0=Sunday)
+  v_dow := EXTRACT(DOW FROM NOW() AT TIME ZONE v_timezone)::INTEGER;
+
+  -- Get working hours profile
+  SELECT wh.hour_weights, wh.dow_weights, wh.enabled
+  INTO v_hour_weights, v_dow_weights, v_profile_enabled
+  FROM working_hours wh
+  WHERE wh.name = p_profile_name AND wh.enabled = true;
+
+  -- If profile not found or disabled, always available
+  IF v_hour_weights IS NULL THEN
+    RETURN QUERY SELECT
+      TRUE::BOOLEAN,
+      v_hour,
+      100::INTEGER,
+      v_timezone,
+      0::INTEGER,
+      'Profile not found or disabled - defaulting to available'::TEXT;
+    RETURN;
+  END IF;
+
+  -- Get hour weight (default to 50 if hour not specified)
+  v_weight := COALESCE((v_hour_weights->>v_hour::TEXT)::INTEGER, 50);
+
+  -- Get day-of-week weight (default to 100)
+  v_dow_weight := COALESCE((v_dow_weights->>v_dow::TEXT)::INTEGER, 100);
+
+  -- Final weight = hour_weight * dow_weight / 100 (integer math; exceeds 100 when dow_weight > 100)
+  v_final_weight := (v_weight * v_dow_weight / 100);
+
+  -- Roll the dice (0-99)
+  v_roll := floor(random() * 100)::INTEGER;
+
+  -- Return result
+  RETURN QUERY SELECT
+    (v_roll < v_final_weight)::BOOLEAN AS is_available,
+    v_hour AS current_hour,
+    v_final_weight AS hour_weight,
+    v_timezone AS worker_timezone,
+    v_roll AS roll,
+    CASE
+      WHEN v_roll < v_final_weight THEN
+        format('Available: rolled %s < %s%% threshold', v_roll, v_final_weight)
+      ELSE
+        format('Sleeping: rolled %s >= %s%% threshold', v_roll, v_final_weight)
+    END AS reason;
+END;
+$function$;
+
+-- Simplified version that just returns boolean
+CREATE OR REPLACE FUNCTION is_worker_available(
+  p_worker_id VARCHAR,
+  p_profile_name VARCHAR DEFAULT 'natural_traffic'
+)
+RETURNS BOOLEAN
+LANGUAGE plpgsql
+AS $function$
+DECLARE
+  v_result BOOLEAN;
+BEGIN
+  SELECT is_available INTO v_result
+  FROM check_working_hours(p_worker_id, p_profile_name);
+  RETURN COALESCE(v_result, TRUE);
+END;
+$function$;
+
+COMMENT ON FUNCTION check_working_hours(VARCHAR, VARCHAR) IS
+  'Check if worker should be available based on working hours profile. Returns detailed info.';
+COMMENT ON FUNCTION is_worker_available(VARCHAR, VARCHAR) IS
+  'Simple boolean check if worker passes working hours probability roll.';
diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index 76b65e0e..5df072d7 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -696,6 +696,57 @@ export class TaskWorker {
     this.isRetryingPreflight = false;
   }
 
+  /**
+   * Get the effective max concurrent tasks based on working hours.
+   * Calls check_working_hours() for this worker (timezone comes from preflight
+   * IP geolocation) and scales maxConcurrentTasks by the returned hour weight.
+   *
+   * The weight is day-of-week adjusted and can exceed 100, so the result is
+   * clamped to [1, maxConcurrentTasks]. A weight of 0 is honored (1 slot).
+   * Query errors or an empty result fall back to full capacity.
+   *
+   * Example with MAX_CONCURRENT_TASKS = 3:
+   *   3 AM (5%) → 1 slot, 12 PM (75%) → 2 slots, 6 PM (100%) → 3 slots
+   * @returns effectiveMax (slots usable now), hourWeight (percent), reason (for logs)
+   */
+  private async getEffectiveMaxTasks(): Promise<{ effectiveMax: number; hourWeight: number; reason: string }> {
+    try {
+      const result = await this.pool.query(`
+        SELECT current_hour, hour_weight, worker_timezone
+        FROM check_working_hours($1, 'natural_traffic')
+      `, [this.workerId]);
+
+      if (result.rows.length === 0) {
+        // Function returned nothing - default to full capacity
+        return {
+          effectiveMax: this.maxConcurrentTasks,
+          hourWeight: 100,
+          reason: 'Working hours check returned no result - using full capacity'
+        };
+      }
+
+      const row = result.rows[0];
+      const hourWeight = row.hour_weight ?? 100;
+
+      // Scale by hour weight, clamped to [1, maxConcurrentTasks] (dow-adjusted weight can exceed 100)
+      const effectiveMax = Math.max(1, Math.min(this.maxConcurrentTasks, Math.round(this.maxConcurrentTasks * hourWeight / 100)));
+
+      return {
+        effectiveMax,
+        hourWeight,
+        reason: `Hour ${row.current_hour} (${row.worker_timezone}): ${hourWeight}% → ${effectiveMax}/${this.maxConcurrentTasks} slots`
+      };
+    } catch (err: unknown) {
+      // On error, default to full capacity (don't block workers due to DB issues)
+      console.warn(`[TaskWorker] Working hours check failed: ${err instanceof Error ? err.message : String(err)} - using full capacity`);
+      return {
+        effectiveMax: this.maxConcurrentTasks,
+        hourWeight: 100,
+        reason: 'Working hours check error - using full capacity'
+      };
+    }
+  }
+
   /**
    * Lazy initialization of stealth systems.
    * Called BEFORE claiming first task (not at worker startup).
@@ -1030,6 +1081,25 @@ export class TaskWorker {
       return; // Return to main loop, will re-check on next iteration
     }
 
+    // =================================================================
+    // WORKING HOURS GATE - Simulate natural traffic patterns
+    // Workers scale their concurrent task limit based on the current
+    // hour's weight in their timezone. This creates natural throughput:
+    //   3 AM (5%): 1 slot → worker runs 1 task at a time
+    //   6 PM (100%): 3 slots → worker runs full capacity
+    // =================================================================
+    const { effectiveMax, reason } = await this.getEffectiveMaxTasks();
+    if (this.activeTasks.size >= effectiveMax) {
+      // Already at working hours capacity - wait before checking again
+      const sleepMs = 10000 + Math.random() * 20000; // 10-30 seconds
+      if (this.activeTasks.size > 0) {
+        // Only log if we're actually throttled (have tasks but can't take more)
+        console.log(`[TaskWorker] ${this.friendlyName} AT CAPACITY - ${reason} (${this.activeTasks.size}/${effectiveMax} active)`);
+      }
+      await this.sleep(sleepMs);
+      return; // Return to main loop, will re-check on next iteration
+    }
+
     // Pass preflight capabilities to only claim compatible tasks
     const task = await taskService.claimTask(
       this.role,