Files
cannaiq/backend/src/discovery/promotion.ts
Kelly 4949b22457 feat(tasks): Refactor task workflow with payload/refresh separation
Major changes:
- Split crawl into payload_fetch (API → disk) and product_refresh (disk → DB)
- Add task chaining: store_discovery → product_discovery → payload_fetch → product_refresh
- Add payload storage utilities for gzipped JSON on filesystem
- Add /api/payloads endpoints for payload access and diffing
- Add DB-driven TaskScheduler with schedule persistence
- Track newDispensaryIds through discovery promotion for chaining
- Add stealth improvements: HTTP fingerprinting, proxy rotation enhancements
- Add Workers dashboard K8s scaling controls

New files:
- src/tasks/handlers/payload-fetch.ts - Fetches from API, saves to disk
- src/services/task-scheduler.ts - DB-driven schedule management
- src/utils/payload-storage.ts - Payload save/load utilities
- src/routes/payloads.ts - Payload API endpoints
- src/services/http-fingerprint.ts - Browser fingerprint generation
- docs/TASK_WORKFLOW_2024-12-10.md - Complete workflow documentation

Migrations:
- 078: Proxy consecutive 403 tracking
- 079: task_schedules table
- 080: raw_crawl_payloads table
- 081: payload column and last_fetch_at

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-10 22:15:35 -07:00

588 lines
16 KiB
TypeScript

/**
* Discovery Promotion Service
*
* Handles the promotion of discovery locations to dispensaries:
* 1. Discovery → Raw data in dutchie_discovery_locations (status='discovered')
* 2. Validation → Check required fields, reject incomplete records
* 3. Promotion → Idempotent upsert to dispensaries, link back via dispensary_id
*/
import { pool } from '../db/pool';
import { DiscoveryLocationRow, DiscoveryStatus } from './types';
// ============================================================
// VALIDATION
// ============================================================
/** Outcome of checking one discovery location against the promotion requirements. */
export interface ValidationResult {
  /** True when `errors` is empty. */
  valid: boolean;
  /** One message per failed check, e.g. 'Missing name'. */
  errors: Array<string>;
}
/** Aggregate result of validating every location currently in 'discovered' status. */
export interface ValidationSummary {
  /** Number of discovery rows that were examined. */
  totalChecked: number;
  /** Rows that passed every check. */
  validCount: number;
  /** Rows that failed at least one check (equals `invalidRecords.length`). */
  invalidCount: number;
  /** Identification and failure reasons for each invalid row. */
  invalidRecords: {
    id: number;
    name: string;
    errors: string[];
  }[];
}
/**
 * Check that a discovery location carries every field required for promotion.
 *
 * Required: platform_location_id, name, city, state_code, platform_menu_url.
 * name, city and state_code are also rejected when they contain only whitespace.
 *
 * @param loc Discovery row to inspect.
 * @returns `valid` plus the list of failed checks, in the order listed above.
 */
export function validateForPromotion(loc: DiscoveryLocationRow): ValidationResult {
  const isBlank = (value: string | null | undefined): boolean =>
    !value || value.trim() === '';

  // Each entry pairs "did this check fail?" with the message to report.
  const checks: Array<[boolean, string]> = [
    [!loc.platform_location_id, 'Missing platform_location_id'],
    [isBlank(loc.name), 'Missing name'],
    [isBlank(loc.city), 'Missing city'],
    [isBlank(loc.state_code), 'Missing state_code'],
    [!loc.platform_menu_url, 'Missing platform_menu_url'],
  ];

  const errors = checks
    .filter(([failed]) => failed)
    .map(([, message]) => message);

  return { valid: errors.length === 0, errors };
}
/**
 * Run promotion validation over every location in 'discovered' status and
 * summarise the outcome. Read-only: no rows are modified.
 *
 * @param stateCode Optional state filter; when omitted (or empty) all states are checked.
 * @returns Counts of valid/invalid rows plus the failure details for each invalid row.
 */
export async function validateDiscoveredLocations(
  stateCode?: string
): Promise<ValidationSummary> {
  const baseSql = `
SELECT * FROM dutchie_discovery_locations
WHERE status = 'discovered'
`;
  const sql = stateCode ? baseSql + ` AND state_code = $1` : baseSql;
  const values: string[] = stateCode ? [stateCode] : [];

  const { rows } = await pool.query(sql, values);
  const candidates = rows as DiscoveryLocationRow[];

  const invalidRecords: ValidationSummary['invalidRecords'] = [];
  for (const candidate of candidates) {
    const { valid, errors } = validateForPromotion(candidate);
    if (!valid) {
      invalidRecords.push({ id: candidate.id, name: candidate.name, errors });
    }
  }

  return {
    totalChecked: candidates.length,
    // Every row is either valid or recorded as invalid.
    validCount: candidates.length - invalidRecords.length,
    invalidCount: invalidRecords.length,
    invalidRecords,
  };
}
// ============================================================
// PROMOTION
// ============================================================
/** Outcome of promoting (or dry-running) a single discovery location. */
export interface PromotionResult {
  /** ID of the row in dutchie_discovery_locations. */
  discoveryId: number;
  /** ID of the dispensaries row; 0 for dry-run 'skipped' results. */
  dispensaryId: number;
  /** What happened to the dispensary row. */
  action:
    | 'created'
    | 'updated'
    | 'skipped';
  /** Store name taken from the discovery row. */
  name: string;
}
/** Aggregate result of a bulk promotion run. */
export interface PromotionSummary {
  /** Number of discovery rows examined. */
  totalProcessed: number;
  /** Dispensaries newly inserted. */
  created: number;
  /** Existing dispensaries refreshed via the upsert. */
  updated: number;
  /** Valid rows left untouched because the run was a dry run. */
  skipped: number;
  /** Rows that failed validation or whose promotion raised an error. */
  rejected: number;
  /** Per-location outcome for every row that was promoted or skipped. */
  results: PromotionResult[];
  /** Identification and reasons for every rejected row. */
  rejectedRecords: {
    id: number;
    name: string;
    errors: string[];
  }[];
  /** Wall-clock duration of the run in milliseconds. */
  durationMs: number;
  // Per TASK_WORKFLOW_2024-12-10.md: Track new dispensary IDs for task chaining
  /** IDs of dispensaries whose action was 'created' in this run. */
  newDispensaryIds: number[];
}
/**
 * Generate a URL-safe slug from name, city and state.
 *
 * The three parts are joined with hyphens, lower-cased, and every run of
 * characters outside [a-z0-9] is collapsed to a single hyphen. The result is
 * capped at 100 characters and never starts or ends with a hyphen.
 *
 * @param name Store name.
 * @param city City name (may be empty).
 * @param state State code (may be empty).
 * @returns The slug; empty if the inputs contain no ASCII letters or digits.
 */
function generateSlug(name: string, city: string, state: string): string {
  return `${name}-${city}-${state}`
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-|-$/g, '')
    .substring(0, 100)
    // Truncation can cut right after a separator, so strip a trailing hyphen again.
    .replace(/-$/, '');
}
/**
 * Append one audit row to dutchie_promotion_log.
 *
 * @param action Action label stored in the log (this file uses 'promoted_create', 'promoted_update', 'rejected').
 * @param discoveryId Related discovery row, if any.
 * @param dispensaryId Related dispensary row, if any.
 * @param stateCode State code of the store, if known.
 * @param storeName Store name, if known.
 * @param validationErrors Validation failures to record; passed to the driver as-is.
 * @param fieldChanges Arbitrary details; serialized with JSON.stringify when present.
 * @param triggeredBy Origin of the action; defaults to 'auto'.
 */
async function logPromotionAction(
  action: string,
  discoveryId: number | null,
  dispensaryId: number | null,
  stateCode: string | null,
  storeName: string | null,
  validationErrors: string[] | null = null,
  fieldChanges: Record<string, any> | null = null,
  triggeredBy: string = 'auto'
): Promise<void> {
  const serializedChanges = fieldChanges ? JSON.stringify(fieldChanges) : null;

  // Order must match the column list in the INSERT below.
  const values = [
    discoveryId,
    dispensaryId,
    action,
    stateCode,
    storeName,
    validationErrors,
    serializedChanges,
    triggeredBy,
  ];

  await pool.query(`
INSERT INTO dutchie_promotion_log
(discovery_id, dispensary_id, action, state_code, store_name, validation_errors, field_changes, triggered_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, values);
}
/**
 * Insert a row into crawler_status_alerts for display on the dashboard.
 *
 * @param dispensaryId Dispensary the alert is about.
 * @param profileId Crawler profile the alert relates to, or null.
 * @param alertType Alert category label.
 * @param severity Alert severity.
 * @param message Human-readable alert text.
 * @param previousStatus Status before the change; any falsy value is stored as NULL.
 * @param newStatus Status after the change; any falsy value is stored as NULL.
 * @param metadata Extra details; serialized with JSON.stringify when present.
 * @returns The ID of the inserted alert row.
 */
export async function createStatusAlert(
  dispensaryId: number,
  profileId: number | null,
  alertType: string,
  severity: 'info' | 'warning' | 'error' | 'critical',
  message: string,
  previousStatus?: string | null,
  newStatus?: string | null,
  metadata?: Record<string, any>
): Promise<number> {
  const serializedMetadata = metadata ? JSON.stringify(metadata) : null;

  // Order must match the column list in the INSERT below.
  const values = [
    dispensaryId,
    profileId,
    alertType,
    severity,
    message,
    previousStatus ? previousStatus : null,
    newStatus ? newStatus : null,
    serializedMetadata,
  ];

  const { rows } = await pool.query(`
INSERT INTO crawler_status_alerts
(dispensary_id, profile_id, alert_type, severity, message, previous_status, new_status, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
`, values);

  const [inserted] = rows;
  return inserted.id;
}
/**
 * Ensure a dispensary has an enabled crawler profile.
 *
 * If an enabled profile already exists it is returned unchanged (nothing is
 * updated). Otherwise a new 'dutchie' profile is inserted with status
 * 'sandbox' and an informational 'promoted' status alert is created.
 *
 * @param dispensaryId Dispensary that needs a profile.
 * @param dispensaryName Used as the profile name and to derive the profile key.
 * @param platformDispensaryId Platform ID stored in the profile config.
 * @returns The profile ID and whether this call created it.
 */
async function ensureCrawlerProfile(
  dispensaryId: number,
  dispensaryName: string,
  platformDispensaryId: string
): Promise<{ profileId: number; created: boolean }> {
  // Check if profile already exists
  const existingResult = await pool.query(`
SELECT id FROM dispensary_crawler_profiles
WHERE dispensary_id = $1 AND enabled = true
LIMIT 1
`, [dispensaryId]);
  if (existingResult.rows.length > 0) {
    return { profileId: existingResult.rows[0].id, created: false };
  }

  // Derive a hyphenated key of at most 50 characters from the name.
  const profileKey = dispensaryName
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, '-')
    .replace(/^-|-$/g, '')
    .substring(0, 50)
    // Truncation can cut right after a separator, so strip a trailing hyphen again.
    .replace(/-$/, '');

  // Create new profile with sandbox status
  const insertResult = await pool.query(`
INSERT INTO dispensary_crawler_profiles (
dispensary_id,
profile_name,
profile_key,
crawler_type,
status,
status_reason,
status_changed_at,
config,
enabled,
consecutive_successes,
consecutive_failures,
created_at,
updated_at
) VALUES (
$1, $2, $3, 'dutchie', 'sandbox', 'Newly promoted from discovery', CURRENT_TIMESTAMP,
$4::jsonb, true, 0, 0, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
)
RETURNING id
`, [
    dispensaryId,
    dispensaryName,
    profileKey,
    JSON.stringify({
      platformDispensaryId,
      useBothModes: true,
      downloadImages: true,
      trackStock: true,
    }),
  ]);
  const profileId = insertResult.rows[0].id;

  // Create status alert for new sandbox store
  await createStatusAlert(
    dispensaryId,
    profileId,
    'promoted',
    'info',
    `${dispensaryName} promoted to sandbox - awaiting first successful crawl`,
    null,
    'sandbox',
    { source: 'discovery_promotion', platformDispensaryId }
  );

  return { profileId, created: true };
}
/**
* Promote a single discovery location to dispensaries table
* Idempotent: uses ON CONFLICT on platform_dispensary_id
*
* Steps, each issued as a separate query on the shared pool (no transaction is opened here):
* 1. Upsert the dispensaries row keyed on platform_dispensary_id.
* 2. Link the discovery row to the dispensary and mark it 'verified'.
* 3. For newly inserted dispensaries only, call ensureCrawlerProfile.
* 4. Record 'promoted_create' or 'promoted_update' via logPromotionAction.
*
* This function does not validate; the callers in this file run
* validateForPromotion first.
*
* @param loc Discovery row to promote.
* @returns Discovery ID, dispensary ID, store name, and 'created' or 'updated'.
*/
async function promoteLocation(
loc: DiscoveryLocationRow
): Promise<PromotionResult> {
// Prefer the platform-provided slug; otherwise derive one from name/city/state.
const slug = loc.platform_slug || generateSlug(loc.name, loc.city || '', loc.state_code || '');
// Upsert into dispensaries
// ON CONFLICT by platform_dispensary_id ensures idempotency
// On conflict, columns absent from the DO UPDATE SET list (platform, slug, menu_type,
// crawl_enabled, dutchie_verified, dutchie_verified_at, created_at) keep their existing values.
// `(xmax = 0) AS inserted` is the PostgreSQL idiom for telling an insert from a
// conflict-update: xmax is expected to be 0 only on a freshly inserted row.
const upsertResult = await pool.query(`
INSERT INTO dispensaries (
platform,
name,
slug,
city,
state,
address1,
address2,
zipcode,
postal_code,
phone,
website,
email,
latitude,
longitude,
timezone,
platform_dispensary_id,
menu_url,
menu_type,
description,
logo_image,
banner_image,
offer_pickup,
offer_delivery,
is_medical,
is_recreational,
chain_slug,
enterprise_id,
c_name,
country,
status,
crawl_enabled,
dutchie_verified,
dutchie_verified_at,
dutchie_discovery_id,
created_at,
updated_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
$21, $22, $23, $24, $25, $26, $27, $28, $29, $30,
$31, $32, $33, $34, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
)
ON CONFLICT (platform_dispensary_id) WHERE platform_dispensary_id IS NOT NULL
DO UPDATE SET
name = EXCLUDED.name,
city = EXCLUDED.city,
state = EXCLUDED.state,
address1 = EXCLUDED.address1,
address2 = EXCLUDED.address2,
zipcode = EXCLUDED.zipcode,
postal_code = EXCLUDED.postal_code,
phone = EXCLUDED.phone,
website = EXCLUDED.website,
email = EXCLUDED.email,
latitude = EXCLUDED.latitude,
longitude = EXCLUDED.longitude,
timezone = EXCLUDED.timezone,
menu_url = EXCLUDED.menu_url,
description = EXCLUDED.description,
logo_image = EXCLUDED.logo_image,
banner_image = EXCLUDED.banner_image,
offer_pickup = EXCLUDED.offer_pickup,
offer_delivery = EXCLUDED.offer_delivery,
is_medical = EXCLUDED.is_medical,
is_recreational = EXCLUDED.is_recreational,
chain_slug = EXCLUDED.chain_slug,
enterprise_id = EXCLUDED.enterprise_id,
c_name = EXCLUDED.c_name,
country = EXCLUDED.country,
status = EXCLUDED.status,
dutchie_discovery_id = EXCLUDED.dutchie_discovery_id,
updated_at = CURRENT_TIMESTAMP
RETURNING id, (xmax = 0) AS inserted
`, [
// The array order below must match the $1..$34 placeholders above.
loc.platform || 'dutchie', // $1 platform
loc.name, // $2 name
slug, // $3 slug
loc.city, // $4 city
loc.state_code, // $5 state
loc.address_line1, // $6 address1
loc.address_line2, // $7 address2
loc.postal_code, // $8 zipcode
loc.postal_code, // $9 postal_code
loc.phone, // $10 phone
loc.website, // $11 website
loc.email, // $12 email
loc.latitude, // $13 latitude
loc.longitude, // $14 longitude
loc.timezone, // $15 timezone
loc.platform_location_id, // $16 platform_dispensary_id
loc.platform_menu_url, // $17 menu_url
'dutchie', // $18 menu_type
loc.description, // $19 description
loc.logo_image, // $20 logo_image
loc.banner_image, // $21 banner_image
loc.offers_pickup ?? true, // $22 offer_pickup
loc.offers_delivery ?? false, // $23 offer_delivery
loc.is_medical ?? false, // $24 is_medical
loc.is_recreational ?? true, // $25 is_recreational
loc.chain_slug, // $26 chain_slug
loc.enterprise_id, // $27 enterprise_id
loc.c_name, // $28 c_name
loc.country || 'United States', // $29 country
loc.store_status || 'open', // $30 status
true, // $31 crawl_enabled
true, // $32 dutchie_verified
new Date(), // $33 dutchie_verified_at
loc.id, // $34 dutchie_discovery_id
]);
const dispensaryId = upsertResult.rows[0].id;
const wasInserted = upsertResult.rows[0].inserted;
// Link discovery location back to dispensary and update status
await pool.query(`
UPDATE dutchie_discovery_locations
SET
dispensary_id = $1,
status = 'verified',
verified_at = CURRENT_TIMESTAMP,
verified_by = 'auto-promotion'
WHERE id = $2
`, [dispensaryId, loc.id]);
// Create crawler profile with sandbox status for new dispensaries
// NOTE(review): if an earlier run inserted the dispensary but failed before this point,
// a retry takes the update path and skips profile creation — confirm this is acceptable.
if (wasInserted && loc.platform_location_id) {
await ensureCrawlerProfile(dispensaryId, loc.name, loc.platform_location_id);
}
const action = wasInserted ? 'promoted_create' : 'promoted_update';
// Log the promotion
await logPromotionAction(
action,
loc.id,
dispensaryId,
loc.state_code,
loc.name,
null,
{ slug, city: loc.city, platform_location_id: loc.platform_location_id }
);
return {
discoveryId: loc.id,
dispensaryId,
action: wasInserted ? 'created' : 'updated',
name: loc.name,
};
}
/**
 * Promote all valid discovered locations to dispensaries.
 *
 * Every row in 'discovered' status (optionally limited to one state) is
 * validated, then:
 * - invalid rows are counted as rejected and, unless dryRun, marked
 *   'rejected' with the validation errors in `notes` and logged;
 * - valid rows are promoted via promoteLocation, or reported as 'skipped'
 *   with dispensaryId 0 when dryRun is set.
 *
 * A failure while promoting one location is logged, counted as rejected and
 * does not stop the remaining locations. Errors from the initial query or from
 * marking a row rejected are not caught here and propagate to the caller.
 *
 * @param stateCode Optional filter by state (e.g., 'CA', 'AZ')
 * @param dryRun If true, only validate without making changes
 * @returns Counters, per-location results, rejected records, duration, and
 *          the IDs of newly created dispensaries (for task chaining).
 */
export async function promoteDiscoveredLocations(
  stateCode?: string,
  dryRun = false
): Promise<PromotionSummary> {
  const startTime = Date.now();

  let query = `
SELECT * FROM dutchie_discovery_locations
WHERE status = 'discovered'
`;
  const params: string[] = [];
  if (stateCode) {
    query += ` AND state_code = $1`;
    params.push(stateCode);
  }
  query += ` ORDER BY id`;

  const result = await pool.query(query, params);
  const locations = result.rows as DiscoveryLocationRow[];

  const results: PromotionResult[] = [];
  const rejectedRecords: PromotionSummary['rejectedRecords'] = [];
  // Per TASK_WORKFLOW_2024-12-10.md: Track new dispensary IDs for task chaining
  const newDispensaryIds: number[] = [];
  let created = 0;
  let updated = 0;
  let skipped = 0;
  let rejected = 0;

  for (const loc of locations) {
    // Step 2: Validation
    const validation = validateForPromotion(loc);
    if (!validation.valid) {
      rejected++;
      rejectedRecords.push({
        id: loc.id,
        name: loc.name,
        errors: validation.errors,
      });
      // Mark as rejected if not dry run
      if (!dryRun) {
        await pool.query(`
UPDATE dutchie_discovery_locations
SET status = 'rejected', notes = $1
WHERE id = $2
`, [validation.errors.join('; '), loc.id]);
        // Log the rejection
        await logPromotionAction(
          'rejected',
          loc.id,
          null,
          loc.state_code,
          loc.name,
          validation.errors
        );
      }
      continue;
    }

    // Step 3: Promotion (skip if dry run)
    if (dryRun) {
      skipped++;
      results.push({
        discoveryId: loc.id,
        dispensaryId: 0,
        action: 'skipped',
        name: loc.name,
      });
      continue;
    }

    try {
      const promotionResult = await promoteLocation(loc);
      results.push(promotionResult);
      if (promotionResult.action === 'created') {
        created++;
        // Per TASK_WORKFLOW_2024-12-10.md: Track new IDs for task chaining
        newDispensaryIds.push(promotionResult.dispensaryId);
      } else {
        updated++;
      }
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error (it may even be
      // null/undefined); narrow first so the handler itself cannot throw and
      // abort the remaining locations.
      const message = error instanceof Error ? error.message : String(error);
      console.error(`Failed to promote location ${loc.id} (${loc.name}):`, message);
      rejected++;
      rejectedRecords.push({
        id: loc.id,
        name: loc.name,
        errors: [`Promotion error: ${message}`],
      });
    }
  }

  return {
    totalProcessed: locations.length,
    created,
    updated,
    skipped,
    rejected,
    results,
    rejectedRecords,
    durationMs: Date.now() - startTime,
    // Per TASK_WORKFLOW_2024-12-10.md: Return new IDs for task chaining
    newDispensaryIds,
  };
}
/**
 * Promote one discovery location, looked up by its ID.
 *
 * The row is loaded regardless of its current status, validated, and then
 * handed to promoteLocation.
 *
 * @param discoveryId ID of the row in dutchie_discovery_locations.
 * @returns The promotion outcome for that location.
 * @throws Error if no row has that ID or the row fails validation.
 */
export async function promoteSingleLocation(
  discoveryId: number
): Promise<PromotionResult> {
  const { rows } = await pool.query(
    `SELECT * FROM dutchie_discovery_locations WHERE id = $1`,
    [discoveryId]
  );
  if (rows.length === 0) {
    throw new Error(`Discovery location ${discoveryId} not found`);
  }

  const location = rows[0] as DiscoveryLocationRow;

  // Refuse to promote incomplete records.
  const { valid, errors } = validateForPromotion(location);
  if (!valid) {
    throw new Error(`Validation failed: ${errors.join(', ')}`);
  }

  return promoteLocation(location);
}