Compare commits

..

74 Commits

Author SHA1 Message Date
Kelly
80f048ad57 feat(k8s): Add StatefulSet for persistent workers
- Add scraper-worker-statefulset.yaml with 8 persistent pods
- updateStrategy: OnDelete prevents automatic restarts
- Workers maintain stable identity across restarts
- Document worker architecture in CLAUDE.md
- Add worker registry API endpoint documentation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:37:28 -07:00
kelly
2ed088b4d8 Merge pull request 'feat(api): Add preflight columns to worker registry API response' (#50) from feat/preflight-api-fields into master 2025-12-12 06:22:06 +00:00
Kelly
d3c49fa246 feat(api): Add preflight columns to worker registry API response
Exposes curl_ip, http_ip, preflight_status, preflight_at, and fingerprint_data
in the /api/worker-registry/workers response.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:12:55 -07:00
kelly
52cb5014fd Merge pull request 'feat: Dual-transport preflight system for worker fingerprinting' (#49) from fix/ci-and-preflight-enforcement into master 2025-12-12 06:08:13 +00:00
Kelly
50654be910 fix: Restore hydration and product_refresh for store updates
- Moved hydration module back from _deprecated (needed for product_refresh)
- Restored product_refresh handler for processing stored payloads
- Restored geolocation service for findadispo/findagram
- Stubbed system routes that depend on deprecated SyncOrchestrator
- Removed crawler-sandbox route (deprecated)
- Fixed all TypeScript compilation errors

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 23:03:39 -07:00
Kelly
cdab71a1ee feat(workers): Add dual-transport preflight system
Workers now run both curl and http (Puppeteer) preflights on startup:
- curl-preflight.ts: Tests axios + proxy via httpbin.org
- puppeteer-preflight.ts: Tests browser + StealthPlugin via fingerprint.com
  (with amiunique.org fallback)
- Migration 084: Adds preflight columns to worker_registry and method
  column to worker_tasks
- Workers report preflight status, IP, fingerprint, and response time
- Tasks can require specific transport method (curl/http)
- Dashboard shows Transport column with preflight status badges

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 22:47:52 -07:00
Kelly
a35976b9e9 chore: Clean up deprecated code and docs
- Move deprecated directories to src/_deprecated/:
  - hydration/ (old pipeline approach)
  - scraper-v2/ (old Puppeteer scraper)
  - canonical-hydration/ (merged into tasks)
  - Unused services: availability, crawler-logger, geolocation, etc
  - Unused utils: age-gate-playwright, HomepageValidator, stealthBrowser

- Archive outdated docs to docs/_archive/:
  - ANALYTICS_RUNBOOK.md
  - ANALYTICS_V2_EXAMPLES.md
  - BRAND_INTELLIGENCE_API.md
  - CRAWL_PIPELINE.md
  - TASK_WORKFLOW_2024-12-10.md
  - WORKER_TASK_ARCHITECTURE.md
  - ORGANIC_SCRAPING_GUIDE.md

- Add docs/CODEBASE_MAP.md as single source of truth
- Add warning files to deprecated/archived directories
- Slim down CLAUDE.md to essential rules only

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 22:17:40 -07:00
kelly
c68210c485 Merge pull request 'fix(ci): Remove buildx cache and add preflight enforcement' (#48) from fix/ci-and-preflight-enforcement into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/48
2025-12-12 04:53:20 +00:00
Kelly
f2864bd2ad fix(ci): Remove buildx cache and add preflight enforcement
- Remove cache_from/cache_to from CI (plugin bug splitting commas)
- Add preflight() method to CrawlRotator - tests proxy + anti-detect
- Add pre-task preflight check - workers MUST pass before executing
- Add releaseTask() to release tasks back to pending on preflight fail
- Rename proxy_test task to whoami for clarity

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:37:22 -07:00
kelly
eca9e85242 Merge pull request 'fix(ci): Fix buildx cache syntax and add proxy_test task' (#47) from feat/ui-polish-and-ci-caching into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/47
2025-12-12 04:30:21 +00:00
Kelly
3f958fbff3 fix(ci): Fix buildx cache_from syntax for array format
Plugin was splitting comma-separated values incorrectly.
Use array format with quoted strings instead.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:10:26 -07:00
Kelly
c84ef0396b feat(tasks): Add proxy_test task handler and discovery run tracking
- Add proxy_test task handler that fetches IP via proxy to verify connectivity
- Add discovery_runs migration (083) for tracking store discovery progress
- Register proxy_test in task service and worker

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:07:58 -07:00
kelly
e1c67dcee5 Merge pull request 'feat: UI polish and CI caching improvements' (#46) from feat/ui-polish-and-ci-caching into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/46
2025-12-12 03:47:46 +00:00
kelly
34c8a8cc67 Merge pull request 'feat(cannaiq): Add clickable logo, favicon, and remove state selector' (#45) from feat/cannaiq-ui-polish into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/45
2025-12-12 03:36:04 +00:00
Kelly
6cd1f55119 fix(workers): Preserve fantasy names on pod restart
- Re-registration no longer overwrites pod_name with K8s name
- New workers get fantasy name (Aethelgard, Xylos, etc.) as pod_name
- Document worker naming convention in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 20:35:25 -07:00
Kelly
e918234928 feat(ci): Add npm cache volume for faster typechecks
- Create PVC for shared npm cache across CI jobs
- Configure Woodpecker agent to allow npm-cache volume mount
- Update typecheck steps to use shared cache directory
- First run populates cache, subsequent runs are ~3-4x faster

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 19:27:32 -07:00
Kelly
888a608485 feat(cannaiq): Add clickable logo, favicon, and remove state selector
- Make CannaIQ logo clickable to return to dashboard (sidebar + mobile header)
- Add custom favicon matching the logo design
- Remove state selector dropdown from sidebar navigation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 19:24:21 -07:00
kelly
b5c3b05246 Merge pull request 'fix(workers): Fix false memory backoff and add backing-off color coding' (#44) from fix/worker-memory-backoff into master 2025-12-12 02:13:51 +00:00
Kelly
fdce5e0302 fix(workers): Fix false memory backoff and add backing-off color coding
- Fix memory calculation to use max-old-space-size (1500MB) instead of
  V8's dynamic heapTotal. This prevents false 95%+ readings when idle.
- Add yellow color for backing-off workers in pod visualization
- Update legend and tooltips with backing-off status
- Remove pool toggle from TasksDashboard (moved to Workers page)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 19:11:42 -07:00
Kelly
4679b245de perf(ci): Enable Docker layer caching for faster builds
Add cache_from and cache_to settings to all docker-buildx steps.
Uses registry-based caching to avoid rebuilding npm install layer
when package.json hasn't changed.

Expected improvement: 14min backend build → ~3-4min on cache hit.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 18:43:43 -07:00
kelly
a837070f54 Merge pull request 'refactor(admin): Consolidate JobQueue into TasksDashboard + CI worker resilience' (#43) from fix/ci-worker-resilience into master 2025-12-12 01:21:28 +00:00
Kelly
5a929e9803 refactor(admin): Consolidate JobQueue into TasksDashboard
- Move Create Task modal from JobQueue to TasksDashboard
- Add pagination to TasksDashboard (25 tasks per page)
- Add delete action for failed/completed/pending tasks
- Remove JobQueue page and route
- Rename nav item from "Task Queue" to "Tasks"

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 17:41:21 -07:00
kelly
52b0fad410 Merge pull request 'ci: Add worker resilience check to deploy step' (#42) from fix/ci-worker-resilience into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/42
2025-12-12 00:09:48 +00:00
Kelly
9944031eea ci: Add worker resilience check to deploy step
If workers are scaled to 0, CI will now automatically scale them to 5
before updating the image. This prevents workers from being stuck at 0
if manually scaled down for maintenance.

The check only scales up if replicas=0, so it won't interfere with
normal deployments or HPA scaling.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 16:40:53 -07:00
kelly
2babaa7136 Merge pull request 'ci: Remove explicit migration step from deploy' (#41) from fix/ci-remove-migration-step into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/41
2025-12-11 23:11:48 +00:00
Kelly
90567511dd ci: Remove explicit migration step from deploy
Auto-migrate runs at server startup and handles migration errors gracefully.
The explicit kubectl exec migration step was failing due to trigger
already existing (schema_migrations table out of sync).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:56:47 -07:00
kelly
beb16ad0cb Merge pull request 'ci: Run migrations via kubectl exec instead of separate step' (#40) from fix/ci-migrate-via-kubectl into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/40
2025-12-11 22:38:21 +00:00
Kelly
fc7fc5ea85 ci: Run migrations via kubectl exec instead of separate step
Removes the migrate step that required db_* secrets (which CI can't
access since postgres is cluster-internal). Instead, run migrations
via kubectl exec on the deployed scraper pod, which already has DB
access via its env vars.

Deploy order:
1. Deploy scraper image
2. Wait for rollout
3. Run migrations via kubectl exec
4. Deploy remaining services

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:16:12 -07:00
kelly
ab8956b14b Merge pull request 'fix: Revert CI event array syntax to single value' (#39) from fix/ci-event-syntax into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/39
2025-12-11 22:10:03 +00:00
Kelly
1d9c90641f fix: Revert event array syntax to single value
The [push, manual] array syntax broke CI config parsing.
Reverting to event: push which is known to work.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 14:48:34 -07:00
kelly
6126b907f2 Merge pull request 'ci: Support manual pipeline events for deploy' (#38) from ci/support-manual-pipelines into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/38
2025-12-11 21:12:26 +00:00
Kelly
cc93d2d483 ci: Support manual pipeline events for deploy
Allow deploy steps to run on both push and manual events.
This enables triggering deploys via `woodpecker-cli pipeline create`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 13:54:04 -07:00
kelly
7642c17ec0 Merge pull request 'ci: Fix pipeline config path' (#37) from fix/ci-config-path into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/37
2025-12-11 20:01:30 +00:00
Kelly
cb60dcf352 ci: Add root-level woodpecker config for CI detection 2025-12-11 12:37:35 -07:00
kelly
5ffe05d519 Merge pull request 'feat: Concurrent task processing with resource-based backoff' (#36) from feat/worker-scaling into master 2025-12-11 18:49:01 +00:00
Kelly
8e2f07c941 feat(workers): Concurrent task processing with resource-based backoff
Workers can now process multiple tasks concurrently (default: 3 max).
Self-regulate based on resource usage - back off at 85% memory or 90% CPU.

Backend changes:
- TaskWorker handles concurrent tasks using async Maps
- Resource monitoring (memory %, CPU %) with backoff logic
- Heartbeat reports active_task_count, max_concurrent_tasks, resource stats
- Decommission support via worker_commands table

Frontend changes:
- Workers Dashboard shows tasks per worker (N/M format)
- Resource badges with color-coded thresholds
- Pod visualization with clickable selection
- Decommission controls per worker

New env vars:
- MAX_CONCURRENT_TASKS (default: 3)
- MEMORY_BACKOFF_THRESHOLD (default: 0.85)
- CPU_BACKOFF_THRESHOLD (default: 0.90)
- BACKOFF_DURATION_MS (default: 10000)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 11:47:24 -07:00
kelly
0b6e615075 Merge pull request 'fix: Use React Router Link to prevent scroll reset' (#35) from feat/worker-scaling into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/35
2025-12-11 17:14:25 +00:00
Kelly
be251c6fb3 fix: Use React Router Link for nav to prevent scroll reset
Changed sidebar NavLink from <a> to <Link> for client-side navigation.
This prevents full page reload and scroll position reset.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 09:59:34 -07:00
kelly
efb1e89e33 Merge pull request 'fix: Show only git SHA in header' (#34) from feat/worker-scaling into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/34
2025-12-11 16:56:04 +00:00
Kelly
529c447413 refactor: Move worker scaling to Workers page with password confirmation
- Worker scaling controls now on /workers page only (removed from /tasks)
- Password confirmation required before scaling
- Show only git SHA in header (removed version number)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 09:41:58 -07:00
Kelly
1eaf95c06b fix: Show only git SHA in header, remove version number
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 09:35:41 -07:00
kelly
138ed17d8b Merge pull request 'feat: Worker scaling from admin UI with password confirmation' (#33) from feat/worker-scaling into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/33
2025-12-11 16:29:04 +00:00
Kelly
a880c41d89 feat: Add password confirmation for worker scaling + RBAC
- Add /api/auth/verify-password endpoint for re-authentication
- Add PasswordConfirmModal component for sensitive actions
- Worker scaling (+/-) now requires password confirmation
- Add RBAC (ServiceAccount, Role, RoleBinding) for scraper pod
- Scraper pod can now read/scale worker deployment via k8s API

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 09:16:27 -07:00
Kelly
2a9ae61dce fix(ui): Make Tasks Dashboard header sticky
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 09:04:21 -07:00
kelly
1f21911fa1 Merge pull request 'feat(admin): Worker scaling controls via k8s API' (#32) from feat/worker-scaling into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/32
2025-12-11 16:01:25 +00:00
Kelly
6f0a58f5d2 fix(k8s): Correct API call signatures for k8s client v1.4
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 08:47:27 -07:00
Kelly
8206dce821 feat(admin): Worker scaling controls via k8s API
- Add /api/k8s/workers endpoint to get deployment status
- Add /api/k8s/workers/scale endpoint to scale replicas (0-50)
- Add worker scaling UI to Tasks Dashboard (+/- 5 workers)
- Shows ready/desired replica count
- Uses in-cluster config in k8s, kubeconfig locally

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 08:24:32 -07:00
kelly
ced1afaa8a Merge pull request 'fix(ci): CI config fix + 25 workers + pool starts paused' (#31) from fix/ci-filename into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/31
2025-12-11 08:47:26 +00:00
Kelly
d6c602c567 fix(ui): Remove Cleanup Stale button from Workers page
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 01:29:25 -07:00
Kelly
a252a7fefd feat(tasks): 25 workers, pool starts paused by default
- Increase worker replicas from 5 to 25
- Task pool now starts PAUSED on deploy, admin must click Start Pool
- Prevents workers from grabbing tasks before system is ready

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 01:19:02 -07:00
Kelly
83b06c21cc fix(tasks): Stop spinner in status cards when pool is paused
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 01:14:25 -07:00
kelly
f5214da54c Merge pull request 'fix(ci): Fix Woodpecker config - remove invalid top-level when' (#30) from fix/ci-filename into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/30
2025-12-11 08:07:16 +00:00
Kelly
e3d4dd0127 fix(ci): Fix Woodpecker config - remove invalid top-level when
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 00:45:08 -07:00
kelly
d0ee0d72f5 Merge pull request 'feat(tasks): Add task pool start/stop toggle' (#29) from feat/task-pool-toggle into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/29
2025-12-11 07:21:02 +00:00
kelly
521f0550cd Merge pull request 'feat: Admin UI cleanup and dispensary schedule page' (#28) from fix/worker-proxy-wait into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/28
2025-12-11 07:08:03 +00:00
Kelly
8a09691e91 feat(tasks): Add task pool start/stop toggle
- Add task-pool-state.ts for shared pause/resume state
- Add /api/tasks/pool/status, pause, resume endpoints
- Add Start/Stop Pool toggle button to TasksDashboard
- Spinner stops when pool is closed
- Fix is_active column name in store-discovery.ts
- Fix missing active column in task-service.ts claimTask
- Auto-refresh every 15 seconds

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 00:07:14 -07:00
Kelly
459ad7d9c9 fix(tasks): Fix missing column errors in task queries
- Change 'active' to 'is_active' in states table query (store-discovery.ts)
- Remove non-existent 'active' column check from worker_tasks query (task-service.ts)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:54:28 -07:00
Kelly
d102d27731 feat(admin): Dispensary schedule page and UI cleanup
- Add DispensarySchedule page showing crawl history and upcoming schedule
- Add /dispensaries/:state/:city/:slug/schedule route
- Add API endpoint for store crawl history
- Update View Schedule link to use dispensary-specific route
- Remove colored badges from DispensaryDetail product table (plain text)
- Make Details button ghost style in product table
- Add "Sort by States" option to IntelligenceBrands
- Remove status filter dropdown from Dispensaries page

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:50:47 -07:00
kelly
01810c40a1 Merge pull request 'fix(worker): Wait for proxies instead of crashing' (#27) from fix/worker-proxy-wait into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/27
2025-12-11 06:43:29 +00:00
Kelly
b7d33e1cbf fix(admin): Clean up store detail and intelligence pages
- Remove Update dropdown from DispensaryDetail page
- Remove Crawl Now button from StoreDetailPage
- Change "Last Crawl" to "Last Updated" on both detail pages
- Tone down emerald colors on StoreDetailPage (use gray borders/tabs)
- Simplify THC/CBD/Stock badges to plain text
- Remove duplicate state dropdown from IntelligenceStores filters
- Make store rows clickable in IntelligenceStores

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:42:50 -07:00
Kelly
5b34b5a78c fix(admin): Consistent navigation across Intelligence pages
- Add state selector dropdown to all three Intelligence pages (Brands, Stores, Pricing)
- Use consistent emerald-styled page navigation badges with current page highlighted
- Remove Refresh buttons from all Intelligence pages
- Update chart styling to use emerald gradient bars (matching Pricing page)
- Load all available states from orchestrator API instead of extracting from local data
- Fix z-index and styling on state dropdown for better visibility

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:32:57 -07:00
Kelly
c091d2316b fix(dashboard): Remove refresh button and HealthPanel
- Removed refresh button and refreshing state from Dashboard
- Removed HealthPanel component (deploy status auto-refresh)
- Simplified header layout

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:28:18 -07:00
Kelly
e8862b8a8b fix(national): Remove Refresh Metrics button
Removed unused refresh button and related state/handlers from
National Dashboard.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:27:01 -07:00
Kelly
1b46ab699d fix(national): Show all states count, not filtered "active" states
The "Active States" metric was arbitrary and confusing. Changed to
show total states count - all states in the system regardless of
whether they have data or not.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:25:50 -07:00
Kelly
ac1995f63f fix(pricing): Simplify category chart to prevent overflow
- Replace complex price range bars with simple horizontal bars
- Use overflow-hidden to prevent bars extending beyond container
- Calculate bar width as percentage of max avg price
- Limit to top 12 categories for cleaner display
- Fixed-width labels prevent layout shift

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:24:31 -07:00
Kelly
de93669652 fix(national): Count active states by product data, not crawl status
Active states should count states with actual product data, not just
states where crawling is enabled. A state can have historical data
even if crawling is currently disabled.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:23:20 -07:00
Kelly
dffc124920 fix(national): Fix active states count and remove StateBadge
- Change active_states to count states with crawl_enabled=true dispensaries
- Filter all national summary queries by crawl_enabled=true
- Remove unused StateBadge from National Dashboard header
- StateBadge was showing "Arizona" with no way to change it

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:22:19 -07:00
Kelly
932ceb0287 feat(intelligence): Add state filter to all Intelligence pages
- Add state filter to Intelligence Brands API and frontend
- Add state filter to Intelligence Pricing API and frontend
- Add state filter to Intelligence Stores API and frontend
- Fix null safety issues with toLocaleString() calls
- Update backend /stores endpoint to return skuCount, snapshotCount, chainName
- Add overall stats to pricing endpoint

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:19:54 -07:00
Kelly
824d48fd85 fix: Add curl to Docker, add active flag to worker_tasks
- Install curl in Docker container for Dutchie HTTP requests
- Add 'active' column to worker_tasks (default false) to prevent
  accidental task execution on startup
- Update task-service to only claim tasks where active=true

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:12:09 -07:00
Kelly
47fdab0382 fix: Filter orchestrator states by crawl_enabled
The states dropdown was showing count of ALL dispensaries instead of
just crawl-enabled ones. Now correctly filters to match the actual
stores that would be displayed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:09:04 -07:00
Kelly
ed7ddc6375 ci: Add database migration step to deploy pipeline
Migrations now run automatically before deployments:
1. Build new Docker image
2. Run migrations using the new image
3. Deploy to Kubernetes

Requires new secrets: db_host, db_port, db_name, db_user, db_pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 23:07:25 -07:00
Kelly
cf06f4a8c0 feat(worker): Listen for proxy_added notifications
- Workers now use PostgreSQL LISTEN/NOTIFY to wake up immediately when proxies are added
- Added trigger on proxies table to NOTIFY 'proxy_added' when active proxy inserted/updated
- Falls back to 30s polling if LISTEN fails

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 22:58:00 -07:00
kelly
61e915968f Merge pull request 'feat(tasks): Refactor task workflow with payload/refresh separation' (#26) from feat/task-workflow-refactor into master 2025-12-11 05:24:11 +00:00
kelly
a4338669a9 Merge pull request 'fix(auth): Prioritize JWT token over trusted origin bypass' (#24) from fix/auth-token-priority into master
Reviewed-on: https://code.cannabrands.app/Creationshop/dispensary-scraper/pulls/24
2025-12-11 01:34:10 +00:00
110 changed files with 7097 additions and 3712 deletions

View File

@@ -1,6 +1,3 @@
when:
- event: [push, pull_request]
steps:
# ===========================================
# PR VALIDATION: Parallel type checks (PRs only)
@@ -72,6 +69,7 @@ steps:
# ===========================================
# MASTER DEPLOY: Parallel Docker builds
# NOTE: cache_from/cache_to removed due to plugin bug splitting on commas
# ===========================================
docker-backend:
image: woodpeckerci/plugin-docker-buildx
@@ -163,7 +161,7 @@ steps:
event: push
# ===========================================
# STAGE 3: Deploy (after Docker builds)
# STAGE 3: Deploy and Run Migrations
# ===========================================
deploy:
image: bitnami/kubectl:latest
@@ -174,12 +172,17 @@ steps:
- mkdir -p ~/.kube
- echo "$KUBECONFIG_CONTENT" | tr -d '[:space:]' | base64 -d > ~/.kube/config
- chmod 600 ~/.kube/config
# Deploy backend first
- kubectl set image deployment/scraper scraper=code.cannabrands.app/creationshop/dispensary-scraper:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl rollout status deployment/scraper -n dispensary-scraper --timeout=300s
# Note: Migrations run automatically at startup via auto-migrate
# Deploy remaining services
# Resilience: ensure workers are scaled up if at 0
- REPLICAS=$(kubectl get deployment scraper-worker -n dispensary-scraper -o jsonpath='{.spec.replicas}'); if [ "$REPLICAS" = "0" ]; then echo "Scaling workers from 0 to 5"; kubectl scale deployment/scraper-worker --replicas=5 -n dispensary-scraper; fi
- kubectl set image deployment/scraper-worker worker=code.cannabrands.app/creationshop/dispensary-scraper:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl set image deployment/cannaiq-frontend cannaiq-frontend=code.cannabrands.app/creationshop/cannaiq-frontend:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl set image deployment/findadispo-frontend findadispo-frontend=code.cannabrands.app/creationshop/findadispo-frontend:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl set image deployment/findagram-frontend findagram-frontend=code.cannabrands.app/creationshop/findagram-frontend:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl rollout status deployment/scraper -n dispensary-scraper --timeout=300s
- kubectl rollout status deployment/cannaiq-frontend -n dispensary-scraper --timeout=120s
depends_on:
- docker-backend

191
.woodpecker/ci.yml Normal file
View File

@@ -0,0 +1,191 @@
steps:
# ===========================================
# PR VALIDATION: Only typecheck changed projects
# ===========================================
typecheck-backend:
image: code.cannabrands.app/creationshop/node:20
commands:
- npm config set cache /npm-cache/backend --global
- cd backend
- npm ci --prefer-offline
- npx tsc --noEmit
volumes:
- npm-cache:/npm-cache
depends_on: []
when:
event: pull_request
path:
include: ['backend/**']
typecheck-cannaiq:
image: code.cannabrands.app/creationshop/node:20
commands:
- npm config set cache /npm-cache/cannaiq --global
- cd cannaiq
- npm ci --prefer-offline
- npx tsc --noEmit
volumes:
- npm-cache:/npm-cache
depends_on: []
when:
event: pull_request
path:
include: ['cannaiq/**']
# findadispo/findagram typechecks skipped - they have || true anyway
# ===========================================
# AUTO-MERGE: Merge PR after all checks pass
# ===========================================
auto-merge:
image: alpine:latest
environment:
GITEA_TOKEN:
from_secret: gitea_token
commands:
- apk add --no-cache curl
- |
echo "Merging PR #${CI_COMMIT_PULL_REQUEST}..."
curl -s -X POST \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{"Do":"merge"}' \
"https://code.cannabrands.app/api/v1/repos/Creationshop/dispensary-scraper/pulls/${CI_COMMIT_PULL_REQUEST}/merge"
depends_on:
- typecheck-backend
- typecheck-cannaiq
when:
event: pull_request
# ===========================================
# MASTER DEPLOY: Parallel Docker builds
# ===========================================
docker-backend:
image: woodpeckerci/plugin-docker-buildx
settings:
registry: code.cannabrands.app
repo: code.cannabrands.app/creationshop/dispensary-scraper
tags:
- latest
- ${CI_COMMIT_SHA:0:8}
dockerfile: backend/Dockerfile
context: backend
username:
from_secret: registry_username
password:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/dispensary-scraper:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/dispensary-scraper:cache,mode=max
build_args:
APP_BUILD_VERSION: ${CI_COMMIT_SHA:0:8}
APP_GIT_SHA: ${CI_COMMIT_SHA}
APP_BUILD_TIME: ${CI_PIPELINE_CREATED}
CONTAINER_IMAGE_TAG: ${CI_COMMIT_SHA:0:8}
depends_on: []
when:
branch: master
event: push
docker-cannaiq:
image: woodpeckerci/plugin-docker-buildx
settings:
registry: code.cannabrands.app
repo: code.cannabrands.app/creationshop/cannaiq-frontend
tags:
- latest
- ${CI_COMMIT_SHA:0:8}
dockerfile: cannaiq/Dockerfile
context: cannaiq
username:
from_secret: registry_username
password:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/cannaiq-frontend:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/cannaiq-frontend:cache,mode=max
depends_on: []
when:
branch: master
event: push
docker-findadispo:
image: woodpeckerci/plugin-docker-buildx
settings:
registry: code.cannabrands.app
repo: code.cannabrands.app/creationshop/findadispo-frontend
tags:
- latest
- ${CI_COMMIT_SHA:0:8}
dockerfile: findadispo/frontend/Dockerfile
context: findadispo/frontend
username:
from_secret: registry_username
password:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/findadispo-frontend:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/findadispo-frontend:cache,mode=max
depends_on: []
when:
branch: master
event: push
docker-findagram:
image: woodpeckerci/plugin-docker-buildx
settings:
registry: code.cannabrands.app
repo: code.cannabrands.app/creationshop/findagram-frontend
tags:
- latest
- ${CI_COMMIT_SHA:0:8}
dockerfile: findagram/frontend/Dockerfile
context: findagram/frontend
username:
from_secret: registry_username
password:
from_secret: registry_password
platforms: linux/amd64
provenance: false
cache_from: type=registry,ref=code.cannabrands.app/creationshop/findagram-frontend:cache
cache_to: type=registry,ref=code.cannabrands.app/creationshop/findagram-frontend:cache,mode=max
depends_on: []
when:
branch: master
event: push
# ===========================================
# STAGE 3: Deploy and Run Migrations
# ===========================================
deploy:
image: bitnami/kubectl:latest
environment:
KUBECONFIG_CONTENT:
from_secret: kubeconfig_data
commands:
- mkdir -p ~/.kube
- echo "$KUBECONFIG_CONTENT" | tr -d '[:space:]' | base64 -d > ~/.kube/config
- chmod 600 ~/.kube/config
# Deploy backend first
- kubectl set image deployment/scraper scraper=code.cannabrands.app/creationshop/dispensary-scraper:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl rollout status deployment/scraper -n dispensary-scraper --timeout=300s
# Note: Migrations run automatically at startup via auto-migrate
# Deploy remaining services
# Resilience: ensure workers are scaled up if at 0
- REPLICAS=$(kubectl get deployment scraper-worker -n dispensary-scraper -o jsonpath='{.spec.replicas}'); if [ "$REPLICAS" = "0" ]; then echo "Scaling workers from 0 to 5"; kubectl scale deployment/scraper-worker --replicas=5 -n dispensary-scraper; fi
- kubectl set image deployment/scraper-worker worker=code.cannabrands.app/creationshop/dispensary-scraper:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl set image deployment/cannaiq-frontend cannaiq-frontend=code.cannabrands.app/creationshop/cannaiq-frontend:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl set image deployment/findadispo-frontend findadispo-frontend=code.cannabrands.app/creationshop/findadispo-frontend:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl set image deployment/findagram-frontend findagram-frontend=code.cannabrands.app/creationshop/findagram-frontend:${CI_COMMIT_SHA:0:8} -n dispensary-scraper
- kubectl rollout status deployment/cannaiq-frontend -n dispensary-scraper --timeout=120s
depends_on:
- docker-backend
- docker-cannaiq
- docker-findadispo
- docker-findagram
when:
branch: master
event: push

1442
CLAUDE.md

File diff suppressed because it is too large Load Diff

View File

@@ -25,8 +25,9 @@ ENV APP_GIT_SHA=${APP_GIT_SHA}
ENV APP_BUILD_TIME=${APP_BUILD_TIME}
ENV CONTAINER_IMAGE_TAG=${CONTAINER_IMAGE_TAG}
# Install Chromium dependencies
# Install Chromium dependencies and curl for HTTP requests
RUN apt-get update && apt-get install -y \
curl \
chromium \
fonts-liberation \
libnss3 \

View File

@@ -0,0 +1,218 @@
# CannaiQ Backend Codebase Map
**Last Updated:** 2025-12-12
**Purpose:** Help Claude and developers understand which code is current vs deprecated
---
## Quick Reference: What to Use
### For Crawling/Scraping
| Task | Use This | NOT This |
|------|----------|----------|
| Fetch products | `src/tasks/handlers/payload-fetch.ts` | `src/hydration/*` |
| Process products | `src/tasks/handlers/product-refresh.ts` | `src/scraper-v2/*` |
| GraphQL client | `src/platforms/dutchie/client.ts` | `src/dutchie-az/services/graphql-client.ts` |
| Worker system | `src/tasks/task-worker.ts` | `src/dutchie-az/services/worker.ts` |
### For Database
| Task | Use This | NOT This |
|------|----------|----------|
| Get DB pool | `src/db/pool.ts` | `src/dutchie-az/db/connection.ts` |
| Run migrations | `src/db/migrate.ts` (CLI only) | Never import at runtime |
| Query products | `store_products` table | `products`, `dutchie_products` |
| Query stores | `dispensaries` table | `stores` table |
### For Discovery
| Task | Use This |
|------|----------|
| Discover stores | `src/discovery/*.ts` |
| Run discovery | `npx tsx src/scripts/run-discovery.ts` |
---
## Directory Status
### ACTIVE DIRECTORIES (Use These)
```
src/
├── auth/ # JWT/session auth, middleware
├── db/ # Database pool, migrations
├── discovery/ # Dutchie store discovery pipeline
├── middleware/ # Express middleware
├── multi-state/ # Multi-state query support
├── platforms/ # Platform-specific clients (Dutchie, Jane, etc)
│ └── dutchie/ # THE Dutchie client - use this one
├── routes/ # Express API routes
├── services/ # Core services (logger, scheduler, etc)
├── tasks/ # Task system (workers, handlers, scheduler)
│ └── handlers/ # Task handlers (payload_fetch, product_refresh, etc)
├── types/ # TypeScript types
└── utils/ # Utilities (storage, image processing)
```
### DEPRECATED DIRECTORIES (DO NOT USE)
```
src/
├── hydration/ # DEPRECATED - Old pipeline approach
├── scraper-v2/ # DEPRECATED - Old scraper engine
├── canonical-hydration/# DEPRECATED - Merged into tasks/handlers
├── dutchie-az/ # PARTIAL - Some parts deprecated, some active
│ ├── db/ # DEPRECATED - Use src/db/pool.ts
│ └── services/ # PARTIAL - worker.ts still runs, graphql-client.ts deprecated
├── portals/ # FUTURE - Not yet implemented
├── seo/ # PARTIAL - Settings work, templates WIP
└── system/ # DEPRECATED - Old orchestration system
```
### DEPRECATED FILES (DO NOT USE)
```
src/dutchie-az/db/connection.ts # Use src/db/pool.ts instead
src/dutchie-az/services/graphql-client.ts # Use src/platforms/dutchie/client.ts
src/hydration/*.ts # Entire directory deprecated
src/scraper-v2/*.ts # Entire directory deprecated
```
---
## Key Files Reference
### Entry Points
| File | Purpose | Status |
|------|---------|--------|
| `src/index.ts` | Main Express server | ACTIVE |
| `src/dutchie-az/services/worker.ts` | Worker process entry | ACTIVE |
| `src/tasks/task-worker.ts` | Task worker (new system) | ACTIVE |
### Dutchie Integration
| File | Purpose | Status |
|------|---------|--------|
| `src/platforms/dutchie/client.ts` | GraphQL client, hashes, curl | **PRIMARY** |
| `src/platforms/dutchie/queries.ts` | High-level query functions | ACTIVE |
| `src/platforms/dutchie/index.ts` | Re-exports | ACTIVE |
### Task Handlers
| File | Purpose | Status |
|------|---------|--------|
| `src/tasks/handlers/payload-fetch.ts` | Fetch products from Dutchie | **PRIMARY** |
| `src/tasks/handlers/product-refresh.ts` | Process payload into DB | **PRIMARY** |
| `src/tasks/handlers/menu-detection.ts` | Detect menu type | ACTIVE |
| `src/tasks/handlers/id-resolution.ts` | Resolve platform IDs | ACTIVE |
| `src/tasks/handlers/image-download.ts` | Download product images | ACTIVE |
### Database
| File | Purpose | Status |
|------|---------|--------|
| `src/db/pool.ts` | Canonical DB pool | **PRIMARY** |
| `src/db/migrate.ts` | Migration runner (CLI only) | CLI ONLY |
| `src/db/auto-migrate.ts` | Auto-run migrations on startup | ACTIVE |
### Configuration
| File | Purpose | Status |
|------|---------|--------|
| `.env` | Environment variables | ACTIVE |
| `package.json` | Dependencies | ACTIVE |
| `tsconfig.json` | TypeScript config | ACTIVE |
---
## GraphQL Hashes (CRITICAL)
The correct hashes are in `src/platforms/dutchie/client.ts`:
```typescript
export const GRAPHQL_HASHES = {
FilteredProducts: 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0',
GetAddressBasedDispensaryData: '13461f73abf7268770dfd05fe7e10c523084b2bb916a929c08efe3d87531977b',
ConsumerDispensaries: '0a5bfa6ca1d64ae47bcccb7c8077c87147cbc4e6982c17ceec97a2a4948b311b',
GetAllCitiesByState: 'ae547a0466ace5a48f91e55bf6699eacd87e3a42841560f0c0eabed5a0a920e6',
};
```
**ALWAYS** use `Status: 'Active'` for FilteredProducts (not `null` or `'All'`).
---
## Scripts Reference
### Useful Scripts (in `src/scripts/`)
| Script | Purpose |
|--------|---------|
| `run-discovery.ts` | Run Dutchie discovery |
| `crawl-single-store.ts` | Test crawl a single store |
| `test-dutchie-graphql.ts` | Test GraphQL queries |
### One-Off Scripts (probably don't need)
| Script | Purpose |
|--------|---------|
| `harmonize-az-dispensaries.ts` | One-time data cleanup |
| `bootstrap-stores-for-dispensaries.ts` | One-time migration |
| `backfill-*.ts` | Historical backfill scripts |
---
## API Routes
### Active Routes (in `src/routes/`)
| Route File | Mount Point | Purpose |
|------------|-------------|---------|
| `auth.ts` | `/api/auth` | Login/logout/session |
| `stores.ts` | `/api/stores` | Store CRUD |
| `dashboard.ts` | `/api/dashboard` | Dashboard stats |
| `workers.ts` | `/api/workers` | Worker monitoring |
| `pipeline.ts` | `/api/pipeline` | Crawl triggers |
| `discovery.ts` | `/api/discovery` | Discovery management |
| `analytics.ts` | `/api/analytics` | Analytics queries |
| `wordpress.ts` | `/api/v1/wordpress` | WordPress plugin API |
---
## Documentation Files
### Current Docs (in `backend/docs/`)
| Doc | Purpose | Currency |
|-----|---------|----------|
| `TASK_WORKFLOW_2024-12-10.md` | Task system architecture | CURRENT |
| `WORKER_TASK_ARCHITECTURE.md` | Worker/task design | CURRENT |
| `CRAWL_PIPELINE.md` | Crawl pipeline overview | CURRENT |
| `ORGANIC_SCRAPING_GUIDE.md` | Browser-based scraping | CURRENT |
| `CODEBASE_MAP.md` | This file | CURRENT |
| `ANALYTICS_V2_EXAMPLES.md` | Analytics API examples | CURRENT |
| `BRAND_INTELLIGENCE_API.md` | Brand API docs | CURRENT |
### Root Docs
| Doc | Purpose | Currency |
|-----|---------|----------|
| `CLAUDE.md` | Claude instructions | **PRIMARY** |
| `README.md` | Project overview | NEEDS UPDATE |
---
## Common Mistakes to Avoid
1. **Don't use `src/hydration/`** - It's an old approach that was superseded by the task system
2. **Don't use `src/dutchie-az/db/connection.ts`** - Use `src/db/pool.ts` instead
3. **Don't import `src/db/migrate.ts` at runtime** - It will crash. Only use for CLI migrations.
4. **Don't query `stores` table** - It's empty. Use `dispensaries`.
5. **Don't query `products` table** - It's empty. Use `store_products`.
6. **Don't use wrong GraphQL hash** - Always get hash from `GRAPHQL_HASHES` in client.ts
7. **Don't use `Status: null`** - It returns 0 products. Use `Status: 'Active'`.
---
## When in Doubt
1. Check if the file is imported in `src/index.ts` - if not, it may be deprecated
2. Check the last modified date - older files may be stale
3. Look for `DEPRECATED` comments in the code
4. Ask: "Is there a newer version of this in `src/tasks/` or `src/platforms/`?"
5. Read the relevant doc in `docs/` before modifying code

View File

@@ -0,0 +1,297 @@
# Organic Browser-Based Scraping Guide
**Last Updated:** 2025-12-12
**Status:** Production-ready proof of concept
---
## Overview
This document describes the "organic" browser-based approach to scraping Dutchie dispensary menus. Unlike direct curl/axios requests, this method uses a real browser session to make API calls, making requests appear natural and reducing detection risk.
---
## Why Organic Scraping?
| Approach | Detection Risk | Speed | Complexity |
|----------|---------------|-------|------------|
| Direct curl | Higher | Fast | Low |
| curl-impersonate | Medium | Fast | Medium |
| **Browser-based (organic)** | **Lowest** | Slower | Higher |
Direct curl requests can be fingerprinted via:
- TLS fingerprint (cipher suites, extensions)
- Header order and values
- Missing cookies/session data
- Request patterns
Browser-based requests inherit:
- Real Chrome TLS fingerprint
- Session cookies from page visit
- Natural header order
- JavaScript execution environment
---
## Implementation
### Dependencies
```bash
npm install puppeteer puppeteer-extra puppeteer-extra-plugin-stealth
```
### Core Script: `test-intercept.js`
Located at: `backend/test-intercept.js`
```javascript
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
const fs = require('fs');
puppeteer.use(StealthPlugin());
async function capturePayload(config) {
const { dispensaryId, platformId, cName, outputPath } = config;
const browser = await puppeteer.launch({
headless: 'new',
args: ['--no-sandbox', '--disable-setuid-sandbox']
});
const page = await browser.newPage();
// STEP 1: Establish session by visiting the menu
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
await page.goto(embedUrl, { waitUntil: 'networkidle2', timeout: 60000 });
// STEP 2: Fetch ALL products using GraphQL from browser context
const result = await page.evaluate(async (platformId) => {
const allProducts = [];
let pageNum = 0;
const perPage = 100;
let totalCount = 0;
const sessionId = 'browser-session-' + Date.now();
while (pageNum < 30) {
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active', not null
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: pageNum,
perPage: perPage,
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0'
}
};
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions)
});
const response = await fetch(`https://dutchie.com/api-3/graphql?${qs}`, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include'
});
const json = await response.json();
const data = json?.data?.filteredProducts;
if (!data?.products) break;
allProducts.push(...data.products);
if (pageNum === 0) totalCount = data.queryInfo?.totalCount || 0;
if (allProducts.length >= totalCount) break;
pageNum++;
await new Promise(r => setTimeout(r, 200)); // Polite delay
}
return { products: allProducts, totalCount };
}, platformId);
await browser.close();
// STEP 3: Save payload
const payload = {
dispensaryId,
platformId,
cName,
fetchedAt: new Date().toISOString(),
productCount: result.products.length,
products: result.products,
};
fs.writeFileSync(outputPath, JSON.stringify(payload, null, 2));
return payload;
}
```
---
## Critical Parameters
### GraphQL Hash (FilteredProducts)
```
ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0
```
**WARNING:** Using the wrong hash returns HTTP 400.
### Status Parameter
| Value | Result |
|-------|--------|
| `'Active'` | Returns in-stock products (1019 in test) |
| `null` | Returns 0 products |
| `'All'` | Returns HTTP 400 |
**ALWAYS use `Status: 'Active'`**
### Required Headers
```javascript
{
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': 'unique-session-id',
'apollographql-client-name': 'Marketplace (production)',
}
```
### Endpoint
```
https://dutchie.com/api-3/graphql
```
---
## Performance Benchmarks
Test store: AZ-Deeply-Rooted (1019 products)
| Metric | Value |
|--------|-------|
| Total products | 1019 |
| Time | 18.5 seconds |
| Payload size | 11.8 MB |
| Pages fetched | 11 (100 per page) |
| Success rate | 100% |
---
## Payload Format
The output matches the existing `payload-fetch.ts` handler format:
```json
{
"dispensaryId": 123,
"platformId": "6405ef617056e8014d79101b",
"cName": "AZ-Deeply-Rooted",
"fetchedAt": "2025-12-12T05:05:19.837Z",
"productCount": 1019,
"products": [
{
"id": "6927508db4851262f629a869",
"Name": "Product Name",
"brand": { "name": "Brand Name", ... },
"type": "Flower",
"THC": "25%",
"Prices": [...],
"Options": [...],
...
}
]
}
```
---
## Integration Points
### As a Task Handler
The organic approach can be integrated as an alternative to curl-based fetching:
```typescript
// In src/tasks/handlers/organic-payload-fetch.ts
export async function handleOrganicPayloadFetch(ctx: TaskContext): Promise<TaskResult> {
// Use puppeteer-based capture
// Save to same payload storage
// Queue product_refresh task
}
```
### Worker Configuration
Add to job_schedules:
```sql
INSERT INTO job_schedules (name, role, cron_expression)
VALUES ('organic_product_crawl', 'organic_payload_fetch', '0 */6 * * *');
```
---
## Troubleshooting
### HTTP 400 Bad Request
- Check hash is correct: `ee29c060...`
- Verify Status is `'Active'` (string, not null)
### 0 Products Returned
- Status was likely `null` or `'All'` - use `'Active'`
- Check platformId is valid MongoDB ObjectId
### Session Not Established
- Increase timeout on initial page.goto()
- Check cName is valid (matches embedded-menu URL)
### Detection/Blocking
- StealthPlugin should handle most cases
- Add random delays between pages
- Use headless: 'new' (not true/false)
---
## Files Reference
| File | Purpose |
|------|---------|
| `backend/test-intercept.js` | Proof of concept script |
| `backend/src/platforms/dutchie/client.ts` | GraphQL hashes, curl implementation |
| `backend/src/tasks/handlers/payload-fetch.ts` | Current curl-based handler |
| `backend/src/utils/payload-storage.ts` | Payload save/load utilities |
---
## See Also
- `DUTCHIE_CRAWL_WORKFLOW.md` - Full crawl pipeline documentation
- `TASK_WORKFLOW_2024-12-10.md` - Task system architecture
- `CLAUDE.md` - Project rules and constraints

View File

@@ -0,0 +1,25 @@
# ARCHIVED DOCUMENTATION
**WARNING: These docs may be outdated or inaccurate.**
The code has evolved significantly. These docs are kept for historical reference only.
## What to Use Instead
**The single source of truth is:**
- `CLAUDE.md` (root) - Essential rules and quick reference
- `docs/CODEBASE_MAP.md` - Current file/directory reference
## Why Archive?
These docs were written during development iterations and may reference:
- Old file paths that no longer exist
- Deprecated approaches (hydration, scraper-v2)
- APIs that have changed
- Database schemas that evolved
## If You Need Details
1. First check CODEBASE_MAP.md for current file locations
2. Then read the actual source code
3. Only use archive docs as a last resort for historical context

View File

@@ -362,6 +362,148 @@ SET status = 'pending', retry_count = retry_count + 1
WHERE status = 'failed' AND retry_count < max_retries;
```
## Concurrent Task Processing (Added 2024-12)
Workers can now process multiple tasks concurrently within a single worker instance. This improves throughput by utilizing async I/O efficiently.
### Architecture
```
┌─────────────────────────────────────────────────────────────┐
│ Pod (K8s) │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ TaskWorker │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Task 1 │ │ Task 2 │ │ Task 3 │ (concurrent)│ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ │ Resource Monitor │ │
│ │ ├── Memory: 65% (threshold: 85%) │ │
│ │ ├── CPU: 45% (threshold: 90%) │ │
│ │ └── Status: Normal │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```
### Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `MAX_CONCURRENT_TASKS` | 3 | Maximum tasks a worker will run concurrently |
| `MEMORY_BACKOFF_THRESHOLD` | 0.85 | Back off when heap memory exceeds 85% |
| `CPU_BACKOFF_THRESHOLD` | 0.90 | Back off when CPU exceeds 90% |
| `BACKOFF_DURATION_MS` | 10000 | How long to wait when backing off (10s) |
### How It Works
1. **Main Loop**: Worker continuously tries to fill up to `MAX_CONCURRENT_TASKS`
2. **Resource Monitoring**: Before claiming a new task, worker checks memory and CPU
3. **Backoff**: If resources exceed thresholds, worker pauses and stops claiming new tasks
4. **Concurrent Execution**: Tasks run in parallel using `Promise` - they don't block each other
5. **Graceful Shutdown**: On SIGTERM/decommission, worker stops claiming but waits for active tasks
### Resource Monitoring
```typescript
// ResourceStats interface
interface ResourceStats {
memoryPercent: number; // Current heap usage as decimal (0.0-1.0)
memoryMb: number; // Current heap used in MB
memoryTotalMb: number; // Total heap available in MB
cpuPercent: number; // CPU usage as percentage (0-100)
isBackingOff: boolean; // True if worker is in backoff state
backoffReason: string; // Why the worker is backing off
}
```
### Heartbeat Data
Workers report the following in their heartbeat:
```json
{
"worker_id": "worker-abc123",
"current_task_id": 456,
"current_task_ids": [456, 457, 458],
"active_task_count": 3,
"max_concurrent_tasks": 3,
"status": "active",
"resources": {
"memory_mb": 256,
"memory_total_mb": 512,
"memory_rss_mb": 320,
"memory_percent": 50,
"cpu_user_ms": 12500,
"cpu_system_ms": 3200,
"cpu_percent": 45,
"is_backing_off": false,
"backoff_reason": null
}
}
```
### Backoff Behavior
When resources exceed thresholds:
1. Worker logs the backoff reason:
```
[TaskWorker] MyWorker backing off: Memory at 87.3% (threshold: 85%)
```
2. Worker stops claiming new tasks but continues existing tasks
3. After `BACKOFF_DURATION_MS`, worker rechecks resources
4. When resources return to normal:
```
[TaskWorker] MyWorker resuming normal operation
```
### UI Display
The Workers Dashboard shows:
- **Tasks Column**: `2/3 tasks` (active/max concurrent)
- **Resources Column**: Memory % and CPU % with color coding
- Green: < 50%
- Yellow: 50-74%
- Amber: 75-89%
- Red: 90%+
- **Backing Off**: Orange warning badge when worker is in backoff state
### Task Count Badge Details
```
┌─────────────────────────────────────────────┐
│ Worker: "MyWorker" │
│ Tasks: 2/3 tasks #456, #457
│ Resources: 🧠 65% 💻 45% │
│ Status: ● Active │
└─────────────────────────────────────────────┘
```
### Best Practices
1. **Start Conservative**: Use `MAX_CONCURRENT_TASKS=3` initially
2. **Monitor Resources**: Watch for frequent backoffs in logs
3. **Tune Per Workload**: I/O-bound tasks benefit from higher concurrency
4. **Scale Horizontally**: Add more pods rather than cranking concurrency too high
### Code References
| File | Purpose |
|------|---------|
| `src/tasks/task-worker.ts:68-71` | Concurrency environment variables |
| `src/tasks/task-worker.ts:104-111` | ResourceStats interface |
| `src/tasks/task-worker.ts:149-179` | getResourceStats() method |
| `src/tasks/task-worker.ts:184-196` | shouldBackOff() method |
| `src/tasks/task-worker.ts:462-516` | mainLoop() with concurrent claiming |
| `src/routes/worker-registry.ts:148-195` | Heartbeat endpoint handling |
| `cannaiq/src/pages/WorkersDashboard.tsx:233-305` | UI components for resources |
## Monitoring
### Logs

View File

@@ -0,0 +1,77 @@
apiVersion: v1
kind: Service
metadata:
name: scraper-worker
namespace: dispensary-scraper
labels:
app: scraper-worker
spec:
clusterIP: None # Headless service required for StatefulSet
selector:
app: scraper-worker
ports:
- port: 3010
name: http
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: scraper-worker
namespace: dispensary-scraper
spec:
serviceName: scraper-worker
replicas: 8
podManagementPolicy: Parallel # Start all pods at once
updateStrategy:
type: OnDelete # Pods only update when manually deleted - no automatic restarts
selector:
matchLabels:
app: scraper-worker
template:
metadata:
labels:
app: scraper-worker
spec:
terminationGracePeriodSeconds: 60
imagePullSecrets:
- name: regcred
containers:
- name: worker
image: code.cannabrands.app/creationshop/dispensary-scraper:2ed088b4
imagePullPolicy: Always
command: ["node"]
args: ["dist/tasks/task-worker.js"]
env:
- name: WORKER_MODE
value: "true"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MAX_CONCURRENT_TASKS
value: "50"
- name: API_BASE_URL
value: http://scraper
- name: NODE_OPTIONS
value: --max-old-space-size=1500
envFrom:
- configMapRef:
name: scraper-config
- secretRef:
name: scraper-secrets
resources:
requests:
cpu: 100m
memory: 1Gi
limits:
cpu: 500m
memory: 2Gi
livenessProbe:
exec:
command:
- /bin/sh
- -c
- pgrep -f 'task-worker' > /dev/null
initialDelaySeconds: 10
periodSeconds: 30
failureThreshold: 3

View File

@@ -0,0 +1,27 @@
-- Migration: Worker Commands Table
-- Purpose: Store commands for workers (decommission, etc.)
-- Workers poll this table after each task to check for commands
CREATE TABLE IF NOT EXISTS worker_commands (
id SERIAL PRIMARY KEY,
worker_id TEXT NOT NULL,
command TEXT NOT NULL, -- 'decommission', 'pause', 'resume'
reason TEXT,
issued_by TEXT,
issued_at TIMESTAMPTZ DEFAULT NOW(),
acknowledged_at TIMESTAMPTZ,
executed_at TIMESTAMPTZ,
status TEXT DEFAULT 'pending' -- 'pending', 'acknowledged', 'executed', 'cancelled'
);
-- Index for worker lookups
CREATE INDEX IF NOT EXISTS idx_worker_commands_worker_id ON worker_commands(worker_id);
CREATE INDEX IF NOT EXISTS idx_worker_commands_pending ON worker_commands(worker_id, status) WHERE status = 'pending';
-- Add decommission_requested column to worker_registry for quick checks
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_requested BOOLEAN DEFAULT FALSE;
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_reason TEXT;
ALTER TABLE worker_registry ADD COLUMN IF NOT EXISTS decommission_requested_at TIMESTAMPTZ;
-- Comment
COMMENT ON TABLE worker_commands IS 'Commands issued to workers (decommission after task, pause, etc.)';

View File

@@ -0,0 +1,27 @@
-- Migration: 082_proxy_notification_trigger
-- Date: 2024-12-11
-- Description: Add PostgreSQL NOTIFY trigger to alert workers when proxies are added
-- Create function to notify workers when active proxy is added/activated
CREATE OR REPLACE FUNCTION notify_proxy_added()
RETURNS TRIGGER AS $$
BEGIN
-- Only notify if proxy is active
IF NEW.active = true THEN
PERFORM pg_notify('proxy_added', NEW.id::text);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Drop existing trigger if any
DROP TRIGGER IF EXISTS proxy_added_trigger ON proxies;
-- Create trigger on insert and update of active column
CREATE TRIGGER proxy_added_trigger
AFTER INSERT OR UPDATE OF active ON proxies
FOR EACH ROW
EXECUTE FUNCTION notify_proxy_added();
COMMENT ON FUNCTION notify_proxy_added() IS
'Sends PostgreSQL NOTIFY to proxy_added channel when an active proxy is added or activated. Workers LISTEN on this channel to wake up immediately.';

View File

@@ -0,0 +1,88 @@
-- Migration 083: Discovery Run Tracking
-- Tracks progress of store discovery runs step-by-step
-- Main discovery runs table
CREATE TABLE IF NOT EXISTS discovery_runs (
id SERIAL PRIMARY KEY,
platform VARCHAR(50) NOT NULL DEFAULT 'dutchie',
status VARCHAR(20) NOT NULL DEFAULT 'running', -- running, completed, failed
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
task_id INTEGER REFERENCES worker_task_queue(id),
-- Totals
states_total INTEGER DEFAULT 0,
states_completed INTEGER DEFAULT 0,
locations_discovered INTEGER DEFAULT 0,
locations_promoted INTEGER DEFAULT 0,
new_store_ids INTEGER[] DEFAULT '{}',
-- Error info
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Per-state progress within a run
CREATE TABLE IF NOT EXISTS discovery_run_states (
id SERIAL PRIMARY KEY,
run_id INTEGER NOT NULL REFERENCES discovery_runs(id) ON DELETE CASCADE,
state_code VARCHAR(2) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, running, completed, failed
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
-- Results
cities_found INTEGER DEFAULT 0,
locations_found INTEGER DEFAULT 0,
locations_upserted INTEGER DEFAULT 0,
new_dispensary_ids INTEGER[] DEFAULT '{}',
-- Error info
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(run_id, state_code)
);
-- Step-by-step log for detailed progress tracking
CREATE TABLE IF NOT EXISTS discovery_run_steps (
id SERIAL PRIMARY KEY,
run_id INTEGER NOT NULL REFERENCES discovery_runs(id) ON DELETE CASCADE,
state_code VARCHAR(2),
step_name VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'started', -- started, completed, failed
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
-- Details (JSON for flexibility)
details JSONB DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes for querying
CREATE INDEX IF NOT EXISTS idx_discovery_runs_status ON discovery_runs(status);
CREATE INDEX IF NOT EXISTS idx_discovery_runs_platform ON discovery_runs(platform);
CREATE INDEX IF NOT EXISTS idx_discovery_runs_started_at ON discovery_runs(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_discovery_run_states_run_id ON discovery_run_states(run_id);
CREATE INDEX IF NOT EXISTS idx_discovery_run_steps_run_id ON discovery_run_steps(run_id);
-- View for latest run status per platform
CREATE OR REPLACE VIEW v_latest_discovery_runs AS
SELECT DISTINCT ON (platform)
id,
platform,
status,
started_at,
finished_at,
states_total,
states_completed,
locations_discovered,
locations_promoted,
array_length(new_store_ids, 1) as new_stores_count,
error_message,
EXTRACT(EPOCH FROM (COALESCE(finished_at, NOW()) - started_at)) as duration_seconds
FROM discovery_runs
ORDER BY platform, started_at DESC;

View File

@@ -0,0 +1,253 @@
-- Migration 084: Dual Transport Preflight System
-- Workers run both curl and http (Puppeteer) preflights on startup
-- Tasks can require a specific transport method
-- ===================================================================
-- PART 1: Add preflight columns to worker_registry
-- ===================================================================
-- Preflight status for curl/axios transport (proxy-based)
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_curl_status VARCHAR(20) DEFAULT 'pending';
-- Preflight status for http/Puppeteer transport (browser-based)
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_http_status VARCHAR(20) DEFAULT 'pending';
-- Timestamps for when each preflight completed
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_curl_at TIMESTAMPTZ;
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_http_at TIMESTAMPTZ;
-- Error messages for failed preflights
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_curl_error TEXT;
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_http_error TEXT;
-- Response time for successful preflights (ms)
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_curl_ms INTEGER;
ALTER TABLE worker_registry
ADD COLUMN IF NOT EXISTS preflight_http_ms INTEGER;
-- Constraints for preflight status values
ALTER TABLE worker_registry
DROP CONSTRAINT IF EXISTS valid_preflight_curl_status;
ALTER TABLE worker_registry
ADD CONSTRAINT valid_preflight_curl_status
CHECK (preflight_curl_status IN ('pending', 'passed', 'failed', 'skipped'));
ALTER TABLE worker_registry
DROP CONSTRAINT IF EXISTS valid_preflight_http_status;
ALTER TABLE worker_registry
ADD CONSTRAINT valid_preflight_http_status
CHECK (preflight_http_status IN ('pending', 'passed', 'failed', 'skipped'));
-- ===================================================================
-- PART 2: Add method column to worker_tasks
-- ===================================================================
-- Transport method requirement for the task
-- NULL = no preference (any worker can claim)
-- 'curl' = requires curl/axios transport (proxy-based, fast)
-- 'http' = requires http/Puppeteer transport (browser-based, anti-detect)
ALTER TABLE worker_tasks
ADD COLUMN IF NOT EXISTS method VARCHAR(10);
-- Constraint for valid method values
ALTER TABLE worker_tasks
DROP CONSTRAINT IF EXISTS valid_task_method;
ALTER TABLE worker_tasks
ADD CONSTRAINT valid_task_method
CHECK (method IS NULL OR method IN ('curl', 'http'));
-- Index for method-based task claiming
CREATE INDEX IF NOT EXISTS idx_worker_tasks_method
ON worker_tasks(method)
WHERE status = 'pending';
-- Set default method for all existing pending tasks to 'http'
-- ALL current tasks require Puppeteer/browser-based transport
UPDATE worker_tasks
SET method = 'http'
WHERE method IS NULL;
-- ===================================================================
-- PART 3: Update claim_task function for method compatibility
-- ===================================================================
CREATE OR REPLACE FUNCTION claim_task(
p_role VARCHAR(50),
p_worker_id VARCHAR(100),
p_curl_passed BOOLEAN DEFAULT TRUE,
p_http_passed BOOLEAN DEFAULT FALSE
) RETURNS worker_tasks AS $$
DECLARE
claimed_task worker_tasks;
BEGIN
UPDATE worker_tasks
SET
status = 'claimed',
worker_id = p_worker_id,
claimed_at = NOW(),
updated_at = NOW()
WHERE id = (
SELECT id FROM worker_tasks
WHERE role = p_role
AND status = 'pending'
AND (scheduled_for IS NULL OR scheduled_for <= NOW())
-- Method compatibility: worker must have passed the required preflight
AND (
method IS NULL -- No preference, any worker can claim
OR (method = 'curl' AND p_curl_passed = TRUE)
OR (method = 'http' AND p_http_passed = TRUE)
)
-- Exclude stores that already have an active task
AND (dispensary_id IS NULL OR dispensary_id NOT IN (
SELECT dispensary_id FROM worker_tasks
WHERE status IN ('claimed', 'running')
AND dispensary_id IS NOT NULL
))
ORDER BY priority DESC, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING * INTO claimed_task;
RETURN claimed_task;
END;
$$ LANGUAGE plpgsql;
-- ===================================================================
-- PART 4: Update v_active_workers view
-- ===================================================================
DROP VIEW IF EXISTS v_active_workers;
CREATE VIEW v_active_workers AS
SELECT
wr.id,
wr.worker_id,
wr.friendly_name,
wr.role,
wr.status,
wr.pod_name,
wr.hostname,
wr.started_at,
wr.last_heartbeat_at,
wr.last_task_at,
wr.tasks_completed,
wr.tasks_failed,
wr.current_task_id,
-- Preflight status
wr.preflight_curl_status,
wr.preflight_http_status,
wr.preflight_curl_at,
wr.preflight_http_at,
wr.preflight_curl_error,
wr.preflight_http_error,
wr.preflight_curl_ms,
wr.preflight_http_ms,
-- Computed fields
EXTRACT(EPOCH FROM (NOW() - wr.last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN wr.status = 'offline' THEN 'offline'
WHEN wr.last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN wr.current_task_id IS NOT NULL THEN 'busy'
ELSE 'ready'
END as health_status,
-- Capability flags (can this worker handle curl/http tasks?)
(wr.preflight_curl_status = 'passed') as can_curl,
(wr.preflight_http_status = 'passed') as can_http
FROM worker_registry wr
WHERE wr.status != 'terminated'
ORDER BY wr.status = 'active' DESC, wr.last_heartbeat_at DESC;
-- ===================================================================
-- PART 5: View for task queue with method info
-- ===================================================================
DROP VIEW IF EXISTS v_task_history;
CREATE VIEW v_task_history AS
SELECT
t.id,
t.role,
t.dispensary_id,
d.name as dispensary_name,
t.platform,
t.status,
t.priority,
t.method,
t.worker_id,
t.scheduled_for,
t.claimed_at,
t.started_at,
t.completed_at,
t.error_message,
t.retry_count,
t.created_at,
EXTRACT(EPOCH FROM (t.completed_at - t.started_at)) as duration_sec
FROM worker_tasks t
LEFT JOIN dispensaries d ON d.id = t.dispensary_id
ORDER BY t.created_at DESC;
-- ===================================================================
-- PART 6: Helper function to update worker preflight status
-- ===================================================================
CREATE OR REPLACE FUNCTION update_worker_preflight(
p_worker_id VARCHAR(100),
p_transport VARCHAR(10), -- 'curl' or 'http'
p_status VARCHAR(20), -- 'passed', 'failed', 'skipped'
p_response_ms INTEGER DEFAULT NULL,
p_error TEXT DEFAULT NULL
) RETURNS VOID AS $$
BEGIN
IF p_transport = 'curl' THEN
UPDATE worker_registry
SET
preflight_curl_status = p_status,
preflight_curl_at = NOW(),
preflight_curl_ms = p_response_ms,
preflight_curl_error = p_error,
updated_at = NOW()
WHERE worker_id = p_worker_id;
ELSIF p_transport = 'http' THEN
UPDATE worker_registry
SET
preflight_http_status = p_status,
preflight_http_at = NOW(),
preflight_http_ms = p_response_ms,
preflight_http_error = p_error,
updated_at = NOW()
WHERE worker_id = p_worker_id;
END IF;
END;
$$ LANGUAGE plpgsql;
-- ===================================================================
-- Comments
-- ===================================================================
COMMENT ON COLUMN worker_registry.preflight_curl_status IS 'Status of curl/axios preflight: pending, passed, failed, skipped';
COMMENT ON COLUMN worker_registry.preflight_http_status IS 'Status of http/Puppeteer preflight: pending, passed, failed, skipped';
COMMENT ON COLUMN worker_registry.preflight_curl_at IS 'When curl preflight completed';
COMMENT ON COLUMN worker_registry.preflight_http_at IS 'When http preflight completed';
COMMENT ON COLUMN worker_registry.preflight_curl_error IS 'Error message if curl preflight failed';
COMMENT ON COLUMN worker_registry.preflight_http_error IS 'Error message if http preflight failed';
COMMENT ON COLUMN worker_registry.preflight_curl_ms IS 'Response time of successful curl preflight (ms)';
COMMENT ON COLUMN worker_registry.preflight_http_ms IS 'Response time of successful http preflight (ms)';
COMMENT ON COLUMN worker_tasks.method IS 'Transport method required: NULL=any, curl=proxy-based, http=browser-based';
COMMENT ON FUNCTION claim_task IS 'Atomically claim a task, respecting method requirements and per-store locking';
COMMENT ON FUNCTION update_worker_preflight IS 'Update a workers preflight status for a given transport';

View File

@@ -1,6 +1,6 @@
{
"name": "dutchie-menus-backend",
"version": "1.5.1",
"version": "1.6.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
@@ -46,6 +46,97 @@
"resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.4.0.tgz",
"integrity": "sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ=="
},
"node_modules/@jsep-plugin/assignment": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/@jsep-plugin/assignment/-/assignment-1.3.0.tgz",
"integrity": "sha512-VVgV+CXrhbMI3aSusQyclHkenWSAm95WaiKrMxRFam3JSUiIaQjoMIw2sEs/OX4XifnqeQUN4DYbJjlA8EfktQ==",
"engines": {
"node": ">= 10.16.0"
},
"peerDependencies": {
"jsep": "^0.4.0||^1.0.0"
}
},
"node_modules/@jsep-plugin/regex": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/@jsep-plugin/regex/-/regex-1.0.4.tgz",
"integrity": "sha512-q7qL4Mgjs1vByCaTnDFcBnV9HS7GVPJX5vyVoCgZHNSC9rjwIlmbXG5sUuorR5ndfHAIlJ8pVStxvjXHbNvtUg==",
"engines": {
"node": ">= 10.16.0"
},
"peerDependencies": {
"jsep": "^0.4.0||^1.0.0"
}
},
"node_modules/@kubernetes/client-node": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/@kubernetes/client-node/-/client-node-1.4.0.tgz",
"integrity": "sha512-Zge3YvF7DJi264dU1b3wb/GmzR99JhUpqTvp+VGHfwZT+g7EOOYNScDJNZwXy9cszyIGPIs0VHr+kk8e95qqrA==",
"dependencies": {
"@types/js-yaml": "^4.0.1",
"@types/node": "^24.0.0",
"@types/node-fetch": "^2.6.13",
"@types/stream-buffers": "^3.0.3",
"form-data": "^4.0.0",
"hpagent": "^1.2.0",
"isomorphic-ws": "^5.0.0",
"js-yaml": "^4.1.0",
"jsonpath-plus": "^10.3.0",
"node-fetch": "^2.7.0",
"openid-client": "^6.1.3",
"rfc4648": "^1.3.0",
"socks-proxy-agent": "^8.0.4",
"stream-buffers": "^3.0.2",
"tar-fs": "^3.0.9",
"ws": "^8.18.2"
}
},
"node_modules/@kubernetes/client-node/node_modules/@types/node": {
"version": "24.10.3",
"resolved": "https://registry.npmjs.org/@types/node/-/node-24.10.3.tgz",
"integrity": "sha512-gqkrWUsS8hcm0r44yn7/xZeV1ERva/nLgrLxFRUGb7aoNMIJfZJ3AC261zDQuOAKC7MiXai1WCpYc48jAHoShQ==",
"dependencies": {
"undici-types": "~7.16.0"
}
},
"node_modules/@kubernetes/client-node/node_modules/tar-fs": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/tar-fs/-/tar-fs-3.1.1.tgz",
"integrity": "sha512-LZA0oaPOc2fVo82Txf3gw+AkEd38szODlptMYejQUhndHMLQ9M059uXR+AfS7DNo0NpINvSqDsvyaCrBVkptWg==",
"dependencies": {
"pump": "^3.0.0",
"tar-stream": "^3.1.5"
},
"optionalDependencies": {
"bare-fs": "^4.0.1",
"bare-path": "^3.0.0"
}
},
"node_modules/@kubernetes/client-node/node_modules/undici-types": {
"version": "7.16.0",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz",
"integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="
},
"node_modules/@kubernetes/client-node/node_modules/ws": {
"version": "8.18.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.18.3.tgz",
"integrity": "sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
},
"node_modules/@mapbox/node-pre-gyp": {
"version": "1.0.11",
"resolved": "https://registry.npmjs.org/@mapbox/node-pre-gyp/-/node-pre-gyp-1.0.11.tgz",
@@ -251,6 +342,11 @@
"integrity": "sha512-r8Tayk8HJnX0FztbZN7oVqGccWgw98T/0neJphO91KkmOzug1KkofZURD4UaD5uH8AqcFLfdPErnBod0u71/qg==",
"dev": true
},
"node_modules/@types/js-yaml": {
"version": "4.0.9",
"resolved": "https://registry.npmjs.org/@types/js-yaml/-/js-yaml-4.0.9.tgz",
"integrity": "sha512-k4MGaQl5TGo/iipqb2UDG2UwjXziSWkh0uysQelTlJpX1qGlpUZYm8PnO4DxG1qBomtJUdYJ6qR6xdIah10JLg=="
},
"node_modules/@types/jsonwebtoken": {
"version": "9.0.10",
"resolved": "https://registry.npmjs.org/@types/jsonwebtoken/-/jsonwebtoken-9.0.10.tgz",
@@ -276,7 +372,6 @@
"version": "20.19.25",
"resolved": "https://registry.npmjs.org/@types/node/-/node-20.19.25.tgz",
"integrity": "sha512-ZsJzA5thDQMSQO788d7IocwwQbI8B5OPzmqNvpf3NY/+MHDAS759Wo0gd2WQeXYt5AAAQjzcrTVC6SKCuYgoCQ==",
"devOptional": true,
"dependencies": {
"undici-types": "~6.21.0"
}
@@ -287,6 +382,15 @@
"integrity": "sha512-0ikrnug3/IyneSHqCBeslAhlK2aBfYek1fGo4bP4QnZPmiqSGRK+Oy7ZMisLWkesffJvQ1cqAcBnJC+8+nxIAg==",
"dev": true
},
"node_modules/@types/node-fetch": {
"version": "2.6.13",
"resolved": "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.13.tgz",
"integrity": "sha512-QGpRVpzSaUs30JBSGPjOg4Uveu384erbHBoT1zeONvyCfwQxIkUshLAOqN/k9EjGviPRmWTTe6aH2qySWKTVSw==",
"dependencies": {
"@types/node": "*",
"form-data": "^4.0.4"
}
},
"node_modules/@types/pg": {
"version": "8.15.6",
"resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.15.6.tgz",
@@ -340,6 +444,14 @@
"@types/node": "*"
}
},
"node_modules/@types/stream-buffers": {
"version": "3.0.8",
"resolved": "https://registry.npmjs.org/@types/stream-buffers/-/stream-buffers-3.0.8.tgz",
"integrity": "sha512-J+7VaHKNvlNPJPEJXX/fKa9DZtR/xPMwuIbe+yNOwp1YB+ApUOBv2aUpEoBJEi8nJgbgs1x8e73ttg0r1rSUdw==",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/uuid": {
"version": "9.0.8",
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz",
@@ -520,6 +632,78 @@
}
}
},
"node_modules/bare-fs": {
"version": "4.5.2",
"resolved": "https://registry.npmjs.org/bare-fs/-/bare-fs-4.5.2.tgz",
"integrity": "sha512-veTnRzkb6aPHOvSKIOy60KzURfBdUflr5VReI+NSaPL6xf+XLdONQgZgpYvUuZLVQ8dCqxpBAudaOM1+KpAUxw==",
"optional": true,
"dependencies": {
"bare-events": "^2.5.4",
"bare-path": "^3.0.0",
"bare-stream": "^2.6.4",
"bare-url": "^2.2.2",
"fast-fifo": "^1.3.2"
},
"engines": {
"bare": ">=1.16.0"
},
"peerDependencies": {
"bare-buffer": "*"
},
"peerDependenciesMeta": {
"bare-buffer": {
"optional": true
}
}
},
"node_modules/bare-os": {
"version": "3.6.2",
"resolved": "https://registry.npmjs.org/bare-os/-/bare-os-3.6.2.tgz",
"integrity": "sha512-T+V1+1srU2qYNBmJCXZkUY5vQ0B4FSlL3QDROnKQYOqeiQR8UbjNHlPa+TIbM4cuidiN9GaTaOZgSEgsvPbh5A==",
"optional": true,
"engines": {
"bare": ">=1.14.0"
}
},
"node_modules/bare-path": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/bare-path/-/bare-path-3.0.0.tgz",
"integrity": "sha512-tyfW2cQcB5NN8Saijrhqn0Zh7AnFNsnczRcuWODH0eYAXBsJ5gVxAUuNr7tsHSC6IZ77cA0SitzT+s47kot8Mw==",
"optional": true,
"dependencies": {
"bare-os": "^3.0.1"
}
},
"node_modules/bare-stream": {
"version": "2.7.0",
"resolved": "https://registry.npmjs.org/bare-stream/-/bare-stream-2.7.0.tgz",
"integrity": "sha512-oyXQNicV1y8nc2aKffH+BUHFRXmx6VrPzlnaEvMhram0nPBrKcEdcyBg5r08D0i8VxngHFAiVyn1QKXpSG0B8A==",
"optional": true,
"dependencies": {
"streamx": "^2.21.0"
},
"peerDependencies": {
"bare-buffer": "*",
"bare-events": "*"
},
"peerDependenciesMeta": {
"bare-buffer": {
"optional": true
},
"bare-events": {
"optional": true
}
}
},
"node_modules/bare-url": {
"version": "2.3.2",
"resolved": "https://registry.npmjs.org/bare-url/-/bare-url-2.3.2.tgz",
"integrity": "sha512-ZMq4gd9ngV5aTMa5p9+UfY0b3skwhHELaDkhEHetMdX0LRkW9kzaym4oo/Eh+Ghm0CCDuMTsRIGM/ytUc1ZYmw==",
"optional": true,
"dependencies": {
"bare-path": "^3.0.0"
}
},
"node_modules/base64-js": {
"version": "1.5.1",
"resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz",
@@ -2019,6 +2203,14 @@
"node": ">=16.0.0"
}
},
"node_modules/hpagent": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/hpagent/-/hpagent-1.2.0.tgz",
"integrity": "sha512-A91dYTeIB6NoXG+PxTQpCCDDnfHsW9kc06Lvpu1TEe9gnd6ZFeiBoRO9JvzEv6xK7EX97/dUE8g/vBMTqTS3CA==",
"engines": {
"node": ">=14"
}
},
"node_modules/htmlparser2": {
"version": "10.0.0",
"resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-10.0.0.tgz",
@@ -2382,6 +2574,22 @@
"node": ">=0.10.0"
}
},
"node_modules/isomorphic-ws": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/isomorphic-ws/-/isomorphic-ws-5.0.0.tgz",
"integrity": "sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw==",
"peerDependencies": {
"ws": "*"
}
},
"node_modules/jose": {
"version": "6.1.3",
"resolved": "https://registry.npmjs.org/jose/-/jose-6.1.3.tgz",
"integrity": "sha512-0TpaTfihd4QMNwrz/ob2Bp7X04yuxJkjRGi4aKmOqwhov54i6u79oCv7T+C7lo70MKH6BesI3vscD1yb/yzKXQ==",
"funding": {
"url": "https://github.com/sponsors/panva"
}
},
"node_modules/js-tokens": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz",
@@ -2398,6 +2606,14 @@
"js-yaml": "bin/js-yaml.js"
}
},
"node_modules/jsep": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/jsep/-/jsep-1.4.0.tgz",
"integrity": "sha512-B7qPcEVE3NVkmSJbaYxvv4cHkVW7DQsZz13pUMrfS8z8Q/BuShN+gcTXrUlPiGqM2/t/EEaI030bpxMqY8gMlw==",
"engines": {
"node": ">= 10.16.0"
}
},
"node_modules/json-parse-even-better-errors": {
"version": "2.3.1",
"resolved": "https://registry.npmjs.org/json-parse-even-better-errors/-/json-parse-even-better-errors-2.3.1.tgz",
@@ -2419,6 +2635,23 @@
"graceful-fs": "^4.1.6"
}
},
"node_modules/jsonpath-plus": {
"version": "10.3.0",
"resolved": "https://registry.npmjs.org/jsonpath-plus/-/jsonpath-plus-10.3.0.tgz",
"integrity": "sha512-8TNmfeTCk2Le33A3vRRwtuworG/L5RrgMvdjhKZxvyShO+mBu2fP50OWUjRLNtvw344DdDarFh9buFAZs5ujeA==",
"dependencies": {
"@jsep-plugin/assignment": "^1.3.0",
"@jsep-plugin/regex": "^1.0.4",
"jsep": "^1.4.0"
},
"bin": {
"jsonpath": "bin/jsonpath-cli.js",
"jsonpath-plus": "bin/jsonpath-cli.js"
},
"engines": {
"node": ">=18.0.0"
}
},
"node_modules/jsonwebtoken": {
"version": "9.0.2",
"resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz",
@@ -2493,6 +2726,11 @@
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
"integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
},
"node_modules/lodash.clonedeep": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz",
"integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ=="
},
"node_modules/lodash.defaults": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz",
@@ -2942,6 +3180,14 @@
"url": "https://github.com/fb55/nth-check?sponsor=1"
}
},
"node_modules/oauth4webapi": {
"version": "3.8.3",
"resolved": "https://registry.npmjs.org/oauth4webapi/-/oauth4webapi-3.8.3.tgz",
"integrity": "sha512-pQ5BsX3QRTgnt5HxgHwgunIRaDXBdkT23tf8dfzmtTIL2LTpdmxgbpbBm0VgFWAIDlezQvQCTgnVIUmHupXHxw==",
"funding": {
"url": "https://github.com/sponsors/panva"
}
},
"node_modules/object-assign": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz",
@@ -2980,6 +3226,18 @@
"wrappy": "1"
}
},
"node_modules/openid-client": {
"version": "6.8.1",
"resolved": "https://registry.npmjs.org/openid-client/-/openid-client-6.8.1.tgz",
"integrity": "sha512-VoYT6enBo6Vj2j3Q5Ec0AezS+9YGzQo1f5Xc42lreMGlfP4ljiXPKVDvCADh+XHCV/bqPu/wWSiCVXbJKvrODw==",
"dependencies": {
"jose": "^6.1.0",
"oauth4webapi": "^3.8.2"
},
"funding": {
"url": "https://github.com/sponsors/panva"
}
},
"node_modules/pac-proxy-agent": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/pac-proxy-agent/-/pac-proxy-agent-7.2.0.tgz",
@@ -3883,6 +4141,11 @@
"url": "https://github.com/privatenumber/resolve-pkg-maps?sponsor=1"
}
},
"node_modules/rfc4648": {
"version": "1.5.4",
"resolved": "https://registry.npmjs.org/rfc4648/-/rfc4648-1.5.4.tgz",
"integrity": "sha512-rRg/6Lb+IGfJqO05HZkN50UtY7K/JhxJag1kP23+zyMfrvoB0B7RWv06MbOzoc79RgCdNTiUaNsTT1AJZ7Z+cg=="
},
"node_modules/rimraf": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/rimraf/-/rimraf-3.0.2.tgz",
@@ -4313,6 +4576,14 @@
"node": ">= 0.8"
}
},
"node_modules/stream-buffers": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/stream-buffers/-/stream-buffers-3.0.3.tgz",
"integrity": "sha512-pqMqwQCso0PBJt2PQmDO0cFj0lyqmiwOMiMSkVtRokl7e+ZTRYgDHKnuZNbqjiJXgsg4nuqtD/zxuo9KqTp0Yw==",
"engines": {
"node": ">= 0.10.0"
}
},
"node_modules/streamx": {
"version": "2.23.0",
"resolved": "https://registry.npmjs.org/streamx/-/streamx-2.23.0.tgz",
@@ -4532,8 +4803,7 @@
"node_modules/undici-types": {
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz",
"integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==",
"devOptional": true
"integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ=="
},
"node_modules/universalify": {
"version": "2.0.1",
@@ -4556,6 +4826,14 @@
"resolved": "https://registry.npmjs.org/urlpattern-polyfill/-/urlpattern-polyfill-10.0.0.tgz",
"integrity": "sha512-H/A06tKD7sS1O1X2SshBVeA5FLycRpjqiBeqGKmBwBDBy28EnRjORxTNe269KSSr5un5qyWi1iL61wLxpd+ZOg=="
},
"node_modules/user-agents": {
"version": "1.1.669",
"resolved": "https://registry.npmjs.org/user-agents/-/user-agents-1.1.669.tgz",
"integrity": "sha512-pbIzG+AOqCaIpySKJ4IAm1l0VyE4jMnK4y1thV8lm8PYxI+7X5uWcppOK7zY79TCKKTAnJH3/4gaVIZHsjrmJA==",
"dependencies": {
"lodash.clonedeep": "^4.5.0"
}
},
"node_modules/util": {
"version": "0.12.5",
"resolved": "https://registry.npmjs.org/util/-/util-0.12.5.tgz",

View File

@@ -0,0 +1,46 @@
# DEPRECATED CODE - DO NOT USE
**These directories contain OLD, ABANDONED code.**
## What's Here
| Directory | What It Was | Why Deprecated |
|-----------|-------------|----------------|
| `hydration/` | Old pipeline for processing crawl data | Replaced by `src/tasks/handlers/` |
| `scraper-v2/` | Old Puppeteer-based scraper engine | Replaced by curl-based `src/platforms/dutchie/client.ts` |
| `canonical-hydration/` | Intermediate step toward canonical schema | Merged into task handlers |
## What to Use Instead
| Old (DONT USE) | New (USE THIS) |
|----------------|----------------|
| `hydration/normalizers/dutchie.ts` | `src/tasks/handlers/product-refresh.ts` |
| `hydration/producer.ts` | `src/tasks/handlers/payload-fetch.ts` |
| `scraper-v2/engine.ts` | `src/platforms/dutchie/client.ts` |
| `scraper-v2/scheduler.ts` | `src/services/task-scheduler.ts` |
## Why Keep This Code?
- Historical reference only
- Some patterns may be useful for debugging
- Will be deleted once confirmed not needed
## Claude Instructions
**IF YOU ARE CLAUDE:**
1. NEVER import from `src/_deprecated/`
2. NEVER reference these files as examples
3. NEVER try to "fix" or "update" code in here
4. If you see imports from these directories, suggest replacing them
**Correct imports:**
```typescript
// GOOD
import { executeGraphQL } from '../platforms/dutchie/client';
import { pool } from '../db/pool';
// BAD - DO NOT USE
import { something } from '../_deprecated/hydration/...';
import { something } from '../_deprecated/scraper-v2/...';
```

View File

@@ -0,0 +1,584 @@
/**
* System API Routes
*
* Provides REST API endpoints for system monitoring and control:
* - /api/system/sync/* - Sync orchestrator
* - /api/system/dlq/* - Dead-letter queue
* - /api/system/integrity/* - Integrity checks
* - /api/system/fix/* - Auto-fix routines
* - /api/system/alerts/* - System alerts
* - /metrics - Prometheus metrics
*
* Phase 5: Full Production Sync + Monitoring
*/
import { Router, Request, Response } from 'express';
import { Pool } from 'pg';
import {
SyncOrchestrator,
MetricsService,
DLQService,
AlertService,
IntegrityService,
AutoFixService,
} from '../services';
export function createSystemRouter(pool: Pool): Router {
const router = Router();
// Initialize services
const metrics = new MetricsService(pool);
const dlq = new DLQService(pool);
const alerts = new AlertService(pool);
const integrity = new IntegrityService(pool, alerts);
const autoFix = new AutoFixService(pool, alerts);
const orchestrator = new SyncOrchestrator(pool, metrics, dlq, alerts);
// ============================================================
// SYNC ORCHESTRATOR ENDPOINTS
// ============================================================
/**
* GET /api/system/sync/status
* Get current sync status
*/
router.get('/sync/status', async (_req: Request, res: Response) => {
try {
const status = await orchestrator.getStatus();
res.json(status);
} catch (error) {
console.error('[System] Sync status error:', error);
res.status(500).json({ error: 'Failed to get sync status' });
}
});
/**
* POST /api/system/sync/run
* Trigger a sync run
*/
router.post('/sync/run', async (req: Request, res: Response) => {
try {
const triggeredBy = req.body.triggeredBy || 'api';
const result = await orchestrator.runSync();
res.json({
success: true,
triggeredBy,
metrics: result,
});
} catch (error) {
console.error('[System] Sync run error:', error);
res.status(500).json({
success: false,
error: error instanceof Error ? error.message : 'Sync run failed',
});
}
});
/**
* GET /api/system/sync/queue-depth
* Get queue depth information
*/
router.get('/sync/queue-depth', async (_req: Request, res: Response) => {
try {
const depth = await orchestrator.getQueueDepth();
res.json(depth);
} catch (error) {
console.error('[System] Queue depth error:', error);
res.status(500).json({ error: 'Failed to get queue depth' });
}
});
/**
* GET /api/system/sync/health
* Get sync health status
*/
router.get('/sync/health', async (_req: Request, res: Response) => {
try {
const health = await orchestrator.getHealth();
res.status(health.healthy ? 200 : 503).json(health);
} catch (error) {
console.error('[System] Health check error:', error);
res.status(500).json({ healthy: false, error: 'Health check failed' });
}
});
/**
* POST /api/system/sync/pause
* Pause the orchestrator
*/
router.post('/sync/pause', async (req: Request, res: Response) => {
try {
const reason = req.body.reason || 'Manual pause';
await orchestrator.pause(reason);
res.json({ success: true, message: 'Orchestrator paused' });
} catch (error) {
console.error('[System] Pause error:', error);
res.status(500).json({ error: 'Failed to pause orchestrator' });
}
});
/**
* POST /api/system/sync/resume
* Resume the orchestrator
*/
router.post('/sync/resume', async (_req: Request, res: Response) => {
try {
await orchestrator.resume();
res.json({ success: true, message: 'Orchestrator resumed' });
} catch (error) {
console.error('[System] Resume error:', error);
res.status(500).json({ error: 'Failed to resume orchestrator' });
}
});
// ============================================================
// DLQ ENDPOINTS
// ============================================================
/**
* GET /api/system/dlq
* List DLQ payloads
*/
router.get('/dlq', async (req: Request, res: Response) => {
try {
const options = {
status: req.query.status as string,
errorType: req.query.errorType as string,
dispensaryId: req.query.dispensaryId ? parseInt(req.query.dispensaryId as string) : undefined,
limit: req.query.limit ? parseInt(req.query.limit as string) : 50,
offset: req.query.offset ? parseInt(req.query.offset as string) : 0,
};
const result = await dlq.listPayloads(options);
res.json(result);
} catch (error) {
console.error('[System] DLQ list error:', error);
res.status(500).json({ error: 'Failed to list DLQ payloads' });
}
});
/**
* GET /api/system/dlq/stats
* Get DLQ statistics
*/
router.get('/dlq/stats', async (_req: Request, res: Response) => {
try {
const stats = await dlq.getStats();
res.json(stats);
} catch (error) {
console.error('[System] DLQ stats error:', error);
res.status(500).json({ error: 'Failed to get DLQ stats' });
}
});
/**
* GET /api/system/dlq/summary
* Get DLQ summary by error type
*/
router.get('/dlq/summary', async (_req: Request, res: Response) => {
try {
const summary = await dlq.getSummary();
res.json(summary);
} catch (error) {
console.error('[System] DLQ summary error:', error);
res.status(500).json({ error: 'Failed to get DLQ summary' });
}
});
/**
* GET /api/system/dlq/:id
* Get a specific DLQ payload
*/
router.get('/dlq/:id', async (req: Request, res: Response) => {
try {
const payload = await dlq.getPayload(req.params.id);
if (!payload) {
return res.status(404).json({ error: 'Payload not found' });
}
res.json(payload);
} catch (error) {
console.error('[System] DLQ get error:', error);
res.status(500).json({ error: 'Failed to get DLQ payload' });
}
});
/**
* POST /api/system/dlq/:id/retry
* Retry a DLQ payload
*/
router.post('/dlq/:id/retry', async (req: Request, res: Response) => {
try {
const result = await dlq.retryPayload(req.params.id);
if (result.success) {
res.json(result);
} else {
res.status(400).json(result);
}
} catch (error) {
console.error('[System] DLQ retry error:', error);
res.status(500).json({ error: 'Failed to retry payload' });
}
});
/**
* POST /api/system/dlq/:id/abandon
* Abandon a DLQ payload
*/
router.post('/dlq/:id/abandon', async (req: Request, res: Response) => {
try {
const reason = req.body.reason || 'Manually abandoned';
const abandonedBy = req.body.abandonedBy || 'api';
const success = await dlq.abandonPayload(req.params.id, reason, abandonedBy);
res.json({ success });
} catch (error) {
console.error('[System] DLQ abandon error:', error);
res.status(500).json({ error: 'Failed to abandon payload' });
}
});
/**
* POST /api/system/dlq/bulk-retry
* Bulk retry payloads by error type
*/
router.post('/dlq/bulk-retry', async (req: Request, res: Response) => {
try {
const { errorType } = req.body;
if (!errorType) {
return res.status(400).json({ error: 'errorType is required' });
}
const result = await dlq.bulkRetryByErrorType(errorType);
res.json(result);
} catch (error) {
console.error('[System] DLQ bulk retry error:', error);
res.status(500).json({ error: 'Failed to bulk retry' });
}
});
// ============================================================
// INTEGRITY CHECK ENDPOINTS
// ============================================================
/**
* POST /api/system/integrity/run
* Run all integrity checks
*/
router.post('/integrity/run', async (req: Request, res: Response) => {
try {
const triggeredBy = req.body.triggeredBy || 'api';
const result = await integrity.runAllChecks(triggeredBy);
res.json(result);
} catch (error) {
console.error('[System] Integrity run error:', error);
res.status(500).json({ error: 'Failed to run integrity checks' });
}
});
/**
* GET /api/system/integrity/runs
* Get recent integrity check runs
*/
router.get('/integrity/runs', async (req: Request, res: Response) => {
try {
const limit = req.query.limit ? parseInt(req.query.limit as string) : 10;
const runs = await integrity.getRecentRuns(limit);
res.json(runs);
} catch (error) {
console.error('[System] Integrity runs error:', error);
res.status(500).json({ error: 'Failed to get integrity runs' });
}
});
/**
* GET /api/system/integrity/runs/:runId
* Get results for a specific integrity run
*/
router.get('/integrity/runs/:runId', async (req: Request, res: Response) => {
try {
const results = await integrity.getRunResults(req.params.runId);
res.json(results);
} catch (error) {
console.error('[System] Integrity run results error:', error);
res.status(500).json({ error: 'Failed to get run results' });
}
});
// ============================================================
// AUTO-FIX ENDPOINTS
// ============================================================
/**
* GET /api/system/fix/routines
* Get available fix routines
*/
router.get('/fix/routines', (_req: Request, res: Response) => {
try {
const routines = autoFix.getAvailableRoutines();
res.json(routines);
} catch (error) {
console.error('[System] Get routines error:', error);
res.status(500).json({ error: 'Failed to get routines' });
}
});
/**
* POST /api/system/fix/:routine
* Run a fix routine
*/
router.post('/fix/:routine', async (req: Request, res: Response) => {
try {
const routineName = req.params.routine;
const dryRun = req.body.dryRun === true;
const triggeredBy = req.body.triggeredBy || 'api';
const result = await autoFix.runRoutine(routineName as any, triggeredBy, { dryRun });
res.json(result);
} catch (error) {
console.error('[System] Fix routine error:', error);
res.status(500).json({ error: 'Failed to run fix routine' });
}
});
/**
* GET /api/system/fix/runs
* Get recent fix runs
*/
router.get('/fix/runs', async (req: Request, res: Response) => {
try {
const limit = req.query.limit ? parseInt(req.query.limit as string) : 20;
const runs = await autoFix.getRecentRuns(limit);
res.json(runs);
} catch (error) {
console.error('[System] Fix runs error:', error);
res.status(500).json({ error: 'Failed to get fix runs' });
}
});
// ============================================================
// ALERTS ENDPOINTS
// ============================================================
/**
* GET /api/system/alerts
* List alerts
*/
router.get('/alerts', async (req: Request, res: Response) => {
try {
const options = {
status: req.query.status as any,
severity: req.query.severity as any,
type: req.query.type as string,
limit: req.query.limit ? parseInt(req.query.limit as string) : 50,
offset: req.query.offset ? parseInt(req.query.offset as string) : 0,
};
const result = await alerts.listAlerts(options);
res.json(result);
} catch (error) {
console.error('[System] Alerts list error:', error);
res.status(500).json({ error: 'Failed to list alerts' });
}
});
/**
* GET /api/system/alerts/active
* Get active alerts
*/
router.get('/alerts/active', async (_req: Request, res: Response) => {
try {
const activeAlerts = await alerts.getActiveAlerts();
res.json(activeAlerts);
} catch (error) {
console.error('[System] Active alerts error:', error);
res.status(500).json({ error: 'Failed to get active alerts' });
}
});
/**
* GET /api/system/alerts/summary
* Get alert summary
*/
router.get('/alerts/summary', async (_req: Request, res: Response) => {
try {
const summary = await alerts.getSummary();
res.json(summary);
} catch (error) {
console.error('[System] Alerts summary error:', error);
res.status(500).json({ error: 'Failed to get alerts summary' });
}
});
/**
* POST /api/system/alerts/:id/acknowledge
* Acknowledge an alert
*/
router.post('/alerts/:id/acknowledge', async (req: Request, res: Response) => {
try {
const alertId = parseInt(req.params.id);
const acknowledgedBy = req.body.acknowledgedBy || 'api';
const success = await alerts.acknowledgeAlert(alertId, acknowledgedBy);
res.json({ success });
} catch (error) {
console.error('[System] Acknowledge alert error:', error);
res.status(500).json({ error: 'Failed to acknowledge alert' });
}
});
/**
* POST /api/system/alerts/:id/resolve
* Resolve an alert
*/
router.post('/alerts/:id/resolve', async (req: Request, res: Response) => {
try {
const alertId = parseInt(req.params.id);
const resolvedBy = req.body.resolvedBy || 'api';
const success = await alerts.resolveAlert(alertId, resolvedBy);
res.json({ success });
} catch (error) {
console.error('[System] Resolve alert error:', error);
res.status(500).json({ error: 'Failed to resolve alert' });
}
});
/**
* POST /api/system/alerts/bulk-acknowledge
* Bulk acknowledge alerts
*/
router.post('/alerts/bulk-acknowledge', async (req: Request, res: Response) => {
try {
const { ids, acknowledgedBy } = req.body;
if (!ids || !Array.isArray(ids)) {
return res.status(400).json({ error: 'ids array is required' });
}
const count = await alerts.bulkAcknowledge(ids, acknowledgedBy || 'api');
res.json({ acknowledged: count });
} catch (error) {
console.error('[System] Bulk acknowledge error:', error);
res.status(500).json({ error: 'Failed to bulk acknowledge' });
}
});
// ============================================================
// METRICS ENDPOINTS
// ============================================================
/**
* GET /api/system/metrics
* Get all current metrics
*/
router.get('/metrics', async (_req: Request, res: Response) => {
try {
const allMetrics = await metrics.getAllMetrics();
res.json(allMetrics);
} catch (error) {
console.error('[System] Metrics error:', error);
res.status(500).json({ error: 'Failed to get metrics' });
}
});
/**
* GET /api/system/metrics/:name
* Get a specific metric
*/
router.get('/metrics/:name', async (req: Request, res: Response) => {
try {
const metric = await metrics.getMetric(req.params.name);
if (!metric) {
return res.status(404).json({ error: 'Metric not found' });
}
res.json(metric);
} catch (error) {
console.error('[System] Metric error:', error);
res.status(500).json({ error: 'Failed to get metric' });
}
});
/**
* GET /api/system/metrics/:name/history
* Get metric time series
*/
router.get('/metrics/:name/history', async (req: Request, res: Response) => {
try {
const hours = req.query.hours ? parseInt(req.query.hours as string) : 24;
const history = await metrics.getMetricHistory(req.params.name, hours);
res.json(history);
} catch (error) {
console.error('[System] Metric history error:', error);
res.status(500).json({ error: 'Failed to get metric history' });
}
});
/**
* GET /api/system/errors
* Get error summary
*/
router.get('/errors', async (_req: Request, res: Response) => {
try {
const summary = await metrics.getErrorSummary();
res.json(summary);
} catch (error) {
console.error('[System] Error summary error:', error);
res.status(500).json({ error: 'Failed to get error summary' });
}
});
/**
* GET /api/system/errors/recent
* Get recent errors
*/
router.get('/errors/recent', async (req: Request, res: Response) => {
try {
const limit = req.query.limit ? parseInt(req.query.limit as string) : 50;
const errorType = req.query.type as string;
const errors = await metrics.getRecentErrors(limit, errorType);
res.json(errors);
} catch (error) {
console.error('[System] Recent errors error:', error);
res.status(500).json({ error: 'Failed to get recent errors' });
}
});
/**
* POST /api/system/errors/acknowledge
* Acknowledge errors
*/
router.post('/errors/acknowledge', async (req: Request, res: Response) => {
try {
const { ids, acknowledgedBy } = req.body;
if (!ids || !Array.isArray(ids)) {
return res.status(400).json({ error: 'ids array is required' });
}
const count = await metrics.acknowledgeErrors(ids, acknowledgedBy || 'api');
res.json({ acknowledged: count });
} catch (error) {
console.error('[System] Acknowledge errors error:', error);
res.status(500).json({ error: 'Failed to acknowledge errors' });
}
});
return router;
}
/**
* Create Prometheus metrics endpoint (standalone)
*/
export function createPrometheusRouter(pool: Pool): Router {
const router = Router();
const metrics = new MetricsService(pool);
/**
* GET /metrics
* Prometheus-compatible metrics endpoint
*/
router.get('/', async (_req: Request, res: Response) => {
try {
const prometheusOutput = await metrics.getPrometheusMetrics();
res.set('Content-Type', 'text/plain; version=0.0.4');
res.send(prometheusOutput);
} catch (error) {
console.error('[Prometheus] Metrics error:', error);
res.status(500).send('# Error generating metrics');
}
});
return router;
}

View File

@@ -109,7 +109,7 @@ import scraperMonitorRoutes from './routes/scraper-monitor';
import apiTokensRoutes from './routes/api-tokens';
import apiPermissionsRoutes from './routes/api-permissions';
import parallelScrapeRoutes from './routes/parallel-scrape';
import crawlerSandboxRoutes from './routes/crawler-sandbox';
// crawler-sandbox moved to _deprecated
import versionRoutes from './routes/version';
import deployStatusRoutes from './routes/deploy-status';
import publicApiRoutes from './routes/public-api';
@@ -146,6 +146,7 @@ import tasksRoutes from './routes/tasks';
import workerRegistryRoutes from './routes/worker-registry';
// Per TASK_WORKFLOW_2024-12-10.md: Raw payload access API
import payloadsRoutes from './routes/payloads';
import k8sRoutes from './routes/k8s';
// Mark requests from trusted domains (cannaiq.co, findagram.co, findadispo.com)
// These domains can access the API without authentication
@@ -186,7 +187,7 @@ app.use('/api/scraper-monitor', scraperMonitorRoutes);
app.use('/api/api-tokens', apiTokensRoutes);
app.use('/api/api-permissions', apiPermissionsRoutes);
app.use('/api/parallel-scrape', parallelScrapeRoutes);
app.use('/api/crawler-sandbox', crawlerSandboxRoutes);
// crawler-sandbox moved to _deprecated
app.use('/api/version', versionRoutes);
app.use('/api/admin/deploy-status', deployStatusRoutes);
console.log('[DeployStatus] Routes registered at /api/admin/deploy-status');
@@ -230,6 +231,10 @@ console.log('[WorkerRegistry] Routes registered at /api/worker-registry');
app.use('/api/payloads', payloadsRoutes);
console.log('[Payloads] Routes registered at /api/payloads');
// K8s control routes - worker scaling from admin UI
app.use('/api/k8s', k8sRoutes);
console.log('[K8s] Routes registered at /api/k8s');
// Phase 3: Analytics V2 - Enhanced analytics with rec/med state segmentation
try {
const analyticsV2Router = createAnalyticsV2Router(getPool());

View File

@@ -702,12 +702,10 @@ export class StateQueryService {
async getNationalSummary(): Promise<NationalSummary> {
const stateMetrics = await this.getAllStateMetrics();
// Get all states count and aggregate metrics
const result = await this.pool.query(`
SELECT
COUNT(DISTINCT s.code) AS total_states,
COUNT(DISTINCT CASE WHEN EXISTS (
SELECT 1 FROM dispensaries d WHERE d.state = s.code AND d.menu_type IS NOT NULL
) THEN s.code END) AS active_states,
(SELECT COUNT(*) FROM dispensaries WHERE state IS NOT NULL) AS total_stores,
(SELECT COUNT(*) FROM store_products sp
JOIN dispensaries d ON sp.dispensary_id = d.id
@@ -725,7 +723,7 @@ export class StateQueryService {
return {
totalStates: parseInt(data.total_states),
activeStates: parseInt(data.active_states),
activeStates: parseInt(data.total_states), // Same as totalStates - all states shown
totalStores: parseInt(data.total_stores),
totalProducts: parseInt(data.total_products),
totalBrands: parseInt(data.total_brands),

View File

@@ -47,4 +47,27 @@ router.post('/refresh', authMiddleware, async (req: AuthRequest, res) => {
res.json({ token });
});
// Verify password for sensitive actions (requires current user to be authenticated)
router.post('/verify-password', authMiddleware, async (req: AuthRequest, res) => {
try {
const { password } = req.body;
if (!password) {
return res.status(400).json({ error: 'Password required' });
}
// Re-authenticate the current user with the provided password
const user = await authenticateUser(req.user!.email, password);
if (!user) {
return res.status(401).json({ error: 'Invalid password', verified: false });
}
res.json({ verified: true });
} catch (error) {
console.error('Password verification error:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
export default router;

View File

@@ -14,13 +14,25 @@ router.use(authMiddleware);
/**
* GET /api/admin/intelligence/brands
* List all brands with state presence, store counts, and pricing
* Query params:
* - state: Filter by state (e.g., "AZ")
* - limit: Max results (default 500)
* - offset: Pagination offset
*/
router.get('/brands', async (req: Request, res: Response) => {
try {
const { limit = '500', offset = '0' } = req.query;
const { limit = '500', offset = '0', state } = req.query;
const limitNum = Math.min(parseInt(limit as string, 10), 1000);
const offsetNum = parseInt(offset as string, 10);
// Build WHERE clause based on state filter
let stateFilter = '';
const params: any[] = [limitNum, offsetNum];
if (state && state !== 'all') {
stateFilter = 'AND d.state = $3';
params.push(state);
}
const { rows } = await pool.query(`
SELECT
sp.brand_name_raw as brand_name,
@@ -32,17 +44,26 @@ router.get('/brands', async (req: Request, res: Response) => {
FROM store_products sp
JOIN dispensaries d ON sp.dispensary_id = d.id
WHERE sp.brand_name_raw IS NOT NULL AND sp.brand_name_raw != ''
${stateFilter}
GROUP BY sp.brand_name_raw
ORDER BY store_count DESC, sku_count DESC
LIMIT $1 OFFSET $2
`, [limitNum, offsetNum]);
`, params);
// Get total count
// Get total count with same state filter
const countParams: any[] = [];
let countStateFilter = '';
if (state && state !== 'all') {
countStateFilter = 'AND d.state = $1';
countParams.push(state);
}
const { rows: countRows } = await pool.query(`
SELECT COUNT(DISTINCT brand_name_raw) as total
FROM store_products
WHERE brand_name_raw IS NOT NULL AND brand_name_raw != ''
`);
SELECT COUNT(DISTINCT sp.brand_name_raw) as total
FROM store_products sp
JOIN dispensaries d ON sp.dispensary_id = d.id
WHERE sp.brand_name_raw IS NOT NULL AND sp.brand_name_raw != ''
${countStateFilter}
`, countParams);
res.json({
brands: rows.map((r: any) => ({
@@ -147,23 +168,58 @@ router.get('/brands/:brandName/penetration', async (req: Request, res: Response)
/**
* GET /api/admin/intelligence/pricing
* Get pricing analytics by category
* Query params:
* - state: Filter by state (e.g., "AZ")
*/
router.get('/pricing', async (req: Request, res: Response) => {
try {
const { rows: categoryRows } = await pool.query(`
SELECT
sp.category_raw as category,
ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
MIN(sp.price_rec) as min_price,
MAX(sp.price_rec) as max_price,
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY sp.price_rec)::numeric, 2) as median_price,
COUNT(*) as product_count
FROM store_products sp
WHERE sp.category_raw IS NOT NULL AND sp.price_rec > 0
GROUP BY sp.category_raw
ORDER BY product_count DESC
`);
const { state } = req.query;
// Build WHERE clause based on state filter
let stateFilter = '';
const categoryParams: any[] = [];
const stateQueryParams: any[] = [];
const overallParams: any[] = [];
if (state && state !== 'all') {
stateFilter = 'AND d.state = $1';
categoryParams.push(state);
overallParams.push(state);
}
// Category pricing with optional state filter
const categoryQuery = state && state !== 'all'
? `
SELECT
sp.category_raw as category,
ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
MIN(sp.price_rec) as min_price,
MAX(sp.price_rec) as max_price,
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY sp.price_rec)::numeric, 2) as median_price,
COUNT(*) as product_count
FROM store_products sp
JOIN dispensaries d ON sp.dispensary_id = d.id
WHERE sp.category_raw IS NOT NULL AND sp.price_rec > 0 ${stateFilter}
GROUP BY sp.category_raw
ORDER BY product_count DESC
`
: `
SELECT
sp.category_raw as category,
ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
MIN(sp.price_rec) as min_price,
MAX(sp.price_rec) as max_price,
ROUND(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY sp.price_rec)::numeric, 2) as median_price,
COUNT(*) as product_count
FROM store_products sp
WHERE sp.category_raw IS NOT NULL AND sp.price_rec > 0
GROUP BY sp.category_raw
ORDER BY product_count DESC
`;
const { rows: categoryRows } = await pool.query(categoryQuery, categoryParams);
// State pricing
const { rows: stateRows } = await pool.query(`
SELECT
d.state,
@@ -178,6 +234,31 @@ router.get('/pricing', async (req: Request, res: Response) => {
ORDER BY avg_price DESC
`);
// Overall stats with optional state filter
const overallQuery = state && state !== 'all'
? `
SELECT
ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
MIN(sp.price_rec) as min_price,
MAX(sp.price_rec) as max_price,
COUNT(*) as total_products
FROM store_products sp
JOIN dispensaries d ON sp.dispensary_id = d.id
WHERE sp.price_rec > 0 ${stateFilter}
`
: `
SELECT
ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
MIN(sp.price_rec) as min_price,
MAX(sp.price_rec) as max_price,
COUNT(*) as total_products
FROM store_products sp
WHERE sp.price_rec > 0
`;
const { rows: overallRows } = await pool.query(overallQuery, overallParams);
const overall = overallRows[0];
res.json({
byCategory: categoryRows.map((r: any) => ({
category: r.category,
@@ -194,6 +275,12 @@ router.get('/pricing', async (req: Request, res: Response) => {
maxPrice: r.max_price ? parseFloat(r.max_price) : null,
productCount: parseInt(r.product_count, 10),
})),
overall: {
avgPrice: overall?.avg_price ? parseFloat(overall.avg_price) : null,
minPrice: overall?.min_price ? parseFloat(overall.min_price) : null,
maxPrice: overall?.max_price ? parseFloat(overall.max_price) : null,
totalProducts: parseInt(overall?.total_products || '0', 10),
},
});
} catch (error: any) {
console.error('[Intelligence] Error fetching pricing:', error.message);
@@ -204,9 +291,23 @@ router.get('/pricing', async (req: Request, res: Response) => {
/**
* GET /api/admin/intelligence/stores
* Get store intelligence summary
* Query params:
* - state: Filter by state (e.g., "AZ")
* - limit: Max results (default 200)
*/
router.get('/stores', async (req: Request, res: Response) => {
try {
const { state, limit = '200' } = req.query;
const limitNum = Math.min(parseInt(limit as string, 10), 500);
// Build WHERE clause based on state filter
let stateFilter = '';
const params: any[] = [limitNum];
if (state && state !== 'all') {
stateFilter = 'AND d.state = $2';
params.push(state);
}
const { rows: storeRows } = await pool.query(`
SELECT
d.id,
@@ -216,17 +317,22 @@ router.get('/stores', async (req: Request, res: Response) => {
d.state,
d.menu_type,
d.crawl_enabled,
COUNT(DISTINCT sp.id) as product_count,
c.name as chain_name,
COUNT(DISTINCT sp.id) as sku_count,
COUNT(DISTINCT sp.brand_name_raw) as brand_count,
ROUND(AVG(sp.price_rec)::numeric, 2) as avg_price,
MAX(sp.updated_at) as last_product_update
MAX(sp.updated_at) as last_crawl,
(SELECT COUNT(*) FROM store_product_snapshots sps
WHERE sps.store_product_id IN (SELECT id FROM store_products WHERE dispensary_id = d.id)) as snapshot_count
FROM dispensaries d
LEFT JOIN store_products sp ON sp.dispensary_id = d.id
WHERE d.state IS NOT NULL
GROUP BY d.id, d.name, d.dba_name, d.city, d.state, d.menu_type, d.crawl_enabled
ORDER BY product_count DESC
LIMIT 200
`);
LEFT JOIN chains c ON d.chain_id = c.id
WHERE d.state IS NOT NULL AND d.crawl_enabled = true
${stateFilter}
GROUP BY d.id, d.name, d.dba_name, d.city, d.state, d.menu_type, d.crawl_enabled, c.name
ORDER BY sku_count DESC
LIMIT $1
`, params);
res.json({
stores: storeRows.map((r: any) => ({
@@ -237,10 +343,13 @@ router.get('/stores', async (req: Request, res: Response) => {
state: r.state,
menuType: r.menu_type,
crawlEnabled: r.crawl_enabled,
productCount: parseInt(r.product_count || '0', 10),
chainName: r.chain_name || null,
skuCount: parseInt(r.sku_count || '0', 10),
snapshotCount: parseInt(r.snapshot_count || '0', 10),
brandCount: parseInt(r.brand_count || '0', 10),
avgPrice: r.avg_price ? parseFloat(r.avg_price) : null,
lastProductUpdate: r.last_product_update,
lastCrawl: r.last_crawl,
crawlFrequencyHours: 4, // Default crawl frequency
})),
total: storeRows.length,
});

140
backend/src/routes/k8s.ts Normal file
View File

@@ -0,0 +1,140 @@
/**
* Kubernetes Control Routes
*
* Provides admin UI control over k8s resources like worker scaling.
* Uses in-cluster config when running in k8s, or kubeconfig locally.
*/
import { Router, Request, Response } from 'express';
import * as k8s from '@kubernetes/client-node';
const router = Router();
// K8s client setup - lazy initialization
let appsApi: k8s.AppsV1Api | null = null;
let k8sError: string | null = null;
function getK8sClient(): k8s.AppsV1Api | null {
if (appsApi) return appsApi;
if (k8sError) return null;
try {
const kc = new k8s.KubeConfig();
// Try in-cluster config first (when running in k8s)
try {
kc.loadFromCluster();
console.log('[K8s] Loaded in-cluster config');
} catch {
// Fall back to default kubeconfig (local dev)
try {
kc.loadFromDefault();
console.log('[K8s] Loaded default kubeconfig');
} catch (e) {
k8sError = 'No k8s config available';
console.log('[K8s] No config available - k8s routes disabled');
return null;
}
}
appsApi = kc.makeApiClient(k8s.AppsV1Api);
return appsApi;
} catch (e: any) {
k8sError = e.message;
console.error('[K8s] Failed to initialize client:', e.message);
return null;
}
}
const NAMESPACE = process.env.K8S_NAMESPACE || 'dispensary-scraper';
const WORKER_DEPLOYMENT = 'scraper-worker';
/**
* GET /api/k8s/workers
* Get current worker deployment status
*/
router.get('/workers', async (_req: Request, res: Response) => {
const client = getK8sClient();
if (!client) {
return res.json({
success: true,
available: false,
error: k8sError || 'K8s not available',
replicas: 0,
readyReplicas: 0,
});
}
try {
const deployment = await client.readNamespacedDeployment({
name: WORKER_DEPLOYMENT,
namespace: NAMESPACE,
});
res.json({
success: true,
available: true,
replicas: deployment.spec?.replicas || 0,
readyReplicas: deployment.status?.readyReplicas || 0,
availableReplicas: deployment.status?.availableReplicas || 0,
updatedReplicas: deployment.status?.updatedReplicas || 0,
});
} catch (e: any) {
console.error('[K8s] Error getting deployment:', e.message);
res.status(500).json({
success: false,
error: e.message,
});
}
});
/**
* POST /api/k8s/workers/scale
* Scale worker deployment
* Body: { replicas: number }
*/
router.post('/workers/scale', async (req: Request, res: Response) => {
const client = getK8sClient();
if (!client) {
return res.status(503).json({
success: false,
error: k8sError || 'K8s not available',
});
}
const { replicas } = req.body;
if (typeof replicas !== 'number' || replicas < 0 || replicas > 50) {
return res.status(400).json({
success: false,
error: 'replicas must be a number between 0 and 50',
});
}
try {
// Patch the deployment to set replicas
await client.patchNamespacedDeploymentScale({
name: WORKER_DEPLOYMENT,
namespace: NAMESPACE,
body: { spec: { replicas } },
});
console.log(`[K8s] Scaled ${WORKER_DEPLOYMENT} to ${replicas} replicas`);
res.json({
success: true,
replicas,
message: `Scaled to ${replicas} workers`,
});
} catch (e: any) {
console.error('[K8s] Error scaling deployment:', e.message);
res.status(500).json({
success: false,
error: e.message,
});
}
});
export default router;

View File

@@ -291,6 +291,107 @@ router.get('/stores/:id/summary', async (req: Request, res: Response) => {
}
});
/**
* GET /api/markets/stores/:id/crawl-history
* Get crawl history for a specific store
*/
router.get('/stores/:id/crawl-history', async (req: Request, res: Response) => {
try {
const { id } = req.params;
const { limit = '50' } = req.query;
const dispensaryId = parseInt(id, 10);
const limitNum = Math.min(parseInt(limit as string, 10), 100);
// Get crawl history from crawl_orchestration_traces
const { rows: historyRows } = await pool.query(`
SELECT
id,
run_id,
profile_key,
crawler_module,
state_at_start,
state_at_end,
total_steps,
duration_ms,
success,
error_message,
products_found,
started_at,
completed_at
FROM crawl_orchestration_traces
WHERE dispensary_id = $1
ORDER BY started_at DESC
LIMIT $2
`, [dispensaryId, limitNum]);
// Get next scheduled crawl if available
const { rows: scheduleRows } = await pool.query(`
SELECT
js.id as schedule_id,
js.job_name,
js.enabled,
js.base_interval_minutes,
js.jitter_minutes,
js.next_run_at,
js.last_run_at,
js.last_status
FROM job_schedules js
WHERE js.enabled = true
AND js.job_config->>'dispensaryId' = $1::text
ORDER BY js.next_run_at
LIMIT 1
`, [dispensaryId.toString()]);
// Get dispensary info for slug
const { rows: dispRows } = await pool.query(`
SELECT
id,
name,
dba_name,
slug,
state,
city,
menu_type,
platform_dispensary_id,
last_menu_scrape
FROM dispensaries
WHERE id = $1
`, [dispensaryId]);
res.json({
dispensary: dispRows[0] || null,
history: historyRows.map(row => ({
id: row.id,
runId: row.run_id,
profileKey: row.profile_key,
crawlerModule: row.crawler_module,
stateAtStart: row.state_at_start,
stateAtEnd: row.state_at_end,
totalSteps: row.total_steps,
durationMs: row.duration_ms,
success: row.success,
errorMessage: row.error_message,
productsFound: row.products_found,
startedAt: row.started_at?.toISOString() || null,
completedAt: row.completed_at?.toISOString() || null,
})),
nextSchedule: scheduleRows[0] ? {
scheduleId: scheduleRows[0].schedule_id,
jobName: scheduleRows[0].job_name,
enabled: scheduleRows[0].enabled,
baseIntervalMinutes: scheduleRows[0].base_interval_minutes,
jitterMinutes: scheduleRows[0].jitter_minutes,
nextRunAt: scheduleRows[0].next_run_at?.toISOString() || null,
lastRunAt: scheduleRows[0].last_run_at?.toISOString() || null,
lastStatus: scheduleRows[0].last_status,
} : null,
});
} catch (error: any) {
console.error('[Markets] Error fetching crawl history:', error.message);
res.status(500).json({ error: error.message });
}
});
/**
* GET /api/markets/stores/:id/products
* Get products for a store with filtering and pagination

View File

@@ -78,14 +78,14 @@ router.get('/metrics', async (_req: Request, res: Response) => {
/**
* GET /api/admin/orchestrator/states
* Returns array of states with at least one known dispensary
* Returns array of states with at least one crawl-enabled dispensary
*/
router.get('/states', async (_req: Request, res: Response) => {
try {
const { rows } = await pool.query(`
SELECT DISTINCT state, COUNT(*) as store_count
FROM dispensaries
WHERE state IS NOT NULL
WHERE state IS NOT NULL AND crawl_enabled = true
GROUP BY state
ORDER BY state
`);

View File

@@ -278,7 +278,7 @@ router.post('/update-locations', requireRole('superadmin', 'admin'), async (req,
// Run in background
updateAllProxyLocations().catch(err => {
console.error('Location update failed:', err);
console.error('Location update failed:', err);
});
res.json({ message: 'Location update job started' });

View File

@@ -13,6 +13,12 @@ import {
TaskFilter,
} from '../tasks/task-service';
import { pool } from '../db/pool';
import {
isTaskPoolPaused,
pauseTaskPool,
resumeTaskPool,
getTaskPoolStatus,
} from '../tasks/task-pool-state';
const router = Router();
@@ -592,4 +598,42 @@ router.post('/migration/full-migrate', async (req: Request, res: Response) => {
}
});
/**
* GET /api/tasks/pool/status
* Check if task pool is paused
*/
router.get('/pool/status', async (_req: Request, res: Response) => {
const status = getTaskPoolStatus();
res.json({
success: true,
...status,
});
});
/**
* POST /api/tasks/pool/pause
* Pause the task pool - workers won't pick up new tasks
*/
router.post('/pool/pause', async (_req: Request, res: Response) => {
pauseTaskPool();
res.json({
success: true,
paused: true,
message: 'Task pool paused - workers will not pick up new tasks',
});
});
/**
* POST /api/tasks/pool/resume
* Resume the task pool - workers will pick up tasks again
*/
router.post('/pool/resume', async (_req: Request, res: Response) => {
resumeTaskPool();
res.json({
success: true,
paused: false,
message: 'Task pool resumed - workers will pick up new tasks',
});
});
export default router;

View File

@@ -70,21 +70,20 @@ router.post('/register', async (req: Request, res: Response) => {
);
if (existing.rows.length > 0) {
// Re-activate existing worker
// Re-activate existing worker - keep existing pod_name (fantasy name), don't overwrite with K8s name
const { rows } = await pool.query(`
UPDATE worker_registry
SET status = 'active',
role = $1,
pod_name = $2,
hostname = $3,
ip_address = $4,
hostname = $2,
ip_address = $3,
last_heartbeat_at = NOW(),
started_at = NOW(),
metadata = $5,
metadata = $4,
updated_at = NOW()
WHERE worker_id = $6
RETURNING id, worker_id, friendly_name, role
`, [role, pod_name, finalHostname, clientIp, metadata, finalWorkerId]);
WHERE worker_id = $5
RETURNING id, worker_id, friendly_name, pod_name, role
`, [role, finalHostname, clientIp, metadata, finalWorkerId]);
const worker = rows[0];
const roleMsg = role ? `for ${role}` : 'as role-agnostic';
@@ -105,13 +104,13 @@ router.post('/register', async (req: Request, res: Response) => {
const nameResult = await pool.query('SELECT assign_worker_name($1) as name', [finalWorkerId]);
const friendlyName = nameResult.rows[0].name;
// Register the worker
// Register the worker - use friendlyName as pod_name (not K8s name)
const { rows } = await pool.query(`
INSERT INTO worker_registry (
worker_id, friendly_name, role, pod_name, hostname, ip_address, status, metadata
) VALUES ($1, $2, $3, $4, $5, $6, 'active', $7)
RETURNING id, worker_id, friendly_name, role
`, [finalWorkerId, friendlyName, role, pod_name, finalHostname, clientIp, metadata]);
RETURNING id, worker_id, friendly_name, pod_name, role
`, [finalWorkerId, friendlyName, role, friendlyName, finalHostname, clientIp, metadata]);
const worker = rows[0];
const roleMsg = role ? `for ${role}` : 'as role-agnostic';
@@ -138,17 +137,36 @@ router.post('/register', async (req: Request, res: Response) => {
*
* Body:
* - worker_id: string (required)
* - current_task_id: number (optional) - task currently being processed
* - current_task_id: number (optional) - task currently being processed (primary task)
* - current_task_ids: number[] (optional) - all tasks currently being processed (concurrent)
* - active_task_count: number (optional) - number of tasks currently running
* - max_concurrent_tasks: number (optional) - max concurrent tasks this worker can handle
* - status: string (optional) - 'active', 'idle'
* - resources: object (optional) - memory_mb, cpu_user_ms, cpu_system_ms, etc.
*/
router.post('/heartbeat', async (req: Request, res: Response) => {
try {
const { worker_id, current_task_id, status = 'active', resources } = req.body;
const {
worker_id,
current_task_id,
current_task_ids,
active_task_count,
max_concurrent_tasks,
status = 'active',
resources
} = req.body;
if (!worker_id) {
return res.status(400).json({ success: false, error: 'worker_id is required' });
}
// Build metadata object with all the new fields
const metadata: Record<string, unknown> = {};
if (resources) Object.assign(metadata, resources);
if (current_task_ids) metadata.current_task_ids = current_task_ids;
if (active_task_count !== undefined) metadata.active_task_count = active_task_count;
if (max_concurrent_tasks !== undefined) metadata.max_concurrent_tasks = max_concurrent_tasks;
// Store resources in metadata jsonb column
const { rows } = await pool.query(`
UPDATE worker_registry
@@ -159,7 +177,7 @@ router.post('/heartbeat', async (req: Request, res: Response) => {
updated_at = NOW()
WHERE worker_id = $3
RETURNING id, friendly_name, status
`, [current_task_id || null, status, worker_id, resources ? JSON.stringify(resources) : null]);
`, [current_task_id || null, status, worker_id, Object.keys(metadata).length > 0 ? JSON.stringify(metadata) : null]);
if (rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found - please register first' });
@@ -330,12 +348,27 @@ router.get('/workers', async (req: Request, res: Response) => {
tasks_completed,
tasks_failed,
current_task_id,
-- Concurrent task fields from metadata
(metadata->>'current_task_ids')::jsonb as current_task_ids,
(metadata->>'active_task_count')::int as active_task_count,
(metadata->>'max_concurrent_tasks')::int as max_concurrent_tasks,
-- Decommission fields
COALESCE(decommission_requested, false) as decommission_requested,
decommission_reason,
-- Preflight fields (dual-transport verification)
curl_ip,
http_ip,
preflight_status,
preflight_at,
fingerprint_data,
-- Full metadata for resources
metadata,
EXTRACT(EPOCH FROM (NOW() - last_heartbeat_at)) as seconds_since_heartbeat,
CASE
WHEN status = 'offline' OR status = 'terminated' THEN status
WHEN last_heartbeat_at < NOW() - INTERVAL '2 minutes' THEN 'stale'
WHEN current_task_id IS NOT NULL THEN 'busy'
WHEN (metadata->>'active_task_count')::int > 0 THEN 'busy'
ELSE 'ready'
END as health_status,
created_at
@@ -672,4 +705,163 @@ router.get('/capacity', async (_req: Request, res: Response) => {
}
});
// ============================================================
// WORKER LIFECYCLE MANAGEMENT
// ============================================================
/**
* POST /api/worker-registry/workers/:workerId/decommission
* Request graceful decommission of a worker (will stop after current task)
*/
router.post('/workers/:workerId/decommission', async (req: Request, res: Response) => {
try {
const { workerId } = req.params;
const { reason, issued_by } = req.body;
// Update worker_registry to flag for decommission
const result = await pool.query(
`UPDATE worker_registry
SET decommission_requested = true,
decommission_reason = $2,
decommission_requested_at = NOW()
WHERE worker_id = $1
RETURNING friendly_name, status, current_task_id`,
[workerId, reason || 'Manual decommission from admin']
);
if (result.rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found' });
}
const worker = result.rows[0];
// Also log to worker_commands for audit trail
await pool.query(
`INSERT INTO worker_commands (worker_id, command, reason, issued_by)
VALUES ($1, 'decommission', $2, $3)
ON CONFLICT DO NOTHING`,
[workerId, reason || 'Manual decommission', issued_by || 'admin']
).catch(() => {
// Table might not exist yet - ignore
});
res.json({
success: true,
message: worker.current_task_id
? `Worker ${worker.friendly_name} will stop after completing task #${worker.current_task_id}`
: `Worker ${worker.friendly_name} will stop on next poll`,
worker: {
friendly_name: worker.friendly_name,
status: worker.status,
current_task_id: worker.current_task_id,
decommission_requested: true
}
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/workers/:workerId/cancel-decommission
* Cancel a pending decommission request
*/
router.post('/workers/:workerId/cancel-decommission', async (req: Request, res: Response) => {
try {
const { workerId } = req.params;
const result = await pool.query(
`UPDATE worker_registry
SET decommission_requested = false,
decommission_reason = NULL,
decommission_requested_at = NULL
WHERE worker_id = $1
RETURNING friendly_name`,
[workerId]
);
if (result.rows.length === 0) {
return res.status(404).json({ success: false, error: 'Worker not found' });
}
res.json({
success: true,
message: `Decommission cancelled for ${result.rows[0].friendly_name}`
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* POST /api/worker-registry/spawn
* Spawn a new worker in the current pod (only works in multi-worker-per-pod mode)
* For now, this is a placeholder - actual spawning requires the pod supervisor
*/
router.post('/spawn', async (req: Request, res: Response) => {
try {
const { pod_name, role } = req.body;
// For now, we can't actually spawn workers from the API
// This would require a supervisor process in each pod that listens for spawn commands
// Instead, return instructions for how to scale
res.json({
success: false,
error: 'Direct worker spawning not yet implemented',
instructions: 'To add workers, scale the K8s deployment: kubectl scale deployment/scraper-worker --replicas=N'
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
/**
* GET /api/worker-registry/pods
* Get workers grouped by pod
*/
router.get('/pods', async (_req: Request, res: Response) => {
try {
const { rows } = await pool.query(`
SELECT
COALESCE(pod_name, 'Unknown') as pod_name,
COUNT(*) as worker_count,
COUNT(*) FILTER (WHERE current_task_id IS NOT NULL) as busy_count,
COUNT(*) FILTER (WHERE current_task_id IS NULL) as idle_count,
SUM(tasks_completed) as total_completed,
SUM(tasks_failed) as total_failed,
SUM((metadata->>'memory_rss_mb')::int) as total_memory_mb,
array_agg(json_build_object(
'worker_id', worker_id,
'friendly_name', friendly_name,
'status', status,
'current_task_id', current_task_id,
'tasks_completed', tasks_completed,
'tasks_failed', tasks_failed,
'decommission_requested', COALESCE(decommission_requested, false),
'last_heartbeat_at', last_heartbeat_at
)) as workers
FROM worker_registry
WHERE status NOT IN ('offline', 'terminated')
GROUP BY pod_name
ORDER BY pod_name
`);
res.json({
success: true,
pods: rows.map(row => ({
pod_name: row.pod_name,
worker_count: parseInt(row.worker_count),
busy_count: parseInt(row.busy_count),
idle_count: parseInt(row.idle_count),
total_completed: parseInt(row.total_completed) || 0,
total_failed: parseInt(row.total_failed) || 0,
total_memory_mb: parseInt(row.total_memory_mb) || 0,
workers: row.workers
}))
});
} catch (error: any) {
res.status(500).json({ success: false, error: error.message });
}
});
export default router;

View File

@@ -35,7 +35,7 @@ const router = Router();
// ============================================================
const K8S_NAMESPACE = process.env.K8S_NAMESPACE || 'dispensary-scraper';
const K8S_STATEFULSET_NAME = process.env.K8S_WORKER_STATEFULSET || 'scraper-worker';
const K8S_DEPLOYMENT_NAME = process.env.K8S_WORKER_DEPLOYMENT || 'scraper-worker';
// Initialize K8s client - uses in-cluster config when running in K8s,
// or kubeconfig when running locally
@@ -70,7 +70,7 @@ function getK8sClient(): k8s.AppsV1Api | null {
/**
* GET /api/workers/k8s/replicas - Get current worker replica count
* Returns current and desired replica counts from the StatefulSet
* Returns current and desired replica counts from the Deployment
*/
router.get('/k8s/replicas', async (_req: Request, res: Response) => {
const client = getK8sClient();
@@ -84,21 +84,21 @@ router.get('/k8s/replicas', async (_req: Request, res: Response) => {
}
try {
const response = await client.readNamespacedStatefulSet({
name: K8S_STATEFULSET_NAME,
const response = await client.readNamespacedDeployment({
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
});
const statefulSet = response;
const deployment = response;
res.json({
success: true,
replicas: {
current: statefulSet.status?.readyReplicas || 0,
desired: statefulSet.spec?.replicas || 0,
available: statefulSet.status?.availableReplicas || 0,
updated: statefulSet.status?.updatedReplicas || 0,
current: deployment.status?.readyReplicas || 0,
desired: deployment.spec?.replicas || 0,
available: deployment.status?.availableReplicas || 0,
updated: deployment.status?.updatedReplicas || 0,
},
statefulset: K8S_STATEFULSET_NAME,
deployment: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
});
} catch (err: any) {
@@ -112,7 +112,7 @@ router.get('/k8s/replicas', async (_req: Request, res: Response) => {
/**
* POST /api/workers/k8s/scale - Scale worker replicas
* Body: { replicas: number } - desired replica count (1-20)
* Body: { replicas: number } - desired replica count (0-20)
*/
router.post('/k8s/scale', async (req: Request, res: Response) => {
const client = getK8sClient();
@@ -136,21 +136,21 @@ router.post('/k8s/scale', async (req: Request, res: Response) => {
try {
// Get current state first
const currentResponse = await client.readNamespacedStatefulSetScale({
name: K8S_STATEFULSET_NAME,
const currentResponse = await client.readNamespacedDeploymentScale({
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
});
const currentReplicas = currentResponse.spec?.replicas || 0;
// Update scale using replaceNamespacedStatefulSetScale
await client.replaceNamespacedStatefulSetScale({
name: K8S_STATEFULSET_NAME,
// Update scale using replaceNamespacedDeploymentScale
await client.replaceNamespacedDeploymentScale({
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
body: {
apiVersion: 'autoscaling/v1',
kind: 'Scale',
metadata: {
name: K8S_STATEFULSET_NAME,
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
},
spec: {
@@ -159,14 +159,14 @@ router.post('/k8s/scale', async (req: Request, res: Response) => {
},
});
console.log(`[Workers] Scaled ${K8S_STATEFULSET_NAME} from ${currentReplicas} to ${replicas} replicas`);
console.log(`[Workers] Scaled ${K8S_DEPLOYMENT_NAME} from ${currentReplicas} to ${replicas} replicas`);
res.json({
success: true,
message: `Scaled from ${currentReplicas} to ${replicas} replicas`,
previous: currentReplicas,
desired: replicas,
statefulset: K8S_STATEFULSET_NAME,
deployment: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
});
} catch (err: any) {
@@ -178,6 +178,73 @@ router.post('/k8s/scale', async (req: Request, res: Response) => {
}
});
/**
* POST /api/workers/k8s/scale-up - Scale up worker replicas by 1
* Convenience endpoint for adding a single worker
*/
router.post('/k8s/scale-up', async (_req: Request, res: Response) => {
const client = getK8sClient();
if (!client) {
return res.status(503).json({
success: false,
error: 'K8s client not available (not running in cluster or no kubeconfig)',
});
}
try {
// Get current replica count
const currentResponse = await client.readNamespacedDeploymentScale({
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
});
const currentReplicas = currentResponse.spec?.replicas || 0;
const newReplicas = currentReplicas + 1;
// Cap at 20 replicas
if (newReplicas > 20) {
return res.status(400).json({
success: false,
error: 'Maximum replica count (20) reached',
});
}
// Scale up by 1
await client.replaceNamespacedDeploymentScale({
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
body: {
apiVersion: 'autoscaling/v1',
kind: 'Scale',
metadata: {
name: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
},
spec: {
replicas: newReplicas,
},
},
});
console.log(`[Workers] Scaled up ${K8S_DEPLOYMENT_NAME} from ${currentReplicas} to ${newReplicas} replicas`);
res.json({
success: true,
message: `Added worker (${currentReplicas}${newReplicas} replicas)`,
previous: currentReplicas,
desired: newReplicas,
deployment: K8S_DEPLOYMENT_NAME,
namespace: K8S_NAMESPACE,
});
} catch (err: any) {
console.error('[Workers] K8s scale-up error:', err.body?.message || err.message);
res.status(500).json({
success: false,
error: err.body?.message || err.message,
});
}
});
// ============================================================
// STATIC ROUTES (must come before parameterized routes)
// ============================================================

View File

@@ -683,6 +683,118 @@ export class CrawlRotator {
const current = this.proxy.getCurrent();
return current?.timezone;
}
/**
* Preflight check - verifies proxy and anti-detect are working
* MUST be called before any task execution to ensure anonymity.
*
* Tests:
* 1. Proxy available - a proxy must be loaded and active
* 2. Proxy connectivity - makes HTTP request through proxy to verify connection
* 3. Anti-detect headers - verifies fingerprint is set with required headers
*
* @returns Promise<PreflightResult> with pass/fail status and details
*/
async preflight(): Promise<PreflightResult> {
const result: PreflightResult = {
passed: false,
proxyAvailable: false,
proxyConnected: false,
antidetectReady: false,
proxyIp: null,
fingerprint: null,
error: null,
responseTimeMs: null,
};
// Step 1: Check proxy is available
const currentProxy = this.proxy.getCurrent();
if (!currentProxy) {
result.error = 'No proxy available';
console.log('[Preflight] FAILED - No proxy available');
return result;
}
result.proxyAvailable = true;
result.proxyIp = currentProxy.host;
// Step 2: Check fingerprint/anti-detect is ready
const fingerprint = this.userAgent.getCurrent();
if (!fingerprint || !fingerprint.userAgent) {
result.error = 'Anti-detect fingerprint not initialized';
console.log('[Preflight] FAILED - No fingerprint');
return result;
}
result.antidetectReady = true;
result.fingerprint = {
userAgent: fingerprint.userAgent,
browserName: fingerprint.browserName,
deviceCategory: fingerprint.deviceCategory,
};
// Step 3: Test proxy connectivity with an actual HTTP request
// Use httpbin.org/ip to verify request goes through proxy
const proxyUrl = this.proxy.getProxyUrl(currentProxy);
const testUrl = 'https://httpbin.org/ip';
try {
const { default: axios } = await import('axios');
const { HttpsProxyAgent } = await import('https-proxy-agent');
const agent = new HttpsProxyAgent(proxyUrl);
const startTime = Date.now();
const response = await axios.get(testUrl, {
httpsAgent: agent,
timeout: 15000, // 15 second timeout
headers: {
'User-Agent': fingerprint.userAgent,
'Accept-Language': fingerprint.acceptLanguage,
...(fingerprint.secChUa && { 'sec-ch-ua': fingerprint.secChUa }),
...(fingerprint.secChUaPlatform && { 'sec-ch-ua-platform': fingerprint.secChUaPlatform }),
...(fingerprint.secChUaMobile && { 'sec-ch-ua-mobile': fingerprint.secChUaMobile }),
},
});
result.responseTimeMs = Date.now() - startTime;
result.proxyConnected = true;
result.passed = true;
// Mark success on proxy stats
await this.proxy.markSuccess(currentProxy.id, result.responseTimeMs);
console.log(`[Preflight] PASSED - Proxy ${currentProxy.host} connected (${result.responseTimeMs}ms), UA: ${fingerprint.browserName}/${fingerprint.deviceCategory}`);
} catch (err: any) {
result.error = `Proxy connection failed: ${err.message || 'Unknown error'}`;
console.log(`[Preflight] FAILED - Proxy connection error: ${err.message}`);
// Mark failure on proxy stats
await this.proxy.markFailed(currentProxy.id, err.message);
}
return result;
}
}
/**
* Result from preflight check
*/
export interface PreflightResult {
/** Overall pass/fail */
passed: boolean;
/** Step 1: Is a proxy loaded? */
proxyAvailable: boolean;
/** Step 2: Did HTTP request through proxy succeed? */
proxyConnected: boolean;
/** Step 3: Is fingerprint/anti-detect ready? */
antidetectReady: boolean;
/** Current proxy IP */
proxyIp: string | null;
/** Fingerprint summary */
fingerprint: { userAgent: string; browserName: string; deviceCategory: string } | null;
/** Error message if failed */
error: string | null;
/** Proxy response time in ms */
responseTimeMs: number | null;
}
// ============================================================

View File

@@ -0,0 +1,100 @@
/**
* Curl Preflight - Verify curl/axios transport works through proxy
*
* Tests:
* 1. Proxy is available and active
* 2. HTTP request through proxy succeeds
* 3. Anti-detect headers are properly set
*
* Use case: Fast, simple API requests that don't need browser fingerprint
*/
import axios from 'axios';
import { HttpsProxyAgent } from 'https-proxy-agent';
import { CrawlRotator, PreflightResult } from './crawl-rotator';
export interface CurlPreflightResult extends PreflightResult {
method: 'curl';
}
/**
* Run curl preflight check
* Tests proxy connectivity using axios/curl through the proxy
*/
export async function runCurlPreflight(
crawlRotator: CrawlRotator
): Promise<CurlPreflightResult> {
const result: CurlPreflightResult = {
method: 'curl',
passed: false,
proxyAvailable: false,
proxyConnected: false,
antidetectReady: false,
proxyIp: null,
fingerprint: null,
error: null,
responseTimeMs: null,
};
// Step 1: Check proxy is available
const currentProxy = crawlRotator.proxy.getCurrent();
if (!currentProxy) {
result.error = 'No proxy available';
console.log('[CurlPreflight] FAILED - No proxy available');
return result;
}
result.proxyAvailable = true;
result.proxyIp = currentProxy.host;
// Step 2: Check fingerprint/anti-detect is ready
const fingerprint = crawlRotator.userAgent.getCurrent();
if (!fingerprint || !fingerprint.userAgent) {
result.error = 'Anti-detect fingerprint not initialized';
console.log('[CurlPreflight] FAILED - No fingerprint');
return result;
}
result.antidetectReady = true;
result.fingerprint = {
userAgent: fingerprint.userAgent,
browserName: fingerprint.browserName,
deviceCategory: fingerprint.deviceCategory,
};
// Step 3: Test proxy connectivity with an actual HTTP request
const proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
const testUrl = 'https://httpbin.org/ip';
try {
const agent = new HttpsProxyAgent(proxyUrl);
const startTime = Date.now();
const response = await axios.get(testUrl, {
httpsAgent: agent,
timeout: 15000, // 15 second timeout
headers: {
'User-Agent': fingerprint.userAgent,
'Accept-Language': fingerprint.acceptLanguage,
...(fingerprint.secChUa && { 'sec-ch-ua': fingerprint.secChUa }),
...(fingerprint.secChUaPlatform && { 'sec-ch-ua-platform': fingerprint.secChUaPlatform }),
...(fingerprint.secChUaMobile && { 'sec-ch-ua-mobile': fingerprint.secChUaMobile }),
},
});
result.responseTimeMs = Date.now() - startTime;
result.proxyConnected = true;
result.passed = true;
// Mark success on proxy stats
await crawlRotator.proxy.markSuccess(currentProxy.id, result.responseTimeMs);
console.log(`[CurlPreflight] PASSED - Proxy ${currentProxy.host} connected (${result.responseTimeMs}ms), UA: ${fingerprint.browserName}/${fingerprint.deviceCategory}`);
} catch (err: any) {
result.error = `Proxy connection failed: ${err.message || 'Unknown error'}`;
console.log(`[CurlPreflight] FAILED - Proxy connection error: ${err.message}`);
// Mark failure on proxy stats
await crawlRotator.proxy.markFailed(currentProxy.id, err.message);
}
return result;
}

View File

@@ -0,0 +1,399 @@
/**
* Puppeteer Preflight - Verify browser-based transport works with anti-detect
*
* Uses Puppeteer + StealthPlugin to:
* 1. Launch headless browser with stealth mode + PROXY
* 2. Visit fingerprint.com demo to verify anti-detect and confirm proxy IP
* 3. Establish session by visiting Dutchie embedded menu
* 4. Make GraphQL request from browser context
* 5. Verify we get a valid response (not blocked)
*
* Use case: Anti-detect scraping that needs real browser fingerprint through proxy
*
* Based on test-intercept.js which successfully captures 1000+ products
*/
import { PreflightResult, CrawlRotator } from './crawl-rotator';
// GraphQL hash for FilteredProducts query - MUST match CLAUDE.md
const FILTERED_PRODUCTS_HASH = 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0';
// Test dispensary - AZ-Deeply-Rooted (known working)
const TEST_CNAME = 'AZ-Deeply-Rooted';
const TEST_PLATFORM_ID = '6405ef617056e8014d79101b';
// Anti-detect verification sites (primary + fallback)
const FINGERPRINT_DEMO_URL = 'https://demo.fingerprint.com/';
const AMIUNIQUE_URL = 'https://amiunique.org/fingerprint';
export interface PuppeteerPreflightResult extends PreflightResult {
method: 'http';
/** Number of products returned (proves API access) */
productsReturned?: number;
/** Browser user agent used */
browserUserAgent?: string;
/** Bot detection result from fingerprint.com */
botDetection?: {
detected: boolean;
probability?: number;
type?: string;
};
/** Expected proxy IP (from pool) */
expectedProxyIp?: string;
/** Whether IP verification passed (detected IP matches proxy) */
ipVerified?: boolean;
}
/**
* Run Puppeteer preflight check with proxy
* Tests browser-based access with anti-detect verification via fingerprint.com
*
* @param crawlRotator - CrawlRotator instance to get proxy from pool
*/
export async function runPuppeteerPreflight(
crawlRotator?: CrawlRotator
): Promise<PuppeteerPreflightResult> {
const result: PuppeteerPreflightResult = {
method: 'http',
passed: false,
proxyAvailable: false,
proxyConnected: false,
antidetectReady: false,
proxyIp: null,
fingerprint: null,
error: null,
responseTimeMs: null,
productsReturned: 0,
ipVerified: false,
};
let browser: any = null;
try {
// Step 0: Get a proxy from the pool
let proxyUrl: string | null = null;
let expectedProxyHost: string | null = null;
if (crawlRotator) {
const currentProxy = crawlRotator.proxy.getCurrent();
if (currentProxy) {
result.proxyAvailable = true;
proxyUrl = crawlRotator.proxy.getProxyUrl(currentProxy);
expectedProxyHost = currentProxy.host;
result.expectedProxyIp = expectedProxyHost;
console.log(`[PuppeteerPreflight] Using proxy: ${currentProxy.host}:${currentProxy.port}`);
} else {
result.error = 'No proxy available from pool';
console.log(`[PuppeteerPreflight] FAILED - No proxy available`);
return result;
}
} else {
console.log(`[PuppeteerPreflight] WARNING: No CrawlRotator provided - using direct connection`);
result.proxyAvailable = true; // No proxy needed for direct
}
// Dynamic imports to avoid loading Puppeteer unless needed
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
puppeteer.use(StealthPlugin());
const startTime = Date.now();
// Build browser args
const browserArgs = ['--no-sandbox', '--disable-setuid-sandbox'];
if (proxyUrl) {
// Extract host:port for Puppeteer (it handles auth separately)
const proxyUrlParsed = new URL(proxyUrl);
browserArgs.push(`--proxy-server=${proxyUrlParsed.host}`);
}
// Launch browser with stealth + proxy
browser = await puppeteer.launch({
headless: 'new',
args: browserArgs,
});
const page = await browser.newPage();
// If proxy has auth, set it up
if (proxyUrl) {
const proxyUrlParsed = new URL(proxyUrl);
if (proxyUrlParsed.username && proxyUrlParsed.password) {
await page.authenticate({
username: decodeURIComponent(proxyUrlParsed.username),
password: decodeURIComponent(proxyUrlParsed.password),
});
}
}
// Get browser user agent
const userAgent = await page.evaluate(() => navigator.userAgent);
result.browserUserAgent = userAgent;
result.fingerprint = {
userAgent,
browserName: 'Chrome (Puppeteer)',
deviceCategory: 'desktop',
};
// =========================================================================
// STEP 1: Visit fingerprint.com demo to verify anti-detect and get IP
// =========================================================================
console.log(`[PuppeteerPreflight] Testing anti-detect at ${FINGERPRINT_DEMO_URL}...`);
try {
await page.goto(FINGERPRINT_DEMO_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true; // If we got here, proxy is working
// Wait for fingerprint results to load
await page.waitForSelector('[data-test="visitor-id"]', { timeout: 10000 }).catch(() => {});
// Extract fingerprint data from the page
const fingerprintData = await page.evaluate(() => {
// Try to find the IP address displayed on the page
const ipElement = document.querySelector('[data-test="ip-address"]');
const ip = ipElement?.textContent?.trim() || null;
// Try to find bot detection info
const botElement = document.querySelector('[data-test="bot-detected"]');
const botDetected = botElement?.textContent?.toLowerCase().includes('true') || false;
// Try to find visitor ID (proves fingerprinting worked)
const visitorIdElement = document.querySelector('[data-test="visitor-id"]');
const visitorId = visitorIdElement?.textContent?.trim() || null;
// Alternative: look for common UI patterns if data-test attrs not present
let detectedIp = ip;
if (!detectedIp) {
// Look for IP in any element containing IP-like pattern
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
detectedIp = ipMatch ? ipMatch[1] : null;
}
return {
ip: detectedIp,
botDetected,
visitorId,
pageLoaded: !!document.body,
};
});
if (fingerprintData.ip) {
result.proxyIp = fingerprintData.ip;
console.log(`[PuppeteerPreflight] Detected IP: ${fingerprintData.ip}`);
// Verify IP matches expected proxy
if (expectedProxyHost) {
// Check if detected IP contains the proxy host (or is close match)
if (fingerprintData.ip === expectedProxyHost ||
expectedProxyHost.includes(fingerprintData.ip) ||
fingerprintData.ip.includes(expectedProxyHost.split('.').slice(0, 3).join('.'))) {
result.ipVerified = true;
console.log(`[PuppeteerPreflight] IP VERIFIED - matches proxy`);
} else {
console.log(`[PuppeteerPreflight] IP mismatch: expected ${expectedProxyHost}, got ${fingerprintData.ip}`);
// Don't fail - residential proxies often show different egress IPs
}
}
}
if (fingerprintData.visitorId) {
console.log(`[PuppeteerPreflight] Fingerprint visitor ID: ${fingerprintData.visitorId}`);
}
result.botDetection = {
detected: fingerprintData.botDetected,
};
if (fingerprintData.botDetected) {
console.log(`[PuppeteerPreflight] WARNING: Bot detection triggered!`);
} else {
console.log(`[PuppeteerPreflight] Anti-detect check: NOT detected as bot`);
result.antidetectReady = true;
}
} catch (fpErr: any) {
// Could mean proxy connection failed
console.log(`[PuppeteerPreflight] Fingerprint.com check failed: ${fpErr.message}`);
if (fpErr.message.includes('net::ERR_PROXY') || fpErr.message.includes('ECONNREFUSED')) {
result.error = `Proxy connection failed: ${fpErr.message}`;
return result;
}
// Try fallback: amiunique.org
console.log(`[PuppeteerPreflight] Trying fallback: ${AMIUNIQUE_URL}...`);
try {
await page.goto(AMIUNIQUE_URL, {
waitUntil: 'networkidle2',
timeout: 30000,
});
result.proxyConnected = true;
// Extract IP from amiunique.org page
const amiData = await page.evaluate(() => {
const allText = document.body.innerText;
const ipMatch = allText.match(/\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b/);
return {
ip: ipMatch ? ipMatch[1] : null,
pageLoaded: !!document.body,
};
});
if (amiData.ip) {
result.proxyIp = amiData.ip;
console.log(`[PuppeteerPreflight] Detected IP via amiunique.org: ${amiData.ip}`);
}
result.antidetectReady = true;
console.log(`[PuppeteerPreflight] amiunique.org fallback succeeded`);
} catch (amiErr: any) {
console.log(`[PuppeteerPreflight] amiunique.org fallback also failed: ${amiErr.message}`);
// Continue with Dutchie test anyway
result.proxyConnected = true;
result.antidetectReady = true;
}
}
// =========================================================================
// STEP 2: Test Dutchie API access (the real test)
// =========================================================================
const embedUrl = `https://dutchie.com/embedded-menu/${TEST_CNAME}?menuType=rec`;
console.log(`[PuppeteerPreflight] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 30000,
});
// Make GraphQL request from browser context
const graphqlResult = await page.evaluate(
async (platformId: string, hash: string) => {
try {
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // CRITICAL: Must be 'Active' per CLAUDE.md
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: 0,
perPage: 10, // Just need a few to prove it works
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: hash,
},
};
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions),
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const sessionId = 'preflight-' + Date.now();
const response = await fetch(url, {
method: 'GET',
headers: {
Accept: 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include',
});
if (!response.ok) {
return { error: `HTTP ${response.status}`, products: 0 };
}
const json = await response.json();
if (json.errors) {
return { error: JSON.stringify(json.errors).slice(0, 200), products: 0 };
}
const products = json?.data?.filteredProducts?.products || [];
return { error: null, products: products.length };
} catch (err: any) {
return { error: err.message || 'Unknown error', products: 0 };
}
},
TEST_PLATFORM_ID,
FILTERED_PRODUCTS_HASH
);
result.responseTimeMs = Date.now() - startTime;
if (graphqlResult.error) {
result.error = `GraphQL error: ${graphqlResult.error}`;
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
} else if (graphqlResult.products === 0) {
result.error = 'GraphQL returned 0 products';
console.log(`[PuppeteerPreflight] FAILED - No products returned`);
} else {
result.passed = true;
result.productsReturned = graphqlResult.products;
console.log(
`[PuppeteerPreflight] PASSED - Got ${graphqlResult.products} products in ${result.responseTimeMs}ms`
);
if (result.proxyIp) {
console.log(`[PuppeteerPreflight] Browser IP via proxy: ${result.proxyIp}`);
}
}
} catch (err: any) {
result.error = `Browser error: ${err.message || 'Unknown error'}`;
console.log(`[PuppeteerPreflight] FAILED - ${result.error}`);
} finally {
if (browser) {
await browser.close().catch(() => {});
}
}
return result;
}
/**
* Run Puppeteer preflight with retry
* Retries once on failure to handle transient issues
*
* @param crawlRotator - CrawlRotator instance to get proxy from pool
* @param maxRetries - Number of retry attempts (default 1)
*/
export async function runPuppeteerPreflightWithRetry(
crawlRotator?: CrawlRotator,
maxRetries: number = 1
): Promise<PuppeteerPreflightResult> {
let lastResult: PuppeteerPreflightResult | null = null;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
if (attempt > 0) {
console.log(`[PuppeteerPreflight] Retry attempt ${attempt}/${maxRetries}...`);
await new Promise((r) => setTimeout(r, 5000)); // Wait 5s between retries
}
lastResult = await runPuppeteerPreflight(crawlRotator);
if (lastResult.passed) {
return lastResult;
}
}
return lastResult!;
}

View File

@@ -1,566 +1,30 @@
/**
* System API Routes
* System API Routes (Stub)
*
* Provides REST API endpoints for system monitoring and control:
* - /api/system/sync/* - Sync orchestrator
* - /api/system/dlq/* - Dead-letter queue
* - /api/system/integrity/* - Integrity checks
* - /api/system/fix/* - Auto-fix routines
* - /api/system/alerts/* - System alerts
* - /metrics - Prometheus metrics
* The full system routes depend on SyncOrchestrator which was moved to _deprecated.
* This stub provides empty routers to maintain backward compatibility.
*
* Phase 5: Full Production Sync + Monitoring
* Full implementation available at: src/_deprecated/system/routes/index.ts
*/
import { Router, Request, Response } from 'express';
import { Pool } from 'pg';
import {
SyncOrchestrator,
MetricsService,
DLQService,
AlertService,
IntegrityService,
AutoFixService,
} from '../services';
import { MetricsService } from '../services';
export function createSystemRouter(pool: Pool): Router {
export function createSystemRouter(_pool: Pool): Router {
const router = Router();
// Initialize services
const metrics = new MetricsService(pool);
const dlq = new DLQService(pool);
const alerts = new AlertService(pool);
const integrity = new IntegrityService(pool, alerts);
const autoFix = new AutoFixService(pool, alerts);
const orchestrator = new SyncOrchestrator(pool, metrics, dlq, alerts);
// ============================================================
// SYNC ORCHESTRATOR ENDPOINTS
// ============================================================
/**
* GET /api/system/sync/status
* Get current sync status
*/
router.get('/sync/status', async (_req: Request, res: Response) => {
try {
const status = await orchestrator.getStatus();
res.json(status);
} catch (error) {
console.error('[System] Sync status error:', error);
res.status(500).json({ error: 'Failed to get sync status' });
}
});
/**
* POST /api/system/sync/run
* Trigger a sync run
*/
router.post('/sync/run', async (req: Request, res: Response) => {
try {
const triggeredBy = req.body.triggeredBy || 'api';
const result = await orchestrator.runSync();
res.json({
success: true,
triggeredBy,
metrics: result,
});
} catch (error) {
console.error('[System] Sync run error:', error);
res.status(500).json({
success: false,
error: error instanceof Error ? error.message : 'Sync run failed',
});
}
});
/**
* GET /api/system/sync/queue-depth
* Get queue depth information
*/
router.get('/sync/queue-depth', async (_req: Request, res: Response) => {
try {
const depth = await orchestrator.getQueueDepth();
res.json(depth);
} catch (error) {
console.error('[System] Queue depth error:', error);
res.status(500).json({ error: 'Failed to get queue depth' });
}
});
/**
* GET /api/system/sync/health
* Get sync health status
*/
router.get('/sync/health', async (_req: Request, res: Response) => {
try {
const health = await orchestrator.getHealth();
res.status(health.healthy ? 200 : 503).json(health);
} catch (error) {
console.error('[System] Health check error:', error);
res.status(500).json({ healthy: false, error: 'Health check failed' });
}
});
/**
* POST /api/system/sync/pause
* Pause the orchestrator
*/
router.post('/sync/pause', async (req: Request, res: Response) => {
try {
const reason = req.body.reason || 'Manual pause';
await orchestrator.pause(reason);
res.json({ success: true, message: 'Orchestrator paused' });
} catch (error) {
console.error('[System] Pause error:', error);
res.status(500).json({ error: 'Failed to pause orchestrator' });
}
});
/**
* POST /api/system/sync/resume
* Resume the orchestrator
*/
router.post('/sync/resume', async (_req: Request, res: Response) => {
try {
await orchestrator.resume();
res.json({ success: true, message: 'Orchestrator resumed' });
} catch (error) {
console.error('[System] Resume error:', error);
res.status(500).json({ error: 'Failed to resume orchestrator' });
}
});
// ============================================================
// DLQ ENDPOINTS
// ============================================================
/**
* GET /api/system/dlq
* List DLQ payloads
*/
router.get('/dlq', async (req: Request, res: Response) => {
try {
const options = {
status: req.query.status as string,
errorType: req.query.errorType as string,
dispensaryId: req.query.dispensaryId ? parseInt(req.query.dispensaryId as string) : undefined,
limit: req.query.limit ? parseInt(req.query.limit as string) : 50,
offset: req.query.offset ? parseInt(req.query.offset as string) : 0,
};
const result = await dlq.listPayloads(options);
res.json(result);
} catch (error) {
console.error('[System] DLQ list error:', error);
res.status(500).json({ error: 'Failed to list DLQ payloads' });
}
});
/**
* GET /api/system/dlq/stats
* Get DLQ statistics
*/
router.get('/dlq/stats', async (_req: Request, res: Response) => {
try {
const stats = await dlq.getStats();
res.json(stats);
} catch (error) {
console.error('[System] DLQ stats error:', error);
res.status(500).json({ error: 'Failed to get DLQ stats' });
}
});
/**
* GET /api/system/dlq/summary
* Get DLQ summary by error type
*/
router.get('/dlq/summary', async (_req: Request, res: Response) => {
try {
const summary = await dlq.getSummary();
res.json(summary);
} catch (error) {
console.error('[System] DLQ summary error:', error);
res.status(500).json({ error: 'Failed to get DLQ summary' });
}
});
/**
* GET /api/system/dlq/:id
* Get a specific DLQ payload
*/
router.get('/dlq/:id', async (req: Request, res: Response) => {
try {
const payload = await dlq.getPayload(req.params.id);
if (!payload) {
return res.status(404).json({ error: 'Payload not found' });
}
res.json(payload);
} catch (error) {
console.error('[System] DLQ get error:', error);
res.status(500).json({ error: 'Failed to get DLQ payload' });
}
});
/**
* POST /api/system/dlq/:id/retry
* Retry a DLQ payload
*/
router.post('/dlq/:id/retry', async (req: Request, res: Response) => {
try {
const result = await dlq.retryPayload(req.params.id);
if (result.success) {
res.json(result);
} else {
res.status(400).json(result);
}
} catch (error) {
console.error('[System] DLQ retry error:', error);
res.status(500).json({ error: 'Failed to retry payload' });
}
});
/**
* POST /api/system/dlq/:id/abandon
* Abandon a DLQ payload
*/
router.post('/dlq/:id/abandon', async (req: Request, res: Response) => {
try {
const reason = req.body.reason || 'Manually abandoned';
const abandonedBy = req.body.abandonedBy || 'api';
const success = await dlq.abandonPayload(req.params.id, reason, abandonedBy);
res.json({ success });
} catch (error) {
console.error('[System] DLQ abandon error:', error);
res.status(500).json({ error: 'Failed to abandon payload' });
}
});
/**
* POST /api/system/dlq/bulk-retry
* Bulk retry payloads by error type
*/
router.post('/dlq/bulk-retry', async (req: Request, res: Response) => {
try {
const { errorType } = req.body;
if (!errorType) {
return res.status(400).json({ error: 'errorType is required' });
}
const result = await dlq.bulkRetryByErrorType(errorType);
res.json(result);
} catch (error) {
console.error('[System] DLQ bulk retry error:', error);
res.status(500).json({ error: 'Failed to bulk retry' });
}
});
// ============================================================
// INTEGRITY CHECK ENDPOINTS
// ============================================================
/**
* POST /api/system/integrity/run
* Run all integrity checks
*/
router.post('/integrity/run', async (req: Request, res: Response) => {
try {
const triggeredBy = req.body.triggeredBy || 'api';
const result = await integrity.runAllChecks(triggeredBy);
res.json(result);
} catch (error) {
console.error('[System] Integrity run error:', error);
res.status(500).json({ error: 'Failed to run integrity checks' });
}
});
/**
* GET /api/system/integrity/runs
* Get recent integrity check runs
*/
router.get('/integrity/runs', async (req: Request, res: Response) => {
try {
const limit = req.query.limit ? parseInt(req.query.limit as string) : 10;
const runs = await integrity.getRecentRuns(limit);
res.json(runs);
} catch (error) {
console.error('[System] Integrity runs error:', error);
res.status(500).json({ error: 'Failed to get integrity runs' });
}
});
/**
* GET /api/system/integrity/runs/:runId
* Get results for a specific integrity run
*/
router.get('/integrity/runs/:runId', async (req: Request, res: Response) => {
try {
const results = await integrity.getRunResults(req.params.runId);
res.json(results);
} catch (error) {
console.error('[System] Integrity run results error:', error);
res.status(500).json({ error: 'Failed to get run results' });
}
});
// ============================================================
// AUTO-FIX ENDPOINTS
// ============================================================
/**
* GET /api/system/fix/routines
* Get available fix routines
*/
router.get('/fix/routines', (_req: Request, res: Response) => {
try {
const routines = autoFix.getAvailableRoutines();
res.json(routines);
} catch (error) {
console.error('[System] Get routines error:', error);
res.status(500).json({ error: 'Failed to get routines' });
}
});
/**
* POST /api/system/fix/:routine
* Run a fix routine
*/
router.post('/fix/:routine', async (req: Request, res: Response) => {
try {
const routineName = req.params.routine;
const dryRun = req.body.dryRun === true;
const triggeredBy = req.body.triggeredBy || 'api';
const result = await autoFix.runRoutine(routineName as any, triggeredBy, { dryRun });
res.json(result);
} catch (error) {
console.error('[System] Fix routine error:', error);
res.status(500).json({ error: 'Failed to run fix routine' });
}
});
/**
* GET /api/system/fix/runs
* Get recent fix runs
*/
router.get('/fix/runs', async (req: Request, res: Response) => {
try {
const limit = req.query.limit ? parseInt(req.query.limit as string) : 20;
const runs = await autoFix.getRecentRuns(limit);
res.json(runs);
} catch (error) {
console.error('[System] Fix runs error:', error);
res.status(500).json({ error: 'Failed to get fix runs' });
}
});
// ============================================================
// ALERTS ENDPOINTS
// ============================================================
/**
* GET /api/system/alerts
* List alerts
*/
router.get('/alerts', async (req: Request, res: Response) => {
try {
const options = {
status: req.query.status as any,
severity: req.query.severity as any,
type: req.query.type as string,
limit: req.query.limit ? parseInt(req.query.limit as string) : 50,
offset: req.query.offset ? parseInt(req.query.offset as string) : 0,
};
const result = await alerts.listAlerts(options);
res.json(result);
} catch (error) {
console.error('[System] Alerts list error:', error);
res.status(500).json({ error: 'Failed to list alerts' });
}
});
/**
* GET /api/system/alerts/active
* Get active alerts
*/
router.get('/alerts/active', async (_req: Request, res: Response) => {
try {
const activeAlerts = await alerts.getActiveAlerts();
res.json(activeAlerts);
} catch (error) {
console.error('[System] Active alerts error:', error);
res.status(500).json({ error: 'Failed to get active alerts' });
}
});
/**
* GET /api/system/alerts/summary
* Get alert summary
*/
router.get('/alerts/summary', async (_req: Request, res: Response) => {
try {
const summary = await alerts.getSummary();
res.json(summary);
} catch (error) {
console.error('[System] Alerts summary error:', error);
res.status(500).json({ error: 'Failed to get alerts summary' });
}
});
/**
* POST /api/system/alerts/:id/acknowledge
* Acknowledge an alert
*/
router.post('/alerts/:id/acknowledge', async (req: Request, res: Response) => {
try {
const alertId = parseInt(req.params.id);
const acknowledgedBy = req.body.acknowledgedBy || 'api';
const success = await alerts.acknowledgeAlert(alertId, acknowledgedBy);
res.json({ success });
} catch (error) {
console.error('[System] Acknowledge alert error:', error);
res.status(500).json({ error: 'Failed to acknowledge alert' });
}
});
/**
* POST /api/system/alerts/:id/resolve
* Resolve an alert
*/
router.post('/alerts/:id/resolve', async (req: Request, res: Response) => {
try {
const alertId = parseInt(req.params.id);
const resolvedBy = req.body.resolvedBy || 'api';
const success = await alerts.resolveAlert(alertId, resolvedBy);
res.json({ success });
} catch (error) {
console.error('[System] Resolve alert error:', error);
res.status(500).json({ error: 'Failed to resolve alert' });
}
});
/**
* POST /api/system/alerts/bulk-acknowledge
* Bulk acknowledge alerts
*/
router.post('/alerts/bulk-acknowledge', async (req: Request, res: Response) => {
try {
const { ids, acknowledgedBy } = req.body;
if (!ids || !Array.isArray(ids)) {
return res.status(400).json({ error: 'ids array is required' });
}
const count = await alerts.bulkAcknowledge(ids, acknowledgedBy || 'api');
res.json({ acknowledged: count });
} catch (error) {
console.error('[System] Bulk acknowledge error:', error);
res.status(500).json({ error: 'Failed to bulk acknowledge' });
}
});
// ============================================================
// METRICS ENDPOINTS
// ============================================================
/**
* GET /api/system/metrics
* Get all current metrics
*/
router.get('/metrics', async (_req: Request, res: Response) => {
try {
const allMetrics = await metrics.getAllMetrics();
res.json(allMetrics);
} catch (error) {
console.error('[System] Metrics error:', error);
res.status(500).json({ error: 'Failed to get metrics' });
}
});
/**
* GET /api/system/metrics/:name
* Get a specific metric
*/
router.get('/metrics/:name', async (req: Request, res: Response) => {
try {
const metric = await metrics.getMetric(req.params.name);
if (!metric) {
return res.status(404).json({ error: 'Metric not found' });
}
res.json(metric);
} catch (error) {
console.error('[System] Metric error:', error);
res.status(500).json({ error: 'Failed to get metric' });
}
});
/**
* GET /api/system/metrics/:name/history
* Get metric time series
*/
router.get('/metrics/:name/history', async (req: Request, res: Response) => {
try {
const hours = req.query.hours ? parseInt(req.query.hours as string) : 24;
const history = await metrics.getMetricHistory(req.params.name, hours);
res.json(history);
} catch (error) {
console.error('[System] Metric history error:', error);
res.status(500).json({ error: 'Failed to get metric history' });
}
});
/**
* GET /api/system/errors
* Get error summary
*/
router.get('/errors', async (_req: Request, res: Response) => {
try {
const summary = await metrics.getErrorSummary();
res.json(summary);
} catch (error) {
console.error('[System] Error summary error:', error);
res.status(500).json({ error: 'Failed to get error summary' });
}
});
/**
* GET /api/system/errors/recent
* Get recent errors
*/
router.get('/errors/recent', async (req: Request, res: Response) => {
try {
const limit = req.query.limit ? parseInt(req.query.limit as string) : 50;
const errorType = req.query.type as string;
const errors = await metrics.getRecentErrors(limit, errorType);
res.json(errors);
} catch (error) {
console.error('[System] Recent errors error:', error);
res.status(500).json({ error: 'Failed to get recent errors' });
}
});
/**
* POST /api/system/errors/acknowledge
* Acknowledge errors
*/
router.post('/errors/acknowledge', async (req: Request, res: Response) => {
try {
const { ids, acknowledgedBy } = req.body;
if (!ids || !Array.isArray(ids)) {
return res.status(400).json({ error: 'ids array is required' });
}
const count = await metrics.acknowledgeErrors(ids, acknowledgedBy || 'api');
res.json({ acknowledged: count });
} catch (error) {
console.error('[System] Acknowledge errors error:', error);
res.status(500).json({ error: 'Failed to acknowledge errors' });
}
// Stub - full sync/dlq/integrity/fix/alerts routes moved to _deprecated
router.get('/status', (_req: Request, res: Response) => {
res.json({
message: 'System routes temporarily disabled - see _deprecated/system/routes',
status: 'stub',
});
});
return router;
}
/**
* Create Prometheus metrics endpoint (standalone)
*/
export function createPrometheusRouter(pool: Pool): Router {
const router = Router();
const metrics = new MetricsService(pool);

View File

@@ -4,7 +4,7 @@
* Phase 5: Full Production Sync + Monitoring
*/
export { SyncOrchestrator, type SyncStatus, type QueueDepth, type SyncRunMetrics, type OrchestratorStatus } from './sync-orchestrator';
// SyncOrchestrator moved to _deprecated (depends on hydration module)
export { MetricsService, ERROR_TYPES, type Metric, type MetricTimeSeries, type ErrorBucket, type ErrorType } from './metrics';
export { DLQService, type DLQPayload, type DLQStats } from './dlq';
export { AlertService, type SystemAlert, type AlertSummary, type AlertSeverity, type AlertStatus } from './alerts';

View File

@@ -4,8 +4,9 @@
* Exports all task handlers for the task worker.
*/
export { handleProductRefresh } from './product-refresh';
export { handleProductDiscovery } from './product-discovery';
export { handleProductRefresh } from './product-refresh';
export { handleStoreDiscovery } from './store-discovery';
export { handleEntryPointDiscovery } from './entry-point-discovery';
export { handleAnalyticsRefresh } from './analytics-refresh';
export { handleWhoami } from './whoami';

View File

@@ -25,7 +25,7 @@ export async function handleStoreDiscovery(ctx: TaskContext): Promise<TaskResult
try {
// Get states to discover
const statesResult = await pool.query(`
SELECT code FROM states WHERE active = true ORDER BY code
SELECT code FROM states WHERE is_active = true ORDER BY code
`);
const stateCodes = statesResult.rows.map(r => r.code);

View File

@@ -0,0 +1,80 @@
/**
* WhoAmI Handler
* Tests proxy connectivity and anti-detect by fetching public IP
* Reports: proxy IP, fingerprint info, and connection status
*/
import { TaskContext, TaskResult } from '../task-worker';
import { execSync } from 'child_process';
export async function handleWhoami(ctx: TaskContext): Promise<TaskResult> {
const { pool, crawlRotator } = ctx;
console.log('[WhoAmI] Testing proxy and anti-detect...');
try {
// Use the preflight check which tests proxy + anti-detect
if (crawlRotator) {
const preflight = await crawlRotator.preflight();
if (!preflight.passed) {
return {
success: false,
error: preflight.error || 'Preflight check failed',
proxyAvailable: preflight.proxyAvailable,
proxyConnected: preflight.proxyConnected,
antidetectReady: preflight.antidetectReady,
};
}
console.log(`[WhoAmI] Proxy IP: ${preflight.proxyIp}, Response: ${preflight.responseTimeMs}ms`);
console.log(`[WhoAmI] Fingerprint: ${preflight.fingerprint?.browserName}/${preflight.fingerprint?.deviceCategory}`);
return {
success: true,
proxyIp: preflight.proxyIp,
responseTimeMs: preflight.responseTimeMs,
fingerprint: preflight.fingerprint,
proxyAvailable: preflight.proxyAvailable,
proxyConnected: preflight.proxyConnected,
antidetectReady: preflight.antidetectReady,
};
}
// Fallback: Direct proxy test without CrawlRotator
const proxyResult = await pool.query(`
SELECT host, port, username, password
FROM proxies
WHERE is_active = true
LIMIT 1
`);
if (proxyResult.rows.length === 0) {
return { success: false, error: 'No active proxy configured' };
}
const p = proxyResult.rows[0];
const proxyUrl = p.username
? `http://${p.username}:${p.password}@${p.host}:${p.port}`
: `http://${p.host}:${p.port}`;
console.log(`[WhoAmI] Using proxy: ${p.host}:${p.port}`);
// Fetch IP via proxy
const cmd = `curl -s --proxy '${proxyUrl}' 'https://api.ipify.org?format=json'`;
const output = execSync(cmd, { timeout: 30000 }).toString().trim();
const data = JSON.parse(output);
console.log(`[WhoAmI] Proxy IP: ${data.ip}`);
return {
success: true,
proxyIp: data.ip,
proxyHost: p.host,
proxyPort: p.port,
};
} catch (error: any) {
console.error('[WhoAmI] Error:', error.message);
return { success: false, error: error.message };
}
}

View File

@@ -17,8 +17,8 @@ export {
export { TaskWorker, TaskContext, TaskResult } from './task-worker';
export {
handleProductRefresh,
handleProductDiscovery,
handleProductRefresh,
handleStoreDiscovery,
handleEntryPointDiscovery,
handleAnalyticsRefresh,

View File

@@ -0,0 +1,37 @@
/**
* Task Pool State
*
* Shared state for task pool pause/resume functionality.
* This is kept separate to avoid circular dependencies between
* task-service.ts and routes/tasks.ts.
*
* State is in-memory and resets on server restart.
* By default, the pool is PAUSED (closed) - admin must explicitly start it.
* This prevents workers from immediately grabbing tasks on deploy before
* the system is ready.
*/
let taskPoolPaused = true;
export function isTaskPoolPaused(): boolean {
return taskPoolPaused;
}
export function pauseTaskPool(): void {
taskPoolPaused = true;
console.log('[TaskPool] Task pool PAUSED - workers will not pick up new tasks');
}
export function resumeTaskPool(): void {
taskPoolPaused = false;
console.log('[TaskPool] Task pool RESUMED - workers can pick up tasks');
}
export function getTaskPoolStatus(): { paused: boolean; message: string } {
return {
paused: taskPoolPaused,
message: taskPoolPaused
? 'Task pool is paused - workers will not pick up new tasks'
: 'Task pool is open - workers are picking up tasks',
};
}

View File

@@ -9,6 +9,7 @@
*/
import { pool } from '../db/pool';
import { isTaskPoolPaused } from './task-pool-state';
// Helper to check if a table exists
async function tableExists(tableName: string): Promise<boolean> {
@@ -23,14 +24,16 @@ async function tableExists(tableName: string): Promise<boolean> {
// Per TASK_WORKFLOW_2024-12-10.md: Task roles
// payload_fetch: Hits Dutchie API, saves raw payload to filesystem
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main product crawl handler
// product_refresh: Legacy role (deprecated but kept for compatibility)
export type TaskRole =
| 'store_discovery'
| 'entry_point_discovery'
| 'product_discovery'
| 'payload_fetch' // NEW: Fetches from API, saves to disk
| 'product_refresh' // CHANGED: Now reads from local payload
| 'analytics_refresh';
| 'payload_fetch' // Fetches from API, saves to disk
| 'product_refresh' // DEPRECATED: Use product_discovery instead
| 'analytics_refresh'
| 'whoami'; // Tests proxy + anti-detect connectivity
export type TaskStatus =
| 'pending'
@@ -49,6 +52,7 @@ export interface WorkerTask {
platform: string | null;
status: TaskStatus;
priority: number;
method: 'curl' | 'http' | null; // Transport method: curl=axios/proxy, http=Puppeteer/browser
scheduled_for: Date | null;
worker_id: string | null;
claimed_at: Date | null;
@@ -149,18 +153,34 @@ class TaskService {
/**
* Claim a task atomically for a worker
* If role is null, claims ANY available task (role-agnostic worker)
* Returns null if task pool is paused.
*
* @param role - Task role to claim, or null for any task
* @param workerId - Worker ID claiming the task
* @param curlPassed - Whether worker passed curl preflight (default true for backward compat)
* @param httpPassed - Whether worker passed http/Puppeteer preflight (default false)
*/
async claimTask(role: TaskRole | null, workerId: string): Promise<WorkerTask | null> {
async claimTask(
role: TaskRole | null,
workerId: string,
curlPassed: boolean = true,
httpPassed: boolean = false
): Promise<WorkerTask | null> {
// Check if task pool is paused - don't claim any tasks
if (isTaskPoolPaused()) {
return null;
}
if (role) {
// Role-specific claiming - use the SQL function
// Role-specific claiming - use the SQL function with preflight capabilities
const result = await pool.query(
`SELECT * FROM claim_task($1, $2)`,
[role, workerId]
`SELECT * FROM claim_task($1, $2, $3, $4)`,
[role, workerId, curlPassed, httpPassed]
);
return (result.rows[0] as WorkerTask) || null;
}
// Role-agnostic claiming - claim ANY pending task
// Role-agnostic claiming - claim ANY pending task matching worker capabilities
const result = await pool.query(`
UPDATE worker_tasks
SET
@@ -171,6 +191,12 @@ class TaskService {
SELECT id FROM worker_tasks
WHERE status = 'pending'
AND (scheduled_for IS NULL OR scheduled_for <= NOW())
-- Method compatibility: worker must have passed the required preflight
AND (
method IS NULL -- No preference, any worker can claim
OR (method = 'curl' AND $2 = TRUE)
OR (method = 'http' AND $3 = TRUE)
)
-- Exclude stores that already have an active task
AND (dispensary_id IS NULL OR dispensary_id NOT IN (
SELECT dispensary_id FROM worker_tasks
@@ -182,7 +208,7 @@ class TaskService {
FOR UPDATE SKIP LOCKED
)
RETURNING *
`, [workerId]);
`, [workerId, curlPassed, httpPassed]);
return (result.rows[0] as WorkerTask) || null;
}
@@ -223,6 +249,24 @@ class TaskService {
);
}
/**
* Release a claimed task back to pending (e.g., when preflight fails)
* This allows another worker to pick it up.
*/
async releaseTask(taskId: number): Promise<void> {
await pool.query(
`UPDATE worker_tasks
SET status = 'pending',
worker_id = NULL,
claimed_at = NULL,
started_at = NULL,
updated_at = NOW()
WHERE id = $1 AND status IN ('claimed', 'running')`,
[taskId]
);
console.log(`[TaskService] Task ${taskId} released back to pending`);
}
/**
* Mark a task as failed, with auto-retry if under max_retries
* Returns true if task was re-queued for retry, false if permanently failed

View File

@@ -51,6 +51,10 @@ import os from 'os';
import { CrawlRotator } from '../services/crawl-rotator';
import { setCrawlRotator } from '../platforms/dutchie';
// Dual-transport preflight system
import { runCurlPreflight, CurlPreflightResult } from '../services/curl-preflight';
import { runPuppeteerPreflightWithRetry, PuppeteerPreflightResult } from '../services/puppeteer-preflight';
// Task handlers by role
// Per TASK_WORKFLOW_2024-12-10.md: payload_fetch and product_refresh are now separate
import { handlePayloadFetch } from './handlers/payload-fetch';
@@ -59,16 +63,59 @@ import { handleProductDiscovery } from './handlers/product-discovery';
import { handleStoreDiscovery } from './handlers/store-discovery';
import { handleEntryPointDiscovery } from './handlers/entry-point-discovery';
import { handleAnalyticsRefresh } from './handlers/analytics-refresh';
import { handleWhoami } from './handlers/whoami';
const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS || '5000');
const HEARTBEAT_INTERVAL_MS = parseInt(process.env.HEARTBEAT_INTERVAL_MS || '30000');
const API_BASE_URL = process.env.API_BASE_URL || 'http://localhost:3010';
// =============================================================================
// CONCURRENT TASK PROCESSING SETTINGS
// =============================================================================
// Workers can process multiple tasks simultaneously using async I/O.
// This improves throughput for I/O-bound tasks (network calls, DB queries).
//
// Resource thresholds trigger "backoff" - the worker stops claiming new tasks
// but continues processing existing ones until resources return to normal.
//
// See: docs/WORKER_TASK_ARCHITECTURE.md#concurrent-task-processing
// =============================================================================
// Maximum number of tasks this worker will run concurrently
// Tune based on workload: I/O-bound tasks benefit from higher concurrency
const MAX_CONCURRENT_TASKS = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// When heap memory usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 85% - gives headroom before OOM
const MEMORY_BACKOFF_THRESHOLD = parseFloat(process.env.MEMORY_BACKOFF_THRESHOLD || '0.85');
// Parse max heap size from NODE_OPTIONS (--max-old-space-size=1500)
// This is used as the denominator for memory percentage calculation
// V8's heapTotal is dynamic and stays small when idle, causing false high percentages
function getMaxHeapSizeMb(): number {
const nodeOptions = process.env.NODE_OPTIONS || '';
const match = nodeOptions.match(/--max-old-space-size=(\d+)/);
if (match) {
return parseInt(match[1], 10);
}
// Fallback: use 512MB if not specified
return 512;
}
const MAX_HEAP_SIZE_MB = getMaxHeapSizeMb();
// When CPU usage exceeds this threshold (as decimal 0.0-1.0), stop claiming new tasks
// Default 90% - allows some burst capacity
const CPU_BACKOFF_THRESHOLD = parseFloat(process.env.CPU_BACKOFF_THRESHOLD || '0.90');
// How long to wait (ms) when in backoff state before rechecking resources
const BACKOFF_DURATION_MS = parseInt(process.env.BACKOFF_DURATION_MS || '10000');
export interface TaskContext {
pool: Pool;
workerId: string;
task: WorkerTask;
heartbeat: () => Promise<void>;
crawlRotator?: CrawlRotator;
}
export interface TaskResult {
@@ -83,17 +130,38 @@ export interface TaskResult {
type TaskHandler = (ctx: TaskContext) => Promise<TaskResult>;
// Per TASK_WORKFLOW_2024-12-10.md: Handler registry
// payload_fetch: Fetches from Dutchie API, saves to disk, chains to product_refresh
// payload_fetch: Fetches from Dutchie API, saves to disk
// product_refresh: Reads local payload, normalizes, upserts to DB
// product_discovery: Main handler for product crawling
const TASK_HANDLERS: Record<TaskRole, TaskHandler> = {
payload_fetch: handlePayloadFetch, // NEW: API fetch -> disk
product_refresh: handleProductRefresh, // CHANGED: disk -> DB
payload_fetch: handlePayloadFetch, // API fetch -> disk
product_refresh: handleProductRefresh, // disk -> DB
product_discovery: handleProductDiscovery,
store_discovery: handleStoreDiscovery,
entry_point_discovery: handleEntryPointDiscovery,
analytics_refresh: handleAnalyticsRefresh,
whoami: handleWhoami, // Tests proxy + anti-detect
};
/**
* Resource usage stats reported to the registry and used for backoff decisions.
* These values are included in worker heartbeats and displayed in the UI.
*/
interface ResourceStats {
/** Current heap memory usage as decimal (0.0 to 1.0) */
memoryPercent: number;
/** Current heap used in MB */
memoryMb: number;
/** Total heap available in MB */
memoryTotalMb: number;
/** CPU usage percentage since last check (0 to 100) */
cpuPercent: number;
/** True if worker is currently in backoff state */
isBackingOff: boolean;
/** Reason for backoff (e.g., "Memory at 87.3% (threshold: 85%)") */
backoffReason: string | null;
}
export class TaskWorker {
private pool: Pool;
private workerId: string;
@@ -102,14 +170,125 @@ export class TaskWorker {
private isRunning: boolean = false;
private heartbeatInterval: NodeJS.Timeout | null = null;
private registryHeartbeatInterval: NodeJS.Timeout | null = null;
private currentTask: WorkerTask | null = null;
private crawlRotator: CrawlRotator;
// ==========================================================================
// CONCURRENT TASK TRACKING
// ==========================================================================
// activeTasks: Map of task ID -> task object for all currently running tasks
// taskPromises: Map of task ID -> Promise for cleanup when task completes
// maxConcurrentTasks: How many tasks this worker will run in parallel
// ==========================================================================
private activeTasks: Map<number, WorkerTask> = new Map();
private taskPromises: Map<number, Promise<void>> = new Map();
private maxConcurrentTasks: number = MAX_CONCURRENT_TASKS;
// ==========================================================================
// RESOURCE MONITORING FOR BACKOFF
// ==========================================================================
// CPU tracking uses differential measurement - we track last values and
// calculate percentage based on elapsed time since last check.
// ==========================================================================
private lastCpuUsage: { user: number; system: number } = { user: 0, system: 0 };
private lastCpuCheck: number = Date.now();
private isBackingOff: boolean = false;
private backoffReason: string | null = null;
// ==========================================================================
// DUAL-TRANSPORT PREFLIGHT STATUS
// ==========================================================================
// Workers run BOTH preflights on startup:
// - curl: axios/proxy transport - fast, for simple API calls
// - http: Puppeteer/browser transport - anti-detect, for Dutchie GraphQL
//
// Task claiming checks method compatibility - worker must have passed
// the preflight for the task's required method.
// ==========================================================================
private preflightCurlPassed: boolean = false;
private preflightHttpPassed: boolean = false;
private preflightCurlResult: CurlPreflightResult | null = null;
private preflightHttpResult: PuppeteerPreflightResult | null = null;
constructor(role: TaskRole | null = null, workerId?: string) {
this.pool = getPool();
this.role = role;
this.workerId = workerId || `worker-${uuidv4().slice(0, 8)}`;
this.crawlRotator = new CrawlRotator(this.pool);
// Initialize CPU tracking
const cpuUsage = process.cpuUsage();
this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
this.lastCpuCheck = Date.now();
}
/**
* Get current resource usage
* Memory percentage is calculated against MAX_HEAP_SIZE_MB (from --max-old-space-size)
* NOT against V8's dynamic heapTotal which stays small when idle
*/
private getResourceStats(): ResourceStats {
const memUsage = process.memoryUsage();
const heapUsedMb = memUsage.heapUsed / 1024 / 1024;
// Use MAX_HEAP_SIZE_MB as ceiling, not dynamic heapTotal
// V8's heapTotal stays small when idle (e.g., 36MB) causing false 95%+ readings
// With --max-old-space-size=1500, we should calculate against 1500MB
const memoryPercent = heapUsedMb / MAX_HEAP_SIZE_MB;
// Calculate CPU usage since last check
const cpuUsage = process.cpuUsage();
const now = Date.now();
const elapsed = now - this.lastCpuCheck;
let cpuPercent = 0;
if (elapsed > 0) {
const userDiff = (cpuUsage.user - this.lastCpuUsage.user) / 1000; // microseconds to ms
const systemDiff = (cpuUsage.system - this.lastCpuUsage.system) / 1000;
cpuPercent = ((userDiff + systemDiff) / elapsed) * 100;
}
// Update last values
this.lastCpuUsage = { user: cpuUsage.user, system: cpuUsage.system };
this.lastCpuCheck = now;
return {
memoryPercent,
memoryMb: Math.round(heapUsedMb),
memoryTotalMb: MAX_HEAP_SIZE_MB, // Use max-old-space-size, not dynamic heapTotal
cpuPercent: Math.min(100, cpuPercent), // Cap at 100%
isBackingOff: this.isBackingOff,
backoffReason: this.backoffReason,
};
}
/**
* Check if we should back off from taking new tasks
*/
private shouldBackOff(): { backoff: boolean; reason: string | null } {
const stats = this.getResourceStats();
if (stats.memoryPercent > MEMORY_BACKOFF_THRESHOLD) {
return { backoff: true, reason: `Memory at ${(stats.memoryPercent * 100).toFixed(1)}% (threshold: ${MEMORY_BACKOFF_THRESHOLD * 100}%)` };
}
if (stats.cpuPercent > CPU_BACKOFF_THRESHOLD * 100) {
return { backoff: true, reason: `CPU at ${stats.cpuPercent.toFixed(1)}% (threshold: ${CPU_BACKOFF_THRESHOLD * 100}%)` };
}
return { backoff: false, reason: null };
}
/**
* Get count of currently running tasks
*/
get activeTaskCount(): number {
return this.activeTasks.size;
}
/**
* Check if we can accept more tasks
*/
private canAcceptMoreTasks(): boolean {
return this.activeTasks.size < this.maxConcurrentTasks;
}
/**
@@ -117,40 +296,172 @@ export class TaskWorker {
* Called once on worker startup before processing any tasks.
*
* IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
* Workers listen for PostgreSQL NOTIFY 'proxy_added' to wake up immediately when proxies are added.
*/
private async initializeStealth(): Promise<void> {
const MAX_WAIT_MINUTES = 60;
const RETRY_INTERVAL_MS = 30000; // 30 seconds
const maxAttempts = (MAX_WAIT_MINUTES * 60 * 1000) / RETRY_INTERVAL_MS;
const POLL_INTERVAL_MS = 30000; // 30 seconds fallback polling
const maxAttempts = (MAX_WAIT_MINUTES * 60 * 1000) / POLL_INTERVAL_MS;
let attempts = 0;
let notifyClient: any = null;
while (attempts < maxAttempts) {
try {
// Load proxies from database
await this.crawlRotator.initialize();
const stats = this.crawlRotator.proxy.getStats();
if (stats.activeProxies > 0) {
console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
// Wire rotator to Dutchie client - proxies will be used for ALL requests
setCrawlRotator(this.crawlRotator);
console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
return;
}
attempts++;
console.log(`[TaskWorker] No active proxies available (attempt ${attempts}). Waiting ${RETRY_INTERVAL_MS / 1000}s for proxies to be added...`);
await this.sleep(RETRY_INTERVAL_MS);
} catch (error: any) {
attempts++;
console.log(`[TaskWorker] Error loading proxies (attempt ${attempts}): ${error.message}. Retrying in ${RETRY_INTERVAL_MS / 1000}s...`);
await this.sleep(RETRY_INTERVAL_MS);
}
// Set up PostgreSQL LISTEN for proxy notifications
try {
notifyClient = await this.pool.connect();
await notifyClient.query('LISTEN proxy_added');
console.log(`[TaskWorker] Listening for proxy_added notifications...`);
} catch (err: any) {
console.log(`[TaskWorker] Could not set up LISTEN (will poll): ${err.message}`);
}
throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
// Create a promise that resolves when notified
let notifyResolve: (() => void) | null = null;
if (notifyClient) {
notifyClient.on('notification', (msg: any) => {
if (msg.channel === 'proxy_added') {
console.log(`[TaskWorker] Received proxy_added notification!`);
if (notifyResolve) notifyResolve();
}
});
}
try {
while (attempts < maxAttempts) {
try {
// Load proxies from database
await this.crawlRotator.initialize();
const stats = this.crawlRotator.proxy.getStats();
if (stats.activeProxies > 0) {
console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
// Wire rotator to Dutchie client - proxies will be used for ALL requests
setCrawlRotator(this.crawlRotator);
console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
return;
}
attempts++;
console.log(`[TaskWorker] No active proxies available (attempt ${attempts}). Waiting for proxies...`);
// Wait for either notification or timeout
await new Promise<void>((resolve) => {
notifyResolve = resolve;
setTimeout(resolve, POLL_INTERVAL_MS);
});
} catch (error: any) {
attempts++;
console.log(`[TaskWorker] Error loading proxies (attempt ${attempts}): ${error.message}. Retrying...`);
await this.sleep(POLL_INTERVAL_MS);
}
}
throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
} finally {
// Clean up LISTEN connection
if (notifyClient) {
try {
await notifyClient.query('UNLISTEN proxy_added');
notifyClient.release();
} catch {
// Ignore cleanup errors
}
}
}
}
/**
* Run dual-transport preflights on startup
* Tests both curl (axios/proxy) and http (Puppeteer/browser) transport methods.
* Results are reported to worker_registry and used for task claiming.
*
* NOTE: All current tasks require 'http' method, so http preflight must pass
* for the worker to claim any tasks. Curl preflight is for future use.
*/
private async runDualPreflights(): Promise<void> {
console.log(`[TaskWorker] Running dual-transport preflights...`);
// Run both preflights in parallel for efficiency
const [curlResult, httpResult] = await Promise.all([
runCurlPreflight(this.crawlRotator).catch((err): CurlPreflightResult => ({
method: 'curl',
passed: false,
proxyAvailable: false,
proxyConnected: false,
antidetectReady: false,
proxyIp: null,
fingerprint: null,
error: `Preflight error: ${err.message}`,
responseTimeMs: null,
})),
runPuppeteerPreflightWithRetry(this.crawlRotator, 1).catch((err): PuppeteerPreflightResult => ({
method: 'http',
passed: false,
proxyAvailable: false,
proxyConnected: false,
antidetectReady: false,
proxyIp: null,
fingerprint: null,
error: `Preflight error: ${err.message}`,
responseTimeMs: null,
productsReturned: 0,
})),
]);
// Store results
this.preflightCurlResult = curlResult;
this.preflightHttpResult = httpResult;
this.preflightCurlPassed = curlResult.passed;
this.preflightHttpPassed = httpResult.passed;
// Log results
console.log(`[TaskWorker] CURL preflight: ${curlResult.passed ? 'PASSED' : 'FAILED'}${curlResult.error ? ` - ${curlResult.error}` : ''}`);
console.log(`[TaskWorker] HTTP preflight: ${httpResult.passed ? 'PASSED' : 'FAILED'}${httpResult.error ? ` - ${httpResult.error}` : ''}`);
if (httpResult.passed && httpResult.productsReturned) {
console.log(`[TaskWorker] HTTP preflight returned ${httpResult.productsReturned} products from test store`);
}
// Report to worker_registry via API
await this.reportPreflightStatus();
// Since all tasks require 'http', warn if http preflight failed
if (!this.preflightHttpPassed) {
console.warn(`[TaskWorker] WARNING: HTTP preflight failed - this worker cannot claim any tasks!`);
console.warn(`[TaskWorker] Error: ${httpResult.error}`);
}
}
/**
* Report preflight status to worker_registry
*/
private async reportPreflightStatus(): Promise<void> {
try {
// Update worker_registry directly via SQL (more reliable than API)
await this.pool.query(`
SELECT update_worker_preflight($1, 'curl', $2, $3, $4)
`, [
this.workerId,
this.preflightCurlPassed ? 'passed' : 'failed',
this.preflightCurlResult?.responseTimeMs || null,
this.preflightCurlResult?.error || null,
]);
await this.pool.query(`
SELECT update_worker_preflight($1, 'http', $2, $3, $4)
`, [
this.workerId,
this.preflightHttpPassed ? 'passed' : 'failed',
this.preflightHttpResult?.responseTimeMs || null,
this.preflightHttpResult?.error || null,
]);
console.log(`[TaskWorker] Preflight status reported to worker_registry`);
} catch (err: any) {
// Non-fatal - worker can still function
console.warn(`[TaskWorker] Could not report preflight status: ${err.message}`);
}
}
/**
@@ -213,21 +524,32 @@ export class TaskWorker {
const memUsage = process.memoryUsage();
const cpuUsage = process.cpuUsage();
const proxyLocation = this.crawlRotator.getProxyLocation();
const resourceStats = this.getResourceStats();
// Get array of active task IDs
const activeTaskIds = Array.from(this.activeTasks.keys());
await fetch(`${API_BASE_URL}/api/worker-registry/heartbeat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
worker_id: this.workerId,
current_task_id: this.currentTask?.id || null,
status: this.currentTask ? 'active' : 'idle',
current_task_id: activeTaskIds[0] || null, // Primary task for backwards compat
current_task_ids: activeTaskIds, // All active tasks
active_task_count: this.activeTasks.size,
max_concurrent_tasks: this.maxConcurrentTasks,
status: this.activeTasks.size > 0 ? 'active' : 'idle',
resources: {
memory_mb: Math.round(memUsage.heapUsed / 1024 / 1024),
memory_total_mb: Math.round(memUsage.heapTotal / 1024 / 1024),
memory_rss_mb: Math.round(memUsage.rss / 1024 / 1024),
memory_percent: Math.round(resourceStats.memoryPercent * 100),
cpu_user_ms: Math.round(cpuUsage.user / 1000),
cpu_system_ms: Math.round(cpuUsage.system / 1000),
cpu_percent: Math.round(resourceStats.cpuPercent),
proxy_location: proxyLocation,
is_backing_off: this.isBackingOff,
backoff_reason: this.backoffReason,
}
})
});
@@ -285,24 +607,119 @@ export class TaskWorker {
// Register with the API to get a friendly name
await this.register();
// Run dual-transport preflights
await this.runDualPreflights();
// Start registry heartbeat
this.startRegistryHeartbeat();
const roleMsg = this.role ? `for role: ${this.role}` : '(role-agnostic - any task)';
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg}`);
const preflightMsg = `curl=${this.preflightCurlPassed ? '✓' : '✗'} http=${this.preflightHttpPassed ? '✓' : '✗'}`;
console.log(`[TaskWorker] ${this.friendlyName} starting ${roleMsg} (${preflightMsg}, max ${this.maxConcurrentTasks} concurrent tasks)`);
while (this.isRunning) {
try {
await this.processNextTask();
await this.mainLoop();
} catch (error: any) {
console.error(`[TaskWorker] Loop error:`, error.message);
await this.sleep(POLL_INTERVAL_MS);
}
}
// Wait for any remaining tasks to complete
if (this.taskPromises.size > 0) {
console.log(`[TaskWorker] Waiting for ${this.taskPromises.size} active tasks to complete...`);
await Promise.allSettled(this.taskPromises.values());
}
console.log(`[TaskWorker] Worker ${this.workerId} stopped`);
}
/**
* Main loop - tries to fill up to maxConcurrentTasks
*/
private async mainLoop(): Promise<void> {
// Check resource usage and backoff if needed
const { backoff, reason } = this.shouldBackOff();
if (backoff) {
if (!this.isBackingOff) {
console.log(`[TaskWorker] ${this.friendlyName} backing off: ${reason}`);
}
this.isBackingOff = true;
this.backoffReason = reason;
await this.sleep(BACKOFF_DURATION_MS);
return;
}
// Clear backoff state
if (this.isBackingOff) {
console.log(`[TaskWorker] ${this.friendlyName} resuming normal operation`);
this.isBackingOff = false;
this.backoffReason = null;
}
// Check for decommission signal
const shouldDecommission = await this.checkDecommission();
if (shouldDecommission) {
console.log(`[TaskWorker] ${this.friendlyName} received decommission signal - waiting for ${this.activeTasks.size} tasks to complete`);
// Stop accepting new tasks, wait for current to finish
this.isRunning = false;
return;
}
// Try to claim more tasks if we have capacity
if (this.canAcceptMoreTasks()) {
// Pass preflight capabilities to only claim compatible tasks
const task = await taskService.claimTask(
this.role,
this.workerId,
this.preflightCurlPassed,
this.preflightHttpPassed
);
if (task) {
console.log(`[TaskWorker] ${this.friendlyName} claimed task ${task.id} (${task.role}) [${this.activeTasks.size + 1}/${this.maxConcurrentTasks}]`);
// =================================================================
// PREFLIGHT CHECK - CRITICAL: Worker MUST pass before task execution
// Verifies: 1) Proxy available 2) Proxy connected 3) Anti-detect ready
// =================================================================
const preflight = await this.crawlRotator.preflight();
if (!preflight.passed) {
console.log(`[TaskWorker] ${this.friendlyName} PREFLIGHT FAILED for task ${task.id}: ${preflight.error}`);
console.log(`[TaskWorker] Releasing task ${task.id} back to pending - worker cannot proceed without proxy/anti-detect`);
// Release task back to pending so another worker can pick it up
await taskService.releaseTask(task.id);
// Wait before trying again - give proxies time to recover
await this.sleep(30000); // 30 second wait on preflight failure
return;
}
console.log(`[TaskWorker] ${this.friendlyName} preflight PASSED for task ${task.id} (proxy: ${preflight.proxyIp}, ${preflight.responseTimeMs}ms)`);
this.activeTasks.set(task.id, task);
// Start task in background (don't await)
const taskPromise = this.executeTask(task);
this.taskPromises.set(task.id, taskPromise);
// Clean up when done
taskPromise.finally(() => {
this.activeTasks.delete(task.id);
this.taskPromises.delete(task.id);
});
// Immediately try to claim more tasks (don't wait for poll interval)
return;
}
}
// No task claimed or at capacity - wait before next poll
await this.sleep(POLL_INTERVAL_MS);
}
/**
* Stop the worker
*/
@@ -315,23 +732,10 @@ export class TaskWorker {
}
/**
* Process the next available task
* Execute a single task (runs concurrently with other tasks)
*/
private async processNextTask(): Promise<void> {
// Try to claim a task
const task = await taskService.claimTask(this.role, this.workerId);
if (!task) {
// No tasks available, wait and retry
await this.sleep(POLL_INTERVAL_MS);
return;
}
this.currentTask = task;
console.log(`[TaskWorker] Claimed task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
// Start heartbeat
this.startHeartbeat(task.id);
private async executeTask(task: WorkerTask): Promise<void> {
console.log(`[TaskWorker] ${this.friendlyName} starting task ${task.id} (${task.role}) for dispensary ${task.dispensary_id || 'N/A'}`);
try {
// Mark as running
@@ -351,6 +755,7 @@ export class TaskWorker {
heartbeat: async () => {
await taskService.heartbeat(task.id);
},
crawlRotator: this.crawlRotator,
};
// Execute the task
@@ -360,7 +765,7 @@ export class TaskWorker {
// Mark as completed
await taskService.completeTask(task.id, result);
await this.reportTaskCompletion(true);
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id}`);
console.log(`[TaskWorker] ${this.friendlyName} completed task ${task.id} [${this.activeTasks.size}/${this.maxConcurrentTasks} active]`);
// Chain next task if applicable
const chainedTask = await taskService.chainNextTask({
@@ -382,9 +787,35 @@ export class TaskWorker {
await taskService.failTask(task.id, error.message);
await this.reportTaskCompletion(false);
console.error(`[TaskWorker] ${this.friendlyName} task ${task.id} error:`, error.message);
} finally {
this.stopHeartbeat();
this.currentTask = null;
}
// Note: cleanup (removing from activeTasks) is handled in mainLoop's finally block
}
/**
* Check if this worker has been flagged for decommission
* Returns true if worker should stop after current task
*/
private async checkDecommission(): Promise<boolean> {
try {
// Check worker_registry for decommission flag
const result = await this.pool.query(
`SELECT decommission_requested, decommission_reason
FROM worker_registry
WHERE worker_id = $1`,
[this.workerId]
);
if (result.rows.length > 0 && result.rows[0].decommission_requested) {
const reason = result.rows[0].decommission_reason || 'No reason provided';
console.log(`[TaskWorker] Decommission requested: ${reason}`);
return true;
}
return false;
} catch (error: any) {
// If we can't check, continue running
console.warn(`[TaskWorker] Could not check decommission status: ${error.message}`);
return false;
}
}
@@ -421,12 +852,29 @@ export class TaskWorker {
/**
* Get worker info
*/
getInfo(): { workerId: string; role: TaskRole | null; isRunning: boolean; currentTaskId: number | null } {
getInfo(): {
workerId: string;
role: TaskRole | null;
isRunning: boolean;
activeTaskIds: number[];
activeTaskCount: number;
maxConcurrentTasks: number;
isBackingOff: boolean;
backoffReason: string | null;
preflightCurlPassed: boolean;
preflightHttpPassed: boolean;
} {
return {
workerId: this.workerId,
role: this.role,
isRunning: this.isRunning,
currentTaskId: this.currentTask?.id || null,
activeTaskIds: Array.from(this.activeTasks.keys()),
activeTaskCount: this.activeTasks.size,
maxConcurrentTasks: this.maxConcurrentTasks,
isBackingOff: this.isBackingOff,
backoffReason: this.backoffReason,
preflightCurlPassed: this.preflightCurlPassed,
preflightHttpPassed: this.preflightHttpPassed,
};
}
}
@@ -443,8 +891,8 @@ async function main(): Promise<void> {
'store_discovery',
'entry_point_discovery',
'product_discovery',
'payload_fetch', // NEW: Fetches from API, saves to disk
'product_refresh', // CHANGED: Reads from disk, processes to DB
'payload_fetch', // Fetches from API, saves to disk
'product_refresh', // Reads from disk, processes to DB
'analytics_refresh',
];

180
backend/test-intercept.js Normal file
View File

@@ -0,0 +1,180 @@
/**
* Stealth Browser Payload Capture - Direct GraphQL Injection
*
* Uses the browser session to make GraphQL requests that look organic.
* Adds proper headers matching what Dutchie's frontend sends.
*/
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
const fs = require('fs');
puppeteer.use(StealthPlugin());
async function capturePayload(config) {
const {
dispensaryId = null,
platformId,
cName,
outputPath = `/tmp/payload_${cName}_${Date.now()}.json`,
} = config;
const browser = await puppeteer.launch({
headless: 'new',
args: ['--no-sandbox', '--disable-setuid-sandbox']
});
const page = await browser.newPage();
// Establish session by visiting the embedded menu
const embedUrl = `https://dutchie.com/embedded-menu/${cName}?menuType=rec`;
console.log(`[Capture] Establishing session at ${embedUrl}...`);
await page.goto(embedUrl, {
waitUntil: 'networkidle2',
timeout: 60000
});
console.log('[Capture] Session established, fetching ALL products...');
// Fetch all products using GET requests with proper headers
const result = await page.evaluate(async (platformId, cName) => {
const allProducts = [];
const logs = [];
let pageNum = 0;
const perPage = 100;
let totalCount = 0;
const sessionId = 'browser-session-' + Date.now();
try {
while (pageNum < 30) { // Max 30 pages = 3000 products
const variables = {
includeEnterpriseSpecials: false,
productsFilter: {
dispensaryId: platformId,
pricingType: 'rec',
Status: 'Active', // 'Active' for in-stock products per CLAUDE.md
types: [],
useCache: true,
isDefaultSort: true,
sortBy: 'popularSortIdx',
sortDirection: 1,
bypassOnlineThresholds: true,
isKioskMenu: false,
removeProductsBelowOptionThresholds: false,
},
page: pageNum,
perPage: perPage,
};
const extensions = {
persistedQuery: {
version: 1,
sha256Hash: 'ee29c060826dc41c527e470e9ae502c9b2c169720faa0a9f5d25e1b9a530a4a0'
}
};
// Build GET URL like the browser does
const qs = new URLSearchParams({
operationName: 'FilteredProducts',
variables: JSON.stringify(variables),
extensions: JSON.stringify(extensions)
});
const url = `https://dutchie.com/api-3/graphql?${qs.toString()}`;
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'content-type': 'application/json',
'x-dutchie-session': sessionId,
'apollographql-client-name': 'Marketplace (production)',
},
credentials: 'include'
});
logs.push(`Page ${pageNum}: HTTP ${response.status}`);
if (!response.ok) {
const text = await response.text();
logs.push(`HTTP error: ${response.status} - ${text.slice(0, 200)}`);
break;
}
const json = await response.json();
if (json.errors) {
logs.push(`GraphQL error: ${JSON.stringify(json.errors).slice(0, 200)}`);
break;
}
const data = json?.data?.filteredProducts;
if (!data || !data.products) {
logs.push('No products in response');
break;
}
const products = data.products;
allProducts.push(...products);
if (pageNum === 0) {
totalCount = data.queryInfo?.totalCount || 0;
logs.push(`Total reported: ${totalCount}`);
}
logs.push(`Got ${products.length} products (total: ${allProducts.length}/${totalCount})`);
if (allProducts.length >= totalCount || products.length < perPage) {
break;
}
pageNum++;
// Small delay between pages to be polite
await new Promise(r => setTimeout(r, 200));
}
} catch (err) {
logs.push(`Error: ${err.message}`);
}
return { products: allProducts, totalCount, logs };
}, platformId, cName);
await browser.close();
// Print logs from browser context
result.logs.forEach(log => console.log(`[Browser] ${log}`));
console.log(`[Capture] Got ${result.products.length} products (API reported ${result.totalCount})`);
const payload = {
dispensaryId: dispensaryId,
platformId: platformId,
cName,
fetchedAt: new Date().toISOString(),
productCount: result.products.length,
products: result.products,
};
fs.writeFileSync(outputPath, JSON.stringify(payload, null, 2));
console.log(`\n=== Capture Complete ===`);
console.log(`Total products: ${result.products.length}`);
console.log(`Saved to: ${outputPath}`);
console.log(`File size: ${(fs.statSync(outputPath).size / 1024).toFixed(1)} KB`);
return payload;
}
// Run
(async () => {
const payload = await capturePayload({
cName: 'AZ-Deeply-Rooted',
platformId: '6405ef617056e8014d79101b',
});
if (payload.products.length > 0) {
const sample = payload.products[0];
console.log(`\nSample: ${sample.Name || sample.name} - ${sample.brand?.name || sample.brandName}`);
}
})().catch(console.error);

View File

@@ -14,5 +14,5 @@
"allowSyntheticDefaultImports": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "src/**/*.test.ts", "src/**/__tests__/**"]
"exclude": ["node_modules", "dist", "src/**/*.test.ts", "src/**/__tests__/**", "src/_deprecated/**"]
}

View File

@@ -7,8 +7,8 @@
<title>CannaIQ - Cannabis Menu Intelligence Platform</title>
<meta name="description" content="CannaIQ provides real-time cannabis dispensary menu data, product tracking, and analytics for dispensaries across Arizona." />
<meta name="keywords" content="cannabis, dispensary, menu, products, analytics, Arizona" />
<script type="module" crossorigin src="/assets/index-BML8-px1.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-B2gR-58G.css">
<script type="module" crossorigin src="/assets/index-Dq9S0rVi.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-DhM09B-d.css">
</head>
<body>
<div id="root"></div>

View File

@@ -2,7 +2,7 @@
<html lang="en">
<head>
<meta charset="UTF-8" />
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<link rel="icon" type="image/svg+xml" href="/favicon.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>CannaIQ - Cannabis Menu Intelligence Platform</title>
<meta name="description" content="CannaIQ provides real-time cannabis dispensary menu data, product tracking, and analytics for dispensaries across Arizona." />

View File

@@ -0,0 +1,5 @@
<svg viewBox="0 0 32 32" xmlns="http://www.w3.org/2000/svg">
<rect width="32" height="32" rx="6" fill="#059669"/>
<path d="M16 6C12.5 6 9.5 7.5 7.5 10L16 16L24.5 10C22.5 7.5 19.5 6 16 6Z" fill="white"/>
<path d="M7.5 10C6 12 5 14.5 5 17C5 22.5 10 26 16 26C22 26 27 22.5 27 17C27 14.5 26 12 24.5 10L16 16L7.5 10Z" fill="white" fill-opacity="0.7"/>
</svg>

After

Width:  |  Height:  |  Size: 360 B

View File

@@ -8,6 +8,7 @@ import { ProductDetail } from './pages/ProductDetail';
import { Stores } from './pages/Stores';
import { Dispensaries } from './pages/Dispensaries';
import { DispensaryDetail } from './pages/DispensaryDetail';
import { DispensarySchedule } from './pages/DispensarySchedule';
import { StoreDetail } from './pages/StoreDetail';
import { StoreBrands } from './pages/StoreBrands';
import { StoreSpecials } from './pages/StoreSpecials';
@@ -46,7 +47,6 @@ import CrossStateCompare from './pages/CrossStateCompare';
import StateDetail from './pages/StateDetail';
import { Discovery } from './pages/Discovery';
import { WorkersDashboard } from './pages/WorkersDashboard';
import { JobQueue } from './pages/JobQueue';
import TasksDashboard from './pages/TasksDashboard';
import { ScraperOverviewDashboard } from './pages/ScraperOverviewDashboard';
import { SeoOrchestrator } from './pages/admin/seo/SeoOrchestrator';
@@ -66,6 +66,7 @@ export default function App() {
<Route path="/stores" element={<PrivateRoute><Stores /></PrivateRoute>} />
<Route path="/dispensaries" element={<PrivateRoute><Dispensaries /></PrivateRoute>} />
<Route path="/dispensaries/:state/:city/:slug" element={<PrivateRoute><DispensaryDetail /></PrivateRoute>} />
<Route path="/dispensaries/:state/:city/:slug/schedule" element={<PrivateRoute><DispensarySchedule /></PrivateRoute>} />
<Route path="/stores/:state/:storeName/:slug/brands" element={<PrivateRoute><StoreBrands /></PrivateRoute>} />
<Route path="/stores/:state/:storeName/:slug/specials" element={<PrivateRoute><StoreSpecials /></PrivateRoute>} />
<Route path="/stores/:state/:storeName/:slug" element={<PrivateRoute><StoreDetail /></PrivateRoute>} />
@@ -123,8 +124,6 @@ export default function App() {
<Route path="/discovery" element={<PrivateRoute><Discovery /></PrivateRoute>} />
{/* Workers Dashboard */}
<Route path="/workers" element={<PrivateRoute><WorkersDashboard /></PrivateRoute>} />
{/* Job Queue Management */}
<Route path="/job-queue" element={<PrivateRoute><JobQueue /></PrivateRoute>} />
{/* Task Queue Dashboard */}
<Route path="/tasks" element={<PrivateRoute><TasksDashboard /></PrivateRoute>} />
{/* Scraper Overview Dashboard (new primary) */}

View File

@@ -1,8 +1,7 @@
import { ReactNode, useEffect, useState } from 'react';
import { useNavigate, useLocation } from 'react-router-dom';
import { ReactNode, useEffect, useState, useRef } from 'react';
import { useNavigate, useLocation, Link } from 'react-router-dom';
import { useAuthStore } from '../store/authStore';
import { api } from '../lib/api';
import { StateSelector } from './StateSelector';
import {
LayoutDashboard,
Building2,
@@ -48,8 +47,8 @@ interface NavLinkProps {
function NavLink({ to, icon, label, isActive }: NavLinkProps) {
return (
<a
href={to}
<Link
to={to}
className={`flex items-center gap-3 px-3 py-2 rounded-lg text-sm font-medium transition-colors ${
isActive
? 'bg-emerald-50 text-emerald-700'
@@ -58,7 +57,7 @@ function NavLink({ to, icon, label, isActive }: NavLinkProps) {
>
<span className={`flex-shrink-0 ${isActive ? 'text-emerald-600' : 'text-gray-400'}`}>{icon}</span>
<span>{label}</span>
</a>
</Link>
);
}
@@ -86,6 +85,8 @@ export function Layout({ children }: LayoutProps) {
const { user, logout } = useAuthStore();
const [versionInfo, setVersionInfo] = useState<VersionInfo | null>(null);
const [sidebarOpen, setSidebarOpen] = useState(false);
const navRef = useRef<HTMLElement>(null);
const scrollPositionRef = useRef<number>(0);
useEffect(() => {
const fetchVersion = async () => {
@@ -111,16 +112,34 @@ export function Layout({ children }: LayoutProps) {
return location.pathname.startsWith(path);
};
// Close sidebar on route change (mobile)
// Save scroll position before route change
useEffect(() => {
const nav = navRef.current;
if (nav) {
const handleScroll = () => {
scrollPositionRef.current = nav.scrollTop;
};
nav.addEventListener('scroll', handleScroll);
return () => nav.removeEventListener('scroll', handleScroll);
}
}, []);
// Restore scroll position after route change and close mobile sidebar
useEffect(() => {
setSidebarOpen(false);
// Restore scroll position after render
requestAnimationFrame(() => {
if (navRef.current) {
navRef.current.scrollTop = scrollPositionRef.current;
}
});
}, [location.pathname]);
const sidebarContent = (
<>
{/* Logo/Brand */}
<div className="px-6 py-5 border-b border-gray-200">
<div className="flex items-center gap-3">
<Link to="/dashboard" className="flex items-center gap-3 hover:opacity-80 transition-opacity">
<div className="w-8 h-8 bg-emerald-600 rounded-lg flex items-center justify-center">
<svg viewBox="0 0 24 24" className="w-5 h-5 text-white" fill="currentColor">
<path d="M12 2C8.5 2 5.5 3.5 3.5 6L12 12L20.5 6C18.5 3.5 15.5 2 12 2Z" />
@@ -131,21 +150,17 @@ export function Layout({ children }: LayoutProps) {
<span className="text-lg font-bold text-gray-900">CannaIQ</span>
{versionInfo && (
<p className="text-xs text-gray-400">
v{versionInfo.version} ({versionInfo.git_sha}) {versionInfo.build_time !== 'unknown' && `- ${new Date(versionInfo.build_time).toLocaleDateString()}`}
{versionInfo.git_sha || 'dev'}
</p>
)}
</div>
</div>
</Link>
<p className="text-xs text-gray-500 mt-2 truncate">{user?.email}</p>
</div>
{/* State Selector */}
<div className="px-4 py-3 border-b border-gray-200 bg-gray-50">
<StateSelector showLabel={false} />
</div>
{/* Navigation */}
<nav className="flex-1 px-3 py-4 space-y-6 overflow-y-auto">
<nav ref={navRef} className="flex-1 px-3 py-4 space-y-6 overflow-y-auto">
<NavSection title="Main">
<NavLink to="/dashboard" icon={<LayoutDashboard className="w-4 h-4" />} label="Dashboard" isActive={isActive('/dashboard', true)} />
<NavLink to="/dispensaries" icon={<Building2 className="w-4 h-4" />} label="Dispensaries" isActive={isActive('/dispensaries')} />
@@ -164,8 +179,7 @@ export function Layout({ children }: LayoutProps) {
<NavLink to="/admin/orchestrator" icon={<Activity className="w-4 h-4" />} label="Orchestrator" isActive={isActive('/admin/orchestrator')} />
<NavLink to="/users" icon={<UserCog className="w-4 h-4" />} label="Users" isActive={isActive('/users')} />
<NavLink to="/workers" icon={<Users className="w-4 h-4" />} label="Workers" isActive={isActive('/workers')} />
<NavLink to="/job-queue" icon={<ListOrdered className="w-4 h-4" />} label="Job Queue" isActive={isActive('/job-queue')} />
<NavLink to="/tasks" icon={<ListChecks className="w-4 h-4" />} label="Task Queue" isActive={isActive('/tasks')} />
<NavLink to="/tasks" icon={<ListChecks className="w-4 h-4" />} label="Tasks" isActive={isActive('/tasks')} />
<NavLink to="/admin/seo" icon={<FileText className="w-4 h-4" />} label="SEO Pages" isActive={isActive('/admin/seo')} />
<NavLink to="/proxies" icon={<Shield className="w-4 h-4" />} label="Proxies" isActive={isActive('/proxies')} />
<NavLink to="/api-permissions" icon={<Key className="w-4 h-4" />} label="API Keys" isActive={isActive('/api-permissions')} />
@@ -214,7 +228,7 @@ export function Layout({ children }: LayoutProps) {
<button onClick={() => setSidebarOpen(true)} className="p-2 -ml-2 rounded-lg hover:bg-gray-100">
<Menu className="w-5 h-5 text-gray-600" />
</button>
<div className="flex items-center gap-2">
<Link to="/dashboard" className="flex items-center gap-2 hover:opacity-80 transition-opacity">
<div className="w-6 h-6 bg-emerald-600 rounded flex items-center justify-center">
<svg viewBox="0 0 24 24" className="w-4 h-4 text-white" fill="currentColor">
<path d="M12 2C8.5 2 5.5 3.5 3.5 6L12 12L20.5 6C18.5 3.5 15.5 2 12 2Z" />
@@ -222,7 +236,7 @@ export function Layout({ children }: LayoutProps) {
</svg>
</div>
<span className="font-semibold text-gray-900">CannaIQ</span>
</div>
</Link>
</div>
{/* Page content */}

View File

@@ -0,0 +1,138 @@
import { useState, useEffect, useRef } from 'react';
import { api } from '../lib/api';
import { Shield, X, Loader2 } from 'lucide-react';
interface PasswordConfirmModalProps {
isOpen: boolean;
onClose: () => void;
onConfirm: () => void;
title: string;
description: string;
}
export function PasswordConfirmModal({
isOpen,
onClose,
onConfirm,
title,
description,
}: PasswordConfirmModalProps) {
const [password, setPassword] = useState('');
const [error, setError] = useState('');
const [loading, setLoading] = useState(false);
const inputRef = useRef<HTMLInputElement>(null);
useEffect(() => {
if (isOpen) {
setPassword('');
setError('');
// Focus the input when modal opens
setTimeout(() => inputRef.current?.focus(), 100);
}
}, [isOpen]);
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!password.trim()) {
setError('Password is required');
return;
}
setLoading(true);
setError('');
try {
const result = await api.verifyPassword(password);
if (result.verified) {
onConfirm();
onClose();
} else {
setError('Invalid password');
}
} catch (err: any) {
setError(err.message || 'Verification failed');
} finally {
setLoading(false);
}
};
if (!isOpen) return null;
return (
<div className="fixed inset-0 z-50 flex items-center justify-center">
{/* Backdrop */}
<div
className="absolute inset-0 bg-black bg-opacity-50"
onClick={onClose}
/>
{/* Modal */}
<div className="relative bg-white rounded-lg shadow-xl max-w-md w-full mx-4">
{/* Header */}
<div className="flex items-center justify-between px-6 py-4 border-b border-gray-200">
<div className="flex items-center gap-3">
<div className="p-2 bg-amber-100 rounded-lg">
<Shield className="w-5 h-5 text-amber-600" />
</div>
<h3 className="text-lg font-semibold text-gray-900">{title}</h3>
</div>
<button
onClick={onClose}
className="p-1 hover:bg-gray-100 rounded-lg transition-colors"
>
<X className="w-5 h-5 text-gray-500" />
</button>
</div>
{/* Body */}
<form onSubmit={handleSubmit}>
<div className="px-6 py-4">
<p className="text-gray-600 mb-4">{description}</p>
<div className="space-y-2">
<label
htmlFor="password"
className="block text-sm font-medium text-gray-700"
>
Enter your password to continue
</label>
<input
ref={inputRef}
type="password"
id="password"
value={password}
onChange={(e) => setPassword(e.target.value)}
className="w-full px-4 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-emerald-500 focus:border-emerald-500"
placeholder="Password"
disabled={loading}
/>
{error && (
<p className="text-sm text-red-600">{error}</p>
)}
</div>
</div>
{/* Footer */}
<div className="flex justify-end gap-3 px-6 py-4 border-t border-gray-200 bg-gray-50 rounded-b-lg">
<button
type="button"
onClick={onClose}
disabled={loading}
className="px-4 py-2 text-gray-700 hover:bg-gray-100 rounded-lg transition-colors"
>
Cancel
</button>
<button
type="submit"
disabled={loading}
className="px-4 py-2 bg-emerald-600 text-white rounded-lg hover:bg-emerald-700 transition-colors disabled:opacity-50 flex items-center gap-2"
>
{loading && <Loader2 className="w-4 h-4 animate-spin" />}
Confirm
</button>
</div>
</form>
</div>
</div>
);
}

View File

@@ -84,6 +84,13 @@ class ApiClient {
});
}
async verifyPassword(password: string) {
return this.request<{ verified: boolean; error?: string }>('/api/auth/verify-password', {
method: 'POST',
body: JSON.stringify({ password }),
});
}
async getMe() {
return this.request<{ user: any }>('/api/auth/me');
}
@@ -983,6 +990,47 @@ class ApiClient {
}>(`/api/markets/stores/${id}/categories`);
}
async getStoreCrawlHistory(id: number, limit = 50) {
return this.request<{
dispensary: {
id: number;
name: string;
dba_name: string | null;
slug: string;
state: string;
city: string;
menu_type: string | null;
platform_dispensary_id: string | null;
last_menu_scrape: string | null;
} | null;
history: Array<{
id: number;
runId: string | null;
profileKey: string | null;
crawlerModule: string | null;
stateAtStart: string | null;
stateAtEnd: string | null;
totalSteps: number;
durationMs: number | null;
success: boolean;
errorMessage: string | null;
productsFound: number | null;
startedAt: string | null;
completedAt: string | null;
}>;
nextSchedule: {
scheduleId: number;
jobName: string;
enabled: boolean;
baseIntervalMinutes: number;
jitterMinutes: number;
nextRunAt: string | null;
lastRunAt: string | null;
lastStatus: string | null;
} | null;
}>(`/api/markets/stores/${id}/crawl-history?limit=${limit}`);
}
// Global Brands/Categories (from v_brands/v_categories views)
async getMarketBrands(params?: { limit?: number; offset?: number }) {
const searchParams = new URLSearchParams();
@@ -1518,10 +1566,11 @@ class ApiClient {
}
// Intelligence API
async getIntelligenceBrands(params?: { limit?: number; offset?: number }) {
async getIntelligenceBrands(params?: { limit?: number; offset?: number; state?: string }) {
const searchParams = new URLSearchParams();
if (params?.limit) searchParams.append('limit', params.limit.toString());
if (params?.offset) searchParams.append('offset', params.offset.toString());
if (params?.state) searchParams.append('state', params.state);
const queryString = searchParams.toString() ? `?${searchParams.toString()}` : '';
return this.request<{
brands: Array<{
@@ -1536,7 +1585,10 @@ class ApiClient {
}>(`/api/admin/intelligence/brands${queryString}`);
}
async getIntelligencePricing() {
async getIntelligencePricing(params?: { state?: string }) {
const searchParams = new URLSearchParams();
if (params?.state) searchParams.append('state', params.state);
const queryString = searchParams.toString() ? `?${searchParams.toString()}` : '';
return this.request<{
byCategory: Array<{
category: string;
@@ -1552,7 +1604,7 @@ class ApiClient {
maxPrice: number;
totalProducts: number;
};
}>('/api/admin/intelligence/pricing');
}>(`/api/admin/intelligence/pricing${queryString}`);
}
async getIntelligenceStoreActivity(params?: { state?: string; chainId?: number; limit?: number }) {
@@ -2884,6 +2936,46 @@ class ApiClient {
`/api/tasks/store/${dispensaryId}/active`
);
}
// Task Pool Control
async getTaskPoolStatus() {
return this.request<{ success: boolean; paused: boolean; message: string }>(
'/api/tasks/pool/status'
);
}
async pauseTaskPool() {
return this.request<{ success: boolean; paused: boolean; message: string }>(
'/api/tasks/pool/pause',
{ method: 'POST' }
);
}
async resumeTaskPool() {
return this.request<{ success: boolean; paused: boolean; message: string }>(
'/api/tasks/pool/resume',
{ method: 'POST' }
);
}
// K8s Worker Control
async getK8sWorkers() {
return this.request<{
success: boolean;
available: boolean;
replicas: number;
readyReplicas: number;
availableReplicas?: number;
error?: string;
}>('/api/k8s/workers');
}
async scaleK8sWorkers(replicas: number) {
return this.request<{ success: boolean; replicas: number; message?: string; error?: string }>(
'/api/k8s/workers/scale',
{ method: 'POST', body: JSON.stringify({ replicas }) }
);
}
}
export const api = new ApiClient(API_URL);

View File

@@ -1,6 +1,5 @@
import { useEffect, useState } from 'react';
import { Layout } from '../components/Layout';
import { HealthPanel } from '../components/HealthPanel';
import { api } from '../lib/api';
import { useNavigate } from 'react-router-dom';
import {
@@ -42,7 +41,6 @@ export function Dashboard() {
const [activity, setActivity] = useState<any>(null);
const [nationalStats, setNationalStats] = useState<any>(null);
const [loading, setLoading] = useState(true);
const [refreshing, setRefreshing] = useState(false);
const [pendingChangesCount, setPendingChangesCount] = useState(0);
const [showNotification, setShowNotification] = useState(false);
const [taskCounts, setTaskCounts] = useState<Record<string, number> | null>(null);
@@ -93,10 +91,7 @@ export function Dashboard() {
}
};
const loadData = async (isRefresh = false) => {
if (isRefresh) {
setRefreshing(true);
}
const loadData = async () => {
try {
// Fetch dashboard data (primary data source)
const dashboard = await api.getMarketDashboard();
@@ -158,7 +153,6 @@ export function Dashboard() {
console.error('Failed to load dashboard:', error);
} finally {
setLoading(false);
setRefreshing(false);
}
};
@@ -271,24 +265,11 @@ export function Dashboard() {
<div className="space-y-8">
{/* Header */}
<div className="flex flex-col sm:flex-row sm:justify-between sm:items-center gap-4">
<div>
<h1 className="text-xl sm:text-2xl font-semibold text-gray-900">Dashboard</h1>
<p className="text-sm text-gray-500 mt-1">Monitor your dispensary data aggregation</p>
</div>
<button
onClick={() => loadData(true)}
disabled={refreshing}
className="inline-flex items-center justify-center gap-2 px-4 py-2 bg-white border border-gray-200 rounded-lg hover:bg-gray-50 transition-colors text-sm font-medium text-gray-700 self-start sm:self-auto disabled:opacity-50 disabled:cursor-not-allowed"
>
<RefreshCw className={`w-4 h-4 ${refreshing ? 'animate-spin' : ''}`} />
{refreshing ? 'Refreshing...' : 'Refresh'}
</button>
<div>
<h1 className="text-xl sm:text-2xl font-semibold text-gray-900">Dashboard</h1>
<p className="text-sm text-gray-500 mt-1">Monitor your dispensary data aggregation</p>
</div>
{/* System Health */}
<HealthPanel showQueues={false} refreshInterval={60000} />
{/* Stats Grid */}
<div className="grid grid-cols-2 lg:grid-cols-3 gap-3 sm:gap-6">
{/* Products */}

View File

@@ -161,23 +161,6 @@ export function Dispensaries() {
))}
</select>
</div>
<div>
<label className="block text-sm font-medium text-gray-700 mb-2">
Filter by Status
</label>
<select
value={filterStatus}
onChange={(e) => handleStatusFilter(e.target.value)}
className={`w-full px-3 py-2 border rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-blue-500 ${
filterStatus === 'dropped' ? 'border-red-300 bg-red-50' : 'border-gray-300'
}`}
>
<option value="">All Statuses</option>
<option value="open">Open</option>
<option value="dropped">Dropped (Needs Review)</option>
<option value="closed">Closed</option>
</select>
</div>
</div>
</div>

View File

@@ -204,47 +204,6 @@ export function DispensaryDetail() {
Back to Dispensaries
</button>
{/* Update Dropdown */}
<div className="relative">
<button
onClick={() => setShowUpdateDropdown(!showUpdateDropdown)}
disabled={isUpdating}
className="flex items-center gap-2 px-4 py-2 text-sm font-medium text-white bg-blue-600 hover:bg-blue-700 rounded-lg disabled:opacity-50 disabled:cursor-not-allowed"
>
<RefreshCw className={`w-4 h-4 ${isUpdating ? 'animate-spin' : ''}`} />
{isUpdating ? 'Updating...' : 'Update'}
{!isUpdating && <ChevronDown className="w-4 h-4" />}
</button>
{showUpdateDropdown && !isUpdating && (
<div className="absolute right-0 mt-2 w-48 bg-white rounded-lg shadow-lg border border-gray-200 z-10">
<button
onClick={() => handleUpdate('products')}
className="w-full text-left px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 rounded-t-lg"
>
Products
</button>
<button
onClick={() => handleUpdate('brands')}
className="w-full text-left px-4 py-2 text-sm text-gray-700 hover:bg-gray-100"
>
Brands
</button>
<button
onClick={() => handleUpdate('specials')}
className="w-full text-left px-4 py-2 text-sm text-gray-700 hover:bg-gray-100"
>
Specials
</button>
<button
onClick={() => handleUpdate('all')}
className="w-full text-left px-4 py-2 text-sm text-gray-700 hover:bg-gray-100 rounded-b-lg border-t border-gray-200"
>
All
</button>
</div>
)}
</div>
</div>
{/* Dispensary Header */}
@@ -266,7 +225,7 @@ export function DispensaryDetail() {
<div className="flex items-center gap-2 text-sm text-gray-600 bg-gray-50 px-4 py-2 rounded-lg">
<Calendar className="w-4 h-4" />
<div>
<span className="font-medium">Last Crawl Date:</span>
<span className="font-medium">Last Updated:</span>
<span className="ml-2">
{dispensary.last_menu_scrape
? new Date(dispensary.last_menu_scrape).toLocaleDateString('en-US', {
@@ -331,7 +290,7 @@ export function DispensaryDetail() {
</a>
)}
<Link
to="/schedule"
to={`/dispensaries/${state}/${city}/${slug}/schedule`}
className="flex items-center gap-2 text-sm text-blue-600 hover:text-blue-800"
>
<Clock className="w-4 h-4" />
@@ -533,57 +492,31 @@ export function DispensaryDetail() {
`$${product.regular_price}`
) : '-'}
</td>
<td className="text-center whitespace-nowrap">
{product.quantity != null ? (
<span className={`badge badge-sm ${product.quantity > 0 ? 'badge-info' : 'badge-error'}`}>
{product.quantity}
</span>
) : '-'}
<td className="text-center whitespace-nowrap text-sm text-gray-700">
{product.quantity != null ? product.quantity : '-'}
</td>
<td className="text-center whitespace-nowrap">
{product.thc_percentage ? (
<span className="badge badge-success badge-sm">{product.thc_percentage}%</span>
) : '-'}
<td className="text-center whitespace-nowrap text-sm text-gray-700">
{product.thc_percentage ? `${product.thc_percentage}%` : '-'}
</td>
<td className="text-center whitespace-nowrap">
{product.cbd_percentage ? (
<span className="badge badge-info badge-sm">{product.cbd_percentage}%</span>
) : '-'}
<td className="text-center whitespace-nowrap text-sm text-gray-700">
{product.cbd_percentage ? `${product.cbd_percentage}%` : '-'}
</td>
<td className="text-center whitespace-nowrap">
{product.strain_type ? (
<span className="badge badge-ghost badge-sm">{product.strain_type}</span>
) : '-'}
<td className="text-center whitespace-nowrap text-sm text-gray-700">
{product.strain_type || '-'}
</td>
<td className="text-center whitespace-nowrap">
{product.in_stock ? (
<span className="badge badge-success badge-sm">Yes</span>
) : product.in_stock === false ? (
<span className="badge badge-error badge-sm">No</span>
) : '-'}
<td className="text-center whitespace-nowrap text-sm text-gray-700">
{product.in_stock ? 'Yes' : product.in_stock === false ? 'No' : '-'}
</td>
<td className="whitespace-nowrap text-xs text-gray-500">
{product.updated_at ? formatDate(product.updated_at) : '-'}
</td>
<td>
<div className="flex gap-1">
{product.dutchie_url && (
<a
href={product.dutchie_url}
target="_blank"
rel="noopener noreferrer"
className="btn btn-xs btn-outline"
>
Dutchie
</a>
)}
<button
onClick={() => navigate(`/products/${product.id}`)}
className="btn btn-xs btn-primary"
>
Details
</button>
</div>
<button
onClick={() => navigate(`/products/${product.id}`)}
className="btn btn-xs btn-ghost text-gray-500 hover:text-gray-700"
>
Details
</button>
</td>
</tr>
))}

View File

@@ -0,0 +1,378 @@
import { useEffect, useState } from 'react';
import { useParams, useNavigate, Link } from 'react-router-dom';
import { Layout } from '../components/Layout';
import { api } from '../lib/api';
import {
ArrowLeft,
Clock,
Calendar,
CheckCircle,
XCircle,
AlertCircle,
Package,
Timer,
Building2,
} from 'lucide-react';
interface CrawlHistoryItem {
id: number;
runId: string | null;
profileKey: string | null;
crawlerModule: string | null;
stateAtStart: string | null;
stateAtEnd: string | null;
totalSteps: number;
durationMs: number | null;
success: boolean;
errorMessage: string | null;
productsFound: number | null;
startedAt: string | null;
completedAt: string | null;
}
interface NextSchedule {
scheduleId: number;
jobName: string;
enabled: boolean;
baseIntervalMinutes: number;
jitterMinutes: number;
nextRunAt: string | null;
lastRunAt: string | null;
lastStatus: string | null;
}
interface Dispensary {
id: number;
name: string;
dba_name: string | null;
slug: string;
state: string;
city: string;
menu_type: string | null;
platform_dispensary_id: string | null;
last_menu_scrape: string | null;
}
export function DispensarySchedule() {
const { state, city, slug } = useParams();
const navigate = useNavigate();
const [dispensary, setDispensary] = useState<Dispensary | null>(null);
const [history, setHistory] = useState<CrawlHistoryItem[]>([]);
const [nextSchedule, setNextSchedule] = useState<NextSchedule | null>(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
loadScheduleData();
}, [slug]);
const loadScheduleData = async () => {
setLoading(true);
try {
// First get the dispensary to get the ID
const dispData = await api.getDispensary(slug!);
if (dispData?.id) {
const data = await api.getStoreCrawlHistory(dispData.id);
setDispensary(data.dispensary);
setHistory(data.history || []);
setNextSchedule(data.nextSchedule);
}
} catch (error) {
console.error('Failed to load schedule data:', error);
} finally {
setLoading(false);
}
};
const formatDate = (dateStr: string | null) => {
if (!dateStr) return 'Never';
const date = new Date(dateStr);
return date.toLocaleDateString('en-US', {
year: 'numeric',
month: 'short',
day: 'numeric',
hour: '2-digit',
minute: '2-digit',
});
};
const formatTimeAgo = (dateStr: string | null) => {
if (!dateStr) return 'Never';
const date = new Date(dateStr);
const now = new Date();
const diffMs = now.getTime() - date.getTime();
const diffMinutes = Math.floor(diffMs / (1000 * 60));
const diffHours = Math.floor(diffMs / (1000 * 60 * 60));
const diffDays = Math.floor(diffMs / (1000 * 60 * 60 * 24));
if (diffMinutes < 1) return 'Just now';
if (diffMinutes < 60) return `${diffMinutes}m ago`;
if (diffHours < 24) return `${diffHours}h ago`;
if (diffDays === 1) return 'Yesterday';
if (diffDays < 7) return `${diffDays} days ago`;
return date.toLocaleDateString();
};
const formatTimeUntil = (dateStr: string | null) => {
if (!dateStr) return 'Not scheduled';
const date = new Date(dateStr);
const now = new Date();
const diffMs = date.getTime() - now.getTime();
if (diffMs < 0) return 'Overdue';
const diffMinutes = Math.floor(diffMs / (1000 * 60));
const diffHours = Math.floor(diffMinutes / 60);
if (diffMinutes < 60) return `in ${diffMinutes}m`;
return `in ${diffHours}h ${diffMinutes % 60}m`;
};
const formatDuration = (ms: number | null) => {
if (!ms) return '-';
if (ms < 1000) return `${ms}ms`;
const seconds = Math.floor(ms / 1000);
const minutes = Math.floor(seconds / 60);
if (minutes < 1) return `${seconds}s`;
return `${minutes}m ${seconds % 60}s`;
};
const formatInterval = (baseMinutes: number, jitterMinutes: number) => {
const hours = Math.floor(baseMinutes / 60);
const mins = baseMinutes % 60;
let base = hours > 0 ? `${hours}h` : '';
if (mins > 0) base += `${mins}m`;
return `Every ${base} (+/- ${jitterMinutes}m jitter)`;
};
if (loading) {
return (
<Layout>
<div className="text-center py-12">
<div className="inline-block animate-spin rounded-full h-8 w-8 border-4 border-gray-400 border-t-transparent"></div>
<p className="mt-2 text-sm text-gray-600">Loading schedule...</p>
</div>
</Layout>
);
}
if (!dispensary) {
return (
<Layout>
<div className="text-center py-12">
<p className="text-gray-600">Dispensary not found</p>
</div>
</Layout>
);
}
// Stats from history
const successCount = history.filter(h => h.success).length;
const failureCount = history.filter(h => !h.success).length;
const lastSuccess = history.find(h => h.success);
const avgDuration = history.length > 0
? Math.round(history.reduce((sum, h) => sum + (h.durationMs || 0), 0) / history.length)
: 0;
return (
<Layout>
<div className="space-y-6">
{/* Header */}
<div className="flex items-center justify-between gap-4">
<button
onClick={() => navigate(`/dispensaries/${state}/${city}/${slug}`)}
className="flex items-center gap-2 text-sm text-gray-600 hover:text-gray-900"
>
<ArrowLeft className="w-4 h-4" />
Back to {dispensary.dba_name || dispensary.name}
</button>
</div>
{/* Dispensary Info */}
<div className="bg-white rounded-lg border border-gray-200 p-6">
<div className="flex items-start gap-4">
<div className="p-3 bg-blue-50 rounded-lg">
<Building2 className="w-8 h-8 text-blue-600" />
</div>
<div>
<h1 className="text-2xl font-bold text-gray-900">
{dispensary.dba_name || dispensary.name}
</h1>
<p className="text-sm text-gray-600 mt-1">
{dispensary.city}, {dispensary.state} - Crawl Schedule & History
</p>
<div className="flex items-center gap-4 mt-2 text-sm text-gray-500">
<span>Slug: {dispensary.slug}</span>
{dispensary.menu_type && (
<span className="px-2 py-0.5 bg-gray-100 rounded text-xs">
{dispensary.menu_type}
</span>
)}
</div>
</div>
</div>
</div>
{/* Next Scheduled Crawl */}
{nextSchedule && (
<div className="bg-white rounded-lg border border-gray-200 p-6">
<h2 className="text-lg font-semibold text-gray-900 mb-4 flex items-center gap-2">
<Clock className="w-5 h-5 text-blue-500" />
Upcoming Schedule
</h2>
<div className="grid grid-cols-4 gap-6">
<div>
<p className="text-sm text-gray-500">Next Run</p>
<p className="text-xl font-semibold text-blue-600">
{formatTimeUntil(nextSchedule.nextRunAt)}
</p>
<p className="text-xs text-gray-400">
{formatDate(nextSchedule.nextRunAt)}
</p>
</div>
<div>
<p className="text-sm text-gray-500">Interval</p>
<p className="text-lg font-medium">
{formatInterval(nextSchedule.baseIntervalMinutes, nextSchedule.jitterMinutes)}
</p>
</div>
<div>
<p className="text-sm text-gray-500">Last Run</p>
<p className="text-lg font-medium">
{formatTimeAgo(nextSchedule.lastRunAt)}
</p>
</div>
<div>
<p className="text-sm text-gray-500">Last Status</p>
<p className={`text-lg font-medium ${
nextSchedule.lastStatus === 'success' ? 'text-green-600' :
nextSchedule.lastStatus === 'error' ? 'text-red-600' : 'text-gray-600'
}`}>
{nextSchedule.lastStatus || '-'}
</p>
</div>
</div>
</div>
)}
{/* Stats Summary */}
<div className="grid grid-cols-4 gap-4">
<div className="bg-white rounded-lg border border-gray-200 p-4">
<div className="flex items-center gap-3">
<CheckCircle className="w-8 h-8 text-green-500" />
<div>
<p className="text-sm text-gray-500">Successful Runs</p>
<p className="text-2xl font-bold text-green-600">{successCount}</p>
</div>
</div>
</div>
<div className="bg-white rounded-lg border border-gray-200 p-4">
<div className="flex items-center gap-3">
<XCircle className="w-8 h-8 text-red-500" />
<div>
<p className="text-sm text-gray-500">Failed Runs</p>
<p className="text-2xl font-bold text-red-600">{failureCount}</p>
</div>
</div>
</div>
<div className="bg-white rounded-lg border border-gray-200 p-4">
<div className="flex items-center gap-3">
<Timer className="w-8 h-8 text-blue-500" />
<div>
<p className="text-sm text-gray-500">Avg Duration</p>
<p className="text-2xl font-bold">{formatDuration(avgDuration)}</p>
</div>
</div>
</div>
<div className="bg-white rounded-lg border border-gray-200 p-4">
<div className="flex items-center gap-3">
<Package className="w-8 h-8 text-purple-500" />
<div>
<p className="text-sm text-gray-500">Last Products Found</p>
<p className="text-2xl font-bold">
{lastSuccess?.productsFound?.toLocaleString() || '-'}
</p>
</div>
</div>
</div>
</div>
{/* Crawl History Table */}
<div className="bg-white rounded-lg border border-gray-200">
<div className="p-4 border-b border-gray-200">
<h2 className="text-lg font-semibold text-gray-900 flex items-center gap-2">
<Calendar className="w-5 h-5 text-gray-500" />
Crawl History
</h2>
</div>
<div className="overflow-x-auto">
<table className="table table-sm w-full">
<thead className="bg-gray-50">
<tr>
<th>Status</th>
<th>Started</th>
<th>Duration</th>
<th className="text-right">Products</th>
<th>State</th>
<th>Error</th>
</tr>
</thead>
<tbody>
{history.length === 0 ? (
<tr>
<td colSpan={6} className="text-center py-8 text-gray-500">
No crawl history available
</td>
</tr>
) : (
history.map((item) => (
<tr key={item.id} className="hover:bg-gray-50">
<td>
<span className={`inline-flex items-center gap-1 px-2 py-1 rounded text-xs font-medium ${
item.success
? 'bg-green-100 text-green-700'
: 'bg-red-100 text-red-700'
}`}>
{item.success ? (
<CheckCircle className="w-3 h-3" />
) : (
<XCircle className="w-3 h-3" />
)}
{item.success ? 'Success' : 'Failed'}
</span>
</td>
<td>
<div className="text-sm">{formatDate(item.startedAt)}</div>
<div className="text-xs text-gray-400">{formatTimeAgo(item.startedAt)}</div>
</td>
<td className="font-mono text-sm">
{formatDuration(item.durationMs)}
</td>
<td className="text-right font-mono text-sm">
{item.productsFound?.toLocaleString() || '-'}
</td>
<td className="text-sm text-gray-600">
{item.stateAtEnd || item.stateAtStart || '-'}
</td>
<td className="max-w-[200px]">
{item.errorMessage ? (
<span
className="text-xs text-red-600 truncate block cursor-help"
title={item.errorMessage}
>
{item.errorMessage.substring(0, 50)}...
</span>
) : '-'}
</td>
</tr>
))
)}
</tbody>
</table>
</div>
</div>
</div>
</Layout>
);
}
export default DispensarySchedule;

View File

@@ -3,15 +3,16 @@ import { useNavigate } from 'react-router-dom';
import { Layout } from '../components/Layout';
import { api } from '../lib/api';
import { trackProductClick } from '../lib/analytics';
import { useStateFilter } from '../hooks/useStateFilter';
import {
Building2,
MapPin,
Package,
DollarSign,
RefreshCw,
Search,
TrendingUp,
BarChart3,
ChevronDown,
} from 'lucide-react';
interface BrandData {
@@ -25,19 +26,28 @@ interface BrandData {
export function IntelligenceBrands() {
const navigate = useNavigate();
const { selectedState, setSelectedState, stateParam, stateLabel, isAllStates } = useStateFilter();
const [availableStates, setAvailableStates] = useState<string[]>([]);
const [brands, setBrands] = useState<BrandData[]>([]);
const [loading, setLoading] = useState(true);
const [searchTerm, setSearchTerm] = useState('');
const [sortBy, setSortBy] = useState<'stores' | 'skus' | 'name'>('stores');
const [sortBy, setSortBy] = useState<'stores' | 'skus' | 'name' | 'states'>('stores');
useEffect(() => {
loadBrands();
}, [stateParam]);
useEffect(() => {
// Load available states
api.getOrchestratorStates().then(data => {
setAvailableStates(data.states?.map((s: any) => s.state) || []);
}).catch(console.error);
}, []);
const loadBrands = async () => {
try {
setLoading(true);
const data = await api.getIntelligenceBrands({ limit: 500 });
const data = await api.getIntelligenceBrands({ limit: 500, state: stateParam });
setBrands(data.brands || []);
} catch (error) {
console.error('Failed to load brands:', error);
@@ -58,6 +68,8 @@ export function IntelligenceBrands() {
return b.skuCount - a.skuCount;
case 'name':
return a.brandName.localeCompare(b.brandName);
case 'states':
return b.states.length - a.states.length;
default:
return 0;
}
@@ -89,35 +101,60 @@ export function IntelligenceBrands() {
<Layout>
<div className="space-y-6">
{/* Header */}
<div className="flex items-center justify-between">
<div className="flex flex-col gap-4 sm:flex-row sm:items-center sm:justify-between">
<div>
<h1 className="text-2xl font-bold text-gray-900">Brands Intelligence</h1>
<p className="text-sm text-gray-600 mt-1">
Brand penetration and pricing analytics across markets
</p>
</div>
<div className="flex gap-2">
<button
onClick={() => navigate('/admin/intelligence/pricing')}
className="btn btn-sm btn-outline gap-1"
>
<DollarSign className="w-4 h-4" />
Pricing
</button>
<button
onClick={() => navigate('/admin/intelligence/stores')}
className="btn btn-sm btn-outline gap-1"
>
<MapPin className="w-4 h-4" />
Stores
</button>
<button
onClick={loadBrands}
className="btn btn-sm btn-outline gap-2"
>
<RefreshCw className="w-4 h-4" />
Refresh
</button>
<div className="flex flex-wrap gap-2 items-center">
{/* State Selector */}
<div className="dropdown dropdown-end">
<button tabIndex={0} className="btn btn-sm gap-2 bg-emerald-50 border-emerald-200 hover:bg-emerald-100">
{stateLabel}
<ChevronDown className="w-4 h-4" />
</button>
<ul tabIndex={0} className="dropdown-content z-50 menu p-2 shadow-lg bg-white rounded-box w-44 max-h-60 overflow-y-auto border border-gray-200">
<li>
<a onClick={() => setSelectedState(null)} className={isAllStates ? 'active bg-emerald-100' : ''}>
All States
</a>
</li>
<div className="divider my-1"></div>
{availableStates.map((state) => (
<li key={state}>
<a onClick={() => setSelectedState(state)} className={selectedState === state ? 'active bg-emerald-100' : ''}>
{state}
</a>
</li>
))}
</ul>
</div>
{/* Page Navigation */}
<div className="flex gap-1">
<button
className="btn btn-sm gap-1 bg-emerald-600 text-white hover:bg-emerald-700 border-emerald-600"
>
<Building2 className="w-4 h-4" />
<span>Brands</span>
</button>
<button
onClick={() => navigate('/admin/intelligence/stores')}
className="btn btn-sm gap-1 bg-white border-gray-300 text-gray-700 hover:bg-gray-100"
>
<MapPin className="w-4 h-4" />
<span>Stores</span>
</button>
<button
onClick={() => navigate('/admin/intelligence/pricing')}
className="btn btn-sm gap-1 bg-white border-gray-300 text-gray-700 hover:bg-gray-100"
>
<DollarSign className="w-4 h-4" />
<span>Pricing</span>
</button>
</div>
</div>
</div>
@@ -169,28 +206,32 @@ export function IntelligenceBrands() {
{/* Top Brands Chart */}
<div className="bg-white rounded-lg border border-gray-200 p-4">
<h3 className="text-lg font-semibold text-gray-900 mb-4 flex items-center gap-2">
<BarChart3 className="w-5 h-5 text-blue-500" />
<h3 className="text-lg font-semibold text-gray-900 flex items-center gap-2 mb-4">
<BarChart3 className="w-5 h-5 text-emerald-500" />
Top 10 Brands by Store Count
</h3>
<div className="space-y-2">
{topBrands.map((brand, idx) => (
<div key={brand.brandName} className="flex items-center gap-3">
<span className="text-sm text-gray-500 w-6">{idx + 1}.</span>
<span className="text-sm font-medium w-40 truncate" title={brand.brandName}>
{brand.brandName}
</span>
<div className="flex-1 bg-gray-100 rounded-full h-4 relative">
<div
className="bg-blue-500 rounded-full h-4"
style={{ width: `${(brand.storeCount / maxStoreCount) * 100}%` }}
/>
{topBrands.map((brand) => {
const barWidth = Math.min((brand.storeCount / maxStoreCount) * 100, 100);
return (
<div key={brand.brandName} className="flex items-center gap-3">
<span className="text-sm font-medium w-28 truncate shrink-0" title={brand.brandName}>
{brand.brandName}
</span>
<div className="flex-1 min-w-0">
<div className="bg-gray-100 rounded h-5 overflow-hidden">
<div
className="bg-gradient-to-r from-emerald-400 to-emerald-500 h-5 rounded transition-all"
style={{ width: `${barWidth}%` }}
/>
</div>
</div>
<span className="text-sm font-mono font-semibold text-emerald-600 w-16 text-right shrink-0">
{brand.storeCount}
</span>
</div>
<span className="text-sm text-gray-600 w-16 text-right">
{brand.storeCount} stores
</span>
</div>
))}
);
})}
</div>
</div>
@@ -213,6 +254,7 @@ export function IntelligenceBrands() {
>
<option value="stores">Sort by Stores</option>
<option value="skus">Sort by SKUs</option>
<option value="states">Sort by States</option>
<option value="name">Sort by Name</option>
</select>
<span className="text-sm text-gray-500">

View File

@@ -2,15 +2,16 @@ import { useEffect, useState } from 'react';
import { useNavigate } from 'react-router-dom';
import { Layout } from '../components/Layout';
import { api } from '../lib/api';
import { useStateFilter } from '../hooks/useStateFilter';
import {
DollarSign,
Building2,
MapPin,
Package,
RefreshCw,
TrendingUp,
TrendingDown,
BarChart3,
ChevronDown,
} from 'lucide-react';
interface CategoryPricing {
@@ -31,18 +32,27 @@ interface OverallPricing {
export function IntelligencePricing() {
const navigate = useNavigate();
const { selectedState, setSelectedState, stateParam, stateLabel, isAllStates } = useStateFilter();
const [availableStates, setAvailableStates] = useState<string[]>([]);
const [categories, setCategories] = useState<CategoryPricing[]>([]);
const [overall, setOverall] = useState<OverallPricing | null>(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
loadPricing();
}, [stateParam]);
useEffect(() => {
// Load available states
api.getOrchestratorStates().then(data => {
setAvailableStates(data.states?.map((s: any) => s.state) || []);
}).catch(console.error);
}, []);
const loadPricing = async () => {
try {
setLoading(true);
const data = await api.getIntelligencePricing();
const data = await api.getIntelligencePricing({ state: stateParam });
setCategories(data.byCategory || []);
setOverall(data.overall || null);
} catch (error) {
@@ -76,35 +86,60 @@ export function IntelligencePricing() {
<Layout>
<div className="space-y-6">
{/* Header */}
<div className="flex items-center justify-between">
<div className="flex flex-col gap-4 sm:flex-row sm:items-center sm:justify-between">
<div>
<h1 className="text-2xl font-bold text-gray-900">Pricing Intelligence</h1>
<p className="text-sm text-gray-600 mt-1">
Price distribution and trends by category
</p>
</div>
<div className="flex gap-2">
<button
onClick={() => navigate('/admin/intelligence/brands')}
className="btn btn-sm btn-outline gap-1"
>
<Building2 className="w-4 h-4" />
Brands
</button>
<button
onClick={() => navigate('/admin/intelligence/stores')}
className="btn btn-sm btn-outline gap-1"
>
<MapPin className="w-4 h-4" />
Stores
</button>
<button
onClick={loadPricing}
className="btn btn-sm btn-outline gap-2"
>
<RefreshCw className="w-4 h-4" />
Refresh
</button>
<div className="flex flex-wrap gap-2 items-center">
{/* State Selector */}
<div className="dropdown dropdown-end">
<button tabIndex={0} className="btn btn-sm gap-2 bg-emerald-50 border-emerald-200 hover:bg-emerald-100">
{stateLabel}
<ChevronDown className="w-4 h-4" />
</button>
<ul tabIndex={0} className="dropdown-content z-50 menu p-2 shadow-lg bg-white rounded-box w-44 max-h-60 overflow-y-auto border border-gray-200">
<li>
<a onClick={() => setSelectedState(null)} className={isAllStates ? 'active bg-emerald-100' : ''}>
All States
</a>
</li>
<div className="divider my-1"></div>
{availableStates.map((state) => (
<li key={state}>
<a onClick={() => setSelectedState(state)} className={selectedState === state ? 'active bg-emerald-100' : ''}>
{state}
</a>
</li>
))}
</ul>
</div>
{/* Page Navigation */}
<div className="flex gap-1">
<button
onClick={() => navigate('/admin/intelligence/brands')}
className="btn btn-sm gap-1 bg-white border-gray-300 text-gray-700 hover:bg-gray-100"
>
<Building2 className="w-4 h-4" />
<span>Brands</span>
</button>
<button
onClick={() => navigate('/admin/intelligence/stores')}
className="btn btn-sm gap-1 bg-white border-gray-300 text-gray-700 hover:bg-gray-100"
>
<MapPin className="w-4 h-4" />
<span>Stores</span>
</button>
<button
className="btn btn-sm gap-1 bg-emerald-600 text-white hover:bg-emerald-700 border-emerald-600"
>
<DollarSign className="w-4 h-4" />
<span>Pricing</span>
</button>
</div>
</div>
</div>
@@ -150,7 +185,7 @@ export function IntelligencePricing() {
<div>
<p className="text-sm text-gray-500">Products Priced</p>
<p className="text-2xl font-bold">
{overall.totalProducts.toLocaleString()}
{(overall.totalProducts || 0).toLocaleString()}
</p>
</div>
</div>
@@ -164,43 +199,29 @@ export function IntelligencePricing() {
<BarChart3 className="w-5 h-5 text-green-500" />
Average Price by Category
</h3>
<div className="space-y-3">
{sortedCategories.map((cat) => (
<div key={cat.category} className="flex items-center gap-3">
<span className="text-sm font-medium w-32 truncate" title={cat.category}>
{cat.category || 'Unknown'}
</span>
<div className="flex-1 relative">
{/* Price range bar */}
<div className="bg-gray-100 rounded-full h-6 relative">
{/* Min-Max range */}
<div
className="absolute top-0 h-6 bg-blue-100 rounded-full"
style={{
left: `${(cat.minPrice / (overall?.maxPrice || 100)) * 100}%`,
width: `${((cat.maxPrice - cat.minPrice) / (overall?.maxPrice || 100)) * 100}%`,
}}
/>
{/* Average marker */}
<div
className="absolute top-0 h-6 w-1 bg-green-500 rounded"
style={{ left: `${(cat.avgPrice / (overall?.maxPrice || 100)) * 100}%` }}
/>
<div className="space-y-2">
{sortedCategories.slice(0, 12).map((cat) => {
const maxPrice = Math.max(...sortedCategories.map(c => c.avgPrice || 0), 1);
const barWidth = Math.min(((cat.avgPrice || 0) / maxPrice) * 100, 100);
return (
<div key={cat.category} className="flex items-center gap-3">
<span className="text-sm font-medium w-28 truncate shrink-0" title={cat.category}>
{cat.category || 'Unknown'}
</span>
<div className="flex-1 min-w-0">
<div className="bg-gray-100 rounded h-5 overflow-hidden">
<div
className="bg-gradient-to-r from-emerald-400 to-emerald-500 h-5 rounded transition-all"
style={{ width: `${barWidth}%` }}
/>
</div>
</div>
</div>
<div className="flex gap-4 text-xs w-48">
<span className="text-gray-500">
Min: <span className="text-blue-600 font-mono">{formatPrice(cat.minPrice)}</span>
</span>
<span className="text-gray-500">
Avg: <span className="text-green-600 font-mono font-bold">{formatPrice(cat.avgPrice)}</span>
</span>
<span className="text-gray-500">
Max: <span className="text-orange-600 font-mono">{formatPrice(cat.maxPrice)}</span>
<span className="text-sm font-mono font-semibold text-emerald-600 w-16 text-right shrink-0">
{formatPrice(cat.avgPrice)}
</span>
</div>
</div>
))}
);
})}
</div>
</div>
@@ -236,7 +257,7 @@ export function IntelligencePricing() {
<span className="font-medium">{cat.category || 'Unknown'}</span>
</td>
<td className="text-center">
<span className="font-mono">{cat.productCount.toLocaleString()}</span>
<span className="font-mono">{(cat.productCount || 0).toLocaleString()}</span>
</td>
<td className="text-right">
<span className="font-mono text-blue-600">{formatPrice(cat.minPrice)}</span>

View File

@@ -8,7 +8,6 @@ import {
Building2,
DollarSign,
Package,
RefreshCw,
Search,
Clock,
Activity,
@@ -34,12 +33,19 @@ export function IntelligenceStores() {
const [stores, setStores] = useState<StoreActivity[]>([]);
const [loading, setLoading] = useState(true);
const [searchTerm, setSearchTerm] = useState('');
const [localStates, setLocalStates] = useState<string[]>([]);
const [availableStates, setAvailableStates] = useState<string[]>([]);
useEffect(() => {
loadStores();
}, [selectedState]);
useEffect(() => {
// Load available states from orchestrator API
api.getOrchestratorStates().then(data => {
setAvailableStates(data.states?.map((s: any) => s.state) || []);
}).catch(console.error);
}, []);
const loadStores = async () => {
try {
setLoading(true);
@@ -48,10 +54,6 @@ export function IntelligenceStores() {
limit: 500,
});
setStores(data.stores || []);
// Extract unique states from response for dropdown counts
const uniqueStates = [...new Set(data.stores.map((s: StoreActivity) => s.state))].sort();
setLocalStates(uniqueStates);
} catch (error) {
console.error('Failed to load stores:', error);
} finally {
@@ -97,47 +99,72 @@ export function IntelligenceStores() {
);
}
// Calculate stats
const totalSKUs = stores.reduce((sum, s) => sum + s.skuCount, 0);
const totalSnapshots = stores.reduce((sum, s) => sum + s.snapshotCount, 0);
const avgFrequency = stores.filter(s => s.crawlFrequencyHours).length > 0
? stores.filter(s => s.crawlFrequencyHours).reduce((sum, s) => sum + (s.crawlFrequencyHours || 0), 0) /
stores.filter(s => s.crawlFrequencyHours).length
// Calculate stats with null safety
const totalSKUs = stores.reduce((sum, s) => sum + (s.skuCount || 0), 0);
const totalSnapshots = stores.reduce((sum, s) => sum + (s.snapshotCount || 0), 0);
const storesWithFrequency = stores.filter(s => s.crawlFrequencyHours != null);
const avgFrequency = storesWithFrequency.length > 0
? storesWithFrequency.reduce((sum, s) => sum + (s.crawlFrequencyHours || 0), 0) / storesWithFrequency.length
: 0;
return (
<Layout>
<div className="space-y-6">
{/* Header */}
<div className="flex items-center justify-between">
<div className="flex flex-col gap-4 sm:flex-row sm:items-center sm:justify-between">
<div>
<h1 className="text-2xl font-bold text-gray-900">Store Activity</h1>
<p className="text-sm text-gray-600 mt-1">
Per-store SKU counts, snapshots, and crawl frequency
</p>
</div>
<div className="flex gap-2">
<button
onClick={() => navigate('/admin/intelligence/brands')}
className="btn btn-sm btn-outline gap-1"
>
<Building2 className="w-4 h-4" />
Brands
</button>
<button
onClick={() => navigate('/admin/intelligence/pricing')}
className="btn btn-sm btn-outline gap-1"
>
<DollarSign className="w-4 h-4" />
Pricing
</button>
<button
onClick={loadStores}
className="btn btn-sm btn-outline gap-2"
>
<RefreshCw className="w-4 h-4" />
Refresh
</button>
<div className="flex flex-wrap gap-2 items-center">
{/* State Selector */}
<div className="dropdown dropdown-end">
<button tabIndex={0} className="btn btn-sm gap-2 bg-emerald-50 border-emerald-200 hover:bg-emerald-100">
{stateLabel}
<ChevronDown className="w-4 h-4" />
</button>
<ul tabIndex={0} className="dropdown-content z-50 menu p-2 shadow-lg bg-white rounded-box w-44 max-h-60 overflow-y-auto border border-gray-200">
<li>
<a onClick={() => setSelectedState(null)} className={isAllStates ? 'active bg-emerald-100' : ''}>
All States
</a>
</li>
<div className="divider my-1"></div>
{availableStates.map((state) => (
<li key={state}>
<a onClick={() => setSelectedState(state)} className={selectedState === state ? 'active bg-emerald-100' : ''}>
{state}
</a>
</li>
))}
</ul>
</div>
{/* Page Navigation */}
<div className="flex gap-1">
<button
onClick={() => navigate('/admin/intelligence/brands')}
className="btn btn-sm gap-1 bg-white border-gray-300 text-gray-700 hover:bg-gray-100"
>
<Building2 className="w-4 h-4" />
<span>Brands</span>
</button>
<button
className="btn btn-sm gap-1 bg-emerald-600 text-white hover:bg-emerald-700 border-emerald-600"
>
<MapPin className="w-4 h-4" />
<span>Stores</span>
</button>
<button
onClick={() => navigate('/admin/intelligence/pricing')}
className="btn btn-sm gap-1 bg-white border-gray-300 text-gray-700 hover:bg-gray-100"
>
<DollarSign className="w-4 h-4" />
<span>Pricing</span>
</button>
</div>
</div>
</div>
@@ -193,26 +220,6 @@ export function IntelligenceStores() {
className="input input-bordered input-sm w-full pl-10"
/>
</div>
<div className="dropdown">
<button tabIndex={0} className="btn btn-sm btn-outline gap-2">
{stateLabel}
<ChevronDown className="w-4 h-4" />
</button>
<ul tabIndex={0} className="dropdown-content z-[1] menu p-2 shadow bg-base-100 rounded-box w-40 max-h-60 overflow-y-auto">
<li>
<a onClick={() => setSelectedState(null)} className={isAllStates ? 'active' : ''}>
All States
</a>
</li>
{localStates.map(state => (
<li key={state}>
<a onClick={() => setSelectedState(state)} className={selectedState === state ? 'active' : ''}>
{state}
</a>
</li>
))}
</ul>
</div>
<span className="text-sm text-gray-500">
Showing {filteredStores.length} of {stores.length} stores
</span>
@@ -246,7 +253,7 @@ export function IntelligenceStores() {
<tr
key={store.id}
className="hover:bg-gray-50 cursor-pointer"
onClick={() => navigate(`/admin/orchestrator/stores?storeId=${store.id}`)}
onClick={() => navigate(`/stores/list/${store.id}`)}
>
<td>
<span className="font-medium">{store.name}</span>
@@ -262,10 +269,10 @@ export function IntelligenceStores() {
)}
</td>
<td className="text-center">
<span className="font-mono">{store.skuCount.toLocaleString()}</span>
<span className="font-mono">{(store.skuCount || 0).toLocaleString()}</span>
</td>
<td className="text-center">
<span className="font-mono">{store.snapshotCount.toLocaleString()}</span>
<span className="font-mono">{(store.snapshotCount || 0).toLocaleString()}</span>
</td>
<td>
<span className={store.lastCrawl ? 'text-green-600' : 'text-gray-400'}>

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More