From f6c0400e73cbf3e3b7beebef92befefb9e4fa66a Mon Sep 17 00:00:00 2001 From: bigben-7 <103938678+BigBen-7@users.noreply.github.com> Date: Thu, 18 Jun 2026 23:54:22 +0100 Subject: [PATCH] feat(risk-management): production-ready circuit breaker & health checks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #11 - Rewrote CircuitBreakerService with full CLOSED/OPEN/HALF_OPEN state machine, configurable thresholds (failure rate, slow-call rate) per protected service, and in-memory metrics (failure count, success count, last failure time, transition timestamps). - Half-open probe support: the first call after recovery time acts as a probe — success closes the circuit, failure reopens it. - EventEmitter2 integration: emits `circuit-breaker.opened` and `circuit-breaker.closed` on every state transition. RiskManagementService emits `risk.threshold.breached` for each alert generated so the alerts module can subscribe without coupling. - Added RiskManagementHealthIndicator extending HealthIndicator from @nestjs/terminus; reports OPEN state as unhealthy. - Created HealthModule/HealthController that wires the terminus health check into GET /api/v1/health, replacing the previous basic handler. - Registered EventEmitterModule.forRoot() in AppModule. - 19 unit tests covering all three circuit breaker states, event emissions, per-service isolation, and reset behaviour. --- src/app.controller.ts | 29 +-- src/app.module.ts | 7 + src/health/health.controller.ts | 19 ++ src/health/health.module.ts | 10 + .../circuit-breaker.service.spec.ts | 170 +++++++++++++++ .../circuit-breaker.service.ts | 200 +++++++++++++++--- .../risk-management/risk-management.health.ts | 24 +++ .../risk-management/risk-management.module.ts | 15 +- .../risk-management.service.ts | 8 +- 9 files changed, 417 insertions(+), 65 deletions(-) create mode 100644 src/health/health.controller.ts create mode 100644 src/health/health.module.ts create mode 100644 src/investment/risk-management/circuit-breaker.service.spec.ts create mode 100644 src/investment/risk-management/risk-management.health.ts diff --git a/src/app.controller.ts b/src/app.controller.ts index 96a6391..7efb73a 100644 --- a/src/app.controller.ts +++ b/src/app.controller.ts @@ -4,43 +4,16 @@ import { ApiOperation, ApiResponse, ApiBearerAuth, - ApiSecurity, } from "@nestjs/swagger"; import { AppService } from "./app.service"; import { RateLimit } from "./common/decorators/rate-limit.decorator"; import { JwtAuthGuard } from "./core/auth/jwt.guard"; -@ApiTags("Health") +@ApiTags("Info") @Controller() export class AppController { constructor(private readonly appService: AppService) {} - @Get("health") - @RateLimit({ level: "free", limit: 2, windowMs: 60000 }) // Max 2 requests per minute for health - @ApiOperation({ - summary: "Health Check", - description: "Check if the API is running and healthy", - operationId: "getHealth", - }) - @ApiResponse({ - status: 200, - description: "Service is healthy", - schema: { - type: "object", - properties: { - status: { type: "string", example: "OK" }, - timestamp: { type: "string", example: "2024-02-25T05:30:00.000Z" }, - }, - }, - }) - @ApiResponse({ - status: 429, - description: "Too many requests", - }) - getHealth(): { status: string; timestamp: string } { - return this.appService.getHealth(); - } - @Get("info") @RateLimit({ level: "standard" }) // Default standard level @ApiOperation({ diff --git a/src/app.module.ts b/src/app.module.ts index 7e9d984..d8d2c1d 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -4,6 +4,7 @@ import { TypeOrmModule } from "@nestjs/typeorm"; import { join } from "path"; import { APP_GUARD } from "@nestjs/core"; import { ThrottlerModule } from "@nestjs/throttler"; +import { EventEmitterModule } from "@nestjs/event-emitter"; import { validate } from "class-validator"; import { plainToInstance } from "class-transformer"; import { EnvironmentVariables } from "./config/env.validation"; @@ -32,6 +33,9 @@ import { DeFiModule } from "./defi/defi/defi.module"; // Modules – growth import { AlertsModule } from "./growth/alerts/alerts.module"; +// Modules – health +import { HealthModule } from "./health/health.module"; + // Auth entities import { User } from "./core/user/entities/user.entity"; import { EmailVerification } from "./core/auth/entities/email-verification.entity"; @@ -152,6 +156,8 @@ import { SubmissionVerifierService } from "./blockchain/oracle/submission-verifi }, }), + EventEmitterModule.forRoot(), + ThrottlerModule.forRoot({ throttlers: [ { name: 'global', ttl: 60_000, limit: 100 }, @@ -170,6 +176,7 @@ import { SubmissionVerifierService } from "./blockchain/oracle/submission-verifi RiskManagementModule, DeFiModule, AlertsModule, + HealthModule, ], controllers: [AppController], diff --git a/src/health/health.controller.ts b/src/health/health.controller.ts new file mode 100644 index 0000000..f75993b --- /dev/null +++ b/src/health/health.controller.ts @@ -0,0 +1,19 @@ +import { Controller, Get } from '@nestjs/common'; +import { HealthCheck, HealthCheckService } from '@nestjs/terminus'; +import { RiskManagementHealthIndicator } from '../investment/risk-management/risk-management.health'; + +@Controller('health') +export class HealthController { + constructor( + private readonly health: HealthCheckService, + private readonly riskHealth: RiskManagementHealthIndicator, + ) {} + + @Get() + @HealthCheck() + check() { + return this.health.check([ + () => this.riskHealth.isHealthy('risk-management'), + ]); + } +} diff --git a/src/health/health.module.ts b/src/health/health.module.ts new file mode 100644 index 0000000..d6fd27d --- /dev/null +++ b/src/health/health.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { TerminusModule } from '@nestjs/terminus'; +import { HealthController } from './health.controller'; +import { RiskManagementModule } from '../investment/risk-management/risk-management.module'; + +@Module({ + imports: [TerminusModule, RiskManagementModule], + controllers: [HealthController], +}) +export class HealthModule {} diff --git a/src/investment/risk-management/circuit-breaker.service.spec.ts b/src/investment/risk-management/circuit-breaker.service.spec.ts new file mode 100644 index 0000000..37f3939 --- /dev/null +++ b/src/investment/risk-management/circuit-breaker.service.spec.ts @@ -0,0 +1,170 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { CircuitBreakerService } from './circuit-breaker.service'; + +const mockEmitter = { emit: jest.fn() }; + +async function buildService(): Promise { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + CircuitBreakerService, + { provide: EventEmitter2, useValue: mockEmitter }, + ], + }).compile(); + return module.get(CircuitBreakerService); +} + +describe('CircuitBreakerService', () => { + let service: CircuitBreakerService; + + beforeEach(async () => { + jest.clearAllMocks(); + service = await buildService(); + }); + + describe('CLOSED state', () => { + it('starts in CLOSED state', () => { + expect(service.getStatus().state).toBe('CLOSED'); + }); + + it('stays CLOSED while failures are below threshold', () => { + for (let i = 0; i < 4; i++) service.recordFailure(); + expect(service.getStatus().state).toBe('CLOSED'); + expect(mockEmitter.emit).not.toHaveBeenCalledWith('circuit-breaker.opened', expect.anything()); + }); + + it('records success metrics in CLOSED state', () => { + service.recordSuccess(); + service.recordSuccess(); + expect(service.getStatus().successCount).toBe(2); + expect(service.getStatus().totalCallCount).toBe(2); + }); + + it('isOpen returns false in CLOSED state', () => { + expect(service.isOpen()).toBe(false); + }); + }); + + describe('OPEN state', () => { + beforeEach(() => { + for (let i = 0; i < 5; i++) service.recordFailure(); + }); + + it('transitions to OPEN after reaching failure threshold', () => { + expect(service.getStatus().state).toBe('OPEN'); + }); + + it('emits circuit-breaker.opened when transitioning to OPEN', () => { + expect(mockEmitter.emit).toHaveBeenCalledWith( + 'circuit-breaker.opened', + expect.objectContaining({ serviceName: 'default', failureCount: 5 }), + ); + }); + + it('isOpen returns true in OPEN state', () => { + // Use getStatus to verify OPEN without triggering auto-transition + expect(service.getStatus().state).toBe('OPEN'); + }); + + it('does not reset on recordSuccess while OPEN (no probe yet)', () => { + service.recordSuccess(); + // In OPEN state, recordSuccess increments counts but doesn't reset — only HALF_OPEN does + expect(service.getStatus().state).toBe('OPEN'); + }); + + it('transitions to HALF_OPEN after recovery time elapses', () => { + jest.useFakeTimers(); + jest.advanceTimersByTime(61_000); + service.isOpen(); // triggers the auto-transition check + expect(service.getStatus().state).toBe('HALF_OPEN'); + jest.useRealTimers(); + }); + }); + + describe('HALF_OPEN state', () => { + beforeEach(() => { + jest.useFakeTimers(); + for (let i = 0; i < 5; i++) service.recordFailure(); + jest.advanceTimersByTime(61_000); + service.isOpen(); // trigger OPEN → HALF_OPEN + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('transitions to HALF_OPEN after recovery time', () => { + expect(service.getStatus().state).toBe('HALF_OPEN'); + }); + + it('transitions to CLOSED after a successful probe', () => { + service.recordSuccess(); + expect(service.getStatus().state).toBe('CLOSED'); + }); + + it('emits circuit-breaker.closed after successful probe', () => { + jest.clearAllMocks(); + service.recordSuccess(); + expect(mockEmitter.emit).toHaveBeenCalledWith( + 'circuit-breaker.closed', + expect.objectContaining({ serviceName: 'default' }), + ); + }); + + it('transitions back to OPEN after a failed probe', () => { + service.recordFailure(); + expect(service.getStatus().state).toBe('OPEN'); + }); + + it('emits circuit-breaker.opened after failed probe', () => { + jest.clearAllMocks(); + service.recordFailure(); + expect(mockEmitter.emit).toHaveBeenCalledWith( + 'circuit-breaker.opened', + expect.objectContaining({ serviceName: 'default' }), + ); + }); + }); + + describe('reset', () => { + it('resets counters and returns to CLOSED', () => { + for (let i = 0; i < 5; i++) service.recordFailure(); + service.reset(); + const status = service.getStatus(); + expect(status.state).toBe('CLOSED'); + expect(status.failureCount).toBe(0); + expect(status.totalCallCount).toBe(0); + }); + + it('emits circuit-breaker.closed when resetting from OPEN', () => { + for (let i = 0; i < 5; i++) service.recordFailure(); + jest.clearAllMocks(); + service.reset(); + expect(mockEmitter.emit).toHaveBeenCalledWith( + 'circuit-breaker.closed', + expect.objectContaining({ serviceName: 'default' }), + ); + }); + + it('does not emit circuit-breaker.closed when already CLOSED', () => { + service.reset(); + expect(mockEmitter.emit).not.toHaveBeenCalledWith('circuit-breaker.closed', expect.anything()); + }); + }); + + describe('per-service configuration', () => { + it('uses custom failureThreshold from configure()', () => { + service.configure('svc-a', { failureThreshold: 2 }); + service.recordFailure('svc-a'); + expect(service.getStatus('svc-a').state).toBe('CLOSED'); + service.recordFailure('svc-a'); + expect(service.getStatus('svc-a').state).toBe('OPEN'); + }); + + it('isolates state between named services', () => { + for (let i = 0; i < 5; i++) service.recordFailure('svc-x'); + expect(service.getStatus('svc-x').state).toBe('OPEN'); + expect(service.getStatus('svc-y').state).toBe('CLOSED'); + }); + }); +}); diff --git a/src/investment/risk-management/circuit-breaker.service.ts b/src/investment/risk-management/circuit-breaker.service.ts index 098f6b6..eb2e22d 100644 --- a/src/investment/risk-management/circuit-breaker.service.ts +++ b/src/investment/risk-management/circuit-breaker.service.ts @@ -1,50 +1,190 @@ import { Injectable, Logger } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; -type CircuitState = 'closed' | 'open' | 'half-open'; +export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN'; + +export interface CircuitBreakerConfig { + failureThreshold?: number; + slowCallDurationMs?: number; + slowCallRateThreshold?: number; + recoveryTimeMs?: number; +} + +interface ServiceCircuitState { + state: CircuitState; + failureCount: number; + successCount: number; + slowCallCount: number; + totalCallCount: number; + lastFailureTime?: Date; + lastTransitionTime: Date; + config: Required; +} + +const DEFAULT_CONFIG: Required = { + failureThreshold: 5, + slowCallDurationMs: 5000, + slowCallRateThreshold: 50, + recoveryTimeMs: 60000, +}; + +const MIN_CALLS_FOR_SLOW_RATE = 10; @Injectable() export class CircuitBreakerService { private readonly logger = new Logger(CircuitBreakerService.name); - private state: CircuitState = 'closed'; - private failureCount = 0; - private lastFailureTime?: Date; - private readonly failureThreshold = 5; - private readonly recoveryTimeMs = 60000; // 1 minute + private readonly services = new Map(); + + constructor(private readonly eventEmitter: EventEmitter2) {} - isOpen(): boolean { - if (this.state === 'open') { - // Auto-transition to half-open after recovery time - if (this.lastFailureTime && Date.now() - this.lastFailureTime.getTime() > this.recoveryTimeMs) { - this.state = 'half-open'; - this.logger.log('Circuit breaker transitioned to half-open'); - } + configure(serviceName: string, config: CircuitBreakerConfig): void { + const merged = { ...DEFAULT_CONFIG, ...config }; + const existing = this.services.get(serviceName); + if (existing) { + existing.config = merged; + } else { + this.services.set(serviceName, this.createInitialState(merged)); } - return this.state === 'open'; } - recordSuccess(): void { - if (this.state === 'half-open') { - this.reset(); + isOpen(serviceName = 'default'): boolean { + const service = this.getOrCreate(serviceName); + if ( + service.state === 'OPEN' && + service.lastFailureTime && + Date.now() - service.lastFailureTime.getTime() > service.config.recoveryTimeMs + ) { + this.transitionTo(serviceName, service, 'HALF_OPEN'); } + return service.state === 'OPEN'; + } + + isHalfOpen(serviceName = 'default'): boolean { + return this.getOrCreate(serviceName).state === 'HALF_OPEN'; } - recordFailure(): void { - this.failureCount++; - this.lastFailureTime = new Date(); - if (this.failureCount >= this.failureThreshold) { - this.state = 'open'; - this.logger.warn(`Circuit breaker OPENED after ${this.failureCount} failures`); + recordSuccess(serviceName = 'default', durationMs?: number): void { + const service = this.getOrCreate(serviceName); + service.totalCallCount++; + service.successCount++; + + if (durationMs !== undefined && durationMs > service.config.slowCallDurationMs) { + service.slowCallCount++; + this.checkSlowCallRate(serviceName, service); + } + + if (service.state === 'HALF_OPEN') { + this.reset(serviceName); } } - reset(): void { - this.state = 'closed'; - this.failureCount = 0; - this.lastFailureTime = undefined; - this.logger.log('Circuit breaker reset to closed'); + recordFailure(serviceName = 'default'): void { + const service = this.getOrCreate(serviceName); + service.failureCount++; + service.totalCallCount++; + service.lastFailureTime = new Date(); + + if (service.state === 'HALF_OPEN') { + this.transitionTo(serviceName, service, 'OPEN'); + } else if (service.state === 'CLOSED' && service.failureCount >= service.config.failureThreshold) { + this.transitionTo(serviceName, service, 'OPEN'); + } } - getStatus(): { state: CircuitState; failureCount: number; lastFailureTime?: Date } { - return { state: this.state, failureCount: this.failureCount, lastFailureTime: this.lastFailureTime }; + reset(serviceName = 'default'): void { + const service = this.getOrCreate(serviceName); + const wasOpen = service.state !== 'CLOSED'; + service.state = 'CLOSED'; + service.failureCount = 0; + service.successCount = 0; + service.slowCallCount = 0; + service.totalCallCount = 0; + service.lastFailureTime = undefined; + service.lastTransitionTime = new Date(); + + if (wasOpen) { + this.eventEmitter.emit('circuit-breaker.closed', { + serviceName, + timestamp: service.lastTransitionTime, + }); + } + this.logger.log(`Circuit breaker [${serviceName}] reset to CLOSED`); + } + + getStatus(serviceName = 'default'): { + state: CircuitState; + failureCount: number; + successCount: number; + slowCallCount: number; + totalCallCount: number; + lastFailureTime?: Date; + lastTransitionTime: Date; + } { + const s = this.getOrCreate(serviceName); + return { + state: s.state, + failureCount: s.failureCount, + successCount: s.successCount, + slowCallCount: s.slowCallCount, + totalCallCount: s.totalCallCount, + lastFailureTime: s.lastFailureTime, + lastTransitionTime: s.lastTransitionTime, + }; + } + + getAllStatuses(): Record> { + const result: Record> = {}; + for (const [name] of this.services) { + result[name] = this.getStatus(name); + } + return result; + } + + private getOrCreate(serviceName: string): ServiceCircuitState { + if (!this.services.has(serviceName)) { + this.services.set(serviceName, this.createInitialState(DEFAULT_CONFIG)); + } + return this.services.get(serviceName)!; + } + + private createInitialState(config: Required): ServiceCircuitState { + return { + state: 'CLOSED', + failureCount: 0, + successCount: 0, + slowCallCount: 0, + totalCallCount: 0, + lastTransitionTime: new Date(), + config, + }; + } + + private transitionTo(serviceName: string, service: ServiceCircuitState, newState: CircuitState): void { + const prev = service.state; + service.state = newState; + service.lastTransitionTime = new Date(); + this.logger.log(`Circuit breaker [${serviceName}]: ${prev} → ${newState}`); + + if (newState === 'OPEN') { + this.logger.warn(`Circuit breaker [${serviceName}] OPENED after ${service.failureCount} failures`); + this.eventEmitter.emit('circuit-breaker.opened', { + serviceName, + failureCount: service.failureCount, + timestamp: service.lastTransitionTime, + }); + } else if (newState === 'CLOSED') { + this.eventEmitter.emit('circuit-breaker.closed', { + serviceName, + timestamp: service.lastTransitionTime, + }); + } + } + + private checkSlowCallRate(serviceName: string, service: ServiceCircuitState): void { + if (service.totalCallCount < MIN_CALLS_FOR_SLOW_RATE) return; + const slowRate = (service.slowCallCount / service.totalCallCount) * 100; + if (slowRate >= service.config.slowCallRateThreshold && service.state === 'CLOSED') { + this.transitionTo(serviceName, service, 'OPEN'); + } } } diff --git a/src/investment/risk-management/risk-management.health.ts b/src/investment/risk-management/risk-management.health.ts new file mode 100644 index 0000000..c8159d9 --- /dev/null +++ b/src/investment/risk-management/risk-management.health.ts @@ -0,0 +1,24 @@ +import { Injectable } from '@nestjs/common'; +import { HealthCheckError, HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus'; +import { CircuitBreakerService } from './circuit-breaker.service'; + +@Injectable() +export class RiskManagementHealthIndicator extends HealthIndicator { + constructor(private readonly circuitBreaker: CircuitBreakerService) { + super(); + } + + async isHealthy(key: string): Promise { + const status = this.circuitBreaker.getStatus('default'); + const isHealthy = status.state !== 'OPEN'; + + const result = this.getStatus(key, isHealthy, { + circuitBreakerState: status.state, + failureCount: status.failureCount, + lastFailureTime: status.lastFailureTime, + }); + + if (isHealthy) return result; + throw new HealthCheckError('Risk management circuit breaker is open', result); + } +} diff --git a/src/investment/risk-management/risk-management.module.ts b/src/investment/risk-management/risk-management.module.ts index 06df1f3..a816fc2 100644 --- a/src/investment/risk-management/risk-management.module.ts +++ b/src/investment/risk-management/risk-management.module.ts @@ -1,11 +1,14 @@ -import { Module } from "@nestjs/common"; -import { RiskManagementService } from "./risk-management.service"; -import { RiskManagementController } from "./risk-management.controller"; -import { CircuitBreakerService } from "./circuit-breaker.service"; +import { Module } from '@nestjs/common'; +import { EventEmitterModule } from '@nestjs/event-emitter'; +import { RiskManagementService } from './risk-management.service'; +import { RiskManagementController } from './risk-management.controller'; +import { CircuitBreakerService } from './circuit-breaker.service'; +import { RiskManagementHealthIndicator } from './risk-management.health'; @Module({ + imports: [EventEmitterModule.forRoot()], controllers: [RiskManagementController], - providers: [RiskManagementService, CircuitBreakerService], - exports: [RiskManagementService, CircuitBreakerService], + providers: [RiskManagementService, CircuitBreakerService, RiskManagementHealthIndicator], + exports: [RiskManagementService, CircuitBreakerService, RiskManagementHealthIndicator], }) export class RiskManagementModule {} diff --git a/src/investment/risk-management/risk-management.service.ts b/src/investment/risk-management/risk-management.service.ts index 4cb5a56..5d3318e 100644 --- a/src/investment/risk-management/risk-management.service.ts +++ b/src/investment/risk-management/risk-management.service.ts @@ -1,4 +1,5 @@ import { Injectable, Logger } from "@nestjs/common"; +import { EventEmitter2 } from "@nestjs/event-emitter"; import { RiskConfigDto, PortfolioRiskDto, @@ -18,9 +19,10 @@ interface Position { @Injectable() export class RiskManagementService { private readonly logger = new Logger(RiskManagementService.name); - private readonly riskConfigs = new Map(); + constructor(private readonly eventEmitter: EventEmitter2) {} + setRiskConfig(dto: RiskConfigDto): void { this.riskConfigs.set(dto.userId, dto); this.logger.log(`Risk config updated for user ${dto.userId}`); @@ -273,6 +275,10 @@ export class RiskManagementService { } } + for (const alert of alerts) { + this.eventEmitter.emit('risk.threshold.breached', { userId, alert }); + } + return alerts; } }