From 0295637ed6d1263d96538ef250e163b30cfd25a8 Mon Sep 17 00:00:00 2001 From: Kelly Date: Tue, 9 Dec 2025 20:44:53 -0700 Subject: [PATCH] fix: Public API column mappings and OOS detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix store_products column references (name_raw, brand_name_raw, category_raw) - Fix v_product_snapshots column references (crawled_at, *_cents pricing) - Fix dispensaries column references (zipcode, logo_image, remove hours/amenities) - Add services and license_type to dispensary API response - Add consecutive_misses OOS tracking to product-resync handler - Add migration 075 for consecutive_misses column - Add CRAWL_PIPELINE.md documentation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/docs/CRAWL_PIPELINE.md | 308 ++++++++++++++++++ backend/migrations/075_consecutive_misses.sql | 13 + backend/src/routes/public-api.ts | 175 ++++++---- backend/src/tasks/handlers/product-resync.ts | 299 ++++++++++++++++- 4 files changed, 727 insertions(+), 68 deletions(-) create mode 100644 backend/docs/CRAWL_PIPELINE.md create mode 100644 backend/migrations/075_consecutive_misses.sql diff --git a/backend/docs/CRAWL_PIPELINE.md b/backend/docs/CRAWL_PIPELINE.md new file mode 100644 index 00000000..239c2161 --- /dev/null +++ b/backend/docs/CRAWL_PIPELINE.md @@ -0,0 +1,308 @@ +# Crawl Pipeline Documentation + +## Overview + +The crawl pipeline fetches product data from Dutchie dispensary menus and stores it in the canonical database. This document covers the complete flow from task scheduling to data storage. 
+ +--- + +## Pipeline Stages + +``` +┌─────────────────────┐ +│ store_discovery │ Find new dispensaries +└─────────┬───────────┘ + │ + ▼ +┌─────────────────────┐ +│ entry_point_discovery│ Resolve slug → platform_dispensary_id +└─────────┬───────────┘ + │ + ▼ +┌─────────────────────┐ +│ product_discovery │ Initial product crawl +└─────────┬───────────┘ + │ + ▼ +┌─────────────────────┐ +│ product_resync │ Recurring crawl (every 4 hours) +└─────────────────────┘ +``` + +--- + +## Stage Details + +### 1. Store Discovery +**Purpose:** Find new dispensaries to crawl + +**Handler:** `src/tasks/handlers/store-discovery.ts` + +**Flow:** +1. Query Dutchie `ConsumerDispensaries` GraphQL for cities/states +2. Extract dispensary info (name, address, menu_url) +3. Insert into `dutchie_discovery_locations` +4. Queue `entry_point_discovery` for each new location + +--- + +### 2. Entry Point Discovery +**Purpose:** Resolve menu URL slug to platform_dispensary_id (MongoDB ObjectId) + +**Handler:** `src/tasks/handlers/entry-point-discovery.ts` + +**Flow:** +1. Load dispensary from database +2. Extract slug from `menu_url`: + - `/embedded-menu/<slug>` or `/dispensary/<slug>` +3. Start stealth session (fingerprint + proxy) +4. Query `resolveDispensaryIdWithDetails(slug)` via GraphQL +5. Update dispensary with `platform_dispensary_id` +6. Queue `product_discovery` task + +**Example:** +``` +menu_url: https://dutchie.com/embedded-menu/deeply-rooted +slug: deeply-rooted +platform_dispensary_id: 6405ef617056e8014d79101b +``` + +--- + +### 3. 
Product Discovery +**Purpose:** Initial crawl of a new dispensary + +**Handler:** `src/tasks/handlers/product-discovery.ts` + +Same as product_resync but for first-time crawls. + +--- + +### 4. Product Resync +**Purpose:** Recurring crawl to capture price/stock changes + +**Handler:** `src/tasks/handlers/product-resync.ts` + +**Flow:** + +#### Step 1: Load Dispensary Info +```sql +SELECT id, name, platform_dispensary_id, menu_url, state +FROM dispensaries +WHERE id = $1 AND crawl_enabled = true +``` + +#### Step 2: Start Stealth Session +- Generate random browser fingerprint +- Set locale/timezone matching state +- Optional proxy rotation + +#### Step 3: Fetch Products via GraphQL +**Endpoint:** `https://dutchie.com/api-3/graphql` + +**Variables:** +```javascript +{ + includeEnterpriseSpecials: false, + productsFilter: { + dispensaryId: "<platform_dispensary_id>", + pricingType: "rec", + Status: "All", + types: [], + useCache: false, + isDefaultSort: true, + sortBy: "popularSortIdx", + sortDirection: 1, + bypassOnlineThresholds: true, + isKioskMenu: false, + removeProductsBelowOptionThresholds: false + }, + page: 0, + perPage: 100 +} +``` + +**Key Notes:** +- `Status: "All"` returns all products (Active returns same count) +- `Status: null` returns 0 products (broken) +- `pricingType: "rec"` returns BOTH rec and med prices +- Paginate until `products.length < perPage` or `allProducts.length >= totalCount` + +#### Step 4: Normalize Data +Transform raw Dutchie payload to canonical format via `DutchieNormalizer`. + +#### Step 5: Upsert Products +Insert/update `store_products` table with normalized data. + +#### Step 6: Create Snapshots +Insert point-in-time record to `store_product_snapshots`. 
+ +#### Step 7: Track Missing Products (OOS Detection) +```sql +-- Reset consecutive_misses for products IN the feed +UPDATE store_products +SET consecutive_misses = 0, last_seen_at = NOW() +WHERE dispensary_id = $1 + AND provider = 'dutchie' + AND provider_product_id = ANY($2) + +-- Increment for products NOT in feed +UPDATE store_products +SET consecutive_misses = consecutive_misses + 1 +WHERE dispensary_id = $1 + AND provider = 'dutchie' + AND provider_product_id NOT IN (...) + AND consecutive_misses < 3 + +-- Mark OOS at 3 consecutive misses +UPDATE store_products +SET stock_status = 'oos', is_in_stock = false +WHERE dispensary_id = $1 + AND consecutive_misses >= 3 + AND stock_status != 'oos' +``` + +#### Step 8: Download Images +For new products, download and store images locally. + +#### Step 9: Update Dispensary +```sql +UPDATE dispensaries SET last_crawl_at = NOW() WHERE id = $1 +``` + +--- + +## GraphQL Payload Structure + +### Product Fields (from filteredProducts.products[]) + +| Field | Type | Description | +|-------|------|-------------| +| `_id` / `id` | string | MongoDB ObjectId (24 hex chars) | +| `Name` | string | Product display name | +| `brandName` | string | Brand name | +| `brand.name` | string | Brand name (nested) | +| `brand.description` | string | Brand description | +| `type` | string | Category (Flower, Edible, Concentrate, etc.) 
| +| `subcategory` | string | Subcategory | +| `strainType` | string | Hybrid, Indica, Sativa, N/A | +| `Status` | string | Always "Active" in feed | +| `Image` | string | Primary image URL | +| `images[]` | array | All product images | + +### Pricing Fields + +| Field | Type | Description | +|-------|------|-------------| +| `Prices[]` | number[] | Rec prices per option | +| `recPrices[]` | number[] | Rec prices | +| `medicalPrices[]` | number[] | Medical prices | +| `recSpecialPrices[]` | number[] | Rec sale prices | +| `medicalSpecialPrices[]` | number[] | Medical sale prices | +| `Options[]` | string[] | Size options ("1/8oz", "1g", etc.) | +| `rawOptions[]` | string[] | Raw weight options ("3.5g") | + +### Inventory Fields (POSMetaData.children[]) + +| Field | Type | Description | +|-------|------|-------------| +| `quantity` | number | Total inventory count | +| `quantityAvailable` | number | Available for online orders | +| `kioskQuantityAvailable` | number | Available for kiosk orders | +| `option` | string | Which size option this is for | + +### Potency Fields + +| Field | Type | Description | +|-------|------|-------------| +| `THCContent.range[]` | number[] | THC percentage | +| `CBDContent.range[]` | number[] | CBD percentage | +| `cannabinoidsV2[]` | array | Detailed cannabinoid breakdown | + +### Specials (specialData.bogoSpecials[]) + +| Field | Type | Description | +|-------|------|-------------| +| `specialName` | string | Deal name | +| `specialType` | string | "bogo", "sale", etc. | +| `itemsForAPrice.value` | string | Bundle price | +| `bogoRewards[].totalQuantity.quantity` | number | Required quantity | + +--- + +## OOS Detection Logic + +Products disappear from the Dutchie feed when they go out of stock. 
We track this via `consecutive_misses`: + +| Scenario | Action | +|----------|--------| +| Product in feed | `consecutive_misses = 0` | +| Product missing 1st time | `consecutive_misses = 1` | +| Product missing 2nd time | `consecutive_misses = 2` | +| Product missing 3rd time | `consecutive_misses = 3`, mark `stock_status = 'oos'` | +| Product returns to feed | `consecutive_misses = 0`, update stock_status | + +**Why 3 misses?** +- Protects against false positives from crawl failures +- Single bad crawl doesn't trigger mass OOS alerts +- Balances detection speed vs accuracy + +--- + +## Database Tables + +### store_products +Current state of each product: +- `provider_product_id` - Dutchie's MongoDB ObjectId +- `name_raw`, `brand_name_raw` - Raw values from feed +- `price_rec`, `price_med` - Current prices +- `is_in_stock`, `stock_status` - Availability +- `consecutive_misses` - OOS detection counter +- `last_seen_at` - Last time product was in feed + +### store_product_snapshots +Point-in-time records for historical analysis: +- One row per product per crawl +- Captures price, stock, potency at that moment +- Used for price history, analytics + +### dispensaries +Store metadata: +- `platform_dispensary_id` - MongoDB ObjectId for GraphQL +- `menu_url` - Source URL +- `last_crawl_at` - Last successful crawl +- `crawl_enabled` - Whether to crawl + +--- + +## Scheduling + +Crawls are scheduled via `worker_tasks` table: + +| Role | Frequency | Description | +|------|-----------|-------------| +| `product_resync` | Every 4 hours | Regular product refresh | +| `entry_point_discovery` | On-demand | New store setup | +| `store_discovery` | Daily | Find new stores | + +--- + +## Error Handling + +- **GraphQL errors:** Logged, task marked failed, retried later +- **Normalization errors:** Logged as warnings, continue with valid products +- **Image download errors:** Non-fatal, logged, continue +- **Database errors:** Task fails, will be retried + +--- + +## Files + +| File 
| Purpose | +|------|---------| +| `src/tasks/handlers/product-resync.ts` | Main crawl handler | +| `src/tasks/handlers/entry-point-discovery.ts` | Slug → ID resolution | +| `src/platforms/dutchie/index.ts` | GraphQL client, session management | +| `src/hydration/normalizers/dutchie.ts` | Payload normalization | +| `src/hydration/canonical-upsert.ts` | Database upsert logic | +| `migrations/075_consecutive_misses.sql` | OOS tracking column | diff --git a/backend/migrations/075_consecutive_misses.sql b/backend/migrations/075_consecutive_misses.sql new file mode 100644 index 00000000..17f38a49 --- /dev/null +++ b/backend/migrations/075_consecutive_misses.sql @@ -0,0 +1,13 @@ +-- Migration 075: Add consecutive_misses column to store_products +-- Used to track how many consecutive crawls a product has been missing from the feed +-- After 3 consecutive misses, product is marked as OOS + +ALTER TABLE store_products +ADD COLUMN IF NOT EXISTS consecutive_misses INTEGER NOT NULL DEFAULT 0; + +-- Index for finding products that need OOS check +CREATE INDEX IF NOT EXISTS idx_store_products_consecutive_misses +ON store_products (dispensary_id, consecutive_misses) +WHERE consecutive_misses > 0; + +COMMENT ON COLUMN store_products.consecutive_misses IS 'Number of consecutive crawls where product was not in feed. Reset to 0 when seen. 
At 3, mark OOS.'; diff --git a/backend/src/routes/public-api.ts b/backend/src/routes/public-api.ts index f59392a9..ff22722c 100644 --- a/backend/src/routes/public-api.ts +++ b/backend/src/routes/public-api.ts @@ -430,14 +430,14 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { // Filter by category if (category) { - whereClause += ` AND LOWER(p.category) = LOWER($${paramIndex})`; + whereClause += ` AND LOWER(p.category_raw) = LOWER($${paramIndex})`; params.push(category); paramIndex++; } // Filter by brand if (brand) { - whereClause += ` AND LOWER(p.brand_name) LIKE LOWER($${paramIndex})`; + whereClause += ` AND LOWER(p.brand_name_raw) LIKE LOWER($${paramIndex})`; params.push(`%${brand}%`); paramIndex++; } @@ -468,7 +468,7 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { // Search by name or brand if (search) { - whereClause += ` AND (LOWER(p.name) LIKE LOWER($${paramIndex}) OR LOWER(p.brand_name) LIKE LOWER($${paramIndex}))`; + whereClause += ` AND (LOWER(p.name_raw) LIKE LOWER($${paramIndex}) OR LOWER(p.brand_name_raw) LIKE LOWER($${paramIndex}))`; params.push(`%${search}%`); paramIndex++; } @@ -479,10 +479,11 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { // Build ORDER BY clause (use pricing_type for price sorting) const sortDirection = sort_dir === 'desc' ? 'DESC' : 'ASC'; - let orderBy = 'p.name ASC'; + let orderBy = 'p.name_raw ASC'; switch (sort_by) { case 'price': - const sortPriceCol = pricing_type === 'med' ? 's.price_med' : 's.price_rec'; + // View uses *_cents columns, but we SELECT as price_rec/price_med + const sortPriceCol = pricing_type === 'med' ? 
's.med_min_price_cents' : 's.rec_min_price_cents'; orderBy = `${sortPriceCol} ${sortDirection} NULLS LAST`; break; case 'thc': @@ -493,13 +494,14 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { break; case 'name': default: - orderBy = `p.name ${sortDirection}`; + orderBy = `p.name_raw ${sortDirection}`; } params.push(limitNum, offsetNum); // Determine which price column to use for filtering based on pricing_type - const priceColumn = pricing_type === 'med' ? 's.price_med' : 's.price_rec'; + // View uses *_cents columns, divide by 100 for dollar comparison + const priceColumn = pricing_type === 'med' ? 's.med_min_price_cents / 100.0' : 's.rec_min_price_cents / 100.0'; // Query products with latest snapshot data // Uses store_products + v_product_snapshots (canonical tables with raw_data) @@ -508,10 +510,10 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { p.id, p.dispensary_id, p.provider_product_id as dutchie_id, - p.name, - p.brand_name as brand, - p.category, - p.subcategory, + p.name_raw as name, + p.brand_name_raw as brand, + p.category_raw as category, + p.subcategory_raw as subcategory, p.strain_type, p.stock_status, p.thc_percent as thc, @@ -519,19 +521,19 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { p.image_url, p.created_at, p.updated_at, - s.price_rec, - s.price_med, - s.price_rec_special, - s.price_med_special, + s.rec_min_price_cents / 100.0 as price_rec, + s.med_min_price_cents / 100.0 as price_med, + s.rec_min_special_price_cents / 100.0 as price_rec_special, + s.med_min_special_price_cents / 100.0 as price_med_special, s.stock_quantity as total_quantity_available, - s.is_on_special as special, - s.captured_at as snapshot_at, + s.special, + s.crawled_at as snapshot_at, ${include_variants === 'true' || include_variants === '1' ? 
"s.raw_data->'POSMetaData'->'children' as variants_raw" : 'NULL as variants_raw'} FROM store_products p LEFT JOIN LATERAL ( SELECT * FROM v_product_snapshots WHERE store_product_id = p.id - ORDER BY captured_at DESC + ORDER BY crawled_at DESC LIMIT 1 ) s ON true ${whereClause} @@ -545,9 +547,9 @@ router.get('/products', async (req: PublicApiRequest, res: Response) => { const { rows: countRows } = await pool.query(` SELECT COUNT(*) as total FROM store_products p LEFT JOIN LATERAL ( - SELECT price_rec, price_med, is_on_special FROM v_product_snapshots + SELECT rec_min_price_cents / 100.0 as price_rec, med_min_price_cents / 100.0 as price_med, special as is_on_special FROM v_product_snapshots WHERE store_product_id = p.id - ORDER BY captured_at DESC + ORDER BY crawled_at DESC LIMIT 1 ) s ON true ${whereClause} @@ -1002,22 +1004,27 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => { SELECT d.id, d.name, - d.address, + d.address1, + d.address2, d.city, d.state, - d.zip, + d.zipcode as zip, d.phone, + d.email, d.website, d.latitude, d.longitude, d.menu_type as platform, d.menu_url, - d.hours, - d.amenities, d.description, - d.image_url, + d.logo_image as image_url, d.google_rating, d.google_review_count, + d.offer_pickup, + d.offer_delivery, + d.offer_curbside_pickup, + d.is_medical, + d.is_recreational, COALESCE(pc.product_count, 0) as product_count, COALESCE(pc.in_stock_count, 0) as in_stock_count, pc.last_updated @@ -1051,11 +1058,13 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => { dispensaries: [{ id: d.id, name: d.name, - address: d.address, + address1: d.address1, + address2: d.address2, city: d.city, state: d.state, zip: d.zip, phone: d.phone, + email: d.email, website: d.website, menu_url: d.menu_url, location: d.latitude && d.longitude ? 
{ @@ -1063,10 +1072,17 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => { longitude: parseFloat(d.longitude) } : null, platform: d.platform, - hours: d.hours || null, - amenities: d.amenities || [], description: d.description || null, image_url: d.image_url || null, + services: { + pickup: d.offer_pickup || false, + delivery: d.offer_delivery || false, + curbside: d.offer_curbside_pickup || false + }, + license_type: { + medical: d.is_medical || false, + recreational: d.is_recreational || false + }, rating: d.google_rating ? parseFloat(d.google_rating) : null, review_count: d.google_review_count ? parseInt(d.google_review_count, 10) : null, product_count: parseInt(d.product_count || '0', 10), @@ -1109,22 +1125,27 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => { SELECT d.id, d.name, - d.address, + d.address1, + d.address2, d.city, d.state, - d.zip, + d.zipcode as zip, d.phone, + d.email, d.website, d.latitude, d.longitude, d.menu_type as platform, d.menu_url, - d.hours, - d.amenities, d.description, - d.image_url, + d.logo_image as image_url, d.google_rating, d.google_review_count, + d.offer_pickup, + d.offer_delivery, + d.offer_curbside_pickup, + d.is_medical, + d.is_recreational, COALESCE(pc.product_count, 0) as product_count, COALESCE(pc.in_stock_count, 0) as in_stock_count, pc.last_updated @@ -1158,11 +1179,13 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => { const transformedDispensaries = dispensaries.map((d) => ({ id: d.id, name: d.name, - address: d.address, + address1: d.address1, + address2: d.address2, city: d.city, state: d.state, zip: d.zip, phone: d.phone, + email: d.email, website: d.website, menu_url: d.menu_url, location: d.latitude && d.longitude ? 
{ @@ -1170,10 +1193,17 @@ router.get('/dispensaries', async (req: PublicApiRequest, res: Response) => { longitude: parseFloat(d.longitude) } : null, platform: d.platform, - hours: d.hours || null, - amenities: d.amenities || [], description: d.description || null, image_url: d.image_url || null, + services: { + pickup: d.offer_pickup || false, + delivery: d.offer_delivery || false, + curbside: d.offer_curbside_pickup || false + }, + license_type: { + medical: d.is_medical || false, + recreational: d.is_recreational || false + }, rating: d.google_rating ? parseFloat(d.google_rating) : null, review_count: d.google_review_count ? parseInt(d.google_review_count, 10) : null, product_count: parseInt(d.product_count || '0', 10), @@ -1415,8 +1445,8 @@ router.get('/stores/:id/metrics', async (req: PublicApiRequest, res: Response) = COUNT(*) as total_products, COUNT(*) FILTER (WHERE stock_status = 'in_stock') as in_stock, COUNT(*) FILTER (WHERE stock_status = 'out_of_stock') as out_of_stock, - COUNT(DISTINCT brand_name) FILTER (WHERE brand_name IS NOT NULL) as unique_brands, - COUNT(DISTINCT category) FILTER (WHERE category IS NOT NULL) as unique_categories + COUNT(DISTINCT brand_name_raw) FILTER (WHERE brand_name_raw IS NOT NULL) as unique_brands, + COUNT(DISTINCT category_raw) FILTER (WHERE category_raw IS NOT NULL) as unique_categories FROM store_products WHERE dispensary_id = $1 `, [storeId]); @@ -1441,12 +1471,12 @@ router.get('/stores/:id/metrics', async (req: PublicApiRequest, res: Response) = // Get category breakdown const { rows: categoryBreakdown } = await pool.query(` SELECT - COALESCE(category, 'Uncategorized') as category, + COALESCE(category_raw, 'Uncategorized') as category, COUNT(*) as count, COUNT(*) FILTER (WHERE stock_status = 'in_stock') as in_stock FROM store_products WHERE dispensary_id = $1 - GROUP BY category + GROUP BY category_raw ORDER BY count DESC LIMIT 10 `, [storeId]); @@ -1584,9 +1614,9 @@ router.get('/stores/:id/product-metrics', async (req: 
PublicApiRequest, res: Res ) SELECT sp.id, - sp.name, - sp.brand_name, - sp.category, + sp.name_raw as name, + sp.brand_name_raw as brand_name, + sp.category_raw as category, sp.stock_status, ls.current_price, ls.current_special_price, @@ -1606,7 +1636,7 @@ router.get('/stores/:id/product-metrics', async (req: PublicApiRequest, res: Res ${whereClause} ORDER BY ${sort_by === 'price' ? 'ls.current_price DESC NULLS LAST' : - sort_by === 'stock_status' ? "CASE sp.stock_status WHEN 'out_of_stock' THEN 0 ELSE 1 END, sp.name" : + sort_by === 'stock_status' ? "CASE sp.stock_status WHEN 'out_of_stock' THEN 0 ELSE 1 END, sp.name_raw" : 'ABS(COALESCE(price_change_percent, 0)) DESC'} LIMIT $${paramIndex} `, params); @@ -1719,7 +1749,7 @@ router.get('/stores/:id/competitor-snapshot', async (req: PublicApiRequest, res: // Get this store's average prices by category const { rows: storePrices } = await pool.query(` SELECT - sp.category, + sp.category_raw as category, ROUND(AVG(sps.price_rec)::numeric, 2) as avg_price, COUNT(*) as product_count FROM store_products sp @@ -1729,8 +1759,8 @@ router.get('/stores/:id/competitor-snapshot', async (req: PublicApiRequest, res: WHERE dispensary_id = $1 ORDER BY store_product_id, captured_at DESC ) sps ON sp.id = sps.store_product_id - WHERE sp.dispensary_id = $1 AND sp.category IS NOT NULL AND sps.price_rec > 0 - GROUP BY sp.category + WHERE sp.dispensary_id = $1 AND sp.category_raw IS NOT NULL AND sps.price_rec > 0 + GROUP BY sp.category_raw `, [storeId]); // Get market average prices by category (all competitors) @@ -1740,7 +1770,7 @@ router.get('/stores/:id/competitor-snapshot', async (req: PublicApiRequest, res: if (competitorIds.length > 0) { const { rows } = await pool.query(` SELECT - sp.category, + sp.category_raw as category, ROUND(AVG(sps.price_rec)::numeric, 2) as market_avg_price, COUNT(DISTINCT sp.dispensary_id) as store_count FROM store_products sp @@ -1750,17 +1780,17 @@ router.get('/stores/:id/competitor-snapshot', async 
(req: PublicApiRequest, res: WHERE dispensary_id = ANY($1) ORDER BY store_product_id, captured_at DESC ) sps ON sp.id = sps.store_product_id - WHERE sp.dispensary_id = ANY($1) AND sp.category IS NOT NULL AND sps.price_rec > 0 - GROUP BY sp.category + WHERE sp.dispensary_id = ANY($1) AND sp.category_raw IS NOT NULL AND sps.price_rec > 0 + GROUP BY sp.category_raw `, [competitorIds]); marketPrices = rows; } // Get this store's brands const { rows: storeBrands } = await pool.query(` - SELECT DISTINCT brand_name + SELECT DISTINCT brand_name_raw as brand_name FROM store_products - WHERE dispensary_id = $1 AND brand_name IS NOT NULL + WHERE dispensary_id = $1 AND brand_name_raw IS NOT NULL `, [storeId]); const storeBrandSet = new Set(storeBrands.map(b => b.brand_name.toLowerCase())); @@ -1772,13 +1802,13 @@ router.get('/stores/:id/competitor-snapshot', async (req: PublicApiRequest, res: SELECT d.id as competitor_id, d.name as competitor_name, - COUNT(DISTINCT sp.brand_name) as total_brands, - COUNT(DISTINCT sp.brand_name) FILTER ( - WHERE LOWER(sp.brand_name) = ANY($2) + COUNT(DISTINCT sp.brand_name_raw) as total_brands, + COUNT(DISTINCT sp.brand_name_raw) FILTER ( + WHERE LOWER(sp.brand_name_raw) = ANY($2) ) as shared_brands FROM dispensaries d INNER JOIN store_products sp ON sp.dispensary_id = d.id - WHERE d.id = ANY($1) AND sp.brand_name IS NOT NULL + WHERE d.id = ANY($1) AND sp.brand_name_raw IS NOT NULL GROUP BY d.id, d.name `, [competitorIds, Array.from(storeBrandSet)]); brandOverlap = rows; @@ -1835,6 +1865,39 @@ router.get('/stores/:id/competitor-snapshot', async (req: PublicApiRequest, res: } }); +/** + * GET /api/v1/stats + * Get aggregate stats for consumer sites (product count, brand count, dispensary count) + */ +router.get('/stats', async (req: PublicApiRequest, res: Response) => { + try { + // Get aggregate stats across all data + const { rows: stats } = await pool.query(` + SELECT + (SELECT COUNT(*) FROM store_products) as product_count, + (SELECT 
COUNT(DISTINCT brand_name_raw) FROM store_products WHERE brand_name_raw IS NOT NULL) as brand_count, + (SELECT COUNT(*) FROM dispensaries WHERE crawl_enabled = true AND product_count > 0) as dispensary_count + `); + + const s = stats[0] || {}; + + res.json({ + success: true, + stats: { + products: parseInt(s.product_count || '0', 10), + brands: parseInt(s.brand_count || '0', 10), + dispensaries: parseInt(s.dispensary_count || '0', 10) + } + }); + } catch (error: any) { + console.error('Public API stats error:', error); + res.status(500).json({ + error: 'Failed to fetch stats', + message: error.message + }); + } +}); + /** * GET /api/v1/menu * Get complete menu summary for the authenticated dispensary diff --git a/backend/src/tasks/handlers/product-resync.ts b/backend/src/tasks/handlers/product-resync.ts index b0167397..eef6517b 100644 --- a/backend/src/tasks/handlers/product-resync.ts +++ b/backend/src/tasks/handlers/product-resync.ts @@ -1,12 +1,35 @@ /** * Product Resync Handler * - * Re-crawls a store that already has products to capture price/stock changes. - * Uses the scraper-v2 engine for crawling. + * Re-crawls a store to capture price/stock changes using the GraphQL pipeline. + * + * Flow: + * 1. Load dispensary info from database + * 2. Start stealth session (fingerprint + optional proxy) + * 3. Fetch products via GraphQL (Status: 'All') + * 4. Normalize data via DutchieNormalizer + * 5. Upsert to store_products and store_product_snapshots + * 6. Track missing products (increment consecutive_misses, mark OOS at 3) + * 7. Download new product images + * 8. 
End session */ import { TaskContext, TaskResult } from '../task-worker'; -import { scrapeStore } from '../../scraper-v2'; +import { + executeGraphQL, + startSession, + endSession, + GRAPHQL_HASHES, + DUTCHIE_CONFIG, +} from '../../platforms/dutchie'; +import { DutchieNormalizer } from '../../hydration/normalizers/dutchie'; +import { + upsertStoreProducts, + createStoreProductSnapshots, + downloadProductImages, +} from '../../hydration/canonical-upsert'; + +const normalizer = new DutchieNormalizer(); export async function handleProductResync(ctx: TaskContext): Promise { const { pool, task } = ctx; @@ -17,9 +40,12 @@ export async function handleProductResync(ctx: TaskContext): Promise } try { - // Get dispensary info + // ============================================================ + // STEP 1: Load dispensary info + // ============================================================ const dispResult = await pool.query(` - SELECT id, name, platform_dispensary_id, menu_url + SELECT + id, name, platform_dispensary_id, menu_url, menu_type, city, state FROM dispensaries WHERE id = $1 AND crawl_enabled = true `, [dispensaryId]); @@ -35,21 +61,264 @@ export async function handleProductResync(ctx: TaskContext): Promise return { success: false, error: `Dispensary ${dispensaryId} has no platform_dispensary_id` }; } - console.log(`[ProductResync] Crawling ${dispensary.name} (${dispensaryId})`); + // Extract cName from menu_url + const cNameMatch = dispensary.menu_url?.match(/\/(?:embedded-menu|dispensary)\/([^/?]+)/); + const cName = cNameMatch ? 
cNameMatch[1] : 'dispensary'; + + console.log(`[ProductResync] Starting crawl for ${dispensary.name} (ID: ${dispensaryId})`); + console.log(`[ProductResync] Platform ID: ${platformId}, cName: ${cName}`); + + // ============================================================ + // STEP 2: Start stealth session + // ============================================================ + const session = startSession(dispensary.state || 'AZ', 'America/Phoenix'); + console.log(`[ProductResync] Session started: ${session.sessionId}`); - // Send heartbeat before long operation await ctx.heartbeat(); - // Use scraper-v2 scrapeStore function - await scrapeStore(dispensaryId); + // ============================================================ + // STEP 3: Fetch products via GraphQL (Status: 'All') + // ============================================================ + const allProducts: any[] = []; + let page = 0; + let totalCount = 0; + const perPage = DUTCHIE_CONFIG.perPage; + const maxPages = DUTCHIE_CONFIG.maxPages; + + try { + while (page < maxPages) { + const variables = { + includeEnterpriseSpecials: false, + productsFilter: { + dispensaryId: platformId, + pricingType: 'rec', + Status: 'All', + types: [], + useCache: false, + isDefaultSort: true, + sortBy: 'popularSortIdx', + sortDirection: 1, + bypassOnlineThresholds: true, + isKioskMenu: false, + removeProductsBelowOptionThresholds: false, + }, + page, + perPage, + }; + + console.log(`[ProductResync] Fetching page ${page + 1}...`); + + const result = await executeGraphQL( + 'FilteredProducts', + variables, + GRAPHQL_HASHES.FilteredProducts, + { cName, maxRetries: 3 } + ); + + const data = result?.data?.filteredProducts; + if (!data || !data.products) { + if (page === 0) { + throw new Error('No product data returned from GraphQL'); + } + break; + } + + const products = data.products; + allProducts.push(...products); + + if (page === 0) { + totalCount = data.queryInfo?.totalCount || products.length; + console.log(`[ProductResync] Total 
products reported: ${totalCount}`); + } + + if (allProducts.length >= totalCount || products.length < perPage) { + break; + } + + page++; + + if (page < maxPages) { + await new Promise(r => setTimeout(r, DUTCHIE_CONFIG.pageDelayMs)); + } + + if (page % 5 === 0) { + await ctx.heartbeat(); + } + } + + console.log(`[ProductResync] Fetched ${allProducts.length} products in ${page + 1} pages`); + + } finally { + endSession(); + } + + if (allProducts.length === 0) { + return { + success: false, + error: 'No products returned from GraphQL', + productsProcessed: 0, + }; + } - // Heartbeat again await ctx.heartbeat(); - // Update dispensary last_crawled_at + // ============================================================ + // STEP 4: Normalize data + // ============================================================ + console.log(`[ProductResync] Normalizing ${allProducts.length} products...`); + + // Build RawPayload for the normalizer + const rawPayload = { + id: `resync-${dispensaryId}-${Date.now()}`, + dispensary_id: dispensaryId, + crawl_run_id: null, + platform: 'dutchie', + payload_version: 1, + raw_json: { data: { filteredProducts: { products: allProducts } } }, + product_count: allProducts.length, + pricing_type: 'dual', + crawl_mode: 'dual_mode', + fetched_at: new Date(), + processed: false, + normalized_at: null, + hydration_error: null, + hydration_attempts: 0, + created_at: new Date(), + }; + + const normalizationResult = normalizer.normalize(rawPayload); + + if (normalizationResult.errors.length > 0) { + console.warn(`[ProductResync] Normalization warnings: ${normalizationResult.errors.map(e => e.message).join(', ')}`); + } + + if (normalizationResult.products.length === 0) { + return { + success: false, + error: 'Normalization produced no products', + productsProcessed: 0, + }; + } + + console.log(`[ProductResync] Normalized ${normalizationResult.products.length} products`); + + await ctx.heartbeat(); + + // 
============================================================ + // STEP 5: Upsert to canonical tables + // ============================================================ + console.log(`[ProductResync] Upserting to store_products...`); + + const upsertResult = await upsertStoreProducts( + pool, + normalizationResult.products, + normalizationResult.pricing, + normalizationResult.availability + ); + + console.log(`[ProductResync] Upserted: ${upsertResult.upserted} (${upsertResult.new} new, ${upsertResult.updated} updated)`); + + await ctx.heartbeat(); + + // Create snapshots + console.log(`[ProductResync] Creating snapshots...`); + + const snapshotsResult = await createStoreProductSnapshots( + pool, + dispensaryId, + normalizationResult.products, + normalizationResult.pricing, + normalizationResult.availability, + null // No crawl_run_id in new system + ); + + console.log(`[ProductResync] Created ${snapshotsResult.created} snapshots`); + + await ctx.heartbeat(); + + // ============================================================ + // STEP 6: Track missing products (consecutive_misses logic) + // - Products in feed: reset consecutive_misses to 0 + // - Products not in feed: increment consecutive_misses + // - At 3 consecutive misses: mark as OOS + // ============================================================ + const currentProductIds = allProducts + .map((p: any) => p._id || p.id) + .filter(Boolean); + + // Reset consecutive_misses for products that ARE in the feed + if (currentProductIds.length > 0) { + await pool.query(` + UPDATE store_products + SET consecutive_misses = 0, last_seen_at = NOW() + WHERE dispensary_id = $1 + AND provider = 'dutchie' + AND provider_product_id = ANY($2) + `, [dispensaryId, currentProductIds]); + } + + // Increment consecutive_misses for products NOT in the feed + const incrementResult = await pool.query(` + UPDATE store_products + SET consecutive_misses = consecutive_misses + 1 + WHERE dispensary_id = $1 + AND provider = 'dutchie' + AND 
provider_product_id NOT IN (SELECT unnest($2::text[])) + AND consecutive_misses < 3 + RETURNING id + `, [dispensaryId, currentProductIds]); + + const incrementedCount = incrementResult.rowCount || 0; + if (incrementedCount > 0) { + console.log(`[ProductResync] Incremented consecutive_misses for ${incrementedCount} products`); + } + + // Mark as OOS any products that hit 3 consecutive misses + const oosResult = await pool.query(` + UPDATE store_products + SET stock_status = 'oos', is_in_stock = false + WHERE dispensary_id = $1 + AND provider = 'dutchie' + AND consecutive_misses >= 3 + AND stock_status != 'oos' + RETURNING id + `, [dispensaryId]); + + const markedOosCount = oosResult.rowCount || 0; + if (markedOosCount > 0) { + console.log(`[ProductResync] Marked ${markedOosCount} products as OOS (3+ consecutive misses)`); + } + + await ctx.heartbeat(); + + // ============================================================ + // STEP 7: Download images for new products + // ============================================================ + if (upsertResult.productsNeedingImages.length > 0) { + console.log(`[ProductResync] Downloading images for ${upsertResult.productsNeedingImages.length} products...`); + + try { + const dispensaryContext = { + stateCode: dispensary.state || 'AZ', + storeSlug: cName, + }; + await downloadProductImages( + pool, + upsertResult.productsNeedingImages, + dispensaryContext + ); + } catch (imgError: any) { + // Image download errors shouldn't fail the whole task + console.warn(`[ProductResync] Image download error (non-fatal): ${imgError.message}`); + } + } + + // ============================================================ + // STEP 8: Update dispensary last_crawl_at + // ============================================================ await pool.query(` UPDATE dispensaries - SET last_crawled_at = NOW() + SET last_crawl_at = NOW() WHERE id = $1 `, [dispensaryId]); @@ -57,7 +326,13 @@ export async function handleProductResync(ctx: TaskContext): Promise 
return { success: true, + productsProcessed: normalizationResult.products.length, + snapshotsCreated: snapshotsResult.created, + newProducts: upsertResult.new, + updatedProducts: upsertResult.updated, + markedOos: markedOosCount, }; + } catch (error: unknown) { const errorMessage = error instanceof Error ? error.message : 'Unknown error'; console.error(`[ProductResync] Error for dispensary ${dispensaryId}:`, errorMessage);