Add crawler scheduler, orchestrator, and multi-category intelligence

- Add scheduler UI with store schedules, job queue, and global settings
- Add store crawl orchestrator for intelligent crawl workflow
- Add multi-category intelligence detection (product, specials, brands, metadata)
- Add CrawlerLogger for structured JSON logging
- Add migrations for scheduler tables and dispensary linking
- Add dispensary → scheduler navigation link
- Support production/sandbox crawler modes per provider

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: Kelly
Date: 2025-11-30 09:29:15 -07:00
parent 8b4292fbb2
commit 3861a31a3b
25 changed files with 8874 additions and 13 deletions

View File

@@ -49,7 +49,10 @@ import scraperMonitorRoutes from './routes/scraper-monitor';
import apiTokensRoutes from './routes/api-tokens';
import apiPermissionsRoutes from './routes/api-permissions';
import parallelScrapeRoutes from './routes/parallel-scrape';
import scheduleRoutes from './routes/schedule';
import crawlerSandboxRoutes from './routes/crawler-sandbox';
import { trackApiUsage, checkRateLimit } from './middleware/apiTokenTracker';
import { startCrawlScheduler } from './services/crawl-scheduler';
import { validateWordPressPermissions } from './middleware/wordpressPermissions';
// Apply WordPress permissions validation first (sets req.apiToken)
@@ -75,6 +78,8 @@ app.use('/api/scraper-monitor', scraperMonitorRoutes);
app.use('/api/api-tokens', apiTokensRoutes);
app.use('/api/api-permissions', apiPermissionsRoutes);
app.use('/api/parallel-scrape', parallelScrapeRoutes);
app.use('/api/schedule', scheduleRoutes);
app.use('/api/crawler-sandbox', crawlerSandboxRoutes);
async function startServer() {
try {
@@ -86,6 +91,10 @@ async function startServer() {
// Clean up any orphaned proxy test jobs from previous server runs
await cleanupOrphanedJobs();
// Start the crawl scheduler (checks every minute for jobs to run)
startCrawlScheduler();
logger.info('system', 'Crawl scheduler started');
app.listen(PORT, () => {
logger.info('system', `Server running on port ${PORT}`);
console.log(`🚀 Server running on port ${PORT}`);
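The crawl-scheduler service itself is not shown in this diff. A minimal sketch of the once-a-minute tick described in the comment above, assuming the same pool and logger services and a crawl_jobs table with a 'pending' status (the real service also exposes the restart and mode controls imported by the schedule routes below):

import { pool } from '../db/migrate';
import { logger } from './logger';

let tick: NodeJS.Timeout | null = null;

export function startCrawlScheduler(): void {
  // Every 60s, look for due pending jobs and hand them to the crawl runner.
  tick = setInterval(async () => {
    try {
      const due = await pool.query(
        `SELECT id, store_id FROM crawl_jobs
         WHERE status = 'pending' AND scheduled_at <= NOW()
         ORDER BY priority DESC, scheduled_at ASC
         LIMIT 5`
      );
      for (const job of due.rows) {
        logger.info('scheduler', `Dispatching crawl job ${job.id} for store ${job.store_id}`);
        // dispatchCrawlJob(job) would go here (hypothetical hand-off).
      }
    } catch (err: any) {
      logger.error('scheduler', `Scheduler tick failed: ${err.message}`);
    }
  }, 60_000);
}

export function stopCrawlScheduler(): void {
  if (tick) clearInterval(tick);
  tick = null;
}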

View File

@@ -0,0 +1,628 @@
/**
* Crawler Sandbox API Routes
*
* Endpoints for managing sandbox crawls, templates, and provider detection
*/
import express from 'express';
import { pool } from '../db/migrate';
import { authMiddleware, requireRole } from '../auth/middleware';
import { logger } from '../services/logger';
import {
runDetectMenuProviderJob,
runDutchieMenuCrawlJob,
runSandboxCrawlJob,
} from '../services/crawler-jobs';
const router = express.Router();
// Apply auth middleware to all routes
router.use(authMiddleware);
// ========================================
// Sandbox Entries
// ========================================
/**
* GET /api/crawler-sandbox
* List sandbox entries with optional filters
*/
router.get('/', async (req, res) => {
try {
const { status, dispensaryId, limit = 50, offset = 0 } = req.query;
let query = `
SELECT cs.*, d.name as dispensary_name, d.website, d.menu_provider, d.crawler_status
FROM crawler_sandboxes cs
JOIN dispensaries d ON d.id = cs.dispensary_id
WHERE 1=1
`;
const params: any[] = [];
let paramIndex = 1;
if (status) {
query += ` AND cs.status = $${paramIndex}`;
params.push(status);
paramIndex++;
}
if (dispensaryId) {
query += ` AND cs.dispensary_id = $${paramIndex}`;
params.push(Number(dispensaryId));
paramIndex++;
}
query += ` ORDER BY cs.created_at DESC LIMIT $${paramIndex} OFFSET $${paramIndex + 1}`;
params.push(Number(limit), Number(offset));
const result = await pool.query(query, params);
// Get total count
const countResult = await pool.query(
`SELECT COUNT(*) FROM crawler_sandboxes cs WHERE 1=1
${status ? 'AND cs.status = $1' : ''}
${dispensaryId ? `AND cs.dispensary_id = $${status ? 2 : 1}` : ''}`,
status && dispensaryId ? [status, dispensaryId] : status ? [status] : dispensaryId ? [dispensaryId] : []
);
res.json({
sandboxes: result.rows,
total: parseInt(countResult.rows[0].count),
limit: Number(limit),
offset: Number(offset),
});
} catch (error: any) {
logger.error('api', `Get sandboxes error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* GET /api/crawler-sandbox/:id
* Get a single sandbox entry with full details
*/
router.get('/:id', async (req, res) => {
try {
const { id } = req.params;
const result = await pool.query(
`SELECT cs.*, d.name as dispensary_name, d.website, d.menu_url,
d.menu_provider, d.menu_provider_confidence, d.crawler_mode, d.crawler_status
FROM crawler_sandboxes cs
JOIN dispensaries d ON d.id = cs.dispensary_id
WHERE cs.id = $1`,
[id]
);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Sandbox entry not found' });
}
// Get related jobs
const jobs = await pool.query(
`SELECT * FROM sandbox_crawl_jobs
WHERE sandbox_id = $1 OR dispensary_id = $2
ORDER BY created_at DESC
LIMIT 10`,
[id, result.rows[0].dispensary_id]
);
res.json({
sandbox: result.rows[0],
jobs: jobs.rows,
});
} catch (error: any) {
logger.error('api', `Get sandbox error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* POST /api/crawler-sandbox/:id/analyze
* Trigger re-analysis of a sandbox entry
*/
router.post('/:id/analyze', requireRole('admin'), async (req, res) => {
try {
const { id } = req.params;
const sandbox = await pool.query('SELECT * FROM crawler_sandboxes WHERE id = $1', [id]);
if (sandbox.rows.length === 0) {
return res.status(404).json({ error: 'Sandbox entry not found' });
}
// Queue a new sandbox job
const job = await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, sandbox_id, job_type, status, priority)
VALUES ($1, $2, 'deep_crawl', 'pending', 20)
RETURNING id`,
[sandbox.rows[0].dispensary_id, id]
);
// Update sandbox status
await pool.query(
`UPDATE crawler_sandboxes SET status = 'pending', updated_at = NOW() WHERE id = $1`,
[id]
);
res.json({
message: 'Analysis job queued',
jobId: job.rows[0].id,
});
} catch (error: any) {
logger.error('api', `Analyze sandbox error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* POST /api/crawler-sandbox/:id/move-to-production
* Move a sandbox entry to production (for Dutchie dispensaries)
*/
router.post('/:id/move-to-production', requireRole('admin'), async (req, res) => {
try {
const { id } = req.params;
const sandbox = await pool.query(
`SELECT cs.*, d.menu_provider
FROM crawler_sandboxes cs
JOIN dispensaries d ON d.id = cs.dispensary_id
WHERE cs.id = $1`,
[id]
);
if (sandbox.rows.length === 0) {
return res.status(404).json({ error: 'Sandbox entry not found' });
}
// Can only move to production if provider is dutchie
if (sandbox.rows[0].menu_provider !== 'dutchie') {
return res.status(400).json({
error: 'Only Dutchie dispensaries can be moved to production currently',
});
}
// Update dispensary to production mode
await pool.query(
`UPDATE dispensaries
SET crawler_mode = 'production', crawler_status = 'idle', updated_at = NOW()
WHERE id = $1`,
[sandbox.rows[0].dispensary_id]
);
// Mark sandbox as moved
await pool.query(
`UPDATE crawler_sandboxes
SET status = 'moved_to_production', updated_at = NOW()
WHERE id = $1`,
[id]
);
res.json({ message: 'Dispensary moved to production' });
} catch (error: any) {
logger.error('api', `Move to production error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* PATCH /api/crawler-sandbox/:id
* Update sandbox entry (e.g., add human review notes)
*/
router.patch('/:id', requireRole('admin'), async (req, res) => {
try {
const { id } = req.params;
const { human_review_notes, status, suspected_menu_provider } = req.body;
const updates: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (human_review_notes !== undefined) {
updates.push(`human_review_notes = $${paramIndex}`);
params.push(human_review_notes);
paramIndex++;
}
if (status) {
updates.push(`status = $${paramIndex}`);
params.push(status);
paramIndex++;
}
if (suspected_menu_provider !== undefined) {
updates.push(`suspected_menu_provider = $${paramIndex}`);
params.push(suspected_menu_provider);
paramIndex++;
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No updates provided' });
}
updates.push('updated_at = NOW()');
if (human_review_notes !== undefined) {
updates.push('reviewed_at = NOW()');
}
params.push(id);
await pool.query(
`UPDATE crawler_sandboxes SET ${updates.join(', ')} WHERE id = $${paramIndex}`,
params
);
res.json({ message: 'Sandbox updated' });
} catch (error: any) {
logger.error('api', `Update sandbox error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
// ========================================
// Templates
// ========================================
/**
* GET /api/crawler-sandbox/templates/list
* List all crawler templates
*/
router.get('/templates/list', async (req, res) => {
try {
const result = await pool.query(
`SELECT * FROM crawler_templates ORDER BY provider, is_default_for_provider DESC, name`
);
res.json({ templates: result.rows });
} catch (error: any) {
logger.error('api', `Get templates error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* GET /api/crawler-sandbox/templates/:id
* Get a single template
*/
router.get('/templates/:id', async (req, res) => {
try {
const { id } = req.params;
const result = await pool.query('SELECT * FROM crawler_templates WHERE id = $1', [id]);
if (result.rows.length === 0) {
return res.status(404).json({ error: 'Template not found' });
}
res.json({ template: result.rows[0] });
} catch (error: any) {
logger.error('api', `Get template error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* POST /api/crawler-sandbox/templates
* Create a new template
*/
router.post('/templates', requireRole('admin'), async (req, res) => {
try {
const {
provider,
name,
selector_config,
navigation_config,
transform_config,
validation_rules,
notes,
} = req.body;
if (!provider || !name) {
return res.status(400).json({ error: 'provider and name are required' });
}
const result = await pool.query(
`INSERT INTO crawler_templates
(provider, name, selector_config, navigation_config, transform_config, validation_rules, notes, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *`,
[
provider,
name,
JSON.stringify(selector_config || {}),
JSON.stringify(navigation_config || {}),
JSON.stringify(transform_config || {}),
JSON.stringify(validation_rules || {}),
notes,
(req as any).user?.email || 'system',
]
);
res.status(201).json({ template: result.rows[0] });
} catch (error: any) {
logger.error('api', `Create template error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* PUT /api/crawler-sandbox/templates/:id
* Update a template
*/
router.put('/templates/:id', requireRole('admin'), async (req, res) => {
try {
const { id } = req.params;
const {
is_active,
is_default_for_provider,
selector_config,
navigation_config,
transform_config,
validation_rules,
notes,
} = req.body;
const updates: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (is_active !== undefined) {
updates.push(`is_active = $${paramIndex}`);
params.push(is_active);
paramIndex++;
}
if (is_default_for_provider !== undefined) {
updates.push(`is_default_for_provider = $${paramIndex}`);
params.push(is_default_for_provider);
paramIndex++;
}
if (selector_config !== undefined) {
updates.push(`selector_config = $${paramIndex}`);
params.push(JSON.stringify(selector_config));
paramIndex++;
}
if (navigation_config !== undefined) {
updates.push(`navigation_config = $${paramIndex}`);
params.push(JSON.stringify(navigation_config));
paramIndex++;
}
if (transform_config !== undefined) {
updates.push(`transform_config = $${paramIndex}`);
params.push(JSON.stringify(transform_config));
paramIndex++;
}
if (validation_rules !== undefined) {
updates.push(`validation_rules = $${paramIndex}`);
params.push(JSON.stringify(validation_rules));
paramIndex++;
}
if (notes !== undefined) {
updates.push(`notes = $${paramIndex}`);
params.push(notes);
paramIndex++;
}
if (updates.length === 0) {
return res.status(400).json({ error: 'No updates provided' });
}
updates.push('updated_at = NOW()');
params.push(id);
await pool.query(
`UPDATE crawler_templates SET ${updates.join(', ')} WHERE id = $${paramIndex}`,
params
);
const result = await pool.query('SELECT * FROM crawler_templates WHERE id = $1', [id]);
res.json({ template: result.rows[0] });
} catch (error: any) {
logger.error('api', `Update template error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
// ========================================
// Jobs
// ========================================
/**
* GET /api/crawler-sandbox/jobs/list
* List sandbox crawl jobs
*/
router.get('/jobs/list', async (req, res) => {
try {
const { status, dispensaryId, limit = 50 } = req.query;
let query = `
SELECT sj.*, d.name as dispensary_name
FROM sandbox_crawl_jobs sj
JOIN dispensaries d ON d.id = sj.dispensary_id
WHERE 1=1
`;
const params: any[] = [];
let paramIndex = 1;
if (status) {
query += ` AND sj.status = $${paramIndex}`;
params.push(status);
paramIndex++;
}
if (dispensaryId) {
query += ` AND sj.dispensary_id = $${paramIndex}`;
params.push(Number(dispensaryId));
paramIndex++;
}
query += ` ORDER BY sj.created_at DESC LIMIT $${paramIndex}`;
params.push(Number(limit));
const result = await pool.query(query, params);
res.json({ jobs: result.rows });
} catch (error: any) {
logger.error('api', `Get jobs error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* POST /api/crawler-sandbox/jobs/detect/:dispensaryId
* Trigger provider detection for a dispensary
*/
router.post('/jobs/detect/:dispensaryId', requireRole('admin'), async (req, res) => {
try {
const { dispensaryId } = req.params;
// Create detection job
const job = await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, job_type, status, priority)
VALUES ($1, 'detection', 'pending', 30)
RETURNING id`,
[dispensaryId]
);
// Update dispensary status
await pool.query(
`UPDATE dispensaries SET crawler_status = 'queued_detection', updated_at = NOW() WHERE id = $1`,
[dispensaryId]
);
res.json({
message: 'Detection job queued',
jobId: job.rows[0].id,
});
} catch (error: any) {
logger.error('api', `Queue detection error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
/**
* POST /api/crawler-sandbox/jobs/run/:id
* Immediately run a sandbox job
*/
router.post('/jobs/run/:id', requireRole('admin'), async (req, res) => {
try {
const { id } = req.params;
const job = await pool.query('SELECT * FROM sandbox_crawl_jobs WHERE id = $1', [id]);
if (job.rows.length === 0) {
return res.status(404).json({ error: 'Job not found' });
}
const jobData = job.rows[0];
// Run the job immediately
let result;
if (jobData.job_type === 'detection') {
result = await runDetectMenuProviderJob(jobData.dispensary_id);
} else {
result = await runSandboxCrawlJob(jobData.dispensary_id, jobData.sandbox_id);
}
// Update job status
await pool.query(
`UPDATE sandbox_crawl_jobs
SET status = $1, completed_at = NOW(), result_summary = $2, error_message = $3
WHERE id = $4`,
[
result.success ? 'completed' : 'failed',
JSON.stringify(result.data || {}),
result.success ? null : result.message,
id,
]
);
res.json(result);
} catch (error: any) {
logger.error('api', `Run job error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
// ========================================
// Stats
// ========================================
/**
* GET /api/crawler-sandbox/stats/overview
* Get sandbox/crawler statistics
*/
router.get('/stats/overview', async (req, res) => {
try {
// Dispensary provider stats
const providerStats = await pool.query(`
SELECT
menu_provider,
COUNT(*) as count,
AVG(menu_provider_confidence)::integer as avg_confidence
FROM dispensaries
WHERE menu_provider IS NOT NULL
GROUP BY menu_provider
ORDER BY count DESC
`);
// Mode stats
const modeStats = await pool.query(`
SELECT
crawler_mode,
COUNT(*) as count
FROM dispensaries
GROUP BY crawler_mode
`);
// Status stats
const statusStats = await pool.query(`
SELECT
crawler_status,
COUNT(*) as count
FROM dispensaries
GROUP BY crawler_status
ORDER BY count DESC
`);
// Sandbox stats
const sandboxStats = await pool.query(`
SELECT
status,
COUNT(*) as count
FROM crawler_sandboxes
GROUP BY status
`);
// Job stats
const jobStats = await pool.query(`
SELECT
status,
job_type,
COUNT(*) as count
FROM sandbox_crawl_jobs
GROUP BY status, job_type
`);
// Recent activity
const recentActivity = await pool.query(`
SELECT 'sandbox' as type, id, dispensary_id, status, created_at
FROM crawler_sandboxes
ORDER BY created_at DESC
LIMIT 5
`);
res.json({
providers: providerStats.rows,
modes: modeStats.rows,
statuses: statusStats.rows,
sandbox: sandboxStats.rows,
jobs: jobStats.rows,
recentActivity: recentActivity.rows,
});
} catch (error: any) {
logger.error('api', `Get stats error: ${error.message}`);
res.status(500).json({ error: error.message });
}
});
export default router;
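
Both PATCH /:id and PUT /templates/:id above hand-build the same dynamic SET-clause pattern. A hypothetical helper (not part of this commit) could consolidate it; callers would still pre-serialize jsonb fields with JSON.stringify:

function buildUpdate(
  table: string,
  fields: Record<string, unknown>,
  id: string | number
): { text: string; values: unknown[] } {
  // Keep only the fields the caller actually provided.
  const keys = Object.keys(fields).filter((k) => fields[k] !== undefined);
  if (keys.length === 0) throw new Error('No updates provided');
  const sets = keys.map((k, i) => `${k} = $${i + 1}`);
  sets.push('updated_at = NOW()');
  return {
    text: `UPDATE ${table} SET ${sets.join(', ')} WHERE id = $${keys.length + 1}`,
    values: [...keys.map((k) => fields[k]), id],
  };
}

// Usage sketch: const { text, values } = buildUpdate('crawler_templates', { notes }, id);
// await pool.query(text, values);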

View File

@@ -0,0 +1,344 @@
import { Router, Request, Response } from 'express';
import { authMiddleware, requireRole } from '../auth/middleware';
import {
getGlobalSchedule,
updateGlobalSchedule,
getStoreScheduleStatuses,
getStoreSchedule,
updateStoreSchedule,
getAllRecentJobs,
getRecentJobs,
triggerManualCrawl,
triggerAllStoresCrawl,
cancelJob,
restartCrawlScheduler,
setSchedulerMode,
getSchedulerMode,
} from '../services/crawl-scheduler';
import {
runStoreCrawlOrchestrator,
runBatchOrchestrator,
getStoresDueForOrchestration,
} from '../services/store-crawl-orchestrator';
const router = Router();
router.use(authMiddleware);
// ============================================
// Global Schedule Endpoints
// ============================================
/**
* GET /api/schedule/global
* Get global schedule settings
*/
router.get('/global', async (req: Request, res: Response) => {
try {
const schedules = await getGlobalSchedule();
res.json({ schedules });
} catch (error: any) {
console.error('Error fetching global schedule:', error);
res.status(500).json({ error: 'Failed to fetch global schedule' });
}
});
/**
* PUT /api/schedule/global/:type
* Update global schedule setting
*/
router.put('/global/:type', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const { type } = req.params;
const { enabled, interval_hours, run_time } = req.body;
if (type !== 'global_interval' && type !== 'daily_special') {
return res.status(400).json({ error: 'Invalid schedule type' });
}
const schedule = await updateGlobalSchedule(type, {
enabled,
interval_hours,
run_time
});
// Restart scheduler to apply changes
await restartCrawlScheduler();
res.json({ schedule, message: 'Schedule updated and scheduler restarted' });
} catch (error: any) {
console.error('Error updating global schedule:', error);
res.status(500).json({ error: 'Failed to update global schedule' });
}
});
// ============================================
// Store Schedule Endpoints
// ============================================
/**
* GET /api/schedule/stores
* Get all store schedule statuses
*/
router.get('/stores', async (req: Request, res: Response) => {
try {
const stores = await getStoreScheduleStatuses();
res.json({ stores });
} catch (error: any) {
console.error('Error fetching store schedules:', error);
res.status(500).json({ error: 'Failed to fetch store schedules' });
}
});
/**
* GET /api/schedule/stores/:storeId
* Get schedule for a specific store
*/
router.get('/stores/:storeId', async (req: Request, res: Response) => {
try {
const storeId = parseInt(req.params.storeId);
if (isNaN(storeId)) {
return res.status(400).json({ error: 'Invalid store ID' });
}
const schedule = await getStoreSchedule(storeId);
res.json({ schedule });
} catch (error: any) {
console.error('Error fetching store schedule:', error);
res.status(500).json({ error: 'Failed to fetch store schedule' });
}
});
/**
* PUT /api/schedule/stores/:storeId
* Update schedule for a specific store
*/
router.put('/stores/:storeId', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const storeId = parseInt(req.params.storeId);
if (isNaN(storeId)) {
return res.status(400).json({ error: 'Invalid store ID' });
}
const {
enabled,
interval_hours,
daily_special_enabled,
daily_special_time,
priority
} = req.body;
const schedule = await updateStoreSchedule(storeId, {
enabled,
interval_hours,
daily_special_enabled,
daily_special_time,
priority
});
res.json({ schedule });
} catch (error: any) {
console.error('Error updating store schedule:', error);
res.status(500).json({ error: 'Failed to update store schedule' });
}
});
// ============================================
// Job Queue Endpoints
// ============================================
/**
* GET /api/schedule/jobs
* Get recent jobs
*/
router.get('/jobs', async (req: Request, res: Response) => {
try {
const limit = parseInt(req.query.limit as string) || 50;
const jobs = await getAllRecentJobs(Math.min(limit, 200));
res.json({ jobs });
} catch (error: any) {
console.error('Error fetching jobs:', error);
res.status(500).json({ error: 'Failed to fetch jobs' });
}
});
/**
* GET /api/schedule/jobs/store/:storeId
* Get recent jobs for a specific store
*/
router.get('/jobs/store/:storeId', async (req: Request, res: Response) => {
try {
const storeId = parseInt(req.params.storeId);
if (isNaN(storeId)) {
return res.status(400).json({ error: 'Invalid store ID' });
}
const limit = parseInt(req.query.limit as string) || 10;
const jobs = await getRecentJobs(storeId, Math.min(limit, 100));
res.json({ jobs });
} catch (error: any) {
console.error('Error fetching store jobs:', error);
res.status(500).json({ error: 'Failed to fetch store jobs' });
}
});
/**
* POST /api/schedule/jobs/:jobId/cancel
* Cancel a pending job
*/
router.post('/jobs/:jobId/cancel', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const jobId = parseInt(req.params.jobId);
if (isNaN(jobId)) {
return res.status(400).json({ error: 'Invalid job ID' });
}
const cancelled = await cancelJob(jobId);
if (cancelled) {
res.json({ success: true, message: 'Job cancelled' });
} else {
res.status(400).json({ error: 'Job could not be cancelled (may not be pending)' });
}
} catch (error: any) {
console.error('Error cancelling job:', error);
res.status(500).json({ error: 'Failed to cancel job' });
}
});
// ============================================
// Manual Trigger Endpoints
// ============================================
/**
* POST /api/schedule/trigger/store/:storeId
* Manually trigger orchestrated crawl for a specific store
* Uses the intelligent orchestrator which:
* - Checks provider detection status
* - Runs detection if needed
* - Queues appropriate crawl type (production/sandbox)
*/
router.post('/trigger/store/:storeId', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const storeId = parseInt(req.params.storeId);
if (isNaN(storeId)) {
return res.status(400).json({ error: 'Invalid store ID' });
}
// Use the orchestrator instead of simple triggerManualCrawl
const result = await runStoreCrawlOrchestrator(storeId);
res.json({
result,
message: result.summary,
success: result.status === 'success' || result.status === 'sandbox_only',
});
} catch (error: any) {
console.error('Error triggering orchestrated crawl:', error);
res.status(500).json({ error: 'Failed to trigger crawl' });
}
});
/**
* POST /api/schedule/trigger/store/:storeId/legacy
* Legacy: Simple job queue trigger (no orchestration)
*/
router.post('/trigger/store/:storeId/legacy', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const storeId = parseInt(req.params.storeId);
if (isNaN(storeId)) {
return res.status(400).json({ error: 'Invalid store ID' });
}
const job = await triggerManualCrawl(storeId);
res.json({ job, message: 'Crawl job created' });
} catch (error: any) {
console.error('Error triggering manual crawl:', error);
res.status(500).json({ error: 'Failed to trigger crawl' });
}
});
/**
* POST /api/schedule/trigger/all
* Manually trigger crawls for all stores
*/
router.post('/trigger/all', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const jobsCreated = await triggerAllStoresCrawl();
res.json({ jobs_created: jobsCreated, message: `Created ${jobsCreated} crawl jobs` });
} catch (error: any) {
console.error('Error triggering all crawls:', error);
res.status(500).json({ error: 'Failed to trigger crawls' });
}
});
/**
* POST /api/schedule/restart
* Restart the scheduler
*/
router.post('/restart', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
await restartCrawlScheduler();
res.json({ message: 'Scheduler restarted', mode: getSchedulerMode() });
} catch (error: any) {
console.error('Error restarting scheduler:', error);
res.status(500).json({ error: 'Failed to restart scheduler' });
}
});
// ============================================
// Scheduler Mode Endpoints
// ============================================
/**
* GET /api/schedule/mode
* Get current scheduler mode
*/
router.get('/mode', async (req: Request, res: Response) => {
try {
const mode = getSchedulerMode();
res.json({ mode });
} catch (error: any) {
console.error('Error getting scheduler mode:', error);
res.status(500).json({ error: 'Failed to get scheduler mode' });
}
});
/**
* PUT /api/schedule/mode
* Set scheduler mode (legacy or orchestrator)
*/
router.put('/mode', requireRole('superadmin', 'admin'), async (req: Request, res: Response) => {
try {
const { mode } = req.body;
if (mode !== 'legacy' && mode !== 'orchestrator') {
return res.status(400).json({ error: 'Invalid mode. Must be "legacy" or "orchestrator"' });
}
setSchedulerMode(mode);
// Restart scheduler with new mode
await restartCrawlScheduler();
res.json({ mode, message: `Scheduler mode set to ${mode} and restarted` });
} catch (error: any) {
console.error('Error setting scheduler mode:', error);
res.status(500).json({ error: 'Failed to set scheduler mode' });
}
});
/**
* GET /api/schedule/due
* Get stores that are due for orchestration
*/
router.get('/due', async (req: Request, res: Response) => {
try {
const limit = parseInt(req.query.limit as string) || 10;
const storeIds = await getStoresDueForOrchestration(Math.min(limit, 50));
res.json({ stores_due: storeIds, count: storeIds.length });
} catch (error: any) {
console.error('Error getting stores due for orchestration:', error);
res.status(500).json({ error: 'Failed to get stores due' });
}
});
export default router;
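
A minimal client-side sketch for the orchestrated trigger route above, assuming bearer-token auth (the exact scheme depends on authMiddleware, which is not shown in this diff). Per the handler, a result.status of 'success' or 'sandbox_only' both count as success:

async function triggerStoreCrawl(storeId: number, token: string) {
  const res = await fetch(`/api/schedule/trigger/store/${storeId}`, {
    method: 'POST',
    headers: { Authorization: `Bearer ${token}` },
  });
  if (!res.ok) throw new Error(`Trigger failed with HTTP ${res.status}`);
  // Shape mirrors the route handler's response: { result, message, success }
  return (await res.json()) as { success: boolean; message: string; result: unknown };
}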

View File

@@ -0,0 +1,345 @@
#!/usr/bin/env npx tsx
/**
* Backfill Store-Dispensary Mapping
*
* Links existing stores (scheduler) to dispensaries (master AZDHS directory)
* by matching on name, company name, or slug similarity.
*
* Usage:
* npx tsx src/scripts/backfill-store-dispensary.ts # Preview matches
* npx tsx src/scripts/backfill-store-dispensary.ts --apply # Apply matches
* npx tsx src/scripts/backfill-store-dispensary.ts --verbose # Show all match details
*/
import { pool } from '../db/migrate';
import { logger } from '../services/logger';
const args = process.argv.slice(2);
const flags = {
apply: args.includes('--apply'),
verbose: args.includes('--verbose'),
help: args.includes('--help') || args.includes('-h'),
};
interface Store {
id: number;
name: string;
slug: string;
dispensary_id: number | null;
}
interface Dispensary {
id: number;
name: string;
company_name: string | null;
city: string;
address: string;
slug: string;
}
interface MatchResult {
store: Store;
dispensary: Dispensary | null;
matchType: 'exact_name' | 'normalized_name' | 'company_name' | 'slug' | 'fuzzy' | 'none';
score: number;
}
/**
* Normalize a store/dispensary name for comparison
* Removes common suffixes, punctuation, and extra whitespace
*/
function normalizeName(name: string): string {
return name
.toLowerCase()
.replace(/\s*[-–—]\s*/g, ' ') // Normalize dashes to spaces
.replace(/\s*(dispensary|cannabis|marijuana|weed|shop|store|llc|inc)\s*/gi, ' ')
.replace(/[’‘]/g, "'") // Normalize curly apostrophes to straight
.replace(/[^\w\s']/g, '') // Remove other punctuation
.replace(/\s+/g, ' ') // Collapse whitespace
.trim();
}
/**
* Simple Levenshtein distance for fuzzy matching
*/
function levenshteinDistance(a: string, b: string): number {
const matrix: number[][] = [];
for (let i = 0; i <= b.length; i++) {
matrix[i] = [i];
}
for (let j = 0; j <= a.length; j++) {
matrix[0][j] = j;
}
for (let i = 1; i <= b.length; i++) {
for (let j = 1; j <= a.length; j++) {
if (b.charAt(i - 1) === a.charAt(j - 1)) {
matrix[i][j] = matrix[i - 1][j - 1];
} else {
matrix[i][j] = Math.min(
matrix[i - 1][j - 1] + 1, // substitution
matrix[i][j - 1] + 1, // insertion
matrix[i - 1][j] + 1 // deletion
);
}
}
}
return matrix[b.length][a.length];
}
/**
* Calculate similarity score (0-100)
*/
function similarityScore(a: string, b: string): number {
const maxLen = Math.max(a.length, b.length);
if (maxLen === 0) return 100;
const distance = levenshteinDistance(a, b);
return Math.round((1 - distance / maxLen) * 100);
}
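// Worked example (illustrative values, not from the dataset):
//   normalizeName('Harvest HOC – Tempe Dispensary') -> 'harvest hoc tempe'
//   similarityScore('harvest hoc tempe', 'harvest hoc of tempe'):
//     maxLen = 20, Levenshtein distance = 3 ("of " inserted)
//     -> Math.round((1 - 3/20) * 100) = 85, which clears the 70-point fuzzy threshold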
/**
* Find the best dispensary match for a store
*/
function findBestMatch(store: Store, dispensaries: Dispensary[]): MatchResult {
const normalizedStoreName = normalizeName(store.name);
const storeSlug = store.slug.toLowerCase();
let bestMatch: MatchResult = {
store,
dispensary: null,
matchType: 'none',
score: 0,
};
for (const disp of dispensaries) {
const normalizedDispName = normalizeName(disp.name);
const normalizedCompanyName = disp.company_name ? normalizeName(disp.company_name) : '';
const dispSlug = disp.slug.toLowerCase();
// 1. Exact name match (case-insensitive)
if (store.name.toLowerCase() === disp.name.toLowerCase()) {
return {
store,
dispensary: disp,
matchType: 'exact_name',
score: 100,
};
}
// 2. Normalized name match
if (normalizedStoreName === normalizedDispName) {
return {
store,
dispensary: disp,
matchType: 'normalized_name',
score: 95,
};
}
// 3. Store name matches company name
if (normalizedCompanyName && normalizedStoreName === normalizedCompanyName) {
return {
store,
dispensary: disp,
matchType: 'company_name',
score: 90,
};
}
// 4. Slug match
if (storeSlug === dispSlug) {
return {
store,
dispensary: disp,
matchType: 'slug',
score: 85,
};
}
// 5. Fuzzy matching (only if score >= 70)
const nameScore = similarityScore(normalizedStoreName, normalizedDispName);
const companyScore = normalizedCompanyName
? similarityScore(normalizedStoreName, normalizedCompanyName)
: 0;
const fuzzyScore = Math.max(nameScore, companyScore);
if (fuzzyScore > bestMatch.score && fuzzyScore >= 70) {
bestMatch = {
store,
dispensary: disp,
matchType: 'fuzzy',
score: fuzzyScore,
};
}
}
return bestMatch;
}
async function main() {
if (flags.help) {
console.log(`
Backfill Store-Dispensary Mapping
Links existing stores (scheduler) to dispensaries (master AZDHS directory)
by matching on name, company name, or slug similarity.
USAGE:
npx tsx src/scripts/backfill-store-dispensary.ts [OPTIONS]
OPTIONS:
--apply Apply the mappings to the database (default: preview only)
--verbose Show detailed match information for all stores
--help, -h Show this help message
EXAMPLES:
# Preview what would be matched
npx tsx src/scripts/backfill-store-dispensary.ts
# Apply the mappings
npx tsx src/scripts/backfill-store-dispensary.ts --apply
# Show verbose output
npx tsx src/scripts/backfill-store-dispensary.ts --verbose
`);
process.exit(0);
}
console.log('\n📦 Backfill Store-Dispensary Mapping');
console.log('=====================================\n');
try {
// Fetch all stores without a dispensary_id
const storesResult = await pool.query<Store>(`
SELECT id, name, slug, dispensary_id
FROM stores
WHERE dispensary_id IS NULL
ORDER BY name
`);
const unmappedStores = storesResult.rows;
// Fetch all already-mapped stores for context
const mappedResult = await pool.query<Store>(`
SELECT id, name, slug, dispensary_id
FROM stores
WHERE dispensary_id IS NOT NULL
ORDER BY name
`);
const mappedStores = mappedResult.rows;
// Fetch all dispensaries
const dispResult = await pool.query<Dispensary>(`
SELECT id, name, company_name, city, address, slug
FROM dispensaries
ORDER BY name
`);
const dispensaries = dispResult.rows;
console.log(`📊 Current Status:`);
console.log(` Stores without dispensary_id: ${unmappedStores.length}`);
console.log(` Stores already mapped: ${mappedStores.length}`);
console.log(` Total dispensaries: ${dispensaries.length}\n`);
if (unmappedStores.length === 0) {
console.log('✅ All stores are already mapped to dispensaries!\n');
await pool.end();
process.exit(0);
}
// Find matches for each unmapped store
const matches: MatchResult[] = [];
const noMatches: Store[] = [];
for (const store of unmappedStores) {
const match = findBestMatch(store, dispensaries);
if (match.dispensary) {
matches.push(match);
} else {
noMatches.push(store);
}
}
// Sort matches by score (highest first)
matches.sort((a, b) => b.score - a.score);
// Display results
console.log(`\n🔗 Matches Found: ${matches.length}`);
console.log('----------------------------------\n');
if (matches.length > 0) {
// Group by match type
const byType: Record<string, MatchResult[]> = {};
for (const m of matches) {
if (!byType[m.matchType]) byType[m.matchType] = [];
byType[m.matchType].push(m);
}
const typeLabels: Record<string, string> = {
exact_name: '✅ Exact Name Match',
normalized_name: '✅ Normalized Name Match',
company_name: '🏢 Company Name Match',
slug: '🔗 Slug Match',
fuzzy: '🔍 Fuzzy Match',
};
for (const [type, results] of Object.entries(byType)) {
console.log(`${typeLabels[type]} (${results.length}):`);
for (const r of results) {
const dispInfo = r.dispensary!;
console.log(` • "${r.store.name}" → "${dispInfo.name}" (${dispInfo.city}) [${r.score}%]`);
}
console.log('');
}
}
if (noMatches.length > 0) {
console.log(`\n❌ No Match Found: ${noMatches.length}`);
console.log('----------------------------------\n');
for (const store of noMatches) {
console.log(` • "${store.name}" (slug: ${store.slug})`);
}
console.log('');
}
// Apply if requested
if (flags.apply && matches.length > 0) {
console.log('\n🔧 Applying mappings...\n');
let updated = 0;
for (const match of matches) {
if (!match.dispensary) continue;
await pool.query(
'UPDATE stores SET dispensary_id = $1 WHERE id = $2',
[match.dispensary.id, match.store.id]
);
updated++;
if (flags.verbose) {
console.log(` ✓ Linked store ${match.store.id} to dispensary ${match.dispensary.id}`);
}
}
console.log(`\n✅ Updated ${updated} stores with dispensary mappings\n`);
logger.info('system', `Backfill complete: linked ${updated} stores to dispensaries`);
} else if (matches.length > 0 && !flags.apply) {
console.log('\n💡 Run with --apply to update the database\n');
}
// Summary
console.log('📈 Summary:');
console.log(` Would match: ${matches.length} stores`);
console.log(` No match: ${noMatches.length} stores`);
console.log(` Match rate: ${Math.round((matches.length / unmappedStores.length) * 100)}%\n`);
} catch (error) {
console.error('Error:', error);
process.exit(1);
} finally {
await pool.end();
}
}
main().catch(console.error);
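
After an --apply run, a quick coverage check might look like this (a sketch reusing the same pool; note that pg returns COUNT values as strings):

import { pool } from '../db/migrate';

async function checkCoverage(): Promise<void> {
  const res = await pool.query(`
    SELECT COUNT(*) FILTER (WHERE dispensary_id IS NOT NULL) AS mapped,
           COUNT(*) AS total
    FROM stores
  `);
  console.log(res.rows[0]); // e.g. { mapped: '42', total: '45' }
}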

View File

@@ -0,0 +1,424 @@
#!/usr/bin/env npx tsx
/**
* Queue Dispensaries Script
*
* Orchestrates the multi-provider crawler system:
* 1. Queue dispensaries that need provider detection
* 2. Queue Dutchie dispensaries for production crawl
* 3. Queue sandbox dispensaries for learning crawls
*
* Usage:
* npx tsx src/scripts/queue-dispensaries.ts [--detection] [--production] [--sandbox] [--all]
* npx tsx src/scripts/queue-dispensaries.ts --dry-run
* npx tsx src/scripts/queue-dispensaries.ts --process # Process queued jobs
*/
import { pool } from '../db/migrate';
import { logger } from '../services/logger';
import {
runDetectMenuProviderJob,
runDutchieMenuCrawlJob,
runSandboxCrawlJob,
processSandboxJobs,
} from '../services/crawler-jobs';
// Parse command line args
const args = process.argv.slice(2);
const flags = {
detection: args.includes('--detection') || args.includes('--all'),
production: args.includes('--production') || args.includes('--all'),
sandbox: args.includes('--sandbox') || args.includes('--all'),
dryRun: args.includes('--dry-run'),
process: args.includes('--process'),
help: args.includes('--help') || args.includes('-h'),
limit: parseInt(args.find(a => a.startsWith('--limit='))?.split('=')[1] || '10'),
};
// If no specific flags, default to all
if (!flags.detection && !flags.production && !flags.sandbox && !flags.process) {
flags.detection = true;
flags.production = true;
flags.sandbox = true;
}
async function showHelp() {
console.log(`
Queue Dispensaries - Multi-Provider Crawler Orchestration
USAGE:
npx tsx src/scripts/queue-dispensaries.ts [OPTIONS]
OPTIONS:
--detection Queue dispensaries that need provider detection
--production Queue Dutchie production crawls
--sandbox Queue sandbox/learning crawls
--all Queue all job types (default if no specific flag)
--process Process queued jobs instead of just queuing
--dry-run Show what would be queued without making changes
--limit=N Maximum dispensaries to queue per type (default: 10)
--help, -h Show this help message
EXAMPLES:
# Queue all dispensaries for appropriate jobs
npx tsx src/scripts/queue-dispensaries.ts
# Only queue detection jobs
npx tsx src/scripts/queue-dispensaries.ts --detection --limit=20
# Dry run to see what would be queued
npx tsx src/scripts/queue-dispensaries.ts --dry-run
# Process sandbox jobs
npx tsx src/scripts/queue-dispensaries.ts --process
`);
}
async function queueDetectionJobs(): Promise<number> {
console.log('\n📡 Queueing Detection Jobs...');
// Find dispensaries that need provider detection:
// - has a website or menu URL
// - crawler_status is idle (not already queued/running)
// - menu_provider is null, or menu_provider_confidence < 70
const query = `
SELECT id, name, website, menu_url, menu_provider, menu_provider_confidence
FROM dispensaries
WHERE (website IS NOT NULL OR menu_url IS NOT NULL)
AND crawler_status = 'idle'
AND (menu_provider IS NULL OR menu_provider_confidence < 70)
ORDER BY
CASE WHEN menu_provider IS NULL THEN 0 ELSE 1 END,
menu_provider_confidence ASC
LIMIT $1
`;
const result = await pool.query(query, [flags.limit]);
if (flags.dryRun) {
console.log(` Would queue ${result.rows.length} dispensaries for detection:`);
for (const row of result.rows) {
console.log(` - [${row.id}] ${row.name} (current: ${row.menu_provider || 'unknown'}, confidence: ${row.menu_provider_confidence}%)`);
}
return result.rows.length;
}
let queued = 0;
for (const dispensary of result.rows) {
try {
// Update status to queued
await pool.query(
`UPDATE dispensaries SET crawler_status = 'queued_detection', updated_at = NOW() WHERE id = $1`,
[dispensary.id]
);
// Create sandbox job for detection
await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, job_type, status, priority)
VALUES ($1, 'detection', 'pending', 10)`,
[dispensary.id]
);
console.log(` ✓ Queued detection: [${dispensary.id}] ${dispensary.name}`);
queued++;
} catch (error: any) {
console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
}
}
return queued;
}
async function queueProductionCrawls(): Promise<number> {
console.log('\n🏭 Queueing Production Dutchie Crawls...');
// Find Dutchie dispensaries ready for production crawl:
// - menu_provider = 'dutchie'
// - crawler_mode = 'production'
// - crawler_status is idle
// - last_menu_scrape is old or null
const query = `
SELECT d.id, d.name, d.last_menu_scrape, d.menu_url
FROM dispensaries d
WHERE d.menu_provider = 'dutchie'
AND d.crawler_mode = 'production'
AND d.crawler_status = 'idle'
AND (d.last_menu_scrape IS NULL OR d.last_menu_scrape < NOW() - INTERVAL '4 hours')
ORDER BY
CASE WHEN d.last_menu_scrape IS NULL THEN 0 ELSE 1 END,
d.last_menu_scrape ASC
LIMIT $1
`;
const result = await pool.query(query, [flags.limit]);
if (flags.dryRun) {
console.log(` Would queue ${result.rows.length} Dutchie dispensaries for production crawl:`);
for (const row of result.rows) {
const lastScrape = row.last_menu_scrape ? new Date(row.last_menu_scrape).toISOString() : 'never';
console.log(` - [${row.id}] ${row.name} (last scrape: ${lastScrape})`);
}
return result.rows.length;
}
let queued = 0;
for (const dispensary of result.rows) {
try {
// Update status to queued
await pool.query(
`UPDATE dispensaries SET crawler_status = 'queued_crawl', updated_at = NOW() WHERE id = $1`,
[dispensary.id]
);
// Create crawl job in the main crawl_jobs table (production queue)
await pool.query(
`INSERT INTO crawl_jobs (store_id, job_type, trigger_type, status, priority, metadata)
SELECT s.id, 'full_crawl', 'scheduled', 'pending', 50,
jsonb_build_object('dispensary_id', $1, 'source', 'queue-dispensaries')
FROM stores s
JOIN dispensaries d ON (d.menu_url = s.dutchie_plus_url OR d.name ILIKE '%' || s.name || '%')
WHERE d.id = $1
LIMIT 1`,
[dispensary.id]
);
console.log(` ✓ Queued production crawl: [${dispensary.id}] ${dispensary.name}`);
queued++;
} catch (error: any) {
console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
}
}
return queued;
}
async function queueSandboxCrawls(): Promise<number> {
console.log('\n🧪 Queueing Sandbox Crawls...');
// Find sandbox dispensaries needing crawls:
// - crawler_mode = 'sandbox'
// - crawler_status in (idle, error_needs_review)
// - No recent sandbox job
const query = `
SELECT d.id, d.name, d.menu_provider, d.crawler_status, d.website
FROM dispensaries d
WHERE d.crawler_mode = 'sandbox'
AND d.crawler_status IN ('idle', 'error_needs_review')
AND (d.website IS NOT NULL OR d.menu_url IS NOT NULL)
AND NOT EXISTS (
SELECT 1 FROM sandbox_crawl_jobs sj
WHERE sj.dispensary_id = d.id
AND sj.status IN ('pending', 'running')
)
ORDER BY d.updated_at ASC
LIMIT $1
`;
const result = await pool.query(query, [flags.limit]);
if (flags.dryRun) {
console.log(` Would queue ${result.rows.length} dispensaries for sandbox crawl:`);
for (const row of result.rows) {
console.log(` - [${row.id}] ${row.name} (provider: ${row.menu_provider || 'unknown'}, status: ${row.crawler_status})`);
}
return result.rows.length;
}
let queued = 0;
for (const dispensary of result.rows) {
try {
// Update status
await pool.query(
`UPDATE dispensaries SET crawler_status = 'queued_crawl', updated_at = NOW() WHERE id = $1`,
[dispensary.id]
);
// Create sandbox job
await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, job_type, status, priority)
VALUES ($1, 'deep_crawl', 'pending', 5)`,
[dispensary.id]
);
console.log(` ✓ Queued sandbox crawl: [${dispensary.id}] ${dispensary.name}`);
queued++;
} catch (error: any) {
console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
}
}
return queued;
}
async function processJobs(): Promise<void> {
console.log('\n⚙ Processing Queued Jobs...\n');
// Process sandbox jobs (detection + sandbox crawls)
const sandboxJobs = await pool.query(
`SELECT * FROM sandbox_crawl_jobs
WHERE status = 'pending'
ORDER BY priority DESC, scheduled_at ASC
LIMIT $1`,
[flags.limit]
);
console.log(`Found ${sandboxJobs.rows.length} pending sandbox jobs\n`);
for (const job of sandboxJobs.rows) {
console.log(`Processing job ${job.id} (${job.job_type}) for dispensary ${job.dispensary_id}...`);
try {
// Mark as running
await pool.query(
`UPDATE sandbox_crawl_jobs SET status = 'running', started_at = NOW() WHERE id = $1`,
[job.id]
);
let result;
if (job.job_type === 'detection') {
result = await runDetectMenuProviderJob(job.dispensary_id);
} else {
result = await runSandboxCrawlJob(job.dispensary_id, job.sandbox_id);
}
// Update job status
await pool.query(
`UPDATE sandbox_crawl_jobs
SET status = $1, completed_at = NOW(), result_summary = $2, error_message = $3
WHERE id = $4`,
[
result.success ? 'completed' : 'failed',
JSON.stringify(result.data || {}),
result.success ? null : result.message,
job.id,
]
);
console.log(` ${result.success ? '✓' : '✗'} ${result.message}\n`);
} catch (error: any) {
await pool.query(
`UPDATE sandbox_crawl_jobs SET status = 'failed', error_message = $1 WHERE id = $2`,
[error.message, job.id]
);
console.log(` ✗ Error: ${error.message}\n`);
}
}
}
async function showStats(): Promise<void> {
console.log('\n📊 Current Stats:');
// Dispensary stats
const stats = await pool.query(`
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE menu_provider IS NULL) as no_provider,
COUNT(*) FILTER (WHERE menu_provider = 'dutchie') as dutchie,
COUNT(*) FILTER (WHERE menu_provider NOT IN ('dutchie', 'unknown') AND menu_provider IS NOT NULL) as other_providers,
COUNT(*) FILTER (WHERE menu_provider = 'unknown') as unknown,
COUNT(*) FILTER (WHERE crawler_mode = 'production') as production_mode,
COUNT(*) FILTER (WHERE crawler_mode = 'sandbox') as sandbox_mode,
COUNT(*) FILTER (WHERE crawler_status = 'idle') as idle,
COUNT(*) FILTER (WHERE crawler_status LIKE 'queued%') as queued,
COUNT(*) FILTER (WHERE crawler_status = 'running') as running,
COUNT(*) FILTER (WHERE crawler_status = 'ok') as ok,
COUNT(*) FILTER (WHERE crawler_status = 'error_needs_review') as needs_review
FROM dispensaries
`);
const s = stats.rows[0];
console.log(`
Dispensaries: ${s.total}
- No provider detected: ${s.no_provider}
- Dutchie: ${s.dutchie}
- Other providers: ${s.other_providers}
- Unknown: ${s.unknown}
Crawler Mode:
- Production: ${s.production_mode}
- Sandbox: ${s.sandbox_mode}
Status:
- Idle: ${s.idle}
- Queued: ${s.queued}
- Running: ${s.running}
- OK: ${s.ok}
- Needs Review: ${s.needs_review}
`);
// Job stats
const jobStats = await pool.query(`
SELECT
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'running') as running,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed
FROM sandbox_crawl_jobs
`);
const j = jobStats.rows[0];
console.log(` Sandbox Jobs:
- Pending: ${j.pending}
- Running: ${j.running}
- Completed: ${j.completed}
- Failed: ${j.failed}
`);
}
async function main() {
if (flags.help) {
await showHelp();
process.exit(0);
}
console.log('═══════════════════════════════════════════════════════');
console.log(' Multi-Provider Crawler Queue Manager');
console.log('═══════════════════════════════════════════════════════');
if (flags.dryRun) {
console.log('\n🔍 DRY RUN MODE - No changes will be made\n');
}
try {
// Show current stats first
await showStats();
if (flags.process) {
// Process mode - run jobs instead of queuing
await processJobs();
} else {
// Queuing mode
let totalQueued = 0;
if (flags.detection) {
totalQueued += await queueDetectionJobs();
}
if (flags.production) {
totalQueued += await queueProductionCrawls();
}
if (flags.sandbox) {
totalQueued += await queueSandboxCrawls();
}
console.log('\n═══════════════════════════════════════════════════════');
console.log(` Total dispensaries queued: ${totalQueued}`);
console.log('═══════════════════════════════════════════════════════\n');
}
// Show updated stats
if (!flags.dryRun) {
await showStats();
}
} catch (error) {
console.error('Fatal error:', error);
process.exit(1);
} finally {
await pool.end();
}
}
main();
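
The script is one-shot by design (it ends the pool and exits). One way to run repeated queue passes outside the in-process scheduler, sketched with plain Node child processes (flags mirror the usage block above; the 15-minute cadence is an arbitrary choice):

import { execFile } from 'node:child_process';

setInterval(() => {
  execFile(
    'npx',
    ['tsx', 'src/scripts/queue-dispensaries.ts', '--all', '--limit=10'],
    (err, stdout, stderr) => {
      if (err) console.error('queue pass failed:', stderr || err.message);
      else console.log(stdout);
    }
  );
}, 15 * 60 * 1000);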

View File

@@ -0,0 +1,583 @@
#!/usr/bin/env npx tsx
/**
* Queue Intelligence Script
*
* Orchestrates the multi-category intelligence crawler system:
* 1. Queue dispensaries that need provider detection (all 4 categories)
* 2. Queue per-category production crawls (Dutchie products only for now)
* 3. Queue per-category sandbox crawls (all providers)
*
* Each category (product, specials, brand, metadata) is handled independently.
* A failure in one category does NOT affect other categories.
*
* Usage:
* npx tsx src/scripts/queue-intelligence.ts [--detection] [--production] [--sandbox] [--all]
* npx tsx src/scripts/queue-intelligence.ts --category=product --sandbox
* npx tsx src/scripts/queue-intelligence.ts --process --category=product
* npx tsx src/scripts/queue-intelligence.ts --dry-run
*/
import { pool } from '../db/migrate';
import { logger } from '../services/logger';
import {
detectMultiCategoryProviders,
updateAllCategoryProviders,
IntelligenceCategory,
} from '../services/intelligence-detector';
import {
runCrawlProductsJob,
runCrawlSpecialsJob,
runCrawlBrandIntelligenceJob,
runCrawlMetadataJob,
runSandboxProductsJob,
runSandboxSpecialsJob,
runSandboxBrandJob,
runSandboxMetadataJob,
runAllCategoryProductionCrawls,
runAllCategorySandboxCrawls,
processCategorySandboxJobs,
} from '../services/category-crawler-jobs';
// Parse command line args
const args = process.argv.slice(2);
const flags = {
detection: args.includes('--detection') || args.includes('--all'),
production: args.includes('--production') || args.includes('--all'),
sandbox: args.includes('--sandbox') || args.includes('--all'),
dryRun: args.includes('--dry-run'),
process: args.includes('--process'),
help: args.includes('--help') || args.includes('-h'),
limit: parseInt(args.find(a => a.startsWith('--limit='))?.split('=')[1] || '10'),
category: args.find(a => a.startsWith('--category='))?.split('=')[1] as IntelligenceCategory | undefined,
dispensary: parseInt(args.find(a => a.startsWith('--dispensary='))?.split('=')[1] || '0'),
};
// If no specific flags, default to all
if (!flags.detection && !flags.production && !flags.sandbox && !flags.process) {
flags.detection = true;
flags.production = true;
flags.sandbox = true;
}
const CATEGORIES: IntelligenceCategory[] = ['product', 'specials', 'brand', 'metadata'];
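// Note: per-category dispensary columns follow a naming convention, e.g. for
// 'product': product_provider, product_confidence, product_crawler_mode,
// last_product_scan_at. The queries below build SQL with `${cat}_...`
// interpolation, so cat should always come from this CATEGORIES list
// (the --category flag is not validated against it here).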
async function showHelp() {
console.log(`
Queue Intelligence - Multi-Category Crawler Orchestration
USAGE:
npx tsx src/scripts/queue-intelligence.ts [OPTIONS]
OPTIONS:
--detection Queue dispensaries that need multi-category detection
--production Queue per-category production crawls
--sandbox Queue per-category sandbox crawls
--all Queue all job types (default if no specific flag)
--process Process queued jobs instead of just queuing
--category=CATEGORY Filter to specific category (product|specials|brand|metadata)
--dispensary=ID Process only a specific dispensary
--dry-run Show what would be queued without making changes
--limit=N Maximum dispensaries to queue per type (default: 10)
--help, -h Show this help message
CATEGORIES:
product - Product/menu data (Dutchie=production, others=sandbox)
specials - Deals and specials (all sandbox for now)
brand - Brand intelligence (all sandbox for now)
metadata - Categories/taxonomy (all sandbox for now)
EXAMPLES:
# Queue all dispensaries for appropriate jobs
npx tsx src/scripts/queue-intelligence.ts
# Only queue product detection jobs
npx tsx src/scripts/queue-intelligence.ts --detection --category=product
# Process sandbox jobs for specials category
npx tsx src/scripts/queue-intelligence.ts --process --category=specials --limit=5
# Run full detection for a specific dispensary
npx tsx src/scripts/queue-intelligence.ts --process --detection --dispensary=123
# Dry run to see what would be queued
npx tsx src/scripts/queue-intelligence.ts --dry-run
`);
}
async function queueMultiCategoryDetection(): Promise<number> {
console.log('\n📡 Queueing Multi-Category Detection Jobs...');
// Find dispensaries where at least one category still needs detection:
// - has a website or menu URL
// - any *_provider is null, or its *_confidence < 70
const query = `
SELECT id, name, website, menu_url,
product_provider, product_confidence, product_crawler_mode,
specials_provider, specials_confidence, specials_crawler_mode,
brand_provider, brand_confidence, brand_crawler_mode,
metadata_provider, metadata_confidence, metadata_crawler_mode
FROM dispensaries
WHERE (website IS NOT NULL OR menu_url IS NOT NULL)
AND (
product_provider IS NULL OR product_confidence < 70 OR
specials_provider IS NULL OR specials_confidence < 70 OR
brand_provider IS NULL OR brand_confidence < 70 OR
metadata_provider IS NULL OR metadata_confidence < 70
)
ORDER BY
CASE WHEN product_provider IS NULL THEN 0 ELSE 1 END,
product_confidence ASC
LIMIT $1
`;
const result = await pool.query(query, [flags.limit]);
if (flags.dryRun) {
console.log(` Would queue ${result.rows.length} dispensaries for multi-category detection:`);
for (const row of result.rows) {
const needsDetection: string[] = [];
if (!row.product_provider || row.product_confidence < 70) needsDetection.push('product');
if (!row.specials_provider || row.specials_confidence < 70) needsDetection.push('specials');
if (!row.brand_provider || row.brand_confidence < 70) needsDetection.push('brand');
if (!row.metadata_provider || row.metadata_confidence < 70) needsDetection.push('metadata');
console.log(` - [${row.id}] ${row.name} (needs: ${needsDetection.join(', ')})`);
}
return result.rows.length;
}
let queued = 0;
for (const dispensary of result.rows) {
try {
// Create detection jobs for each category that needs it
for (const category of CATEGORIES) {
const provider = dispensary[`${category}_provider`];
const confidence = dispensary[`${category}_confidence`];
if (!provider || confidence < 70) {
await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, category, job_type, status, priority)
VALUES ($1, $2, 'detection', 'pending', 10)
ON CONFLICT DO NOTHING`,
[dispensary.id, category]
);
}
}
console.log(` ✓ Queued detection: [${dispensary.id}] ${dispensary.name}`);
queued++;
} catch (error: any) {
console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
}
}
return queued;
}
async function queueCategoryProductionCrawls(category?: IntelligenceCategory): Promise<number> {
const categories = category ? [category] : CATEGORIES;
let totalQueued = 0;
for (const cat of categories) {
console.log(`\n🏭 Queueing Production ${cat.toUpperCase()} Crawls...`);
// For now, only products have production-ready crawlers (Dutchie only)
if (cat !== 'product') {
console.log(` ⏭️ No production crawler for ${cat} yet - skipping`);
continue;
}
// Find dispensaries ready for production crawl
const query = `
SELECT id, name, ${cat}_provider as provider, last_${cat}_scan_at as last_scan
FROM dispensaries
WHERE ${cat}_provider = 'dutchie'
AND ${cat}_crawler_mode = 'production'
AND ${cat}_confidence >= 70
AND (last_${cat}_scan_at IS NULL OR last_${cat}_scan_at < NOW() - INTERVAL '4 hours')
ORDER BY
CASE WHEN last_${cat}_scan_at IS NULL THEN 0 ELSE 1 END,
last_${cat}_scan_at ASC
LIMIT $1
`;
const result = await pool.query(query, [flags.limit]);
if (flags.dryRun) {
console.log(` Would queue ${result.rows.length} dispensaries for ${cat} production crawl:`);
for (const row of result.rows) {
const lastScan = row.last_scan ? new Date(row.last_scan).toISOString() : 'never';
console.log(` - [${row.id}] ${row.name} (provider: ${row.provider}, last: ${lastScan})`);
}
totalQueued += result.rows.length;
continue;
}
for (const dispensary of result.rows) {
try {
// For products, use the existing crawl_jobs table for production
await pool.query(
`INSERT INTO crawl_jobs (store_id, job_type, trigger_type, status, priority, metadata)
SELECT s.id, 'full_crawl', 'scheduled', 'pending', 50,
jsonb_build_object('dispensary_id', $1, 'category', $2, 'source', 'queue-intelligence')
FROM stores s
JOIN dispensaries d ON (d.menu_url = s.dutchie_plus_url OR d.name ILIKE '%' || s.name || '%')
WHERE d.id = $1
LIMIT 1`,
[dispensary.id, cat]
);
console.log(` ✓ Queued ${cat} production: [${dispensary.id}] ${dispensary.name}`);
totalQueued++;
} catch (error: any) {
console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
}
}
}
return totalQueued;
}
async function queueCategorySandboxCrawls(category?: IntelligenceCategory): Promise<number> {
const categories = category ? [category] : CATEGORIES;
let totalQueued = 0;
for (const cat of categories) {
console.log(`\n🧪 Queueing Sandbox ${cat.toUpperCase()} Crawls...`);
// Find dispensaries in sandbox mode for this category
const query = `
SELECT d.id, d.name, d.${cat}_provider as provider, d.${cat}_confidence as confidence,
d.website, d.menu_url
FROM dispensaries d
WHERE d.${cat}_crawler_mode = 'sandbox'
AND d.${cat}_provider IS NOT NULL
AND (d.website IS NOT NULL OR d.menu_url IS NOT NULL)
AND NOT EXISTS (
SELECT 1 FROM sandbox_crawl_jobs sj
WHERE sj.dispensary_id = d.id
AND sj.category = $1
AND sj.status IN ('pending', 'running')
)
ORDER BY d.${cat}_confidence DESC, d.updated_at ASC
LIMIT $2
`;
const result = await pool.query(query, [cat, flags.limit]);
if (flags.dryRun) {
console.log(` Would queue ${result.rows.length} dispensaries for ${cat} sandbox crawl:`);
for (const row of result.rows) {
console.log(` - [${row.id}] ${row.name} (provider: ${row.provider}, confidence: ${row.confidence}%)`);
}
totalQueued += result.rows.length;
continue;
}
for (const dispensary of result.rows) {
try {
// Create sandbox entry if needed
const sandboxResult = await pool.query(
`INSERT INTO crawler_sandboxes (dispensary_id, category, suspected_menu_provider, mode, status)
VALUES ($1, $2, $3, 'template_learning', 'pending')
ON CONFLICT (dispensary_id, category) WHERE status NOT IN ('moved_to_production', 'failed')
DO UPDATE SET updated_at = NOW()
RETURNING id`,
[dispensary.id, cat, dispensary.provider]
);
const sandboxId = sandboxResult.rows[0]?.id;
// Create sandbox job
await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, sandbox_id, category, job_type, status, priority)
VALUES ($1, $2, $3, 'crawl', 'pending', 5)`,
[dispensary.id, sandboxId, cat]
);
console.log(` ✓ Queued ${cat} sandbox: [${dispensary.id}] ${dispensary.name} (${dispensary.provider})`);
totalQueued++;
} catch (error: any) {
console.error(` ✗ Failed to queue [${dispensary.id}]: ${error.message}`);
}
}
}
return totalQueued;
}
async function processDetectionJobs(): Promise<void> {
console.log('\n🔍 Processing Detection Jobs...');
// Get pending detection jobs
  // Build the filter clauses and parameter list together so the
  // placeholder numbers always line up
  const params: any[] = [flags.limit];
  let filters = '';
  if (flags.category) {
    params.push(flags.category);
    filters += ` AND category = $${params.length}`;
  }
  if (flags.dispensary) {
    params.push(flags.dispensary);
    filters += ` AND dispensary_id = $${params.length}`;
  }
  const jobs = await pool.query(
    `SELECT DISTINCT dispensary_id
     FROM sandbox_crawl_jobs
     WHERE job_type = 'detection' AND status = 'pending'${filters}
     LIMIT $1`,
    params
  );
for (const job of jobs.rows) {
console.log(`\nProcessing detection for dispensary ${job.dispensary_id}...`);
try {
// Get dispensary info
const dispResult = await pool.query(
'SELECT id, name, website, menu_url FROM dispensaries WHERE id = $1',
[job.dispensary_id]
);
const dispensary = dispResult.rows[0];
if (!dispensary) {
console.log(` ✗ Dispensary not found`);
continue;
}
const websiteUrl = dispensary.website || dispensary.menu_url;
if (!websiteUrl) {
console.log(` ✗ No website URL`);
continue;
}
// Mark jobs as running
await pool.query(
`UPDATE sandbox_crawl_jobs SET status = 'running', started_at = NOW()
WHERE dispensary_id = $1 AND job_type = 'detection' AND status = 'pending'`,
[job.dispensary_id]
);
// Run multi-category detection
console.log(` Detecting providers for ${dispensary.name}...`);
const detection = await detectMultiCategoryProviders(websiteUrl, { timeout: 45000 });
// Update all categories
await updateAllCategoryProviders(job.dispensary_id, detection);
// Mark jobs as completed
await pool.query(
`UPDATE sandbox_crawl_jobs SET status = 'completed', completed_at = NOW(),
result_summary = $1
WHERE dispensary_id = $2 AND job_type = 'detection' AND status = 'running'`,
[JSON.stringify({
product: { provider: detection.product.provider, confidence: detection.product.confidence },
specials: { provider: detection.specials.provider, confidence: detection.specials.confidence },
brand: { provider: detection.brand.provider, confidence: detection.brand.confidence },
metadata: { provider: detection.metadata.provider, confidence: detection.metadata.confidence },
}), job.dispensary_id]
);
console.log(` ✓ Detection complete:`);
console.log(` Product: ${detection.product.provider} (${detection.product.confidence}%) -> ${detection.product.mode}`);
console.log(` Specials: ${detection.specials.provider} (${detection.specials.confidence}%) -> ${detection.specials.mode}`);
console.log(` Brand: ${detection.brand.provider} (${detection.brand.confidence}%) -> ${detection.brand.mode}`);
console.log(` Metadata: ${detection.metadata.provider} (${detection.metadata.confidence}%) -> ${detection.metadata.mode}`);
} catch (error: any) {
console.log(` ✗ Error: ${error.message}`);
await pool.query(
`UPDATE sandbox_crawl_jobs SET status = 'failed', error_message = $1
WHERE dispensary_id = $2 AND job_type = 'detection' AND status = 'running'`,
[error.message, job.dispensary_id]
);
}
}
}
async function processCrawlJobs(): Promise<void> {
const categories = flags.category ? [flags.category] : CATEGORIES;
for (const cat of categories) {
console.log(`\n⚙ Processing ${cat.toUpperCase()} Crawl Jobs...\n`);
// Process sandbox jobs for this category
if (flags.sandbox || !flags.production) {
await processCategorySandboxJobs(cat, flags.limit);
}
// Process production jobs for this category
if (flags.production && cat === 'product') {
// Get pending production crawls
const prodJobs = await pool.query(
`SELECT d.id
FROM dispensaries d
WHERE d.product_provider = 'dutchie'
AND d.product_crawler_mode = 'production'
AND d.product_confidence >= 70
${flags.dispensary ? 'AND d.id = $2' : ''}
LIMIT $1`,
flags.dispensary ? [flags.limit, flags.dispensary] : [flags.limit]
);
for (const job of prodJobs.rows) {
console.log(`Processing production ${cat} crawl for dispensary ${job.id}...`);
const result = await runCrawlProductsJob(job.id);
console.log(` ${result.success ? '✓' : '✗'} ${result.message}`);
}
}
}
}
async function processSpecificDispensary(): Promise<void> {
if (!flags.dispensary) return;
console.log(`\n🎯 Processing Dispensary ${flags.dispensary}...\n`);
const dispResult = await pool.query(
'SELECT * FROM dispensaries WHERE id = $1',
[flags.dispensary]
);
if (dispResult.rows.length === 0) {
console.log('Dispensary not found');
return;
}
const dispensary = dispResult.rows[0];
console.log(`Name: ${dispensary.name}`);
console.log(`Website: ${dispensary.website || dispensary.menu_url || 'none'}`);
console.log('');
if (flags.detection) {
console.log('Running multi-category detection...');
const websiteUrl = dispensary.website || dispensary.menu_url;
if (websiteUrl) {
const detection = await detectMultiCategoryProviders(websiteUrl);
await updateAllCategoryProviders(flags.dispensary, detection);
console.log('Detection results:');
console.log(` Product: ${detection.product.provider} (${detection.product.confidence}%) -> ${detection.product.mode}`);
console.log(` Specials: ${detection.specials.provider} (${detection.specials.confidence}%) -> ${detection.specials.mode}`);
console.log(` Brand: ${detection.brand.provider} (${detection.brand.confidence}%) -> ${detection.brand.mode}`);
console.log(` Metadata: ${detection.metadata.provider} (${detection.metadata.confidence}%) -> ${detection.metadata.mode}`);
}
}
if (flags.production) {
console.log('\nRunning production crawls...');
const results = await runAllCategoryProductionCrawls(flags.dispensary);
console.log(` ${results.summary}`);
}
if (flags.sandbox) {
console.log('\nRunning sandbox crawls...');
const results = await runAllCategorySandboxCrawls(flags.dispensary);
console.log(` ${results.summary}`);
}
}
async function showStats(): Promise<void> {
console.log('\n📊 Multi-Category Intelligence Stats:');
// Per-category stats
for (const cat of CATEGORIES) {
const stats = await pool.query(`
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE ${cat}_provider IS NULL) as no_provider,
COUNT(*) FILTER (WHERE ${cat}_provider = 'dutchie') as dutchie,
COUNT(*) FILTER (WHERE ${cat}_provider = 'treez') as treez,
COUNT(*) FILTER (WHERE ${cat}_provider NOT IN ('dutchie', 'treez', 'unknown') AND ${cat}_provider IS NOT NULL) as other,
COUNT(*) FILTER (WHERE ${cat}_provider = 'unknown') as unknown,
COUNT(*) FILTER (WHERE ${cat}_crawler_mode = 'production') as production,
COUNT(*) FILTER (WHERE ${cat}_crawler_mode = 'sandbox') as sandbox,
AVG(${cat}_confidence) as avg_confidence
FROM dispensaries
`);
const s = stats.rows[0];
console.log(`
${cat.toUpperCase()}:
Providers: Dutchie=${s.dutchie}, Treez=${s.treez}, Other=${s.other}, Unknown=${s.unknown}, None=${s.no_provider}
Modes: Production=${s.production}, Sandbox=${s.sandbox}
Avg Confidence: ${Math.round(s.avg_confidence || 0)}%`);
}
// Job stats per category
console.log('\n Sandbox Jobs by Category:');
const jobStats = await pool.query(`
SELECT
category,
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'running') as running,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed
FROM sandbox_crawl_jobs
GROUP BY category
ORDER BY category
`);
for (const row of jobStats.rows) {
console.log(` ${row.category}: pending=${row.pending}, running=${row.running}, completed=${row.completed}, failed=${row.failed}`);
}
}
async function main() {
if (flags.help) {
await showHelp();
process.exit(0);
}
console.log('═══════════════════════════════════════════════════════');
console.log(' Multi-Category Intelligence Queue Manager');
console.log('═══════════════════════════════════════════════════════');
if (flags.dryRun) {
console.log('\n🔍 DRY RUN MODE - No changes will be made\n');
}
if (flags.category) {
console.log(`\n📌 Filtering to category: ${flags.category}\n`);
}
try {
// Show current stats first
await showStats();
// If specific dispensary specified, process it directly
if (flags.dispensary && flags.process) {
await processSpecificDispensary();
} else if (flags.process) {
// Process mode - run jobs
if (flags.detection) {
await processDetectionJobs();
}
await processCrawlJobs();
} else {
// Queuing mode
let totalQueued = 0;
if (flags.detection) {
totalQueued += await queueMultiCategoryDetection();
}
if (flags.production) {
totalQueued += await queueCategoryProductionCrawls(flags.category);
}
if (flags.sandbox) {
totalQueued += await queueCategorySandboxCrawls(flags.category);
}
console.log('\n═══════════════════════════════════════════════════════');
console.log(` Total queued: ${totalQueued}`);
console.log('═══════════════════════════════════════════════════════\n');
}
// Show updated stats
if (!flags.dryRun) {
await showStats();
}
} catch (error) {
console.error('Fatal error:', error);
process.exit(1);
} finally {
await pool.end();
}
}
main();
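// Usage sketch (illustrative - the script path and exact flag spellings depend
// on how this file is named and how `flags` is parsed above):
//
//   npx tsx scripts/queue-intelligence.ts --detection --limit 25 --dry-run
//   npx tsx scripts/queue-intelligence.ts --sandbox --category specials
//   npx tsx scripts/queue-intelligence.ts --process --production --dispensary 123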

File diff suppressed because it is too large

View File

@@ -0,0 +1,651 @@
/**
* Crawl Scheduler Service
*
* This service manages crawl scheduling using a job queue approach.
* It does NOT modify the crawler - it only TRIGGERS the existing crawler.
*
* Features:
* - Global schedule: crawl all stores every N hours
* - Daily special run: 12:01 AM local store time
* - Per-store schedule overrides
* - Job queue for tracking pending/running crawls
*/
import cron from 'node-cron';
import { pool } from '../db/migrate';
import { scrapeStore } from '../scraper-v2';
import {
runStoreCrawlOrchestrator,
getStoresDueForOrchestration,
} from './store-crawl-orchestrator';
// Worker identification
const WORKER_ID = `worker-${process.pid}-${Date.now()}`;
let schedulerCronJob: cron.ScheduledTask | null = null;
let jobProcessorRunning = false;
let orchestratorProcessorRunning = false;
// Scheduler mode: 'legacy' uses job queue, 'orchestrator' uses intelligent orchestration
let schedulerMode: 'legacy' | 'orchestrator' = 'orchestrator';
// ============================================
// Types
// ============================================
interface GlobalSchedule {
id: number;
schedule_type: string;
enabled: boolean;
interval_hours: number | null;
run_time: string | null;
}
interface StoreScheduleStatus {
store_id: number;
store_name: string;
store_slug: string;
timezone: string;
active: boolean;
scrape_enabled: boolean;
last_scraped_at: Date | null;
schedule_enabled: boolean;
interval_hours: number;
daily_special_enabled: boolean;
daily_special_time: string;
priority: number;
next_scheduled_run: Date;
latest_job_id: number | null;
latest_job_status: string | null;
}
interface CrawlJob {
id: number;
store_id: number;
job_type: string;
trigger_type: string;
status: string;
priority: number;
scheduled_at: Date;
started_at: Date | null;
completed_at: Date | null;
products_found: number | null;
error_message: string | null;
}
// ============================================
// Schedule Management
// ============================================
/**
* Get global schedule settings
*/
export async function getGlobalSchedule(): Promise<GlobalSchedule[]> {
const result = await pool.query(`
SELECT * FROM crawler_schedule ORDER BY id
`);
return result.rows;
}
/**
* Update global schedule setting
*/
export async function updateGlobalSchedule(
scheduleType: string,
updates: { enabled?: boolean; interval_hours?: number; run_time?: string }
): Promise<GlobalSchedule> {
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
if (updates.enabled !== undefined) {
setClauses.push(`enabled = $${paramIndex++}`);
values.push(updates.enabled);
}
if (updates.interval_hours !== undefined) {
setClauses.push(`interval_hours = $${paramIndex++}`);
values.push(updates.interval_hours);
}
if (updates.run_time !== undefined) {
setClauses.push(`run_time = $${paramIndex++}`);
values.push(updates.run_time);
}
  if (setClauses.length === 0) {
    // Nothing to update - return the current row unchanged instead of
    // issuing an UPDATE with an empty SET clause
    const current = await pool.query(
      `SELECT * FROM crawler_schedule WHERE schedule_type = $1`,
      [scheduleType]
    );
    return current.rows[0];
  }
  values.push(scheduleType);
const result = await pool.query(`
UPDATE crawler_schedule
SET ${setClauses.join(', ')}
WHERE schedule_type = $${paramIndex}
RETURNING *
`, values);
return result.rows[0];
}
/**
* Get all store schedule statuses
*/
export async function getStoreScheduleStatuses(): Promise<StoreScheduleStatus[]> {
const result = await pool.query(`SELECT * FROM crawl_schedule_status ORDER BY priority DESC, store_name`);
return result.rows;
}
/**
* Get or create per-store schedule override
*/
export async function getStoreSchedule(storeId: number): Promise<any> {
const result = await pool.query(`
SELECT * FROM store_crawl_schedule WHERE store_id = $1
`, [storeId]);
if (result.rows.length > 0) {
return result.rows[0];
}
// Return default (use global)
return {
store_id: storeId,
enabled: true,
interval_hours: null,
daily_special_enabled: true,
daily_special_time: null,
priority: 0
};
}
/**
* Update per-store schedule override
*/
export async function updateStoreSchedule(
storeId: number,
updates: {
enabled?: boolean;
interval_hours?: number | null;
daily_special_enabled?: boolean;
daily_special_time?: string | null;
priority?: number;
}
): Promise<any> {
const result = await pool.query(`
INSERT INTO store_crawl_schedule (store_id, enabled, interval_hours, daily_special_enabled, daily_special_time, priority)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (store_id) DO UPDATE SET
enabled = COALESCE(EXCLUDED.enabled, store_crawl_schedule.enabled),
interval_hours = EXCLUDED.interval_hours,
daily_special_enabled = COALESCE(EXCLUDED.daily_special_enabled, store_crawl_schedule.daily_special_enabled),
daily_special_time = EXCLUDED.daily_special_time,
priority = COALESCE(EXCLUDED.priority, store_crawl_schedule.priority),
updated_at = NOW()
RETURNING *
`, [
storeId,
updates.enabled ?? true,
updates.interval_hours ?? null,
updates.daily_special_enabled ?? true,
updates.daily_special_time ?? null,
updates.priority ?? 0
]);
return result.rows[0];
}
// ============================================
// Job Queue Management
// ============================================
/**
* Create a new crawl job
*/
export async function createCrawlJob(
storeId: number,
jobType: 'full_crawl' | 'specials_only' | 'category' = 'full_crawl',
triggerType: 'scheduled' | 'manual' | 'daily_special' = 'scheduled',
scheduledAt: Date = new Date(),
priority: number = 0
): Promise<CrawlJob> {
  // If this store already has a pending or running job, return that job
  // instead of creating a duplicate (select the full row so the declared
  // CrawlJob return type holds)
  const existing = await pool.query(`
    SELECT * FROM crawl_jobs
    WHERE store_id = $1 AND status IN ('pending', 'running')
    LIMIT 1
  `, [storeId]);
  if (existing.rows.length > 0) {
    console.log(`Skipping job creation for store ${storeId} - already has pending/running job`);
    return existing.rows[0];
  }
const result = await pool.query(`
INSERT INTO crawl_jobs (store_id, job_type, trigger_type, scheduled_at, priority, status)
VALUES ($1, $2, $3, $4, $5, 'pending')
RETURNING *
`, [storeId, jobType, triggerType, scheduledAt, priority]);
console.log(`Created crawl job ${result.rows[0].id} for store ${storeId} (${triggerType})`);
return result.rows[0];
}
/**
* Get pending jobs ready to run
*/
export async function getPendingJobs(limit: number = 5): Promise<CrawlJob[]> {
const result = await pool.query(`
SELECT cj.*, s.name as store_name
FROM crawl_jobs cj
JOIN stores s ON s.id = cj.store_id
WHERE cj.status = 'pending'
AND cj.scheduled_at <= NOW()
ORDER BY cj.priority DESC, cj.scheduled_at ASC
LIMIT $1
`, [limit]);
return result.rows;
}
/**
* Claim a job for processing
*/
export async function claimJob(jobId: number): Promise<boolean> {
const result = await pool.query(`
UPDATE crawl_jobs
SET status = 'running', started_at = NOW(), worker_id = $2
WHERE id = $1 AND status = 'pending'
RETURNING id
`, [jobId, WORKER_ID]);
return result.rows.length > 0;
}
/**
* Complete a job
*/
export async function completeJob(
jobId: number,
success: boolean,
results?: { products_found?: number; products_new?: number; products_updated?: number; error_message?: string }
): Promise<void> {
await pool.query(`
UPDATE crawl_jobs
SET
status = $2,
completed_at = NOW(),
products_found = $3,
error_message = $4
WHERE id = $1
`, [
jobId,
success ? 'completed' : 'failed',
results?.products_found ?? null,
results?.error_message ?? null
]);
}
/**
* Get recent jobs for a store
*/
export async function getRecentJobs(storeId: number, limit: number = 10): Promise<CrawlJob[]> {
const result = await pool.query(`
SELECT * FROM crawl_jobs
WHERE store_id = $1
ORDER BY created_at DESC
LIMIT $2
`, [storeId, limit]);
return result.rows;
}
/**
* Get all recent jobs
*/
export async function getAllRecentJobs(limit: number = 50): Promise<any[]> {
const result = await pool.query(`
SELECT cj.*, s.name as store_name, s.slug as store_slug
FROM crawl_jobs cj
JOIN stores s ON s.id = cj.store_id
ORDER BY cj.created_at DESC
LIMIT $1
`, [limit]);
return result.rows;
}
// ============================================
// Scheduler Logic
// ============================================
/**
* Check which stores are due for a crawl and create jobs
*/
export async function checkAndCreateScheduledJobs(): Promise<number> {
console.log('Checking for stores due for crawl...');
// Get global schedule settings
const globalSchedule = await pool.query(`
SELECT * FROM crawler_schedule WHERE schedule_type = 'global_interval'
`);
if (globalSchedule.rows.length === 0 || !globalSchedule.rows[0].enabled) {
console.log('Global scheduler is disabled');
return 0;
}
const intervalHours = globalSchedule.rows[0].interval_hours || 4;
// Find stores due for crawl
const result = await pool.query(`
SELECT
s.id,
s.name,
s.timezone,
s.last_scraped_at,
COALESCE(scs.enabled, TRUE) as schedule_enabled,
COALESCE(scs.interval_hours, $1) as interval_hours,
COALESCE(scs.priority, 0) as priority
FROM stores s
LEFT JOIN store_crawl_schedule scs ON scs.store_id = s.id
WHERE s.active = TRUE
AND s.scrape_enabled = TRUE
AND COALESCE(scs.enabled, TRUE) = TRUE
AND (
s.last_scraped_at IS NULL
OR s.last_scraped_at < NOW() - (COALESCE(scs.interval_hours, $1) || ' hours')::INTERVAL
)
AND NOT EXISTS (
SELECT 1 FROM crawl_jobs cj
WHERE cj.store_id = s.id AND cj.status IN ('pending', 'running')
)
ORDER BY COALESCE(scs.priority, 0) DESC, s.last_scraped_at ASC NULLS FIRST
`, [intervalHours]);
let jobsCreated = 0;
for (const store of result.rows) {
try {
await createCrawlJob(store.id, 'full_crawl', 'scheduled', new Date(), store.priority);
jobsCreated++;
console.log(`Scheduled crawl job for: ${store.name}`);
} catch (error) {
console.error(`Failed to create job for store ${store.name}:`, error);
}
}
console.log(`Created ${jobsCreated} scheduled crawl jobs`);
return jobsCreated;
}
/**
* Check for daily special runs (12:01 AM local time)
*/
export async function checkAndCreateDailySpecialJobs(): Promise<number> {
console.log('Checking for daily special runs...');
// Get daily special schedule
const dailySchedule = await pool.query(`
SELECT * FROM crawler_schedule WHERE schedule_type = 'daily_special'
`);
if (dailySchedule.rows.length === 0 || !dailySchedule.rows[0].enabled) {
console.log('Daily special scheduler is disabled');
return 0;
}
const targetTime = dailySchedule.rows[0].run_time || '00:01';
// Find stores where it's currently the target time in their local timezone
// and they haven't had a daily special run today
const result = await pool.query(`
SELECT
s.id,
s.name,
s.timezone,
COALESCE(scs.daily_special_enabled, TRUE) as daily_special_enabled,
COALESCE(scs.daily_special_time, $1::TIME) as daily_special_time,
COALESCE(scs.priority, 0) as priority
FROM stores s
LEFT JOIN store_crawl_schedule scs ON scs.store_id = s.id
WHERE s.active = TRUE
AND s.scrape_enabled = TRUE
AND COALESCE(scs.daily_special_enabled, TRUE) = TRUE
-- Check if current time in store timezone matches the target time (within 2 minutes)
AND ABS(
EXTRACT(EPOCH FROM (
(NOW() AT TIME ZONE COALESCE(s.timezone, 'America/Phoenix'))::TIME
- COALESCE(scs.daily_special_time, $1::TIME)
))
) < 120 -- within 2 minutes
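      -- NOTE: plain TIME subtraction does not wrap across midnight, so a target
      -- time at the very end of the day only matches on its own side of 00:00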
-- Ensure we haven't already created a daily_special job today for this store
AND NOT EXISTS (
SELECT 1 FROM crawl_jobs cj
WHERE cj.store_id = s.id
AND cj.trigger_type = 'daily_special'
AND cj.created_at > (NOW() AT TIME ZONE COALESCE(s.timezone, 'America/Phoenix'))::DATE
)
AND NOT EXISTS (
SELECT 1 FROM crawl_jobs cj
WHERE cj.store_id = s.id AND cj.status IN ('pending', 'running')
)
ORDER BY COALESCE(scs.priority, 0) DESC
`, [targetTime]);
let jobsCreated = 0;
for (const store of result.rows) {
try {
await createCrawlJob(store.id, 'full_crawl', 'daily_special', new Date(), store.priority + 10);
jobsCreated++;
console.log(`Created daily special job for: ${store.name} (${store.timezone})`);
} catch (error) {
console.error(`Failed to create daily special job for store ${store.name}:`, error);
}
}
if (jobsCreated > 0) {
console.log(`Created ${jobsCreated} daily special crawl jobs`);
}
return jobsCreated;
}
/**
* Process pending jobs
*/
export async function processJobs(): Promise<void> {
if (jobProcessorRunning) {
console.log('Job processor already running, skipping...');
return;
}
jobProcessorRunning = true;
try {
const jobs = await getPendingJobs(1); // Process one at a time for safety
for (const job of jobs) {
console.log(`Processing job ${job.id} for store: ${(job as any).store_name}`);
const claimed = await claimJob(job.id);
if (!claimed) {
console.log(`Job ${job.id} already claimed by another worker`);
continue;
}
try {
// Call the existing scraper - DO NOT MODIFY SCRAPER LOGIC
await scrapeStore(job.store_id);
// Update store's last_scraped_at
await pool.query(`
UPDATE stores SET last_scraped_at = NOW() WHERE id = $1
`, [job.store_id]);
await completeJob(job.id, true, {});
console.log(`Job ${job.id} completed successfully`);
} catch (error: any) {
console.error(`Job ${job.id} failed:`, error);
await completeJob(job.id, false, { error_message: error.message });
}
}
} finally {
jobProcessorRunning = false;
}
}
/**
* Process stores using the intelligent orchestrator
* This replaces the simple job queue approach with intelligent provider detection
*/
export async function processOrchestrator(): Promise<void> {
if (orchestratorProcessorRunning) {
console.log('Orchestrator processor already running, skipping...');
return;
}
orchestratorProcessorRunning = true;
try {
// Get stores due for orchestration (respects schedule, intervals, etc.)
const storeIds = await getStoresDueForOrchestration(3); // Process up to 3 at a time
if (storeIds.length === 0) {
return;
}
console.log(`Orchestrator: Processing ${storeIds.length} stores due for crawl`);
// Process each store through the orchestrator
for (const storeId of storeIds) {
try {
console.log(`Orchestrator: Starting crawl for store ${storeId}`);
const result = await runStoreCrawlOrchestrator(storeId);
console.log(`Orchestrator: Store ${storeId} completed - ${result.summary}`);
} catch (error: any) {
console.error(`Orchestrator: Store ${storeId} failed - ${error.message}`);
}
}
console.log(`Orchestrator: Finished processing ${storeIds.length} stores`);
} finally {
orchestratorProcessorRunning = false;
}
}
// ============================================
// Scheduler Control
// ============================================
/**
* Set scheduler mode
*/
export function setSchedulerMode(mode: 'legacy' | 'orchestrator'): void {
schedulerMode = mode;
console.log(`Scheduler mode set to: ${mode}`);
}
/**
* Get current scheduler mode
*/
export function getSchedulerMode(): 'legacy' | 'orchestrator' {
return schedulerMode;
}
/**
* Start the scheduler (runs every minute to check for due jobs)
*/
export async function startCrawlScheduler(): Promise<void> {
stopCrawlScheduler();
console.log(`Starting crawl scheduler in ${schedulerMode} mode...`);
// Run every minute
schedulerCronJob = cron.schedule('* * * * *', async () => {
try {
if (schedulerMode === 'orchestrator') {
// Use intelligent orchestrator (handles detection + crawl)
await processOrchestrator();
} else {
// Legacy mode: job queue approach
// Check for interval-based scheduled jobs
await checkAndCreateScheduledJobs();
// Check for daily special runs
await checkAndCreateDailySpecialJobs();
// Process any pending jobs
await processJobs();
}
} catch (error) {
console.error('Scheduler tick error:', error);
}
});
console.log(`Crawl scheduler started in ${schedulerMode} mode (checking every minute)`);
}
/**
* Stop the scheduler
*/
export function stopCrawlScheduler(): void {
if (schedulerCronJob) {
schedulerCronJob.stop();
schedulerCronJob = null;
console.log('Crawl scheduler stopped');
}
}
/**
* Restart the scheduler
*/
export async function restartCrawlScheduler(): Promise<void> {
await startCrawlScheduler();
}
// ============================================
// Manual Triggers
// ============================================
/**
* Manually trigger a crawl for a specific store (creates a job immediately)
*/
export async function triggerManualCrawl(storeId: number): Promise<CrawlJob> {
console.log(`Manual crawl triggered for store ID: ${storeId}`);
return await createCrawlJob(storeId, 'full_crawl', 'manual', new Date(), 100); // High priority
}
/**
* Manually trigger crawls for all stores
*/
export async function triggerAllStoresCrawl(): Promise<number> {
console.log('Manual crawl triggered for all stores');
const result = await pool.query(`
SELECT id, name FROM stores
WHERE active = TRUE AND scrape_enabled = TRUE
AND NOT EXISTS (
SELECT 1 FROM crawl_jobs cj
WHERE cj.store_id = stores.id AND cj.status IN ('pending', 'running')
)
`);
let jobsCreated = 0;
for (const store of result.rows) {
await createCrawlJob(store.id, 'full_crawl', 'manual', new Date(), 50);
jobsCreated++;
}
console.log(`Created ${jobsCreated} manual crawl jobs`);
return jobsCreated;
}
/**
* Cancel a pending job
*/
export async function cancelJob(jobId: number): Promise<boolean> {
const result = await pool.query(`
UPDATE crawl_jobs
SET status = 'cancelled'
WHERE id = $1 AND status = 'pending'
RETURNING id
`, [jobId]);
return result.rows.length > 0;
}
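// Usage sketch (illustrative; assumes the caller runs inside an async context):
//
//   import { triggerManualCrawl, updateStoreSchedule, cancelJob } from './crawl-scheduler';
//
//   // Queue a high-priority manual crawl for one store
//   const job = await triggerManualCrawl(42);
//
//   // Give that store a tighter interval than the global default
//   await updateStoreSchedule(42, { interval_hours: 2, priority: 10 });
//
//   // Cancel the job while it is still pending
//   await cancelJob(job.id);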

View File

@@ -0,0 +1,645 @@
/**
* Crawler Jobs Service
*
* Handles three types of jobs:
* 1. DetectMenuProviderJob - Detect menu provider for a dispensary
* 2. DutchieMenuCrawlJob - Production Dutchie crawl
* 3. SandboxCrawlJob - Learning/testing crawl for unknown providers
*/
import { pool } from '../db/migrate';
import { logger } from './logger';
import { detectMenuProvider, detectProviderChange, MenuProvider } from './menu-provider-detector';
import { scrapeStore } from '../scraper-v2';
import puppeteer, { Browser, Page } from 'puppeteer';
import { promises as fs } from 'fs';
import path from 'path';
const WORKER_ID = `crawler-${process.pid}-${Date.now()}`;
// ========================================
// Types
// ========================================
interface Dispensary {
id: number;
name: string;
website: string | null;
menu_url: string | null;
menu_provider: MenuProvider | null;
menu_provider_confidence: number;
crawler_mode: 'production' | 'sandbox';
crawler_status: string;
scraper_template: string | null;
}
interface JobResult {
success: boolean;
message: string;
data?: Record<string, any>;
}
// ========================================
// Helper Functions
// ========================================
async function getDispensary(dispensaryId: number): Promise<Dispensary | null> {
const result = await pool.query(
`SELECT id, name, website, menu_url, menu_provider, menu_provider_confidence,
crawler_mode, crawler_status, scraper_template
FROM dispensaries WHERE id = $1`,
[dispensaryId]
);
return result.rows[0] || null;
}
async function updateDispensary(
dispensaryId: number,
updates: Partial<Dispensary> & { last_menu_error_at?: Date; last_error_message?: string; provider_detection_data?: any; last_menu_scrape?: Date; menu_scrape_status?: string }
): Promise<void> {
const setClauses: string[] = [];
const values: any[] = [];
let paramIndex = 1;
for (const [key, value] of Object.entries(updates)) {
setClauses.push(`${key} = $${paramIndex}`);
values.push(value);
paramIndex++;
}
setClauses.push(`updated_at = NOW()`);
values.push(dispensaryId);
await pool.query(
`UPDATE dispensaries SET ${setClauses.join(', ')} WHERE id = $${paramIndex}`,
values
);
}
async function createSandboxEntry(
dispensaryId: number,
suspectedProvider: string | null,
mode: string,
detectionSignals?: any
): Promise<number> {
// First, check if there's an existing active sandbox
const existing = await pool.query(
`SELECT id FROM crawler_sandboxes
WHERE dispensary_id = $1 AND status NOT IN ('moved_to_production', 'failed')`,
[dispensaryId]
);
if (existing.rows.length > 0) {
// Update existing
await pool.query(
`UPDATE crawler_sandboxes
SET suspected_menu_provider = $2, mode = $3, detection_signals = COALESCE($4, detection_signals), updated_at = NOW()
WHERE id = $1`,
[existing.rows[0].id, suspectedProvider, mode, detectionSignals ? JSON.stringify(detectionSignals) : null]
);
return existing.rows[0].id;
}
// Create new
const result = await pool.query(
`INSERT INTO crawler_sandboxes (dispensary_id, suspected_menu_provider, mode, detection_signals, status)
VALUES ($1, $2, $3, $4, 'pending')
RETURNING id`,
[dispensaryId, suspectedProvider, mode, detectionSignals ? JSON.stringify(detectionSignals) : '{}']
);
return result.rows[0].id;
}
async function createSandboxJob(
dispensaryId: number,
sandboxId: number | null,
jobType: string,
priority: number = 0
): Promise<number> {
const result = await pool.query(
`INSERT INTO sandbox_crawl_jobs (dispensary_id, sandbox_id, job_type, status, priority)
VALUES ($1, $2, $3, 'pending', $4)
RETURNING id`,
[dispensaryId, sandboxId, jobType, priority]
);
return result.rows[0].id;
}
// Get linked store ID for a dispensary (for using existing scraper)
async function getStoreIdForDispensary(dispensaryId: number): Promise<number | null> {
// Check if there's a stores entry linked to this dispensary
const result = await pool.query(
`SELECT s.id FROM stores s
JOIN dispensaries d ON d.menu_url = s.dutchie_plus_url OR d.name ILIKE '%' || s.name || '%'
WHERE d.id = $1
LIMIT 1`,
[dispensaryId]
);
if (result.rows.length > 0) {
return result.rows[0].id;
}
// Try to find by website
const result2 = await pool.query(
`SELECT s.id FROM stores s
JOIN dispensaries d ON d.website ILIKE '%' || s.slug || '%'
WHERE d.id = $1
LIMIT 1`,
[dispensaryId]
);
return result2.rows[0]?.id || null;
}
// ========================================
// Job 1: Detect Menu Provider
// ========================================
export async function runDetectMenuProviderJob(dispensaryId: number): Promise<JobResult> {
logger.info('crawler-jobs', `Starting menu provider detection for dispensary ${dispensaryId}`);
const dispensary = await getDispensary(dispensaryId);
if (!dispensary) {
return { success: false, message: `Dispensary ${dispensaryId} not found` };
}
// Check for website URL
const websiteUrl = dispensary.website || dispensary.menu_url;
if (!websiteUrl) {
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review',
last_menu_error_at: new Date(),
last_error_message: 'No website URL available for detection',
});
return { success: false, message: 'No website URL available' };
}
try {
// Run detection
const detection = await detectMenuProvider(websiteUrl, {
checkMenuPaths: true,
timeout: 30000,
});
// Update dispensary with results
const updates: any = {
menu_provider: detection.provider,
menu_provider_confidence: detection.confidence,
provider_detection_data: JSON.stringify({
signals: detection.signals,
urlsTested: detection.urlsTested,
menuEntryPoints: detection.menuEntryPoints,
rawSignals: detection.rawSignals,
detectedAt: new Date().toISOString(),
}),
crawler_status: 'idle',
};
// Decide crawler mode based on provider
if (detection.provider === 'dutchie' && detection.confidence >= 70) {
// Dutchie with high confidence -> production
updates.crawler_mode = 'production';
logger.info('crawler-jobs', `Dispensary ${dispensaryId} detected as Dutchie (${detection.confidence}%), setting to production`);
} else {
// Unknown or non-Dutchie -> sandbox
updates.crawler_mode = 'sandbox';
// Create sandbox entry for further analysis
const sandboxId = await createSandboxEntry(
dispensaryId,
detection.provider,
'detection',
{
signals: detection.signals,
rawSignals: detection.rawSignals,
}
);
// Queue sandbox crawl job
await createSandboxJob(dispensaryId, sandboxId, 'detection');
logger.info('crawler-jobs', `Dispensary ${dispensaryId} detected as ${detection.provider} (${detection.confidence}%), setting to sandbox`);
}
// Update menu entry points if found
if (detection.menuEntryPoints.length > 0 && !dispensary.menu_url) {
updates.menu_url = detection.menuEntryPoints[0];
}
await updateDispensary(dispensaryId, updates);
return {
success: true,
message: `Detected provider: ${detection.provider} (${detection.confidence}%)`,
data: {
provider: detection.provider,
confidence: detection.confidence,
mode: updates.crawler_mode,
menuEntryPoints: detection.menuEntryPoints,
},
};
} catch (error: any) {
logger.error('crawler-jobs', `Detection failed for dispensary ${dispensaryId}: ${error.message}`);
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review',
last_menu_error_at: new Date(),
last_error_message: `Detection failed: ${error.message}`,
});
return { success: false, message: error.message };
}
}
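// Usage sketch (illustrative): run detection first, then route to the crawl
// job that matches the mode detection assigned.
//
//   const detection = await runDetectMenuProviderJob(dispensaryId);
//   if (detection.success && detection.data?.mode === 'production') {
//     await runDutchieMenuCrawlJob(dispensaryId);
//   } else if (detection.success) {
//     await runSandboxCrawlJob(dispensaryId);
//   }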
// ========================================
// Job 2: Dutchie Menu Crawl (Production)
// ========================================
export async function runDutchieMenuCrawlJob(dispensaryId: number): Promise<JobResult> {
logger.info('crawler-jobs', `Starting Dutchie production crawl for dispensary ${dispensaryId}`);
const dispensary = await getDispensary(dispensaryId);
if (!dispensary) {
return { success: false, message: `Dispensary ${dispensaryId} not found` };
}
// Verify it's a Dutchie production dispensary
if (dispensary.menu_provider !== 'dutchie') {
logger.warn('crawler-jobs', `Dispensary ${dispensaryId} is not Dutchie, skipping production crawl`);
return { success: false, message: 'Not a Dutchie dispensary' };
}
if (dispensary.crawler_mode !== 'production') {
logger.warn('crawler-jobs', `Dispensary ${dispensaryId} is not in production mode, skipping`);
return { success: false, message: 'Not in production mode' };
}
// Find linked store ID
const storeId = await getStoreIdForDispensary(dispensaryId);
if (!storeId) {
// Need to create a store entry or handle differently
logger.warn('crawler-jobs', `No linked store found for dispensary ${dispensaryId}`);
return { success: false, message: 'No linked store found - needs setup' };
}
try {
// Update status to running
await updateDispensary(dispensaryId, { crawler_status: 'running' });
// Run the existing Dutchie scraper
await scrapeStore(storeId, 3); // 3 parallel workers
// Update success status
await updateDispensary(dispensaryId, {
crawler_status: 'ok',
      last_menu_scrape: new Date(),
      menu_scrape_status: 'active',
});
logger.info('crawler-jobs', `Dutchie crawl completed for dispensary ${dispensaryId}`);
return {
success: true,
message: 'Dutchie crawl completed successfully',
data: { storeId },
};
} catch (error: any) {
logger.error('crawler-jobs', `Dutchie crawl failed for dispensary ${dispensaryId}: ${error.message}`);
    // Check whether the failure is caused by a provider change; always close
    // the probe browser, even when navigation itself fails
    let providerChanged = false;
    let checkBrowser: Browser | null = null;
    try {
      checkBrowser = await puppeteer.launch({ headless: true, args: ['--no-sandbox'] });
      const page = await checkBrowser.newPage();
      const url = dispensary.menu_url || dispensary.website;
      if (url) {
        await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
        const changeResult = await detectProviderChange(page, 'dutchie');
        providerChanged = changeResult.changed;
        if (providerChanged) {
          // Provider changed - move to sandbox
          await updateDispensary(dispensaryId, {
            crawler_mode: 'sandbox',
            crawler_status: 'error_needs_review',
            last_menu_error_at: new Date(),
            last_error_message: `Provider appears to have changed from Dutchie to ${changeResult.newProvider}`,
          });
          const sandboxId = await createSandboxEntry(
            dispensaryId,
            changeResult.newProvider || 'unknown',
            'detection',
            { providerChangeDetected: true, previousProvider: 'dutchie' }
          );
          await createSandboxJob(dispensaryId, sandboxId, 'detection');
          logger.warn('crawler-jobs', `Provider change detected for dispensary ${dispensaryId}: Dutchie -> ${changeResult.newProvider}`);
        }
      }
    } catch {
      // Ignore detection errors during failure handling
    } finally {
      if (checkBrowser) {
        await checkBrowser.close().catch(() => {});
      }
    }
if (!providerChanged) {
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review',
last_menu_error_at: new Date(),
last_error_message: error.message,
});
}
return { success: false, message: error.message };
}
}
// ========================================
// Job 3: Sandbox Crawl (Learning Mode)
// ========================================
export async function runSandboxCrawlJob(dispensaryId: number, sandboxId?: number): Promise<JobResult> {
logger.info('crawler-jobs', `Starting sandbox crawl for dispensary ${dispensaryId}`);
const dispensary = await getDispensary(dispensaryId);
if (!dispensary) {
return { success: false, message: `Dispensary ${dispensaryId} not found` };
}
// Get or create sandbox entry
let sandbox: any;
if (sandboxId) {
const result = await pool.query('SELECT * FROM crawler_sandboxes WHERE id = $1', [sandboxId]);
sandbox = result.rows[0];
} else {
const result = await pool.query(
`SELECT * FROM crawler_sandboxes
WHERE dispensary_id = $1 AND status NOT IN ('moved_to_production', 'failed')
ORDER BY created_at DESC LIMIT 1`,
[dispensaryId]
);
sandbox = result.rows[0];
if (!sandbox) {
const newSandboxId = await createSandboxEntry(dispensaryId, dispensary.menu_provider, 'template_learning');
const result = await pool.query('SELECT * FROM crawler_sandboxes WHERE id = $1', [newSandboxId]);
sandbox = result.rows[0];
}
}
const websiteUrl = dispensary.menu_url || dispensary.website;
if (!websiteUrl) {
await pool.query(
`UPDATE crawler_sandboxes SET status = 'failed', failure_reason = 'No website URL' WHERE id = $1`,
[sandbox.id]
);
return { success: false, message: 'No website URL available' };
}
let browser: Browser | null = null;
try {
// Update status
await pool.query(
`UPDATE crawler_sandboxes SET status = 'analyzing', updated_at = NOW() WHERE id = $1`,
[sandbox.id]
);
await updateDispensary(dispensaryId, { crawler_status: 'running' });
// Launch browser
browser = await puppeteer.launch({
headless: true,
args: ['--no-sandbox', '--disable-setuid-sandbox'],
});
const page = await browser.newPage();
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
);
// URLs to crawl (limited depth for sandbox)
const urlsToVisit = [websiteUrl];
const menuPaths = ['/menu', '/shop', '/products', '/order'];
    // Use a distinct loop variable so we don't shadow the imported `path` module
    const baseUrl = new URL(websiteUrl).origin;
    for (const menuPath of menuPaths) {
      urlsToVisit.push(`${baseUrl}${menuPath}`);
    }
const urlsTested: string[] = [];
const menuEntryPoints: string[] = [];
const capturedHtml: { url: string; html: string }[] = [];
const analysisData: any = {
provider_signals: {},
selector_candidates: [],
page_structures: [],
};
// Crawl each URL
for (const url of urlsToVisit) {
try {
urlsTested.push(url);
await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
await new Promise(r => setTimeout(r, 2000)); // Wait for dynamic content
// Get page HTML
const html = await page.content();
// Check if this looks like a menu page
const hasMenuContent = await page.evaluate(() => {
const text = document.body.innerText.toLowerCase();
return (
text.includes('add to cart') ||
text.includes('thc') ||
text.includes('indica') ||
text.includes('sativa')
);
});
if (hasMenuContent) {
menuEntryPoints.push(url);
capturedHtml.push({ url, html });
// Analyze page structure for selector candidates
const structure = await page.evaluate(() => {
const candidates: any[] = [];
// Look for product-like containers
const productSelectors = [
'.product', '.product-card', '.menu-item', '.item-card',
'[data-product]', '[data-item]', '.strain', '.listing',
];
for (const selector of productSelectors) {
const els = document.querySelectorAll(selector);
if (els.length > 3) { // Likely a list
candidates.push({
selector,
count: els.length,
type: 'product_container',
});
}
}
            // Look for price patterns (e.g. $25 or $25.00)
            const pricePattern = /\$\d+(\.\d{2})?/g;
            const textNodes = document.body.innerText;
            const priceMatches = textNodes.match(pricePattern);
return {
candidates,
priceCount: priceMatches?.length || 0,
hasAddToCart: textNodes.toLowerCase().includes('add to cart'),
};
});
analysisData.page_structures.push({
url,
...structure,
});
}
} catch (pageError: any) {
if (!pageError.message.includes('404')) {
logger.warn('crawler-jobs', `Sandbox crawl error for ${url}: ${pageError.message}`);
}
}
}
// Save HTML to storage (local for now, S3 later)
let rawHtmlLocation: string | null = null;
if (capturedHtml.length > 0) {
const htmlDir = path.join(process.cwd(), 'sandbox-data', `dispensary-${dispensaryId}`);
await fs.mkdir(htmlDir, { recursive: true });
for (const { url, html } of capturedHtml) {
const filename = `${Date.now()}-${url.replace(/[^a-z0-9]/gi, '_')}.html`;
await fs.writeFile(path.join(htmlDir, filename), html);
}
rawHtmlLocation = htmlDir;
}
// Update sandbox with results
await pool.query(
`UPDATE crawler_sandboxes SET
status = $1,
urls_tested = $2,
menu_entry_points = $3,
raw_html_location = $4,
analysis_json = $5,
confidence_score = $6,
analyzed_at = NOW(),
updated_at = NOW()
WHERE id = $7`,
[
menuEntryPoints.length > 0 ? 'needs_human_review' : 'pending',
JSON.stringify(urlsTested),
JSON.stringify(menuEntryPoints),
rawHtmlLocation,
JSON.stringify(analysisData),
menuEntryPoints.length > 0 ? 50 : 20,
sandbox.id,
]
);
// Update dispensary status
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review', // Sandbox results need review
});
logger.info('crawler-jobs', `Sandbox crawl completed for dispensary ${dispensaryId}: ${menuEntryPoints.length} menu pages found`);
return {
success: true,
message: `Sandbox crawl completed. Found ${menuEntryPoints.length} menu entry points.`,
data: {
sandboxId: sandbox.id,
urlsTested: urlsTested.length,
menuEntryPoints,
analysisData,
},
};
} catch (error: any) {
logger.error('crawler-jobs', `Sandbox crawl failed for dispensary ${dispensaryId}: ${error.message}`);
await pool.query(
`UPDATE crawler_sandboxes SET status = 'failed', failure_reason = $1 WHERE id = $2`,
[error.message, sandbox.id]
);
await updateDispensary(dispensaryId, {
crawler_status: 'error_needs_review',
last_menu_error_at: new Date(),
last_error_message: `Sandbox crawl failed: ${error.message}`,
});
return { success: false, message: error.message };
} finally {
if (browser) {
await browser.close();
}
}
}
// ========================================
// Queue Processing Functions
// ========================================
/**
* Process pending sandbox jobs
*/
export async function processSandboxJobs(limit: number = 5): Promise<void> {
// Claim pending jobs
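  // FOR UPDATE SKIP LOCKED lets multiple workers poll this table concurrently:
  // each worker claims a disjoint set of rows instead of blocking on or
  // double-claiming jobs another worker has already locked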
const jobs = await pool.query(
`UPDATE sandbox_crawl_jobs
SET status = 'running', worker_id = $1, started_at = NOW()
WHERE id IN (
SELECT id FROM sandbox_crawl_jobs
WHERE status = 'pending' AND scheduled_at <= NOW()
ORDER BY priority DESC, scheduled_at ASC
LIMIT $2
FOR UPDATE SKIP LOCKED
)
RETURNING *`,
[WORKER_ID, limit]
);
for (const job of jobs.rows) {
try {
let result: JobResult;
if (job.job_type === 'detection') {
result = await runDetectMenuProviderJob(job.dispensary_id);
} else {
result = await runSandboxCrawlJob(job.dispensary_id, job.sandbox_id);
}
await pool.query(
`UPDATE sandbox_crawl_jobs
SET status = $1, completed_at = NOW(), result_summary = $2, error_message = $3
WHERE id = $4`,
[
result.success ? 'completed' : 'failed',
JSON.stringify(result.data || {}),
result.success ? null : result.message,
job.id,
]
);
} catch (error: any) {
await pool.query(
`UPDATE sandbox_crawl_jobs SET status = 'failed', error_message = $1 WHERE id = $2`,
[error.message, job.id]
);
}
}
}
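// Wiring sketch (an assumption - this module does not schedule itself; a
// periodic runner like the one in crawl-scheduler would call it):
//
//   import cron from 'node-cron';
//   cron.schedule('*/5 * * * *', () => {
//     processSandboxJobs(5).catch((err) => logger.error('crawler-jobs', String(err)));
//   });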

View File

@@ -0,0 +1,414 @@
/**
* CrawlerLogger - Structured logging for crawler operations
*
* High-signal, low-noise logging with JSON output for:
* - Job lifecycle (one summary per job)
* - Provider/mode changes
* - Sandbox events
* - Queue failures
*
* NO per-product logging - that's too noisy.
*/
export type LogLevel = 'info' | 'warn' | 'error' | 'debug';
export type LogEvent =
| 'job_started'
| 'job_completed'
| 'job_failed'
| 'job_cancelled'
| 'provider_detected'
| 'provider_changed'
| 'mode_changed'
| 'sandbox_started'
| 'sandbox_completed'
| 'sandbox_failed'
| 'queue_failure'
| 'detection_scan'
| 'crawl_batch'
| 'intelligence_run';
interface BaseLogPayload {
timestamp: string;
level: LogLevel;
event: LogEvent;
dispensary_id?: number;
store_id?: number;
job_id?: number;
provider?: string;
category?: 'product' | 'specials' | 'brand' | 'metadata';
}
interface JobStartedPayload extends BaseLogPayload {
event: 'job_started';
job_type: string;
trigger_type: string;
store_name: string;
}
interface JobCompletedPayload extends BaseLogPayload {
event: 'job_completed';
store_name: string;
duration_ms: number;
products_found: number;
products_new: number;
products_updated: number;
products_marked_oos?: number;
}
interface JobFailedPayload extends BaseLogPayload {
event: 'job_failed';
store_name: string;
duration_ms: number;
error_message: string;
error_code?: string;
}
interface ProviderDetectedPayload extends BaseLogPayload {
event: 'provider_detected';
dispensary_name: string;
detected_provider: string;
confidence: number;
detection_method: string;
menu_url?: string;
}
interface ProviderChangedPayload extends BaseLogPayload {
event: 'provider_changed';
dispensary_name: string;
old_provider: string | null;
new_provider: string;
old_confidence: number;
new_confidence: number;
}
interface ModeChangedPayload extends BaseLogPayload {
event: 'mode_changed';
dispensary_name: string;
old_mode: string;
new_mode: string;
reason: string;
}
interface SandboxEventPayload extends BaseLogPayload {
event: 'sandbox_started' | 'sandbox_completed' | 'sandbox_failed';
dispensary_name: string;
template_name: string;
quality_score?: number;
products_extracted?: number;
fields_missing?: number;
error_message?: string;
}
interface QueueFailurePayload extends BaseLogPayload {
event: 'queue_failure';
queue_type: string;
error_message: string;
affected_items?: number;
}
interface DetectionScanPayload extends BaseLogPayload {
event: 'detection_scan';
total_scanned: number;
detected: number;
failed: number;
skipped: number;
duration_ms: number;
}
interface IntelligenceRunPayload extends BaseLogPayload {
event: 'intelligence_run';
run_type: 'detection' | 'production' | 'sandbox' | 'full';
dispensaries_processed: number;
jobs_queued: number;
duration_ms: number;
}
type LogPayload =
| JobStartedPayload
| JobCompletedPayload
| JobFailedPayload
| ProviderDetectedPayload
| ProviderChangedPayload
| ModeChangedPayload
| SandboxEventPayload
| QueueFailurePayload
| DetectionScanPayload
| IntelligenceRunPayload;
class CrawlerLoggerService {
private formatLog(payload: LogPayload): string {
return JSON.stringify(payload);
}
private log(payload: LogPayload): void {
const formatted = this.formatLog(payload);
switch (payload.level) {
case 'error':
console.error(`[CRAWLER] ${formatted}`);
break;
case 'warn':
console.warn(`[CRAWLER] ${formatted}`);
break;
case 'debug':
console.debug(`[CRAWLER] ${formatted}`);
break;
default:
console.log(`[CRAWLER] ${formatted}`);
}
}
/**
* Log when a crawl job starts
*/
jobStarted(params: {
job_id: number;
store_id: number;
store_name: string;
job_type: string;
trigger_type: string;
provider?: string;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'job_started',
job_id: params.job_id,
store_id: params.store_id,
store_name: params.store_name,
job_type: params.job_type,
trigger_type: params.trigger_type,
provider: params.provider,
});
}
/**
* Log when a crawl job completes successfully
*/
jobCompleted(params: {
job_id: number;
store_id: number;
store_name: string;
duration_ms: number;
products_found: number;
products_new: number;
products_updated: number;
products_marked_oos?: number;
provider?: string;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'job_completed',
job_id: params.job_id,
store_id: params.store_id,
store_name: params.store_name,
duration_ms: params.duration_ms,
products_found: params.products_found,
products_new: params.products_new,
products_updated: params.products_updated,
products_marked_oos: params.products_marked_oos,
provider: params.provider,
});
}
/**
* Log when a crawl job fails
*/
jobFailed(params: {
job_id: number;
store_id: number;
store_name: string;
duration_ms: number;
error_message: string;
error_code?: string;
provider?: string;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'error',
event: 'job_failed',
job_id: params.job_id,
store_id: params.store_id,
store_name: params.store_name,
duration_ms: params.duration_ms,
error_message: params.error_message,
error_code: params.error_code,
provider: params.provider,
});
}
/**
* Log when a provider is detected for a dispensary
*/
providerDetected(params: {
dispensary_id: number;
dispensary_name: string;
detected_provider: string;
confidence: number;
detection_method: string;
menu_url?: string;
category?: 'product' | 'specials' | 'brand' | 'metadata';
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'provider_detected',
dispensary_id: params.dispensary_id,
dispensary_name: params.dispensary_name,
detected_provider: params.detected_provider,
confidence: params.confidence,
detection_method: params.detection_method,
menu_url: params.menu_url,
category: params.category,
});
}
/**
* Log when a dispensary's provider changes
*/
providerChanged(params: {
dispensary_id: number;
dispensary_name: string;
old_provider: string | null;
new_provider: string;
old_confidence: number;
new_confidence: number;
category?: 'product' | 'specials' | 'brand' | 'metadata';
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'provider_changed',
dispensary_id: params.dispensary_id,
dispensary_name: params.dispensary_name,
old_provider: params.old_provider,
new_provider: params.new_provider,
old_confidence: params.old_confidence,
new_confidence: params.new_confidence,
category: params.category,
});
}
/**
* Log when a dispensary's crawler mode changes (sandbox -> production, etc.)
*/
modeChanged(params: {
dispensary_id: number;
dispensary_name: string;
old_mode: string;
new_mode: string;
reason: string;
category?: 'product' | 'specials' | 'brand' | 'metadata';
provider?: string;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'mode_changed',
dispensary_id: params.dispensary_id,
dispensary_name: params.dispensary_name,
old_mode: params.old_mode,
new_mode: params.new_mode,
reason: params.reason,
category: params.category,
provider: params.provider,
});
}
/**
* Log sandbox crawl events
*/
sandboxEvent(params: {
event: 'sandbox_started' | 'sandbox_completed' | 'sandbox_failed';
dispensary_id: number;
dispensary_name: string;
template_name: string;
category?: 'product' | 'specials' | 'brand' | 'metadata';
quality_score?: number;
products_extracted?: number;
fields_missing?: number;
error_message?: string;
provider?: string;
}): void {
const level: LogLevel = params.event === 'sandbox_failed' ? 'error' : 'info';
this.log({
timestamp: new Date().toISOString(),
level,
event: params.event,
dispensary_id: params.dispensary_id,
dispensary_name: params.dispensary_name,
template_name: params.template_name,
category: params.category,
quality_score: params.quality_score,
products_extracted: params.products_extracted,
fields_missing: params.fields_missing,
error_message: params.error_message,
provider: params.provider,
});
}
/**
* Log queue processing failures
*/
queueFailure(params: {
queue_type: string;
error_message: string;
affected_items?: number;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'error',
event: 'queue_failure',
queue_type: params.queue_type,
error_message: params.error_message,
affected_items: params.affected_items,
});
}
/**
* Log detection scan summary
*/
detectionScan(params: {
total_scanned: number;
detected: number;
failed: number;
skipped: number;
duration_ms: number;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'detection_scan',
total_scanned: params.total_scanned,
detected: params.detected,
failed: params.failed,
skipped: params.skipped,
duration_ms: params.duration_ms,
});
}
/**
* Log intelligence run summary
*/
intelligenceRun(params: {
run_type: 'detection' | 'production' | 'sandbox' | 'full';
dispensaries_processed: number;
jobs_queued: number;
duration_ms: number;
}): void {
this.log({
timestamp: new Date().toISOString(),
level: 'info',
event: 'intelligence_run',
run_type: params.run_type,
dispensaries_processed: params.dispensaries_processed,
jobs_queued: params.jobs_queued,
duration_ms: params.duration_ms,
});
}
}
// Export singleton instance
export const crawlerLogger = new CrawlerLoggerService();
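// Usage sketch: one structured JSON line per lifecycle event.
//
//   import { crawlerLogger } from './crawler-logger'; // module path is an assumption
//
//   crawlerLogger.jobStarted({
//     job_id: 101, store_id: 7, store_name: 'Example Store',
//     job_type: 'full_crawl', trigger_type: 'scheduled', provider: 'dutchie',
//   });
//   crawlerLogger.jobCompleted({
//     job_id: 101, store_id: 7, store_name: 'Example Store',
//     duration_ms: 42000, products_found: 310, products_new: 12, products_updated: 55,
//   });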

View File

@@ -0,0 +1,620 @@
/**
* Multi-Category Intelligence Detector
*
* Detects providers for each intelligence category independently:
* - Products: Which provider serves product data
* - Specials: Which provider serves deals/specials
* - Brand: Which provider serves brand information
* - Metadata: Which provider serves taxonomy/category data
*/
import { pool } from '../db/migrate';
import { logger } from './logger';
import puppeteer, { Browser, Page } from 'puppeteer';
// ========================================
// Types
// ========================================
export type IntelligenceCategory = 'product' | 'specials' | 'brand' | 'metadata';
export type MenuProvider =
| 'dutchie'
| 'treez'
| 'jane'
| 'iheartjane'
| 'weedmaps'
| 'leafly'
| 'meadow'
| 'greenlight'
| 'blaze'
| 'flowhub'
| 'dispense'
| 'cova'
| 'custom_html'
| 'custom_json'
| 'dutchie_json'
| 'other'
| 'unknown';
export interface CategoryDetectionResult {
provider: MenuProvider;
confidence: number;
mode: 'production' | 'sandbox';
signals: Record<string, any>;
templateName?: string;
}
export interface MultiCategoryDetectionResult {
product: CategoryDetectionResult;
specials: CategoryDetectionResult;
brand: CategoryDetectionResult;
metadata: CategoryDetectionResult;
urlsTested: string[];
rawSignals: Record<string, any>;
}
// Production-ready providers per category
// Only these combinations can be set to production mode
const PRODUCTION_READY: Record<IntelligenceCategory, MenuProvider[]> = {
product: ['dutchie'], // Only Dutchie products are production-ready
specials: [], // None yet
brand: [], // None yet
metadata: [], // None yet
};
// Provider detection patterns
const PROVIDER_PATTERNS: Record<string, {
scripts: RegExp[];
iframes: RegExp[];
html: RegExp[];
apiEndpoints: RegExp[];
metaTags: RegExp[];
}> = {
dutchie: {
scripts: [
/dutchie\.com/i,
/dutchie-plus/i,
/dutchie\.js/i,
/__DUTCHIE__/i,
/dutchie-embed/i,
],
iframes: [
/dutchie\.com/i,
/dutchie-plus\.com/i,
/embed\.dutchie/i,
],
html: [
/class="dutchie/i,
/id="dutchie/i,
/data-dutchie/i,
/"menuType":\s*"dutchie"/i,
],
apiEndpoints: [
/dutchie\.com\/graphql/i,
/plus\.dutchie\.com/i,
],
metaTags: [
/dutchie/i,
],
},
treez: {
scripts: [
/treez\.io/i,
/treez-ecommerce/i,
/treez\.js/i,
],
iframes: [
/treez\.io/i,
/shop\.treez/i,
],
html: [
/class="treez/i,
/data-treez/i,
/treez-menu/i,
],
apiEndpoints: [
/api\.treez\.io/i,
/treez\.io\/api/i,
],
metaTags: [],
},
jane: {
scripts: [
/jane\.co/i,
/iheartjane\.com/i,
/jane-frame/i,
/jane\.js/i,
],
iframes: [
/jane\.co/i,
/iheartjane\.com/i,
/embed\.iheartjane/i,
],
html: [
/class="jane/i,
/data-jane/i,
/jane-embed/i,
],
apiEndpoints: [
/api\.iheartjane/i,
/jane\.co\/api/i,
],
metaTags: [],
},
weedmaps: {
scripts: [
/weedmaps\.com/i,
/wm-menu/i,
],
iframes: [
/weedmaps\.com/i,
/menu\.weedmaps/i,
],
html: [
/data-weedmaps/i,
/wm-menu/i,
],
apiEndpoints: [
/api-g\.weedmaps/i,
/weedmaps\.com\/api/i,
],
metaTags: [],
},
leafly: {
scripts: [
/leafly\.com/i,
/leafly-menu/i,
],
iframes: [
/leafly\.com/i,
/order\.leafly/i,
],
html: [
/data-leafly/i,
/leafly-embed/i,
],
apiEndpoints: [
/api\.leafly/i,
],
metaTags: [],
},
};
// Category-specific detection signals
const CATEGORY_SIGNALS: Record<IntelligenceCategory, {
urlPatterns: RegExp[];
htmlPatterns: RegExp[];
jsonKeys: string[];
}> = {
product: {
urlPatterns: [/\/menu/i, /\/products/i, /\/shop/i, /\/order/i],
htmlPatterns: [/product-card/i, /menu-item/i, /product-list/i, /product-grid/i],
jsonKeys: ['products', 'menuItems', 'items', 'inventory'],
},
specials: {
urlPatterns: [/\/specials/i, /\/deals/i, /\/promotions/i, /\/offers/i],
htmlPatterns: [/special/i, /deal/i, /promotion/i, /discount/i, /sale/i],
jsonKeys: ['specials', 'deals', 'promotions', 'offers'],
},
brand: {
urlPatterns: [/\/brands/i, /\/vendors/i, /\/producers/i],
htmlPatterns: [/brand-list/i, /vendor/i, /producer/i, /manufacturer/i],
jsonKeys: ['brands', 'vendors', 'producers', 'manufacturers'],
},
metadata: {
urlPatterns: [/\/categories/i, /\/taxonomy/i],
htmlPatterns: [/category-nav/i, /menu-categories/i, /filter-category/i],
jsonKeys: ['categories', 'taxonomy', 'filters', 'types'],
},
};
// ========================================
// Main Detection Function
// ========================================
export async function detectMultiCategoryProviders(
websiteUrl: string,
options: {
timeout?: number;
headless?: boolean;
existingBrowser?: Browser;
} = {}
): Promise<MultiCategoryDetectionResult> {
const { timeout = 30000, headless = true, existingBrowser } = options;
let browser: Browser | null = null;
let page: Page | null = null;
const urlsTested: string[] = [];
const rawSignals: Record<string, any> = {};
try {
browser = existingBrowser || await puppeteer.launch({
headless,
args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage'],
});
page = await browser.newPage();
await page.setViewport({ width: 1920, height: 1080 });
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36');
// Navigate to main site
const baseUrl = normalizeUrl(websiteUrl);
urlsTested.push(baseUrl);
await page.goto(baseUrl, { waitUntil: 'networkidle2', timeout });
// Collect signals from main page
const mainPageSignals = await collectPageSignals(page);
rawSignals.mainPage = mainPageSignals;
// Probe common menu, specials, deals, and brand paths
const menuUrls = ['/menu', '/shop', '/products', '/order', '/specials', '/deals', '/brands'];
for (const path of menuUrls) {
try {
const fullUrl = new URL(path, baseUrl).toString();
urlsTested.push(fullUrl);
await page.goto(fullUrl, { waitUntil: 'networkidle2', timeout: 15000 });
const signals = await collectPageSignals(page);
rawSignals[path] = signals;
} catch {
// URL doesn't exist or timed out
}
}
// Analyze signals for each category
const result: MultiCategoryDetectionResult = {
product: analyzeCategorySignals('product', rawSignals),
specials: analyzeCategorySignals('specials', rawSignals),
brand: analyzeCategorySignals('brand', rawSignals),
metadata: analyzeCategorySignals('metadata', rawSignals),
urlsTested,
rawSignals,
};
logger.info('provider-detection', `Multi-category detection complete for ${websiteUrl}`);
return result;
} catch (error: any) {
logger.error('provider-detection', `Detection failed for ${websiteUrl}: ${error.message}`);
// Return unknown results for all categories
return {
product: createUnknownResult(),
specials: createUnknownResult(),
brand: createUnknownResult(),
metadata: createUnknownResult(),
urlsTested,
rawSignals: { error: error.message },
};
} finally {
if (page) await page.close().catch(() => {});
if (browser && !existingBrowser) await browser.close().catch(() => {});
}
}
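// Usage sketch (illustrative): run a one-off detection and persist it with
// updateAllCategoryProviders (defined below). The URL and dispensary id are
// placeholders, not values from this codebase.
async function exampleDetectAndPersist(): Promise<void> {
  const detection = await detectMultiCategoryProviders('https://example-dispensary.com');
  // Each category resolves independently to a provider/confidence/mode triple
  console.log('product:', detection.product.provider, detection.product.confidence, detection.product.mode);
  await updateAllCategoryProviders(123, detection);
}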
// ========================================
// Helper Functions
// ========================================
function normalizeUrl(url: string): string {
if (!url.startsWith('http')) {
url = 'https://' + url;
}
return url.replace(/\/$/, '');
}
async function collectPageSignals(page: Page): Promise<Record<string, any>> {
return page.evaluate(() => {
const signals: Record<string, any> = {
scripts: [] as string[],
iframes: [] as string[],
links: [] as string[],
metaTags: [] as string[],
bodyClasses: document.body?.className || '',
bodyId: document.body?.id || '',
htmlSnippet: document.documentElement.outerHTML.slice(0, 10000), // first 10KB only
};
// Collect script sources
document.querySelectorAll('script[src]').forEach((el) => {
signals.scripts.push((el as HTMLScriptElement).src);
});
// Collect inline scripts (bundles >= 5KB are skipped; keep a 500-char sample)
document.querySelectorAll('script:not([src])').forEach((el) => {
const content = el.textContent || '';
if (content.length < 5000) {
signals.scripts.push(`inline:${content.slice(0, 500)}`);
}
});
// Collect iframes
document.querySelectorAll('iframe').forEach((el) => {
signals.iframes.push(el.src);
});
// Collect links
document.querySelectorAll('a[href]').forEach((el) => {
signals.links.push((el as HTMLAnchorElement).href);
});
// Collect meta tags
document.querySelectorAll('meta').forEach((el) => {
const content = el.getAttribute('content') || '';
const name = el.getAttribute('name') || el.getAttribute('property') || '';
if (content || name) {
signals.metaTags.push(`${name}:${content}`);
}
});
// Look for JSON data
const jsonBlocks: string[] = [];
document.querySelectorAll('script[type="application/json"]').forEach((el) => {
jsonBlocks.push(el.textContent?.slice(0, 2000) || '');
});
signals.jsonBlocks = jsonBlocks;
return signals;
});
}
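// Shape sketch of the returned signals (illustrative values only):
//   {
//     scripts: ['https://dutchie.com/embed.js', 'inline:window.__DUTCHIE__ = ...'],
//     iframes: ['https://dutchie.com/embedded-menu/example'],
//     links: [...], metaTags: ['og:site_name:Example Greens'],
//     bodyClasses: '...', bodyId: '', htmlSnippet: '<!DOCTYPE html>...',
//     jsonBlocks: ['{"products":[...]}'],
//   }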
function analyzeCategorySignals(
category: IntelligenceCategory,
allSignals: Record<string, any>
): CategoryDetectionResult {
const providerScores: Record<MenuProvider, number> = {} as any;
const detectedSignals: Record<string, any> = {};
// Initialize scores
for (const provider of Object.keys(PROVIDER_PATTERNS)) {
providerScores[provider as MenuProvider] = 0;
}
// Analyze each page's signals
for (const [pagePath, signals] of Object.entries(allSignals)) {
if (!signals || typeof signals !== 'object') continue;
// Check for provider-specific patterns
for (const [provider, patterns] of Object.entries(PROVIDER_PATTERNS)) {
let score = 0;
// Check scripts
if (signals.scripts) {
for (const script of signals.scripts) {
for (const pattern of patterns.scripts) {
if (pattern.test(script)) {
score += 20;
detectedSignals[`${provider}_script_${pagePath}`] = script;
}
}
}
}
// Check iframes
if (signals.iframes) {
for (const iframe of signals.iframes) {
for (const pattern of patterns.iframes) {
if (pattern.test(iframe)) {
score += 25;
detectedSignals[`${provider}_iframe_${pagePath}`] = iframe;
}
}
}
}
// Check HTML content
if (signals.htmlSnippet) {
for (const pattern of patterns.html) {
if (pattern.test(signals.htmlSnippet)) {
score += 15;
detectedSignals[`${provider}_html_${pagePath}`] = true;
}
}
}
providerScores[provider as MenuProvider] += score;
}
// Check for category-specific signals on relevant pages (these are recorded
// as diagnostics only; they do not feed the provider scores above)
const categorySignals = CATEGORY_SIGNALS[category];
const isRelevantPage = categorySignals.urlPatterns.some((p) => p.test(pagePath));
if (isRelevantPage && signals.htmlSnippet) {
for (const pattern of categorySignals.htmlPatterns) {
if (pattern.test(signals.htmlSnippet)) {
detectedSignals[`${category}_html_pattern`] = true;
}
}
}
// Check JSON blocks for category data
if (signals.jsonBlocks) {
for (const json of signals.jsonBlocks) {
for (const key of categorySignals.jsonKeys) {
if (json.toLowerCase().includes(`"${key}"`)) {
detectedSignals[`${category}_json_key_${key}`] = true;
}
}
}
}
}
// Determine winning provider
let bestProvider: MenuProvider = 'unknown';
let bestScore = 0;
for (const [provider, score] of Object.entries(providerScores)) {
if (score > bestScore) {
bestScore = score;
bestProvider = provider as MenuProvider;
}
}
// Calculate confidence (0-100)
const confidence = Math.min(100, bestScore);
// Determine mode based on provider and confidence
const isProductionReady = PRODUCTION_READY[category].includes(bestProvider);
const mode: 'production' | 'sandbox' = isProductionReady && confidence >= 70
? 'production'
: 'sandbox';
// Get template name if available
let templateName: string | undefined;
if (bestProvider === 'dutchie' && category === 'product') {
templateName = 'dutchie_standard';
} else if (bestProvider === 'treez') {
templateName = 'treez_products_v0';
}
return {
provider: bestProvider,
confidence,
mode,
signals: detectedSignals,
templateName,
};
}
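// Scoring sketch (illustrative): a Dutchie iframe on /menu (+25) plus a
// dutchie.js script tag (+20) gives 'dutchie' 45 points on that page. That is
// below the 70% production threshold, so the category stays in sandbox mode;
// the same signals repeated on a second probed page would push it past 70.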
function createUnknownResult(): CategoryDetectionResult {
return {
provider: 'unknown',
confidence: 0,
mode: 'sandbox',
signals: {},
};
}
// ========================================
// Lightweight Per-Category Change Detection
// ========================================
export async function detectCategoryProviderChange(
page: Page,
category: IntelligenceCategory,
expectedProvider: MenuProvider
): Promise<{ changed: boolean; newProvider?: MenuProvider; confidence?: number }> {
try {
const signals = await collectPageSignals(page);
const result = analyzeCategorySignals(category, { currentPage: signals });
if (result.provider !== expectedProvider && result.confidence > 50) {
logger.warn(
'provider-detection',
`Provider change detected for ${category}: ${expectedProvider} -> ${result.provider}`
);
return {
changed: true,
newProvider: result.provider,
confidence: result.confidence,
};
}
return { changed: false };
} catch (error: any) {
logger.error('provider-detection', `Change detection failed: ${error.message}`);
return { changed: false };
}
}
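// Usage sketch (illustrative): guard a production crawl with the lightweight
// check and demote the category on a confident mismatch. `page` is an
// already-navigated Puppeteer page; the dispensary id is a placeholder.
async function exampleGuardProductCrawl(page: Page, dispensaryId: number): Promise<void> {
  const check = await detectCategoryProviderChange(page, 'product', 'dutchie');
  if (check.changed && check.newProvider) {
    await moveCategoryToSandbox(dispensaryId, 'product', `provider changed to ${check.newProvider}`);
  }
}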
// ========================================
// Database Operations
// ========================================
export async function updateDispensaryCategoryProvider(
dispensaryId: number,
category: IntelligenceCategory,
result: CategoryDetectionResult
): Promise<void> {
// Category names double as the dispensaries column prefixes (product_*, specials_*, ...)
const columnPrefix = category;
await pool.query(
`UPDATE dispensaries SET
${columnPrefix}_provider = $1,
${columnPrefix}_confidence = $2,
${columnPrefix}_crawler_mode = $3,
${columnPrefix}_detection_data = $4,
updated_at = NOW()
WHERE id = $5`,
[
result.provider,
result.confidence,
result.mode,
JSON.stringify(result.signals),
dispensaryId,
]
);
}
export async function updateAllCategoryProviders(
dispensaryId: number,
result: MultiCategoryDetectionResult
): Promise<void> {
await pool.query(
`UPDATE dispensaries SET
product_provider = $1,
product_confidence = $2,
product_crawler_mode = $3,
product_detection_data = $4,
specials_provider = $5,
specials_confidence = $6,
specials_crawler_mode = $7,
specials_detection_data = $8,
brand_provider = $9,
brand_confidence = $10,
brand_crawler_mode = $11,
brand_detection_data = $12,
metadata_provider = $13,
metadata_confidence = $14,
metadata_crawler_mode = $15,
metadata_detection_data = $16,
updated_at = NOW()
WHERE id = $17`,
[
result.product.provider,
result.product.confidence,
result.product.mode,
JSON.stringify(result.product.signals),
result.specials.provider,
result.specials.confidence,
result.specials.mode,
JSON.stringify(result.specials.signals),
result.brand.provider,
result.brand.confidence,
result.brand.mode,
JSON.stringify(result.brand.signals),
result.metadata.provider,
result.metadata.confidence,
result.metadata.mode,
JSON.stringify(result.metadata.signals),
dispensaryId,
]
);
}
export async function moveCategoryToSandbox(
dispensaryId: number,
category: IntelligenceCategory,
reason: string
): Promise<void> {
// Category names double as the dispensaries column prefixes
const columnPrefix = category;
await pool.query(
`UPDATE dispensaries SET
${columnPrefix}_crawler_mode = 'sandbox',
${columnPrefix}_detection_data = ${columnPrefix}_detection_data || $1::jsonb,
updated_at = NOW()
WHERE id = $2`,
[
JSON.stringify({ sandbox_reason: reason, sandbox_at: new Date().toISOString() }),
dispensaryId,
]
);
logger.info('provider-detection', `Moved dispensary ${dispensaryId} ${category} to sandbox: ${reason}`);
}

View File

@@ -1,7 +1,7 @@
interface LogEntry {
timestamp: Date;
level: 'info' | 'error' | 'warn' | 'debug';
category: 'scraper' | 'images' | 'categories' | 'system' | 'api' | 'pipeline' | 'age-gate' | 'proxy';
category: 'scraper' | 'images' | 'categories' | 'system' | 'api' | 'pipeline' | 'age-gate' | 'proxy' | 'crawler-jobs' | 'provider-detection' | 'sandbox' | 'intelligence';
message: string;
}
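// Example calls with the new categories, using the logger.info(category,
// message) signature seen elsewhere in this commit (illustrative):
//   logger.info('crawler-jobs', 'Queued sandbox crawl for dispensary 42');
//   logger.warn('provider-detection', 'Low-confidence product provider (35%)');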

View File

@@ -0,0 +1,726 @@
/**
* Menu Provider Detection Service
*
* Detects which menu platform a dispensary is using by analyzing:
* - HTML content patterns (scripts, iframes, classes)
* - URL patterns (embedded menu paths)
* - API endpoint signatures
* - Meta tags and headers
*/
import puppeteer, { Browser, Page } from 'puppeteer';
import { logger } from './logger';
// Known menu provider signatures
export type MenuProvider =
| 'dutchie'
| 'treez'
| 'jane'
| 'iheartjane'
| 'weedmaps'
| 'leafly'
| 'meadow'
| 'greenlight'
| 'blaze'
| 'flowhub'
| 'dispense'
| 'cova'
| 'other'
| 'unknown';
export interface DetectionSignal {
provider: MenuProvider;
confidence: number; // 0-100
source: string; // What triggered this detection
details?: string; // Additional context
}
export interface DetectionResult {
provider: MenuProvider;
confidence: number;
signals: DetectionSignal[];
urlsTested: string[];
menuEntryPoints: string[];
rawSignals: Record<string, boolean | string | number>;
error?: string;
}
// Provider detection patterns
const PROVIDER_PATTERNS: Record<string, {
scripts: RegExp[];
iframes: RegExp[];
classes: RegExp[];
urls: RegExp[];
meta: RegExp[];
apiEndpoints: RegExp[];
htmlPatterns: RegExp[];
}> = {
dutchie: {
scripts: [
/dutchie/i,
/dutchie-plus/i,
/dutchie\.com/i,
/dutchie-embed/i,
],
iframes: [
/dutchie\.com/i,
/embed\.dutchie/i,
/iframe\.dutchie/i,
],
classes: [
/dutchie-/i,
/DutchieEmbed/i,
],
urls: [
/dutchie\.com/i,
/\.dutchie\./i,
],
meta: [
/dutchie/i,
],
apiEndpoints: [
/graphql.*dutchie/i,
/api\.dutchie/i,
],
htmlPatterns: [
/data-dutchie/i,
/__DUTCHIE__/i,
/dutchie-plus-iframe/i,
],
},
treez: {
scripts: [
/treez/i,
/treez\.io/i,
/treezpay/i,
],
iframes: [
/treez\.io/i,
/menu\.treez/i,
],
classes: [
/treez-/i,
],
urls: [
/treez\.io/i,
/\.treez\./i,
],
meta: [
/treez/i,
],
apiEndpoints: [
/api\.treez/i,
],
htmlPatterns: [
/data-treez/i,
/treez-embed/i,
],
},
jane: {
scripts: [
/jane\.co/i,
/iheartjane/i,
/jane-embed/i,
/janetechnologies/i,
],
iframes: [
/jane\.co/i,
/iheartjane\.com/i,
/menu\.jane/i,
],
classes: [
/jane-/i,
/iheartjane/i,
],
urls: [
/jane\.co/i,
/iheartjane\.com/i,
],
meta: [
/jane/i,
/iheartjane/i,
],
apiEndpoints: [
/api\.iheartjane/i,
/api\.jane\.co/i,
],
htmlPatterns: [
/data-jane/i,
/jane-root/i,
/jane-embed/i,
],
},
weedmaps: {
scripts: [
/weedmaps/i,
/wm\.com/i,
],
iframes: [
/weedmaps\.com/i,
/menu\.weedmaps/i,
],
classes: [
/weedmaps-/i,
/wm-/i,
],
urls: [
/weedmaps\.com/i,
],
meta: [
/weedmaps/i,
],
apiEndpoints: [
/api.*weedmaps/i,
],
htmlPatterns: [
/data-weedmaps/i,
],
},
leafly: {
scripts: [
/leafly/i,
/leafly\.com/i,
],
iframes: [
/leafly\.com/i,
/menu\.leafly/i,
],
classes: [
/leafly-/i,
],
urls: [
/leafly\.com/i,
],
meta: [
/leafly/i,
],
apiEndpoints: [
/api\.leafly/i,
],
htmlPatterns: [
/data-leafly/i,
],
},
meadow: {
scripts: [
/meadow/i,
/getmeadow/i,
],
iframes: [
/getmeadow\.com/i,
],
classes: [
/meadow-/i,
],
urls: [
/getmeadow\.com/i,
],
meta: [],
apiEndpoints: [
/api\.getmeadow/i,
],
htmlPatterns: [],
},
greenlight: {
scripts: [
/greenlight/i,
/greenlightmenu/i,
],
iframes: [
/greenlight/i,
],
classes: [
/greenlight-/i,
],
urls: [
/greenlight/i,
],
meta: [],
apiEndpoints: [],
htmlPatterns: [],
},
blaze: {
scripts: [
/blaze\.me/i,
/blazepos/i,
],
iframes: [
/blaze\.me/i,
],
classes: [
/blaze-/i,
],
urls: [
/blaze\.me/i,
],
meta: [],
apiEndpoints: [
/api\.blaze/i,
],
htmlPatterns: [],
},
flowhub: {
scripts: [
/flowhub/i,
],
iframes: [
/flowhub\.com/i,
],
classes: [
/flowhub-/i,
],
urls: [
/flowhub\.com/i,
],
meta: [],
apiEndpoints: [],
htmlPatterns: [],
},
dispense: {
scripts: [
/dispenseapp/i,
],
iframes: [
/dispenseapp\.com/i,
],
classes: [
/dispense-/i,
],
urls: [
/dispenseapp\.com/i,
],
meta: [],
apiEndpoints: [],
htmlPatterns: [],
},
cova: {
scripts: [
/covasoftware/i,
/cova\.software/i,
],
iframes: [
/cova/i,
],
classes: [
/cova-/i,
],
urls: [
/cova/i,
],
meta: [],
apiEndpoints: [],
htmlPatterns: [],
},
};
// Common menu URL paths to check
const MENU_PATHS = [
'/menu',
'/shop',
'/products',
'/order',
'/store',
'/dispensary-menu',
'/online-menu',
'/shop-all',
'/browse',
'/catalog',
];
/**
* Analyze a single page for provider signals
*/
async function analyzePageForProviders(
page: Page,
url: string
): Promise<DetectionSignal[]> {
const signals: DetectionSignal[] = [];
try {
    // Get page HTML
    const html = await page.content();
    // Query the DOM once; the per-provider loop below reuses these collections
    const scripts = await page.$$eval('script[src]', els =>
      els.map(el => el.getAttribute('src') || '')
    );
    const inlineScripts = await page.$$eval('script:not([src])', els =>
      els.map(el => el.textContent || '')
    );
    const iframes = await page.$$eval('iframe', els =>
      els.map(el => el.getAttribute('src') || '')
    );
    const metaTags = await page.$$eval('meta', els =>
      els.map(el => `${el.getAttribute('name')} ${el.getAttribute('content')}`)
    );
    // Check each provider's patterns
    for (const [provider, patterns] of Object.entries(PROVIDER_PATTERNS)) {
      // Check script sources
      for (const script of scripts) {
        for (const pattern of patterns.scripts) {
          if (pattern.test(script)) {
            signals.push({
              provider: provider as MenuProvider,
              confidence: 90,
              source: 'script_src',
              details: script,
            });
          }
        }
      }
      // Check inline scripts
      for (const scriptContent of inlineScripts) {
        for (const pattern of patterns.scripts) {
          if (pattern.test(scriptContent)) {
            signals.push({
              provider: provider as MenuProvider,
              confidence: 70,
              source: 'inline_script',
              details: `Pattern: ${pattern}`,
            });
          }
        }
      }
      // Check iframes
      for (const iframe of iframes) {
        for (const pattern of patterns.iframes) {
          if (pattern.test(iframe)) {
            signals.push({
              provider: provider as MenuProvider,
              confidence: 95,
              source: 'iframe_src',
              details: iframe,
            });
          }
        }
      }
      // Check HTML patterns
      for (const pattern of patterns.htmlPatterns) {
        if (pattern.test(html)) {
          signals.push({
            provider: provider as MenuProvider,
            confidence: 85,
            source: 'html_pattern',
            details: `Pattern: ${pattern}`,
          });
        }
      }
      // Check CSS classes
      for (const pattern of patterns.classes) {
        if (pattern.test(html)) {
          signals.push({
            provider: provider as MenuProvider,
            confidence: 60,
            source: 'css_class',
            details: `Pattern: ${pattern}`,
          });
        }
      }
      // Check meta tags
      for (const meta of metaTags) {
        for (const pattern of patterns.meta) {
          if (pattern.test(meta)) {
            signals.push({
              provider: provider as MenuProvider,
              confidence: 80,
              source: 'meta_tag',
              details: meta,
            });
          }
        }
      }
    }
// Network-request signals (API endpoints) are collected by the caller,
// detectMenuProvider, via request interception
} catch (error) {
logger.error('provider-detection', `Error analyzing page ${url}: ${error}`);
}
return signals;
}
/**
* Aggregate signals into a final detection result
*/
function aggregateSignals(signals: DetectionSignal[]): { provider: MenuProvider; confidence: number } {
if (signals.length === 0) {
return { provider: 'unknown', confidence: 0 };
}
// Group signals by provider
const providerScores: Record<string, number[]> = {};
for (const signal of signals) {
if (!providerScores[signal.provider]) {
providerScores[signal.provider] = [];
}
providerScores[signal.provider].push(signal.confidence);
}
// Calculate weighted score for each provider
const scores: { provider: MenuProvider; score: number }[] = [];
for (const [provider, confidences] of Object.entries(providerScores)) {
// Use max confidence + bonus for multiple signals
const maxConf = Math.max(...confidences);
const multiSignalBonus = Math.min(10, (confidences.length - 1) * 3);
const score = Math.min(100, maxConf + multiSignalBonus);
scores.push({ provider: provider as MenuProvider, score });
}
// Sort by score descending
scores.sort((a, b) => b.score - a.score);
const best = scores[0];
// If there's a clear winner (20+ point lead), use it
if (scores.length === 1 || best.score - scores[1].score >= 20) {
return { provider: best.provider, confidence: best.score };
}
// Multiple contenders - dock 20 points, but keep a floor of 50
return { provider: best.provider, confidence: Math.max(50, best.score - 20) };
}
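// Worked example (illustrative): signals [iframe_src 95, script_src 90,
// css_class 60] for 'jane' score max(95) + min(10, 2*3) = 101, capped to 100.
// If 'dutchie' also scored 85, the 15-point lead is under 20, so the result
// is ('jane', max(50, 100 - 20)) = ('jane', 80).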
/**
* Detect the menu provider for a dispensary
*/
export async function detectMenuProvider(
websiteUrl: string,
options: {
checkMenuPaths?: boolean;
timeout?: number;
} = {}
): Promise<DetectionResult> {
const { checkMenuPaths = true, timeout = 30000 } = options;
const result: DetectionResult = {
provider: 'unknown',
confidence: 0,
signals: [],
urlsTested: [],
menuEntryPoints: [],
rawSignals: {},
};
let browser: Browser | null = null;
try {
// Normalize URL
let baseUrl = websiteUrl.trim();
if (!baseUrl.startsWith('http')) {
baseUrl = `https://${baseUrl}`;
}
baseUrl = baseUrl.replace(/\/$/, ''); // Remove trailing slash
// Launch browser
browser = await puppeteer.launch({
headless: true,
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu',
],
});
const page = await browser.newPage();
await page.setUserAgent(
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
);
// Track network requests for API detection
const apiRequests: string[] = [];
await page.setRequestInterception(true);
page.on('request', (request) => {
const url = request.url();
if (url.includes('api') || url.includes('graphql')) {
apiRequests.push(url);
}
request.continue();
});
// URLs to check
const urlsToCheck = [baseUrl];
if (checkMenuPaths) {
for (const path of MENU_PATHS) {
urlsToCheck.push(`${baseUrl}${path}`);
}
}
// Check each URL
for (const url of urlsToCheck) {
try {
result.urlsTested.push(url);
await page.goto(url, {
waitUntil: 'networkidle2',
timeout,
});
// Wait a bit for dynamic content
await new Promise(r => setTimeout(r, 2000));
// Analyze page
const pageSignals = await analyzePageForProviders(page, url);
result.signals.push(...pageSignals);
// Track if this URL has menu content
const hasMenuContent = await page.evaluate(() => {
const text = document.body.innerText.toLowerCase();
return (
text.includes('add to cart') ||
text.includes('add to bag') ||
text.includes('product') ||
text.includes('indica') ||
text.includes('sativa') ||
text.includes('hybrid') ||
text.includes('thc') ||
text.includes('cbd')
);
});
if (hasMenuContent && url !== baseUrl) {
result.menuEntryPoints.push(url);
}
} catch (pageError: any) {
// Probe paths often 404 or fail to load; skip them quietly
if (!pageError.message?.includes('404')) {
logger.warn('provider-detection', `Could not load ${url}: ${pageError.message}`);
}
}
}
// Check API requests for provider hints
for (const apiUrl of apiRequests) {
for (const [provider, patterns] of Object.entries(PROVIDER_PATTERNS)) {
for (const pattern of patterns.apiEndpoints) {
if (pattern.test(apiUrl)) {
result.signals.push({
provider: provider as MenuProvider,
confidence: 95,
source: 'api_request',
details: apiUrl,
});
}
}
}
}
// Record raw signals
result.rawSignals = {
apiRequestsFound: apiRequests.length,
menuEntryPointsFound: result.menuEntryPoints.length,
totalSignals: result.signals.length,
uniqueProviders: [...new Set(result.signals.map(s => s.provider))].length,
};
// Aggregate signals into final result
const aggregated = aggregateSignals(result.signals);
result.provider = aggregated.provider;
result.confidence = aggregated.confidence;
} catch (error: any) {
result.error = error.message;
logger.error('provider-detection', `Detection failed for ${websiteUrl}: ${error.message}`);
} finally {
if (browser) {
await browser.close();
}
}
return result;
}
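// Usage sketch (illustrative): one-off detection for a site; the URL is a
// placeholder.
async function exampleDetectProvider(): Promise<void> {
  const res = await detectMenuProvider('example-dispensary.com');
  console.log(`${res.provider} @ ${res.confidence}% (${res.signals.length} signals)`);
  console.log('Menu entry points:', res.menuEntryPoints);
}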
/**
* Quick check if a site has Dutchie - used during production crawls
*/
export async function quickDutchieCheck(page: Page): Promise<boolean> {
try {
const html = await page.content();
// Check for Dutchie-specific patterns
const dutchiePatterns = [
/dutchie/i,
/dutchie-plus/i,
/__DUTCHIE__/i,
/data-dutchie/i,
/embed\.dutchie/i,
];
for (const pattern of dutchiePatterns) {
if (pattern.test(html)) {
return true;
}
}
// Check iframes
const iframes = await page.$$eval('iframe', els =>
els.map(el => el.getAttribute('src') || '')
);
for (const iframe of iframes) {
if (/dutchie/i.test(iframe)) {
return true;
}
}
return false;
} catch {
return false;
}
}
/**
* Check if provider has changed from expected
*/
export async function detectProviderChange(
page: Page,
expectedProvider: MenuProvider
): Promise<{ changed: boolean; newProvider?: MenuProvider; confidence?: number }> {
try {
const signals = await analyzePageForProviders(page, page.url());
const aggregated = aggregateSignals(signals);
// If we expected Dutchie but found something else with high confidence
if (expectedProvider === 'dutchie' && aggregated.provider !== 'dutchie' && aggregated.confidence >= 70) {
return {
changed: true,
newProvider: aggregated.provider,
confidence: aggregated.confidence,
};
}
// If we expected Dutchie and found nothing/low confidence, might have switched
if (expectedProvider === 'dutchie' && aggregated.confidence < 30) {
// Check if Dutchie is definitely NOT present
const hasDutchie = await quickDutchieCheck(page);
if (!hasDutchie) {
return {
changed: true,
newProvider: aggregated.provider !== 'unknown' ? aggregated.provider : 'other',
confidence: Math.max(30, aggregated.confidence),
};
}
}
return { changed: false };
} catch {
return { changed: false };
}
}
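// Usage sketch (illustrative): inside a production Dutchie crawl, bail out
// early if the site no longer looks like Dutchie. `page` is an
// already-loaded menu page.
async function exampleVerifyBeforeScrape(page: Page): Promise<boolean> {
  if (await quickDutchieCheck(page)) return true;
  const change = await detectProviderChange(page, 'dutchie');
  return !change.changed;
}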

View File

@@ -0,0 +1,441 @@
/**
* Store Crawl Orchestrator
*
* Orchestrates the complete crawl workflow for a store:
* 1. Load store and its linked dispensary
* 2. Check if provider detection is needed
* 3. Run provider detection if needed
* 4. Queue appropriate crawl jobs based on provider/mode
* 5. Update store_crawl_schedule with meaningful status
*
* This replaces the simple "triggerManualCrawl" with intelligent orchestration.
*/
import { v4 as uuidv4 } from 'uuid';
import { pool } from '../db/migrate';
import { crawlerLogger } from './crawler-logger';
import {
detectMultiCategoryProviders,
updateAllCategoryProviders,
MultiCategoryDetectionResult,
} from './intelligence-detector';
import { runSandboxProductsJob } from './category-crawler-jobs';
import { scrapeStore } from '../scraper-v2';
// ========================================
// Types
// ========================================
export type OrchestratorStatus = 'success' | 'error' | 'sandbox_only' | 'detection_only' | 'pending' | 'running';
export interface OrchestratorResult {
status: OrchestratorStatus;
summary: string;
runId: string;
storeId: number;
dispensaryId: number | null;
detectionRan: boolean;
detectionResult?: MultiCategoryDetectionResult;
crawlRan: boolean;
crawlType?: 'production' | 'sandbox' | 'none';
productsFound?: number;
productsNew?: number;
productsUpdated?: number;
error?: string;
durationMs: number;
}
interface StoreWithDispensary {
id: number;
name: string;
slug: string;
timezone: string;
dispensary_id: number | null;
dispensary_name: string | null;
dispensary_menu_url: string | null;
dispensary_website: string | null;
product_provider: string | null;
product_confidence: number | null;
product_crawler_mode: string | null;
last_product_scan_at: Date | null;
}
// ========================================
// Main Orchestrator Function
// ========================================
/**
* Run the complete crawl orchestration for a store
*
* Behavior:
* 1. Load the store and its linked dispensary
* 2. If no dispensary is linked, report error
 * 3. If product_provider is missing, unknown, low-confidence (<50%), or stale (>7 days), run detection
* 4. After detection:
* - If product_provider = 'dutchie' and product_crawler_mode = 'production': Run production crawl
* - Otherwise: Run sandbox crawl
* 5. Update store_crawl_schedule with status/summary
*/
export async function runStoreCrawlOrchestrator(storeId: number): Promise<OrchestratorResult> {
const startTime = Date.now();
const runId = uuidv4();
const result: OrchestratorResult = {
status: 'pending',
summary: '',
runId,
storeId,
dispensaryId: null,
detectionRan: false,
crawlRan: false,
durationMs: 0,
};
try {
// Mark schedule as running
await updateScheduleStatus(storeId, 'running', 'Starting orchestrator...', runId);
// 1. Load store with dispensary info
const store = await getStoreWithDispensary(storeId);
if (!store) {
throw new Error(`Store ${storeId} not found`);
}
result.dispensaryId = store.dispensary_id;
// 2. Check if dispensary is linked
if (!store.dispensary_id) {
result.status = 'error';
result.summary = 'No dispensary linked - cannot determine provider';
result.error = 'Store is not linked to a dispensary. Link it in the Dispensaries page.';
await updateScheduleStatus(storeId, 'error', result.summary, runId, result.error);
result.durationMs = Date.now() - startTime;
return result;
}
// 3. Check if provider detection is needed
const needsDetection = await checkNeedsDetection(store);
if (needsDetection) {
// Run provider detection
const websiteUrl = store.dispensary_menu_url || store.dispensary_website;
if (!websiteUrl) {
result.status = 'error';
result.summary = 'No website URL available for detection';
result.error = 'Dispensary has no menu_url or website configured';
await updateScheduleStatus(storeId, 'error', result.summary, runId, result.error);
result.durationMs = Date.now() - startTime;
return result;
}
await updateScheduleStatus(storeId, 'running', 'Running provider detection...', runId);
const detectionResult = await detectMultiCategoryProviders(websiteUrl);
result.detectionRan = true;
result.detectionResult = detectionResult;
// Save detection results to dispensary
await updateAllCategoryProviders(store.dispensary_id, detectionResult);
crawlerLogger.providerDetected({
dispensary_id: store.dispensary_id,
dispensary_name: store.dispensary_name || store.name,
detected_provider: detectionResult.product.provider,
confidence: detectionResult.product.confidence,
detection_method: 'orchestrator_run',
menu_url: websiteUrl,
category: 'product',
});
// Refresh store info after detection
const updatedStore = await getStoreWithDispensary(storeId);
if (updatedStore) {
Object.assign(store, updatedStore);
}
}
// 4. Determine crawl type and run
const provider = store.product_provider;
const mode = store.product_crawler_mode;
if (provider === 'dutchie' && mode === 'production') {
// Production Dutchie crawl
await updateScheduleStatus(storeId, 'running', 'Running Dutchie production crawl...', runId);
try {
// Run the actual scraper
await scrapeStore(storeId);
// Get crawl stats from the latest job
const stats = await getLatestCrawlStats(storeId);
result.crawlRan = true;
result.crawlType = 'production';
result.productsFound = stats.products_found ?? undefined;
result.productsNew = stats.products_new ?? undefined;
result.productsUpdated = stats.products_updated ?? undefined;
const detectionPart = result.detectionRan ? 'Detection + ' : '';
result.summary = `${detectionPart}Dutchie products crawl (${stats.products_found || 0} items, ${stats.products_new || 0} new, ${stats.products_updated || 0} updated)`;
result.status = 'success';
// Update store's last_scraped_at
await pool.query('UPDATE stores SET last_scraped_at = NOW() WHERE id = $1', [storeId]);
crawlerLogger.jobCompleted({
job_id: 0, // Orchestrator doesn't create traditional jobs
store_id: storeId,
store_name: store.name,
duration_ms: Date.now() - startTime,
products_found: stats.products_found || 0,
products_new: stats.products_new || 0,
products_updated: stats.products_updated || 0,
provider: 'dutchie',
});
} catch (crawlError: any) {
result.status = 'error';
result.error = crawlError.message;
result.summary = `Dutchie crawl failed: ${crawlError.message.slice(0, 100)}`;
result.crawlRan = true;
result.crawlType = 'production';
crawlerLogger.jobFailed({
job_id: 0,
store_id: storeId,
store_name: store.name,
duration_ms: Date.now() - startTime,
error_message: crawlError.message,
provider: 'dutchie',
});
}
} else if (provider && provider !== 'unknown') {
// Sandbox crawl for non-Dutchie or sandbox mode
await updateScheduleStatus(storeId, 'running', `Running ${provider} sandbox crawl...`, runId);
try {
const sandboxResult = await runSandboxProductsJob(store.dispensary_id);
result.crawlRan = true;
result.crawlType = 'sandbox';
result.productsFound = sandboxResult.data?.productsExtracted || 0;
const detectionPart = result.detectionRan ? 'Detection + ' : '';
if (sandboxResult.success) {
result.summary = `${detectionPart}${provider} sandbox crawl (${result.productsFound} items, quality ${sandboxResult.data?.qualityScore || 0}%)`;
result.status = 'sandbox_only';
} else {
result.summary = `${detectionPart}${provider} sandbox failed: ${sandboxResult.message}`;
result.status = 'error';
result.error = sandboxResult.message;
}
} catch (sandboxError: any) {
result.status = 'error';
result.error = sandboxError.message;
result.summary = `Sandbox crawl failed: ${sandboxError.message.slice(0, 100)}`;
result.crawlRan = true;
result.crawlType = 'sandbox';
}
} else {
// No provider detected - detection only
if (result.detectionRan) {
result.summary = `Detection complete: provider=${store.product_provider || 'unknown'}, confidence=${store.product_confidence || 0}%`;
result.status = 'detection_only';
} else {
result.summary = 'No provider detected and no crawl possible';
result.status = 'error';
result.error = 'Could not determine menu provider';
}
}
} catch (error: any) {
result.status = 'error';
result.error = error.message;
result.summary = `Orchestrator error: ${error.message.slice(0, 100)}`;
crawlerLogger.queueFailure({
queue_type: 'orchestrator',
error_message: error.message,
});
}
result.durationMs = Date.now() - startTime;
// Update final schedule status
await updateScheduleStatus(storeId, result.status, result.summary, runId, result.error);
// Create a crawl_job record for tracking
await createOrchestratorJobRecord(storeId, result);
return result;
}
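// Usage sketch (illustrative): trigger one store manually, e.g. from a route
// handler. The store id is a placeholder.
async function exampleManualTrigger(): Promise<void> {
  const result = await runStoreCrawlOrchestrator(42);
  console.log(`[${result.status}] ${result.summary} (${result.durationMs}ms)`);
}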
// ========================================
// Helper Functions
// ========================================
async function getStoreWithDispensary(storeId: number): Promise<StoreWithDispensary | null> {
const result = await pool.query(
`SELECT
s.id, s.name, s.slug, s.timezone, s.dispensary_id,
d.name as dispensary_name,
d.menu_url as dispensary_menu_url,
d.website as dispensary_website,
d.product_provider,
d.product_confidence,
d.product_crawler_mode,
d.last_product_scan_at
FROM stores s
LEFT JOIN dispensaries d ON d.id = s.dispensary_id
WHERE s.id = $1`,
[storeId]
);
return result.rows[0] || null;
}
async function checkNeedsDetection(store: StoreWithDispensary): Promise<boolean> {
// No dispensary = can't detect
if (!store.dispensary_id) return false;
// No provider = definitely needs detection
if (!store.product_provider) return true;
// Unknown provider = needs detection
if (store.product_provider === 'unknown') return true;
// Low confidence = needs re-detection
if (store.product_confidence !== null && store.product_confidence < 50) return true;
// Stale detection (> 7 days) = needs refresh
if (store.last_product_scan_at) {
const daysSince = (Date.now() - new Date(store.last_product_scan_at).getTime()) / (1000 * 60 * 60 * 24);
if (daysSince > 7) return true;
}
return false;
}
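// Worked examples of the rules above (illustrative):
//   provider='dutchie', confidence=85, scanned 2 days ago -> false (reuse)
//   provider=null or 'unknown'                            -> true  (detect)
//   provider='jane', confidence=40                        -> true  (re-detect)
//   provider='dutchie', confidence=90, scanned 9 days ago -> true  (stale)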
async function updateScheduleStatus(
storeId: number,
status: OrchestratorStatus,
summary: string,
runId: string,
error?: string
): Promise<void> {
await pool.query(
`INSERT INTO store_crawl_schedule (store_id, last_status, last_summary, last_run_at, last_error)
VALUES ($1, $2, $3, NOW(), $4)
ON CONFLICT (store_id) DO UPDATE SET
last_status = $2,
last_summary = $3,
last_run_at = NOW(),
last_error = $4,
updated_at = NOW()`,
[storeId, status, summary, error || null]
);
}
async function getLatestCrawlStats(storeId: number): Promise<{
products_found: number | null;
products_new: number | null;
products_updated: number | null;
}> {
// Approximate crawl stats from product timestamps (rows created/updated within the last hour)
const result = await pool.query(
`SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE created_at > NOW() - INTERVAL '1 hour') as recent_new,
COUNT(*) FILTER (WHERE updated_at > NOW() - INTERVAL '1 hour' AND created_at < NOW() - INTERVAL '1 hour') as recent_updated
FROM products
WHERE store_id = $1`,
[storeId]
);
return {
products_found: parseInt(result.rows[0]?.total || '0'),
products_new: parseInt(result.rows[0]?.recent_new || '0'),
products_updated: parseInt(result.rows[0]?.recent_updated || '0'),
};
}
async function createOrchestratorJobRecord(storeId: number, result: OrchestratorResult): Promise<void> {
await pool.query(
`INSERT INTO crawl_jobs (
store_id, job_type, trigger_type, status, priority,
scheduled_at, started_at, completed_at,
products_found, products_new, products_updated,
error_message, orchestrator_run_id, detection_result
) VALUES (
$1, 'orchestrator', 'manual', $2, 100,
NOW(), NOW(), NOW(),
$3, $4, $5,
$6, $7, $8
)`,
[
storeId,
result.status === 'error' ? 'failed' : 'completed', // all non-error terminal statuses count as completed
result.productsFound || null,
result.productsNew || null,
result.productsUpdated || null,
result.error || null,
result.runId,
result.detectionResult ? JSON.stringify({
product_provider: result.detectionResult.product.provider,
product_confidence: result.detectionResult.product.confidence,
product_mode: result.detectionResult.product.mode,
}) : null,
]
);
}
// ========================================
// Batch Orchestration
// ========================================
/**
* Run orchestrator for multiple stores
*/
export async function runBatchOrchestrator(
storeIds: number[],
concurrency: number = 3
): Promise<OrchestratorResult[]> {
const results: OrchestratorResult[] = [];
// Process in batches
for (let i = 0; i < storeIds.length; i += concurrency) {
const batch = storeIds.slice(i, i + concurrency);
const batchResults = await Promise.all(
batch.map(storeId => runStoreCrawlOrchestrator(storeId))
);
results.push(...batchResults);
}
return results;
}
/**
* Get stores that are due for orchestration
*/
export async function getStoresDueForOrchestration(limit: number = 10): Promise<number[]> {
const result = await pool.query(
`SELECT s.id
FROM stores s
LEFT JOIN store_crawl_schedule scs ON scs.store_id = s.id
WHERE s.active = TRUE
AND s.scrape_enabled = TRUE
AND COALESCE(scs.enabled, TRUE) = TRUE
AND (
scs.last_run_at IS NULL
OR scs.last_run_at < NOW() - (COALESCE(scs.interval_hours, 4) || ' hours')::INTERVAL
)
AND (scs.last_status IS NULL OR scs.last_status NOT IN ('running', 'pending'))
ORDER BY COALESCE(scs.priority, 0) DESC, scs.last_run_at ASC NULLS FIRST
LIMIT $1`,
[limit]
);
return result.rows.map(row => row.id);
}
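/**
 * Usage sketch (illustrative): a scheduler tick combining the two helpers
 * above. The limit and concurrency values are placeholders.
 */
async function exampleSchedulerTick(): Promise<void> {
  const due = await getStoresDueForOrchestration(10);
  if (due.length === 0) return;
  const results = await runBatchOrchestrator(due, 3);
  const failed = results.filter(r => r.status === 'error').length;
  console.log(`Orchestrated ${results.length} stores (${failed} failed)`);
}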