# Crawl Pipeline Documentation

## Overview

The crawl pipeline fetches product data from Dutchie dispensary menus and stores it in the canonical database. This document covers the complete flow from task scheduling to data storage.

## Pipeline Stages
```
┌─────────────────────┐
│   store_discovery   │  Find new dispensaries
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│entry_point_discovery│  Resolve slug → platform_dispensary_id
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│  product_discovery  │  Initial product crawl
└─────────┬───────────┘
          │
          ▼
┌─────────────────────┐
│   product_resync    │  Recurring crawl (every 4 hours)
└─────────────────────┘
```
## Stage Details

### 1. Store Discovery

**Purpose:** Find new dispensaries to crawl

**Handler:** `src/tasks/handlers/store-discovery.ts`

**Flow:**

- Query Dutchie `ConsumerDispensariesGraphQL` for cities/states
- Extract dispensary info (name, address, menu_url)
- Insert into `dutchie_discovery_locations`
- Queue `entry_point_discovery` for each new location
### 2. Entry Point Discovery

**Purpose:** Resolve the menu URL slug to a `platform_dispensary_id` (MongoDB ObjectId)

**Handler:** `src/tasks/handlers/entry-point-discovery.ts`

**Flow:**

- Load dispensary from database
- Extract slug from `menu_url`: `/embedded-menu/<slug>` or `/dispensary/<slug>`
- Start stealth session (fingerprint + proxy)
- Query `resolveDispensaryIdWithDetails(slug)` via GraphQL
- Update dispensary with `platform_dispensary_id`
- Queue `product_discovery` task

**Example:**

```
menu_url: https://dutchie.com/embedded-menu/deeply-rooted
slug: deeply-rooted
platform_dispensary_id: 6405ef617056e8014d79101b
```
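The slug extraction step amounts to a one-line pattern match. The sketch below is illustrative only (the helper name `extractSlug` is hypothetical; the real logic lives in `src/tasks/handlers/entry-point-discovery.ts`):

```typescript
// Hypothetical helper illustrating slug extraction from a menu_url.
function extractSlug(menuUrl: string): string | null {
  // Matches /embedded-menu/<slug> or /dispensary/<slug>
  const match = menuUrl.match(/\/(?:embedded-menu|dispensary)\/([^/?#]+)/);
  return match ? match[1] : null;
}

// extractSlug('https://dutchie.com/embedded-menu/deeply-rooted') → 'deeply-rooted'
```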
### 3. Product Discovery

**Purpose:** Initial crawl of a new dispensary

**Handler:** `src/tasks/handlers/product-discovery.ts`

Same as `product_resync`, but for first-time crawls.

### 4. Product Resync

**Purpose:** Recurring crawl to capture price/stock changes

**Handler:** `src/tasks/handlers/product-resync.ts`

**Flow:**
**Step 1: Load Dispensary Info**

```sql
SELECT id, name, platform_dispensary_id, menu_url, state
FROM dispensaries
WHERE id = $1 AND crawl_enabled = true
```

**Step 2: Start Stealth Session**

- Generate random browser fingerprint
- Set locale/timezone matching state
- Optional proxy rotation
**Step 3: Fetch Products via GraphQL**

Endpoint: `https://dutchie.com/api-3/graphql`

Variables:

```js
{
  includeEnterpriseSpecials: false,
  productsFilter: {
    dispensaryId: "<platform_dispensary_id>",
    pricingType: "rec",
    Status: "All",
    types: [],
    useCache: false,
    isDefaultSort: true,
    sortBy: "popularSortIdx",
    sortDirection: 1,
    bypassOnlineThresholds: true,
    isKioskMenu: false,
    removeProductsBelowOptionThresholds: false
  },
  page: 0,
  perPage: 100
}
```
**Key Notes:**

- `Status: "All"` returns all products (`Active` returns the same count)
- `Status: null` returns 0 products (broken)
- `pricingType: "rec"` returns BOTH rec and med prices
- Paginate until `products.length < perPage` or `allProducts.length >= totalCount` (sketched below)
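A minimal sketch of that pagination loop, assuming a `fetchProductsPage` helper that issues the GraphQL request above (the helper name and its return shape are illustrative):

```typescript
// Illustrative pagination sketch; fetchProductsPage and its return shape are assumed.
interface ProductsPage {
  products: unknown[];
  totalCount: number;
}

async function fetchAllProducts(
  fetchProductsPage: (page: number, perPage: number) => Promise<ProductsPage>,
  perPage = 100,
): Promise<unknown[]> {
  const allProducts: unknown[] = [];
  let page = 0;
  for (;;) {
    const { products, totalCount } = await fetchProductsPage(page, perPage);
    allProducts.push(...products);
    // Stop on a short page, or once we hold everything the feed reports.
    if (products.length < perPage || allProducts.length >= totalCount) break;
    page += 1;
  }
  return allProducts;
}
```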
**Step 4: Normalize Data**

Transform the raw Dutchie payload to canonical format via `DutchieNormalizer`.

**Step 5: Upsert Products**

Insert/update the `store_products` table with normalized data.

**Step 6: Create Snapshots**

Insert a point-in-time record into `store_product_snapshots`.
**Step 7: Track Missing Products (OOS Detection)**

```sql
-- Reset consecutive_misses for products IN the feed
UPDATE store_products
SET consecutive_misses = 0, last_seen_at = NOW()
WHERE dispensary_id = $1
  AND provider = 'dutchie'
  AND provider_product_id = ANY($2);

-- Increment for products NOT in the feed
UPDATE store_products
SET consecutive_misses = consecutive_misses + 1
WHERE dispensary_id = $1
  AND provider = 'dutchie'
  AND provider_product_id NOT IN (...)
  AND consecutive_misses < 3;

-- Mark OOS at 3 consecutive misses
UPDATE store_products
SET stock_status = 'oos', is_in_stock = false
WHERE dispensary_id = $1
  AND consecutive_misses >= 3
  AND stock_status != 'oos';
```
**Step 8: Download Images**

For new products, download and store images locally.

**Step 9: Update Dispensary**

```sql
UPDATE dispensaries SET last_crawl_at = NOW() WHERE id = $1
```
## GraphQL Payload Structure

### Product Fields (from `filteredProducts.products[]`)

| Field | Type | Description |
|---|---|---|
| `_id` / `id` | string | MongoDB ObjectId (24 hex chars) |
| `Name` | string | Product display name |
| `brandName` | string | Brand name |
| `brand.name` | string | Brand name (nested) |
| `brand.description` | string | Brand description |
| `type` | string | Category (Flower, Edible, Concentrate, etc.) |
| `subcategory` | string | Subcategory |
| `strainType` | string | Hybrid, Indica, Sativa, N/A |
| `Status` | string | Always "Active" in feed |
| `Image` | string | Primary image URL |
| `images[]` | array | All product images |
### Pricing Fields

| Field | Type | Description |
|---|---|---|
| `Prices[]` | number[] | Rec prices per option |
| `recPrices[]` | number[] | Rec prices |
| `medicalPrices[]` | number[] | Medical prices |
| `recSpecialPrices[]` | number[] | Rec sale prices |
| `medicalSpecialPrices[]` | number[] | Medical sale prices |
| `Options[]` | string[] | Size options ("1/8oz", "1g", etc.) |
| `rawOptions[]` | string[] | Raw weight options ("3.5g") |
### Inventory Fields (`POSMetaData.children[]`)

| Field | Type | Description |
|---|---|---|
| `quantity` | number | Total inventory count |
| `quantityAvailable` | number | Available for online orders |
| `kioskQuantityAvailable` | number | Available for kiosk orders |
| `option` | string | Which size option this is for |
### Potency Fields

| Field | Type | Description |
|---|---|---|
| `THCContent.range[]` | number[] | THC percentage |
| `CBDContent.range[]` | number[] | CBD percentage |
| `cannabinoidsV2[]` | array | Detailed cannabinoid breakdown |
### Specials (`specialData.bogoSpecials[]`)

| Field | Type | Description |
|---|---|---|
| `specialName` | string | Deal name |
| `specialType` | string | "bogo", "sale", etc. |
| `itemsForAPrice.value` | string | Bundle price |
| `bogoRewards[].totalQuantity.quantity` | number | Required quantity |
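Taken together, the field tables above translate to roughly the following TypeScript shape. This is an illustrative subset only (the interface name and the optionality markers are assumptions, not the full Dutchie schema):

```typescript
// Illustrative subset of a Dutchie product payload, derived from the tables above.
interface DutchieProduct {
  _id: string;                  // MongoDB ObjectId (24 hex chars)
  Name: string;
  brandName?: string;
  brand?: { name: string; description?: string };
  type: string;                 // Flower, Edible, Concentrate, ...
  subcategory?: string;
  strainType?: string;          // Hybrid, Indica, Sativa, N/A
  Status: string;               // Always "Active" in the feed
  Image?: string;
  images?: string[];
  Prices?: number[];
  recPrices?: number[];
  medicalPrices?: number[];
  recSpecialPrices?: number[];
  medicalSpecialPrices?: number[];
  Options?: string[];           // "1/8oz", "1g", ...
  rawOptions?: string[];        // "3.5g", ...
  POSMetaData?: {
    children: Array<{
      option: string;
      quantity: number;
      quantityAvailable: number;
      kioskQuantityAvailable?: number;
    }>;
  };
  THCContent?: { range: number[] };
  CBDContent?: { range: number[] };
  specialData?: {
    bogoSpecials: Array<{
      specialName: string;
      specialType: string;      // "bogo", "sale", ...
      itemsForAPrice?: { value: string };
      bogoRewards?: Array<{ totalQuantity: { quantity: number } }>;
    }>;
  };
}
```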
## OOS Detection Logic

Products disappear from the Dutchie feed when they go out of stock. We track this via `consecutive_misses`:

| Scenario | Action |
|---|---|
| Product in feed | `consecutive_misses = 0` |
| Product missing 1st time | `consecutive_misses = 1` |
| Product missing 2nd time | `consecutive_misses = 2` |
| Product missing 3rd time | `consecutive_misses = 3`, mark `stock_status = 'oos'` |
| Product returns to feed | `consecutive_misses = 0`, update `stock_status` |

**Why 3 misses?**

- Protects against false positives from crawl failures
- A single bad crawl doesn't trigger mass OOS alerts
- Balances detection speed vs. accuracy
## Database Tables

### store_products

Current state of each product:

- `provider_product_id` - Dutchie's MongoDB ObjectId
- `name_raw`, `brand_name_raw` - Raw values from feed
- `price_rec`, `price_med` - Current prices
- `is_in_stock`, `stock_status` - Availability
- `consecutive_misses` - OOS detection counter
- `last_seen_at` - Last time product was in feed

### store_product_snapshots

Point-in-time records for historical analysis:

- One row per product per crawl
- Captures price, stock, potency at that moment
- Used for price history, analytics

### dispensaries

Store metadata:

- `platform_dispensary_id` - MongoDB ObjectId for GraphQL
- `menu_url` - Source URL
- `last_crawl_at` - Last successful crawl
- `crawl_enabled` - Whether to crawl
## Worker Roles

Workers pull tasks from the `worker_tasks` queue based on their assigned role.

| Role | Name | Description | Handler |
|---|---|---|---|
| `product_resync` | Product Resync | Re-crawl dispensary products for price/stock changes | `handleProductResync` |
| `product_discovery` | Product Discovery | Initial product discovery for new dispensaries | `handleProductDiscovery` |
| `store_discovery` | Store Discovery | Discover new dispensary locations | `handleStoreDiscovery` |
| `entry_point_discovery` | Entry Point Discovery | Resolve platform IDs from menu URLs | `handleEntryPointDiscovery` |
| `analytics_refresh` | Analytics Refresh | Refresh materialized views and analytics | `handleAnalyticsRefresh` |

API Endpoint: `GET /api/worker-registry/roles`
## Scheduling

Crawls are scheduled via the `worker_tasks` table:

| Role | Frequency | Description |
|---|---|---|
| `product_resync` | Every 4 hours | Regular product refresh |
| `product_discovery` | On-demand | First crawl for new stores |
| `entry_point_discovery` | On-demand | New store setup |
| `store_discovery` | Daily | Find new stores |
| `analytics_refresh` | Daily | Refresh analytics materialized views |
## Priority & On-Demand Tasks

Tasks are claimed by workers in order of `priority DESC, created_at ASC`.

### Priority Levels

| Priority | Use Case | Example |
|---|---|---|
| 0 | Scheduled/batch tasks | Daily `product_resync` generation |
| 10 | On-demand/chained tasks | entry_point → product_discovery |
| Higher | Urgent/manual triggers | Admin-triggered immediate crawl |
### Task Chaining

When a task completes, the system automatically creates follow-up tasks:

```
store_discovery (completed)
  └─► entry_point_discovery (priority: 10) for each new store

entry_point_discovery (completed, success)
  └─► product_discovery (priority: 10) for that store

product_discovery (completed)
  └─► [no chain] Store enters regular resync schedule
```
### On-Demand Task Creation

Use the task service to create high-priority tasks:

```typescript
// Create immediate product resync for a store
await taskService.createTask({
  role: 'product_resync',
  dispensary_id: 123,
  platform: 'dutchie',
  priority: 20, // Higher than batch tasks
});

// Convenience methods with default high priority (10)
await taskService.createEntryPointTask(dispensaryId, 'dutchie');
await taskService.createProductDiscoveryTask(dispensaryId, 'dutchie');
await taskService.createStoreDiscoveryTask('dutchie', 'AZ');
```
### Claim Function

The `claim_task()` SQL function atomically claims tasks:

- Respects priority ordering (higher = first)
- Uses `FOR UPDATE SKIP LOCKED` for concurrency
- Prevents multiple active tasks per store
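A minimal sketch of what such an atomic claim looks like, assuming a node-postgres `Pool` and simplified column names (the real `claim_task()` function enforces additional constraints, such as one active task per store):

```typescript
import { Pool } from 'pg';

const pool = new Pool();

// Sketch of an atomic claim using FOR UPDATE SKIP LOCKED.
// Column names are simplified assumptions, not the production schema.
async function claimTask(workerId: string) {
  const { rows } = await pool.query(
    `UPDATE worker_tasks
     SET status = 'running', claimed_by = $1, claimed_at = NOW()
     WHERE id = (
       SELECT id FROM worker_tasks
       WHERE status = 'pending'
       ORDER BY priority DESC, created_at ASC
       FOR UPDATE SKIP LOCKED
       LIMIT 1
     )
     RETURNING *`,
    [workerId],
  );
  return rows[0] ?? null; // null when the queue is empty
}
```

`SKIP LOCKED` lets concurrent workers pass over rows another worker is already claiming, so no two workers ever receive the same task.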
## Image Storage

Images are downloaded from Dutchie's AWS S3 and stored locally with on-demand resizing.

### Storage Path

```
/storage/images/products/<state>/<store>/<brand>/<product_id>/image-<hash>.webp
/storage/images/brands/<brand>/logo-<hash>.webp
```

Example:

```
/storage/images/products/az/az-deeply-rooted/bud-bros/6913e3cd444eac3935e928b9/image-ae38b1f9.webp
```
### Image Proxy API

Served via `/img/*` with on-demand resizing using sharp:

```
GET /img/products/az/az-deeply-rooted/bud-bros/6913e3cd444eac3935e928b9/image-ae38b1f9.webp?w=200
```

| Param | Description |
|---|---|
| `w` | Width in pixels (max 4000) |
| `h` | Height in pixels (max 4000) |
| `q` | Quality 1-100 (default 80) |
| `fit` | cover, contain, fill, inside, outside |
| `blur` | Blur sigma (0.3-1000) |
| `gray` | Grayscale (1 = enabled) |
| `format` | webp, jpeg, png, avif (default webp) |
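A minimal sketch of such a proxy route, assuming Express and the sharp package. The real route in `src/routes/image-proxy.ts` supports all the parameters above; this sketch handles only `w`, `h`, and `q`, and the storage root constant is an assumption:

```typescript
import path from 'node:path';
import express from 'express';
import sharp from 'sharp';

const STORAGE_ROOT = '/storage/images'; // assumed storage root
const app = express();

app.get('/img/*', async (req, res) => {
  try {
    // Resolve the requested file under the storage root; reject path escapes.
    const file = path.resolve(STORAGE_ROOT, req.params[0]);
    if (!file.startsWith(STORAGE_ROOT + path.sep)) return res.sendStatus(400);

    const width = req.query.w ? Number(req.query.w) : undefined;
    const height = req.query.h ? Number(req.query.h) : undefined;
    const quality = req.query.q ? Number(req.query.q) : 80;

    // Resize on demand and always emit webp, matching the storage rules below.
    const buf = await sharp(file)
      .resize({ width, height, fit: 'cover', withoutEnlargement: true })
      .webp({ quality })
      .toBuffer();

    res.type('image/webp').send(buf);
  } catch {
    res.sendStatus(404); // missing file or unreadable image
  }
});
```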
### Key Files

| File | Purpose |
|---|---|
| `src/utils/image-storage.ts` | Download & save images to local filesystem |
| `src/routes/image-proxy.ts` | On-demand resize/transform at `/img/*` |
### Download Rules

| Scenario | Image Action |
|---|---|
| New product (first crawl) | Download if `primaryImageUrl` exists |
| Existing product (refresh) | Download only if `local_image_path` is NULL (backfill) |
| Product already has local image | Skip download entirely |

Logic (sketched below):

- Images are downloaded once and never re-downloaded on subsequent crawls
- `skipIfExists: true` - a filesystem check prevents re-download even if queued
- First crawl: all products get images
- Refresh crawl: only new products or products missing local images
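The rules above reduce to a small guard. An illustrative sketch, assuming hypothetical field and function names (`shouldDownloadImage` is not the production helper):

```typescript
import fs from 'node:fs';

// Illustrative download guard; names and fields are assumptions.
interface ProductImageState {
  primaryImageUrl: string | null; // original Dutchie URL from the feed
  localImagePath: string | null;  // set once an image is stored locally
}

function shouldDownloadImage(p: ProductImageState, targetPath: string): boolean {
  if (!p.primaryImageUrl) return false;        // nothing to download
  if (p.localImagePath) return false;          // already stored locally
  if (fs.existsSync(targetPath)) return false; // skipIfExists-style filesystem check
  return true;                                 // new product, or backfill
}
```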
### Storage Rules

- NO MinIO - local filesystem only (`STORAGE_DRIVER=local`)
- Store full resolution, resize on demand via the `/img` proxy
- Convert to webp for consistency using sharp
- Preserve the original Dutchie URL as a fallback in the `image_url` column
- Local path stored in the `local_image_path` column
## Stealth & Anti-Detection

**PROXIES ARE REQUIRED** - Workers will fail to start if no active proxies are available in the database. All HTTP requests to Dutchie go through a proxy.

Workers automatically initialize anti-detection systems on startup.
### Components

| Component | Purpose | Source |
|---|---|---|
| CrawlRotator | Coordinates proxy + UA rotation | `src/services/crawl-rotator.ts` |
| ProxyRotator | Round-robin proxy selection, health tracking | `src/services/crawl-rotator.ts` |
| UserAgentRotator | Cycles through realistic browser fingerprints | `src/services/crawl-rotator.ts` |
| Dutchie Client | Curl-based HTTP with auto-retry on 403 | `src/platforms/dutchie/client.ts` |
### Initialization Flow

```
Worker Start
│
├─► initializeStealth()
│   │
│   ├─► CrawlRotator.initialize()
│   │   └─► Load proxies from `proxies` table
│   │
│   └─► setCrawlRotator(rotator)
│       └─► Wire to Dutchie client
│
└─► Process tasks...
```
### Stealth Session (per task)

Each crawl task starts a stealth session:

```typescript
// In product-refresh.ts, entry-point-discovery.ts
const session = startSession(dispensary.state || 'AZ', 'America/Phoenix');
```

This creates a new identity with:

- Random fingerprint: Chrome/Firefox/Safari/Edge on Win/Mac/Linux
- Accept-Language: matches the timezone (e.g., `America/Phoenix` → `en-US,en;q=0.9`)
- sec-ch-ua headers: proper Client Hints for the browser profile
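The locale/timezone matching can be pictured as a small lookup keyed by state. This is illustrative only; the actual mapping lives inside the stealth session code and covers every crawled state:

```typescript
// Illustrative state → geo mapping for header consistency; not the real table.
const STATE_GEO: Record<string, { timezone: string; acceptLanguage: string }> = {
  AZ: { timezone: 'America/Phoenix', acceptLanguage: 'en-US,en;q=0.9' },
  NY: { timezone: 'America/New_York', acceptLanguage: 'en-US,en;q=0.9' },
  CA: { timezone: 'America/Los_Angeles', acceptLanguage: 'en-US,en;q=0.9' },
};

// Fall back to Arizona, mirroring startSession(dispensary.state || 'AZ', ...)
const geo = STATE_GEO[dispensary.state] ?? STATE_GEO['AZ'];
```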
### On 403 Block

When Dutchie returns 403, the client automatically (see the sketch below):

- Records a failure on the current proxy (increments `failure_count`)
- If the proxy has 5+ failures, deactivates it
- Rotates to the next healthy proxy
- Rotates the fingerprint
- Retries the request
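A simplified sketch of that retry loop. The `rotator` and `fetchWithProxy` helpers are assumptions for illustration; the real logic lives in `src/platforms/dutchie/client.ts`:

```typescript
// Simplified 403-retry sketch; rotator and fetchWithProxy are assumed helpers.
const MAX_RETRIES = 3;

interface Rotator {
  current(): { id: number };
  recordFailure(proxyId: number): void; // deactivates the proxy at 5+ failures
  rotateProxy(): void;
  rotateFingerprint(): void;
}

async function fetchWithRotation(
  url: string,
  rotator: Rotator,
  fetchWithProxy: (url: string) => Promise<Response>,
): Promise<Response> {
  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
    const res = await fetchWithProxy(url);
    if (res.status !== 403) return res;

    // Blocked: penalize the proxy, switch identity, and try again.
    rotator.recordFailure(rotator.current().id);
    rotator.rotateProxy();
    rotator.rotateFingerprint();
  }
  throw new Error(`Blocked after ${MAX_RETRIES} retries: ${url}`);
}
```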
### Proxy Table Schema

```sql
CREATE TABLE proxies (
  id SERIAL PRIMARY KEY,
  host VARCHAR(255) NOT NULL,
  port INTEGER NOT NULL,
  username VARCHAR(100),
  password VARCHAR(100),
  protocol VARCHAR(10) DEFAULT 'http', -- http, https, socks5
  is_active BOOLEAN DEFAULT true,
  last_used_at TIMESTAMPTZ,
  failure_count INTEGER DEFAULT 0,
  success_count INTEGER DEFAULT 0,
  avg_response_time_ms INTEGER,
  last_failure_at TIMESTAMPTZ,
  last_error TEXT
);
```
### Configuration

Proxies are mandatory. There is no environment variable to disable them. Workers will refuse to start without active proxies in the database.
### Fingerprints Available

The client includes 6 browser fingerprints:

- Chrome 131 on Windows
- Chrome 131 on macOS
- Chrome 120 on Windows
- Firefox 133 on Windows
- Safari 17.2 on macOS
- Edge 131 on Windows

Each includes proper `sec-ch-ua`, `sec-ch-ua-platform`, and `sec-ch-ua-mobile` headers.
## Error Handling

- **GraphQL errors:** Logged, task marked failed, retried later
- **Normalization errors:** Logged as warnings, continue with valid products
- **Image download errors:** Non-fatal, logged, continue
- **Database errors:** Task fails, will be retried
- **403 blocks:** Auto-rotate proxy + fingerprint, retry (up to 3 retries)
## Files

| File | Purpose |
|---|---|
| `src/tasks/handlers/product-resync.ts` | Main crawl handler |
| `src/tasks/handlers/entry-point-discovery.ts` | Slug → ID resolution |
| `src/platforms/dutchie/index.ts` | GraphQL client, session management |
| `src/hydration/normalizers/dutchie.ts` | Payload normalization |
| `src/hydration/canonical-upsert.ts` | Database upsert logic |
| `src/utils/image-storage.ts` | Image download and local storage |
| `src/routes/image-proxy.ts` | On-demand image resizing |
| `migrations/075_consecutive_misses.sql` | OOS tracking column |