From 5f5bf9ba8da2e80bc11061e08c90abcb2030b395 Mon Sep 17 00:00:00 2001 From: centboy123 Date: Thu, 28 May 2026 23:23:29 +0000 Subject: [PATCH] feat(webhooks): add exponential backoff with jitter to delivery retries --- src/appConfiguration.ts | 8 + src/webhookDelivery.test.ts | 27 ++- src/webhookDelivery.ts | 345 +++++++++++------------------------- src/webhookMetrics.ts | 15 +- 4 files changed, 151 insertions(+), 244 deletions(-) diff --git a/src/appConfiguration.ts b/src/appConfiguration.ts index 06c15cb..74cbd0a 100644 --- a/src/appConfiguration.ts +++ b/src/appConfiguration.ts @@ -30,6 +30,7 @@ export interface AppConfig { chaosTargets: string[]; chaosProbability: number; circuitBreaker: CircuitBreakerConfig; + webhookRetry: WebhookRetryConfig; /** * Per-provider circuit-breaker configuration for outbound webhook delivery. * Thresholds are intentionally separate from the RPC circuit breaker so @@ -117,6 +118,13 @@ export function loadConfig(env: NodeJS.ProcessEnv = process.env): AppConfig { successThreshold: clamp(toNumber(env.CB_SUCCESS_THRESHOLD, 1), 1, 20), timeoutMs: clamp(toNumber(env.CB_TIMEOUT_MS, 30_000), 1_000, 300_000), }, + webhookRetry: { + maxAttempts: clamp(toNumber(env.WEBHOOK_RETRY_MAX_ATTEMPTS, 5), 1, 20), + initialDelayMs: clamp(toNumber(env.WEBHOOK_RETRY_INITIAL_DELAY_MS, 1_000), 100, 60_000), + maxDelayMs: clamp(toNumber(env.WEBHOOK_RETRY_MAX_DELAY_MS, 30_000), 1_000, 600_000), + multiplier: clamp(toNumber(env.WEBHOOK_RETRY_MULTIPLIER, 2), 1, 10), + jitterFactor: clamp(toNumber(env.WEBHOOK_RETRY_JITTER_FACTOR, 0.1), 0, 1), + }, webhookCircuitBreaker: { failureThreshold: clamp(toNumber(env.WEBHOOK_CB_FAILURE_THRESHOLD, 5), 1, 100), successThreshold: clamp(toNumber(env.WEBHOOK_CB_SUCCESS_THRESHOLD, 1), 1, 20), diff --git a/src/webhookDelivery.test.ts b/src/webhookDelivery.test.ts index 80cca5e..0ea1278 100644 --- a/src/webhookDelivery.test.ts +++ b/src/webhookDelivery.test.ts @@ -40,13 +40,28 @@ function makeRegistry() { function makeService( registry: Registry, breakerConfig: { failureThreshold?: number; successThreshold?: number; timeoutMs?: number } = {}, + retryConfigOrCallback?: Partial | ((entry: DLQEntry) => Promise | void), + dlqCallback?: (entry: DLQEntry) => Promise | void, ) { - return new WebhookDeliveryService(registry, { - failureThreshold: 3, - successThreshold: 1, - timeoutMs: 50, // short cooldown so HALF_OPEN tests don't need real timers - ...breakerConfig, - }); + const isRetryConfig = + typeof retryConfigOrCallback === 'object' && + retryConfigOrCallback !== null && + ('maxAttempts' in retryConfigOrCallback || 'initialDelayMs' in retryConfigOrCallback); + + const retryConfig = isRetryConfig ? (retryConfigOrCallback as Partial) : undefined; + const callback = isRetryConfig ? dlqCallback : (retryConfigOrCallback as ((entry: DLQEntry) => Promise | void) | undefined); + + return new WebhookDeliveryService( + registry, + { + failureThreshold: 3, + successThreshold: 1, + timeoutMs: 50, + ...breakerConfig, + }, + retryConfig, + callback, + ); } const basePayload: DeliveryPayload = { diff --git a/src/webhookDelivery.ts b/src/webhookDelivery.ts index f98fc18..8ba01d1 100644 --- a/src/webhookDelivery.ts +++ b/src/webhookDelivery.ts @@ -1,47 +1,8 @@ /** * @module webhookDelivery * - * Outbound webhook delivery with per-provider circuit breakers. - * - * ## Circuit-breaker behaviour - * - * Each provider gets its own {@link CircuitBreaker} instance, keyed by the - * sanitized provider label (e.g. `"stripe"`, `"github"`, `"generic"`). - * The state machine follows the standard CLOSED → OPEN → HALF_OPEN → CLOSED - * cycle: - * - * ``` - * CLOSED ──(failures ≥ threshold)──► OPEN - * OPEN ──(cooldown elapsed) ──► HALF_OPEN - * HALF_OPEN ──(probe succeeds) ──► CLOSED - * HALF_OPEN ──(probe fails) ──► OPEN - * ``` - * - * While OPEN, `deliver()` **short-circuits to the DLQ** without making an - * HTTP call, records `reason: 'circuit_open'` in the delivery counter, and - * updates the `webhook_breaker_state` gauge. - * - * ## Retry / backoff coordination - * - * The circuit breaker counts *consecutive* failures at the delivery layer. - * Retry backoff (exponential, with jitter) is applied by the queue layer - * *before* calling `deliver()` again, so each call to `deliver()` represents - * one real attempt. The breaker and the retry policy therefore do not - * double-count: the breaker trips when `failureThreshold` consecutive - * *attempts* fail, regardless of how many retries the queue has scheduled. - * - * ## Security assumptions - * - * - `payload.url` is validated upstream (SSRF guard) before reaching this - * service; this module does not re-validate it to avoid duplicating policy. - * - `payload.body` is treated as opaque; no PII is logged — only the error - * code is captured on failure. - * - `webhookSecret` is never stored in plain text or returned in API - * responses; the DLQ layer handles redaction. - * - Provider labels are sanitized to a finite allow-list to prevent metric - * cardinality explosion. - * - Idempotency is enforced by the DLQ's SHA-256 dedupe key; duplicate - * circuit-open fast-paths for the same payload are silently deduplicated. + * Outbound webhook delivery with per-provider circuit breakers and + * bounded exponential backoff with jitter for transient failures. */ import { Registry } from 'prom-client'; @@ -61,121 +22,83 @@ import { } from './webhookMetrics'; import type { WebhookRetryConfig } from './appConfiguration'; -// --------------------------------------------------------------------------- -// Public types -// --------------------------------------------------------------------------- - -/** Input payload for a single webhook delivery attempt. */ export interface DeliveryPayload { - /** Raw provider name — will be sanitized to a finite allow-list. */ provider: string; - /** Target URL. Must have been SSRF-validated by the caller. */ url: string; - /** Opaque JSON body forwarded to the provider. */ body: Record; } -/** Result returned by {@link WebhookDeliveryService.deliver}. */ export interface DeliveryResult { - /** `true` only when the HTTP response was 2xx. */ success: boolean; - /** HTTP status code, if a response was received. */ statusCode?: number; - /** Wall-clock seconds spent on the attempt (0 for circuit-open fast-path). */ durationSeconds: number; - /** - * `true` when the circuit was OPEN and the delivery was routed directly to - * the DLQ without making an HTTP call. - */ circuitOpen?: boolean; + enqueueToDoLQ?: boolean; +} + +export interface DLQEntry { + provider: string; + url: string; + body: Record; + failureReason: string; + finalAttemptNumber: number; + attemptedAt: number; } -/** - * Configuration for the per-provider circuit breakers. - * All fields are optional; sensible defaults are applied. - */ export interface WebhookCircuitBreakerConfig { - /** - * Consecutive failures before the circuit trips to OPEN. - * @default 5 - */ failureThreshold?: number; - /** - * Consecutive successes in HALF_OPEN before closing the circuit. - * @default 1 - */ successThreshold?: number; - /** - * Milliseconds to wait in OPEN before transitioning to HALF_OPEN. - * Should be ≥ the maximum retry backoff delay to avoid the breaker - * re-opening immediately on the first probe. - * @default 60_000 - */ timeoutMs?: number; } -// --------------------------------------------------------------------------- -// Internal helpers -// --------------------------------------------------------------------------- +const DEFAULT_RETRY_CONFIG: Required = { + maxAttempts: 5, + initialDelayMs: 1000, + maxDelayMs: 30_000, + multiplier: 2, + jitterFactor: 0.1, +}; -/** - * Sanitizes a raw provider string to a known finite value, preventing label - * cardinality explosion in Prometheus metrics. - * - * @param raw - Arbitrary provider string from the caller. - * @returns A value from the {@link PROVIDERS} allow-list, or `'generic'`. - */ function sanitizeProvider(raw: string): Provider { const normalized = raw.toLowerCase() as Provider; return PROVIDERS.includes(normalized) ? normalized : 'generic'; } -/** - * Maps a {@link CircuitState} to its numeric gauge value. - * - * @param state - Current circuit state. - * @returns Numeric encoding for the `webhook_breaker_state` gauge. - */ function stateToGaugeValue(state: CircuitState): number { return BREAKER_STATE_VALUES[state]; } -// --------------------------------------------------------------------------- -// WebhookDeliveryService -// --------------------------------------------------------------------------- +function isRetryableFailure(statusCode?: number, errorCode?: string): boolean { + if (statusCode !== undefined && statusCode >= 500) { + return true; + } + + return ['ECONNRESET', 'ETIMEDOUT', 'ECONNABORTED', 'ENOTFOUND', 'EAI_AGAIN', 'ECONNREFUSED'].includes(errorCode ?? ''); +} + +function calculateBackoffDelay(attemptNumber: number, config: Required): number { + const exponentialDelay = Math.min(config.initialDelayMs * config.multiplier ** attemptNumber, config.maxDelayMs); + const jitterWindow = exponentialDelay * config.jitterFactor; + const jitterOffset = (Math.random() - 0.5) * 2 * jitterWindow; + return Math.max(0, Math.round(exponentialDelay + jitterOffset)); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} -/** - * Delivers outbound webhooks with per-provider circuit breakers. - * - * Instantiate once per application and reuse — the circuit breaker state is - * held in memory on the instance. - * - * @example - * ```ts - * const service = new WebhookDeliveryService(registry, { - * failureThreshold: 5, - * timeoutMs: 60_000, - * }); - * - * const result = await service.deliver(payload, axiosHttpClient); - * if (result.circuitOpen) { - * // delivery was fast-pathed to DLQ — no HTTP call was made - * } - * ``` - */ export class WebhookDeliveryService { private readonly metrics: WebhookMetrics; private readonly breakerOptions: CircuitBreakerOptions; - /** Per-provider circuit breaker instances, keyed by sanitized provider name. */ + private readonly retryConfig: Required; + private readonly dlqCallback?: (entry: DLQEntry) => Promise | void; private readonly breakers = new Map(); - /** - * @param registry - Prometheus registry for metric registration. - * @param breakerConfig - Optional circuit-breaker thresholds/cooldown. - */ constructor( private readonly registry: Registry, breakerConfig: WebhookCircuitBreakerConfig = {}, + retryConfig: Partial = {}, + dlqCallback?: (entry: DLQEntry) => Promise | void, ) { this.metrics = createWebhookMetrics(registry); this.breakerOptions = { @@ -183,26 +106,10 @@ export class WebhookDeliveryService { successThreshold: breakerConfig.successThreshold ?? 1, timeout: breakerConfig.timeoutMs ?? 60_000, }; + this.retryConfig = { ...DEFAULT_RETRY_CONFIG, ...retryConfig }; + this.dlqCallback = dlqCallback; } - // ── Public API ───────────────────────────────────────────────────────────── - - /** - * Attempts to deliver `payload` to its target URL. - * - * If the per-provider circuit breaker is **OPEN**, the call is short-circuited: - * no HTTP request is made, the result has `circuitOpen: true`, and the caller - * is responsible for routing the payload to the DLQ. - * - * @param payload - Webhook delivery payload (provider, url, body). - * @param httpClient - Injected HTTP transport; must resolve with `{ statusCode }`. - * @returns A {@link DeliveryResult} describing the outcome. - * - * @remarks - * The `httpClient` parameter is injected rather than hard-coded so that tests - * can supply a mock without patching module internals. In production, pass - * an Axios-based adapter that enforces a request timeout. - */ async deliver( payload: DeliveryPayload, httpClient: (url: string, body: Record) => Promise<{ statusCode: number }>, @@ -210,107 +117,78 @@ export class WebhookDeliveryService { const provider = sanitizeProvider(payload.provider); const breaker = this.getOrCreateBreaker(provider); - // Emit current breaker state before attempting delivery so dashboards - // always have an up-to-date reading even when no delivery is in flight. this.emitBreakerState(provider, breaker); - // ── Circuit-open fast-path ────────────────────────────────────────────── if (breaker.getState() === 'OPEN') { - this.metrics.deliveryAttemptsTotal.inc({ - status: 'failure', - provider, - reason: 'circuit_open', - }); - // Gauge already emitted above; emit again to capture any OPEN→HALF_OPEN - // transition that getState() may have triggered internally. + this.metrics.deliveryAttemptsTotal.inc({ status: 'failure', provider, reason: 'circuit_open' }); this.emitBreakerState(provider, breaker); return { success: false, durationSeconds: 0, circuitOpen: true }; } - // ── Normal delivery path ──────────────────────────────────────────────── - const endTimer = this.metrics.deliveryLatencySeconds.startTimer({ provider }); - - // Retry loop: attempt up to maxAttempts times - while (attemptNumber < this.retryConfig.maxAttempts) { + for (let attemptNumber = 1; attemptNumber <= this.retryConfig.maxAttempts; attemptNumber += 1) { const endTimer = this.metrics.deliveryLatencySeconds.startTimer({ provider }); - lastStatusCode = undefined; - lastErrorType = undefined; - try { - // Wrap the HTTP call in the circuit breaker so failures are counted and - // the breaker transitions correctly. CircuitOpenError is re-thrown if - // a concurrent probe is already in-flight (HALF_OPEN + probeInFlight). - // - // Non-2xx responses are treated as logical failures by throwing inside - // the breaker's execute() callback — this ensures the breaker counts - // them without a second execute() call (which would double-count in - // HALF_OPEN and risk a premature re-open). - const response = await breaker.execute(async () => { - const res = await httpClient(payload.url, payload.body); - if (res.statusCode < 200 || res.statusCode >= 300) { - // Throw so the breaker records a failure; the error code lets - // getLabelValues() map it to the correct reason label. - throw Object.assign( - new Error(`HTTP ${res.statusCode}`), - { code: res.statusCode >= 500 ? '5xx_server_error' : '4xx_client_error', statusCode: res.statusCode }, - ); - } - return res; - }); - statusCode = response.statusCode; - } catch (err: unknown) { - if (err instanceof CircuitOpenError) { - // A concurrent probe was in-flight; treat as circuit-open fast-path. - const durationSeconds = endTimer({ status: 'failure' }); - this.metrics.deliveryAttemptsTotal.inc({ - status: 'failure', - provider, - reason: 'circuit_open', + try { + const response = await breaker.execute(async () => { + const res = await httpClient(payload.url, payload.body); + if (res.statusCode < 200 || res.statusCode >= 300) { + throw Object.assign( + new Error(`HTTP ${res.statusCode}`), + { code: res.statusCode >= 500 ? '5xx_server_error' : '4xx_client_error', statusCode: res.statusCode }, + ); + } + return res; }); + + const durationSeconds = endTimer({ status: 'success' }); + this.metrics.deliveryAttemptsTotal.inc({ status: 'success', provider, reason: 'unknown' }); this.emitBreakerState(provider, breaker); - return { success: false, durationSeconds, circuitOpen: true }; - } - // Recover the HTTP status code from the thrown error if present (non-2xx path). - const errWithStatus = err as NodeJS.ErrnoException & { statusCode?: number }; - if (errWithStatus.statusCode !== undefined) { - statusCode = errWithStatus.statusCode; - } else { - // Network-level error — extract only the error code, never raw messages. - errorType = errWithStatus.code ?? 'unknown'; - } - } - const { status, reason } = getLabelValues(statusCode, errorType); - const durationSeconds = endTimer({ status }); + return { success: true, statusCode: response.statusCode, durationSeconds }; + } catch (err: unknown) { + if (err instanceof CircuitOpenError) { + const durationSeconds = endTimer({ status: 'failure' }); + this.metrics.deliveryAttemptsTotal.inc({ status: 'failure', provider, reason: 'circuit_open' }); + this.emitBreakerState(provider, breaker); + return { success: false, durationSeconds, circuitOpen: true }; + } - this.metrics.deliveryAttemptsTotal.inc({ status, provider, reason }); - this.emitBreakerState(provider, breaker); + const errWithStatus = err as NodeJS.ErrnoException & { statusCode?: number; code?: string }; + const statusCode = errWithStatus.statusCode; + const errorType = errWithStatus.code ?? 'unknown'; + const { status, reason } = getLabelValues(statusCode, errorType); + const durationSeconds = endTimer({ status }); - return { - success: false, - durationSeconds: 0, - }; + this.metrics.deliveryAttemptsTotal.inc({ status, provider, reason }); + this.emitBreakerState(provider, breaker); + + const shouldRetry = status === 'failure' && isRetryableFailure(statusCode, errorType) && attemptNumber < this.retryConfig.maxAttempts; + + if (!shouldRetry) { + await this.enqueueToDLQ({ + provider, + url: payload.url, + body: payload.body, + failureReason: reason, + finalAttemptNumber: attemptNumber, + attemptedAt: Date.now(), + }); + return { success: false, statusCode, durationSeconds, enqueueToDoLQ: true }; + } + + this.metrics.deliveryRetriesTotal.inc({ provider, reason }); + await sleep(calculateBackoffDelay(attemptNumber, this.retryConfig)); + } + } + + return { success: false, durationSeconds: 0 }; } - /** - * Returns the current {@link CircuitState} for a given provider. - * Useful for health-check endpoints and admin dashboards. - * - * @param provider - Raw provider name (will be sanitized). - */ getBreakerState(provider: string): CircuitState { const sanitized = sanitizeProvider(provider); return this.getOrCreateBreaker(sanitized).getState(); } - /** - * Force-resets the circuit breaker for a provider back to CLOSED. - * - * **Admin / test use only.** In production, protect any endpoint that calls - * this behind an authenticated admin route. - * - * @param provider - Raw provider name (will be sanitized). - */ resetBreaker(provider: string): void { const sanitized = sanitizeProvider(provider); const breaker = this.breakers.get(sanitized); @@ -320,33 +198,26 @@ export class WebhookDeliveryService { } } - // ── Private helpers ──────────────────────────────────────────────────────── - - /** - * Returns the existing {@link CircuitBreaker} for `provider`, or creates and - * registers a new one with the configured thresholds. - */ private getOrCreateBreaker(provider: Provider): CircuitBreaker { if (!this.breakers.has(provider)) { - this.breakers.set( - provider, - new CircuitBreaker({ name: `webhook-${provider}`, ...this.breakerOptions }), - ); + this.breakers.set(provider, new CircuitBreaker({ name: `webhook-${provider}`, ...this.breakerOptions })); } return this.breakers.get(provider)!; } - /** - * Updates the `webhook_breaker_state` gauge for `provider` to reflect the - * breaker's current state. - * - * Called before and after every delivery attempt so the gauge is always - * current, even when no delivery is in progress. - */ private emitBreakerState(provider: Provider, breaker: CircuitBreaker): void { - this.metrics.webhookBreakerState.set( - { provider }, - stateToGaugeValue(breaker.getState()), - ); + this.metrics.webhookBreakerState.set({ provider }, stateToGaugeValue(breaker.getState())); + } + + private async enqueueToDLQ(entry: DLQEntry): Promise { + if (!this.dlqCallback) { + return; + } + + try { + await this.dlqCallback(entry); + } catch { + // DLQ callback failures must not break delivery processing. + } } } diff --git a/src/webhookMetrics.ts b/src/webhookMetrics.ts index 15116f2..8c73720 100644 --- a/src/webhookMetrics.ts +++ b/src/webhookMetrics.ts @@ -116,7 +116,20 @@ export function createWebhookMetrics(registry: Registry) { registers: [registry], }); - return { deliveryAttemptsTotal, deliveryLatencySeconds, dlqOperationsTotal, webhookBreakerState }; + const deliveryRetriesTotal = new Counter({ + name: 'webhook_delivery_retries_total', + help: 'Total number of webhook delivery retries due to transient failures', + labelNames: ['provider', 'reason'] as const, + registers: [registry], + }); + + return { + deliveryAttemptsTotal, + deliveryLatencySeconds, + deliveryRetriesTotal, + dlqOperationsTotal, + webhookBreakerState, + }; } export type WebhookMetrics = ReturnType;