Files
cannaiq/backend/scrape-with-jobs.ts
2025-11-28 19:45:44 -07:00

553 lines
18 KiB
TypeScript
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { firefox } from 'playwright';
import { pool } from './src/db/migrate.js';
import { getRandomProxy } from './src/utils/proxyManager.js';
// Worker label: used as the `[...]` prefix on log lines and written to `worker_id` on claimed jobs.
// Comes from the first CLI argument; otherwise "W" followed by the last four digits of Date.now().
const workerNum = process.argv[2] || `W${String(Date.now()).slice(-4)}`;
// Dispensary whose brand jobs this worker processes: second CLI argument, defaulting to '112'.
const dispensaryId = Number.parseInt(process.argv[3] || '112', 10);
/**
 * One product as scraped from a product detail page and written to the `products` table.
 */
interface Product {
  /** URL path segment after `/product/`; together with the dispensary id it identifies an existing row. */
  slug: string;
  /** Product name (JSON-LD `name`, or the first `<h1>` text as a fallback). */
  name: string;
  /** Brand name of the scrape job that produced this product. */
  brand?: string;
  /** Variant label; this scraper always stores `undefined`. */
  variant?: string;
  /** JSON-LD description, when the page provides one. */
  description?: string;

  /** Regular (non-discounted) price. */
  regularPrice?: number;
  /** Discounted price, when one was detected. */
  salePrice?: number;
  /** `'percentage'` or `'dollar'` when a discount label was parsed. */
  discountType?: string;
  /** Human-readable discount label, e.g. `"25% Off"` or `"$10 Off"`. */
  discountValue?: string;

  /** THC percentage parsed from the page text. */
  thcPercentage?: number;
  /** CBD percentage parsed from the page text. */
  cbdPercentage?: number;
  /** `'Indica'`, `'Sativa'` or `'Hybrid'` when the page text mentions one. */
  strainType?: string;

  /** Terpene names; this scraper always stores an empty list. */
  terpenes: string[];
  /** Effect names; this scraper always stores an empty list. */
  effects: string[];
  /** Flavor names; this scraper always stores an empty list. */
  flavors: string[];

  /** Product image URL. */
  imageUrl?: string;
  /** URL of the product detail page that was visited. */
  dutchieUrl: string;

  /** Whether the product is considered in stock. */
  inStock: boolean;
  /** Remaining units, when the page shows "N left in stock". */
  stockQuantity?: number;
  /** Stock label such as `'in stock'`, `'out of stock'` or `'3 left in stock'`. */
  stockStatus?: string;
}
/** A brand to scrape: its URL slug, display name and brand-page URL. */
interface Brand {
  /** Path segment appended to `<menuUrl>/brands/` to reach the brand page. */
  slug: string;
  /** Display name; copied onto every product scraped for this brand. */
  name: string;
  /** Full brand-page URL. */
  url: string;
}
/** The columns of a `brand_scrape_jobs` row that a worker reads when it claims a job. */
interface Job {
  /** Primary key of the job row. */
  id: number;
  /** Slug of the brand to scrape. */
  brand_slug: string;
  /** Display name of the brand to scrape. */
  brand_name: string;
  /** Dispensary the job belongs to. */
  dispensary_id: number;
}
/**
 * Claims the oldest pending job for this worker's dispensary.
 *
 * Inside a single transaction it selects one `pending` row from `brand_scrape_jobs`
 * for `dispensaryId` (retry_count below 3 or NULL, oldest `created_at` first) with
 * `FOR UPDATE SKIP LOCKED`, then marks that row `in_progress` with this worker's id.
 *
 * @returns The claimed job, or `null` when no row matched or a query failed
 *          (query failures are logged, not thrown).
 */
async function claimNextJob(): Promise<Job | null> {
  const selectSql = `
SELECT id, dispensary_id, brand_slug, brand_name
FROM brand_scrape_jobs
WHERE status = 'pending'
AND dispensary_id = $1
AND (retry_count < 3 OR retry_count IS NULL)
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
`;
  const claimSql = `
UPDATE brand_scrape_jobs
SET status = 'in_progress',
worker_id = $1,
started_at = NOW(),
updated_at = NOW()
WHERE id = $2
`;

  const client = await pool.connect();
  // Set when ROLLBACK itself fails, so the connection is destroyed instead of being reused.
  let discardClient = false;
  try {
    await client.query('BEGIN');
    const result = await client.query(selectSql, [dispensaryId]);
    if (result.rows.length === 0) {
      await client.query('ROLLBACK');
      return null;
    }

    const job = result.rows[0];
    // Update to in_progress
    await client.query(claimSql, [workerNum, job.id]);
    await client.query('COMMIT');
    return job;
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`[${workerNum}] ❌ Error claiming job:`, message);
    // A failing ROLLBACK must not replace the original error or break the "return null" contract.
    try {
      await client.query('ROLLBACK');
    } catch (rollbackError: unknown) {
      discardClient = true;
      const rollbackMessage =
        rollbackError instanceof Error ? rollbackError.message : String(rollbackError);
      console.error(`[${workerNum}] ❌ Error rolling back job claim:`, rollbackMessage);
    }
    return null;
  } finally {
    // A truthy argument tells the pool to destroy this client rather than return it.
    client.release(discardClient);
  }
}
/**
 * Marks a job as `completed` and records how many products were found and saved.
 * A failing update is logged and swallowed; this function never throws for query errors.
 *
 * @param jobId - Id of the `brand_scrape_jobs` row.
 * @param productsFound - Number of products scraped for the job.
 * @param productsSaved - Number of those products that were written successfully.
 */
async function completeJob(jobId: number, productsFound: number, productsSaved: number) {
  const sql = `
UPDATE brand_scrape_jobs
SET status = 'completed',
completed_at = NOW(),
products_found = $1,
products_saved = $2,
updated_at = NOW()
WHERE id = $3
`;
  const params = [productsFound, productsSaved, jobId];

  try {
    await pool.query(sql, params);
  } catch (err: any) {
    console.error(`[${workerNum}] ❌ Error completing job:`, err.message);
  }
}
/**
 * Marks a job as `failed`, stores the error text and increments its retry counter
 * (a NULL counter is treated as 0). A failing update is logged and swallowed.
 *
 * @param jobId - Id of the `brand_scrape_jobs` row.
 * @param errorMessage - Text stored in the row's `error_message` column.
 */
async function failJob(jobId: number, errorMessage: string) {
  const sql = `
UPDATE brand_scrape_jobs
SET status = 'failed',
error_message = $1,
retry_count = COALESCE(retry_count, 0) + 1,
updated_at = NOW()
WHERE id = $2
`;
  const params = [errorMessage, jobId];

  try {
    await pool.query(sql, params);
  } catch (err: any) {
    console.error(`[${workerNum}] ❌ Error marking job as failed:`, err.message);
  }
}
/**
 * Puts a job back into the `pending` state, clearing its worker id and error message
 * so it can be claimed again. A failing update is logged and swallowed.
 * Not referenced by the code visible in this file.
 *
 * @param jobId - Id of the `brand_scrape_jobs` row.
 */
async function resetJobForRetry(jobId: number) {
  const sql = `
UPDATE brand_scrape_jobs
SET status = 'pending',
worker_id = NULL,
error_message = NULL,
updated_at = NOW()
WHERE id = $1
`;

  try {
    await pool.query(sql, [jobId]);
  } catch (err: any) {
    console.error(`[${workerNum}] ❌ Error resetting job:`, err.message);
  }
}
/**
 * Returns the distinct product detail URLs linked from the page currently loaded in `page`.
 * Relative hrefs are prefixed with `https://dutchie.com`.
 */
async function collectProductUrls(page: any): Promise<string[]> {
  // The callback is executed inside the browser page, so it may only use page globals.
  return page.evaluate(() => {
    const urls = new Set<string>();
    for (const anchor of Array.from(document.querySelectorAll('a[href*="/product/"]'))) {
      const href = anchor.getAttribute('href') || '';
      if (href.includes('/product/')) {
        urls.add(href.startsWith('http') ? href : `https://dutchie.com${href}`);
      }
    }
    return Array.from(urls);
  });
}

/**
 * Extracts product fields from the product detail page currently loaded in `page`.
 * JSON-LD `Product` data is preferred; page-text heuristics fill the gaps.
 */
async function extractProductDetails(
  page: any,
  brandName: string
): Promise<Omit<Product, 'dutchieUrl' | 'variant' | 'terpenes' | 'effects' | 'flavors'>> {
  // The callback is executed inside the browser page: keep it self-contained (no references
  // to Node-side variables or helpers) and pass data in through the argument only.
  return page.evaluate((brandNameInPage: string) => {
    // Try JSON-LD structured data first
    const scripts = Array.from(document.querySelectorAll('script[type="application/ld+json"]'));
    let jsonLdData: any = null;
    for (const script of scripts) {
      try {
        const data = JSON.parse(script.textContent || '');
        if (data['@type'] === 'Product') {
          jsonLdData = data;
          break;
        }
      } catch (e) {
        // Malformed or non-object JSON-LD block: ignore it and try the next one.
      }
    }

    // Get slug from URL
    const slug = window.location.pathname.split('/product/')[1]?.replace(/\/$/, '') || '';

    // Extract name
    const name: string =
      jsonLdData && jsonLdData.name
        ? jsonLdData.name
        : document.querySelector('h1')?.textContent?.trim() || '';

    const pageText = document.body.textContent || '';

    // Extract prices
    let regularPrice: number | undefined;
    let salePrice: number | undefined;
    let discountType: string | undefined;
    let discountValue: string | undefined;

    const jsonLdPrice =
      jsonLdData && jsonLdData.offers && jsonLdData.offers.price
        ? parseFloat(jsonLdData.offers.price)
        : NaN;
    if (!Number.isNaN(jsonLdPrice)) {
      regularPrice = jsonLdPrice;
    } else {
      // Fallback (also used when the JSON-LD price is not numeric): take the first
      // dollar amounts found in the page text.
      const priceMatches = pageText.match(/\$(\d+\.?\d*)/g);
      if (priceMatches && priceMatches.length > 0) {
        const prices = priceMatches.map((p: string) => parseFloat(p.replace('$', '')));
        if (prices.length >= 2) {
          salePrice = Math.min(prices[0], prices[1]);
          regularPrice = Math.max(prices[0], prices[1]);
        } else {
          regularPrice = prices[0];
        }
      }
    }

    // Look for discount indicators
    const discountElements = Array.from(
      document.querySelectorAll(
        '[class*="Discount"], [class*="discount"], [class*="Sale"], [class*="sale"], [class*="special"], [class*="Special"]'
      )
    );
    if (discountElements.length > 0 && regularPrice) {
      // Use the first element whose text looks like a discount label.
      let discountText = '';
      for (const el of discountElements) {
        const text = el.textContent?.trim() || '';
        if (text.toLowerCase().includes('off') || text.includes('%') || text.includes('$')) {
          discountText = text;
          break;
        }
      }

      // Parse discount percentage (e.g., "25% off", "25% Off")
      const percentMatch = discountText.match(/(\d+)\s*%\s*off/i);
      if (percentMatch) {
        const percent = parseFloat(percentMatch[1]);
        discountType = 'percentage';
        discountValue = `${percent}% Off`;
        salePrice = parseFloat((regularPrice * (1 - percent / 100)).toFixed(2));
      } else {
        // Parse dollar discount (e.g., "$10 off", "$10 Off")
        const dollarMatch = discountText.match(/\$(\d+\.?\d*)\s*off/i);
        if (dollarMatch) {
          const dollarAmount = parseFloat(dollarMatch[1]);
          discountType = 'dollar';
          discountValue = `$${dollarAmount} Off`;
          salePrice = parseFloat((regularPrice - dollarAmount).toFixed(2));
        }
      }
    }

    // Extract description
    let description: string | undefined;
    if (jsonLdData && jsonLdData.description) {
      description = jsonLdData.description;
    }

    // Extract THC / CBD percentages from the page text
    const thcMatch = pageText.match(/THC[:\s]*(\d+\.?\d*)\s*%/i);
    const cbdMatch = pageText.match(/CBD[:\s]*(\d+\.?\d*)\s*%/i);
    const thc = thcMatch ? parseFloat(thcMatch[1]) : undefined;
    const cbd = cbdMatch ? parseFloat(cbdMatch[1]) : undefined;

    // Strain type: first keyword (in this order) that appears anywhere in the page text
    let strainType: string | undefined;
    if (pageText.match(/indica/i)) strainType = 'Indica';
    else if (pageText.match(/sativa/i)) strainType = 'Sativa';
    else if (pageText.match(/hybrid/i)) strainType = 'Hybrid';

    // Stock status
    let inStock = true;
    let stockQuantity: number | undefined;
    let stockStatus: string | undefined;
    if (pageText.toLowerCase().includes('out of stock')) {
      inStock = false;
      stockStatus = 'out of stock';
    } else {
      const quantityMatch = pageText.match(/(\d+)\s+left\s+in\s+stock/i);
      if (quantityMatch) {
        stockQuantity = parseInt(quantityMatch[1], 10);
        stockStatus = `${stockQuantity} left in stock`;
      } else if (jsonLdData && jsonLdData.offers && jsonLdData.offers.availability) {
        inStock = jsonLdData.offers.availability.includes('InStock');
        stockStatus = inStock ? 'in stock' : 'out of stock';
      }
    }

    // Image URL
    let imageUrl: string | undefined;
    if (jsonLdData && jsonLdData.image) {
      imageUrl = Array.isArray(jsonLdData.image) ? jsonLdData.image[0] : jsonLdData.image;
    } else {
      const img = document.querySelector('img[src*="product"]');
      if (img) {
        imageUrl = img.getAttribute('src') || undefined;
      }
    }

    return {
      slug,
      name,
      brand: brandNameInPage,
      description,
      regularPrice,
      salePrice,
      discountType,
      discountValue,
      thcPercentage: thc,
      cbdPercentage: cbd,
      strainType,
      imageUrl,
      inStock,
      stockQuantity,
      stockStatus
    };
  }, brandName);
}

/**
 * Scrapes every product linked from a brand's page.
 *
 * Loads `<menuUrl>/brands/<brand.slug>`, scrolls 20 times to trigger lazy loading, collects
 * the product links, then visits each product page and extracts its details. A product page
 * that fails to load is logged and skipped; a page that yields no slug is skipped as well,
 * because the slug is the key used to find an existing row when saving.
 *
 * @param menuUrl - Base menu URL of the dispensary.
 * @param brand - Brand whose page is scraped; its name is copied onto each product.
 * @param dispensaryId - Currently unused by this function.
 * @param page - Browser page used for navigation and in-page evaluation.
 * @returns The products that could be extracted.
 * @throws Re-throws any error raised while loading or scanning the brand page itself.
 */
async function scrapeProductsFromBrand(
  menuUrl: string,
  brand: Brand,
  dispensaryId: number,
  page: any
): Promise<Product[]> {
  try {
    const brandUrl = `${menuUrl}/brands/${brand.slug}`;
    console.log(`[${workerNum}] 📄 Loading brand page: ${brandUrl}`);
    await page.goto(brandUrl, {
      waitUntil: 'domcontentloaded',
      timeout: 60000
    });
    await page.waitForTimeout(3000);

    // Scroll to load all products
    for (let i = 0; i < 20; i++) {
      await page.evaluate(() => window.scrollBy(0, window.innerHeight));
      await page.waitForTimeout(1500);
    }

    // Extract product URLs from brand page
    const productUrls = await collectProductUrls(page);
    console.log(`[${workerNum}] 📦 Found ${productUrls.length} product links on brand page`);

    const products: Product[] = [];
    // Visit each product detail page
    for (const [index, productUrl] of productUrls.entries()) {
      try {
        console.log(`[${workerNum}] [${index + 1}/${productUrls.length}] Loading: ${productUrl}`);
        await page.goto(productUrl, {
          waitUntil: 'domcontentloaded',
          timeout: 30000
        });
        await page.waitForTimeout(2000);

        // Extract complete product data from detail page
        const productData = await extractProductDetails(page, brand.name);
        if (!productData.slug) {
          // e.g. the navigation was redirected away from a /product/ URL
          console.log(`[${workerNum}] ⚠️ Skipping product with no slug: ${productUrl}`);
        } else {
          products.push({
            ...productData,
            dutchieUrl: productUrl,
            variant: undefined,
            terpenes: [],
            effects: [],
            flavors: []
          });
          console.log(`[${workerNum}] ✅ ${productData.name} - $${productData.regularPrice?.toFixed(2) || 'N/A'}`);
        }

        // Small delay between product pages
        await page.waitForTimeout(500);
      } catch (error: unknown) {
        const message = error instanceof Error ? error.message : String(error);
        console.log(`[${workerNum}] ⚠️ Error loading product: ${message}`);
      }
    }

    console.log(`[${workerNum}] ✅ Extracted ${products.length} products from brand: ${brand.name}`);
    return products;
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`[${workerNum}] ❌ Error scraping brand ${brand.name}:`, message);
    throw error;
  }
}
/**
 * Inserts or updates one product for a dispensary inside a transaction.
 *
 * Looks up an existing row by `(dispensary_id, slug)`: when found, every scraped column is
 * overwritten and `last_seen_at` / `updated_at` are refreshed; otherwise a new row is inserted.
 *
 * @param product - Scraped product to persist.
 * @param dispensaryId - Dispensary the product belongs to.
 * @returns `true` when the transaction committed, `false` when any query failed
 *          (the failure is logged and the transaction rolled back).
 */
async function saveProduct(product: Product, dispensaryId: number): Promise<boolean> {
  const updateSql = `
UPDATE products
SET name = $1, brand = $2, variant = $3, description = $4,
regular_price = $5, sale_price = $6,
discount_type = $7, discount_value = $8,
thc_percentage = $9, cbd_percentage = $10, strain_type = $11,
terpenes = $12, effects = $13, flavors = $14,
image_url = $15, dutchie_url = $16, in_stock = $17,
stock_quantity = $18, stock_status = $19,
last_seen_at = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
WHERE id = $20
`;
  const insertSql = `
INSERT INTO products (
dispensary_id, slug, name, brand, variant, description,
regular_price, sale_price, discount_type, discount_value,
thc_percentage, cbd_percentage, strain_type,
terpenes, effects, flavors,
image_url, dutchie_url, in_stock, stock_quantity, stock_status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
RETURNING id
`;
  // The 19 scraped columns, in the order shared by the UPDATE ($1..$19) and INSERT ($3..$21).
  const scrapedValues = [
    product.name, product.brand, product.variant, product.description,
    product.regularPrice, product.salePrice,
    product.discountType, product.discountValue,
    product.thcPercentage, product.cbdPercentage, product.strainType,
    product.terpenes, product.effects, product.flavors,
    product.imageUrl, product.dutchieUrl, product.inStock,
    product.stockQuantity, product.stockStatus
  ];

  const client = await pool.connect();
  // Set when ROLLBACK itself fails, so the connection is destroyed instead of being reused.
  let discardClient = false;
  try {
    await client.query('BEGIN');
    const existing = await client.query(
      'SELECT id FROM products WHERE dispensary_id = $1 AND slug = $2',
      [dispensaryId, product.slug]
    );

    if (existing.rows.length > 0) {
      const productId: number = existing.rows[0].id;
      await client.query(updateSql, [...scrapedValues, productId]);
    } else {
      await client.query(insertSql, [dispensaryId, product.slug, ...scrapedValues]);
    }

    await client.query('COMMIT');
    return true;
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`[${workerNum}] ❌ Error saving product:`, message);
    // A failing ROLLBACK must not replace the original error or break the "return false" contract.
    try {
      await client.query('ROLLBACK');
    } catch (rollbackError: unknown) {
      discardClient = true;
      const rollbackMessage =
        rollbackError instanceof Error ? rollbackError.message : String(rollbackError);
      console.error(`[${workerNum}] ❌ Error rolling back product save:`, rollbackMessage);
    }
    return false;
  } finally {
    // A truthy argument tells the pool to destroy this client rather than return it.
    client.release(discardClient);
  }
}
/**
 * Worker entry point.
 *
 * Looks up the dispensary's menu URL, picks a proxy, launches a headless Firefox page, then
 * repeatedly claims a job, scrapes the brand's products, saves them and marks the job
 * completed (or failed) until no job can be claimed. Prints a summary at the end.
 *
 * Exits the process with code 1 when the dispensary does not exist or no proxy is available.
 * The browser and the database pool are closed even when an unexpected error escapes the loop.
 */
async function main() {
  console.log(`\n${'='.repeat(60)}`);
  console.log(`🚀 JOB-BASED SCRAPER - ${workerNum}`);
  console.log(` Dispensary ID: ${dispensaryId}`);
  console.log(`${'='.repeat(60)}\n`);

  try {
    // Get dispensary info
    const dispensaryResult = await pool.query(
      "SELECT id, name, menu_url FROM dispensaries WHERE id = $1",
      [dispensaryId]
    );
    if (dispensaryResult.rows.length === 0) {
      console.error(`[${workerNum}] ❌ Dispensary ID ${dispensaryId} not found`);
      process.exit(1);
    }
    const menuUrl = dispensaryResult.rows[0].menu_url;
    console.log(`[${workerNum}] ✅ Dispensary: ${dispensaryResult.rows[0].name}`);

    // Get proxy
    const proxy = await getRandomProxy();
    if (!proxy) {
      console.log(`[${workerNum}] ❌ No proxy available`);
      process.exit(1);
    }
    console.log(`[${workerNum}] 🔐 Using proxy: ${proxy.server}`);

    // Launch browser
    const browser = await firefox.launch({ headless: true });
    try {
      const context = await browser.newContext({
        viewport: { width: 1920, height: 1080 },
        // NOTE(review): this is a Chrome user-agent string sent from a Firefox browser — confirm it is intentional.
        userAgent: 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
        proxy: {
          server: proxy.server,
          username: proxy.username,
          password: proxy.password
        }
      });
      const page = await context.newPage();

      let jobsProcessed = 0;
      let totalProducts = 0;
      let totalSaved = 0;

      // Keep claiming and processing jobs
      while (true) {
        const job = await claimNextJob();
        if (!job) {
          console.log(`[${workerNum}] No more jobs available`);
          break;
        }

        console.log(`\n[${workerNum}] ${'='.repeat(60)}`);
        console.log(`[${workerNum}] 🏷️ JOB #${job.id}: ${job.brand_name}`);
        console.log(`[${workerNum}] ${'='.repeat(60)}\n`);

        try {
          const brand: Brand = {
            slug: job.brand_slug,
            name: job.brand_name,
            url: `${menuUrl}/brands/${job.brand_slug}`
          };
          const products = await scrapeProductsFromBrand(menuUrl, brand, dispensaryId, page);
          const productsFound = products.length;
          let productsSaved = 0;

          // Save products
          for (const product of products) {
            const saved = await saveProduct(product, dispensaryId);
            if (saved) productsSaved++;
          }

          await completeJob(job.id, productsFound, productsSaved);
          jobsProcessed++;
          totalProducts += productsFound;
          totalSaved += productsSaved;
          console.log(`[${workerNum}] ✅ Job #${job.id} complete: ${productsFound} found, ${productsSaved} saved`);
          await page.waitForTimeout(2000);
        } catch (error: unknown) {
          // Anything thrown may not be an Error; always hand failJob a real string.
          const message = error instanceof Error ? error.message : String(error);
          console.error(`[${workerNum}] ❌ Job #${job.id} failed:`, message);
          await failJob(job.id, message);
        }
      }

      console.log(`\n[${workerNum}] ${'='.repeat(60)}`);
      console.log(`[${workerNum}] ✅ WORKER ${workerNum} COMPLETE`);
      console.log(`[${workerNum}] Jobs processed: ${jobsProcessed}`);
      console.log(`[${workerNum}] Total products: ${totalProducts}`);
      console.log(`[${workerNum}] Products saved: ${totalSaved}`);
      console.log(`[${workerNum}] ${'='.repeat(60)}\n`);
    } finally {
      // Runs on success and on any error after launch, so no browser process is left behind.
      await browser.close();
    }
  } finally {
    // Not reached on the process.exit(1) paths above, which terminate immediately.
    await pool.end();
  }
}
// Run the worker. An uncaught failure is logged and reported through a non-zero exit code
// so that whatever launched the worker can detect it.
main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});