From d976ac48aeddcc7bf053372dc4d84bb14321e546 Mon Sep 17 00:00:00 2001 From: Emmzyemms Date: Fri, 19 Jun 2026 14:02:06 +0100 Subject: [PATCH 1/2] Feat: Add backend observability for contract event schema drifts --- .../ingestion/contract-event-drift.service.ts | 373 ++++++++++++++++++ app/backend/src/ingestion/ingestion.module.ts | 24 +- .../src/ingestion/parser-health.dto.ts | 131 ++++++ .../soroban-event-indexer.service.ts | 31 +- .../src/ingestion/soroban-event.parser.ts | 155 +++++++- .../ingestion/soroban-indexer.controller.ts | 40 +- .../ingestion/stellar-ingestion.service.ts | 5 +- app/backend/src/metrics/metrics.service.ts | 109 +++++ 8 files changed, 843 insertions(+), 25 deletions(-) create mode 100644 app/backend/src/ingestion/contract-event-drift.service.ts create mode 100644 app/backend/src/ingestion/parser-health.dto.ts diff --git a/app/backend/src/ingestion/contract-event-drift.service.ts b/app/backend/src/ingestion/contract-event-drift.service.ts new file mode 100644 index 000000000..4b17e1b6c --- /dev/null +++ b/app/backend/src/ingestion/contract-event-drift.service.ts @@ -0,0 +1,373 @@ +import { Injectable, Logger } from "@nestjs/common"; + +import { + RustAcademy_EVENT_SCHEMA_CONTRACTS, + RustAcademy_EVENT_SCHEMA_VERSION, + type EventSchemaContract, +} from "./event-schema"; +import { MAX_SUPPORTED_SCHEMA_VERSION } from "./soroban-event.parser"; + +/** + * Classification for why a parser rejected or flagged an event. + */ +export type DriftReason = + | "unknown_event_name" // topic[1] is not in the schema registry + | "schema_version_too_high" // schema_version > MAX_SUPPORTED_SCHEMA_VERSION + | "incompatible_schema_version" // schema_version not in compatibleVersions list + | "field_mismatch" // parsed map is missing one or more expected payload keys + | "topic_mismatch" // canonical topic doesn't match the registry for that event name + | "parse_error"; // XDR decode or structural error during parsing + +export interface DriftEvent { + reason: DriftReason; + contractId: string; + eventName: string; + schemaVersion: number; + /** Paging token (safe to log — no PII). */ + pagingToken: string; + /** + * Raw payload safe-digest: field names present in the on-chain map. + * We never store raw values — only key names for schema comparison. + */ + observedFields?: string[]; + /** Expected payload keys from the schema registry. */ + expectedFields?: string[]; + /** Missing fields relative to the schema contract. */ + missingFields?: string[]; + /** Extra fields present in the payload that aren't in the schema. */ + extraFields?: string[]; + /** ISO timestamp when this drift was recorded. */ + detectedAt: string; +} + +export interface ParserHealthSnapshot { + /** Rolling window (last 5 min) counters. */ + window: { + processed: number; + rejected: number; + rejectionRate: number; + unknownEventNames: number; + fieldMismatches: number; + parseErrors: number; + schemaVersionTooHigh: number; + incompatibleSchemaVersion: number; + topicMismatches: number; + }; + /** Cumulative totals since service start. */ + totals: { + processed: number; + rejected: number; + unknownEventNames: number; + fieldMismatches: number; + parseErrors: number; + schemaVersionTooHigh: number; + incompatibleSchemaVersion: number; + topicMismatches: number; + }; + /** Known event names from the schema registry. */ + knownEventNames: string[]; + /** Current maximum supported schema version in this indexer. */ + maxSupportedSchemaVersion: number; + /** Current canonical schema version in event-schema.ts. */ + currentSchemaVersion: number; + /** Most recent drift events (up to 20, no raw payload values). */ + recentDriftEvents: DriftEvent[]; + /** ISO timestamp of the snapshot. */ + snapshotAt: string; +} + +/** Rolling bucket: one entry per 30-second slot (10 slots = 5 min window). */ +interface RollingBucket { + slotMs: number; + processed: number; + rejected: number; + unknownEventNames: number; + fieldMismatches: number; + parseErrors: number; + schemaVersionTooHigh: number; + incompatibleSchemaVersion: number; + topicMismatches: number; +} + +const SLOT_DURATION_MS = 30_000; // 30 s +const WINDOW_SLOTS = 10; // 10 × 30 s = 5 min +const MAX_RECENT_DRIFT_EVENTS = 20; +const ALERT_REJECTION_THRESHOLD = 0.1; // 10 % rejection rate triggers a warning log + +@Injectable() +export class ContractEventDriftService { + private readonly logger = new Logger(ContractEventDriftService.name); + + // ── Cumulative totals ──────────────────────────────────────────────────── + private totalProcessed = 0; + private totalRejected = 0; + private totalUnknownEventNames = 0; + private totalFieldMismatches = 0; + private totalParseErrors = 0; + private totalSchemaVersionTooHigh = 0; + private totalIncompatibleSchemaVersion = 0; + private totalTopicMismatches = 0; + + // ── Rolling window ─────────────────────────────────────────────────────── + private readonly buckets: RollingBucket[] = []; + + // ── Recent drift log ───────────────────────────────────────────────────── + private readonly recentDriftEvents: DriftEvent[] = []; + + // ── Schema registry snapshot ───────────────────────────────────────────── + private readonly knownEventNames: ReadonlySet = new Set( + Object.keys(RustAcademy_EVENT_SCHEMA_CONTRACTS), + ); + + // ───────────────────────────────────────────────────────────────────────── + // Public API called by the parser + // ───────────────────────────────────────────────────────────────────────── + + /** Record a successfully parsed event (increments processed counter). */ + recordProcessed(): void { + this.totalProcessed++; + this.currentBucket().processed++; + } + + /** + * Record a drift / rejection event with full diagnostic context. + * This is the primary entry-point used by `SorobanEventParser`. + */ + recordDrift(event: Omit): void { + const driftEvent: DriftEvent = { + ...event, + detectedAt: new Date().toISOString(), + }; + + this.totalRejected++; + this.currentBucket().rejected++; + + switch (event.reason) { + case "unknown_event_name": + this.totalUnknownEventNames++; + this.currentBucket().unknownEventNames++; + break; + case "field_mismatch": + this.totalFieldMismatches++; + this.currentBucket().fieldMismatches++; + break; + case "parse_error": + this.totalParseErrors++; + this.currentBucket().parseErrors++; + break; + case "schema_version_too_high": + this.totalSchemaVersionTooHigh++; + this.currentBucket().schemaVersionTooHigh++; + break; + case "incompatible_schema_version": + this.totalIncompatibleSchemaVersion++; + this.currentBucket().incompatibleSchemaVersion++; + break; + case "topic_mismatch": + this.totalTopicMismatches++; + this.currentBucket().topicMismatches++; + break; + } + + // Append to recent drift ring-buffer + this.recentDriftEvents.push(driftEvent); + if (this.recentDriftEvents.length > MAX_RECENT_DRIFT_EVENTS) { + this.recentDriftEvents.shift(); + } + + // Emit structured log so it's visible in monitoring aggregators + this.logger.warn( + `[schema-drift] reason=${event.reason} event=${event.eventName} ` + + `contract=${event.contractId} schema_version=${event.schemaVersion} ` + + `paging_token=${event.pagingToken}` + + (event.missingFields?.length + ? ` missing_fields=${event.missingFields.join(",")}` + : "") + + (event.extraFields?.length + ? ` extra_fields=${event.extraFields.join(",")}` + : ""), + ); + + // Alert if rolling window rejection rate exceeds threshold + this.checkRejectionRateAlert(); + } + + /** + * Compare observed payload field names against the schema contract for + * a known event. Returns a DriftEvent if mismatches are found, or null. + */ + detectFieldDrift( + eventName: string, + contractId: string, + pagingToken: string, + schemaVersion: number, + observedFields: string[], + ): DriftEvent | null { + const contract = + RustAcademy_EVENT_SCHEMA_CONTRACTS[ + eventName as keyof typeof RustAcademy_EVENT_SCHEMA_CONTRACTS + ]; + + if (!contract) return null; // unknown event — handled separately + + const expectedPayloadKeys = new Set(contract.payloadKeys); + const observedSet = new Set(observedFields); + + const missingFields = [...expectedPayloadKeys].filter( + (k) => !observedSet.has(k), + ); + // Extra fields are informational (new fields added before schema bump) + const extraFields = observedFields.filter((k) => !expectedPayloadKeys.has(k)); + + if (missingFields.length > 0) { + return { + reason: "field_mismatch", + contractId, + eventName, + schemaVersion, + pagingToken, + observedFields, + expectedFields: [...expectedPayloadKeys], + missingFields, + extraFields, + detectedAt: new Date().toISOString(), + }; + } + + return null; + } + + /** + * Check whether an event name is in the schema registry. + */ + isKnownEvent(eventName: string): boolean { + return this.knownEventNames.has(eventName); + } + + /** + * Get the schema contract for an event name (or undefined). + */ + getContract( + eventName: string, + ): EventSchemaContract | undefined { + return RustAcademy_EVENT_SCHEMA_CONTRACTS[ + eventName as keyof typeof RustAcademy_EVENT_SCHEMA_CONTRACTS + ] as EventSchemaContract | undefined; + } + + /** + * Build a developer-facing health snapshot. + */ + getHealthSnapshot(): ParserHealthSnapshot { + const window = this.computeWindowTotals(); + const rejected = window.rejected; + const processed = window.processed; + const rejectionRate = + processed + rejected > 0 ? rejected / (processed + rejected) : 0; + + return { + window: { + ...window, + rejectionRate: Math.round(rejectionRate * 10_000) / 100, // percentage, 2 dp + }, + totals: { + processed: this.totalProcessed, + rejected: this.totalRejected, + unknownEventNames: this.totalUnknownEventNames, + fieldMismatches: this.totalFieldMismatches, + parseErrors: this.totalParseErrors, + schemaVersionTooHigh: this.totalSchemaVersionTooHigh, + incompatibleSchemaVersion: this.totalIncompatibleSchemaVersion, + topicMismatches: this.totalTopicMismatches, + }, + knownEventNames: [...this.knownEventNames], + maxSupportedSchemaVersion: MAX_SUPPORTED_SCHEMA_VERSION, + currentSchemaVersion: RustAcademy_EVENT_SCHEMA_VERSION, + recentDriftEvents: [...this.recentDriftEvents], + snapshotAt: new Date().toISOString(), + }; + } + + // ───────────────────────────────────────────────────────────────────────── + // Private helpers + // ───────────────────────────────────────────────────────────────────────── + + private currentBucket(): RollingBucket { + const now = Date.now(); + const slotMs = now - (now % SLOT_DURATION_MS); + + const last = this.buckets[this.buckets.length - 1]; + if (last && last.slotMs === slotMs) { + return last; + } + + // Prune expired buckets + const cutoff = slotMs - SLOT_DURATION_MS * WINDOW_SLOTS; + while (this.buckets.length > 0 && this.buckets[0].slotMs < cutoff) { + this.buckets.shift(); + } + + const bucket: RollingBucket = { + slotMs, + processed: 0, + rejected: 0, + unknownEventNames: 0, + fieldMismatches: 0, + parseErrors: 0, + schemaVersionTooHigh: 0, + incompatibleSchemaVersion: 0, + topicMismatches: 0, + }; + this.buckets.push(bucket); + return bucket; + } + + private computeWindowTotals(): Omit { + const now = Date.now(); + const cutoff = now - SLOT_DURATION_MS * WINDOW_SLOTS; + + const totals: Omit = { + processed: 0, + rejected: 0, + unknownEventNames: 0, + fieldMismatches: 0, + parseErrors: 0, + schemaVersionTooHigh: 0, + incompatibleSchemaVersion: 0, + topicMismatches: 0, + }; + + for (const bucket of this.buckets) { + if (bucket.slotMs >= cutoff) { + totals.processed += bucket.processed; + totals.rejected += bucket.rejected; + totals.unknownEventNames += bucket.unknownEventNames; + totals.fieldMismatches += bucket.fieldMismatches; + totals.parseErrors += bucket.parseErrors; + totals.schemaVersionTooHigh += bucket.schemaVersionTooHigh; + totals.incompatibleSchemaVersion += bucket.incompatibleSchemaVersion; + totals.topicMismatches += bucket.topicMismatches; + } + } + + return totals; + } + + private checkRejectionRateAlert(): void { + const window = this.computeWindowTotals(); + const total = window.processed + window.rejected; + if (total < 20) return; // too few events to be meaningful + + const rate = window.rejected / total; + if (rate >= ALERT_REJECTION_THRESHOLD) { + this.logger.error( + `[schema-drift-alert] Rejection rate=${(rate * 100).toFixed(1)}% ` + + `exceeds threshold=${ALERT_REJECTION_THRESHOLD * 100}% ` + + `(${window.rejected}/${total} events in last 5 min). ` + + `unknown_names=${window.unknownEventNames} ` + + `field_mismatches=${window.fieldMismatches} ` + + `parse_errors=${window.parseErrors}`, + ); + } + } +} diff --git a/app/backend/src/ingestion/ingestion.module.ts b/app/backend/src/ingestion/ingestion.module.ts index f1a5bb3ef..dc479c79d 100644 --- a/app/backend/src/ingestion/ingestion.module.ts +++ b/app/backend/src/ingestion/ingestion.module.ts @@ -14,6 +14,8 @@ import { StellarIngestionService } from "./stellar-ingestion.service"; import { SorobanEventIndexerService } from "./soroban-event-indexer.service"; import { SorobanIndexerController } from "./soroban-indexer.controller"; import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; +import { ContractEventDriftService } from "./contract-event-drift.service"; +import { MetricsService } from "../metrics/metrics.service"; @Module({ imports: [ @@ -29,7 +31,26 @@ import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; AdminEventRepository, StealthEventRepository, IndexerCheckpointRepository, - SorobanEventParser, + ContractEventDriftService, + /** + * Custom factory provider for SorobanEventParser so we can wire + * the onUnknownSchemaVersion callback (which needs MetricsService) + * alongside the ContractEventDriftService at module bootstrap time. + */ + { + provide: SorobanEventParser, + useFactory: ( + metrics: MetricsService, + driftService: ContractEventDriftService, + ) => + new SorobanEventParser( + (eventName, version, pagingToken) => { + metrics.recordUnknownSchemaVersion(eventName, version); + }, + driftService, + ), + inject: [MetricsService, ContractEventDriftService], + }, StellarIngestionService, SorobanEventIndexerService, IngestionBootstrapService, @@ -40,6 +61,7 @@ import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; SorobanEventParser, CursorRepository, EscrowEventRepository, + ContractEventDriftService, ], }) export class IngestionModule {} diff --git a/app/backend/src/ingestion/parser-health.dto.ts b/app/backend/src/ingestion/parser-health.dto.ts new file mode 100644 index 000000000..0ad0e2006 --- /dev/null +++ b/app/backend/src/ingestion/parser-health.dto.ts @@ -0,0 +1,131 @@ +import { ApiProperty } from "@nestjs/swagger"; + +export class ParserWindowStatsDto { + @ApiProperty({ description: "Events successfully parsed in the last 5 minutes" }) + processed!: number; + + @ApiProperty({ description: "Events rejected in the last 5 minutes" }) + rejected!: number; + + @ApiProperty({ + description: + "Rejection rate as a percentage (0–100, two decimal places) for the last 5 minutes", + example: 2.34, + }) + rejectionRate!: number; + + @ApiProperty({ description: "Events with an unrecognised event name in the last 5 minutes" }) + unknownEventNames!: number; + + @ApiProperty({ description: "Events with missing payload fields in the last 5 minutes" }) + fieldMismatches!: number; + + @ApiProperty({ description: "Events that caused an XDR parse error in the last 5 minutes" }) + parseErrors!: number; + + @ApiProperty({ + description: "Events with schema_version > maxSupportedSchemaVersion in the last 5 minutes", + }) + schemaVersionTooHigh!: number; + + @ApiProperty({ + description: "Events with schema_version not in the compatible versions list in the last 5 minutes", + }) + incompatibleSchemaVersion!: number; + + @ApiProperty({ description: "Events whose on-chain topic didn't match the expected topic in the last 5 minutes" }) + topicMismatches!: number; +} + +export class ParserTotalsDto { + @ApiProperty({ description: "Total events successfully parsed since service start" }) + processed!: number; + + @ApiProperty({ description: "Total events rejected since service start" }) + rejected!: number; + + @ApiProperty({ description: "Total unknown event names since service start" }) + unknownEventNames!: number; + + @ApiProperty({ description: "Total field mismatches since service start" }) + fieldMismatches!: number; + + @ApiProperty({ description: "Total parse errors since service start" }) + parseErrors!: number; + + @ApiProperty({ description: "Total schema_version_too_high rejections since service start" }) + schemaVersionTooHigh!: number; + + @ApiProperty({ description: "Total incompatible_schema_version rejections since service start" }) + incompatibleSchemaVersion!: number; + + @ApiProperty({ description: "Total topic mismatches since service start" }) + topicMismatches!: number; +} + +export class DriftEventDto { + @ApiProperty({ + enum: [ + "unknown_event_name", + "schema_version_too_high", + "incompatible_schema_version", + "field_mismatch", + "topic_mismatch", + "parse_error", + ], + }) + reason!: string; + + @ApiProperty({ example: "CXXX..." }) + contractId!: string; + + @ApiProperty({ example: "EscrowDeposited" }) + eventName!: string; + + @ApiProperty({ example: 3 }) + schemaVersion!: number; + + @ApiProperty({ example: "12345-1" }) + pagingToken!: string; + + @ApiProperty({ type: [String], required: false, example: ["amount", "token"] }) + observedFields?: string[]; + + @ApiProperty({ type: [String], required: false }) + expectedFields?: string[]; + + @ApiProperty({ type: [String], required: false, example: ["expires_at"] }) + missingFields?: string[]; + + @ApiProperty({ type: [String], required: false, example: ["new_field"] }) + extraFields?: string[]; + + @ApiProperty({ example: "2024-01-01T00:00:00.000Z" }) + detectedAt!: string; +} + +export class ParserHealthResponseDto { + @ApiProperty({ type: ParserWindowStatsDto, description: "Rolling 5-minute window stats" }) + window!: ParserWindowStatsDto; + + @ApiProperty({ type: ParserTotalsDto, description: "Cumulative totals since service start" }) + totals!: ParserTotalsDto; + + @ApiProperty({ type: [String], description: "Event names known to the schema registry" }) + knownEventNames!: string[]; + + @ApiProperty({ example: 2, description: "Maximum schema_version this indexer can parse" }) + maxSupportedSchemaVersion!: number; + + @ApiProperty({ example: 2, description: "Canonical schema_version declared in event-schema.ts" }) + currentSchemaVersion!: number; + + @ApiProperty({ + type: [DriftEventDto], + description: "Most recent schema drift events (up to 20). Contains only field names, never raw payload values.", + }) + recentDriftEvents!: DriftEventDto[]; + + @ApiProperty({ example: "2024-01-01T00:00:00.000Z" }) + snapshotAt!: string; +} diff --git a/app/backend/src/ingestion/soroban-event-indexer.service.ts b/app/backend/src/ingestion/soroban-event-indexer.service.ts index 29cdde9a1..f6f3cbac7 100644 --- a/app/backend/src/ingestion/soroban-event-indexer.service.ts +++ b/app/backend/src/ingestion/soroban-event-indexer.service.ts @@ -13,6 +13,7 @@ import { EscrowEventRepository } from "./escrow-event.repository"; import { PrivacyEventRepository } from "./privacy-event.repository"; import { AdminEventRepository } from "./admin-event.repository"; import { StealthEventRepository } from "./stealth-event.repository"; +import { ContractEventDriftService } from "./contract-event-drift.service"; import type { RustAcademyContractEvent } from "./types/contract-event.types"; const PAGE_LIMIT = 200; @@ -37,7 +38,6 @@ export interface DualReadConfig { export class SorobanEventIndexerService { private readonly logger = new Logger(SorobanEventIndexerService.name); private readonly horizonUrl: string; - private readonly parser: SorobanEventParser; constructor( private readonly config: AppConfigService, @@ -48,15 +48,10 @@ export class SorobanEventIndexerService { private readonly stealthRepo: StealthEventRepository, private readonly metrics: MetricsService, private readonly eventEmitter: EventEmitter2, + private readonly driftService: ContractEventDriftService, + private readonly parser: SorobanEventParser, ) { this.horizonUrl = HORIZON_BASE_URLS[this.config.network]; - - this.parser = new SorobanEventParser((eventName, version, pagingToken) => { - this.logger.warn( - `Unknown schema_version=${version} for event ${eventName} paging_token=${pagingToken}`, - ); - this.metrics.recordUnknownSchemaVersion(eventName, version); - }); } async indexLedgerRange( @@ -166,6 +161,26 @@ export class SorobanEventIndexerService { if (!event) { skippedUnknownSchema++; + + // Push individual rejection metrics from the drift service snapshot + // so Prometheus counters stay in sync with the in-process drift state. + const snapshot = this.driftService.getHealthSnapshot(); + const totals = snapshot.totals; + this.metrics.recordParserRejection( + "any", // aggregated label for total rejections + contractId, + "unknown", + 0, + ); + // Update the rolling rejection rate gauge for this contract + const window = snapshot.window; + if (window.processed + window.rejected > 0) { + this.metrics.setContractEventRejectionRate( + contractId, + window.rejected / (window.processed + window.rejected), + ); + } + continue; } diff --git a/app/backend/src/ingestion/soroban-event.parser.ts b/app/backend/src/ingestion/soroban-event.parser.ts index ee6f6fea7..34fd4bd8a 100644 --- a/app/backend/src/ingestion/soroban-event.parser.ts +++ b/app/backend/src/ingestion/soroban-event.parser.ts @@ -1,4 +1,4 @@ -import { Logger } from "@nestjs/common"; +import { Injectable, Logger, Optional } from "@nestjs/common"; import { xdr, scValToNative, Address } from "@stellar/stellar-sdk"; import type { @@ -19,6 +19,7 @@ import { RustAcademy_EVENT_TOPICS, type RustAcademyEventTopic, } from "./event-schema"; +import type { ContractEventDriftService } from "./contract-event-drift.service"; /** Maximum schema version this indexer understands. */ export const MAX_SUPPORTED_SCHEMA_VERSION = 2; @@ -63,29 +64,87 @@ interface TopicLayout { * Legacy events used Topic[0] = event name. The parser keeps a compatibility * path for those events and marks them with schemaVersion=1. */ +@Injectable() export class SorobanEventParser { private readonly logger = new Logger(SorobanEventParser.name); constructor( - private readonly onUnknownSchemaVersion?: UnknownSchemaVersionHandler, + @Optional() private readonly onUnknownSchemaVersion?: UnknownSchemaVersionHandler, + @Optional() private readonly driftService?: ContractEventDriftService, ) {} /** * Attempt to parse a raw Horizon contract event. * Returns null when the event is unrecognised, malformed, or carries an * unsupported schema version. + * + * All rejection paths are recorded in `ContractEventDriftService` so they + * are visible in monitoring and the parser health endpoint. */ parse(raw: RawHorizonContractEvent): RustAcademyContractEvent | null { try { const topics = raw.topic.map((t) => xdr.ScVal.fromXDR(t, "base64")); const dataVal = xdr.ScVal.fromXDR(raw.value.xdr, "base64"); - if (topics.length === 0) return null; + if (topics.length === 0) { + this.driftService?.recordDrift({ + reason: "parse_error", + contractId: raw.contract_id, + eventName: "unknown", + schemaVersion: 0, + pagingToken: raw.paging_token, + }); + return null; + } const layout = this.resolveTopicLayout(topics); - if (!layout) return null; + + if (!layout) { + // Classify: is the first topic symbol a recognisable name that's just + // in the wrong namespace, or is it truly unknown? + const firstSym = this.decodeSymbol(topics[0]); + const secondSym = topics[1] ? this.decodeSymbol(topics[1]) : null; + + // Canonical layout: topic[0]=namespace, topic[1]=eventName + const canonicalTopics = new Set(Object.values(RustAcademy_EVENT_TOPICS)); + if (firstSym && canonicalTopics.has(firstSym) && secondSym) { + // Namespace matched but event name is not in the registry + if (!this.driftService?.isKnownEvent(secondSym)) { + this.driftService?.recordDrift({ + reason: "unknown_event_name", + contractId: raw.contract_id, + eventName: secondSym, + schemaVersion: 0, + pagingToken: raw.paging_token, + }); + } else { + // Topic mismatch: event name is known but under a different topic + const contract = this.driftService?.getContract(secondSym); + if (contract) { + this.driftService?.recordDrift({ + reason: "topic_mismatch", + contractId: raw.contract_id, + eventName: secondSym, + schemaVersion: 0, + pagingToken: raw.paging_token, + }); + } + } + } else if (firstSym && !this.driftService?.isKnownEvent(firstSym)) { + // Legacy layout attempt: topic[0] = event name, but it's not known + this.driftService?.recordDrift({ + reason: "unknown_event_name", + contractId: raw.contract_id, + eventName: firstSym ?? "unknown", + schemaVersion: 0, + pagingToken: raw.paging_token, + }); + } + return null; + } const schemaVersion = this.extractSchemaVersionFromData(dataVal); + if (schemaVersion > MAX_SUPPORTED_SCHEMA_VERSION) { this.logger.warn( `Skipping event ${layout.eventName} paging_token=${raw.paging_token}: ` + @@ -96,6 +155,13 @@ export class SorobanEventParser { schemaVersion, raw.paging_token, ); + this.driftService?.recordDrift({ + reason: "schema_version_too_high", + contractId: raw.contract_id, + eventName: layout.eventName, + schemaVersion, + pagingToken: raw.paging_token, + }); return null; } @@ -103,9 +169,35 @@ export class SorobanEventParser { this.logger.warn( `Unsupported ${layout.eventName} schema version ${schemaVersion}`, ); + this.driftService?.recordDrift({ + reason: "incompatible_schema_version", + contractId: raw.contract_id, + eventName: layout.eventName, + schemaVersion, + pagingToken: raw.paging_token, + }); return null; } + // ── Field drift detection ────────────────────────────────────────── + // We extract the observed payload keys here (names only, no values) and + // compare against the schema contract before dispatching to the per-event + // parser. This is purely diagnostic — we still attempt the full parse. + const observedFieldNames = this.extractFieldNames(dataVal); + const fieldDrift = this.driftService?.detectFieldDrift( + layout.eventName, + raw.contract_id, + raw.paging_token, + schemaVersion, + observedFieldNames, + ); + if (fieldDrift) { + // Record the mismatch but continue parsing — the event might still be + // partially parseable and the data can be used for analytics. + this.driftService?.recordDrift(fieldDrift); + } + // ────────────────────────────────────────────────────────────────── + const base = { schemaVersion, topicNamespace: layout.topicNamespace, @@ -115,78 +207,102 @@ export class SorobanEventParser { contractTimestamp: this.extractTimestampFromData(dataVal), }; + let parsed: RustAcademyContractEvent | null = null; + switch (layout.eventName) { case "EscrowDeposited": - return this.parseEscrowDeposited( + parsed = this.parseEscrowDeposited( topics, dataVal, base, layout.indexedOffset, ); + break; case "EscrowWithdrawn": - return this.parseEscrowWithdrawn( + parsed = this.parseEscrowWithdrawn( topics, dataVal, base, layout.indexedOffset, ); + break; case "EscrowRefunded": - return this.parseEscrowRefunded( + parsed = this.parseEscrowRefunded( topics, dataVal, base, layout.indexedOffset, ); + break; case "PrivacyToggled": - return this.parsePrivacyToggled( + parsed = this.parsePrivacyToggled( topics, dataVal, base, layout.indexedOffset, ); + break; case "ContractPaused": - return this.parseContractPaused( + parsed = this.parseContractPaused( topics, dataVal, base, layout.indexedOffset, ); + break; case "AdminChanged": - return this.parseAdminChanged( + parsed = this.parseAdminChanged( topics, dataVal, base, layout.indexedOffset, ); + break; case "ContractUpgraded": - return this.parseContractUpgraded( + parsed = this.parseContractUpgraded( topics, dataVal, base, layout.indexedOffset, ); + break; case "EphemeralKeyRegistered": - return this.parseEphemeralKeyRegistered( + parsed = this.parseEphemeralKeyRegistered( topics, dataVal, base, layout.indexedOffset, ); + break; case "StealthWithdrawn": - return this.parseStealthWithdrawn( + parsed = this.parseStealthWithdrawn( topics, dataVal, base, layout.indexedOffset, ); + break; default: this.logger.debug(`Unrecognised event name: ${layout.eventName}`); return null; } + + if (parsed !== null) { + this.driftService?.recordProcessed(); + } + + return parsed; } catch (err) { this.logger.warn( `Failed to parse contract event ${raw.paging_token}: ${(err as Error).message}`, ); + this.driftService?.recordDrift({ + reason: "parse_error", + contractId: raw.contract_id, + eventName: "unknown", + schemaVersion: 0, + pagingToken: raw.paging_token, + }); return null; } } @@ -489,6 +605,19 @@ export class SorobanEventParser { return result; } + /** + * Extract only the field *names* from an XDR map (no values). + * Used for schema drift detection — we never log raw payload values. + */ + private extractFieldNames(data: xdr.ScVal): string[] { + try { + const mapEntries = data.map(); + return mapEntries.map((entry) => entry.key().sym().toString()); + } catch { + return []; + } + } + private extractSchemaVersionFromData(data: xdr.ScVal): number { try { const map = this.dataToMap(data); diff --git a/app/backend/src/ingestion/soroban-indexer.controller.ts b/app/backend/src/ingestion/soroban-indexer.controller.ts index 30e0d025e..8be383321 100644 --- a/app/backend/src/ingestion/soroban-indexer.controller.ts +++ b/app/backend/src/ingestion/soroban-indexer.controller.ts @@ -2,6 +2,7 @@ import { Body, Controller, ConflictException, + Get, HttpCode, HttpStatus, Post, @@ -10,6 +11,8 @@ import { ApiOperation, ApiResponse, ApiTags } from "@nestjs/swagger"; import { IsBoolean, IsInt, IsNotEmpty, IsOptional, IsString, Min } from "class-validator"; import { SorobanEventIndexerService, LedgerRangeResult } from "./soroban-event-indexer.service"; +import { ContractEventDriftService } from "./contract-event-drift.service"; +import { ParserHealthResponseDto } from "./parser-health.dto"; class ReindexDto { @IsString() @@ -34,7 +37,7 @@ class ReindexDto { } /** - * Admin endpoint for triggering Soroban event reindexing over a ledger range. + * Developer-facing endpoints for Soroban indexer administration and parser health. * Should be protected by an API-key guard in production. */ @ApiTags("indexer") @@ -42,7 +45,10 @@ class ReindexDto { export class SorobanIndexerController { private running = false; - constructor(private readonly indexer: SorobanEventIndexerService) {} + constructor( + private readonly indexer: SorobanEventIndexerService, + private readonly driftService: ContractEventDriftService, + ) {} @Post("reindex") @HttpCode(HttpStatus.OK) @@ -73,4 +79,34 @@ export class SorobanIndexerController { this.running = false; } } + + /** + * Developer-facing endpoint that exposes the parser health snapshot. + * + * Includes: + * - Rolling 5-minute window rejection rate and breakdown by reason + * - Cumulative totals since service start + * - Known event names from the schema registry + * - Max / current schema versions + * - Recent drift events (field names only — no raw payload values) + * + * This endpoint is intentionally unauthenticated for observability dashboards. + * It contains no sensitive data — only schema metadata and aggregate counters. + */ + @Get("parser-health") + @ApiOperation({ + summary: "Parser health and schema drift status", + description: + "Returns aggregate parser counters, rolling rejection rate, schema version info, " + + "and recent drift events (field names only, no raw payload values). " + + "Use this endpoint to surface schema mismatches in monitoring dashboards.", + }) + @ApiResponse({ + status: 200, + type: ParserHealthResponseDto, + description: "Parser health snapshot retrieved successfully", + }) + getParserHealth(): ParserHealthResponseDto { + return this.driftService.getHealthSnapshot(); + } } diff --git a/app/backend/src/ingestion/stellar-ingestion.service.ts b/app/backend/src/ingestion/stellar-ingestion.service.ts index b7a1c0add..bb4c10151 100644 --- a/app/backend/src/ingestion/stellar-ingestion.service.ts +++ b/app/backend/src/ingestion/stellar-ingestion.service.ts @@ -18,6 +18,7 @@ import { EscrowEventRepository } from "./escrow-event.repository"; import { JobQueueService } from "../job-queue/job-queue.service"; import { JobType } from "../job-queue/types"; import { StellarReconnectPayload } from "../job-queue/types/job-payloads.types"; +import { ContractEventDriftService } from "./contract-event-drift.service"; import type { EscrowEvent, RustAcademyContractEvent, @@ -67,6 +68,7 @@ export class StellarIngestionService implements OnModuleInit, OnModuleDestroy { private readonly parser: SorobanEventParser, private readonly eventEmitter: EventEmitter2, private readonly jobQueueService: JobQueueService, + private readonly driftService: ContractEventDriftService, ) {} onModuleInit(): void { @@ -376,7 +378,8 @@ export class StellarIngestionService implements OnModuleInit, OnModuleDestroy { const event = this.parser.parse(raw); if (!event) { - // Unrecognised or non- RustAcademy event; still advance cursor. + // The parser and drift service have already classified and counted the + // rejection. Still advance the cursor so we don't re-process the event. await this.safeUpdateCursor(streamId, raw.paging_token, raw.ledger); return; } diff --git a/app/backend/src/metrics/metrics.service.ts b/app/backend/src/metrics/metrics.service.ts index 634051a39..213ed3601 100644 --- a/app/backend/src/metrics/metrics.service.ts +++ b/app/backend/src/metrics/metrics.service.ts @@ -16,6 +16,13 @@ export class MetricsService implements OnModuleInit { private sorobanRpcFailoverTotal: client.Counter; private sorobanRpcActiveEndpoint: client.Gauge; private sorobanIndexerUnknownSchemaVersion: client.Counter; + // ── Contract event drift / parser observability ────────────────────────── + private contractEventParserRejections: client.Counter; + private contractEventUnknownNames: client.Counter; + private contractEventFieldMismatches: client.Counter; + private contractEventParseErrors: client.Counter; + private contractEventRejectionRate: client.Gauge; + // ───────────────────────────────────────────────────────────────────────── private parityCheckResults: client.Gauge; private shadowTrafficRequests: client.Counter; private indexerLagLedgers: client.Gauge; @@ -131,6 +138,38 @@ export class MetricsService implements OnModuleInit { help: "Indexer lag guard status (0=disabled, 1=enabled, 2=overridden, 3=lagging)", }); + // ── Contract event drift / parser observability ────────────────────── + this.contractEventParserRejections = new client.Counter({ + name: "contract_event_parser_rejections_total", + help: "Total contract events rejected by the parser, labelled by reason, contract ID, event name, and schema version", + labelNames: ["reason", "contract_id", "event_name", "schema_version"], + }); + + this.contractEventUnknownNames = new client.Counter({ + name: "contract_event_unknown_names_total", + help: "Contract events with an event name not present in the schema registry", + labelNames: ["contract_id"], + }); + + this.contractEventFieldMismatches = new client.Counter({ + name: "contract_event_field_mismatches_total", + help: "Contract events whose payload is missing one or more expected fields", + labelNames: ["contract_id", "event_name", "schema_version"], + }); + + this.contractEventParseErrors = new client.Counter({ + name: "contract_event_parse_errors_total", + help: "Contract events that caused an XDR decode or structural parse error", + labelNames: ["contract_id"], + }); + + this.contractEventRejectionRate = new client.Gauge({ + name: "contract_event_rejection_rate", + help: "Rolling 5-minute rejection rate (0.0–1.0) for the contract event parser", + labelNames: ["contract_id"], + }); + // ──────────────────────────────────────────────────────────────────── + this.register.registerMetric(this.httpRequestDuration); this.register.registerMetric(this.httpRequestTotal); this.register.registerMetric(this.rateLimitedRequestsTotal); @@ -148,6 +187,11 @@ export class MetricsService implements OnModuleInit { this.register.registerMetric(this.indexerLagLedgers); this.register.registerMetric(this.indexerLagGuardBlockedRequests); this.register.registerMetric(this.indexerLagGuardStatus); + this.register.registerMetric(this.contractEventParserRejections); + this.register.registerMetric(this.contractEventUnknownNames); + this.register.registerMetric(this.contractEventFieldMismatches); + this.register.registerMetric(this.contractEventParseErrors); + this.register.registerMetric(this.contractEventRejectionRate); this.initialized = true; } catch (error) { @@ -354,4 +398,69 @@ export class MetricsService implements OnModuleInit { this.indexerLagGuardStatus.set(status); } catch (error) {} } + + // ── Contract event drift / parser observability ───────────────────────── + + /** + * Increment the parser rejections counter. + * @param reason Why the event was rejected (drift reason) + * @param contractId Soroban contract ID + * @param eventName Event name (may be "unknown" if the name itself is unrecognised) + * @param schemaVersion Schema version read from the event (or 0 if unavailable) + */ + recordParserRejection( + reason: string, + contractId: string, + eventName: string, + schemaVersion: number, + ) { + if (!this.initialized || !this.contractEventParserRejections) return; + try { + this.contractEventParserRejections + .labels(reason, contractId, eventName, String(schemaVersion)) + .inc(); + } catch (error) {} + } + + /** Increment the unknown event name counter. */ + recordUnknownEventName(contractId: string) { + if (!this.initialized || !this.contractEventUnknownNames) return; + try { + this.contractEventUnknownNames.labels(contractId).inc(); + } catch (error) {} + } + + /** Increment the field mismatch counter. */ + recordFieldMismatch( + contractId: string, + eventName: string, + schemaVersion: number, + ) { + if (!this.initialized || !this.contractEventFieldMismatches) return; + try { + this.contractEventFieldMismatches + .labels(contractId, eventName, String(schemaVersion)) + .inc(); + } catch (error) {} + } + + /** Increment the parse error counter. */ + recordParseError(contractId: string) { + if (!this.initialized || !this.contractEventParseErrors) return; + try { + this.contractEventParseErrors.labels(contractId).inc(); + } catch (error) {} + } + + /** + * Update the rolling rejection rate gauge for a contract. + * @param contractId Soroban contract ID + * @param rate Value between 0 and 1 (0 = no rejections) + */ + setContractEventRejectionRate(contractId: string, rate: number) { + if (!this.initialized || !this.contractEventRejectionRate) return; + try { + this.contractEventRejectionRate.labels(contractId).set(rate); + } catch (error) {} + } } From 98fbedb036fe5ffcce620389333926b4ec5b91d4 Mon Sep 17 00:00:00 2001 From: Emmzyemms Date: Sat, 20 Jun 2026 10:23:50 +0100 Subject: [PATCH 2/2] Fix: Ci fails --- app/backend/src/ingestion/ingestion.module.ts | 2 +- app/backend/src/ingestion/soroban-event-indexer.service.ts | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/app/backend/src/ingestion/ingestion.module.ts b/app/backend/src/ingestion/ingestion.module.ts index dc479c79d..150270e62 100644 --- a/app/backend/src/ingestion/ingestion.module.ts +++ b/app/backend/src/ingestion/ingestion.module.ts @@ -44,7 +44,7 @@ import { MetricsService } from "../metrics/metrics.service"; driftService: ContractEventDriftService, ) => new SorobanEventParser( - (eventName, version, pagingToken) => { + (eventName, version) => { metrics.recordUnknownSchemaVersion(eventName, version); }, driftService, diff --git a/app/backend/src/ingestion/soroban-event-indexer.service.ts b/app/backend/src/ingestion/soroban-event-indexer.service.ts index f6f3cbac7..1ad2eb54b 100644 --- a/app/backend/src/ingestion/soroban-event-indexer.service.ts +++ b/app/backend/src/ingestion/soroban-event-indexer.service.ts @@ -165,7 +165,6 @@ export class SorobanEventIndexerService { // Push individual rejection metrics from the drift service snapshot // so Prometheus counters stay in sync with the in-process drift state. const snapshot = this.driftService.getHealthSnapshot(); - const totals = snapshot.totals; this.metrics.recordParserRejection( "any", // aggregated label for total rejections contractId,