From 3b0cd96c1b2542a7817395b8d5cf9f24504b2236 Mon Sep 17 00:00:00 2001 From: khaylebfortune <111098422+khaylebfortune@users.noreply.github.com> Date: Thu, 18 Jun 2026 15:09:55 +0000 Subject: [PATCH 1/2] Implement indexer module, status endpoint, and add indexer_state migration --- src/app.module.ts | 4 +- .../event-parser.service.ts | 0 src/indexer/indexer-status.service.ts | 21 +++++++++++ src/indexer/indexer.controller.ts | 16 ++++++++ .../indexer.module.ts} | 14 ++++--- .../indexer.processor.ts} | 37 +++++++++---------- .../indexer.service.ts} | 20 +++++----- .../interfaces/index.ts | 0 .../interfaces/indexer.interfaces.ts | 0 src/modules/health/health.service.ts | 2 +- src/modules/metrics/metrics.updater.ts | 2 +- .../20260618000000_create_indexer_state.sql | 7 ++++ .../blockchain-indexer.processor.spec.ts | 26 ++++++------- 13 files changed, 97 insertions(+), 52 deletions(-) rename src/{jobs/blockchain-indexer => indexer}/event-parser.service.ts (100%) create mode 100644 src/indexer/indexer-status.service.ts create mode 100644 src/indexer/indexer.controller.ts rename src/{jobs/blockchain-indexer/blockchain-indexer.module.ts => indexer/indexer.module.ts} (58%) rename src/{jobs/blockchain-indexer/blockchain-indexer.processor.ts => indexer/indexer.processor.ts} (94%) rename src/{jobs/blockchain-indexer/blockchain-indexer.service.ts => indexer/indexer.service.ts} (61%) rename src/{jobs/blockchain-indexer => indexer}/interfaces/index.ts (100%) rename src/{jobs/blockchain-indexer => indexer}/interfaces/indexer.interfaces.ts (100%) create mode 100644 supabase/migrations/20260618000000_create_indexer_state.sql diff --git a/src/app.module.ts b/src/app.module.ts index a0a196e..b621ae6 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -15,7 +15,7 @@ import { LiquidityModule } from './modules/liquidity/liquidity.module'; import { NotificationsModule } from './modules/notifications/notifications.module'; import { TransactionsModule } from './modules/transactions/transactions.module'; import { LearnersModule } from './modules/learners/learners.module'; -import { BlockchainIndexerModule } from './jobs/blockchain-indexer/blockchain-indexer.module'; +import { IndexerModule } from './indexer/indexer.module'; import { LoanPaymentReminderModule } from './jobs/loan-payment-reminder/loan-payment-reminder.module'; import { TransactionStatusCheckerModule } from './jobs/transaction-status-checker/transaction-status-checker.module'; import { NonceCleanupModule } from './jobs/nonce-cleanup/nonce-cleanup.module'; @@ -57,7 +57,7 @@ import { CorrelationIdMiddleware } from './common/logger/correlation-id.middlewa NotificationsModule, TransactionsModule, LearnersModule, - BlockchainIndexerModule, + IndexerModule, LoanPaymentReminderModule, TransactionStatusCheckerModule, NonceCleanupModule, diff --git a/src/jobs/blockchain-indexer/event-parser.service.ts b/src/indexer/event-parser.service.ts similarity index 100% rename from src/jobs/blockchain-indexer/event-parser.service.ts rename to src/indexer/event-parser.service.ts diff --git a/src/indexer/indexer-status.service.ts b/src/indexer/indexer-status.service.ts new file mode 100644 index 0000000..b37877c --- /dev/null +++ b/src/indexer/indexer-status.service.ts @@ -0,0 +1,21 @@ +import { Injectable } from '@nestjs/common'; +import { SupabaseService } from '../database/supabase.client'; + +@Injectable() +export class IndexerStatusService { + constructor(private readonly supabaseService: SupabaseService) {} + + async getStatus() { + const db = this.supabaseService.getServiceRoleClient(); + const { data, error } = await db + .from('indexer_state') + .select('contract_id, last_ledger, updated_at') + .order('updated_at', { ascending: false }); + + return { + status: error ? 'error' : 'ok', + data: data ?? [], + error: error?.message || null, + }; + } +} diff --git a/src/indexer/indexer.controller.ts b/src/indexer/indexer.controller.ts new file mode 100644 index 0000000..a3f4f95 --- /dev/null +++ b/src/indexer/indexer.controller.ts @@ -0,0 +1,16 @@ +import { Controller, Get } from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger'; +import { IndexerStatusService } from './indexer-status.service'; + +@ApiTags('indexer') +@Controller('indexer') +export class IndexerController { + constructor(private readonly indexerStatusService: IndexerStatusService) {} + + @Get('status') + @ApiOperation({ summary: 'Get indexer status and latest processed ledger' }) + @ApiResponse({ status: 200, description: 'Indexer status retrieved successfully' }) + async getStatus() { + return this.indexerStatusService.getStatus(); + } +} diff --git a/src/jobs/blockchain-indexer/blockchain-indexer.module.ts b/src/indexer/indexer.module.ts similarity index 58% rename from src/jobs/blockchain-indexer/blockchain-indexer.module.ts rename to src/indexer/indexer.module.ts index f977025..93d6c32 100644 --- a/src/jobs/blockchain-indexer/blockchain-indexer.module.ts +++ b/src/indexer/indexer.module.ts @@ -1,11 +1,13 @@ import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bullmq'; import { ConfigModule } from '@nestjs/config'; -import { BlockchainIndexerService } from './blockchain-indexer.service'; -import { BlockchainIndexerProcessor } from './blockchain-indexer.processor'; +import { IndexerService } from './indexer.service'; +import { IndexerProcessor } from './indexer.processor'; import { EventParserService } from './event-parser.service'; import { SupabaseService } from '../../database/supabase.client'; import { StellarModule } from '../../stellar/stellar.module'; +import { IndexerController } from './indexer.controller'; +import { IndexerStatusService } from './indexer-status.service'; @Module({ imports: [ @@ -13,11 +15,13 @@ import { StellarModule } from '../../stellar/stellar.module'; StellarModule, BullModule.registerQueue({ name: 'blockchain-indexer' }), ], + controllers: [IndexerController], providers: [ - BlockchainIndexerService, - BlockchainIndexerProcessor, + IndexerService, + IndexerProcessor, EventParserService, SupabaseService, + IndexerStatusService, ], }) -export class BlockchainIndexerModule {} +export class IndexerModule {} diff --git a/src/jobs/blockchain-indexer/blockchain-indexer.processor.ts b/src/indexer/indexer.processor.ts similarity index 94% rename from src/jobs/blockchain-indexer/blockchain-indexer.processor.ts rename to src/indexer/indexer.processor.ts index 861834d..bb10a96 100644 --- a/src/jobs/blockchain-indexer/blockchain-indexer.processor.ts +++ b/src/indexer/indexer.processor.ts @@ -19,15 +19,15 @@ import { /** * BullMQ processor for the `blockchain-indexer` queue. * - * On every invocation (every 30 s) it: - * 1. Reads the last indexed ledger per contract from `indexer_cursor`. + * On every invocation (every 6 s) it: + * 1. Reads the last indexed ledger per contract from `indexer_state`. * 2. Fetches new Soroban events since that ledger. * 3. Parses, deduplicates, and persists them to the database. * 4. Updates the cursor so the next run resumes correctly. */ @Processor('blockchain-indexer') -export class BlockchainIndexerProcessor extends WorkerHost { - private readonly logger = new Logger(BlockchainIndexerProcessor.name); +export class IndexerProcessor extends WorkerHost { + private readonly logger = new Logger(IndexerProcessor.name); private readonly loanContractId: string; private readonly reputationContractId: string; @@ -51,7 +51,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { async process(_job: Job): Promise { this.logger.log({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'process', }, 'Blockchain indexer job started'); @@ -59,7 +59,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { await this.indexContract(this.loanContractId, 'loan'); } catch (error) { this.logger.error({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'indexLoanContract', error: error.message, stack: error.stack, @@ -70,7 +70,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { await this.indexContract(this.reputationContractId, 'reputation'); } catch (error) { this.logger.error({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'indexReputationContract', error: error.message, stack: error.stack, @@ -78,7 +78,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { } this.logger.log({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'process', }, 'Blockchain indexer job completed'); } @@ -102,8 +102,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { const startLedger = cursor + 1; this.logger.debug({ - context: 'BlockchainIndexerProcessor', - action: 'indexContract', + context: 'IndexerProcessor', contractId: contractId.slice(0, 8) + '...', label, startLedger, @@ -117,7 +116,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { } this.logger.log({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'indexContract', label, eventCount: rawEvents.length, @@ -140,7 +139,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { } this.logger.log({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'eventIndexed', eventType: parsed.type, eventId: parsed.eventId, @@ -151,7 +150,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { } catch (error) { errorCount++; this.logger.error({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'persistEvent', error: error.message, eventId: rawEvent?.id, @@ -165,7 +164,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { } this.logger.log({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'indexContractComplete', label, successCount, @@ -385,7 +384,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { if (cacheError) { // Non-fatal: cache update failure should not block event processing this.logger.warn({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'updateReputationCache', error: cacheError.message, wallet: payload.wallet, @@ -398,14 +397,14 @@ export class BlockchainIndexerProcessor extends WorkerHost { // ------------------------------------------------------------------------- /** - * Reads the last indexed ledger for a contract from `indexer_cursor`. + * Reads the last indexed ledger for a contract from `indexer_state`. * Returns 0 if no cursor exists (first run). */ async getCursor(contractId: string): Promise { const db = this.supabaseService.getServiceRoleClient(); const { data, error } = await db - .from('indexer_cursor') + .from('indexer_state') .select('last_ledger') .eq('contract_id', contractId) .single(); @@ -423,7 +422,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { async updateCursor(contractId: string, ledger: number): Promise { const db = this.supabaseService.getServiceRoleClient(); - const { error } = await db.from('indexer_cursor').upsert( + const { error } = await db.from('indexer_state').upsert( { contract_id: contractId, last_ledger: ledger, @@ -434,7 +433,7 @@ export class BlockchainIndexerProcessor extends WorkerHost { if (error) { this.logger.error({ - context: 'BlockchainIndexerProcessor', + context: 'IndexerProcessor', action: 'updateCursor', error: error.message, contractId, diff --git a/src/jobs/blockchain-indexer/blockchain-indexer.service.ts b/src/indexer/indexer.service.ts similarity index 61% rename from src/jobs/blockchain-indexer/blockchain-indexer.service.ts rename to src/indexer/indexer.service.ts index 740a273..29c6674 100644 --- a/src/jobs/blockchain-indexer/blockchain-indexer.service.ts +++ b/src/indexer/indexer.service.ts @@ -5,12 +5,12 @@ import { Queue } from 'bullmq'; /** * Schedules the blockchain-indexer repeating job on module initialisation. * - * The job runs every 30 seconds. If the job already exists (e.g. after a + * The job runs every 6 seconds. If the job already exists (e.g. after a * hot-reload) BullMQ silently skips the duplicate. */ @Injectable() -export class BlockchainIndexerService implements OnModuleInit { - private readonly logger = new Logger(BlockchainIndexerService.name); +export class IndexerService implements OnModuleInit { + private readonly logger = new Logger(IndexerService.name); constructor( @InjectQueue('blockchain-indexer') @@ -18,7 +18,6 @@ export class BlockchainIndexerService implements OnModuleInit { ) {} async onModuleInit(): Promise { - // Remove any stale repeatable jobs from a previous run const existing = await this.indexerQueue.getRepeatableJobs(); for (const job of existing) { await this.indexerQueue.removeRepeatableByKey(job.key); @@ -28,15 +27,18 @@ export class BlockchainIndexerService implements OnModuleInit { 'index-events', {}, { - repeat: { every: 30_000 }, // 30 seconds + repeat: { every: 6_000 }, // 6 seconds removeOnComplete: { count: 10 }, removeOnFail: { count: 50 }, }, ); - this.logger.log({ - context: 'BlockchainIndexerService', - action: 'onModuleInit', - }, 'Blockchain indexer job scheduled — runs every 30 seconds'); + this.logger.log( + { + context: 'IndexerService', + action: 'onModuleInit', + }, + 'Indexer job scheduled — runs every 6 seconds', + ); } } diff --git a/src/jobs/blockchain-indexer/interfaces/index.ts b/src/indexer/interfaces/index.ts similarity index 100% rename from src/jobs/blockchain-indexer/interfaces/index.ts rename to src/indexer/interfaces/index.ts diff --git a/src/jobs/blockchain-indexer/interfaces/indexer.interfaces.ts b/src/indexer/interfaces/indexer.interfaces.ts similarity index 100% rename from src/jobs/blockchain-indexer/interfaces/indexer.interfaces.ts rename to src/indexer/interfaces/indexer.interfaces.ts diff --git a/src/modules/health/health.service.ts b/src/modules/health/health.service.ts index d46becf..4fcb4ea 100644 --- a/src/modules/health/health.service.ts +++ b/src/modules/health/health.service.ts @@ -150,7 +150,7 @@ export class HealthService { try { const db = this.supabaseService.getServiceRoleClient(); const { data } = await db - .from('indexer_cursor') + .from('indexer_state') .select('last_ledger') .order('updated_at', { ascending: false }) .limit(1) diff --git a/src/modules/metrics/metrics.updater.ts b/src/modules/metrics/metrics.updater.ts index f5c29f4..02e9643 100644 --- a/src/modules/metrics/metrics.updater.ts +++ b/src/modules/metrics/metrics.updater.ts @@ -60,7 +60,7 @@ export class MetricsUpdater implements OnModuleInit { try { const db = this.supabaseService.getServiceRoleClient(); const { data } = await db - .from('indexer_cursor') + .from('indexer_state') .select('last_ledger') .order('updated_at', { ascending: false }) .limit(1) diff --git a/supabase/migrations/20260618000000_create_indexer_state.sql b/supabase/migrations/20260618000000_create_indexer_state.sql new file mode 100644 index 0000000..ac29c02 --- /dev/null +++ b/supabase/migrations/20260618000000_create_indexer_state.sql @@ -0,0 +1,7 @@ +CREATE TABLE public.indexer_state ( + contract_id TEXT PRIMARY KEY, + last_ledger BIGINT NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +ALTER TABLE public.indexer_state ENABLE ROW LEVEL SECURITY; diff --git a/test/unit/jobs/blockchain-indexer/blockchain-indexer.processor.spec.ts b/test/unit/jobs/blockchain-indexer/blockchain-indexer.processor.spec.ts index e42c436..ac97eec 100644 --- a/test/unit/jobs/blockchain-indexer/blockchain-indexer.processor.spec.ts +++ b/test/unit/jobs/blockchain-indexer/blockchain-indexer.processor.spec.ts @@ -1,7 +1,7 @@ import { Test, TestingModule } from '@nestjs/testing'; import { ConfigService } from '@nestjs/config'; -import { BlockchainIndexerProcessor } from '../../../../src/jobs/blockchain-indexer/blockchain-indexer.processor'; -import { EventParserService } from '../../../../src/jobs/blockchain-indexer/event-parser.service'; +import { IndexerProcessor } from '../../../../src/indexer/indexer.processor'; +import { EventParserService } from '../../../../src/indexer/event-parser.service'; import { SupabaseService } from '../../../../src/database/supabase.client'; import { SorobanService } from '../../../../src/blockchain/soroban/soroban.service'; import { @@ -11,7 +11,7 @@ import { LoanRepaidPayload, LoanDefaultedPayload, ScoreChangedPayload, -} from '../../../../src/jobs/blockchain-indexer/interfaces'; +} from '../../../../src/indexer/interfaces/indexer.interfaces'; // --------------------------------------------------------------------------- // Fluent Supabase mock helpers @@ -39,8 +39,8 @@ function createChain(overrides: Record = {}) { // Tests // --------------------------------------------------------------------------- -describe('BlockchainIndexerProcessor', () => { - let processor: BlockchainIndexerProcessor; +describe('IndexerProcessor', () => { + let processor: IndexerProcessor; let eventParser: EventParserService; // Per-table chains so we can assert on the right table @@ -77,12 +77,8 @@ describe('BlockchainIndexerProcessor', () => { mockSupabaseClient.from.mockImplementation((table: string) => { switch (table) { - case 'indexer_cursor': + case 'indexer_state': return cursorChain; - case 'loan_index': - return loanChain; - case 'payment_index': - return paymentChain; case 'reputation_history': return reputationHistoryChain; case 'reputation_cache': @@ -96,7 +92,7 @@ describe('BlockchainIndexerProcessor', () => { beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ - BlockchainIndexerProcessor, + IndexerProcessor, EventParserService, { provide: SupabaseService, useValue: mockSupabaseService }, { provide: SorobanService, useValue: mockSorobanService }, @@ -109,7 +105,7 @@ describe('BlockchainIndexerProcessor', () => { ], }).compile(); - processor = module.get(BlockchainIndexerProcessor); + processor = module.get(IndexerProcessor); eventParser = module.get(EventParserService); jest.clearAllMocks(); @@ -156,7 +152,7 @@ describe('BlockchainIndexerProcessor', () => { it('should upsert the cursor with the new ledger', async () => { await processor.updateCursor('C_FAKE', 99999); - expect(mockSupabaseClient.from).toHaveBeenCalledWith('indexer_cursor'); + expect(mockSupabaseClient.from).toHaveBeenCalledWith('indexer_state'); expect(cursorChain.upsert).toHaveBeenCalledWith( expect.objectContaining({ contract_id: 'C_FAKE', @@ -199,7 +195,7 @@ describe('BlockchainIndexerProcessor', () => { it('should skip contracts with no configured ID', async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ - BlockchainIndexerProcessor, + IndexerProcessor, EventParserService, { provide: SupabaseService, useValue: mockSupabaseService }, { provide: SorobanService, useValue: mockSorobanService }, @@ -210,7 +206,7 @@ describe('BlockchainIndexerProcessor', () => { ], }).compile(); - const emptyProcessor = module.get(BlockchainIndexerProcessor); + const emptyProcessor = module.get(IndexerProcessor); mockServer.getEvents.mockClear(); await emptyProcessor.process({} as any); From 4f9629638147796e38beb621ef8b98083c9387b7 Mon Sep 17 00:00:00 2001 From: khaylebfortune <111098422+khaylebfortune@users.noreply.github.com> Date: Thu, 18 Jun 2026 15:47:55 +0000 Subject: [PATCH 2/2] modified --- src/indexer/indexer.module.ts | 4 ++-- src/indexer/indexer.processor.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/indexer/indexer.module.ts b/src/indexer/indexer.module.ts index 93d6c32..d951230 100644 --- a/src/indexer/indexer.module.ts +++ b/src/indexer/indexer.module.ts @@ -4,8 +4,8 @@ import { ConfigModule } from '@nestjs/config'; import { IndexerService } from './indexer.service'; import { IndexerProcessor } from './indexer.processor'; import { EventParserService } from './event-parser.service'; -import { SupabaseService } from '../../database/supabase.client'; -import { StellarModule } from '../../stellar/stellar.module'; +import { SupabaseService } from '../database/supabase.client'; +import { StellarModule } from '../stellar/stellar.module'; import { IndexerController } from './indexer.controller'; import { IndexerStatusService } from './indexer-status.service'; diff --git a/src/indexer/indexer.processor.ts b/src/indexer/indexer.processor.ts index bb10a96..32101f8 100644 --- a/src/indexer/indexer.processor.ts +++ b/src/indexer/indexer.processor.ts @@ -3,8 +3,8 @@ import { Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Job } from 'bullmq'; import * as StellarSdk from 'stellar-sdk'; -import { SupabaseService } from '../../database/supabase.client'; -import { SorobanService } from '../../blockchain/soroban/soroban.service'; +import { SupabaseService } from '../database/supabase.client'; +import { SorobanService } from '../blockchain/soroban/soroban.service'; import { EventParserService } from './event-parser.service'; import { ParsedContractEvent,