From cf7b45ea9090c6b6c6289e675f19c3406d0883fe Mon Sep 17 00:00:00 2001 From: cybermaxi7 Date: Thu, 18 Jun 2026 12:10:31 +0100 Subject: [PATCH] feat: add siem integration framework and multi-chain monitoring abstraction closes #132: siem integration - ISiemProvider interface, SiemEvent type, config DTOs - SplunkSiemProvider (HEC), ElasticSiemProvider (bulk api / ECS) - SiemService fans out to all providers and isolates failures - SiemModule registers providers conditionally from env vars - 6 unit tests: forwarding, failure isolation, health checks closes #135: multi-chain monitoring - IChainMonitor interface with chainId, normalizeEvent, subscribe, isHealthy - NormalizedChainEvent shared event shape for downstream consumers - StellarChainMonitor reference implementation via horizon polling - ChainRegistryService routes events by chainId, unified subscribeAll - ChainsModule with documented extension points for new chains - 8 unit tests: registry, routing, subscription, health - register SiemModule and ChainsModule in AppModule --- apps/backend/src/app.module.ts | 4 + .../integrations/siem/dto/siem-config.dto.ts | 23 +++++ .../siem/interfaces/siem-event.interface.ts | 19 ++++ .../interfaces/siem-provider.interface.ts | 17 ++++ .../siem/providers/elastic.siem-provider.ts | 84 ++++++++++++++++ .../siem/providers/splunk.siem-provider.ts | 68 +++++++++++++ .../src/integrations/siem/siem.module.ts | 52 ++++++++++ .../integrations/siem/siem.service.spec.ts | 76 +++++++++++++++ .../src/integrations/siem/siem.service.ts | 56 +++++++++++ .../chains/chain-registry.service.spec.ts | 88 +++++++++++++++++ .../modules/chains/chain-registry.service.ts | 68 +++++++++++++ .../src/modules/chains/chains.module.ts | 34 +++++++ .../interfaces/chain-monitor.interface.ts | 36 +++++++ .../normalized-chain-event.interface.ts | 25 +++++ .../chains/monitors/stellar.chain-monitor.ts | 96 +++++++++++++++++++ 15 files changed, 746 insertions(+) create mode 100644 apps/backend/src/integrations/siem/dto/siem-config.dto.ts create mode 100644 apps/backend/src/integrations/siem/interfaces/siem-event.interface.ts create mode 100644 apps/backend/src/integrations/siem/interfaces/siem-provider.interface.ts create mode 100644 apps/backend/src/integrations/siem/providers/elastic.siem-provider.ts create mode 100644 apps/backend/src/integrations/siem/providers/splunk.siem-provider.ts create mode 100644 apps/backend/src/integrations/siem/siem.module.ts create mode 100644 apps/backend/src/integrations/siem/siem.service.spec.ts create mode 100644 apps/backend/src/integrations/siem/siem.service.ts create mode 100644 apps/backend/src/modules/chains/chain-registry.service.spec.ts create mode 100644 apps/backend/src/modules/chains/chain-registry.service.ts create mode 100644 apps/backend/src/modules/chains/chains.module.ts create mode 100644 apps/backend/src/modules/chains/interfaces/chain-monitor.interface.ts create mode 100644 apps/backend/src/modules/chains/interfaces/normalized-chain-event.interface.ts create mode 100644 apps/backend/src/modules/chains/monitors/stellar.chain-monitor.ts diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 88bae4d..df9fb80 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -5,6 +5,8 @@ import { HealthModule } from './modules/health/health.module'; import { NotificationsModule } from './modules/notifications/notifications.module'; import { ReportingModule } from './modules/reporting/reporting.module'; import { DependencyTrackerModule } from './modules/contracts/dependencies/dependency-tracker.module'; +import { SiemModule } from './integrations/siem/siem.module'; +import { ChainsModule } from './modules/chains/chains.module'; @Module({ imports: [ @@ -13,6 +15,8 @@ import { DependencyTrackerModule } from './modules/contracts/dependencies/depend NotificationsModule, ReportingModule, DependencyTrackerModule, + SiemModule, + ChainsModule, ], controllers: [AppController], }) diff --git a/apps/backend/src/integrations/siem/dto/siem-config.dto.ts b/apps/backend/src/integrations/siem/dto/siem-config.dto.ts new file mode 100644 index 0000000..3ee398c --- /dev/null +++ b/apps/backend/src/integrations/siem/dto/siem-config.dto.ts @@ -0,0 +1,23 @@ +/** + * Configuration for the Splunk HTTP Event Collector (HEC) provider. + */ +export interface SplunkSiemConfig { + /** Full URL to the Splunk HEC endpoint (e.g. https://splunk.corp:8088/services/collector). */ + hecUrl: string; + /** HEC token used for authentication. */ + hecToken: string; + /** Splunk source type tag applied to every event (default: "sentinel:security"). */ + sourceType?: string; +} + +/** + * Configuration for the Elastic SIEM (ECS) provider. + */ +export interface ElasticSiemConfig { + /** Full URL to the Elasticsearch cluster (e.g. https://elastic.corp:9200). */ + elasticUrl: string; + /** Elasticsearch API key for authentication. */ + apiKey: string; + /** Target index for Sentinel events (default: "sentinel-events"). */ + index?: string; +} diff --git a/apps/backend/src/integrations/siem/interfaces/siem-event.interface.ts b/apps/backend/src/integrations/siem/interfaces/siem-event.interface.ts new file mode 100644 index 0000000..6fd47fd --- /dev/null +++ b/apps/backend/src/integrations/siem/interfaces/siem-event.interface.ts @@ -0,0 +1,19 @@ +/** + * Normalized security event forwarded to SIEM platforms. + * All provider adapters receive this common shape. + */ +export interface SiemEvent { + /** ISO-8601 timestamp of when the event occurred. */ + timestamp: string; + /** Short machine-readable event type (e.g. "suspicious_transaction"). */ + eventType: string; + /** Human-readable summary. */ + title: string; + /** Full description with contextual detail. */ + message: string; + severity: 'low' | 'medium' | 'high' | 'critical'; + /** Source chain or system (e.g. "stellar", "ethereum", "internal"). */ + source: string; + /** Arbitrary key-value pairs for provider-specific enrichment. */ + metadata?: Record; +} diff --git a/apps/backend/src/integrations/siem/interfaces/siem-provider.interface.ts b/apps/backend/src/integrations/siem/interfaces/siem-provider.interface.ts new file mode 100644 index 0000000..8f16c45 --- /dev/null +++ b/apps/backend/src/integrations/siem/interfaces/siem-provider.interface.ts @@ -0,0 +1,17 @@ +import { SiemEvent } from './siem-event.interface'; + +/** + * Contract every SIEM provider adapter must implement. + * Add new platforms (QRadar, Sentinel, etc.) by implementing this interface + * and registering the adapter in SiemModule — no changes to SiemService needed. + */ +export interface ISiemProvider { + /** Unique identifier for this provider (e.g. "splunk", "elastic"). */ + readonly providerName: string; + + /** Forward a normalized security event to the SIEM platform. */ + forwardEvent(event: SiemEvent): Promise; + + /** Return true when the provider endpoint is reachable and configured. */ + isHealthy(): Promise; +} diff --git a/apps/backend/src/integrations/siem/providers/elastic.siem-provider.ts b/apps/backend/src/integrations/siem/providers/elastic.siem-provider.ts new file mode 100644 index 0000000..4aca774 --- /dev/null +++ b/apps/backend/src/integrations/siem/providers/elastic.siem-provider.ts @@ -0,0 +1,84 @@ +import { Injectable, Logger } from '@nestjs/common'; +import axios from 'axios'; +import { ISiemProvider } from '../interfaces/siem-provider.interface'; +import { SiemEvent } from '../interfaces/siem-event.interface'; +import { ElasticSiemConfig } from '../dto/siem-config.dto'; + +/** + * Forwards Sentinel security events to Elasticsearch using the ECS-aligned + * Bulk API. Events land in a configurable index for Elastic SIEM rules to consume. + * + * Environment variables: + * ELASTIC_URL — Elasticsearch cluster URL (e.g. https://elastic.corp:9200) + * ELASTIC_API_KEY — API key for authentication + * ELASTIC_INDEX — target index (default: "sentinel-events") + */ +@Injectable() +export class ElasticSiemProvider implements ISiemProvider { + readonly providerName = 'elastic'; + private readonly logger = new Logger(ElasticSiemProvider.name); + private readonly index: string; + + constructor(private readonly config: ElasticSiemConfig) { + this.index = config.index ?? 'sentinel-events'; + } + + async forwardEvent(event: SiemEvent): Promise { + const doc = { + '@timestamp': event.timestamp, + 'event.kind': 'alert', + 'event.category': 'intrusion_detection', + 'event.type': event.eventType, + 'event.severity': this.severityToNumeric(event.severity), + message: event.message, + labels: { + title: event.title, + source: event.source, + severity: event.severity, + }, + ...event.metadata, + }; + + const body = + [JSON.stringify({ index: { _index: this.index } }), JSON.stringify(doc)].join('\n') + '\n'; + + try { + await axios.post(`${this.config.elasticUrl}/_bulk`, body, { + headers: { + Authorization: `ApiKey ${this.config.apiKey}`, + 'Content-Type': 'application/x-ndjson', + }, + }); + this.logger.log(`Elastic: forwarded event "${event.eventType}"`); + } catch (error) { + const message = axios.isAxiosError(error) + ? (error.response?.data?.error?.reason ?? error.message) + : String(error); + this.logger.error(`Elastic: forwardEvent failed: ${message}`); + throw new Error(`ElasticSiemProvider.forwardEvent failed: ${message}`); + } + } + + async isHealthy(): Promise { + try { + const response = await axios.get(`${this.config.elasticUrl}/_cluster/health`, { + headers: { Authorization: `ApiKey ${this.config.apiKey}` }, + }); + const status: string = response.data?.status ?? 'red'; + return status !== 'red'; + } catch (error) { + this.logger.warn(`Elastic health check failed: ${String(error)}`); + return false; + } + } + + private severityToNumeric(severity: SiemEvent['severity']): number { + const map: Record = { + low: 25, + medium: 50, + high: 75, + critical: 100, + }; + return map[severity]; + } +} diff --git a/apps/backend/src/integrations/siem/providers/splunk.siem-provider.ts b/apps/backend/src/integrations/siem/providers/splunk.siem-provider.ts new file mode 100644 index 0000000..b2dd767 --- /dev/null +++ b/apps/backend/src/integrations/siem/providers/splunk.siem-provider.ts @@ -0,0 +1,68 @@ +import { Injectable, Logger } from '@nestjs/common'; +import axios from 'axios'; +import { ISiemProvider } from '../interfaces/siem-provider.interface'; +import { SiemEvent } from '../interfaces/siem-event.interface'; +import { SplunkSiemConfig } from '../dto/siem-config.dto'; + +/** + * Forwards Sentinel security events to Splunk via the HTTP Event Collector (HEC). + * + * Environment variables: + * SPLUNK_HEC_URL — HEC endpoint (e.g. https://splunk.corp:8088/services/collector) + * SPLUNK_HEC_TOKEN — HEC authentication token + * SPLUNK_SOURCE_TYPE — optional source type tag (default: "sentinel:security") + */ +@Injectable() +export class SplunkSiemProvider implements ISiemProvider { + readonly providerName = 'splunk'; + private readonly logger = new Logger(SplunkSiemProvider.name); + private readonly sourceType: string; + + constructor(private readonly config: SplunkSiemConfig) { + this.sourceType = config.sourceType ?? 'sentinel:security'; + } + + async forwardEvent(event: SiemEvent): Promise { + const body = { + time: Math.floor(new Date(event.timestamp).getTime() / 1000), + sourcetype: this.sourceType, + event: { + event_type: event.eventType, + title: event.title, + message: event.message, + severity: event.severity, + source: event.source, + ...event.metadata, + }, + }; + + try { + await axios.post(this.config.hecUrl, body, { + headers: { + Authorization: `Splunk ${this.config.hecToken}`, + 'Content-Type': 'application/json', + }, + }); + this.logger.log(`Splunk: forwarded event "${event.eventType}"`); + } catch (error) { + const message = axios.isAxiosError(error) + ? (error.response?.data?.text ?? error.message) + : String(error); + this.logger.error(`Splunk: forwardEvent failed: ${message}`); + throw new Error(`SplunkSiemProvider.forwardEvent failed: ${message}`); + } + } + + async isHealthy(): Promise { + try { + await axios.get(this.config.hecUrl, { + headers: { Authorization: `Splunk ${this.config.hecToken}` }, + validateStatus: status => status < 500, + }); + return true; + } catch (error) { + this.logger.warn(`Splunk health check failed: ${String(error)}`); + return false; + } + } +} diff --git a/apps/backend/src/integrations/siem/siem.module.ts b/apps/backend/src/integrations/siem/siem.module.ts new file mode 100644 index 0000000..56853af --- /dev/null +++ b/apps/backend/src/integrations/siem/siem.module.ts @@ -0,0 +1,52 @@ +import { Module } from '@nestjs/common'; +import { SiemService } from './siem.service'; +import { SplunkSiemProvider } from './providers/splunk.siem-provider'; +import { ElasticSiemProvider } from './providers/elastic.siem-provider'; +import { ISiemProvider } from './interfaces/siem-provider.interface'; + +/** + * SIEM Integration Module. + * + * Providers are registered conditionally based on environment variables so + * deployments without a SIEM platform incur no overhead. + * + * To add a new provider: + * 1. Implement ISiemProvider in providers/ + * 2. Add its config env vars to .env.example + * 3. Register it in the SIEM_PROVIDERS factory below + */ +@Module({ + providers: [ + SiemService, + { + provide: 'SIEM_PROVIDERS', + useFactory: (): ISiemProvider[] => { + const providers: ISiemProvider[] = []; + + if (process.env.SPLUNK_HEC_URL && process.env.SPLUNK_HEC_TOKEN) { + providers.push( + new SplunkSiemProvider({ + hecUrl: process.env.SPLUNK_HEC_URL, + hecToken: process.env.SPLUNK_HEC_TOKEN, + sourceType: process.env.SPLUNK_SOURCE_TYPE, + }), + ); + } + + if (process.env.ELASTIC_URL && process.env.ELASTIC_API_KEY) { + providers.push( + new ElasticSiemProvider({ + elasticUrl: process.env.ELASTIC_URL, + apiKey: process.env.ELASTIC_API_KEY, + index: process.env.ELASTIC_INDEX, + }), + ); + } + + return providers; + }, + }, + ], + exports: [SiemService], +}) +export class SiemModule {} diff --git a/apps/backend/src/integrations/siem/siem.service.spec.ts b/apps/backend/src/integrations/siem/siem.service.spec.ts new file mode 100644 index 0000000..8d10d18 --- /dev/null +++ b/apps/backend/src/integrations/siem/siem.service.spec.ts @@ -0,0 +1,76 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { SiemService } from './siem.service'; +import { ISiemProvider } from './interfaces/siem-provider.interface'; +import { SiemEvent } from './interfaces/siem-event.interface'; + +const makeEvent = (overrides: Partial = {}): SiemEvent => ({ + timestamp: new Date().toISOString(), + eventType: 'test_event', + title: 'Test Event', + message: 'A test security event', + severity: 'low', + source: 'stellar', + ...overrides, +}); + +const makeProvider = (name: string, healthy = true): jest.Mocked => ({ + providerName: name, + forwardEvent: jest.fn().mockResolvedValue(undefined), + isHealthy: jest.fn().mockResolvedValue(healthy), +}); + +describe('SiemService', () => { + let service: SiemService; + let providerA: jest.Mocked; + let providerB: jest.Mocked; + + beforeEach(async () => { + providerA = makeProvider('splunk'); + providerB = makeProvider('elastic'); + + const module: TestingModule = await Test.createTestingModule({ + providers: [SiemService, { provide: 'SIEM_PROVIDERS', useValue: [providerA, providerB] }], + }).compile(); + + service = module.get(SiemService); + }); + + it('forwards events to all providers', async () => { + const event = makeEvent(); + await service.forwardEvent(event); + expect(providerA.forwardEvent).toHaveBeenCalledWith(event); + expect(providerB.forwardEvent).toHaveBeenCalledWith(event); + }); + + it('continues delivery when one provider fails', async () => { + providerA.forwardEvent.mockRejectedValue(new Error('network error')); + const event = makeEvent({ severity: 'critical' }); + await expect(service.forwardEvent(event)).resolves.not.toThrow(); + expect(providerB.forwardEvent).toHaveBeenCalledWith(event); + }); + + it('logs a warning and skips when no providers are configured', async () => { + const emptyModule = await Test.createTestingModule({ + providers: [SiemService, { provide: 'SIEM_PROVIDERS', useValue: [] }], + }).compile(); + + const emptyService = emptyModule.get(SiemService); + await expect(emptyService.forwardEvent(makeEvent())).resolves.not.toThrow(); + }); + + it('returns health status for all providers', async () => { + providerB.isHealthy.mockResolvedValue(false); + const health = await service.getProvidersHealth(); + expect(health).toEqual({ splunk: true, elastic: false }); + }); + + it('returns false for a provider whose health check throws', async () => { + providerA.isHealthy.mockRejectedValue(new Error('timeout')); + const health = await service.getProvidersHealth(); + expect(health.splunk).toBe(false); + }); + + it('returns all provider names', () => { + expect(service.getProviderNames()).toEqual(['splunk', 'elastic']); + }); +}); diff --git a/apps/backend/src/integrations/siem/siem.service.ts b/apps/backend/src/integrations/siem/siem.service.ts new file mode 100644 index 0000000..8204566 --- /dev/null +++ b/apps/backend/src/integrations/siem/siem.service.ts @@ -0,0 +1,56 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ISiemProvider } from './interfaces/siem-provider.interface'; +import { SiemEvent } from './interfaces/siem-event.interface'; + +/** + * Orchestrates security event forwarding across all registered SIEM providers. + * Callers never reference a concrete provider — swap or add adapters in + * SiemModule without touching this service. + */ +@Injectable() +export class SiemService { + private readonly logger = new Logger(SiemService.name); + + constructor( + @Inject('SIEM_PROVIDERS') + private readonly providers: ISiemProvider[], + ) {} + + /** + * Forward a security event to every registered SIEM provider. + * Individual failures are logged but do not abort delivery to remaining providers. + */ + async forwardEvent(event: SiemEvent): Promise { + if (this.providers.length === 0) { + this.logger.warn('No SIEM providers configured — event not forwarded'); + return; + } + + const results = await Promise.allSettled( + this.providers.map(provider => provider.forwardEvent(event)), + ); + + results.forEach((result, index) => { + const name = this.providers[index].providerName; + if (result.status === 'rejected') { + this.logger.error(`SIEM provider "${name}" failed: ${String(result.reason)}`); + } + }); + } + + /** Returns the health status of every registered SIEM provider. */ + async getProvidersHealth(): Promise> { + const entries = await Promise.all( + this.providers.map(async provider => [ + provider.providerName, + await provider.isHealthy().catch(() => false), + ]), + ); + return Object.fromEntries(entries); + } + + /** Returns the names of all registered providers. */ + getProviderNames(): string[] { + return this.providers.map(p => p.providerName); + } +} diff --git a/apps/backend/src/modules/chains/chain-registry.service.spec.ts b/apps/backend/src/modules/chains/chain-registry.service.spec.ts new file mode 100644 index 0000000..0e805f1 --- /dev/null +++ b/apps/backend/src/modules/chains/chain-registry.service.spec.ts @@ -0,0 +1,88 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ChainRegistryService } from './chain-registry.service'; +import { IChainMonitor } from './interfaces/chain-monitor.interface'; +import { NormalizedChainEvent } from './interfaces/normalized-chain-event.interface'; + +const makeEvent = (chainId: string): NormalizedChainEvent => ({ + timestamp: new Date().toISOString(), + chainId, + eventType: 'payment', + txHash: 'abc123', + from: 'GABC', + raw: {}, +}); + +const makeMonitor = (chainId: string, healthy = true): jest.Mocked => ({ + chainId, + normalizeEvent: jest.fn(_raw => makeEvent(chainId)), + subscribe: jest.fn().mockResolvedValue(undefined), + isHealthy: jest.fn().mockResolvedValue(healthy), +}); + +describe('ChainRegistryService', () => { + let service: ChainRegistryService; + let stellar: jest.Mocked; + let ethereum: jest.Mocked; + + beforeEach(async () => { + stellar = makeMonitor('stellar'); + ethereum = makeMonitor('ethereum'); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + ChainRegistryService, + { provide: 'CHAIN_MONITORS', useValue: [stellar, ethereum] }, + ], + }).compile(); + + service = module.get(ChainRegistryService); + }); + + it('registers all monitors and returns their chain IDs', () => { + expect(service.getChainIds()).toEqual(expect.arrayContaining(['stellar', 'ethereum'])); + }); + + it('returns the correct monitor for a given chainId', () => { + expect(service.getMonitor('stellar')).toBe(stellar); + expect(service.getMonitor('ethereum')).toBe(ethereum); + }); + + it('returns undefined for an unknown chainId', () => { + expect(service.getMonitor('polygon')).toBeUndefined(); + }); + + it('calls subscribe on all monitors when subscribeAll is invoked', async () => { + const callback = jest.fn(); + await service.subscribeAll(callback); + expect(stellar.subscribe).toHaveBeenCalledWith(callback); + expect(ethereum.subscribe).toHaveBeenCalledWith(callback); + }); + + it('continues subscribeAll when one monitor throws', async () => { + stellar.subscribe.mockRejectedValue(new Error('connection refused')); + const callback = jest.fn(); + await expect(service.subscribeAll(callback)).resolves.not.toThrow(); + expect(ethereum.subscribe).toHaveBeenCalled(); + }); + + it('returns health status for all monitors', async () => { + ethereum.isHealthy.mockResolvedValue(false); + const health = await service.getHealth(); + expect(health).toEqual({ stellar: true, ethereum: false }); + }); + + it('returns false for a monitor whose health check throws', async () => { + stellar.isHealthy.mockRejectedValue(new Error('timeout')); + const health = await service.getHealth(); + expect(health.stellar).toBe(false); + }); + + it('warns and resolves when no monitors are registered', async () => { + const emptyModule = await Test.createTestingModule({ + providers: [ChainRegistryService, { provide: 'CHAIN_MONITORS', useValue: [] }], + }).compile(); + + const emptyService = emptyModule.get(ChainRegistryService); + await expect(emptyService.subscribeAll(jest.fn())).resolves.not.toThrow(); + }); +}); diff --git a/apps/backend/src/modules/chains/chain-registry.service.ts b/apps/backend/src/modules/chains/chain-registry.service.ts new file mode 100644 index 0000000..ca7a0e2 --- /dev/null +++ b/apps/backend/src/modules/chains/chain-registry.service.ts @@ -0,0 +1,68 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { IChainMonitor } from './interfaces/chain-monitor.interface'; +import { NormalizedChainEvent } from './interfaces/normalized-chain-event.interface'; + +/** + * Central registry for all chain monitors. + * + * Routes events to the correct monitor by chainId and provides a unified + * subscription point so consumers receive normalized events from every chain + * through a single callback. + */ +@Injectable() +export class ChainRegistryService { + private readonly logger = new Logger(ChainRegistryService.name); + private readonly registry = new Map(); + + constructor( + @Inject('CHAIN_MONITORS') + monitors: IChainMonitor[], + ) { + for (const monitor of monitors) { + this.registry.set(monitor.chainId, monitor); + this.logger.log(`Registered chain monitor: ${monitor.chainId}`); + } + } + + /** Return the monitor registered for the given chainId, or undefined. */ + getMonitor(chainId: string): IChainMonitor | undefined { + return this.registry.get(chainId); + } + + /** Return all registered chain IDs. */ + getChainIds(): string[] { + return Array.from(this.registry.keys()); + } + + /** + * Subscribe to normalized events from every registered chain. + * The same callback receives events from all chains; use event.chainId to filter. + */ + async subscribeAll(onEvent: (event: NormalizedChainEvent) => void): Promise { + if (this.registry.size === 0) { + this.logger.warn('ChainRegistry: no monitors registered'); + return; + } + + await Promise.all( + Array.from(this.registry.values()).map(monitor => + monitor.subscribe(onEvent).catch((err: unknown) => { + this.logger.error( + `ChainRegistry: monitor "${monitor.chainId}" subscribe error: ${String(err)}`, + ); + }), + ), + ); + } + + /** Returns a health map for all registered chain monitors. */ + async getHealth(): Promise> { + const entries = await Promise.all( + Array.from(this.registry.entries()).map(async ([chainId, monitor]) => [ + chainId, + await monitor.isHealthy().catch(() => false), + ]), + ); + return Object.fromEntries(entries); + } +} diff --git a/apps/backend/src/modules/chains/chains.module.ts b/apps/backend/src/modules/chains/chains.module.ts new file mode 100644 index 0000000..4547ee6 --- /dev/null +++ b/apps/backend/src/modules/chains/chains.module.ts @@ -0,0 +1,34 @@ +import { Module } from '@nestjs/common'; +import { ChainRegistryService } from './chain-registry.service'; +import { StellarChainMonitor } from './monitors/stellar.chain-monitor'; +import { IChainMonitor } from './interfaces/chain-monitor.interface'; + +/** + * Multi-Chain Monitoring Module. + * + * Extension points are documented in IChainMonitor. + * To add a new chain: + * 1. Implement IChainMonitor in monitors/ + * 2. Instantiate and add it to the CHAIN_MONITORS array below + * 3. ChainRegistryService picks it up automatically + */ +@Module({ + providers: [ + ChainRegistryService, + { + provide: 'CHAIN_MONITORS', + useFactory: (): IChainMonitor[] => { + const monitors: IChainMonitor[] = [new StellarChainMonitor()]; + + // Future chains — uncomment to activate: + // if (process.env.ETHEREUM_RPC_URL) { + // monitors.push(new EthereumChainMonitor(process.env.ETHEREUM_RPC_URL)); + // } + + return monitors; + }, + }, + ], + exports: [ChainRegistryService], +}) +export class ChainsModule {} diff --git a/apps/backend/src/modules/chains/interfaces/chain-monitor.interface.ts b/apps/backend/src/modules/chains/interfaces/chain-monitor.interface.ts new file mode 100644 index 0000000..7b91c5e --- /dev/null +++ b/apps/backend/src/modules/chains/interfaces/chain-monitor.interface.ts @@ -0,0 +1,36 @@ +import { NormalizedChainEvent } from './normalized-chain-event.interface'; + +/** + * Contract every chain monitor must implement. + * + * Extension points: + * - `chainId` — uniquely identifies the chain in the registry + * - `normalizeEvent` — translates a raw chain payload to NormalizedChainEvent + * - `subscribe` — starts emitting events via the provided callback + * - `isHealthy` — verifies RPC/node connectivity + * + * To add a new chain: + * 1. Implement this interface in monitors/ + * 2. Register the monitor in ChainsModule + * 3. ChainRegistry picks it up automatically — no other changes needed + */ +export interface IChainMonitor { + /** Unique chain identifier (e.g. "stellar", "ethereum", "polygon"). */ + readonly chainId: string; + + /** + * Translate a raw chain-specific event payload into the shared + * NormalizedChainEvent shape. + */ + normalizeEvent(rawEvent: Record): NormalizedChainEvent; + + /** + * Begin delivering normalized events to the provided callback. + * Implementations should handle reconnection internally. + * @param onEvent - Called for each incoming event. + */ + subscribe(onEvent: (event: NormalizedChainEvent) => void): Promise; + + /** Return true when the underlying RPC / node connection is healthy. */ + isHealthy(): Promise; +} diff --git a/apps/backend/src/modules/chains/interfaces/normalized-chain-event.interface.ts b/apps/backend/src/modules/chains/interfaces/normalized-chain-event.interface.ts new file mode 100644 index 0000000..00c8123 --- /dev/null +++ b/apps/backend/src/modules/chains/interfaces/normalized-chain-event.interface.ts @@ -0,0 +1,25 @@ +/** + * Chain-agnostic representation of a security-relevant blockchain event. + * Every IChainMonitor normalizes its raw events to this shape so downstream + * consumers (alerting, SIEM forwarding) work the same regardless of chain. + */ +export interface NormalizedChainEvent { + /** ISO-8601 timestamp from the chain (block time or ledger close time). */ + timestamp: string; + /** Identifier of the originating chain (e.g. "stellar", "ethereum"). */ + chainId: string; + /** Human-readable event classification (e.g. "large_transfer", "contract_call"). */ + eventType: string; + /** Transaction or operation hash. */ + txHash: string; + /** Account or address that initiated the event. */ + from: string; + /** Destination account or contract address, if applicable. */ + to?: string; + /** Asset amount in the chain's base unit (string to preserve precision). */ + amount?: string; + /** Asset code or symbol (e.g. "XLM", "ETH", "USDC"). */ + asset?: string; + /** Raw chain-specific payload for provider-level enrichment. */ + raw: Record; +} diff --git a/apps/backend/src/modules/chains/monitors/stellar.chain-monitor.ts b/apps/backend/src/modules/chains/monitors/stellar.chain-monitor.ts new file mode 100644 index 0000000..09b78ae --- /dev/null +++ b/apps/backend/src/modules/chains/monitors/stellar.chain-monitor.ts @@ -0,0 +1,96 @@ +import { Injectable, Logger } from '@nestjs/common'; +import axios from 'axios'; +import { IChainMonitor } from '../interfaces/chain-monitor.interface'; +import { NormalizedChainEvent } from '../interfaces/normalized-chain-event.interface'; + +/** + * Stellar chain monitor — the reference IChainMonitor implementation. + * + * Polls the Horizon /payments endpoint via SSE-style cursor paging and + * normalizes each Stellar payment operation to NormalizedChainEvent. + * + * Environment variables: + * STELLAR_HORIZON_URL — Horizon base URL (default: https://horizon-testnet.stellar.org) + */ +@Injectable() +export class StellarChainMonitor implements IChainMonitor { + readonly chainId = 'stellar'; + private readonly logger = new Logger(StellarChainMonitor.name); + private readonly horizonUrl: string; + + constructor(horizonUrl?: string) { + this.horizonUrl = + horizonUrl ?? process.env.STELLAR_HORIZON_URL ?? 'https://horizon-testnet.stellar.org'; + } + + normalizeEvent(rawEvent: Record): NormalizedChainEvent { + const createdAt = + typeof rawEvent.created_at === 'string' ? rawEvent.created_at : new Date().toISOString(); + const txHash = typeof rawEvent.transaction_hash === 'string' ? rawEvent.transaction_hash : ''; + const from = typeof rawEvent.from === 'string' ? rawEvent.from : ''; + const to = typeof rawEvent.to === 'string' ? rawEvent.to : undefined; + const amount = typeof rawEvent.amount === 'string' ? rawEvent.amount : undefined; + + let asset: string | undefined; + if (rawEvent.asset_type === 'native') { + asset = 'XLM'; + } else if (typeof rawEvent.asset_code === 'string') { + asset = rawEvent.asset_code; + } + + return { + timestamp: createdAt, + chainId: this.chainId, + eventType: typeof rawEvent.type === 'string' ? rawEvent.type : 'payment', + txHash, + from, + to, + amount, + asset, + raw: rawEvent, + }; + } + + async subscribe(onEvent: (event: NormalizedChainEvent) => void): Promise { + this.logger.log('StellarChainMonitor: subscribing to Horizon payment stream'); + + let cursor = 'now'; + + const poll = async () => { + try { + const response = await axios.get(`${this.horizonUrl}/payments`, { + params: { cursor, order: 'asc', limit: 50 }, + }); + + const records: Record[] = Array.isArray(response.data?._embedded?.records) + ? (response.data._embedded.records as Record[]) + : []; + + for (const record of records) { + const normalized = this.normalizeEvent(record); + onEvent(normalized); + if (typeof record.paging_token === 'string') { + cursor = record.paging_token; + } + } + } catch (error) { + this.logger.warn(`StellarChainMonitor: poll error: ${String(error)}`); + } + + // Poll every 5 seconds — replace with SSE streaming for production use. + setTimeout(() => void poll(), 5000); + }; + + await poll(); + } + + async isHealthy(): Promise { + try { + await axios.get(`${this.horizonUrl}`, { timeout: 5000 }); + return true; + } catch (error) { + this.logger.warn(`StellarChainMonitor: health check failed: ${String(error)}`); + return false; + } + } +}