diff --git a/app/backend/src/developer/developer.controller.ts b/app/backend/src/developer/developer.controller.ts index 717c2bea..fb289765 100644 --- a/app/backend/src/developer/developer.controller.ts +++ b/app/backend/src/developer/developer.controller.ts @@ -22,6 +22,7 @@ import { DeveloperService } from './developer.service'; import { BulkRevokeDto, BulkRevokeResultDto, + WebhookSampleEventDto, WebhookTestResultDto, IntegrationHealthDto, PingResponseDto, @@ -59,6 +60,25 @@ export class DeveloperController { return this.developerService.testWebhook(webhookId); } + @Post('webhooks/:webhookId/sample-events') + @HttpCode(HttpStatus.OK) + @RequireScopes('admin') + @ApiOperation({ + summary: 'Send a canonical sample event to a webhook receiver', + description: + 'Posts a sample link.created, payment.received, payment.settled, or payment.failed payload. ' + + 'The request can include signature headers and an explicit timestamp for receiver testing.', + }) + @ApiParam({ name: 'webhookId', description: 'Webhook UUID' }) + @ApiResponse({ status: 200, type: WebhookTestResultDto }) + @ApiResponse({ status: 404, description: 'Webhook not found' }) + sampleWebhookEvent( + @Param('webhookId', ParseUUIDPipe) webhookId: string, + @Body() dto: WebhookSampleEventDto, + ): Promise { + return this.developerService.sendSampleWebhookEvent(webhookId, dto); + } + @Post('keys/bulk-revoke') @HttpCode(HttpStatus.OK) @RequireScopes('admin') diff --git a/app/backend/src/developer/developer.service.ts b/app/backend/src/developer/developer.service.ts index 00c0d603..c68d3d4f 100644 --- a/app/backend/src/developer/developer.service.ts +++ b/app/backend/src/developer/developer.service.ts @@ -9,6 +9,8 @@ import { AuditService } from '../audit/audit.service'; import { BulkRevokeDto, BulkRevokeResultDto, + WebhookSampleEventDto, + WebhookSampleEventType, WebhookTestResultDto, IntegrationHealthDto, PingResponseDto, @@ -37,22 +39,37 @@ export class DeveloperService { } async testWebhook(webhookId: string): Promise { + return this.sendSampleWebhookEvent(webhookId, { + event_type: 'payment.received', + include_signature: true, + }, 'webhook.test'); + } + + async sendSampleWebhookEvent( + webhookId: string, + dto: WebhookSampleEventDto = {}, + auditAction = 'webhook.sample', + ): Promise { const webhook = await this.webhookService.getWebhook(webhookId); if (!webhook) throw new NotFoundException('Webhook not found'); - const sentAt = new Date().toISOString(); - const testEventId = `test_${crypto.randomUUID()}`; + const eventType = dto.event_type ?? 'payment.received'; + const sentAt = dto.timestamp ?? new Date().toISOString(); + const eventId = `sample_${eventType.replace('.', '_')}_${crypto.randomUUID()}`; const payload = { - eventType: 'payment.received', - eventId: testEventId, + eventType, + eventId, recipientPublicKey: webhook.publicKey, - payload: { test: true, source: 'developer_self_service_api' }, + payload: this.buildSamplePayload(eventType, webhook.publicKey, sentAt), timestamp: sentAt, }; const bodyStr = JSON.stringify(payload); - const ts = Date.now(); - const signature = this.signPayload(webhook.secret, bodyStr, ts); + const includeSignature = dto.include_signature ?? true; + const ts = new Date(sentAt).getTime(); + const signature = includeSignature + ? this.signPayload(webhook.secret, bodyStr, ts) + : undefined; const controller = new AbortController(); const timer = setTimeout(() => controller.abort(), TEST_WEBHOOK_TIMEOUT_MS); @@ -67,9 +84,10 @@ export class DeveloperService { method: 'POST', headers: { 'Content-Type': 'application/json', - 'X-QX-Signature': signature, - 'X-QX-Event': 'payment.received', - 'X-QX-Event-Id': testEventId, + ...(signature ? { 'X-QX-Signature': signature } : {}), + ...(includeSignature ? { 'X-QX-Timestamp': String(ts) } : {}), + 'X-QX-Event': eventType, + 'X-QX-Event-Id': eventId, 'X-QX-Test': 'true', 'User-Agent': 'QuickEx-Webhook/1.0', }, @@ -100,9 +118,16 @@ export class DeveloperService { await this.auditService.log( 'developer_api', - 'webhook.test', + auditAction, webhookId, - { target_url: webhook.webhookUrl, http_status: httpStatus, success, latency_ms: latencyMs }, + { + target_url: webhook.webhookUrl, + http_status: httpStatus, + success, + latency_ms: latencyMs, + event_type: eventType, + signature_included: includeSignature, + }, ); return { @@ -113,6 +138,9 @@ export class DeveloperService { response_body: responseBody, latency_ms: latencyMs, sent_at: sentAt, + event_type: eventType, + event_id: eventId, + signature_included: includeSignature, }; } @@ -219,4 +247,63 @@ export class DeveloperService { const hmac = crypto.createHmac('sha256', secret).update(signed).digest('hex'); return `t=${timestamp},v1=${hmac}`; } + + private buildSamplePayload( + eventType: WebhookSampleEventType, + recipientPublicKey: string, + timestamp: string, + ): Record { + const base = { + test: true, + source: 'developer_self_service_api', + schema_version: '2026-04-29', + }; + + switch (eventType) { + case 'link.created': + return { + ...base, + link_id: 'plink_sample_01', + creator_public_key: recipientPublicKey, + asset: 'XLM', + amount: '25.0000000', + memo: 'QuickEx sample payment link', + expires_at: new Date(new Date(timestamp).getTime() + 86_400_000).toISOString(), + }; + case 'payment.received': + return { + ...base, + payment_id: 'pay_sample_received_01', + link_id: 'plink_sample_01', + from_public_key: 'GBZXN7PIRZGNMHGA6U2QBG7A5XBQ2YH6R3MGNJ2T63PXWKBUI5V3R2ZU', + to_public_key: recipientPublicKey, + asset: 'XLM', + amount: '25.0000000', + tx_hash: 'sample_received_tx_hash', + }; + case 'payment.settled': + return { + ...base, + payment_id: 'pay_sample_settled_01', + settlement_id: 'set_sample_01', + recipient_public_key: recipientPublicKey, + asset: 'XLM', + amount: '25.0000000', + fee_amount: '0.1000000', + settled_at: timestamp, + tx_hash: 'sample_settlement_tx_hash', + }; + case 'payment.failed': + return { + ...base, + payment_id: 'pay_sample_failed_01', + link_id: 'plink_sample_01', + recipient_public_key: recipientPublicKey, + asset: 'XLM', + amount: '25.0000000', + failure_code: 'INSUFFICIENT_BALANCE', + failure_message: 'Sample failure: sender balance was too low.', + }; + } + } } diff --git a/app/backend/src/developer/developer.service.unit.spec.ts b/app/backend/src/developer/developer.service.unit.spec.ts index b31a6a0b..868bce3f 100644 --- a/app/backend/src/developer/developer.service.unit.spec.ts +++ b/app/backend/src/developer/developer.service.unit.spec.ts @@ -119,6 +119,7 @@ describe('DeveloperService', () => { expect(result.success).toBe(true); expect(result.http_status).toBe(200); expect(result.webhook_id).toBe('webhook-uuid-1234'); + expect(result.event_type).toBe('payment.received'); expect(mockFetch).toHaveBeenCalledWith( 'https://example.com/hook', expect.objectContaining({ method: 'POST' }), @@ -165,6 +166,55 @@ describe('DeveloperService', () => { }); }); + // ------------------------------------------------------------------------- + describe('sendSampleWebhookEvent', () => { + it('sends the requested canonical event type', async () => { + (mockWebhookService.getWebhook as jest.Mock).mockResolvedValue(makeWebhook()); + const mockFetch = jest.spyOn(global, 'fetch').mockResolvedValue({ + ok: true, + status: 200, + text: () => Promise.resolve('ok'), + } as unknown as Response); + + const result = await service.sendSampleWebhookEvent('webhook-uuid-1234', { + event_type: 'payment.settled', + timestamp: '2026-04-29T12:00:00.000Z', + }); + + const [, request] = mockFetch.mock.calls[0]; + const body = JSON.parse((request as RequestInit).body as string); + + expect(result.event_type).toBe('payment.settled'); + expect(result.signature_included).toBe(true); + expect(body.eventType).toBe('payment.settled'); + expect(body.payload.settlement_id).toBe('set_sample_01'); + expect((request as RequestInit).headers).toMatchObject({ + 'X-QX-Event': 'payment.settled', + 'X-QX-Timestamp': String(new Date('2026-04-29T12:00:00.000Z').getTime()), + }); + }); + + it('can omit signature headers for unsigned receiver tests', async () => { + (mockWebhookService.getWebhook as jest.Mock).mockResolvedValue(makeWebhook()); + const mockFetch = jest.spyOn(global, 'fetch').mockResolvedValue({ + ok: true, + status: 200, + text: () => Promise.resolve('ok'), + } as unknown as Response); + + const result = await service.sendSampleWebhookEvent('webhook-uuid-1234', { + event_type: 'payment.failed', + include_signature: false, + }); + + const headers = (mockFetch.mock.calls[0][1] as RequestInit).headers as Record; + expect(result.signature_included).toBe(false); + expect(headers['X-QX-Signature']).toBeUndefined(); + expect(headers['X-QX-Timestamp']).toBeUndefined(); + expect(headers['X-QX-Event']).toBe('payment.failed'); + }); + }); + // ------------------------------------------------------------------------- describe('bulkRevoke', () => { it('revokes all keys on full success', async () => { @@ -335,4 +385,3 @@ describe('DeveloperService', () => { }); }); }); - diff --git a/app/backend/src/developer/dto/developer.dto.ts b/app/backend/src/developer/dto/developer.dto.ts index bdd11c4b..2ba89b78 100644 --- a/app/backend/src/developer/dto/developer.dto.ts +++ b/app/backend/src/developer/dto/developer.dto.ts @@ -4,8 +4,48 @@ import { IsUUID, ArrayMinSize, ArrayMaxSize, + IsBoolean, + IsEnum, + IsISO8601, + IsOptional, } from 'class-validator'; +export const WEBHOOK_SAMPLE_EVENT_TYPES = [ + 'link.created', + 'payment.received', + 'payment.settled', + 'payment.failed', +] as const; + +export type WebhookSampleEventType = (typeof WEBHOOK_SAMPLE_EVENT_TYPES)[number]; + +export class WebhookSampleEventDto { + @ApiPropertyOptional({ + enum: WEBHOOK_SAMPLE_EVENT_TYPES, + default: 'payment.received', + description: 'Canonical event type to send to the webhook receiver.', + }) + @IsEnum(WEBHOOK_SAMPLE_EVENT_TYPES) + @IsOptional() + event_type?: WebhookSampleEventType; + + @ApiPropertyOptional({ + default: true, + description: 'When false, omits QuickEx signature headers for receivers that are testing unsigned payloads.', + }) + @IsBoolean() + @IsOptional() + include_signature?: boolean; + + @ApiPropertyOptional({ + example: '2026-04-29T12:00:00.000Z', + description: 'Optional event timestamp. Defaults to the current server time.', + }) + @IsISO8601() + @IsOptional() + timestamp?: string; +} + export class BulkRevokeDto { @ApiProperty({ description: 'Array of API key UUIDs to revoke (max 100)', @@ -65,6 +105,15 @@ export class WebhookTestResultDto { @ApiProperty({ example: '2026-04-29T12:00:00.000Z' }) sent_at: string; + + @ApiPropertyOptional({ enum: WEBHOOK_SAMPLE_EVENT_TYPES, example: 'payment.received' }) + event_type?: WebhookSampleEventType; + + @ApiPropertyOptional({ example: 'evt_sample_123' }) + event_id?: string; + + @ApiPropertyOptional({ example: true }) + signature_included?: boolean; } export class HealthComponentsDto { diff --git a/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts b/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts index b9f017c6..5132a965 100644 --- a/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts +++ b/app/backend/src/ingestion/__tests__/soroban-event-indexer.service.unit.spec.ts @@ -8,6 +8,7 @@ import { EscrowEventRepository } from "../escrow-event.repository"; import { PrivacyEventRepository } from "../privacy-event.repository"; import { AdminEventRepository } from "../admin-event.repository"; import { StealthEventRepository } from "../stealth-event.repository"; +import { UnparsedSorobanEventRepository } from "../unparsed-soroban-event.repository"; import { MetricsService } from "../../metrics/metrics.service"; // ─── Helpers ───────────────────────────────────────────────────────────────── @@ -89,8 +90,16 @@ function buildMocks() { upsertEvent: jest.fn().mockResolvedValue(undefined), } as unknown as StealthEventRepository; + const unparsedRepo = { + save: jest.fn().mockResolvedValue(undefined), + listPending: jest.fn().mockResolvedValue([]), + markReplayed: jest.fn().mockResolvedValue(undefined), + markFailed: jest.fn().mockResolvedValue(undefined), + } as unknown as UnparsedSorobanEventRepository; + const metrics = { recordUnknownSchemaVersion: jest.fn(), + recordError: jest.fn(), } as unknown as MetricsService; const eventEmitter = { emit: jest.fn() } as unknown as EventEmitter2; @@ -102,6 +111,7 @@ function buildMocks() { privacyRepo, adminRepo, stealthRepo, + unparsedRepo, metrics, eventEmitter, }; @@ -115,6 +125,7 @@ function buildService(mocks: ReturnType) { mocks.privacyRepo, mocks.adminRepo, mocks.stealthRepo, + mocks.unparsedRepo, mocks.metrics, mocks.eventEmitter, ); @@ -226,6 +237,87 @@ describe("SorobanEventIndexerService", () => { "EscrowDeposited", 99, ); + expect(mocks.unparsedRepo.save).toHaveBeenCalledWith( + expect.objectContaining({ + reason: "unknown_schema_version", + eventName: "EscrowDeposited", + schemaVersion: 99, + raw, + }), + ); + }); + + it("captures parse failures for known schema versions", async () => { + const mocks = buildMocks(); + const svc = buildService(mocks); + + const topics = [ + symVal("EscrowDeposited"), + xdr.ScVal.scvBytes(Buffer.from(COMMITMENT_HEX, "hex")), + nativeToScVal(OWNER), + ]; + const data = mapVal({ + schema_version: nativeToScVal(2, { type: "u32" }), + amount: nativeToScVal(1_000n, { type: "i128" }), + expires_at: nativeToScVal(9999999n, { type: "u64" }), + timestamp: nativeToScVal(1700000000n, { type: "u64" }), + }); + const raw: RawHorizonContractEvent = { + id: "parse-failure", + paging_token: "101-1", + transaction_hash: "tx-parse-failure", + ledger: 101, + created_at: "2026-01-01T00:00:00Z", + contract_id: CONTRACT_ID, + type: "contract", + topic: topics.map((v) => v.toXDR("base64")), + value: { xdr: data.toXDR("base64") }, + }; + + mockHorizonPage([raw]); + + const result = await svc.indexLedgerRange(CONTRACT_ID, 101, 101); + + expect(result.parseFailures).toBe(1); + expect(mocks.metrics.recordError).toHaveBeenCalledWith( + "soroban_indexer", + "parse_failure", + ); + expect(mocks.unparsedRepo.save).toHaveBeenCalledWith( + expect.objectContaining({ + reason: "parse_failure", + eventName: "EscrowDeposited", + schemaVersion: 2, + raw, + }), + ); + }); + + it("replays retained unparsed events when they parse successfully", async () => { + const mocks = buildMocks(); + const record = makeEscrowDepositedRaw(102, "102-1"); + (mocks.unparsedRepo.listPending as jest.Mock).mockResolvedValue([ + { + raw: record, + pagingToken: record.paging_token, + contractId: record.contract_id, + ledger: record.ledger, + transactionHash: record.transaction_hash, + attempts: 0, + reason: "parse_failure", + }, + ]); + const svc = buildService(mocks); + + const result = await svc.replayUnparsedEvents(10); + + expect(result).toEqual({ attempted: 1, replayed: 1, stillUnparsed: 0 }); + expect(mocks.escrowRepo.upsertEvent).toHaveBeenCalledTimes(1); + expect(mocks.unparsedRepo.markReplayed).toHaveBeenCalledWith("102-1"); + expect(mocks.eventEmitter.emit).toHaveBeenCalledWith( + "stellar.EscrowDeposited", + expect.anything(), + ); }); it("is idempotent: calling twice with same range does not double-persist", async () => { diff --git a/app/backend/src/ingestion/ingestion.module.ts b/app/backend/src/ingestion/ingestion.module.ts index f1a5bb3e..09a9cd9c 100644 --- a/app/backend/src/ingestion/ingestion.module.ts +++ b/app/backend/src/ingestion/ingestion.module.ts @@ -9,6 +9,7 @@ import { PrivacyEventRepository } from "./privacy-event.repository"; import { AdminEventRepository } from "./admin-event.repository"; import { StealthEventRepository } from "./stealth-event.repository"; import { IndexerCheckpointRepository } from "./indexer-checkpoint.repository"; +import { UnparsedSorobanEventRepository } from "./unparsed-soroban-event.repository"; import { SorobanEventParser } from "./soroban-event.parser"; import { StellarIngestionService } from "./stellar-ingestion.service"; import { SorobanEventIndexerService } from "./soroban-event-indexer.service"; @@ -29,6 +30,7 @@ import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; AdminEventRepository, StealthEventRepository, IndexerCheckpointRepository, + UnparsedSorobanEventRepository, SorobanEventParser, StellarIngestionService, SorobanEventIndexerService, @@ -40,6 +42,7 @@ import { IngestionBootstrapService } from "./ingestion-bootstrap.service"; SorobanEventParser, CursorRepository, EscrowEventRepository, + UnparsedSorobanEventRepository, ], }) export class IngestionModule {} diff --git a/app/backend/src/ingestion/soroban-event-indexer.service.ts b/app/backend/src/ingestion/soroban-event-indexer.service.ts index 21ee530c..ef1848ec 100644 --- a/app/backend/src/ingestion/soroban-event-indexer.service.ts +++ b/app/backend/src/ingestion/soroban-event-indexer.service.ts @@ -10,6 +10,7 @@ import { EscrowEventRepository } from "./escrow-event.repository"; import { PrivacyEventRepository } from "./privacy-event.repository"; import { AdminEventRepository } from "./admin-event.repository"; import { StealthEventRepository } from "./stealth-event.repository"; +import { UnparsedSorobanEventRepository } from "./unparsed-soroban-event.repository"; import type { QuickExContractEvent, EscrowEvent, @@ -26,6 +27,13 @@ export interface LedgerRangeResult { processed: number; persisted: number; skippedUnknownSchema: number; + parseFailures: number; +} + +export interface ReplayUnparsedResult { + attempted: number; + replayed: number; + stillUnparsed: number; } export interface DualReadConfig { @@ -60,6 +68,7 @@ export class SorobanEventIndexerService { private readonly privacyRepo: PrivacyEventRepository, private readonly adminRepo: AdminEventRepository, private readonly stealthRepo: StealthEventRepository, + private readonly unparsedRepo: UnparsedSorobanEventRepository, private readonly metrics: MetricsService, private readonly eventEmitter: EventEmitter2, ) { @@ -106,7 +115,7 @@ export class SorobanEventIndexerService { this.logger.log( `Contract ${contractId}: ledger range [${effectiveFrom}, ${toLedger}] already indexed; skipping.`, ); - return { fromLedger, toLedger, processed: 0, persisted: 0, skippedUnknownSchema: 0 }; + return { fromLedger, toLedger, processed: 0, persisted: 0, skippedUnknownSchema: 0, parseFailures: 0 }; } const inDualReadWindow = this.isInDualReadWindow(effectiveFrom, dualReadConfig); @@ -119,6 +128,7 @@ export class SorobanEventIndexerService { let processed = 0; let persisted = 0; let skippedUnknownSchema = 0; + let parseFailures = 0; // In dual-read mode, index both current and previous contract IDs if (inDualReadWindow && dualReadConfig?.previousContractId) { @@ -131,6 +141,7 @@ export class SorobanEventIndexerService { processed += previousResult.processed; persisted += previousResult.persisted; skippedUnknownSchema += previousResult.skippedUnknownSchema; + parseFailures += previousResult.parseFailures; } // Always index the current contract ID @@ -143,13 +154,14 @@ export class SorobanEventIndexerService { processed += currentResult.processed; persisted += currentResult.persisted; skippedUnknownSchema += currentResult.skippedUnknownSchema; + parseFailures += currentResult.parseFailures; this.logger.log( `Indexed contract ${contractId} [${effectiveFrom}, ${toLedger}]: ` + - `processed=${processed} persisted=${persisted} skippedUnknownSchema=${skippedUnknownSchema}`, + `processed=${processed} persisted=${persisted} skippedUnknownSchema=${skippedUnknownSchema} parseFailures=${parseFailures}`, ); - return { fromLedger: effectiveFrom, toLedger, processed, persisted, skippedUnknownSchema }; + return { fromLedger: effectiveFrom, toLedger, processed, persisted, skippedUnknownSchema, parseFailures }; } private async indexContractWithCursor( @@ -157,10 +169,11 @@ export class SorobanEventIndexerService { fromLedger: number, toLedger: number, cursor: string | undefined, - ): Promise<{ processed: number; persisted: number; skippedUnknownSchema: number }> { + ): Promise<{ processed: number; persisted: number; skippedUnknownSchema: number; parseFailures: number }> { let processed = 0; let persisted = 0; let skippedUnknownSchema = 0; + let parseFailures = 0; let nextCursor = cursor; while (true) { @@ -178,7 +191,12 @@ export class SorobanEventIndexerService { const event = this.parser.parse(raw); if (!event) { - skippedUnknownSchema++; + const outcome = await this.captureUnparsedEvent(raw); + if (outcome === "parse_failure") { + parseFailures++; + } else { + skippedUnknownSchema++; + } continue; } @@ -200,7 +218,7 @@ export class SorobanEventIndexerService { // Final checkpoint await this.checkpointRepo.saveLastLedger(contractId, toLedger); - return { processed, persisted, skippedUnknownSchema }; + return { processed, persisted, skippedUnknownSchema, parseFailures }; } private isInDualReadWindow(currentLedger: number, config?: DualReadConfig): boolean { @@ -290,4 +308,74 @@ export class SorobanEventIndexerService { this.logger.debug(`Event ${(event as QuickExContractEvent).eventType} not persisted.`); } } + + async listUnparsedEvents(limit = 100) { + return this.unparsedRepo.listPending(limit); + } + + async replayUnparsedEvents(limit = 100): Promise { + const pending = await this.unparsedRepo.listPending(limit); + let replayed = 0; + let stillUnparsed = 0; + + for (const record of pending) { + const event = this.parser.parse(record.raw); + if (event) { + try { + await this.persistEvent(event); + await this.unparsedRepo.markReplayed(record.pagingToken); + this.eventEmitter.emit(`stellar.${event.eventType}`, event); + replayed++; + } catch (err) { + await this.unparsedRepo.markFailed( + record.pagingToken, + (err as Error).message, + ); + stillUnparsed++; + } + } else { + await this.unparsedRepo.markFailed( + record.pagingToken, + "Parser still returned null after replay attempt", + ); + stillUnparsed++; + } + } + + return { attempted: pending.length, replayed, stillUnparsed }; + } + + private async captureUnparsedEvent( + raw: RawHorizonContractEvent, + ): Promise<"unknown_schema_version" | "parse_failure" | "ignored"> { + const metadata = this.parser.inspect(raw); + if (!metadata) { + return "ignored"; + } + + if ( + !this.parser.isSupportedSchemaVersion( + metadata.eventName, + metadata.schemaVersion, + ) + ) { + await this.unparsedRepo.save({ + raw, + reason: "unknown_schema_version", + eventName: metadata.eventName, + schemaVersion: metadata.schemaVersion, + }); + return "unknown_schema_version"; + } + + this.metrics.recordError("soroban_indexer", "parse_failure"); + await this.unparsedRepo.save({ + raw, + reason: "parse_failure", + eventName: metadata.eventName, + schemaVersion: metadata.schemaVersion, + errorMessage: "Parser returned null for a supported schema version", + }); + return "parse_failure"; + } } diff --git a/app/backend/src/ingestion/soroban-event.parser.ts b/app/backend/src/ingestion/soroban-event.parser.ts index 53d154b8..38226f53 100644 --- a/app/backend/src/ingestion/soroban-event.parser.ts +++ b/app/backend/src/ingestion/soroban-event.parser.ts @@ -22,6 +22,7 @@ import { /** Maximum schema version this indexer understands. */ export const MAX_SUPPORTED_SCHEMA_VERSION = 2; +export const SUPPORTED_SCHEMA_VERSIONS = [1, 2] as const; export type UnknownSchemaVersionHandler = ( eventName: SorobanEventType, @@ -29,6 +30,13 @@ export type UnknownSchemaVersionHandler = ( pagingToken: string, ) => void; +export interface RawEventMetadata { + eventName: SorobanEventType; + schemaVersion: number; + contractId: string; + pagingToken: string; +} + /** * Raw Horizon contract event record shape (subset we need). */ @@ -70,6 +78,31 @@ export class SorobanEventParser { private readonly onUnknownSchemaVersion?: UnknownSchemaVersionHandler, ) {} + inspect(raw: RawHorizonContractEvent): RawEventMetadata | null { + try { + const topics = raw.topic.map((t) => xdr.ScVal.fromXDR(t, "base64")); + const dataVal = xdr.ScVal.fromXDR(raw.value.xdr, "base64"); + const layout = this.resolveTopicLayout(topics); + if (!layout) return null; + + return { + eventName: layout.eventName, + schemaVersion: this.extractSchemaVersionFromData(dataVal), + contractId: raw.contract_id, + pagingToken: raw.paging_token, + }; + } catch { + return null; + } + } + + isSupportedSchemaVersion( + eventName: SorobanEventType, + schemaVersion: number, + ): boolean { + return this.isCompatibleSchemaVersion(eventName, schemaVersion); + } + /** * Attempt to parse a raw Horizon contract event. * Returns null when the event is unrecognised, malformed, or carries an @@ -99,7 +132,12 @@ export class SorobanEventParser { return null; } - if (!this.isCompatibleSchemaVersion(layout.eventName, schemaVersion)) { + if ( + !this.isSupportedSchemaVersion( + layout.eventName, + schemaVersion, + ) + ) { this.logger.warn( `Unsupported ${layout.eventName} schema version ${schemaVersion}`, ); diff --git a/app/backend/src/ingestion/soroban-indexer.controller.ts b/app/backend/src/ingestion/soroban-indexer.controller.ts index 30e0d025..589dcacc 100644 --- a/app/backend/src/ingestion/soroban-indexer.controller.ts +++ b/app/backend/src/ingestion/soroban-indexer.controller.ts @@ -2,14 +2,17 @@ import { Body, Controller, ConflictException, + Get, HttpCode, HttpStatus, Post, + Query, } from "@nestjs/common"; import { ApiOperation, ApiResponse, ApiTags } from "@nestjs/swagger"; import { IsBoolean, IsInt, IsNotEmpty, IsOptional, IsString, Min } from "class-validator"; import { SorobanEventIndexerService, LedgerRangeResult } from "./soroban-event-indexer.service"; +import type { UnparsedSorobanEventRecord } from "./unparsed-soroban-event.repository"; class ReindexDto { @IsString() @@ -73,4 +76,29 @@ export class SorobanIndexerController { this.running = false; } } + + @Get("unparsed-events") + @ApiOperation({ + summary: "List pending unparsed Soroban events", + description: + "Returns raw contract events retained because their schema version was unknown or parsing failed.", + }) + @ApiResponse({ status: 200, description: "Pending unparsed events" }) + listUnparsed( + @Query("limit") limit?: string, + ): Promise { + return this.indexer.listUnparsedEvents(Number(limit ?? 100)); + } + + @Post("unparsed-events/replay") + @HttpCode(HttpStatus.OK) + @ApiOperation({ + summary: "Replay pending unparsed Soroban events", + description: + "Attempts to parse and persist retained raw events after schema support has been updated.", + }) + @ApiResponse({ status: 200, description: "Replay completed" }) + replayUnparsed(@Query("limit") limit?: string) { + return this.indexer.replayUnparsedEvents(Number(limit ?? 100)); + } } diff --git a/app/backend/src/ingestion/unparsed-soroban-event.repository.ts b/app/backend/src/ingestion/unparsed-soroban-event.repository.ts new file mode 100644 index 00000000..8e45b4e4 --- /dev/null +++ b/app/backend/src/ingestion/unparsed-soroban-event.repository.ts @@ -0,0 +1,121 @@ +import { Injectable, Logger } from "@nestjs/common"; + +import { SupabaseService } from "../supabase/supabase.service"; +import type { RawHorizonContractEvent } from "./soroban-event.parser"; + +export type UnparsedSorobanEventReason = + | "unknown_schema_version" + | "parse_failure"; + +export interface SaveUnparsedSorobanEventInput { + raw: RawHorizonContractEvent; + reason: UnparsedSorobanEventReason; + eventName?: string | null; + schemaVersion?: number | null; + errorMessage?: string | null; +} + +export interface UnparsedSorobanEventRecord + extends SaveUnparsedSorobanEventInput { + pagingToken: string; + contractId: string; + ledger: number; + transactionHash: string; + attempts: number; +} + +@Injectable() +export class UnparsedSorobanEventRepository { + private readonly logger = new Logger(UnparsedSorobanEventRepository.name); + + constructor(private readonly supabase: SupabaseService) {} + + async save(input: SaveUnparsedSorobanEventInput): Promise { + const row = { + paging_token: input.raw.paging_token, + contract_id: input.raw.contract_id, + ledger: input.raw.ledger, + transaction_hash: input.raw.transaction_hash, + event_name: input.eventName ?? null, + schema_version: input.schemaVersion ?? null, + reason: input.reason, + raw_topics: input.raw.topic, + raw_payload: input.raw.value, + raw_event: input.raw, + error_message: input.errorMessage ?? null, + status: "pending", + updated_at: new Date().toISOString(), + }; + + const { error } = await this.supabase + .getClient() + .from("unparsed_soroban_events") + .upsert(row, { onConflict: "paging_token" }); + + if (error) { + this.logger.error( + `Failed to persist unparsed Soroban event ${input.raw.paging_token}: ${error.message}`, + ); + throw error; + } + } + + async listPending(limit = 100): Promise { + const { data, error } = await this.supabase + .getClient() + .from("unparsed_soroban_events") + .select("*") + .eq("status", "pending") + .order("ledger", { ascending: true }) + .limit(limit); + + if (error) { + this.logger.error(`Failed to list unparsed Soroban events: ${error.message}`); + throw error; + } + + return (data ?? []).map((row: Record) => ({ + raw: row.raw_event as RawHorizonContractEvent, + reason: row.reason as UnparsedSorobanEventReason, + eventName: (row.event_name as string | null) ?? null, + schemaVersion: + row.schema_version === null || row.schema_version === undefined + ? null + : Number(row.schema_version), + errorMessage: (row.error_message as string | null) ?? null, + pagingToken: String(row.paging_token), + contractId: String(row.contract_id), + ledger: Number(row.ledger), + transactionHash: String(row.transaction_hash), + attempts: Number(row.attempts ?? 0), + })); + } + + async markReplayed(pagingToken: string): Promise { + await this.updateStatus(pagingToken, "replayed"); + } + + async markFailed(pagingToken: string, errorMessage: string): Promise { + const { error } = await this.supabase + .getClient() + .from("unparsed_soroban_events") + .update({ + status: "pending", + error_message: errorMessage, + updated_at: new Date().toISOString(), + }) + .eq("paging_token", pagingToken); + + if (error) throw error; + } + + private async updateStatus(pagingToken: string, status: string): Promise { + const { error } = await this.supabase + .getClient() + .from("unparsed_soroban_events") + .update({ status, updated_at: new Date().toISOString() }) + .eq("paging_token", pagingToken); + + if (error) throw error; + } +} diff --git a/app/backend/supabase/migrations/20260601000000_create_unparsed_soroban_events.sql b/app/backend/supabase/migrations/20260601000000_create_unparsed_soroban_events.sql new file mode 100644 index 00000000..cc691cdc --- /dev/null +++ b/app/backend/supabase/migrations/20260601000000_create_unparsed_soroban_events.sql @@ -0,0 +1,28 @@ +-- BE-51: Retain unparsed Soroban events for schema replay. + +CREATE TABLE IF NOT EXISTS unparsed_soroban_events ( + paging_token TEXT PRIMARY KEY, + contract_id TEXT NOT NULL, + ledger BIGINT NOT NULL, + transaction_hash TEXT NOT NULL, + event_name TEXT, + schema_version INT, + reason TEXT NOT NULL CHECK (reason IN ('unknown_schema_version', 'parse_failure')), + raw_topics JSONB NOT NULL, + raw_payload JSONB NOT NULL, + raw_event JSONB NOT NULL, + error_message TEXT, + status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'replayed')), + attempts INT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS unparsed_soroban_events_status_ledger_idx + ON unparsed_soroban_events (status, ledger); + +CREATE INDEX IF NOT EXISTS unparsed_soroban_events_contract_idx + ON unparsed_soroban_events (contract_id); + +COMMENT ON TABLE unparsed_soroban_events IS + 'Raw Soroban events retained when schema versions are unknown or parsing fails.';