feat: Add working hours for natural traffic patterns

Workers check their timezone (from preflight IP geolocation) and current
hour's weight probability to determine availability. This creates natural
traffic patterns - more workers active during peak hours, fewer during
off-peak. Tasks queue up at night and drain during the day.

Migrations:
- 099: working_hours table with hourly weights by profile
- 100: Add timezone column to worker_registry
- 101: Store timezone from preflight IP geolocation
- 102: check_working_hours() function with probability roll

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 03:24:42 -07:00
parent 268429b86c
commit ec6843dfd6
5 changed files with 349 additions and 0 deletions

View File

@@ -0,0 +1,68 @@
-- Migration: 099_working_hours.sql
-- Description: Working hours profiles for natural traffic pattern simulation
-- Created: 2024-12-13
-- Working hours table: defines hourly activity weights to mimic natural traffic
CREATE TABLE IF NOT EXISTS working_hours (
id SERIAL PRIMARY KEY,
name VARCHAR(50) UNIQUE NOT NULL,
description TEXT,
-- Hour weights: {"0": 15, "1": 5, ..., "18": 100, ...}
-- Value = percent chance to trigger activity that hour (0-100)
hour_weights JSONB NOT NULL,
-- Day-of-week multipliers (0=Sunday, 6=Saturday)
-- Optional adjustment for weekend vs weekday patterns
dow_weights JSONB DEFAULT '{"0": 90, "1": 100, "2": 100, "3": 100, "4": 100, "5": 110, "6": 95}',
timezone VARCHAR(50) DEFAULT 'America/Phoenix',
enabled BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Seed: Natural traffic pattern based on internet usage research
-- Optimized for cannabis dispensary browsing (lunch + after-work peaks)
INSERT INTO working_hours (name, description, timezone, hour_weights) VALUES (
'natural_traffic',
'Mimics natural user browsing patterns - peaks at lunch and 5-7 PM',
'America/Phoenix',
'{
"0": 15,
"1": 5,
"2": 5,
"3": 5,
"4": 5,
"5": 10,
"6": 20,
"7": 30,
"8": 35,
"9": 45,
"10": 50,
"11": 60,
"12": 75,
"13": 65,
"14": 60,
"15": 70,
"16": 80,
"17": 95,
"18": 100,
"19": 100,
"20": 90,
"21": 70,
"22": 45,
"23": 25
}'::jsonb
) ON CONFLICT (name) DO UPDATE SET
hour_weights = EXCLUDED.hour_weights,
description = EXCLUDED.description,
updated_at = NOW();
-- Index for quick lookups
CREATE INDEX IF NOT EXISTS idx_working_hours_name ON working_hours(name);
CREATE INDEX IF NOT EXISTS idx_working_hours_enabled ON working_hours(enabled);
COMMENT ON TABLE working_hours IS 'Activity profiles for natural traffic simulation. Hour weights are percent chance (0-100) to trigger activity.';
COMMENT ON COLUMN working_hours.hour_weights IS 'JSON object mapping hour (0-23) to percent chance (0-100). 100 = always run, 0 = never run.';
COMMENT ON COLUMN working_hours.dow_weights IS 'Optional day-of-week multipliers. 0=Sunday. Applied as (hour_weight * dow_weight / 100).';

View File

@@ -0,0 +1,19 @@
-- Migration: 100_worker_timezone.sql
-- Description: Add timezone column to worker_registry for working hours support
-- Created: 2024-12-13
-- Add timezone column to worker_registry
-- Populated from preflight IP geolocation (e.g., 'America/New_York')
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS timezone VARCHAR(50);
-- Add working_hours_id to link worker to a specific working hours profile
-- NULL means use default 'natural_traffic' profile
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS working_hours_id INTEGER REFERENCES working_hours(id);
-- Index for workers by timezone (useful for capacity planning)
CREATE INDEX IF NOT EXISTS idx_worker_registry_timezone ON worker_registry(timezone);
COMMENT ON COLUMN worker_registry.timezone IS 'IANA timezone from preflight IP geolocation (e.g., America/New_York)';
COMMENT ON COLUMN worker_registry.working_hours_id IS 'Reference to working_hours profile. NULL uses default natural_traffic.';

View File

@@ -0,0 +1,78 @@
-- Migration: 101_worker_preflight_timezone.sql
-- Description: Update update_worker_preflight to extract timezone from fingerprint
-- Created: 2024-12-13
CREATE OR REPLACE FUNCTION public.update_worker_preflight(
p_worker_id character varying,
p_transport character varying,
p_status character varying,
p_ip character varying DEFAULT NULL,
p_response_ms integer DEFAULT NULL,
p_error text DEFAULT NULL,
p_fingerprint jsonb DEFAULT NULL
)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_curl_status VARCHAR(20);
v_http_status VARCHAR(20);
v_overall_status VARCHAR(20);
v_timezone VARCHAR(50);
BEGIN
IF p_transport = 'curl' THEN
UPDATE worker_registry
SET
preflight_curl_status = p_status,
preflight_curl_at = NOW(),
preflight_curl_ms = p_response_ms,
preflight_curl_error = p_error,
curl_ip = p_ip,
updated_at = NOW()
WHERE worker_id = p_worker_id;
ELSIF p_transport = 'http' THEN
-- Extract timezone from fingerprint JSON if present
v_timezone := p_fingerprint->>'detectedTimezone';
UPDATE worker_registry
SET
preflight_http_status = p_status,
preflight_http_at = NOW(),
preflight_http_ms = p_response_ms,
preflight_http_error = p_error,
http_ip = p_ip,
fingerprint_data = COALESCE(p_fingerprint, fingerprint_data),
-- Save extracted timezone
timezone = COALESCE(v_timezone, timezone),
updated_at = NOW()
WHERE worker_id = p_worker_id;
END IF;
-- Update overall preflight status
SELECT preflight_curl_status, preflight_http_status
INTO v_curl_status, v_http_status
FROM worker_registry
WHERE worker_id = p_worker_id;
-- Compute overall status
IF v_curl_status = 'passed' AND v_http_status = 'passed' THEN
v_overall_status := 'passed';
ELSIF v_curl_status = 'passed' OR v_http_status = 'passed' THEN
v_overall_status := 'partial';
ELSIF v_curl_status = 'failed' OR v_http_status = 'failed' THEN
v_overall_status := 'failed';
ELSE
v_overall_status := 'pending';
END IF;
UPDATE worker_registry
SET
preflight_status = v_overall_status,
preflight_at = NOW()
WHERE worker_id = p_worker_id;
END;
$function$;
COMMENT ON FUNCTION update_worker_preflight(varchar, varchar, varchar, varchar, integer, text, jsonb)
IS 'Updates worker preflight status and extracts timezone from fingerprint for working hours';

View File

@@ -0,0 +1,114 @@
-- Migration: 102_check_working_hours.sql
-- Description: Function to check if worker should be available based on working hours
-- Created: 2024-12-13
-- Function to check if a worker should be available for work
-- Returns TRUE if worker passes the probability check for current hour
-- Returns FALSE if worker should sleep/skip this cycle
CREATE OR REPLACE FUNCTION check_working_hours(
p_worker_id VARCHAR,
p_profile_name VARCHAR DEFAULT 'natural_traffic'
)
RETURNS TABLE (
is_available BOOLEAN,
current_hour INTEGER,
hour_weight INTEGER,
worker_timezone VARCHAR,
roll INTEGER,
reason TEXT
)
LANGUAGE plpgsql
AS $function$
DECLARE
v_timezone VARCHAR(50);
v_hour INTEGER;
v_weight INTEGER;
v_dow INTEGER;
v_dow_weight INTEGER;
v_final_weight INTEGER;
v_roll INTEGER;
v_hour_weights JSONB;
v_dow_weights JSONB;
v_profile_enabled BOOLEAN;
BEGIN
-- Get worker's timezone (from preflight)
SELECT wr.timezone INTO v_timezone
FROM worker_registry wr
WHERE wr.worker_id = p_worker_id;
-- Default to America/Phoenix if no timezone set
v_timezone := COALESCE(v_timezone, 'America/Phoenix');
-- Get current hour in worker's timezone
v_hour := EXTRACT(HOUR FROM NOW() AT TIME ZONE v_timezone)::INTEGER;
-- Get day of week (0=Sunday)
v_dow := EXTRACT(DOW FROM NOW() AT TIME ZONE v_timezone)::INTEGER;
-- Get working hours profile
SELECT wh.hour_weights, wh.dow_weights, wh.enabled
INTO v_hour_weights, v_dow_weights, v_profile_enabled
FROM working_hours wh
WHERE wh.name = p_profile_name AND wh.enabled = true;
-- If profile not found or disabled, always available
IF v_hour_weights IS NULL THEN
RETURN QUERY SELECT
TRUE::BOOLEAN,
v_hour,
100::INTEGER,
v_timezone,
0::INTEGER,
'Profile not found or disabled - defaulting to available'::TEXT;
RETURN;
END IF;
-- Get hour weight (default to 50 if hour not specified)
v_weight := COALESCE((v_hour_weights->>v_hour::TEXT)::INTEGER, 50);
-- Get day-of-week weight (default to 100)
v_dow_weight := COALESCE((v_dow_weights->>v_dow::TEXT)::INTEGER, 100);
-- Calculate final weight (hour_weight * dow_weight / 100)
v_final_weight := (v_weight * v_dow_weight / 100);
-- Roll the dice (0-99)
v_roll := floor(random() * 100)::INTEGER;
-- Return result
RETURN QUERY SELECT
(v_roll < v_final_weight)::BOOLEAN AS is_available,
v_hour AS current_hour,
v_final_weight AS hour_weight,
v_timezone AS worker_timezone,
v_roll AS roll,
CASE
WHEN v_roll < v_final_weight THEN
format('Available: rolled %s < %s%% threshold', v_roll, v_final_weight)
ELSE
format('Sleeping: rolled %s >= %s%% threshold', v_roll, v_final_weight)
END AS reason;
END;
$function$;
-- Simplified version that just returns boolean
CREATE OR REPLACE FUNCTION is_worker_available(
p_worker_id VARCHAR,
p_profile_name VARCHAR DEFAULT 'natural_traffic'
)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $function$
DECLARE
v_result BOOLEAN;
BEGIN
SELECT is_available INTO v_result
FROM check_working_hours(p_worker_id, p_profile_name);
RETURN COALESCE(v_result, TRUE);
END;
$function$;
COMMENT ON FUNCTION check_working_hours(VARCHAR, VARCHAR) IS
'Check if worker should be available based on working hours profile. Returns detailed info.';
COMMENT ON FUNCTION is_worker_available(VARCHAR, VARCHAR) IS
'Simple boolean check if worker passes working hours probability roll.';

View File

@@ -696,6 +696,57 @@ export class TaskWorker {
this.isRetryingPreflight = false;
}
/**
* Get the effective max concurrent tasks based on working hours.
* Uses the worker's timezone (from preflight IP geolocation) to determine
* the current hour's weight, then scales max concurrent tasks accordingly.
*
* This creates natural traffic patterns - workers run fewer concurrent
* tasks during off-peak hours, full capacity during peak hours.
*
* Example with MAX_CONCURRENT_TASKS = 3:
* 3 AM (5% weight): effectiveMax = max(1, round(3 × 0.05)) = 1
* 12 PM (75% weight): effectiveMax = max(1, round(3 × 0.75)) = 2
* 6 PM (100% weight): effectiveMax = max(1, round(3 × 1.00)) = 3
*/
private async getEffectiveMaxTasks(): Promise<{ effectiveMax: number; hourWeight: number; reason: string }> {
try {
const result = await this.pool.query(`
SELECT current_hour, hour_weight, worker_timezone
FROM check_working_hours($1, 'natural_traffic')
`, [this.workerId]);
if (result.rows.length === 0) {
// Function returned nothing - default to full capacity
return {
effectiveMax: this.maxConcurrentTasks,
hourWeight: 100,
reason: 'Working hours check returned no result - using full capacity'
};
}
const row = result.rows[0];
const hourWeight = row.hour_weight || 100;
// Scale max concurrent tasks by hour weight, minimum 1
const effectiveMax = Math.max(1, Math.round(this.maxConcurrentTasks * hourWeight / 100));
return {
effectiveMax,
hourWeight,
reason: `Hour ${row.current_hour} (${row.worker_timezone}): ${hourWeight}% → ${effectiveMax}/${this.maxConcurrentTasks} slots`
};
} catch (err: any) {
// On error, default to full capacity (don't block workers due to DB issues)
console.warn(`[TaskWorker] Working hours check failed: ${err.message} - using full capacity`);
return {
effectiveMax: this.maxConcurrentTasks,
hourWeight: 100,
reason: 'Working hours check error - using full capacity'
};
}
}
/**
* Lazy initialization of stealth systems.
* Called BEFORE claiming first task (not at worker startup).
@@ -1030,6 +1081,25 @@ export class TaskWorker {
return; // Return to main loop, will re-check on next iteration
}
// =================================================================
// WORKING HOURS GATE - Simulate natural traffic patterns
// Workers scale their concurrent task limit based on the current
// hour's weight in their timezone. This creates natural throughput:
// 3 AM (5%): 1 slot → worker runs 1 task at a time
// 6 PM (100%): 3 slots → worker runs full capacity
// =================================================================
const { effectiveMax, hourWeight, reason } = await this.getEffectiveMaxTasks();
if (this.activeTasks.size >= effectiveMax) {
// Already at working hours capacity - wait before checking again
const sleepMs = 10000 + Math.random() * 20000; // 10-30 seconds
if (this.activeTasks.size > 0) {
// Only log if we're actually throttled (have tasks but can't take more)
console.log(`[TaskWorker] ${this.friendlyName} AT CAPACITY - ${reason} (${this.activeTasks.size}/${effectiveMax} active)`);
}
await this.sleep(sleepMs);
return; // Return to main loop, will re-check on next iteration
}
// Pass preflight capabilities to only claim compatible tasks
const task = await taskService.claimTask(
this.role,