Platform isolation:
- Rename handlers to {task}-{platform}.ts convention
- Deprecate -curl variants (now _deprecated-*)
- Platform-based routing in task-worker.ts
- Add Jane platform handlers and client
Evomi geo-targeting:
- Add dynamic proxy URL builder with state/city targeting
- Session stickiness per worker per state (30 min)
- Fallback to static proxy table when API unavailable
- Add proxy tracking columns to worker_tasks
Proxy management:
- New /proxies admin page for visibility
- Track proxy_ip, proxy_geo, proxy_source per task
- Show active sessions and task history
Validation filtering:
- Filter by validated stores (platform_dispensary_id + menu_url)
- Mark incomplete stores as deprecated
- Update all dashboard/stats queries
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
626 lines
18 KiB
TypeScript
626 lines
18 KiB
TypeScript
/**
|
|
* Discovery Promotion Service
|
|
*
|
|
* Handles the promotion of discovery locations to dispensaries:
|
|
* 1. Discovery → Raw data in dutchie_discovery_locations (status='discovered')
|
|
* 2. Validation → Check required fields, reject incomplete records
|
|
* 3. Promotion → Idempotent upsert to dispensaries, link back via dispensary_id
|
|
*/
|
|
|
|
import { pool } from '../db/pool';
|
|
import { DiscoveryLocationRow, DiscoveryStatus } from './types';
|
|
|
|
// ============================================================
|
|
// VALIDATION
|
|
// ============================================================
|
|
|
|
/**
 * Outcome of checking one discovery location against the promotion rules.
 */
export interface ValidationResult {
  /** True when no rule failed (i.e. `errors` is empty). */
  valid: boolean;
  /** Human-readable messages, one per failed rule. */
  errors: string[];
}
|
|
|
|
/**
 * Aggregate outcome of validating a batch of discovery locations.
 */
export interface ValidationSummary {
  /** Number of locations examined. */
  totalChecked: number;
  /** Number of locations that passed every rule. */
  validCount: number;
  /** Number of locations that failed at least one rule. */
  invalidCount: number;
  /** One entry per failing location, with its validation messages. */
  invalidRecords: {
    id: number;
    name: string;
    errors: string[];
  }[];
}
|
|
|
|
/**
 * Check that a discovery location carries every field promotion needs.
 *
 * Rules, reported in this order:
 *   - platform_location_id must be truthy
 *   - name, city and state_code must be non-empty after trimming
 *   - platform_menu_url must be truthy
 *
 * @param loc Discovery row to check.
 * @returns `valid` plus one message per failed rule.
 */
export function validateForPromotion(loc: DiscoveryLocationRow): ValidationResult {
  const isBlank = (value: string | null | undefined): boolean =>
    !value || value.trim() === '';

  const checks: Array<[boolean, string]> = [
    [!loc.platform_location_id, 'Missing platform_location_id'],
    [isBlank(loc.name), 'Missing name'],
    [isBlank(loc.city), 'Missing city'],
    [isBlank(loc.state_code), 'Missing state_code'],
    [!loc.platform_menu_url, 'Missing platform_menu_url'],
  ];

  const errors = checks
    .filter(([failed]) => failed)
    .map(([, message]) => message);

  return { valid: errors.length === 0, errors };
}
|
|
|
|
/**
 * Validate all discovered locations and return summary.
 *
 * Reads every dutchie_discovery_locations row with status 'discovered'
 * (optionally restricted to one state) and runs validateForPromotion on each.
 * Read-only: nothing is written back. Rows are fetched in ascending id order,
 * matching promoteDiscoveredLocations, so `invalidRecords` is deterministic.
 *
 * @param stateCode Optional state filter (e.g. 'CA', 'AZ').
 * @returns Counts plus the failing records with their messages.
 */
export async function validateDiscoveredLocations(
  stateCode?: string
): Promise<ValidationSummary> {
  let query = `
    SELECT * FROM dutchie_discovery_locations
    WHERE status = 'discovered'
  `;
  const params: string[] = [];

  if (stateCode) {
    query += ` AND state_code = $1`;
    params.push(stateCode);
  }

  // Stable ordering so repeated runs report invalid records in the same order.
  query += ` ORDER BY id`;

  const result = await pool.query(query, params);
  const locations = result.rows as DiscoveryLocationRow[];

  const invalidRecords: ValidationSummary['invalidRecords'] = [];
  let validCount = 0;

  for (const loc of locations) {
    const validation = validateForPromotion(loc);
    if (validation.valid) {
      validCount++;
    } else {
      invalidRecords.push({
        id: loc.id,
        name: loc.name,
        errors: validation.errors,
      });
    }
  }

  return {
    totalChecked: locations.length,
    validCount,
    invalidCount: invalidRecords.length,
    invalidRecords,
  };
}
|
|
|
|
// ============================================================
|
|
// PROMOTION
|
|
// ============================================================
|
|
|
|
/**
 * Outcome of promoting (or, in a dry run, skipping) one discovery location.
 */
export interface PromotionResult {
  /** Id of the source row in dutchie_discovery_locations. */
  discoveryId: number;
  /** Id of the target dispensary; batch dry runs report 0. */
  dispensaryId: number;
  /** Whether the dispensary row was created, updated, or left untouched. */
  action: 'created' | 'updated' | 'skipped';
  /** Store name copied from the discovery location. */
  name: string;
}
|
|
|
|
/**
 * Aggregate outcome of a batch promotion run.
 */
export interface PromotionSummary {
  /** Number of discovery rows examined. */
  totalProcessed: number;
  /** Dispensaries newly inserted. */
  created: number;
  /** Existing dispensaries updated via the upsert. */
  updated: number;
  /** Valid rows not promoted because the run was a dry run. */
  skipped: number;
  /** Rows that failed validation or errored during promotion. */
  rejected: number;
  /** Per-row results for created/updated/skipped rows. */
  results: PromotionResult[];
  /** Per-row details for rejected rows. */
  rejectedRecords: {
    id: number;
    name: string;
    errors: string[];
  }[];
  /** Wall-clock duration of the run in milliseconds. */
  durationMs: number;
  /** Per TASK_WORKFLOW_2024-12-10.md: Track new dispensary IDs for task chaining */
  newDispensaryIds: number[];
}
|
|
|
|
/**
 * Task tracking info for modification audit trail.
 *
 * Identifies the task responsible for a change to a dispensary row; promotion
 * writes it into the last_modified_by_task / last_modified_task_id columns.
 */
export interface TaskTrackingInfo {
  /** Id of the task performing the change. */
  taskId: number;
  /** Role name of that task. */
  taskRole: string;
}
|
|
|
|
/**
 * Generate a URL-safe slug from name, city and state.
 *
 * Lowercases "<name>-<city>-<state>", collapses every run of characters
 * outside [a-z0-9] into a single '-', caps the result at 100 characters and
 * only then strips leading/trailing '-'. Trimming after truncation ensures the
 * cut can never leave a dangling separator at the end of the slug.
 *
 * Non-ASCII letters (e.g. accented characters) are treated as separators,
 * not transliterated.
 *
 * @returns The slug (at most 100 chars); empty if no input character is in [a-z0-9].
 */
function generateSlug(name: string, city: string, state: string): string {
  return `${name}-${city}-${state}`
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .substring(0, 100)
    .replace(/^-+|-+$/g, '');
}
|
|
|
|
/**
 * Derive menu_type from platform_menu_url pattern.
 *
 * Path markers are tested in a fixed order and the first hit wins:
 * '/dispensary/' and '/stores/' mean 'standalone', '/embedded-menu/' means
 * 'embedded'. With no marker, any URL not containing 'dutchie.com' is taken to
 * be a widget embedded on the store's own site. Missing/empty URLs and
 * unrecognized dutchie.com URLs yield 'unknown'.
 */
function deriveMenuType(url: string | null): string {
  if (!url) {
    return 'unknown';
  }

  const pathRules: ReadonlyArray<readonly [string, string]> = [
    ['/dispensary/', 'standalone'],
    ['/embedded-menu/', 'embedded'],
    ['/stores/', 'standalone'],
  ];
  for (const [marker, menuType] of pathRules) {
    if (url.includes(marker)) {
      return menuType;
    }
  }

  // Custom domain = embedded widget on store's site
  return url.includes('dutchie.com') ? 'unknown' : 'embedded';
}
|
|
|
|
/**
 * Log a promotion action to dutchie_promotion_log.
 *
 * Inserts exactly one row. `fieldChanges`, when truthy, is stored as a JSON
 * string; `validationErrors` is handed to the driver unchanged.
 *
 * @param action           Action label (this file passes 'rejected',
 *                         'promoted_create' or 'promoted_update').
 * @param discoveryId      Source discovery location id, or null.
 * @param dispensaryId     Target dispensary id, or null.
 * @param stateCode        State code of the location, or null.
 * @param storeName        Store name, or null.
 * @param validationErrors Validation messages, or null.
 * @param fieldChanges     Extra details to record, or null.
 * @param triggeredBy      Initiator of the action; defaults to 'auto'.
 */
async function logPromotionAction(
  action: string,
  discoveryId: number | null,
  dispensaryId: number | null,
  stateCode: string | null,
  storeName: string | null,
  validationErrors: string[] | null = null,
  fieldChanges: Record<string, any> | null = null,
  triggeredBy: string = 'auto'
): Promise<void> {
  const changesJson = fieldChanges ? JSON.stringify(fieldChanges) : null;
  const values = [
    discoveryId,
    dispensaryId,
    action,
    stateCode,
    storeName,
    validationErrors,
    changesJson,
    triggeredBy,
  ];

  await pool.query(
    `
    INSERT INTO dutchie_promotion_log
      (discovery_id, dispensary_id, action, state_code, store_name, validation_errors, field_changes, triggered_by)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
  `,
    values
  );
}
|
|
|
|
/**
 * Create a status alert for the dashboard.
 *
 * Inserts one row into crawler_status_alerts and returns its generated id.
 * Falsy statuses (undefined, null, '') are stored as NULL; metadata, when
 * provided, is stored as a JSON string.
 *
 * @param dispensaryId   Dispensary the alert concerns.
 * @param profileId      Crawler profile involved, or null.
 * @param alertType      Alert category (e.g. 'promoted').
 * @param severity       One of 'info' | 'warning' | 'error' | 'critical'.
 * @param message        Human-readable alert text.
 * @param previousStatus Status before the change, if any.
 * @param newStatus      Status after the change, if any.
 * @param metadata       Extra context to store alongside the alert.
 * @returns Id of the inserted alert row.
 */
export async function createStatusAlert(
  dispensaryId: number,
  profileId: number | null,
  alertType: string,
  severity: 'info' | 'warning' | 'error' | 'critical',
  message: string,
  previousStatus?: string | null,
  newStatus?: string | null,
  metadata?: Record<string, any>
): Promise<number> {
  const statusBefore = previousStatus || null;
  const statusAfter = newStatus || null;
  const metadataJson = metadata ? JSON.stringify(metadata) : null;

  const { rows } = await pool.query(
    `
    INSERT INTO crawler_status_alerts
      (dispensary_id, profile_id, alert_type, severity, message, previous_status, new_status, metadata)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
    RETURNING id
  `,
    [dispensaryId, profileId, alertType, severity, message, statusBefore, statusAfter, metadataJson]
  );

  return rows[0].id;
}
|
|
|
|
/**
 * Create or update crawler profile for a dispensary with initial sandbox status.
 *
 * If the dispensary already has an enabled profile, its id is returned
 * unchanged. Otherwise a new profile is inserted with crawler_type 'dutchie',
 * status 'sandbox', enabled = true, zeroed success/failure counters and a JSON
 * config holding the platform id plus default crawl flags. A 'promoted' info
 * alert is then raised for the new profile.
 *
 * NOTE(review): the existence check and the insert are separate queries, so
 * concurrent callers for the same dispensary could both insert — confirm
 * whether a unique constraint guards against duplicates.
 *
 * @param dispensaryId         Dispensary to attach the profile to.
 * @param dispensaryName       Used as profile_name and to derive profile_key.
 * @param platformDispensaryId Stored in the profile config and alert metadata.
 * @returns The profile id and whether it was newly created.
 */
async function ensureCrawlerProfile(
  dispensaryId: number,
  dispensaryName: string,
  platformDispensaryId: string
): Promise<{ profileId: number; created: boolean }> {
  // Check if profile already exists
  const existingResult = await pool.query(`
    SELECT id FROM dispensary_crawler_profiles
    WHERE dispensary_id = $1 AND enabled = true
    LIMIT 1
  `, [dispensaryId]);

  if (existingResult.rows.length > 0) {
    return { profileId: existingResult.rows[0].id, created: false };
  }

  // Slugify the name, truncating BEFORE trimming separators so the 50-char cut
  // cannot leave a trailing '-'. Names with no [a-z0-9] characters would give
  // an empty key, so fall back to an id-based one.
  const profileKey =
    dispensaryName
      .toLowerCase()
      .replace(/[^a-z0-9]+/g, '-')
      .substring(0, 50)
      .replace(/^-+|-+$/g, '') || `dispensary-${dispensaryId}`;

  // Create new profile with sandbox status
  const insertResult = await pool.query(`
    INSERT INTO dispensary_crawler_profiles (
      dispensary_id,
      profile_name,
      profile_key,
      crawler_type,
      status,
      status_reason,
      status_changed_at,
      config,
      enabled,
      consecutive_successes,
      consecutive_failures,
      created_at,
      updated_at
    ) VALUES (
      $1, $2, $3, 'dutchie', 'sandbox', 'Newly promoted from discovery', CURRENT_TIMESTAMP,
      $4::jsonb, true, 0, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
    )
    RETURNING id
  `, [
    dispensaryId,
    dispensaryName,
    profileKey,
    JSON.stringify({
      platformDispensaryId,
      useBothModes: true,
      downloadImages: true,
      trackStock: true,
    }),
  ]);

  const profileId = insertResult.rows[0].id;

  // Create status alert for new sandbox store
  await createStatusAlert(
    dispensaryId,
    profileId,
    'promoted',
    'info',
    `${dispensaryName} promoted to sandbox - awaiting first successful crawl`,
    null,
    'sandbox',
    { source: 'discovery_promotion', platformDispensaryId }
  );

  return { profileId, created: true };
}
|
|
|
|
/**
 * Promote a single discovery location to dispensaries table
 * Idempotent: uses ON CONFLICT on platform_dispensary_id
 *
 * Performs four steps, each as its own pool.query (no explicit transaction
 * is opened here):
 *   1. Upsert into dispensaries keyed on platform_dispensary_id
 *      (= loc.platform_location_id).
 *   2. Link the discovery row to the dispensary and set status='verified'.
 *   3. If the dispensary row was newly inserted, create a sandbox crawler
 *      profile via ensureCrawlerProfile.
 *   4. Write a 'promoted_create' / 'promoted_update' entry to the promotion log.
 *
 * NOTE(review): if step 3 or 4 throws, steps 1-2 have already been applied.
 * Batch promotion only re-reads rows with status='discovered', and a repeat
 * upsert would report an update rather than an insert, so a missing crawler
 * profile would not be created later — confirm whether these writes should
 * share a transaction.
 *
 * @param loc          Discovery row to promote; callers in this file run
 *                     validateForPromotion on it first.
 * @param taskTracking Optional task info written to the last_modified_* columns.
 * @returns The dispensary id and whether the row was created or updated.
 * @throws Propagates any database error from the queries below.
 */
async function promoteLocation(
  loc: DiscoveryLocationRow,
  taskTracking?: TaskTrackingInfo
): Promise<PromotionResult> {
  // Prefer the platform-provided slug; otherwise derive one from name/city/state.
  const slug = loc.platform_slug || generateSlug(loc.name, loc.city || '', loc.state_code || '');

  // Upsert into dispensaries
  // ON CONFLICT by platform_dispensary_id ensures idempotency
  // On conflict, columns missing from the DO UPDATE SET list (platform, slug,
  // menu_type, created_at) keep their existing values, and dutchie_verified_at /
  // crawl_enabled are only filled in when currently NULL (COALESCE).
  //
  // NOTE(review): when taskTracking is omitted, $35-$37 are NULL and the update
  // branch overwrites last_modified_at / last_modified_by_task /
  // last_modified_task_id with NULL on an existing row — confirm intended.
  const upsertResult = await pool.query(`
    INSERT INTO dispensaries (
      platform,
      name,
      slug,
      city,
      state,
      address1,
      address2,
      zipcode,
      postal_code,
      phone,
      website,
      email,
      latitude,
      longitude,
      timezone,
      platform_dispensary_id,
      menu_url,
      menu_type,
      description,
      logo_image,
      banner_image,
      offer_pickup,
      offer_delivery,
      is_medical,
      is_recreational,
      chain_slug,
      enterprise_id,
      c_name,
      country,
      status,
      crawl_enabled,
      dutchie_verified,
      dutchie_verified_at,
      dutchie_discovery_id,
      last_modified_at,
      last_modified_by_task,
      last_modified_task_id,
      last_store_discovery_at,
      created_at,
      updated_at
    ) VALUES (
      $1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
      $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
      $21, $22, $23, $24, $25, $26, $27, $28, $29, $30,
      $31, $32, $33, $34, $35, $36, $37, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
    )
    ON CONFLICT (platform_dispensary_id) WHERE platform_dispensary_id IS NOT NULL
    DO UPDATE SET
      name = EXCLUDED.name,
      city = EXCLUDED.city,
      state = EXCLUDED.state,
      address1 = EXCLUDED.address1,
      address2 = EXCLUDED.address2,
      zipcode = EXCLUDED.zipcode,
      postal_code = EXCLUDED.postal_code,
      phone = EXCLUDED.phone,
      website = EXCLUDED.website,
      email = EXCLUDED.email,
      latitude = EXCLUDED.latitude,
      longitude = EXCLUDED.longitude,
      timezone = EXCLUDED.timezone,
      menu_url = EXCLUDED.menu_url,
      description = EXCLUDED.description,
      logo_image = EXCLUDED.logo_image,
      banner_image = EXCLUDED.banner_image,
      offer_pickup = EXCLUDED.offer_pickup,
      offer_delivery = EXCLUDED.offer_delivery,
      is_medical = EXCLUDED.is_medical,
      is_recreational = EXCLUDED.is_recreational,
      chain_slug = EXCLUDED.chain_slug,
      enterprise_id = EXCLUDED.enterprise_id,
      c_name = EXCLUDED.c_name,
      country = EXCLUDED.country,
      status = EXCLUDED.status,
      dutchie_discovery_id = EXCLUDED.dutchie_discovery_id,
      dutchie_verified = TRUE,
      dutchie_verified_at = COALESCE(dispensaries.dutchie_verified_at, CURRENT_TIMESTAMP),
      crawl_enabled = COALESCE(dispensaries.crawl_enabled, TRUE),
      last_modified_at = EXCLUDED.last_modified_at,
      last_modified_by_task = EXCLUDED.last_modified_by_task,
      last_modified_task_id = EXCLUDED.last_modified_task_id,
      last_store_discovery_at = CURRENT_TIMESTAMP,
      updated_at = CURRENT_TIMESTAMP
    RETURNING id, (xmax = 0) AS inserted
  `, [
    loc.platform || 'dutchie',           // $1 platform
    loc.name,                             // $2 name
    slug,                                 // $3 slug
    loc.city,                             // $4 city
    loc.state_code,                       // $5 state
    loc.address_line1,                    // $6 address1
    loc.address_line2,                    // $7 address2
    loc.postal_code,                      // $8 zipcode
    loc.postal_code,                      // $9 postal_code
    loc.phone,                            // $10 phone
    loc.website,                          // $11 website
    loc.email,                            // $12 email
    loc.latitude,                         // $13 latitude
    loc.longitude,                        // $14 longitude
    loc.timezone,                         // $15 timezone
    loc.platform_location_id,             // $16 platform_dispensary_id
    loc.platform_menu_url,                // $17 menu_url
    deriveMenuType(loc.platform_menu_url), // $18 menu_type
    loc.description,                      // $19 description
    loc.logo_image,                       // $20 logo_image
    loc.banner_image,                     // $21 banner_image
    loc.offers_pickup ?? true,            // $22 offer_pickup
    loc.offers_delivery ?? false,         // $23 offer_delivery
    loc.is_medical ?? false,              // $24 is_medical
    loc.is_recreational ?? true,          // $25 is_recreational
    loc.chain_slug,                       // $26 chain_slug
    loc.enterprise_id,                    // $27 enterprise_id
    loc.c_name,                           // $28 c_name
    loc.country || 'United States',       // $29 country
    loc.store_status || 'open',           // $30 status
    true,                                 // $31 crawl_enabled
    true,                                 // $32 dutchie_verified
    new Date(),                           // $33 dutchie_verified_at
    loc.id,                               // $34 dutchie_discovery_id
    taskTracking ? new Date() : null,     // $35 last_modified_at
    taskTracking?.taskRole || null,       // $36 last_modified_by_task
    taskTracking?.taskId || null,         // $37 last_modified_task_id
  ]);

  const dispensaryId = upsertResult.rows[0].id;
  // `inserted` is the (xmax = 0) column from RETURNING; this code treats it as
  // true for a fresh insert and false when the ON CONFLICT update path ran.
  const wasInserted = upsertResult.rows[0].inserted;

  // Link discovery location back to dispensary and update status
  await pool.query(`
    UPDATE dutchie_discovery_locations
    SET
      dispensary_id = $1,
      status = 'verified',
      verified_at = CURRENT_TIMESTAMP,
      verified_by = 'auto-promotion'
    WHERE id = $2
  `, [dispensaryId, loc.id]);

  // Create crawler profile with sandbox status for new dispensaries
  // (updates to existing dispensaries never create a profile here).
  if (wasInserted && loc.platform_location_id) {
    await ensureCrawlerProfile(dispensaryId, loc.name, loc.platform_location_id);
  }

  const action = wasInserted ? 'promoted_create' : 'promoted_update';

  // Log the promotion
  await logPromotionAction(
    action,
    loc.id,
    dispensaryId,
    loc.state_code,
    loc.name,
    null,
    { slug, city: loc.city, platform_location_id: loc.platform_location_id }
  );

  return {
    discoveryId: loc.id,
    dispensaryId,
    action: wasInserted ? 'created' : 'updated',
    name: loc.name,
  };
}
|
|
|
|
/**
 * Promote all valid discovered locations to dispensaries.
 *
 * Processes dutchie_discovery_locations rows with status 'discovered' in
 * ascending id order:
 *   - invalid rows are counted as rejected and, unless dryRun, marked
 *     status='rejected' (errors joined into `notes`) and logged;
 *   - valid rows are promoted via promoteLocation, or reported as 'skipped'
 *     with dispensaryId 0 when dryRun is set.
 *
 * Per-record failures never abort the batch. A promotion error is reported in
 * rejectedRecords. A failure while recording a rejection is logged to the
 * console, and the row stays 'discovered' so a later run can retry it.
 *
 * @param stateCode    Optional filter by state (e.g., 'CA', 'AZ')
 * @param dryRun       If true, only validate without making changes
 * @param taskTracking Optional task info for modification audit trail
 * @returns Counts, per-record results, rejected records, elapsed time and the
 *          ids of newly created dispensaries (for task chaining).
 */
export async function promoteDiscoveredLocations(
  stateCode?: string,
  dryRun = false,
  taskTracking?: TaskTrackingInfo
): Promise<PromotionSummary> {
  const startTime = Date.now();

  // Thrown values are not guaranteed to be Error instances.
  const describeError = (error: unknown): string =>
    error instanceof Error ? error.message : String(error);

  let query = `
    SELECT * FROM dutchie_discovery_locations
    WHERE status = 'discovered'
  `;
  const params: string[] = [];

  if (stateCode) {
    query += ` AND state_code = $1`;
    params.push(stateCode);
  }

  query += ` ORDER BY id`;

  const result = await pool.query(query, params);
  const locations = result.rows as DiscoveryLocationRow[];

  const results: PromotionResult[] = [];
  const rejectedRecords: PromotionSummary['rejectedRecords'] = [];
  // Per TASK_WORKFLOW_2024-12-10.md: Track new dispensary IDs for task chaining
  const newDispensaryIds: number[] = [];
  let created = 0;
  let updated = 0;
  let skipped = 0;
  let rejected = 0;

  for (const loc of locations) {
    // Step 2: Validation
    const validation = validateForPromotion(loc);

    if (!validation.valid) {
      rejected++;
      rejectedRecords.push({
        id: loc.id,
        name: loc.name,
        errors: validation.errors,
      });

      // Mark as rejected if not dry run
      if (!dryRun) {
        try {
          await pool.query(`
            UPDATE dutchie_discovery_locations
            SET status = 'rejected', notes = $1
            WHERE id = $2
          `, [validation.errors.join('; '), loc.id]);

          // Log the rejection
          await logPromotionAction(
            'rejected',
            loc.id,
            null,
            loc.state_code,
            loc.name,
            validation.errors
          );
        } catch (error: unknown) {
          // A bookkeeping failure for one row must not abort the batch; the
          // row is already reported in rejectedRecords.
          console.error(
            `Failed to record rejection for location ${loc.id} (${loc.name}):`,
            describeError(error)
          );
        }
      }
      continue;
    }

    // Step 3: Promotion (skip if dry run)
    if (dryRun) {
      skipped++;
      results.push({
        discoveryId: loc.id,
        dispensaryId: 0,
        action: 'skipped',
        name: loc.name,
      });
      continue;
    }

    try {
      const promotionResult = await promoteLocation(loc, taskTracking);
      results.push(promotionResult);

      if (promotionResult.action === 'created') {
        created++;
        // Per TASK_WORKFLOW_2024-12-10.md: Track new IDs for task chaining
        newDispensaryIds.push(promotionResult.dispensaryId);
      } else {
        updated++;
      }
    } catch (error: unknown) {
      const message = describeError(error);
      console.error(`Failed to promote location ${loc.id} (${loc.name}):`, message);
      rejected++;
      rejectedRecords.push({
        id: loc.id,
        name: loc.name,
        errors: [`Promotion error: ${message}`],
      });
    }
  }

  return {
    totalProcessed: locations.length,
    created,
    updated,
    skipped,
    rejected,
    results,
    rejectedRecords,
    durationMs: Date.now() - startTime,
    // Per TASK_WORKFLOW_2024-12-10.md: Return new IDs for task chaining
    newDispensaryIds,
  };
}
|
|
|
|
/**
 * Promote a single discovery location by ID, whatever its current status.
 *
 * Unlike batch promotion, a validation failure is thrown to the caller and the
 * discovery row is left unchanged (it is not marked 'rejected').
 *
 * @param discoveryId  Primary key in dutchie_discovery_locations.
 * @param taskTracking Optional task info recorded in the dispensary's
 *                     last_modified_* audit columns, as in batch promotion.
 *                     Omitting it keeps the previous behavior.
 * @returns The promotion result for the location.
 * @throws Error if the location does not exist or fails validation.
 */
export async function promoteSingleLocation(
  discoveryId: number,
  taskTracking?: TaskTrackingInfo
): Promise<PromotionResult> {
  const result = await pool.query(
    `SELECT * FROM dutchie_discovery_locations WHERE id = $1`,
    [discoveryId]
  );

  if (result.rows.length === 0) {
    throw new Error(`Discovery location ${discoveryId} not found`);
  }

  const loc = result.rows[0] as DiscoveryLocationRow;

  // Validate
  const validation = validateForPromotion(loc);
  if (!validation.valid) {
    throw new Error(`Validation failed: ${validation.errors.join(', ')}`);
  }

  // Promote, forwarding audit info when a task is driving this call.
  return promoteLocation(loc, taskTracking);
}
|