From 796e86b3ec4996bba9aa078481381d13402c456f Mon Sep 17 00:00:00 2001 From: d3vobed Date: Mon, 22 Jun 2026 01:47:23 +0100 Subject: [PATCH] fix(#13): persist Stellar event listener cursor in Postgres, add contract-events processor with idempotency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace Redis TTL-backed cursor storage (60s default eviction) with persistent Postgres storage. The cursor is now stored in an event_cursors table keyed by network (testnet/mainnet), surviving restarts and eliminating the silent cursor-loss bug. Changes: - Add EventCursor Prisma model with unique constraint on network - Add ProcessedEvent Prisma model for idempotency key (txHash, eventType) - Replace cacheManager.get/set with prisma.eventCursor.findUnique/upsert - Infer network from Horizon URL ("testnet" / "mainnet"), overridable via STELLAR_NETWORK env var - Create ContractEventsProcessor to consume contract-events queue - Processor idempotency: skips already-processed (txHash, eventType) pairs - Handles DonationReceived → confirms donation (status CONFIRMED) - Handles MilestoneReleased → completes milestone (status COMPLETED) - 9 new tests cover cursor bootstrapping, network detection, processor idempotency, DonationReceived routing, MilestoneReleased routing, and unknown event fallback Closes #13 --- prisma/schema.prisma | 24 ++++ src/campaigns/campaigns.service.spec.ts | 4 +- src/queue/contract-events.processor.spec.ts | 130 ++++++++++++++++++++ src/queue/contract-events.processor.ts | 96 +++++++++++++++ src/queue/queue.module.ts | 3 +- src/stellar/stellar-event.service.spec.ts | 105 ++++++++++++++++ src/stellar/stellar-event.service.ts | 32 +++-- 7 files changed, 378 insertions(+), 16 deletions(-) create mode 100644 src/queue/contract-events.processor.spec.ts create mode 100644 src/queue/contract-events.processor.ts create mode 100644 src/stellar/stellar-event.service.spec.ts diff --git a/prisma/schema.prisma b/prisma/schema.prisma index eb176ae..b8783b8 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -448,6 +448,30 @@ model SmartContract { @@map("contracts") } +/// ProcessedEvent tracks completed contract-event queue jobs to provide +/// idempotency guarantees and prevent duplicate donation/ milestone processing. +model ProcessedEvent { + id String @id @default(uuid()) + txHash String + eventType String + processedAt DateTime @default(now()) + + @@unique([txHash, eventType]) + @@map("processed_events") +} + +/// EventCursor persists the Stellar event listener cursor across restarts, +/// preventing cursor loss due to Redis TTL eviction. +model EventCursor { + id String @id @default(uuid()) + cursor String + network String + updatedAt DateTime @updatedAt + + @@unique([network]) + @@map("event_cursors") +} + /// DeadLetter model stores pruned failed Bull jobs for audit and analysis. model DeadLetter { id String @id @default(uuid()) diff --git a/src/campaigns/campaigns.service.spec.ts b/src/campaigns/campaigns.service.spec.ts index 73184e8..d39510e 100644 --- a/src/campaigns/campaigns.service.spec.ts +++ b/src/campaigns/campaigns.service.spec.ts @@ -21,7 +21,7 @@ describe('CampaignsService milestone target validation', () => { }; it.each([ - ['missing', undefined], + ['missing', undefined as string | undefined], ['zero', '0'], ['zero decimal', '0.0000000'], ['below the minimum precision', '0.00000001'], @@ -34,7 +34,7 @@ describe('CampaignsService milestone target validation', () => { milestones: [ { title: 'Prototype', - targetAmount, + targetAmount: targetAmount as string, }, ], }), diff --git a/src/queue/contract-events.processor.spec.ts b/src/queue/contract-events.processor.spec.ts new file mode 100644 index 0000000..379639f --- /dev/null +++ b/src/queue/contract-events.processor.spec.ts @@ -0,0 +1,130 @@ +import { ContractEventsProcessor } from './contract-events.processor'; +import { Logger } from '@nestjs/common'; + +describe('ContractEventsProcessor', () => { + let processor: ContractEventsProcessor; + let prisma: any; + + const mockJob = (data: any) => + ({ + data, + id: 'test-job-id', + }) as any; + + beforeEach(() => { + prisma = { + processedEvent: { + findUnique: jest.fn(), + create: jest.fn(), + }, + donation: { + updateMany: jest.fn(), + }, + milestone: { + updateMany: jest.fn(), + }, + }; + processor = new ContractEventsProcessor(prisma); + jest.spyOn(Logger.prototype, 'log').mockImplementation(() => undefined); + jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + describe('idempotency', () => { + it('skips already-processed (txHash, eventType) pairs', async () => { + prisma.processedEvent.findUnique.mockResolvedValue({ + txHash: 'abc', + eventType: 'DonationReceived', + processedAt: new Date(), + }); + + const result = await processor.processEvent( + mockJob({ txHash: 'abc', eventType: 'DonationReceived' }), + ); + + expect(result).toEqual({ skipped: true, reason: 'duplicate' }); + expect(prisma.donation.updateMany).not.toHaveBeenCalled(); + expect(prisma.processedEvent.create).not.toHaveBeenCalled(); + }); + + it('processes a new event and records idempotency key', async () => { + prisma.processedEvent.findUnique.mockResolvedValue(null); + prisma.processedEvent.create.mockResolvedValue({}); + prisma.donation.updateMany.mockResolvedValue({ count: 1 }); + + await processor.processEvent( + mockJob({ + txHash: 'abc', + eventType: 'DonationReceived', + contractId: 'CA...', + topics: ['DonationReceived', 'G...DONOR'], + value: { amount: '100' }, + ledger: 12345, + pagingToken: '123-456', + createdAt: '2026-06-22T00:00:00Z', + }), + ); + + expect(prisma.donation.updateMany).toHaveBeenCalledWith({ + where: { txHash: 'abc', status: 'PENDING' }, + data: { status: 'CONFIRMED', confirmedAt: expect.any(Date) }, + }); + expect(prisma.processedEvent.create).toHaveBeenCalledWith({ + data: { txHash: 'abc', eventType: 'DonationReceived' }, + }); + }); + }); + + describe('event routing', () => { + it('updates milestone status on MilestoneReleased', async () => { + prisma.processedEvent.findUnique.mockResolvedValue(null); + prisma.processedEvent.create.mockResolvedValue({}); + prisma.milestone.updateMany.mockResolvedValue({ count: 1 }); + + await processor.processEvent( + mockJob({ + txHash: 'def', + eventType: 'MilestoneReleased', + contractId: 'CA...', + topics: ['MilestoneReleased'], + value: null, + ledger: 12346, + pagingToken: '124-456', + createdAt: '2026-06-22T00:00:01Z', + }), + ); + + expect(prisma.milestone.updateMany).toHaveBeenCalledWith({ + where: { txHash: 'def', status: 'PENDING' }, + data: { status: 'COMPLETED', completedAt: expect.any(Date) }, + }); + }); + + it('warns on unknown event types without crashing', async () => { + prisma.processedEvent.findUnique.mockResolvedValue(null); + prisma.processedEvent.create.mockResolvedValue({}); + const warnSpy = jest.spyOn(Logger.prototype, 'warn'); + + await processor.processEvent( + mockJob({ + txHash: 'xyz', + eventType: 'UnknownEvent', + contractId: 'CA...', + topics: ['UnknownEvent'], + value: null, + ledger: 12347, + pagingToken: '125-456', + createdAt: '2026-06-22T00:00:02Z', + }), + ); + + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Unknown event type "UnknownEvent"'), + ); + expect(prisma.processedEvent.create).toHaveBeenCalled(); + }); + }); +}); diff --git a/src/queue/contract-events.processor.ts b/src/queue/contract-events.processor.ts new file mode 100644 index 0000000..90af2e4 --- /dev/null +++ b/src/queue/contract-events.processor.ts @@ -0,0 +1,96 @@ +import { Processor, Process } from '@nestjs/bull'; +import type { Job } from 'bull'; +import { Logger } from '@nestjs/common'; +import { PrismaService } from '../prisma/prisma.service'; +import { QUEUE_CONTRACT_EVENTS } from './queue.constants'; + +interface ContractEventJob { + contractId: string; + eventType: string; + topics: string[]; + value: unknown; + ledger: number; + txHash: string; + pagingToken: string; + createdAt: string; +} + +@Processor(QUEUE_CONTRACT_EVENTS) +export class ContractEventsProcessor { + private readonly logger = new Logger(ContractEventsProcessor.name); + + constructor(private readonly prisma: PrismaService) {} + + @Process('process-event') + async processEvent(job: Job) { + const { txHash, eventType } = job.data; + + const existing = await this.prisma.processedEvent.findUnique({ + where: { txHash_eventType: { txHash, eventType } }, + }); + if (existing) { + this.logger.log( + `Skipping duplicate event [${eventType}] txHash=${txHash} — already processed at ${existing.processedAt.toISOString()}`, + ); + return { skipped: true, reason: 'duplicate' }; + } + + try { + await this.handleEvent(job.data); + + await this.prisma.processedEvent.create({ + data: { txHash, eventType }, + }); + + this.logger.log( + `Processed event [${eventType}] txHash=${txHash} contractId=${job.data.contractId}`, + ); + return { processed: true }; + } catch (err) { + this.logger.error( + `Failed to process event [${eventType}] txHash=${txHash}: ${err instanceof Error ? err.message : String(err)}`, + ); + throw err; + } + } + + private async handleEvent(data: ContractEventJob) { + const { eventType, topics, value, contractId, txHash } = data; + + switch (eventType) { + case 'DonationReceived': + const donorAddress = topics[1] as string | undefined; + if (!donorAddress) { + this.logger.warn(`DonationReceived tx=${txHash}: no donor address in topics`); + break; + } + const amount = typeof value === 'object' && value !== null && 'amount' in value + ? Number((value as Record).amount) + : undefined; + + await this.prisma.donation.updateMany({ + where: { txHash, status: 'PENDING' }, + data: { status: 'CONFIRMED', confirmedAt: new Date() }, + }); + + this.logger.log( + `Confirmed donation tx=${txHash} ${amount ? `amount=${amount} ` : ''}donor=${donorAddress}`, + ); + break; + + case 'MilestoneReleased': + await this.prisma.milestone.updateMany({ + where: { txHash, status: 'PENDING' }, + data: { status: 'COMPLETED', completedAt: new Date() }, + }); + + this.logger.log(`Completed milestone tx=${txHash}`); + break; + + default: + this.logger.warn( + `Unknown event type "${eventType}" in tx=${txHash} — no handler registered`, + ); + } + } +} diff --git a/src/queue/queue.module.ts b/src/queue/queue.module.ts index 7a2267b..3715a5c 100644 --- a/src/queue/queue.module.ts +++ b/src/queue/queue.module.ts @@ -10,6 +10,7 @@ import { import { ScheduleModule } from '@nestjs/schedule'; import { PrismaModule } from '../prisma/prisma.module'; import { QueueMaintenanceService } from './queue-maintenance.service'; +import { ContractEventsProcessor } from './contract-events.processor'; const DEAD_LETTER_SETTINGS = { attempts: 3, @@ -40,7 +41,7 @@ const DEAD_LETTER_SETTINGS = { ScheduleModule.forRoot(), PrismaModule, ], - providers: [QueueMaintenanceService], + providers: [QueueMaintenanceService, ContractEventsProcessor], exports: [BullModule], }) export class QueueModule {} diff --git a/src/stellar/stellar-event.service.spec.ts b/src/stellar/stellar-event.service.spec.ts new file mode 100644 index 0000000..640d2ff --- /dev/null +++ b/src/stellar/stellar-event.service.spec.ts @@ -0,0 +1,105 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ConfigService } from '@nestjs/config'; +import { getQueueToken } from '@nestjs/bull'; +import { StellarEventService } from './stellar-event.service'; +import { PrismaService } from '../prisma/prisma.service'; + +describe('StellarEventService — cursor persistence', () => { + const mockEventCursor = { + findUnique: jest.fn(), + upsert: jest.fn(), + }; + + const mockPrisma = { + eventCursor: mockEventCursor, + smartContract: { findMany: jest.fn().mockResolvedValue([]) }, + } as unknown as PrismaService; + + let mockConfig: { get: jest.Mock }; + let mockQueue: { add: jest.Mock }; + + function buildConfig(url: string) { + mockConfig = { + get: jest.fn((key: string, fallback?: string) => { + if (key === 'STELLAR_HORIZON_URL') return url; + if (key === 'STELLAR_NETWORK') return undefined; + return fallback; + }), + } as any; + } + + async function createService(): Promise { + mockQueue = { add: jest.fn() }; + const module: TestingModule = await Test.createTestingModule({ + providers: [ + StellarEventService, + { provide: ConfigService, useValue: mockConfig }, + { provide: getQueueToken('contract-events'), useValue: mockQueue }, + { provide: PrismaService, useValue: mockPrisma }, + ], + }).compile(); + return module.get(StellarEventService); + } + + beforeEach(() => { + jest.clearAllMocks(); + }); + + describe('cursor persistence', () => { + it('loads cursor from Postgres on bootstrap when one exists', async () => { + buildConfig('https://horizon-testnet.stellar.org'); + const svc = await createService(); + mockEventCursor.findUnique.mockResolvedValue({ + cursor: '123-456', + network: 'testnet', + }); + (svc as any).active = false; + + await svc.onApplicationBootstrap(); + + expect(mockEventCursor.findUnique).toHaveBeenCalledWith({ + where: { network: 'testnet' }, + }); + }); + + it('starts from "now" when no cursor is found', async () => { + buildConfig('https://horizon-testnet.stellar.org'); + const svc = await createService(); + mockEventCursor.findUnique.mockResolvedValue(null); + (svc as any).active = false; + + await svc.onApplicationBootstrap(); + + expect(mockEventCursor.findUnique).toHaveBeenCalledWith({ + where: { network: 'testnet' }, + }); + expect((svc as any).lastCursor).toBe('now'); + }); + }); + + describe('network detection', () => { + it('identifies testnet from default URL', async () => { + buildConfig('https://horizon-testnet.stellar.org'); + const svc = await createService(); + expect((svc as any).network).toBe('testnet'); + }); + + it('identifies mainnet from mainnet URL', async () => { + buildConfig('https://horizon.stellar.org'); + const svc = await createService(); + expect((svc as any).network).toBe('mainnet'); + }); + + it('uses STELLAR_NETWORK config when provided', async () => { + mockConfig = { + get: jest.fn((key: string) => { + if (key === 'STELLAR_NETWORK') return 'custom-network'; + if (key === 'STELLAR_HORIZON_URL') return 'https://horizon-testnet.stellar.org'; + return undefined; + }), + } as any; + const svc = await createService(); + expect((svc as any).network).toBe('custom-network'); + }); + }); +}); diff --git a/src/stellar/stellar-event.service.ts b/src/stellar/stellar-event.service.ts index 83cc30a..734a080 100644 --- a/src/stellar/stellar-event.service.ts +++ b/src/stellar/stellar-event.service.ts @@ -1,14 +1,11 @@ import { Injectable, - Inject, OnApplicationBootstrap, Logger, } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import type { Queue } from 'bull'; import { InjectQueue } from '@nestjs/bull'; -import { CACHE_MANAGER } from '@nestjs/cache-manager'; -import type { Cache } from 'cache-manager'; import { Horizon, xdr, scValToNative, StrKey } from '@stellar/stellar-sdk'; import { PrismaService } from '../prisma/prisma.service'; import { QUEUE_CONTRACT_EVENTS } from '../queue/queue.constants'; @@ -27,34 +24,39 @@ export class StellarEventService implements OnApplicationBootstrap { private lastCursor = 'now'; private isConnecting = false; private active = true; + private readonly network: string; constructor( private readonly config: ConfigService, private readonly prisma: PrismaService, @InjectQueue(QUEUE_CONTRACT_EVENTS) private readonly contractEventsQueue: Queue, - @Inject(CACHE_MANAGER) private readonly cacheManager: Cache, ) { this.horizonUrl = this.config.get('STELLAR_HORIZON_URL') || 'https://horizon-testnet.stellar.org'; this.horizonServer = new Horizon.Server(this.horizonUrl); + this.network = + this.config.get('STELLAR_NETWORK') || + (this.horizonUrl.includes('testnet') ? 'testnet' : 'mainnet'); } async onApplicationBootstrap() { this.logger.log('Starting Stellar Event Listener Service...'); - // Load last cursor from cache - const savedCursor = await this.cacheManager.get( - 'stellar:event_listener:cursor', - ); - if (savedCursor) { - this.lastCursor = savedCursor; + // Load last cursor from Postgres (persistent, no TTL) + const row = await this.prisma.eventCursor.findUnique({ + where: { network: this.network }, + }); + if (row) { + this.lastCursor = row.cursor; this.logger.log( - `Loaded last processed transaction cursor: ${this.lastCursor}`, + `Loaded last processed transaction cursor: ${this.lastCursor} (network: ${this.network})`, ); } else { - this.logger.log('No saved cursor found. Starting from "now"'); + this.logger.log( + `No saved cursor found for network "${this.network}". Starting from "now"`, + ); } // Catch up on any missed events and start the stream @@ -218,7 +220,11 @@ export class StellarEventService implements OnApplicationBootstrap { private async saveCursor(cursor: string) { this.lastCursor = cursor; - await this.cacheManager.set('stellar:event_listener:cursor', cursor); + await this.prisma.eventCursor.upsert({ + where: { network: this.network }, + create: { cursor, network: this.network }, + update: { cursor }, + }); } private parseEvents(resultMetaXdr: string): any[] {