feat: Stealth worker system with mandatory proxy rotation

## Worker System
- Role-agnostic workers that can handle any task type
- Pod-based architecture with StatefulSet (5-15 pods, 5 workers each)
- Custom pod names (Aethelgard, Xylos, Kryll, etc.)
- Worker registry with friendly names and resource monitoring
- Hub-and-spoke visualization on JobQueue page

## Stealth & Anti-Detection (REQUIRED)
- Proxies are MANDATORY - workers fail to start without active proxies
- CrawlRotator initializes on worker startup
- Loads proxies from `proxies` table
- Auto-rotates proxy + fingerprint on 403 errors
- 12 browser fingerprints (Chrome, Firefox, Safari, Edge)
- Locale/timezone matching for geographic consistency

## Task System
- Renamed product_resync → product_refresh
- Task chaining: store_discovery → entry_point → product_discovery
- Priority-based claiming with FOR UPDATE SKIP LOCKED
- Heartbeat and stale task recovery

## UI Updates
- JobQueue: Pod visualization, resource monitoring on hover
- WorkersDashboard: Simplified worker list
- Removed unused filters from task list

## Other
- IP2Location service for visitor analytics
- Findagram consumer features scaffolding
- Documentation updates

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-10 00:44:59 -07:00
parent 0295637ed6
commit 56cc171287
61 changed files with 8591 additions and 2076 deletions

3
backend/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
# IP2Location database (downloaded separately)
data/ip2location/

View File

@@ -275,6 +275,22 @@ Store metadata:
---
## Worker Roles
Workers pull tasks from the `worker_tasks` queue based on their assigned role.
| Role | Name | Description | Handler |
|------|------|-------------|---------|
| `product_resync` | Product Resync | Re-crawl dispensary products for price/stock changes | `handleProductResync` |
| `product_discovery` | Product Discovery | Initial product discovery for new dispensaries | `handleProductDiscovery` |
| `store_discovery` | Store Discovery | Discover new dispensary locations | `handleStoreDiscovery` |
| `entry_point_discovery` | Entry Point Discovery | Resolve platform IDs from menu URLs | `handleEntryPointDiscovery` |
| `analytics_refresh` | Analytics Refresh | Refresh materialized views and analytics | `handleAnalyticsRefresh` |
**API Endpoint:** `GET /api/worker-registry/roles`
---
## Scheduling
Crawls are scheduled via `worker_tasks` table:
@@ -282,8 +298,219 @@ Crawls are scheduled via `worker_tasks` table:
| Role | Frequency | Description |
|------|-----------|-------------|
| `product_resync` | Every 4 hours | Regular product refresh |
| `product_discovery` | On-demand | First crawl for new stores |
| `entry_point_discovery` | On-demand | New store setup |
| `store_discovery` | Daily | Find new stores |
| `analytics_refresh` | Daily | Refresh analytics materialized views |
---
## Priority & On-Demand Tasks
Tasks are claimed by workers in order of **priority DESC, created_at ASC**.
### Priority Levels
| Priority | Use Case | Example |
|----------|----------|---------|
| 0 | Scheduled/batch tasks | Daily product_resync generation |
| 10 | On-demand/chained tasks | entry_point → product_discovery |
| Higher | Urgent/manual triggers | Admin-triggered immediate crawl |
### Task Chaining
When a task completes, the system automatically creates follow-up tasks:
```
store_discovery (completed)
└─► entry_point_discovery (priority: 10) for each new store
entry_point_discovery (completed, success)
└─► product_discovery (priority: 10) for that store
product_discovery (completed)
└─► [no chain] Store enters regular resync schedule
```
### On-Demand Task Creation
Use the task service to create high-priority tasks:
```typescript
// Create immediate product resync for a store
await taskService.createTask({
role: 'product_resync',
dispensary_id: 123,
platform: 'dutchie',
priority: 20, // Higher than batch tasks
});
// Convenience methods with default high priority (10)
await taskService.createEntryPointTask(dispensaryId, 'dutchie');
await taskService.createProductDiscoveryTask(dispensaryId, 'dutchie');
await taskService.createStoreDiscoveryTask('dutchie', 'AZ');
```
### Claim Function
The `claim_task()` SQL function atomically claims tasks:
- Respects priority ordering (higher = first)
- Uses `FOR UPDATE SKIP LOCKED` for concurrency
- Prevents multiple active tasks per store
---
## Image Storage
Images are downloaded from Dutchie's AWS S3 and stored locally with on-demand resizing.
### Storage Path
```
/storage/images/products/<state>/<store>/<brand>/<product_id>/image-<hash>.webp
/storage/images/brands/<brand>/logo-<hash>.webp
```
**Example:**
```
/storage/images/products/az/az-deeply-rooted/bud-bros/6913e3cd444eac3935e928b9/image-ae38b1f9.webp
```
### Image Proxy API
Served via `/img/*` with on-demand resizing using **sharp**:
```
GET /img/products/az/az-deeply-rooted/bud-bros/6913e3cd444eac3935e928b9/image-ae38b1f9.webp?w=200
```
| Param | Description |
|-------|-------------|
| `w` | Width in pixels (max 4000) |
| `h` | Height in pixels (max 4000) |
| `q` | Quality 1-100 (default 80) |
| `fit` | cover, contain, fill, inside, outside |
| `blur` | Blur sigma (0.3-1000) |
| `gray` | Grayscale (1 = enabled) |
| `format` | webp, jpeg, png, avif (default webp) |
### Key Files
| File | Purpose |
|------|---------|
| `src/utils/image-storage.ts` | Download & save images to local filesystem |
| `src/routes/image-proxy.ts` | On-demand resize/transform at `/img/*` |
### Download Rules
| Scenario | Image Action |
|----------|--------------|
| **New product (first crawl)** | Download if `primaryImageUrl` exists |
| **Existing product (refresh)** | Download only if `local_image_path` is NULL (backfill) |
| **Product already has local image** | Skip download entirely |
**Logic:**
- Images are downloaded **once** and never re-downloaded on subsequent crawls
- `skipIfExists: true` - filesystem check prevents re-download even if queued
- First crawl: all products get images
- Refresh crawl: only new products or products missing local images
### Storage Rules
- **NO MinIO** - local filesystem only (`STORAGE_DRIVER=local`)
- Store full resolution, resize on-demand via `/img` proxy
- Convert to webp for consistency using **sharp**
- Preserve original Dutchie URL as fallback in `image_url` column
- Local path stored in `local_image_path` column
---
## Stealth & Anti-Detection
**PROXIES ARE REQUIRED** - Workers will fail to start if no active proxies are available in the database. All HTTP requests to Dutchie go through a proxy.
Workers automatically initialize anti-detection systems on startup.
### Components
| Component | Purpose | Source |
|-----------|---------|--------|
| **CrawlRotator** | Coordinates proxy + UA rotation | `src/services/crawl-rotator.ts` |
| **ProxyRotator** | Round-robin proxy selection, health tracking | `src/services/crawl-rotator.ts` |
| **UserAgentRotator** | Cycles through realistic browser fingerprints | `src/services/crawl-rotator.ts` |
| **Dutchie Client** | Curl-based HTTP with auto-retry on 403 | `src/platforms/dutchie/client.ts` |
### Initialization Flow
```
Worker Start
├─► initializeStealth()
│ │
│ ├─► CrawlRotator.initialize()
│ │ └─► Load proxies from `proxies` table
│ │
│ └─► setCrawlRotator(rotator)
│ └─► Wire to Dutchie client
└─► Process tasks...
```
### Stealth Session (per task)
Each crawl task starts a stealth session:
```typescript
// In product-refresh.ts, entry-point-discovery.ts
const session = startSession(dispensary.state || 'AZ', 'America/Phoenix');
```
This creates a new identity with:
- **Random fingerprint:** Chrome/Firefox/Safari/Edge on Win/Mac/Linux
- **Accept-Language:** Matches timezone (e.g., `America/Phoenix``en-US,en;q=0.9`)
- **sec-ch-ua headers:** Proper Client Hints for the browser profile
### On 403 Block
When Dutchie returns 403, the client automatically:
1. Records failure on current proxy (increments `failure_count`)
2. If proxy has 5+ failures, deactivates it
3. Rotates to next healthy proxy
4. Rotates fingerprint
5. Retries the request
### Proxy Table Schema
```sql
CREATE TABLE proxies (
id SERIAL PRIMARY KEY,
host VARCHAR(255) NOT NULL,
port INTEGER NOT NULL,
username VARCHAR(100),
password VARCHAR(100),
protocol VARCHAR(10) DEFAULT 'http', -- http, https, socks5
is_active BOOLEAN DEFAULT true,
last_used_at TIMESTAMPTZ,
failure_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
avg_response_time_ms INTEGER,
last_failure_at TIMESTAMPTZ,
last_error TEXT
);
```
### Configuration
Proxies are mandatory. There is no environment variable to disable them. Workers will refuse to start without active proxies in the database.
### Fingerprints Available
The client includes 6 browser fingerprints:
- Chrome 131 on Windows
- Chrome 131 on macOS
- Chrome 120 on Windows
- Firefox 133 on Windows
- Safari 17.2 on macOS
- Edge 131 on Windows
Each includes proper `sec-ch-ua`, `sec-ch-ua-platform`, and `sec-ch-ua-mobile` headers.
---
@@ -293,6 +520,7 @@ Crawls are scheduled via `worker_tasks` table:
- **Normalization errors:** Logged as warnings, continue with valid products
- **Image download errors:** Non-fatal, logged, continue
- **Database errors:** Task fails, will be retried
- **403 blocks:** Auto-rotate proxy + fingerprint, retry (up to 3 retries)
---
@@ -305,4 +533,6 @@ Crawls are scheduled via `worker_tasks` table:
| `src/platforms/dutchie/index.ts` | GraphQL client, session management |
| `src/hydration/normalizers/dutchie.ts` | Payload normalization |
| `src/hydration/canonical-upsert.ts` | Database upsert logic |
| `src/utils/image-storage.ts` | Image download and local storage |
| `src/routes/image-proxy.ts` | On-demand image resizing |
| `migrations/075_consecutive_misses.sql` | OOS tracking column |

View File

@@ -0,0 +1,69 @@
apiVersion: batch/v1
kind: CronJob
metadata:
name: ip2location-update
namespace: default
spec:
# Run on the 1st of every month at 3am UTC
schedule: "0 3 1 * *"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
jobTemplate:
spec:
template:
spec:
containers:
- name: ip2location-updater
image: curlimages/curl:latest
command:
- /bin/sh
- -c
- |
set -e
echo "Downloading IP2Location LITE DB5..."
# Download to temp
cd /tmp
curl -L -o ip2location.zip "https://www.ip2location.com/download/?token=${IP2LOCATION_TOKEN}&file=DB5LITEBIN"
# Extract
unzip -o ip2location.zip
# Find and copy the BIN file
BIN_FILE=$(ls *.BIN 2>/dev/null | head -1)
if [ -z "$BIN_FILE" ]; then
echo "ERROR: No BIN file found"
exit 1
fi
# Copy to shared volume
cp "$BIN_FILE" /data/IP2LOCATION-LITE-DB5.BIN
echo "Done! Database updated: /data/IP2LOCATION-LITE-DB5.BIN"
env:
- name: IP2LOCATION_TOKEN
valueFrom:
secretKeyRef:
name: dutchie-backend-secret
key: IP2LOCATION_TOKEN
volumeMounts:
- name: ip2location-data
mountPath: /data
restartPolicy: OnFailure
volumes:
- name: ip2location-data
persistentVolumeClaim:
claimName: ip2location-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: ip2location-pvc
namespace: default
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Mi

View File

@@ -26,6 +26,12 @@ spec:
name: dutchie-backend-config
- secretRef:
name: dutchie-backend-secret
env:
- name: IP2LOCATION_DB_PATH
value: /data/ip2location/IP2LOCATION-LITE-DB5.BIN
volumeMounts:
- name: ip2location-data
mountPath: /data/ip2location
resources:
requests:
memory: "256Mi"
@@ -45,3 +51,7 @@ spec:
port: 3010
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: ip2location-data
persistentVolumeClaim:
claimName: ip2location-pvc

View File

@@ -0,0 +1,71 @@
-- Visitor location analytics for Findagram
-- Tracks visitor locations to understand popular areas
CREATE TABLE IF NOT EXISTS visitor_locations (
id SERIAL PRIMARY KEY,
-- Location data (from IP lookup)
ip_hash VARCHAR(64), -- Hashed IP for privacy (SHA256)
city VARCHAR(100),
state VARCHAR(100),
state_code VARCHAR(10),
country VARCHAR(100),
country_code VARCHAR(10),
latitude DECIMAL(10, 7),
longitude DECIMAL(10, 7),
-- Visit metadata
domain VARCHAR(50) NOT NULL, -- 'findagram.co', 'findadispo.com', etc.
page_path VARCHAR(255), -- '/products', '/dispensaries/123', etc.
referrer VARCHAR(500),
user_agent VARCHAR(500),
-- Session tracking
session_id VARCHAR(64), -- For grouping page views in a session
-- Timestamps
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for analytics queries
CREATE INDEX IF NOT EXISTS idx_visitor_locations_domain ON visitor_locations(domain);
CREATE INDEX IF NOT EXISTS idx_visitor_locations_city_state ON visitor_locations(city, state_code);
CREATE INDEX IF NOT EXISTS idx_visitor_locations_created_at ON visitor_locations(created_at);
CREATE INDEX IF NOT EXISTS idx_visitor_locations_session ON visitor_locations(session_id);
-- Aggregated daily stats (materialized for performance)
CREATE TABLE IF NOT EXISTS visitor_location_stats (
id SERIAL PRIMARY KEY,
date DATE NOT NULL,
domain VARCHAR(50) NOT NULL,
city VARCHAR(100),
state VARCHAR(100),
state_code VARCHAR(10),
country_code VARCHAR(10),
-- Metrics
visit_count INTEGER DEFAULT 0,
unique_sessions INTEGER DEFAULT 0,
UNIQUE(date, domain, city, state_code, country_code)
);
CREATE INDEX IF NOT EXISTS idx_visitor_stats_date ON visitor_location_stats(date);
CREATE INDEX IF NOT EXISTS idx_visitor_stats_domain ON visitor_location_stats(domain);
CREATE INDEX IF NOT EXISTS idx_visitor_stats_state ON visitor_location_stats(state_code);
-- View for easy querying of top locations
CREATE OR REPLACE VIEW v_top_visitor_locations AS
SELECT
domain,
city,
state,
state_code,
country_code,
COUNT(*) as total_visits,
COUNT(DISTINCT session_id) as unique_sessions,
MAX(created_at) as last_visit
FROM visitor_locations
WHERE created_at > NOW() - INTERVAL '30 days'
GROUP BY domain, city, state, state_code, country_code
ORDER BY total_visits DESC;

View File

@@ -0,0 +1,141 @@
-- Migration 076: Worker Registry for Dynamic Workers
-- Workers register on startup, receive a friendly name, and report heartbeats
-- Name pool for workers (expandable, no hardcoding)
CREATE TABLE IF NOT EXISTS worker_name_pool (
id SERIAL PRIMARY KEY,
name VARCHAR(50) UNIQUE NOT NULL,
in_use BOOLEAN DEFAULT FALSE,
assigned_to VARCHAR(100), -- worker_id
assigned_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Seed with initial names (can add more via API)
INSERT INTO worker_name_pool (name) VALUES
('Alice'), ('Bella'), ('Clara'), ('Diana'), ('Elena'),
('Fiona'), ('Grace'), ('Hazel'), ('Iris'), ('Julia'),
('Katie'), ('Luna'), ('Mia'), ('Nora'), ('Olive'),
('Pearl'), ('Quinn'), ('Rosa'), ('Sara'), ('Tara'),
('Uma'), ('Vera'), ('Wendy'), ('Xena'), ('Yuki'), ('Zara'),
('Amber'), ('Blake'), ('Coral'), ('Dawn'), ('Echo'),
('Fleur'), ('Gem'), ('Haven'), ('Ivy'), ('Jade'),
('Kira'), ('Lotus'), ('Maple'), ('Nova'), ('Onyx'),
('Pixel'), ('Quest'), ('Raven'), ('Sage'), ('Terra'),
('Unity'), ('Violet'), ('Willow'), ('Xylo'), ('Yara'), ('Zen')
ON CONFLICT (name) DO NOTHING;
-- Worker registry - tracks active workers
CREATE TABLE IF NOT EXISTS worker_registry (
id SERIAL PRIMARY KEY,
worker_id VARCHAR(100) UNIQUE NOT NULL, -- e.g., "pod-abc123" or uuid
friendly_name VARCHAR(50), -- assigned from pool
role VARCHAR(50) NOT NULL, -- task role
pod_name VARCHAR(100), -- k8s pod name
hostname VARCHAR(100), -- machine hostname
ip_address VARCHAR(50), -- worker IP
status VARCHAR(20) DEFAULT 'starting', -- starting, active, idle, offline, terminated
started_at TIMESTAMPTZ DEFAULT NOW(),
last_heartbeat_at TIMESTAMPTZ DEFAULT NOW(),
last_task_at TIMESTAMPTZ,
tasks_completed INTEGER DEFAULT 0,
tasks_failed INTEGER DEFAULT 0,
current_task_id INTEGER,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for worker registry
CREATE INDEX IF NOT EXISTS idx_worker_registry_status ON worker_registry(status);
CREATE INDEX IF NOT EXISTS idx_worker_registry_role ON worker_registry(role);
CREATE INDEX IF NOT EXISTS idx_worker_registry_heartbeat ON worker_registry(last_heartbeat_at);
-- Function to assign a name to a new worker
CREATE OR REPLACE FUNCTION assign_worker_name(p_worker_id VARCHAR(100))
RETURNS VARCHAR(50) AS $$
DECLARE
v_name VARCHAR(50);
BEGIN
-- Try to get an unused name
UPDATE worker_name_pool
SET in_use = TRUE, assigned_to = p_worker_id, assigned_at = NOW()
WHERE id = (
SELECT id FROM worker_name_pool
WHERE in_use = FALSE
ORDER BY RANDOM()
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING name INTO v_name;
-- If no names available, generate one
IF v_name IS NULL THEN
v_name := 'Worker-' || SUBSTRING(p_worker_id FROM 1 FOR 8);
END IF;
RETURN v_name;
END;
$$ LANGUAGE plpgsql;
-- Function to release a worker's name back to the pool
CREATE OR REPLACE FUNCTION release_worker_name(p_worker_id VARCHAR(100))
RETURNS VOID AS $$
BEGIN
UPDATE worker_name_pool
SET in_use = FALSE, assigned_to = NULL, assigned_at = NULL
WHERE assigned_to = p_worker_id;
END;
$$ LANGUAGE plpgsql;
-- Function to mark stale workers as offline
CREATE OR REPLACE FUNCTION mark_stale_workers(stale_threshold_minutes INTEGER DEFAULT 5)
RETURNS INTEGER AS $$
DECLARE
v_count INTEGER;
BEGIN
UPDATE worker_registry
SET status = 'offline', updated_at = NOW()
WHERE status IN ('active', 'idle', 'starting')
AND last_heartbeat_at < NOW() - (stale_threshold_minutes || ' minutes')::INTERVAL
RETURNING COUNT(*) INTO v_count;
-- Release names from offline workers
PERFORM release_worker_name(worker_id)
FROM worker_registry
WHERE status = 'offline'
AND last_heartbeat_at < NOW() - INTERVAL '30 minutes';
RETURN COALESCE(v_count, 0);
END;
$$ LANGUAGE plpgsql;
-- View for dashboard
CREATE OR REPLACE VIEW v_active_workers AS
SELECT
wr.id,
wr.worker_id,
wr.friendly_name,
wr.role,
wr.status,
wr.pod_name,
wr.hostname,
wr.started_at,
wr.last_heartbeat_at,
wr.last_task_at,
wr.tasks_completed,
wr.tasks_failed,
wr.current_task_id,
EXTRACT(EPOCH FROM (NOW() - wr.last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN wr.status = 'offline' THEN 'offline'
WHEN wr.last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN wr.current_task_id IS NOT NULL THEN 'busy'
ELSE 'ready'
END as health_status
FROM worker_registry wr
WHERE wr.status != 'terminated'
ORDER BY wr.status = 'active' DESC, wr.last_heartbeat_at DESC;
COMMENT ON TABLE worker_registry IS 'Tracks all workers that have registered with the system';
COMMENT ON TABLE worker_name_pool IS 'Pool of friendly names for workers - expandable via API';

View File

@@ -0,0 +1,35 @@
-- Migration: Add visitor location and dispensary name to click events
-- Captures where visitors are clicking from and which dispensary
-- Add visitor location columns
ALTER TABLE product_click_events
ADD COLUMN IF NOT EXISTS visitor_city VARCHAR(100);
ALTER TABLE product_click_events
ADD COLUMN IF NOT EXISTS visitor_state VARCHAR(10);
ALTER TABLE product_click_events
ADD COLUMN IF NOT EXISTS visitor_lat DECIMAL(10, 7);
ALTER TABLE product_click_events
ADD COLUMN IF NOT EXISTS visitor_lng DECIMAL(10, 7);
-- Add dispensary name for easier reporting
ALTER TABLE product_click_events
ADD COLUMN IF NOT EXISTS dispensary_name VARCHAR(255);
-- Create index for location-based analytics
CREATE INDEX IF NOT EXISTS idx_product_click_events_visitor_state
ON product_click_events(visitor_state)
WHERE visitor_state IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_product_click_events_visitor_city
ON product_click_events(visitor_city)
WHERE visitor_city IS NOT NULL;
-- Add comments
COMMENT ON COLUMN product_click_events.visitor_city IS 'City where the visitor is located (from IP geolocation)';
COMMENT ON COLUMN product_click_events.visitor_state IS 'State where the visitor is located (from IP geolocation)';
COMMENT ON COLUMN product_click_events.visitor_lat IS 'Visitor latitude (from IP geolocation)';
COMMENT ON COLUMN product_click_events.visitor_lng IS 'Visitor longitude (from IP geolocation)';
COMMENT ON COLUMN product_click_events.dispensary_name IS 'Name of the dispensary (denormalized for easier reporting)';

View File

@@ -1026,6 +1026,17 @@
"url": "https://github.com/sponsors/fb55"
}
},
"node_modules/csv-parser": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/csv-parser/-/csv-parser-3.2.0.tgz",
"integrity": "sha512-fgKbp+AJbn1h2dcAHKIdKNSSjfp43BZZykXsCjzALjKy80VXQNHPFJ6T9Afwdzoj24aMkq8GwDS7KGcDPpejrA==",
"bin": {
"csv-parser": "bin/csv-parser"
},
"engines": {
"node": ">= 10"
}
},
"node_modules/data-uri-to-buffer": {
"version": "6.0.2",
"resolved": "https://registry.npmjs.org/data-uri-to-buffer/-/data-uri-to-buffer-6.0.2.tgz",
@@ -2235,6 +2246,14 @@
"node": ">= 12"
}
},
"node_modules/ip2location-nodejs": {
"version": "9.7.0",
"resolved": "https://registry.npmjs.org/ip2location-nodejs/-/ip2location-nodejs-9.7.0.tgz",
"integrity": "sha512-eQ4T5TXm1cx0+pQcRycPiuaiRuoDEMd9O89Be7Ugk555qi9UY9enXSznkkqr3kQRyUaXx7zj5dORC5LGTPOttA==",
"dependencies": {
"csv-parser": "^3.0.0"
}
},
"node_modules/ipaddr.js": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz",

View File

@@ -21,6 +21,7 @@
"helmet": "^7.1.0",
"https-proxy-agent": "^7.0.2",
"ioredis": "^5.8.2",
"ip2location-nodejs": "^9.7.0",
"ipaddr.js": "^2.2.0",
"jsonwebtoken": "^9.0.2",
"minio": "^7.1.3",
@@ -1531,6 +1532,17 @@
"url": "https://github.com/sponsors/fb55"
}
},
"node_modules/csv-parser": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/csv-parser/-/csv-parser-3.2.0.tgz",
"integrity": "sha512-fgKbp+AJbn1h2dcAHKIdKNSSjfp43BZZykXsCjzALjKy80VXQNHPFJ6T9Afwdzoj24aMkq8GwDS7KGcDPpejrA==",
"bin": {
"csv-parser": "bin/csv-parser"
},
"engines": {
"node": ">= 10"
}
},
"node_modules/data-uri-to-buffer": {
"version": "6.0.2",
"resolved": "https://registry.npmjs.org/data-uri-to-buffer/-/data-uri-to-buffer-6.0.2.tgz",
@@ -2754,6 +2766,14 @@
"node": ">= 12"
}
},
"node_modules/ip2location-nodejs": {
"version": "9.7.0",
"resolved": "https://registry.npmjs.org/ip2location-nodejs/-/ip2location-nodejs-9.7.0.tgz",
"integrity": "sha512-eQ4T5TXm1cx0+pQcRycPiuaiRuoDEMd9O89Be7Ugk555qi9UY9enXSznkkqr3kQRyUaXx7zj5dORC5LGTPOttA==",
"dependencies": {
"csv-parser": "^3.0.0"
}
},
"node_modules/ipaddr.js": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz",

View File

@@ -35,6 +35,7 @@
"helmet": "^7.1.0",
"https-proxy-agent": "^7.0.2",
"ioredis": "^5.8.2",
"ip2location-nodejs": "^9.7.0",
"ipaddr.js": "^2.2.0",
"jsonwebtoken": "^9.0.2",
"minio": "^7.1.3",

View File

@@ -0,0 +1,65 @@
#!/bin/bash
# Download IP2Location LITE DB3 (City-level) database
# Free for commercial use with attribution
# https://lite.ip2location.com/database/db3-ip-country-region-city
set -e
DATA_DIR="${1:-./data/ip2location}"
DB_FILE="IP2LOCATION-LITE-DB3.BIN"
mkdir -p "$DATA_DIR"
cd "$DATA_DIR"
echo "Downloading IP2Location LITE DB3 database..."
# IP2Location LITE DB3 - includes city, region, country, lat/lng
# You need to register at https://lite.ip2location.com/ to get a download token
# Then set IP2LOCATION_TOKEN environment variable
if [ -z "$IP2LOCATION_TOKEN" ]; then
echo ""
echo "ERROR: IP2LOCATION_TOKEN not set"
echo ""
echo "To download the database:"
echo "1. Register free at https://lite.ip2location.com/"
echo "2. Get your download token from the dashboard"
echo "3. Run: IP2LOCATION_TOKEN=your_token ./scripts/download-ip2location.sh"
echo ""
exit 1
fi
# Download DB3.LITE (IPv4 + City)
DOWNLOAD_URL="https://www.ip2location.com/download/?token=${IP2LOCATION_TOKEN}&file=DB3LITEBIN"
echo "Downloading from IP2Location..."
curl -L -o ip2location.zip "$DOWNLOAD_URL"
echo "Extracting..."
unzip -o ip2location.zip
# Rename to standard name
if [ -f "IP2LOCATION-LITE-DB3.BIN" ]; then
echo "Database ready: $DATA_DIR/IP2LOCATION-LITE-DB3.BIN"
elif [ -f "IP-COUNTRY-REGION-CITY.BIN" ]; then
mv "IP-COUNTRY-REGION-CITY.BIN" "$DB_FILE"
echo "Database ready: $DATA_DIR/$DB_FILE"
else
# Find whatever BIN file was extracted
BIN_FILE=$(ls *.BIN 2>/dev/null | head -1)
if [ -n "$BIN_FILE" ]; then
mv "$BIN_FILE" "$DB_FILE"
echo "Database ready: $DATA_DIR/$DB_FILE"
else
echo "ERROR: No BIN file found in archive"
ls -la
exit 1
fi
fi
# Cleanup
rm -f ip2location.zip *.txt LICENSE* README*
echo ""
echo "Done! Database saved to: $DATA_DIR/$DB_FILE"
echo "Update monthly by re-running this script."

View File

@@ -191,6 +191,23 @@ export async function runFullDiscovery(
}
}
// Step 5: Detect dropped stores (in DB but not in discovery results)
if (!dryRun) {
console.log('\n[Discovery] Step 5: Detecting dropped stores...');
const droppedResult = await detectDroppedStores(pool, stateCode);
if (droppedResult.droppedCount > 0) {
console.log(`[Discovery] Found ${droppedResult.droppedCount} dropped stores:`);
droppedResult.droppedStores.slice(0, 10).forEach(s => {
console.log(` - ${s.name} (${s.city}, ${s.state}) - last seen: ${s.lastSeenAt}`);
});
if (droppedResult.droppedCount > 10) {
console.log(` ... and ${droppedResult.droppedCount - 10} more`);
}
} else {
console.log(`[Discovery] No dropped stores detected`);
}
}
return {
cities: cityResult,
locations: locationResults,
@@ -200,6 +217,107 @@ export async function runFullDiscovery(
};
}
// ============================================================
// DROPPED STORE DETECTION
// ============================================================
export interface DroppedStoreResult {
droppedCount: number;
droppedStores: Array<{
id: number;
name: string;
city: string;
state: string;
platformDispensaryId: string;
lastSeenAt: string;
}>;
}
/**
* Detect stores that exist in dispensaries but were not found in discovery.
* Marks them as status='dropped' for manual review.
*
* A store is considered "dropped" if:
* 1. It has a platform_dispensary_id (was verified via Dutchie)
* 2. It was NOT seen in the latest discovery crawl (last_seen_at in discovery < 24h ago)
* 3. It's currently marked as 'open' status
*/
export async function detectDroppedStores(
pool: Pool,
stateCode?: string
): Promise<DroppedStoreResult> {
// Find dispensaries that:
// 1. Have platform_dispensary_id (verified Dutchie stores)
// 2. Are currently 'open' status
// 3. Have a linked discovery record that wasn't seen in the last discovery run
// (last_seen_at in dutchie_discovery_locations is older than 24 hours)
const params: any[] = [];
let stateFilter = '';
if (stateCode) {
stateFilter = ` AND d.state = $1`;
params.push(stateCode);
}
const query = `
WITH recently_seen AS (
SELECT DISTINCT platform_location_id
FROM dutchie_discovery_locations
WHERE last_seen_at > NOW() - INTERVAL '24 hours'
AND active = true
)
SELECT
d.id,
d.name,
d.city,
d.state,
d.platform_dispensary_id,
d.updated_at as last_seen_at
FROM dispensaries d
WHERE d.platform_dispensary_id IS NOT NULL
AND d.platform = 'dutchie'
AND (d.status = 'open' OR d.status IS NULL)
AND d.crawl_enabled = true
AND d.platform_dispensary_id NOT IN (SELECT platform_location_id FROM recently_seen)
${stateFilter}
ORDER BY d.name
`;
const result = await pool.query(query, params);
const droppedStores = result.rows;
// Mark these stores as 'dropped' status
if (droppedStores.length > 0) {
const ids = droppedStores.map(s => s.id);
await pool.query(`
UPDATE dispensaries
SET status = 'dropped', updated_at = NOW()
WHERE id = ANY($1::int[])
`, [ids]);
// Log to promotion log for audit
for (const store of droppedStores) {
await pool.query(`
INSERT INTO dutchie_promotion_log
(dispensary_id, action, state_code, store_name, triggered_by)
VALUES ($1, 'dropped', $2, $3, 'discovery_detection')
`, [store.id, store.state, store.name]);
}
}
return {
droppedCount: droppedStores.length,
droppedStores: droppedStores.map(s => ({
id: s.id,
name: s.name,
city: s.city,
state: s.state,
platformDispensaryId: s.platform_dispensary_id,
lastSeenAt: s.last_seen_at,
})),
};
}
// ============================================================
// SINGLE CITY DISCOVERY
// ============================================================

View File

@@ -140,6 +140,7 @@ import clickAnalyticsRoutes from './routes/click-analytics';
import seoRoutes from './routes/seo';
import priceAnalyticsRoutes from './routes/price-analytics';
import tasksRoutes from './routes/tasks';
import workerRegistryRoutes from './routes/worker-registry';
// Mark requests from trusted domains (cannaiq.co, findagram.co, findadispo.com)
// These domains can access the API without authentication
@@ -216,6 +217,10 @@ console.log('[Workers] Routes registered at /api/workers, /api/monitor, and /api
app.use('/api/tasks', tasksRoutes);
console.log('[Tasks] Routes registered at /api/tasks');
// Worker registry - dynamic worker registration, heartbeats, and name management
app.use('/api/worker-registry', workerRegistryRoutes);
console.log('[WorkerRegistry] Routes registered at /api/worker-registry');
// Phase 3: Analytics V2 - Enhanced analytics with rec/med state segmentation
try {
const analyticsV2Router = createAnalyticsV2Router(getPool());

View File

@@ -5,33 +5,37 @@ import { pool } from '../db/pool';
const router = Router();
router.use(authMiddleware);
// Get categories (flat list)
// Get categories (flat list) - derived from actual product data
router.get('/', async (req, res) => {
try {
const { store_id } = req.query;
const { store_id, in_stock_only } = req.query;
let query = `
SELECT
c.*,
COUNT(DISTINCT p.id) as product_count,
pc.name as parent_name
FROM categories c
LEFT JOIN store_products p ON c.name = p.category_raw
LEFT JOIN categories pc ON c.parent_id = pc.id
category_raw as name,
category_raw as slug,
COUNT(*) as product_count,
COUNT(*) FILTER (WHERE is_in_stock = true) as in_stock_count
FROM store_products
WHERE category_raw IS NOT NULL
`;
const params: any[] = [];
if (store_id) {
query += ' WHERE c.store_id = $1';
params.push(store_id);
query += ` AND dispensary_id = $${params.length}`;
}
if (in_stock_only === 'true') {
query += ` AND is_in_stock = true`;
}
query += `
GROUP BY c.id, pc.name
ORDER BY c.display_order, c.name
GROUP BY category_raw
ORDER BY category_raw
`;
const result = await pool.query(query, params);
res.json({ categories: result.rows });
} catch (error) {
@@ -40,50 +44,86 @@ router.get('/', async (req, res) => {
}
});
// Get category tree (hierarchical)
// Get category tree (hierarchical) - category -> subcategory structure from product data
router.get('/tree', async (req, res) => {
try {
const { store_id } = req.query;
if (!store_id) {
return res.status(400).json({ error: 'store_id is required' });
}
// Get all categories for the store
const result = await pool.query(`
SELECT
c.*,
COUNT(DISTINCT p.id) as product_count
FROM categories c
LEFT JOIN store_products p ON c.name = p.category_raw AND p.is_in_stock = true AND p.dispensary_id = $1
WHERE c.store_id = $1
GROUP BY c.id
ORDER BY c.display_order, c.name
`, [store_id]);
// Build tree structure
const categories = result.rows;
const categoryMap = new Map();
const tree: any[] = [];
// First pass: create map
categories.forEach((cat: { id: number; parent_id?: number }) => {
categoryMap.set(cat.id, { ...cat, children: [] });
});
const { store_id, in_stock_only } = req.query;
// Second pass: build tree
categories.forEach((cat: { id: number; parent_id?: number }) => {
const node = categoryMap.get(cat.id);
if (cat.parent_id) {
const parent = categoryMap.get(cat.parent_id);
if (parent) {
parent.children.push(node);
}
} else {
tree.push(node);
// Get category + subcategory combinations with counts
let query = `
SELECT
category_raw as category,
subcategory_raw as subcategory,
COUNT(*) as product_count,
COUNT(*) FILTER (WHERE is_in_stock = true) as in_stock_count
FROM store_products
WHERE category_raw IS NOT NULL
`;
const params: any[] = [];
if (store_id) {
params.push(store_id);
query += ` AND dispensary_id = $${params.length}`;
}
if (in_stock_only === 'true') {
query += ` AND is_in_stock = true`;
}
query += `
GROUP BY category_raw, subcategory_raw
ORDER BY category_raw, subcategory_raw
`;
const result = await pool.query(query, params);
// Build tree structure: category -> subcategories
const categoryMap = new Map<string, {
name: string;
slug: string;
product_count: number;
in_stock_count: number;
subcategories: Array<{
name: string;
slug: string;
product_count: number;
in_stock_count: number;
}>;
}>();
for (const row of result.rows) {
const category = row.category;
const subcategory = row.subcategory;
const count = parseInt(row.product_count);
const inStockCount = parseInt(row.in_stock_count);
if (!categoryMap.has(category)) {
categoryMap.set(category, {
name: category,
slug: category.toLowerCase().replace(/\s+/g, '-'),
product_count: 0,
in_stock_count: 0,
subcategories: []
});
}
});
const cat = categoryMap.get(category)!;
cat.product_count += count;
cat.in_stock_count += inStockCount;
if (subcategory) {
cat.subcategories.push({
name: subcategory,
slug: subcategory.toLowerCase().replace(/\s+/g, '-'),
product_count: count,
in_stock_count: inStockCount
});
}
}
const tree = Array.from(categoryMap.values());
res.json({ tree });
} catch (error) {
console.error('Error fetching category tree:', error);
@@ -91,4 +131,91 @@ router.get('/tree', async (req, res) => {
}
});
// Get all unique subcategories for a category
router.get('/:category/subcategories', async (req, res) => {
try {
const { category } = req.params;
const { store_id, in_stock_only } = req.query;
let query = `
SELECT
subcategory_raw as name,
subcategory_raw as slug,
COUNT(*) as product_count,
COUNT(*) FILTER (WHERE is_in_stock = true) as in_stock_count
FROM store_products
WHERE category_raw = $1
AND subcategory_raw IS NOT NULL
`;
const params: any[] = [category];
if (store_id) {
params.push(store_id);
query += ` AND dispensary_id = $${params.length}`;
}
if (in_stock_only === 'true') {
query += ` AND is_in_stock = true`;
}
query += `
GROUP BY subcategory_raw
ORDER BY subcategory_raw
`;
const result = await pool.query(query, params);
res.json({
category,
subcategories: result.rows
});
} catch (error) {
console.error('Error fetching subcategories:', error);
res.status(500).json({ error: 'Failed to fetch subcategories' });
}
});
// Get global category summary (across all stores)
router.get('/summary', async (req, res) => {
try {
const { state } = req.query;
let query = `
SELECT
sp.category_raw as category,
COUNT(DISTINCT sp.id) as product_count,
COUNT(DISTINCT sp.dispensary_id) as store_count,
COUNT(*) FILTER (WHERE sp.is_in_stock = true) as in_stock_count
FROM store_products sp
`;
const params: any[] = [];
if (state) {
query += `
JOIN dispensaries d ON sp.dispensary_id = d.id
WHERE sp.category_raw IS NOT NULL
AND d.state = $1
`;
params.push(state);
} else {
query += ` WHERE sp.category_raw IS NOT NULL`;
}
query += `
GROUP BY sp.category_raw
ORDER BY product_count DESC
`;
const result = await pool.query(query, params);
res.json({
categories: result.rows,
total_categories: result.rows.length
});
} catch (error) {
console.error('Error fetching category summary:', error);
res.status(500).json({ error: 'Failed to fetch category summary' });
}
});
export default router;

View File

@@ -11,7 +11,7 @@ const VALID_MENU_TYPES = ['dutchie', 'treez', 'jane', 'weedmaps', 'leafly', 'mea
// Get all dispensaries (with pagination)
router.get('/', async (req, res) => {
try {
const { menu_type, city, state, crawl_enabled, dutchie_verified, limit, offset, search } = req.query;
const { menu_type, city, state, crawl_enabled, dutchie_verified, status, limit, offset, search } = req.query;
const pageLimit = Math.min(parseInt(limit as string) || 50, 500);
const pageOffset = parseInt(offset as string) || 0;
@@ -100,6 +100,12 @@ router.get('/', async (req, res) => {
}
}
// Filter by status (e.g., 'dropped', 'open', 'closed')
if (status) {
conditions.push(`status = $${params.length + 1}`);
params.push(status);
}
// Search filter (name, dba_name, city, company_name)
if (search) {
conditions.push(`(name ILIKE $${params.length + 1} OR dba_name ILIKE $${params.length + 1} OR city ILIKE $${params.length + 1})`);
@@ -161,6 +167,7 @@ router.get('/stats/crawl-status', async (req, res) => {
COUNT(*) FILTER (WHERE crawl_enabled = false OR crawl_enabled IS NULL) as disabled_count,
COUNT(*) FILTER (WHERE dutchie_verified = true) as verified_count,
COUNT(*) FILTER (WHERE dutchie_verified = false OR dutchie_verified IS NULL) as unverified_count,
COUNT(*) FILTER (WHERE status = 'dropped') as dropped_count,
COUNT(*) as total_count
FROM dispensaries
`;
@@ -190,6 +197,34 @@ router.get('/stats/crawl-status', async (req, res) => {
}
});
// Get dropped stores count (for dashboard alert)
router.get('/stats/dropped', async (req, res) => {
try {
const result = await pool.query(`
SELECT
COUNT(*) as dropped_count,
json_agg(json_build_object(
'id', id,
'name', name,
'city', city,
'state', state,
'dropped_at', updated_at
) ORDER BY updated_at DESC) FILTER (WHERE status = 'dropped') as dropped_stores
FROM dispensaries
WHERE status = 'dropped'
`);
const row = result.rows[0];
res.json({
dropped_count: parseInt(row.dropped_count) || 0,
dropped_stores: row.dropped_stores || []
});
} catch (error) {
console.error('Error fetching dropped stores:', error);
res.status(500).json({ error: 'Failed to fetch dropped stores' });
}
});
// Get single dispensary by slug or ID
router.get('/:slugOrId', async (req, res) => {
try {

View File

@@ -22,11 +22,17 @@ interface ProductClickEventPayload {
store_id?: string;
brand_id?: string;
campaign_id?: string;
dispensary_name?: string;
action: 'view' | 'open_store' | 'open_product' | 'compare' | 'other';
source: string;
page_type?: string; // Page where event occurred (e.g., StoreDetailPage, BrandsIntelligence)
url_path?: string; // URL path for debugging
occurred_at?: string;
// Visitor location (from frontend IP geolocation)
visitor_city?: string;
visitor_state?: string;
visitor_lat?: number;
visitor_lng?: number;
}
/**
@@ -77,13 +83,14 @@ router.post('/product-click', optionalAuthMiddleware, async (req: Request, res:
// Insert the event with enhanced fields
await pool.query(
`INSERT INTO product_click_events
(product_id, store_id, brand_id, campaign_id, action, source, user_id, ip_address, user_agent, occurred_at, event_type, page_type, url_path, device_type)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`,
(product_id, store_id, brand_id, campaign_id, dispensary_name, action, source, user_id, ip_address, user_agent, occurred_at, event_type, page_type, url_path, device_type, visitor_city, visitor_state, visitor_lat, visitor_lng)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)`,
[
payload.product_id,
payload.store_id || null,
payload.brand_id || null,
payload.campaign_id || null,
payload.dispensary_name || null,
payload.action,
payload.source,
userId,
@@ -93,7 +100,11 @@ router.post('/product-click', optionalAuthMiddleware, async (req: Request, res:
'product_click', // event_type
payload.page_type || null,
payload.url_path || null,
deviceType
deviceType,
payload.visitor_city || null,
payload.visitor_state || null,
payload.visitor_lat || null,
payload.visitor_lng || null
]
);

View File

@@ -1,11 +1,29 @@
import { Router } from 'express';
import { authMiddleware } from '../auth/middleware';
import { pool } from '../db/pool';
import { getImageUrl } from '../utils/minio';
const router = Router();
router.use(authMiddleware);
/**
* Convert local image path to proxy URL
* /images/products/... -> /img/products/...
*/
function getImageUrl(localPath: string): string {
if (!localPath) return '';
// If already a full URL, return as-is
if (localPath.startsWith('http')) return localPath;
// Convert /images/ path to /img/ proxy path
if (localPath.startsWith('/images/')) {
return '/img' + localPath.substring(7);
}
// Handle paths without leading slash
if (localPath.startsWith('images/')) {
return '/img/' + localPath.substring(7);
}
return '/img/' + localPath;
}
// Freshness threshold: data older than this is considered stale
const STALE_THRESHOLD_HOURS = 4;

View File

@@ -463,7 +463,7 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => {
// Filter by on special
if (on_special === 'true' || on_special === '1') {
whereClause += ` AND s.is_on_special = TRUE`;
whereClause += ` AND s.special = TRUE`;
}
// Search by name or brand
@@ -547,7 +547,7 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => {
const { rows: countRows } = await pool.query(`
SELECT COUNT(*) as total FROM store_products p
LEFT JOIN LATERAL (
SELECT rec_min_price_cents / 100.0 as price_rec, med_min_price_cents / 100.0 as price_med, special as is_on_special FROM v_product_snapshots
SELECT rec_min_price_cents / 100.0 as price_rec, med_min_price_cents / 100.0 as price_med, special FROM v_product_snapshots
WHERE store_product_id = p.id
ORDER BY crawled_at DESC
LIMIT 1
@@ -1125,6 +1125,7 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => {
SELECT
d.id,
d.name,
d.slug,
d.address1,
d.address2,
d.city,
@@ -1179,6 +1180,7 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => {
const transformedDispensaries = dispensaries.map((d) => ({
id: d.id,
name: d.name,
slug: d.slug || null,
address1: d.address1,
address2: d.address2,
city: d.city,
@@ -1876,7 +1878,7 @@ router.get('/stats', async (req: PublicApiRequest, res: Response) => {
SELECT
(SELECT COUNT(*) FROM store_products) as product_count,
(SELECT COUNT(DISTINCT brand_name_raw) FROM store_products WHERE brand_name_raw IS NOT NULL) as brand_count,
(SELECT COUNT(*) FROM dispensaries WHERE crawl_enabled = true AND product_count > 0) as dispensary_count
(SELECT COUNT(DISTINCT dispensary_id) FROM store_products) as dispensary_count
`);
const s = stats[0] || {};
@@ -1996,4 +1998,235 @@ router.get('/menu', async (req: PublicApiRequest, res: Response) => {
}
});
// ============================================================
// VISITOR TRACKING & GEOLOCATION
// ============================================================
import crypto from 'crypto';
import { GeoLocation, lookupIP } from '../services/ip2location';
/**
* Get location from IP using local IP2Location database
*/
function getLocationFromIP(ip: string): GeoLocation | null {
return lookupIP(ip);
}
/**
* Hash IP for privacy (we don't store raw IPs)
*/
function hashIP(ip: string): string {
return crypto.createHash('sha256').update(ip).digest('hex').substring(0, 16);
}
/**
* POST /api/v1/visitor/track
* Track visitor location for analytics
*
* Body:
* - domain: string (required) - 'findagram.co', 'findadispo.com', etc.
* - page_path: string (optional) - current page path
* - session_id: string (optional) - client-generated session ID
* - referrer: string (optional) - document.referrer
*
* Returns:
* - location: { city, state, lat, lng } for client use
*/
router.post('/visitor/track', async (req: Request, res: Response) => {
try {
const { domain, page_path, session_id, referrer } = req.body;
if (!domain) {
return res.status(400).json({ error: 'domain is required' });
}
// Get client IP
const clientIp = (req.headers['x-forwarded-for'] as string)?.split(',')[0].trim() ||
req.headers['x-real-ip'] as string ||
req.ip ||
req.socket.remoteAddress ||
'';
// Get location from IP (local database lookup)
const location = getLocationFromIP(clientIp);
// Store visit (with hashed IP for privacy)
await pool.query(`
INSERT INTO visitor_locations (
ip_hash, city, state, state_code, country, country_code,
latitude, longitude, domain, page_path, referrer, user_agent, session_id
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
`, [
hashIP(clientIp),
location?.city || null,
location?.state || null,
location?.stateCode || null,
location?.country || null,
location?.countryCode || null,
location?.lat || null,
location?.lng || null,
domain,
page_path || null,
referrer || null,
req.headers['user-agent'] || null,
session_id || null
]);
// Return location to client (for nearby dispensary feature)
res.json({
success: true,
location: location ? {
city: location.city,
state: location.state,
stateCode: location.stateCode,
lat: location.lat,
lng: location.lng
} : null
});
} catch (error: any) {
console.error('Visitor tracking error:', error);
// Don't fail the request - tracking is non-critical
res.json({
success: false,
location: null
});
}
});
/**
* GET /api/v1/visitor/location
* Get visitor location without tracking (just IP lookup)
*/
router.get('/visitor/location', (req: Request, res: Response) => {
try {
const clientIp = (req.headers['x-forwarded-for'] as string)?.split(',')[0].trim() ||
req.headers['x-real-ip'] as string ||
req.ip ||
req.socket.remoteAddress ||
'';
const location = getLocationFromIP(clientIp);
res.json({
success: true,
location: location ? {
city: location.city,
state: location.state,
stateCode: location.stateCode,
lat: location.lat,
lng: location.lng
} : null
});
} catch (error: any) {
console.error('Location lookup error:', error);
res.json({
success: false,
location: null
});
}
});
/**
* GET /api/v1/analytics/visitors
* Get visitor analytics (admin only - requires auth)
*
* Query params:
* - domain: filter by domain
* - days: number of days to look back (default: 30)
* - limit: max results (default: 50)
*/
router.get('/analytics/visitors', async (req: PublicApiRequest, res: Response) => {
try {
const scope = req.scope;
// Only allow internal keys
if (!scope || scope.type !== 'internal') {
return res.status(403).json({ error: 'Access denied - internal key required' });
}
const { domain, days = '30', limit = '50' } = req.query;
const daysNum = Math.min(parseInt(days as string, 10) || 30, 90);
const limitNum = Math.min(parseInt(limit as string, 10) || 50, 200);
let whereClause = 'WHERE created_at > NOW() - $1::interval';
const params: any[] = [`${daysNum} days`];
let paramIndex = 2;
if (domain) {
whereClause += ` AND domain = $${paramIndex}`;
params.push(domain);
paramIndex++;
}
// Get top locations
const { rows: topLocations } = await pool.query(`
SELECT
city,
state,
state_code,
country_code,
COUNT(*) as visit_count,
COUNT(DISTINCT session_id) as unique_sessions,
MAX(created_at) as last_visit
FROM visitor_locations
${whereClause}
GROUP BY city, state, state_code, country_code
ORDER BY visit_count DESC
LIMIT $${paramIndex}
`, [...params, limitNum]);
// Get daily totals
const { rows: dailyStats } = await pool.query(`
SELECT
DATE(created_at) as date,
COUNT(*) as visits,
COUNT(DISTINCT session_id) as unique_sessions
FROM visitor_locations
${whereClause}
GROUP BY DATE(created_at)
ORDER BY date DESC
LIMIT 30
`, params);
// Get totals
const { rows: totals } = await pool.query(`
SELECT
COUNT(*) as total_visits,
COUNT(DISTINCT session_id) as total_sessions,
COUNT(DISTINCT city || state_code) as unique_locations
FROM visitor_locations
${whereClause}
`, params);
res.json({
success: true,
period: {
days: daysNum,
domain: domain || 'all'
},
totals: totals[0],
top_locations: topLocations.map(l => ({
city: l.city,
state: l.state,
state_code: l.state_code,
country_code: l.country_code,
visits: parseInt(l.visit_count, 10),
unique_sessions: parseInt(l.unique_sessions, 10),
last_visit: l.last_visit
})),
daily_stats: dailyStats.map(d => ({
date: d.date,
visits: parseInt(d.visits, 10),
unique_sessions: parseInt(d.unique_sessions, 10)
}))
});
} catch (error: any) {
console.error('Visitor analytics error:', error);
res.status(500).json({
error: 'Failed to fetch visitor analytics',
message: error.message
});
}
});
export default router;

View File

@@ -444,7 +444,7 @@ router.post('/migration/cancel-pending-crawl-jobs', async (_req: Request, res: R
/**
* POST /api/tasks/migration/create-resync-tasks
* Create product_resync tasks for all crawl-enabled dispensaries
* Create product_refresh tasks for all crawl-enabled dispensaries
*/
router.post('/migration/create-resync-tasks', async (req: Request, res: Response) => {
try {
@@ -474,7 +474,7 @@ router.post('/migration/create-resync-tasks', async (req: Request, res: Response
const hasActive = await taskService.hasActiveTask(disp.id);
if (!hasActive) {
await taskService.createTask({
role: 'product_resync',
role: 'product_refresh',
dispensary_id: disp.id,
platform: 'dutchie',
priority,

View File

@@ -0,0 +1,652 @@
/**
* Worker Registry API Routes
*
* Dynamic worker management - workers register on startup, get assigned names,
* and report heartbeats. Everything is API-driven, no hardcoding.
*
* Endpoints:
* POST /api/worker-registry/register - Worker reports for duty
* POST /api/worker-registry/heartbeat - Worker heartbeat
* POST /api/worker-registry/deregister - Worker signing off
* GET /api/worker-registry/workers - List all workers (for dashboard)
* GET /api/worker-registry/workers/:id - Get specific worker
* POST /api/worker-registry/cleanup - Mark stale workers offline
*
* GET /api/worker-registry/names - List all names in pool
* POST /api/worker-registry/names - Add names to pool
* DELETE /api/worker-registry/names/:name - Remove name from pool
*
* GET /api/worker-registry/roles - List available task roles
* POST /api/worker-registry/roles - Add a new role (future)
*/
import { Router, Request, Response } from 'express';
import { pool } from '../db/pool';
import os from 'os';
const router = Router();
// ============================================================
// WORKER REGISTRATION
// ============================================================
/**
* POST /api/worker-registry/register
* Worker reports for duty - gets assigned a friendly name
*
* Body:
* - role: string (optional) - task role, or null for role-agnostic workers
* - worker_id: string (optional) - custom ID, auto-generated if not provided
* - pod_name: string (optional) - k8s pod name
* - hostname: string (optional) - machine hostname
* - metadata: object (optional) - additional worker info
*
* Returns:
* - worker_id: assigned worker ID
* - friendly_name: assigned name from pool
* - role: confirmed role (or null if agnostic)
* - message: welcome message
*/
router.post('/register', async (req: Request, res: Response) => {
try {
const {
role = null, // Role is now optional - null means agnostic
worker_id,
pod_name,
hostname,
ip_address,
metadata = {}
} = req.body;
// Generate worker_id if not provided
const finalWorkerId = worker_id || `worker-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const finalHostname = hostname || os.hostname();
const clientIp = ip_address || req.ip || req.socket.remoteAddress;
// Check if worker already registered
const existing = await pool.query(
'SELECT id, friendly_name, status FROM worker_registry WHERE worker_id = $1',
[finalWorkerId]
);
if (existing.rows.length > 0) {
// Re-activate existing worker
const { rows } = await pool.query(`
UPDATE worker_registry
SET status = 'active',
role = $1,
pod_name = $2,
hostname = $3,
ip_address = $4,
last_heartbeat_at = NOW(),
started_at = NOW(),
metadata = $5,
updated_at = NOW()
WHERE worker_id = $6
RETURNING id, worker_id, friendly_name, role
`, [role, pod_name, finalHostname, clientIp, metadata, finalWorkerId]);
const worker = rows[0];
const roleMsg = role ? `for ${role}` : 'as role-agnostic';
console.log(`[WorkerRegistry] Worker "${worker.friendly_name}" (${finalWorkerId}) re-registered ${roleMsg}`);
return res.json({
success: true,
worker_id: worker.worker_id,
friendly_name: worker.friendly_name,
role: worker.role,
message: role
? `Welcome back, ${worker.friendly_name}! You are assigned to ${role}.`
: `Welcome back, ${worker.friendly_name}! You are ready to take any task.`
});
}
// Assign a friendly name
const nameResult = await pool.query('SELECT assign_worker_name($1) as name', [finalWorkerId]);
const friendlyName = nameResult.rows[0].name;
// Register the worker
const { rows } = await pool.query(`
INSERT INTO worker_registry (
worker_id, friendly_name, role, pod_name, hostname, ip_address, status, metadata
) VALUES ($1, $2, $3, $4, $5, $6, 'active', $7)
RETURNING id, worker_id, friendly_name, role
`, [finalWorkerId, friendlyName, role, pod_name, finalHostname, clientIp, metadata]);
const worker = rows[0];
const roleMsg = role ? `for ${role}` : 'as role-agnostic';
console.log(`[WorkerRegistry] New worker "${friendlyName}" (${finalWorkerId}) reporting for duty ${roleMsg}`);
res.json({
success: true,
worker_id: worker.worker_id,
friendly_name: worker.friendly_name,
role: worker.role,
message: role
? `Hello ${friendlyName}! You are now registered for ${role}. Ready for work!`
: `Hello ${friendlyName}! You are ready to take any task from the pool.`
});
} catch (error: any) {
console.error('[WorkerRegistry] Registration error:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/heartbeat
* Worker sends heartbeat to stay alive
*
* Body:
* - worker_id: string (required)
* - current_task_id: number (optional) - task currently being processed
* - status: string (optional) - 'active', 'idle'
*/
router.post('/heartbeat', async (req: Request, res: Response) => {
try {
const { worker_id, current_task_id, status = 'active', resources } = req.body;
if (!worker_id) {
return res.status(400).json({ success: false, error: 'worker_id is required' });
}
// Store resources in metadata jsonb column
const { rows } = await pool.query(`
UPDATE worker_registry
SET last_heartbeat_at = NOW(),
current_task_id = $1,
status = $2,
metadata = COALESCE(metadata, '{}'::jsonb) || COALESCE($4::jsonb, '{}'::jsonb),
updated_at = NOW()
WHERE worker_id = $3
RETURNING id, friendly_name, status
`, [current_task_id || null, status, worker_id, resources ? JSON.stringify(resources) : null]);
if (rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found - please register first' });
}
res.json({
success: true,
worker: rows[0]
});
} catch (error: any) {
console.error('[WorkerRegistry] Heartbeat error:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/task-completed
* Worker reports task completion
*
* Body:
* - worker_id: string (required)
* - success: boolean (required)
*/
router.post('/task-completed', async (req: Request, res: Response) => {
try {
const { worker_id, success } = req.body;
if (!worker_id) {
return res.status(400).json({ success: false, error: 'worker_id is required' });
}
const incrementField = success ? 'tasks_completed' : 'tasks_failed';
const { rows } = await pool.query(`
UPDATE worker_registry
SET ${incrementField} = ${incrementField} + 1,
last_task_at = NOW(),
current_task_id = NULL,
status = 'idle',
updated_at = NOW()
WHERE worker_id = $1
RETURNING id, friendly_name, tasks_completed, tasks_failed
`, [worker_id]);
if (rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found' });
}
res.json({ success: true, worker: rows[0] });
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/deregister
* Worker signing off (graceful shutdown)
*
* Body:
* - worker_id: string (required)
*/
router.post('/deregister', async (req: Request, res: Response) => {
try {
const { worker_id } = req.body;
if (!worker_id) {
return res.status(400).json({ success: false, error: 'worker_id is required' });
}
// Release the name back to the pool
await pool.query('SELECT release_worker_name($1)', [worker_id]);
// Mark as terminated
const { rows } = await pool.query(`
UPDATE worker_registry
SET status = 'terminated',
current_task_id = NULL,
updated_at = NOW()
WHERE worker_id = $1
RETURNING id, friendly_name
`, [worker_id]);
if (rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found' });
}
console.log(`[WorkerRegistry] Worker "${rows[0].friendly_name}" (${worker_id}) signed off`);
res.json({
success: true,
message: `Goodbye ${rows[0].friendly_name}! Thanks for your work.`
});
} catch (error: any) {
console.error('[WorkerRegistry] Deregister error:', error);
res.status(500).json({ success: false, error: error.message });
}
});
// ============================================================
// WORKER LISTING (for Dashboard)
// ============================================================
/**
* GET /api/worker-registry/workers
* List all workers (for dashboard)
*
* Query params:
* - status: filter by status (active, idle, offline, all)
* - role: filter by role
* - include_terminated: include terminated workers (default: false)
*/
router.get('/workers', async (req: Request, res: Response) => {
try {
const { status, role, include_terminated = 'false' } = req.query;
let whereClause = include_terminated === 'true' ? 'WHERE 1=1' : "WHERE status != 'terminated'";
const params: any[] = [];
let paramIndex = 1;
if (status && status !== 'all') {
whereClause += ` AND status = $${paramIndex}`;
params.push(status);
paramIndex++;
}
if (role) {
whereClause += ` AND role = $${paramIndex}`;
params.push(role);
paramIndex++;
}
const { rows } = await pool.query(`
SELECT
id,
worker_id,
friendly_name,
role,
pod_name,
hostname,
ip_address,
status,
started_at,
last_heartbeat_at,
last_task_at,
tasks_completed,
tasks_failed,
current_task_id,
metadata,
EXTRACT(EPOCH FROM (NOW() - last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN status = 'offline' OR status = 'terminated' THEN status
WHEN last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN current_task_id IS NOT NULL THEN 'busy'
ELSE 'ready'
END as health_status,
created_at
FROM worker_registry
${whereClause}
ORDER BY
CASE status
WHEN 'active' THEN 1
WHEN 'idle' THEN 2
WHEN 'offline' THEN 3
ELSE 4
END,
last_heartbeat_at DESC
`, params);
// Get summary counts
const { rows: summary } = await pool.query(`
SELECT
COUNT(*) FILTER (WHERE status = 'active') as active_count,
COUNT(*) FILTER (WHERE status = 'idle') as idle_count,
COUNT(*) FILTER (WHERE status = 'offline') as offline_count,
COUNT(*) FILTER (WHERE status != 'terminated') as total_count,
COUNT(DISTINCT role) FILTER (WHERE status IN ('active', 'idle')) as active_roles
FROM worker_registry
`);
res.json({
success: true,
workers: rows,
summary: summary[0]
});
} catch (error: any) {
console.error('[WorkerRegistry] List workers error:', error);
res.status(500).json({ success: false, error: error.message });
}
});
/**
* GET /api/worker-registry/workers/:workerId
* Get specific worker details
*/
router.get('/workers/:workerId', async (req: Request, res: Response) => {
try {
const { workerId } = req.params;
const { rows } = await pool.query(`
SELECT * FROM worker_registry WHERE worker_id = $1
`, [workerId]);
if (rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found' });
}
res.json({ success: true, worker: rows[0] });
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* DELETE /api/worker-registry/workers/:workerId
* Remove a worker (admin action)
*/
router.delete('/workers/:workerId', async (req: Request, res: Response) => {
try {
const { workerId } = req.params;
// Release name
await pool.query('SELECT release_worker_name($1)', [workerId]);
// Delete worker
const { rows } = await pool.query(`
DELETE FROM worker_registry WHERE worker_id = $1 RETURNING friendly_name
`, [workerId]);
if (rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found' });
}
res.json({ success: true, message: `Worker ${rows[0].friendly_name} removed` });
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/cleanup
* Mark stale workers as offline
*
* Body:
* - stale_threshold_minutes: number (default: 5)
*/
router.post('/cleanup', async (req: Request, res: Response) => {
try {
const { stale_threshold_minutes = 5 } = req.body;
const { rows } = await pool.query(
'SELECT mark_stale_workers($1) as count',
[stale_threshold_minutes]
);
res.json({
success: true,
stale_workers_marked: rows[0].count,
message: `Marked ${rows[0].count} stale workers as offline`
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
// ============================================================
// NAME POOL MANAGEMENT
// ============================================================
/**
* GET /api/worker-registry/names
* List all names in the pool
*/
router.get('/names', async (_req: Request, res: Response) => {
try {
const { rows } = await pool.query(`
SELECT
id,
name,
in_use,
assigned_to,
assigned_at
FROM worker_name_pool
ORDER BY in_use DESC, name ASC
`);
const { rows: summary } = await pool.query(`
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE in_use = true) as in_use,
COUNT(*) FILTER (WHERE in_use = false) as available
FROM worker_name_pool
`);
res.json({
success: true,
names: rows,
summary: summary[0]
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/names
* Add names to the pool
*
* Body:
* - names: string[] (required) - array of names to add
*/
router.post('/names', async (req: Request, res: Response) => {
try {
const { names } = req.body;
if (!names || !Array.isArray(names) || names.length === 0) {
return res.status(400).json({ success: false, error: 'names array is required' });
}
const values = names.map(n => `('${n.replace(/'/g, "''")}')`).join(', ');
const { rowCount } = await pool.query(`
INSERT INTO worker_name_pool (name)
VALUES ${values}
ON CONFLICT (name) DO NOTHING
`);
res.json({
success: true,
added: rowCount,
message: `Added ${rowCount} new names to the pool`
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* DELETE /api/worker-registry/names/:name
* Remove a name from the pool (only if not in use)
*/
router.delete('/names/:name', async (req: Request, res: Response) => {
try {
const { name } = req.params;
const { rows } = await pool.query(`
DELETE FROM worker_name_pool
WHERE name = $1 AND in_use = false
RETURNING name
`, [name]);
if (rows.length === 0) {
return res.status(400).json({
success: false,
error: 'Name not found or currently in use'
});
}
res.json({ success: true, message: `Name "${name}" removed from pool` });
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
// ============================================================
// ROLE MANAGEMENT
// ============================================================
/**
* GET /api/worker-registry/roles
* List available task roles
*/
router.get('/roles', async (_req: Request, res: Response) => {
// These are the roles the task handlers support
const roles = [
{
id: 'product_refresh',
name: 'Product Refresh',
description: 'Re-crawl dispensary products for price/stock changes',
handler: 'handleProductRefresh'
},
{
id: 'product_discovery',
name: 'Product Discovery',
description: 'Initial product discovery for new dispensaries',
handler: 'handleProductDiscovery'
},
{
id: 'store_discovery',
name: 'Store Discovery',
description: 'Discover new dispensary locations',
handler: 'handleStoreDiscovery'
},
{
id: 'entry_point_discovery',
name: 'Entry Point Discovery',
description: 'Resolve platform IDs from menu URLs',
handler: 'handleEntryPointDiscovery'
},
{
id: 'analytics_refresh',
name: 'Analytics Refresh',
description: 'Refresh materialized views and analytics',
handler: 'handleAnalyticsRefresh'
}
];
// Get active worker counts per role
try {
const { rows } = await pool.query(`
SELECT role, COUNT(*) as worker_count
FROM worker_registry
WHERE status IN ('active', 'idle')
GROUP BY role
`);
const countMap = new Map(rows.map(r => [r.role, parseInt(r.worker_count)]));
const rolesWithCounts = roles.map(r => ({
...r,
active_workers: countMap.get(r.id) || 0
}));
res.json({ success: true, roles: rolesWithCounts });
} catch {
// If table doesn't exist yet, just return roles without counts
res.json({ success: true, roles: roles.map(r => ({ ...r, active_workers: 0 })) });
}
});
/**
* GET /api/worker-registry/capacity
* Get capacity planning info
*/
router.get('/capacity', async (_req: Request, res: Response) => {
try {
// Get worker counts by role
const { rows: workerCounts } = await pool.query(`
SELECT role, COUNT(*) as count
FROM worker_registry
WHERE status IN ('active', 'idle')
GROUP BY role
`);
// Get pending task counts by role (if worker_tasks exists)
let taskCounts: any[] = [];
try {
const result = await pool.query(`
SELECT role, COUNT(*) as pending_count
FROM worker_tasks
WHERE status = 'pending'
GROUP BY role
`);
taskCounts = result.rows;
} catch {
// worker_tasks might not exist yet
}
// Get crawl-enabled store count
const storeCountResult = await pool.query(`
SELECT COUNT(*) as count
FROM dispensaries
WHERE crawl_enabled = true AND platform_dispensary_id IS NOT NULL
`);
const totalStores = parseInt(storeCountResult.rows[0].count);
const workerMap = new Map(workerCounts.map(r => [r.role, parseInt(r.count)]));
const taskMap = new Map(taskCounts.map(r => [r.role, parseInt(r.pending_count)]));
const roles = ['product_refresh', 'product_discovery', 'store_discovery', 'entry_point_discovery', 'analytics_refresh'];
const capacity = roles.map(role => ({
role,
active_workers: workerMap.get(role) || 0,
pending_tasks: taskMap.get(role) || 0,
// Rough estimate: 20 seconds per task, 4-hour cycle
tasks_per_worker_per_cycle: 720,
workers_needed_for_all_stores: Math.ceil(totalStores / 720)
}));
res.json({
success: true,
total_stores: totalStores,
capacity
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
export default router;

View File

@@ -109,14 +109,14 @@ export class ProxyRotator {
username,
password,
protocol,
is_active as "isActive",
last_used_at as "lastUsedAt",
active as "isActive",
last_tested_at as "lastUsedAt",
failure_count as "failureCount",
success_count as "successCount",
avg_response_time_ms as "avgResponseTimeMs"
0 as "successCount",
response_time_ms as "avgResponseTimeMs"
FROM proxies
WHERE is_active = true
ORDER BY failure_count ASC, last_used_at ASC NULLS FIRST
WHERE active = true
ORDER BY failure_count ASC, last_tested_at ASC NULLS FIRST
`);
this.proxies = result.rows;
@@ -192,11 +192,11 @@ export class ProxyRotator {
UPDATE proxies
SET
failure_count = failure_count + 1,
last_failure_at = NOW(),
last_error = $2,
is_active = CASE WHEN failure_count >= 4 THEN false ELSE is_active END
updated_at = NOW(),
test_result = $2,
active = CASE WHEN failure_count >= 4 THEN false ELSE active END
WHERE id = $1
`, [proxyId, error || null]);
`, [proxyId, error || 'failed']);
} catch (err) {
console.error(`[ProxyRotator] Failed to update proxy ${proxyId}:`, err);
}
@@ -226,12 +226,13 @@ export class ProxyRotator {
await this.pool.query(`
UPDATE proxies
SET
success_count = success_count + 1,
last_used_at = NOW(),
avg_response_time_ms = CASE
WHEN avg_response_time_ms IS NULL THEN $2
ELSE (avg_response_time_ms * 0.8) + ($2 * 0.2)
END
last_tested_at = NOW(),
test_result = 'success',
response_time_ms = CASE
WHEN response_time_ms IS NULL THEN $2
ELSE (response_time_ms * 0.8 + $2 * 0.2)::integer
END,
updated_at = NOW()
WHERE id = $1
`, [proxyId, responseTimeMs || null]);
} catch (err) {

View File

@@ -0,0 +1,134 @@
/**
* IP2Location Service
*
* Uses local IP2Location LITE DB3 database for IP geolocation.
* No external API calls, no rate limits.
*
* Database: IP2Location LITE DB3 (free, monthly updates)
* Fields: country, region, city, latitude, longitude
*/
import path from 'path';
import fs from 'fs';
// @ts-ignore - no types for ip2location-nodejs
const { IP2Location } = require('ip2location-nodejs');
const DB_PATH = process.env.IP2LOCATION_DB_PATH ||
path.join(__dirname, '../../data/ip2location/IP2LOCATION-LITE-DB5.BIN');
let ip2location: any = null;
let dbLoaded = false;
/**
* Initialize IP2Location database
*/
export function initIP2Location(): boolean {
if (dbLoaded) return true;
try {
if (!fs.existsSync(DB_PATH)) {
console.warn(`IP2Location database not found at: ${DB_PATH}`);
console.warn('Run: ./scripts/download-ip2location.sh to download');
return false;
}
ip2location = new IP2Location();
ip2location.open(DB_PATH);
dbLoaded = true;
console.log('IP2Location database loaded successfully');
return true;
} catch (err) {
console.error('Failed to load IP2Location database:', err);
return false;
}
}
/**
* Close IP2Location database
*/
export function closeIP2Location(): void {
if (ip2location) {
ip2location.close();
ip2location = null;
dbLoaded = false;
}
}
export interface GeoLocation {
city: string | null;
state: string | null;
stateCode: string | null;
country: string | null;
countryCode: string | null;
lat: number | null;
lng: number | null;
}
/**
* Lookup IP address location
*
* @param ip - IPv4 or IPv6 address
* @returns Location data or null if not found
*/
export function lookupIP(ip: string): GeoLocation | null {
// Skip private/localhost IPs
if (!ip || ip === '127.0.0.1' || ip === '::1' ||
ip.startsWith('192.168.') || ip.startsWith('10.') ||
ip.startsWith('172.16.') || ip.startsWith('172.17.') ||
ip.startsWith('::ffff:127.') || ip.startsWith('::ffff:192.168.') ||
ip.startsWith('::ffff:10.')) {
return null;
}
// Strip IPv6 prefix if present
const cleanIP = ip.replace(/^::ffff:/, '');
// Initialize on first use if not already loaded
if (!dbLoaded) {
if (!initIP2Location()) {
return null;
}
}
try {
const result = ip2location.getAll(cleanIP);
if (!result || result.ip === '?' || result.countryShort === '-') {
return null;
}
// DB3 LITE doesn't include lat/lng - would need DB5+ for that
const lat = typeof result.latitude === 'number' && result.latitude !== 0 ? result.latitude : null;
const lng = typeof result.longitude === 'number' && result.longitude !== 0 ? result.longitude : null;
return {
city: result.city !== '-' ? result.city : null,
state: result.region !== '-' ? result.region : null,
stateCode: null, // DB3 doesn't include state codes
country: result.countryLong !== '-' ? result.countryLong : null,
countryCode: result.countryShort !== '-' ? result.countryShort : null,
lat,
lng,
};
} catch (err) {
console.error('IP2Location lookup error:', err);
return null;
}
}
/**
* Check if IP2Location database is available
*/
export function isIP2LocationAvailable(): boolean {
if (dbLoaded) return true;
return fs.existsSync(DB_PATH);
}
// Export singleton-style interface
export default {
init: initIP2Location,
close: closeIP2Location,
lookup: lookupIP,
isAvailable: isIP2LocationAvailable,
};

View File

@@ -3,7 +3,7 @@ import StealthPlugin from 'puppeteer-extra-plugin-stealth';
import { Browser, Page } from 'puppeteer';
import { SocksProxyAgent } from 'socks-proxy-agent';
import { pool } from '../db/pool';
import { uploadImageFromUrl, getImageUrl } from '../utils/minio';
import { downloadProductImageLegacy } from '../utils/image-storage';
import { logger } from './logger';
import { registerScraper, updateScraperStats, completeScraper } from '../routes/scraper-monitor';
import { incrementProxyFailure, getActiveProxy, isBotDetectionError, putProxyInTimeout } from './proxy';
@@ -767,7 +767,8 @@ export async function saveProducts(storeId: number, categoryId: number, products
if (product.imageUrl && !localImagePath) {
try {
localImagePath = await uploadImageFromUrl(product.imageUrl, productId);
const result = await downloadProductImageLegacy(product.imageUrl, 0, productId);
localImagePath = result.urls?.original || null;
await client.query(`
UPDATE products
SET local_image_path = $1

View File

@@ -1,13 +1,21 @@
/**
* Entry Point Discovery Handler
*
* Detects menu type and resolves platform IDs for a discovered store.
* Resolves platform IDs for a discovered store using Dutchie GraphQL.
* This is the step between store_discovery and product_discovery.
*
* TODO: Integrate with platform ID resolution when available
* Flow:
* 1. Load dispensary info from database
* 2. Extract slug from menu_url
* 3. Start stealth session (fingerprint + optional proxy)
* 4. Query Dutchie GraphQL to resolve slug → platform_dispensary_id
* 5. Update dispensary record with resolved ID
* 6. Queue product_discovery task if successful
*/
import { TaskContext, TaskResult } from '../task-worker';
import { startSession, endSession } from '../../platforms/dutchie';
import { resolveDispensaryIdWithDetails } from '../../platforms/dutchie/queries';
export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
@@ -18,9 +26,11 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
}
try {
// Get dispensary info
// ============================================================
// STEP 1: Load dispensary info
// ============================================================
const dispResult = await pool.query(`
SELECT id, name, menu_url, platform_dispensary_id, menu_type
SELECT id, name, menu_url, platform_dispensary_id, menu_type, state
FROM dispensaries
WHERE id = $1
`, [dispensaryId]);
@@ -33,7 +43,7 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
// If already has platform_dispensary_id, we're done
if (dispensary.platform_dispensary_id) {
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID`);
console.log(`[EntryPointDiscovery] Dispensary ${dispensaryId} already has platform ID: ${dispensary.platform_dispensary_id}`);
return {
success: true,
alreadyResolved: true,
@@ -46,9 +56,12 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
return { success: false, error: `Dispensary ${dispensaryId} has no menu_url` };
}
console.log(`[EntryPointDiscovery] Would resolve platform ID for ${dispensary.name} from ${menuUrl}`);
console.log(`[EntryPointDiscovery] Resolving platform ID for ${dispensary.name}`);
console.log(`[EntryPointDiscovery] Menu URL: ${menuUrl}`);
// Extract slug from menu URL
// ============================================================
// STEP 2: Extract slug from menu URL
// ============================================================
let slug: string | null = null;
const embeddedMatch = menuUrl.match(/\/embedded-menu\/([^/?]+)/);
@@ -61,21 +74,109 @@ export async function handleEntryPointDiscovery(ctx: TaskContext): Promise<TaskR
}
if (!slug) {
// Mark as non-dutchie menu type
await pool.query(`
UPDATE dispensaries
SET menu_type = 'unknown', updated_at = NOW()
WHERE id = $1
`, [dispensaryId]);
return {
success: false,
error: `Could not extract slug from menu_url: ${menuUrl}`,
};
}
// TODO: Integrate with actual platform ID resolution
// For now, mark the task as needing manual resolution
console.log(`[EntryPointDiscovery] Found slug: ${slug} - manual resolution needed`);
console.log(`[EntryPointDiscovery] Extracted slug: ${slug}`);
await ctx.heartbeat();
// ============================================================
// STEP 3: Start stealth session
// ============================================================
const session = startSession(dispensary.state || 'AZ', 'America/Phoenix');
console.log(`[EntryPointDiscovery] Session started: ${session.sessionId}`);
try {
// ============================================================
// STEP 4: Resolve platform ID via GraphQL
// ============================================================
console.log(`[EntryPointDiscovery] Querying Dutchie GraphQL for slug: ${slug}`);
const result = await resolveDispensaryIdWithDetails(slug);
if (!result.dispensaryId) {
// Resolution failed - could be 403, 404, or invalid response
const reason = result.httpStatus
? `HTTP ${result.httpStatus}`
: result.error || 'Unknown error';
console.log(`[EntryPointDiscovery] Failed to resolve ${slug}: ${reason}`);
// Mark as failed resolution but keep menu_type as dutchie
await pool.query(`
UPDATE dispensaries
SET
menu_type = CASE
WHEN $2 = 404 THEN 'removed'
WHEN $2 = 403 THEN 'blocked'
ELSE 'dutchie'
END,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, result.httpStatus || 0]);
return {
success: false,
error: `Could not resolve platform ID: ${reason}`,
slug,
httpStatus: result.httpStatus,
};
}
const platformId = result.dispensaryId;
console.log(`[EntryPointDiscovery] Resolved ${slug} -> ${platformId}`);
await ctx.heartbeat();
// ============================================================
// STEP 5: Update dispensary with resolved ID
// ============================================================
await pool.query(`
UPDATE dispensaries
SET
platform_dispensary_id = $2,
menu_type = 'dutchie',
crawl_enabled = true,
updated_at = NOW()
WHERE id = $1
`, [dispensaryId, platformId]);
console.log(`[EntryPointDiscovery] Updated dispensary ${dispensaryId} with platform ID`);
// ============================================================
// STEP 6: Queue product_discovery task
// ============================================================
await pool.query(`
INSERT INTO worker_tasks (role, dispensary_id, priority, scheduled_for)
VALUES ('product_discovery', $1, 5, NOW())
ON CONFLICT DO NOTHING
`, [dispensaryId]);
console.log(`[EntryPointDiscovery] Queued product_discovery task for dispensary ${dispensaryId}`);
return {
success: true,
platformId,
slug,
queuedProductDiscovery: true,
};
} finally {
// Always end session
endSession();
}
return {
success: true,
message: 'Slug extracted, awaiting platform ID resolution',
slug,
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
console.error(`[EntryPointDiscovery] Error for dispensary ${dispensaryId}:`, errorMessage);

View File

@@ -4,7 +4,7 @@
* Exports all task handlers for the task worker.
*/
export { handleProductResync } from './product-resync';
export { handleProductRefresh } from './product-refresh';
export { handleProductDiscovery } from './product-discovery';
export { handleStoreDiscovery } from './store-discovery';
export { handleEntryPointDiscovery } from './entry-point-discovery';

View File

@@ -6,11 +6,11 @@
*/
import { TaskContext, TaskResult } from '../task-worker';
import { handleProductResync } from './product-resync';
import { handleProductRefresh } from './product-refresh';
export async function handleProductDiscovery(ctx: TaskContext): Promise<TaskResult> {
// Product discovery is essentially the same as resync for the first time
// Product discovery is essentially the same as refresh for the first time
// The main difference is in when this task is triggered (new store vs scheduled)
console.log(`[ProductDiscovery] Starting initial product fetch for dispensary ${ctx.task.dispensary_id}`);
return handleProductResync(ctx);
return handleProductRefresh(ctx);
}

View File

@@ -1,5 +1,5 @@
/**
* Product Resync Handler
* Product Refresh Handler
*
* Re-crawls a store to capture price/stock changes using the GraphQL pipeline.
*
@@ -31,12 +31,12 @@ import {
const normalizer = new DutchieNormalizer();
export async function handleProductResync(ctx: TaskContext): Promise<TaskResult> {
export async function handleProductRefresh(ctx: TaskContext): Promise<TaskResult> {
const { pool, task } = ctx;
const dispensaryId = task.dispensary_id;
if (!dispensaryId) {
return { success: false, error: 'No dispensary_id specified for product_resync task' };
return { success: false, error: 'No dispensary_id specified for product_refresh task' };
}
try {

View File

@@ -17,7 +17,7 @@ export {
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
export {
handleProductResync,
handleProductRefresh,
handleProductDiscovery,
handleStoreDiscovery,
handleEntryPointDiscovery,

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env npx tsx
/**
* Start Pod - Simulates a Kubernetes pod locally
*
* Starts 5 workers with a pod name from the predefined list.
*
* Usage:
* npx tsx src/tasks/start-pod.ts <pod-index>
* npx tsx src/tasks/start-pod.ts 0 # Starts pod "Aethelgard" with 5 workers
* npx tsx src/tasks/start-pod.ts 1 # Starts pod "Xylos" with 5 workers
*/
import { spawn } from 'child_process';
import path from 'path';
const POD_NAMES = [
'Aethelgard',
'Xylos',
'Kryll',
'Coriolis',
'Dimidium',
'Veridia',
'Zetani',
'Talos IV',
'Onyx',
'Celestia',
'Gormand',
'Betha',
'Ragnar',
'Syphon',
'Axiom',
'Nadir',
'Terra Nova',
'Acheron',
'Nexus',
'Vespera',
'Helios Prime',
'Oasis',
'Mordina',
'Cygnus',
'Umbra',
];
const WORKERS_PER_POD = 5;
async function main() {
const podIndex = parseInt(process.argv[2] ?? '0', 10);
if (podIndex < 0 || podIndex >= POD_NAMES.length) {
console.error(`Invalid pod index: ${podIndex}. Must be 0-${POD_NAMES.length - 1}`);
process.exit(1);
}
const podName = POD_NAMES[podIndex];
console.log(`[Pod] Starting pod "${podName}" with ${WORKERS_PER_POD} workers...`);
const workerScript = path.join(__dirname, 'task-worker.ts');
const workers: ReturnType<typeof spawn>[] = [];
for (let i = 1; i <= WORKERS_PER_POD; i++) {
const workerId = `${podName}-worker-${i}`;
const worker = spawn('npx', ['tsx', workerScript], {
env: {
...process.env,
WORKER_ID: workerId,
POD_NAME: podName,
},
stdio: 'inherit',
});
workers.push(worker);
console.log(`[Pod] Started worker ${i}/${WORKERS_PER_POD}: ${workerId}`);
}
// Handle shutdown
const shutdown = () => {
console.log(`\n[Pod] Shutting down pod "${podName}"...`);
workers.forEach(w => w.kill('SIGTERM'));
setTimeout(() => process.exit(0), 2000);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
// Keep the process alive
await new Promise(() => {});
}
main().catch(err => {
console.error('[Pod] Fatal error:', err);
process.exit(1);
});

View File

@@ -14,7 +14,7 @@ export type TaskRole =
| 'store_discovery'
| 'entry_point_discovery'
| 'product_discovery'
| 'product_resync'
| 'product_refresh'
| 'analytics_refresh';
export type TaskStatus =
@@ -29,6 +29,8 @@ export interface WorkerTask {
id: number;
role: TaskRole;
dispensary_id: number | null;
dispensary_name?: string; // JOINed from dispensaries
dispensary_slug?: string; // JOINed from dispensaries
platform: string | null;
status: TaskStatus;
priority: number;
@@ -128,13 +130,42 @@ class TaskService {
/**
* Claim a task atomically for a worker
* Uses the SQL function for proper locking
* If role is null, claims ANY available task (role-agnostic worker)
*/
async claimTask(role: TaskRole, workerId: string): Promise<WorkerTask | null> {
const result = await pool.query(
`SELECT * FROM claim_task($1, $2)`,
[role, workerId]
);
async claimTask(role: TaskRole | null, workerId: string): Promise<WorkerTask | null> {
if (role) {
// Role-specific claiming - use the SQL function
const result = await pool.query(
`SELECT * FROM claim_task($1, $2)`,
[role, workerId]
);
return (result.rows[0] as WorkerTask) || null;
}
// Role-agnostic claiming - claim ANY pending task
const result = await pool.query(`
UPDATE worker_tasks
SET
status = 'claimed',
worker_id = $1,
claimed_at = NOW()
WHERE id = (
SELECT id FROM worker_tasks
WHERE status = 'pending'
AND (scheduled_for IS NULL OR scheduled_for <= NOW())
-- Exclude stores that already have an active task
AND (dispensary_id IS NULL OR dispensary_id NOT IN (
SELECT dispensary_id FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND dispensary_id IS NOT NULL
))
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING *
`, [workerId]);
return (result.rows[0] as WorkerTask) || null;
}
@@ -206,27 +237,27 @@ class TaskService {
let paramIndex = 1;
if (filter.role) {
conditions.push(`role = $${paramIndex++}`);
conditions.push(`t.role = $${paramIndex++}`);
params.push(filter.role);
}
if (filter.status) {
if (Array.isArray(filter.status)) {
conditions.push(`status = ANY($${paramIndex++})`);
conditions.push(`t.status = ANY($${paramIndex++})`);
params.push(filter.status);
} else {
conditions.push(`status = $${paramIndex++}`);
conditions.push(`t.status = $${paramIndex++}`);
params.push(filter.status);
}
}
if (filter.dispensary_id) {
conditions.push(`dispensary_id = $${paramIndex++}`);
conditions.push(`t.dispensary_id = $${paramIndex++}`);
params.push(filter.dispensary_id);
}
if (filter.worker_id) {
conditions.push(`worker_id = $${paramIndex++}`);
conditions.push(`t.worker_id = $${paramIndex++}`);
params.push(filter.worker_id);
}
@@ -235,9 +266,14 @@ class TaskService {
const offset = filter.offset ?? 0;
const result = await pool.query(
`SELECT * FROM worker_tasks
`SELECT
t.*,
d.name as dispensary_name,
d.slug as dispensary_slug
FROM worker_tasks t
LEFT JOIN dispensaries d ON d.id = t.dispensary_id
${whereClause}
ORDER BY created_at DESC
ORDER BY t.created_at DESC
LIMIT ${limit} OFFSET ${offset}`,
params
);

View File

@@ -1,26 +1,58 @@
/**
* Task Worker
*
* A unified worker that processes tasks from the worker_tasks queue.
* Replaces the fragmented job systems (job_schedules, dispensary_crawl_jobs, etc.)
* A unified worker that pulls tasks from the worker_tasks queue.
* Workers register on startup, get a friendly name, and pull tasks.
*
* Architecture:
* - Tasks are generated on schedule (by scheduler or API)
* - Workers PULL tasks from the pool (not assigned to them)
* - Tasks are claimed in order of priority (DESC) then creation time (ASC)
* - Workers report heartbeats to worker_registry
* - Workers are ROLE-AGNOSTIC by default (can handle any task type)
*
* Stealth & Anti-Detection:
* PROXIES ARE REQUIRED - workers will fail to start if no proxies available.
*
* On startup, workers initialize the CrawlRotator which provides:
* - Proxy rotation: Loads proxies from `proxies` table, ALL requests use proxy
* - User-Agent rotation: Cycles through realistic browser fingerprints
* - Fingerprint rotation: Changes browser profile on blocks
* - Locale/timezone: Matches Accept-Language to target state
*
* The CrawlRotator is wired to the Dutchie client via setCrawlRotator().
* Task handlers call startSession() which picks a random fingerprint.
* On 403 errors, the client automatically:
* 1. Records failure on current proxy
* 2. Rotates to next proxy
* 3. Rotates fingerprint
* 4. Retries the request
*
* Usage:
* WORKER_ROLE=product_resync npx tsx src/tasks/task-worker.ts
* npx tsx src/tasks/task-worker.ts # Role-agnostic (any task)
* WORKER_ROLE=product_refresh npx tsx src/tasks/task-worker.ts # Role-specific
*
* Environment:
* WORKER_ROLE - Which task role to process (required)
* WORKER_ID - Optional custom worker ID
* WORKER_ROLE - Which task role to process (optional, null = any task)
* WORKER_ID - Optional custom worker ID (auto-generated if not provided)
* POD_NAME - Kubernetes pod name (optional)
* POLL_INTERVAL_MS - How often to check for tasks (default: 5000)
* HEARTBEAT_INTERVAL_MS - How often to update heartbeat (default: 30000)
* API_BASE_URL - Backend API URL for registration (default: http://localhost:3010)
*/
import { Pool } from 'pg';
import { v4 as uuidv4 } from 'uuid';
import { taskService, TaskRole, WorkerTask } from './task-service';
import { getPool } from '../db/pool';
import os from 'os';
// Stealth/rotation support
import { CrawlRotator } from '../services/crawl-rotator';
import { setCrawlRotator } from '../platforms/dutchie';
// Task handlers by role
import { handleProductResync } from './handlers/product-resync';
import { handleProductRefresh } from './handlers/product-refresh';
import { handleProductDiscovery } from './handlers/product-discovery';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
@@ -28,6 +60,7 @@ import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000');
const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000');
const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
export interface TaskContext {
pool: Pool;
@@ -48,7 +81,7 @@ export interface TaskResult {
type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
product_resync: handleProductResync,
product_refresh: handleProductRefresh,
product_discovery: handleProductDiscovery,
store_discovery: handleStoreDiscovery,
entry_point_discovery: handleEntryPointDiscovery,
@@ -58,15 +91,160 @@ const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
export class TaskWorker {
private pool: Pool;
private workerId: string;
private role: TaskRole;
private role: TaskRole | null; // null = role-agnostic (any task)
private friendlyName: string = '';
private isRunning: boolean = false;
private heartbeatInterval: NodeJS.Timeout | null = null;
private registryHeartbeatInterval: NodeJS.Timeout | null = null;
private currentTask: WorkerTask | null = null;
private crawlRotator: CrawlRotator;
constructor(role: TaskRole, workerId?: string) {
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
this.workerId = workerId || `worker-${role}-${uuidv4().slice(0, 8)}`;
this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`;
this.crawlRotator = new CrawlRotator(this.pool);
}
/**
* Initialize stealth systems (proxy rotation, fingerprints)
* Called once on worker startup before processing any tasks.
*
* IMPORTANT: Proxies are REQUIRED. Workers will fail to start if no proxies available.
*/
private async initializeStealth(): Promise<void> {
// Load proxies from database
await this.crawlRotator.initialize();
const stats = this.crawlRotator.proxy.getStats();
if (stats.activeProxies === 0) {
throw new Error('No active proxies available. Workers MUST use proxies for all requests. Add proxies to the database before starting workers.');
}
console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
// Wire rotator to Dutchie client - proxies will be used for ALL requests
setCrawlRotator(this.crawlRotator);
console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
}
/**
* Register worker with the registry (get friendly name)
*/
private async register(): Promise<void> {
try {
const response = await fetch(`${API_BASE_URL}/api/worker-registry/register`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
role: this.role,
worker_id: this.workerId,
pod_name: process.env.POD_NAME || process.env.HOSTNAME,
hostname: os.hostname(),
metadata: {
pid: process.pid,
node_version: process.version,
started_at: new Date().toISOString()
}
})
});
const data = await response.json();
if (data.success) {
this.friendlyName = data.friendly_name;
console.log(`[TaskWorker] ${data.message}`);
} else {
console.warn(`[TaskWorker] Registration warning: ${data.error}`);
this.friendlyName = this.workerId.slice(0, 12);
}
} catch (error: any) {
// Registration is optional - worker can still function without it
console.warn(`[TaskWorker] Could not register with API (will continue): ${error.message}`);
this.friendlyName = this.workerId.slice(0, 12);
}
}
/**
* Deregister worker from the registry
*/
private async deregister(): Promise<void> {
try {
await fetch(`${API_BASE_URL}/api/worker-registry/deregister`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ worker_id: this.workerId })
});
console.log(`[TaskWorker] ${this.friendlyName} signed off`);
} catch {
// Ignore deregistration errors
}
}
/**
* Send heartbeat to registry with resource usage
*/
private async sendRegistryHeartbeat(): Promise<void> {
try {
const memUsage = process.memoryUsage();
const cpuUsage = process.cpuUsage();
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
worker_id: this.workerId,
current_task_id: this.currentTask?.id || null,
status: this.currentTask ? 'active' : 'idle',
resources: {
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024),
cpu_user_ms: Math.round(cpuUsage.user / 1000),
cpu_system_ms: Math.round(cpuUsage.system / 1000),
}
})
});
} catch {
// Ignore heartbeat errors
}
}
/**
* Report task completion to registry
*/
private async reportTaskCompletion(success: boolean): Promise<void> {
try {
await fetch(`${API_BASE_URL}/api/worker-registry/task-completed`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
worker_id: this.workerId,
success
})
});
} catch {
// Ignore errors
}
}
/**
* Start registry heartbeat interval
*/
private startRegistryHeartbeat(): void {
this.registryHeartbeatInterval = setInterval(async () => {
await this.sendRegistryHeartbeat();
}, HEARTBEAT_INTERVAL_MS);
}
/**
* Stop registry heartbeat interval
*/
private stopRegistryHeartbeat(): void {
if (this.registryHeartbeatInterval) {
clearInterval(this.registryHeartbeatInterval);
this.registryHeartbeatInterval = null;
}
}
/**
@@ -74,7 +252,18 @@ export class TaskWorker {
*/
async start(): Promise<void> {
this.isRunning = true;
console.log(`[TaskWorker] Starting worker ${this.workerId} for role: ${this.role}`);
// Initialize stealth systems (proxy rotation, fingerprints)
await this.initializeStealth();
// Register with the API to get a friendly name
await this.register();
// Start registry heartbeat
this.startRegistryHeartbeat();
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg}`);
while (this.isRunning) {
try {
@@ -91,10 +280,12 @@ export class TaskWorker {
/**
* Stop the worker
*/
stop(): void {
async stop(): Promise<void> {
this.isRunning = false;
this.stopHeartbeat();
console.log(`[TaskWorker] Stopping worker ${this.workerId}...`);
this.stopRegistryHeartbeat();
await this.deregister();
console.log(`[TaskWorker] ${this.friendlyName} stopped`);
}
/**
@@ -142,7 +333,8 @@ export class TaskWorker {
if (result.success) {
// Mark as completed
await taskService.completeTask(task.id, result);
console.log(`[TaskWorker] Task ${task.id} completed successfully`);
await this.reportTaskCompletion(true);
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id}`);
// Chain next task if applicable
const chainedTask = await taskService.chainNextTask({
@@ -156,12 +348,14 @@ export class TaskWorker {
} else {
// Mark as failed
await taskService.failTask(task.id, result.error || 'Unknown error');
console.log(`[TaskWorker] Task ${task.id} failed: ${result.error}`);
await this.reportTaskCompletion(false);
console.log(`[TaskWorker] ${this.friendlyName} failed task ${task.id}: ${result.error}`);
}
} catch (error: any) {
// Mark as failed
await taskService.failTask(task.id, error.message);
console.error(`[TaskWorker] Task ${task.id} threw error:`, error.message);
await this.reportTaskCompletion(false);
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, error.message);
} finally {
this.stopHeartbeat();
this.currentTask = null;
@@ -201,7 +395,7 @@ export class TaskWorker {
/**
* Get worker info
*/
getInfo(): { workerId: string; role: TaskRole; isRunning: boolean; currentTaskId: number | null } {
getInfo(): { workerId: string; role: TaskRole | null; isRunning: boolean; currentTaskId: number | null } {
return {
workerId: this.workerId,
role: this.role,
@@ -216,30 +410,27 @@ export class TaskWorker {
// ============================================================
async function main(): Promise<void> {
const role = process.env.WORKER_ROLE as TaskRole;
if (!role) {
console.error('Error: WORKER_ROLE environment variable is required');
console.error('Valid roles: store_discovery, entry_point_discovery, product_discovery, product_resync, analytics_refresh');
process.exit(1);
}
const role = process.env.WORKER_ROLE as TaskRole | undefined;
const validRoles: TaskRole[] = [
'store_discovery',
'entry_point_discovery',
'product_discovery',
'product_resync',
'product_refresh',
'analytics_refresh',
];
if (!validRoles.includes(role)) {
// If role specified, validate it
if (role && !validRoles.includes(role)) {
console.error(`Error: Invalid WORKER_ROLE: ${role}`);
console.error(`Valid roles: ${validRoles.join(', ')}`);
console.error('Or omit WORKER_ROLE for role-agnostic worker (any task)');
process.exit(1);
}
const workerId = process.env.WORKER_ID;
const worker = new TaskWorker(role, workerId);
// Pass null for role-agnostic, or the specific role
const worker = new TaskWorker(role || null, workerId);
// Handle graceful shutdown
process.on('SIGTERM', () => {