diff --git a/src/index.ts b/src/index.ts index f99f9e4..896ccfa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,9 +17,9 @@ import { stopDefaultCheckerScheduler, } from "./services/defaultChecker.js"; import { - startWebhookRetryScheduler, - stopWebhookRetryScheduler, -} from "./services/webhookRetryScheduler.js"; + startWebhookRetryProcessor, + stopWebhookRetryProcessor, +} from "./services/webhookRetryProcessor.js"; import { eventStreamService } from "./services/eventStreamService.js"; import { startNotificationCleanupScheduler, @@ -61,8 +61,8 @@ const server = app.listen(port, () => { // Start periodic on-chain default checks (if configured) startDefaultCheckerScheduler(); - // Start webhook retry scheduler - startWebhookRetryScheduler(); + // Start webhook retry processor + startWebhookRetryProcessor(); // Start scheduled score reconciliation against on-chain state startScoreReconciliationScheduler(); @@ -87,7 +87,7 @@ const shutdown = async (signal: "SIGTERM" | "SIGINT") => { try { await stopIndexer(); stopDefaultCheckerScheduler(); - stopWebhookRetryScheduler(); + stopWebhookRetryProcessor(); stopScoreReconciliationScheduler(); stopNotificationCleanupScheduler(); diff --git a/src/services/__tests__/webhookRetryProcessor.test.ts b/src/services/__tests__/webhookRetryProcessor.test.ts new file mode 100644 index 0000000..13a841a --- /dev/null +++ b/src/services/__tests__/webhookRetryProcessor.test.ts @@ -0,0 +1,29 @@ +import { jest, describe, it, expect } from "@jest/globals"; +import { + WEBHOOK_RETRY_CONFIG, + getRetryDelayMs, +} from "../webhookService.js"; + +describe("Webhook Retry Processor", () => { + it("respects the configured backoff delays", () => { + expect(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS).toEqual([ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ]); + + expect(getRetryDelayMs(1)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[0]); + expect(getRetryDelayMs(2)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[1]); + expect(getRetryDelayMs(3)).toBe(WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[2]); + }); + + it("caps the backoff delay at the last configured value", () => { + const maxDelay = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS[WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS.length - 1]; + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS)).toBe(maxDelay); + expect(getRetryDelayMs(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS + 5)).toBe(maxDelay); + }); + + it("configures exactly 4 max attempts", () => { + expect(WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS).toBe(4); + }); +}); diff --git a/src/services/webhookRetryScheduler.ts b/src/services/webhookRetryScheduler.ts deleted file mode 100644 index cf62b26..0000000 --- a/src/services/webhookRetryScheduler.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { query } from "../db/connection.js"; -import logger from "../utils/logger.js"; -import { WebhookService, type WebhookEventType } from "./webhookService.js"; -import { cacheService } from "./cacheService.js"; - -const BACKOFF = [60, 300, 1800]; // seconds - -const LOCK_KEY = "webhook_retry_scheduler:running"; -const LOCK_TTL_SECONDS = 120; // 2 minutes - -let schedulerInterval: NodeJS.Timeout | null = null; - -async function markAsFailed(deliveryId: number) { - await query( - `UPDATE webhook_deliveries - SET next_retry_at = NULL, - last_error = $1, - updated_at = NOW() - WHERE id = $2`, - ["Permanently failed after max attempts reached", deliveryId], - ); - logger.error(`Webhook delivery ${deliveryId} marked as permanently failed.`); -} - -function shouldRetry(delivery: any, delay: number): boolean { - const lastAttempt = new Date(delivery.updated_at).getTime(); - const now = Date.now(); - return now >= lastAttempt + delay * 1000; -} - -async function sendWebhookAgain(delivery: any) { - logger.info( - `Retrying webhook delivery ${delivery.id} (attempt ${delivery.attempt_count + 1})`, - ); - - await WebhookService.retryWebhookDelivery( - delivery.id, - delivery.subscription_id, - delivery.callback_url, - delivery.secret || undefined, - delivery.event_id, - delivery.event_type as WebhookEventType, - delivery.payload, - delivery.attempt_count, - ); -} - -export async function retryFailedWebhooks() { - let lockAcquired = false; - try { - const lockValue = `${Date.now()}-${Math.random().toString(16).slice(2)}`; - lockAcquired = await cacheService.setNotExists( - LOCK_KEY, - lockValue, - LOCK_TTL_SECONDS, - ); - } catch (error) { - logger.error("Failed to acquire webhook retry scheduler lock", { error }); - } - - if (!lockAcquired) { - logger.warn( - "Webhook retry scheduler run skipped - another instance is already running", - ); - return; - } - - try { - const result = await query(` - SELECT wd.*, ws.max_attempts, ws.callback_url, ws.secret - FROM webhook_deliveries wd - JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id - WHERE wd.delivered_at IS NULL - AND (wd.next_retry_at IS NOT NULL OR wd.attempt_count = 0) - `); - - const failed = result.rows; - - for (const delivery of failed) { - const delay = BACKOFF[delivery.attempt_count] || 3600; - - if (delivery.attempt_count >= delivery.max_attempts) { - await markAsFailed(delivery.id); - continue; - } - - if (shouldRetry(delivery, delay)) { - await sendWebhookAgain(delivery); - } - } - } catch (error) { - logger.error("Error in webhook retry scheduler", { error }); - } finally { - try { - await cacheService.delete(LOCK_KEY); - } catch (error) { - logger.error("Failed to release webhook retry scheduler lock", { error }); - } - } -} - -export function startWebhookRetryScheduler() { - if (schedulerInterval) { - logger.warn("Webhook retry scheduler already running"); - return; - } - - logger.info("Starting webhook retry scheduler (60s interval)"); - schedulerInterval = setInterval(retryFailedWebhooks, 60000); -} - -export function stopWebhookRetryScheduler() { - if (schedulerInterval) { - logger.info("Stopping webhook retry scheduler"); - clearInterval(schedulerInterval); - schedulerInterval = null; - } -} diff --git a/src/services/webhookService.ts b/src/services/webhookService.ts index 964c14c..59e378d 100644 --- a/src/services/webhookService.ts +++ b/src/services/webhookService.ts @@ -270,22 +270,19 @@ async function postWebhook( } } -// Retry configuration for webhook delivery. -// This yields retry attempts at ~5m, ~15m, and ~45m after a failed delivery, -// for a total retry window a little over one hour after the initial attempt. -const RETRY_DELAYS_MS = [ - 5 * 60 * 1000, - 15 * 60 * 1000, - 45 * 60 * 1000, -] as const; - -const MAX_RETRY_ATTEMPTS = RETRY_DELAYS_MS.length + 1; +export const WEBHOOK_RETRY_CONFIG = { + RETRY_DELAYS_MS: [ + 5 * 60 * 1000, + 15 * 60 * 1000, + 45 * 60 * 1000, + ] as const, + MAX_RETRY_ATTEMPTS: 4, +}; export const getRetryDelayMs = (attemptNumber: number): number => { - const delayIndex = Math.min(attemptNumber - 1, RETRY_DELAYS_MS.length - 1); - return ( - RETRY_DELAYS_MS[delayIndex] ?? RETRY_DELAYS_MS[RETRY_DELAYS_MS.length - 1]! - ); + const delays = WEBHOOK_RETRY_CONFIG.RETRY_DELAYS_MS; + const delayIndex = Math.min(attemptNumber - 1, delays.length - 1); + return delays[delayIndex] ?? delays[delays.length - 1]!; }; export class WebhookService { @@ -296,8 +293,8 @@ export class WebhookService { try { const now = new Date(); const result = await query( - `SELECT id, subscription_id, callback_url, secret, event_id, event_type, - payload, attempt_count + `SELECT wd.id, wd.subscription_id, ws.callback_url, ws.secret, wd.event_id, wd.event_type, + wd.payload, wd.attempt_count FROM webhook_deliveries wd JOIN webhook_subscriptions ws ON wd.subscription_id = ws.id WHERE wd.delivered_at IS NULL @@ -306,7 +303,7 @@ export class WebhookService { AND wd.attempt_count < $2 ORDER BY wd.next_retry_at ASC LIMIT 100`, - [now, MAX_RETRY_ATTEMPTS], + [now, WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS], ); if (result.rows.length === 0) { @@ -397,7 +394,7 @@ export class WebhookService { } else { // Schedule next retry or mark as permanently failed const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null; @@ -446,7 +443,7 @@ export class WebhookService { } catch (error) { const newAttemptCount = attemptCount + 1; const nextRetryTime = - newAttemptCount < MAX_RETRY_ATTEMPTS + newAttemptCount < WEBHOOK_RETRY_CONFIG.MAX_RETRY_ATTEMPTS ? new Date(Date.now() + getRetryDelayMs(newAttemptCount)) : null;