feat(discovery): Add self-healing and rename schedule

- Rename the 'store_discovery_dutchie' schedule to 'Store Discovery' (the UI platform badge comes from the platform field, which is unchanged)
- Add self-healing: scan for stores missing payloads and queue product_discovery
- Catches stores added before chaining was implemented, or whose product_discovery previously failed
- Limits to 50 stores per run to avoid overwhelming the system

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Kelly
2025-12-13 14:14:21 -07:00
parent e9a688fbb3
commit 59e0e45f8f
4 changed files with 58 additions and 4 deletions

View File

@@ -24,11 +24,11 @@ SET interval_hours = 168, -- 7 days
is_immutable = TRUE, is_immutable = TRUE,
method = 'http', method = 'http',
description = 'Discover new Dutchie stores weekly (HTTP transport)' description = 'Discover new Dutchie stores weekly (HTTP transport)'
WHERE name = 'store_discovery_dutchie'; WHERE name IN ('store_discovery_dutchie', 'Store Discovery');
-- Insert if doesn't exist -- Insert if doesn't exist
INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at) INSERT INTO task_schedules (name, role, interval_hours, priority, description, is_immutable, method, platform, next_run_at)
VALUES ('store_discovery_dutchie', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW()) VALUES ('Store Discovery', 'store_discovery', 168, 5, 'Discover new Dutchie stores weekly (HTTP transport)', TRUE, 'http', 'dutchie', NOW())
ON CONFLICT (name) DO UPDATE SET ON CONFLICT (name) DO UPDATE SET
interval_hours = 168, interval_hours = 168,
is_immutable = TRUE, is_immutable = TRUE,

View File

@@ -0,0 +1,10 @@
-- Migration: 106_rename_store_discovery_schedule.sql
-- Description: Rename store_discovery_dutchie to 'Store Discovery'
-- Created: 2025-12-13

-- Update the schedule name for better display.
-- The platform='dutchie' field is preserved for badge display in UI.
--
-- The rename is guarded: if a 'Store Discovery' row already exists, the
-- legacy row is left untouched instead of being renamed on top of it.
-- NOTE(review): task_schedules.name is presumably unique (it is used as an
-- ON CONFLICT target when schedules are seeded), so an unguarded rename would
-- raise a duplicate-key error and abort the migration in that case -- confirm.
-- When both rows exist, the legacy 'store_discovery_dutchie' row remains and
-- must be cleaned up separately.
UPDATE task_schedules
SET name = 'Store Discovery',
    updated_at = NOW()
WHERE name = 'store_discovery_dutchie'
  AND NOT EXISTS (
      SELECT 1
      FROM task_schedules
      WHERE name = 'Store Discovery'
  );

View File

@@ -99,7 +99,7 @@ class TaskScheduler {
// Core schedules - all use HTTP transport for browser-based scraping // Core schedules - all use HTTP transport for browser-based scraping
const defaults = [ const defaults = [
{ {
name: 'store_discovery_dutchie', name: 'Store Discovery',
role: 'store_discovery' as TaskRole, role: 'store_discovery' as TaskRole,
interval_hours: 168, // Weekly interval_hours: 168, // Weekly
priority: 5, priority: 5,

View File

@@ -438,7 +438,50 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
await browser.close(); await browser.close();
browser = null; browser = null;
console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted`); // ============================================================
// SELF-HEALING: Find existing stores missing payloads
// This catches stores that were added before chaining was implemented,
// or stores where product_discovery previously failed.
// ============================================================
let healedStoreIds: number[] = [];
try {
const healResult = await pool.query(`
SELECT d.id, d.name
FROM dispensaries d
WHERE d.platform = 'dutchie'
AND d.crawl_enabled = true
AND (d.stage IS NULL OR d.stage NOT IN ('deprecated', 'failing'))
AND d.platform_dispensary_id IS NOT NULL
AND d.last_payload_at IS NULL
AND NOT EXISTS (
SELECT 1 FROM task_queue t
WHERE t.dispensary_id = d.id
AND t.role = 'product_discovery'
AND t.status IN ('pending', 'running')
)
ORDER BY d.id
LIMIT 50
`);
if (healResult.rows.length > 0) {
console.log(`[StoreDiscoveryHTTP] Self-healing: Found ${healResult.rows.length} stores missing payloads`);
for (const store of healResult.rows) {
await pool.query(`
INSERT INTO task_queue (role, dispensary_id, priority, scheduled_for, method, platform)
VALUES ('product_discovery', $1, 5, NOW(), 'http', 'dutchie')
ON CONFLICT DO NOTHING
`, [store.id]);
healedStoreIds.push(store.id);
}
console.log(`[StoreDiscoveryHTTP] Self-healing: Queued ${healedStoreIds.length} product_discovery tasks`);
}
} catch (healErr: any) {
console.error(`[StoreDiscoveryHTTP] Self-healing error:`, healErr.message);
}
console.log(`[StoreDiscoveryHTTP] Complete: ${totalDiscovered} new, ${totalUpserted} upserted, ${allNewStoreIds.length} promoted, ${healedStoreIds.length} healed`);
return { return {
success: true, success: true,
@@ -446,6 +489,7 @@ export async function handleStoreDiscoveryHttp(ctx: TaskContext): Promise<TaskRe
storesUpserted: totalUpserted, storesUpserted: totalUpserted,
statesProcessed: stateCodesToDiscover.length, statesProcessed: stateCodesToDiscover.length,
newStoreIds: allNewStoreIds, newStoreIds: allNewStoreIds,
healedStoreIds,
}; };
} catch (error: unknown) { } catch (error: unknown) {