Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,30 @@ model SmartContract {
@@map("contracts")
}

/// ProcessedEvent tracks completed contract-event queue jobs to provide
/// idempotency guarantees and prevent duplicate donation/ milestone processing.
model ProcessedEvent {
id String @id @default(uuid())
txHash String
eventType String
processedAt DateTime @default(now())

@@unique([txHash, eventType])
@@map("processed_events")
}

/// EventCursor persists the Stellar event listener cursor across restarts,
/// preventing cursor loss due to Redis TTL eviction.
model EventCursor {
id String @id @default(uuid())
cursor String
network String
updatedAt DateTime @updatedAt

@@unique([network])
@@map("event_cursors")
}

/// DeadLetter model stores pruned failed Bull jobs for audit and analysis.
model DeadLetter {
id String @id @default(uuid())
Expand Down
4 changes: 2 additions & 2 deletions src/campaigns/campaigns.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe('CampaignsService milestone target validation', () => {
};

it.each([
['missing', undefined],
['missing', undefined as string | undefined],
['zero', '0'],
['zero decimal', '0.0000000'],
['below the minimum precision', '0.00000001'],
Expand All @@ -34,7 +34,7 @@ describe('CampaignsService milestone target validation', () => {
milestones: [
{
title: 'Prototype',
targetAmount,
targetAmount: targetAmount as string,
},
],
}),
Expand Down
130 changes: 130 additions & 0 deletions src/queue/contract-events.processor.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { ContractEventsProcessor } from './contract-events.processor';
import { Logger } from '@nestjs/common';

describe('ContractEventsProcessor', () => {
let processor: ContractEventsProcessor;
let prisma: any;

const mockJob = (data: any) =>
({
data,
id: 'test-job-id',
}) as any;

beforeEach(() => {
prisma = {
processedEvent: {
findUnique: jest.fn(),
create: jest.fn(),
},
donation: {
updateMany: jest.fn(),
},
milestone: {
updateMany: jest.fn(),
},
};
processor = new ContractEventsProcessor(prisma);
jest.spyOn(Logger.prototype, 'log').mockImplementation(() => undefined);
jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined);
});

afterEach(() => {
jest.restoreAllMocks();
});

describe('idempotency', () => {
it('skips already-processed (txHash, eventType) pairs', async () => {
prisma.processedEvent.findUnique.mockResolvedValue({
txHash: 'abc',
eventType: 'DonationReceived',
processedAt: new Date(),
});

const result = await processor.processEvent(
mockJob({ txHash: 'abc', eventType: 'DonationReceived' }),
);

expect(result).toEqual({ skipped: true, reason: 'duplicate' });
expect(prisma.donation.updateMany).not.toHaveBeenCalled();
expect(prisma.processedEvent.create).not.toHaveBeenCalled();
});

it('processes a new event and records idempotency key', async () => {
prisma.processedEvent.findUnique.mockResolvedValue(null);
prisma.processedEvent.create.mockResolvedValue({});
prisma.donation.updateMany.mockResolvedValue({ count: 1 });

await processor.processEvent(
mockJob({
txHash: 'abc',
eventType: 'DonationReceived',
contractId: 'CA...',
topics: ['DonationReceived', 'G...DONOR'],
value: { amount: '100' },
ledger: 12345,
pagingToken: '123-456',
createdAt: '2026-06-22T00:00:00Z',
}),
);

expect(prisma.donation.updateMany).toHaveBeenCalledWith({
where: { txHash: 'abc', status: 'PENDING' },
data: { status: 'CONFIRMED', confirmedAt: expect.any(Date) },
});
expect(prisma.processedEvent.create).toHaveBeenCalledWith({
data: { txHash: 'abc', eventType: 'DonationReceived' },
});
});
});

describe('event routing', () => {
it('updates milestone status on MilestoneReleased', async () => {
prisma.processedEvent.findUnique.mockResolvedValue(null);
prisma.processedEvent.create.mockResolvedValue({});
prisma.milestone.updateMany.mockResolvedValue({ count: 1 });

await processor.processEvent(
mockJob({
txHash: 'def',
eventType: 'MilestoneReleased',
contractId: 'CA...',
topics: ['MilestoneReleased'],
value: null,
ledger: 12346,
pagingToken: '124-456',
createdAt: '2026-06-22T00:00:01Z',
}),
);

expect(prisma.milestone.updateMany).toHaveBeenCalledWith({
where: { txHash: 'def', status: 'PENDING' },
data: { status: 'COMPLETED', completedAt: expect.any(Date) },
});
});

it('warns on unknown event types without crashing', async () => {
prisma.processedEvent.findUnique.mockResolvedValue(null);
prisma.processedEvent.create.mockResolvedValue({});
const warnSpy = jest.spyOn(Logger.prototype, 'warn');

await processor.processEvent(
mockJob({
txHash: 'xyz',
eventType: 'UnknownEvent',
contractId: 'CA...',
topics: ['UnknownEvent'],
value: null,
ledger: 12347,
pagingToken: '125-456',
createdAt: '2026-06-22T00:00:02Z',
}),
);

expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining('Unknown event type "UnknownEvent"'),
);
expect(prisma.processedEvent.create).toHaveBeenCalled();
});
});
});
96 changes: 96 additions & 0 deletions src/queue/contract-events.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { Processor, Process } from '@nestjs/bull';
import type { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { QUEUE_CONTRACT_EVENTS } from './queue.constants';

interface ContractEventJob {
contractId: string;
eventType: string;
topics: string[];
value: unknown;
ledger: number;
txHash: string;
pagingToken: string;
createdAt: string;
}

@Processor(QUEUE_CONTRACT_EVENTS)
export class ContractEventsProcessor {
private readonly logger = new Logger(ContractEventsProcessor.name);

constructor(private readonly prisma: PrismaService) {}

@Process('process-event')
async processEvent(job: Job<ContractEventJob>) {
const { txHash, eventType } = job.data;

const existing = await this.prisma.processedEvent.findUnique({
where: { txHash_eventType: { txHash, eventType } },
});
if (existing) {
this.logger.log(
`Skipping duplicate event [${eventType}] txHash=${txHash} — already processed at ${existing.processedAt.toISOString()}`,
);
return { skipped: true, reason: 'duplicate' };
}

try {
await this.handleEvent(job.data);

await this.prisma.processedEvent.create({
data: { txHash, eventType },
});

this.logger.log(
`Processed event [${eventType}] txHash=${txHash} contractId=${job.data.contractId}`,
);
return { processed: true };
} catch (err) {
this.logger.error(
`Failed to process event [${eventType}] txHash=${txHash}: ${err instanceof Error ? err.message : String(err)}`,
);
throw err;
}
}

private async handleEvent(data: ContractEventJob) {
const { eventType, topics, value, contractId, txHash } = data;

switch (eventType) {
case 'DonationReceived':
const donorAddress = topics[1] as string | undefined;

Check failure on line 62 in src/queue/contract-events.processor.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected lexical declaration in case block
if (!donorAddress) {
this.logger.warn(`DonationReceived tx=${txHash}: no donor address in topics`);
break;
}
const amount = typeof value === 'object' && value !== null && 'amount' in value
? Number((value as Record<string, unknown>).amount)
: undefined;

Check failure on line 69 in src/queue/contract-events.processor.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected lexical declaration in case block

await this.prisma.donation.updateMany({
where: { txHash, status: 'PENDING' },
data: { status: 'CONFIRMED', confirmedAt: new Date() },
});

this.logger.log(
`Confirmed donation tx=${txHash} ${amount ? `amount=${amount} ` : ''}donor=${donorAddress}`,
);
break;

case 'MilestoneReleased':
await this.prisma.milestone.updateMany({
where: { txHash, status: 'PENDING' },
data: { status: 'COMPLETED', completedAt: new Date() },
});

this.logger.log(`Completed milestone tx=${txHash}`);
break;

default:
this.logger.warn(
`Unknown event type "${eventType}" in tx=${txHash} — no handler registered`,
);
}
}
}
3 changes: 2 additions & 1 deletion src/queue/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { ScheduleModule } from '@nestjs/schedule';
import { PrismaModule } from '../prisma/prisma.module';
import { QueueMaintenanceService } from './queue-maintenance.service';
import { ContractEventsProcessor } from './contract-events.processor';

const DEAD_LETTER_SETTINGS = {
attempts: 3,
Expand Down Expand Up @@ -40,7 +41,7 @@ const DEAD_LETTER_SETTINGS = {
ScheduleModule.forRoot(),
PrismaModule,
],
providers: [QueueMaintenanceService],
providers: [QueueMaintenanceService, ContractEventsProcessor],
exports: [BullModule],
})
export class QueueModule {}
105 changes: 105 additions & 0 deletions src/stellar/stellar-event.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { getQueueToken } from '@nestjs/bull';
import { StellarEventService } from './stellar-event.service';
import { PrismaService } from '../prisma/prisma.service';

describe('StellarEventService — cursor persistence', () => {
const mockEventCursor = {
findUnique: jest.fn(),
upsert: jest.fn(),
};

const mockPrisma = {
eventCursor: mockEventCursor,
smartContract: { findMany: jest.fn().mockResolvedValue([]) },
} as unknown as PrismaService;

let mockConfig: { get: jest.Mock };
let mockQueue: { add: jest.Mock };

function buildConfig(url: string) {
mockConfig = {
get: jest.fn((key: string, fallback?: string) => {
if (key === 'STELLAR_HORIZON_URL') return url;
if (key === 'STELLAR_NETWORK') return undefined;
return fallback;
}),
} as any;
}

async function createService(): Promise<StellarEventService> {
mockQueue = { add: jest.fn() };
const module: TestingModule = await Test.createTestingModule({
providers: [
StellarEventService,
{ provide: ConfigService, useValue: mockConfig },
{ provide: getQueueToken('contract-events'), useValue: mockQueue },
{ provide: PrismaService, useValue: mockPrisma },
],
}).compile();
return module.get<StellarEventService>(StellarEventService);
}

beforeEach(() => {
jest.clearAllMocks();
});

describe('cursor persistence', () => {
it('loads cursor from Postgres on bootstrap when one exists', async () => {
buildConfig('https://horizon-testnet.stellar.org');
const svc = await createService();
mockEventCursor.findUnique.mockResolvedValue({
cursor: '123-456',
network: 'testnet',
});
(svc as any).active = false;

await svc.onApplicationBootstrap();

expect(mockEventCursor.findUnique).toHaveBeenCalledWith({
where: { network: 'testnet' },
});
});

it('starts from "now" when no cursor is found', async () => {
buildConfig('https://horizon-testnet.stellar.org');
const svc = await createService();
mockEventCursor.findUnique.mockResolvedValue(null);
(svc as any).active = false;

await svc.onApplicationBootstrap();

expect(mockEventCursor.findUnique).toHaveBeenCalledWith({
where: { network: 'testnet' },
});
expect((svc as any).lastCursor).toBe('now');
});
});

describe('network detection', () => {
it('identifies testnet from default URL', async () => {
buildConfig('https://horizon-testnet.stellar.org');
const svc = await createService();
expect((svc as any).network).toBe('testnet');
});

it('identifies mainnet from mainnet URL', async () => {
buildConfig('https://horizon.stellar.org');
const svc = await createService();
expect((svc as any).network).toBe('mainnet');
});

it('uses STELLAR_NETWORK config when provided', async () => {
mockConfig = {
get: jest.fn((key: string) => {
if (key === 'STELLAR_NETWORK') return 'custom-network';
if (key === 'STELLAR_HORIZON_URL') return 'https://horizon-testnet.stellar.org';
return undefined;
}),
} as any;
const svc = await createService();
expect((svc as any).network).toBe('custom-network');
});
});
});
Loading
Loading