diff --git a/backend/src/tasks/task-worker.ts b/backend/src/tasks/task-worker.ts
index 03b4d79c..39604d45 100644
--- a/backend/src/tasks/task-worker.ts
+++ b/backend/src/tasks/task-worker.ts
@@ -116,23 +116,45 @@ export class TaskWorker {
    * Initialize stealth systems (proxy rotation, fingerprints)
    * Called once on worker startup before processing any tasks.
    *
-   * IMPORTANT: Proxies are REQUIRED. Workers will fail to start if no proxies available.
+   * IMPORTANT: Proxies are REQUIRED. Workers will wait until proxies are available.
+   * Polls every 30 seconds for up to 60 minutes, then throws if none became active.
    */
   private async initializeStealth(): Promise<void> {
-    // Load proxies from database
-    await this.crawlRotator.initialize();
+    const MAX_WAIT_MINUTES = 60;
+    const RETRY_INTERVAL_MS = 30000; // 30 seconds
+    const maxAttempts = Math.ceil((MAX_WAIT_MINUTES * 60 * 1000) / RETRY_INTERVAL_MS);
 
-    const stats = this.crawlRotator.proxy.getStats();
-    if (stats.activeProxies === 0) {
-      throw new Error('No active proxies available. Workers MUST use proxies for all requests. Add proxies to the database before starting workers.');
+    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
+      try {
+        // Load proxies from database
+        await this.crawlRotator.initialize();
+
+        const stats = this.crawlRotator.proxy.getStats();
+        if (stats.activeProxies > 0) {
+          console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
+
+          // Wire rotator to Dutchie client - proxies will be used for ALL requests
+          setCrawlRotator(this.crawlRotator);
+
+          console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
+          return;
+        }
+
+        console.log(`[TaskWorker] No active proxies available (attempt ${attempt}/${maxAttempts}).`);
+      } catch (error: unknown) {
+        // Thrown values are not guaranteed to be Error instances; narrow before reading .message
+        const message = error instanceof Error ? error.message : String(error);
+        console.log(`[TaskWorker] Error loading proxies (attempt ${attempt}/${maxAttempts}): ${message}`);
+      }
+
+      // Sleep only when another attempt follows, so the final failure is reported immediately
+      if (attempt < maxAttempts) {
+        console.log(`[TaskWorker] Retrying in ${RETRY_INTERVAL_MS / 1000}s...`);
+        await this.sleep(RETRY_INTERVAL_MS);
+      }
     }
 
-    console.log(`[TaskWorker] Loaded ${stats.activeProxies} proxies (${stats.avgSuccessRate.toFixed(1)}% avg success rate)`);
-
-    // Wire rotator to Dutchie client - proxies will be used for ALL requests
-    setCrawlRotator(this.crawlRotator);
-
-    console.log(`[TaskWorker] Stealth initialized: ${this.crawlRotator.userAgent.getCount()} fingerprints, proxy REQUIRED for all requests`);
+    throw new Error(`No active proxies available after waiting ${MAX_WAIT_MINUTES} minutes. Add proxies to the database.`);
   }
 
   /**
diff --git a/k8s/scraper-worker.yaml b/k8s/scraper-worker.yaml
index ea47e77d..ddff6f34 100644
--- a/k8s/scraper-worker.yaml
+++ b/k8s/scraper-worker.yaml
@@ -1,4 +1,67 @@
-# Task Worker Pods
+# Task Worker Deployment
+#
+# Simple Deployment that runs task-worker.js to process tasks from worker_tasks queue.
+# Workers pull tasks using DB-level locking (FOR UPDATE SKIP LOCKED).
+#
+# The worker will wait up to 60 minutes for active proxies to be added before failing.
+# This allows deployment to succeed even if proxies aren't configured yet.
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: scraper-worker
+  namespace: dispensary-scraper
+spec:
+  replicas: 5
+  selector:
+    matchLabels:
+      app: scraper-worker
+  template:
+    metadata:
+      labels:
+        app: scraper-worker
+    spec:
+      imagePullSecrets:
+        - name: regcred
+      containers:
+        - name: worker
+          image: code.cannabrands.app/creationshop/dispensary-scraper:latest
+          command: ["node"]
+          args: ["dist/tasks/task-worker.js"]
+          envFrom:
+            - configMapRef:
+                name: scraper-config
+            - secretRef:
+                name: scraper-secrets
+          env:
+            - name: WORKER_MODE
+              value: "true"
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+          resources:
+            requests:
+              memory: "256Mi"
+              cpu: "100m"
+            limits:
+              memory: "512Mi"
+              cpu: "500m"
+          livenessProbe:  # passes while a process whose command line matches task-worker exists
+            exec:
+              command:
+                - /bin/sh
+                - -c
+                - "pgrep -f '[t]ask-worker' > /dev/null"  # [t] stops the probe's own sh -c command line from matching
+            initialDelaySeconds: 60
+            periodSeconds: 30
+            failureThreshold: 3
+      terminationGracePeriodSeconds: 60
+---
+# =============================================================================
+# ALTERNATIVE: StatefulSet with multiple workers per pod (not currently used)
+# =============================================================================
+# Task Worker Pods (StatefulSet)
 # Each pod runs 5 role-agnostic workers that pull tasks from worker_tasks queue.
 #
 # Architecture: