Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { LiquidityModule } from './modules/liquidity/liquidity.module';
import { NotificationsModule } from './modules/notifications/notifications.module';
import { TransactionsModule } from './modules/transactions/transactions.module';
import { LearnersModule } from './modules/learners/learners.module';
import { BlockchainIndexerModule } from './jobs/blockchain-indexer/blockchain-indexer.module';
import { IndexerModule } from './indexer/indexer.module';
import { LoanPaymentReminderModule } from './jobs/loan-payment-reminder/loan-payment-reminder.module';
import { TransactionStatusCheckerModule } from './jobs/transaction-status-checker/transaction-status-checker.module';
import { NonceCleanupModule } from './jobs/nonce-cleanup/nonce-cleanup.module';
Expand Down Expand Up @@ -57,7 +57,7 @@ import { CorrelationIdMiddleware } from './common/logger/correlation-id.middlewa
NotificationsModule,
TransactionsModule,
LearnersModule,
BlockchainIndexerModule,
IndexerModule,
LoanPaymentReminderModule,
TransactionStatusCheckerModule,
NonceCleanupModule,
Expand Down
21 changes: 21 additions & 0 deletions src/indexer/indexer-status.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Injectable } from '@nestjs/common';
import { SupabaseService } from '../database/supabase.client';

@Injectable()
export class IndexerStatusService {
constructor(private readonly supabaseService: SupabaseService) {}

async getStatus() {
const db = this.supabaseService.getServiceRoleClient();
const { data, error } = await db
.from('indexer_state')
.select('contract_id, last_ledger, updated_at')
.order('updated_at', { ascending: false });

return {
status: error ? 'error' : 'ok',
data: data ?? [],
error: error?.message || null,
};
}
}
16 changes: 16 additions & 0 deletions src/indexer/indexer.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Controller, Get } from '@nestjs/common';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
import { IndexerStatusService } from './indexer-status.service';

@ApiTags('indexer')
@Controller('indexer')
export class IndexerController {
constructor(private readonly indexerStatusService: IndexerStatusService) {}

@Get('status')
@ApiOperation({ summary: 'Get indexer status and latest processed ledger' })
@ApiResponse({ status: 200, description: 'Indexer status retrieved successfully' })
async getStatus() {
return this.indexerStatusService.getStatus();
}
}
27 changes: 27 additions & 0 deletions src/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { ConfigModule } from '@nestjs/config';
import { IndexerService } from './indexer.service';
import { IndexerProcessor } from './indexer.processor';
import { EventParserService } from './event-parser.service';
import { SupabaseService } from '../database/supabase.client';
import { StellarModule } from '../stellar/stellar.module';
import { IndexerController } from './indexer.controller';
import { IndexerStatusService } from './indexer-status.service';

@Module({
imports: [
ConfigModule,
StellarModule,
BullModule.registerQueue({ name: 'blockchain-indexer' }),
],
controllers: [IndexerController],
providers: [
IndexerService,
IndexerProcessor,
EventParserService,
SupabaseService,
IndexerStatusService,
],
})
export class IndexerModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Job } from 'bullmq';
import * as StellarSdk from 'stellar-sdk';
import { SupabaseService } from '../../database/supabase.client';
import { SorobanService } from '../../blockchain/soroban/soroban.service';
import { SupabaseService } from '../database/supabase.client';
import { SorobanService } from '../blockchain/soroban/soroban.service';
import { EventParserService } from './event-parser.service';
import {
ParsedContractEvent,
Expand All @@ -19,15 +19,15 @@ import {
/**
* BullMQ processor for the `blockchain-indexer` queue.
*
* On every invocation (every 30 s) it:
* 1. Reads the last indexed ledger per contract from `indexer_cursor`.
* On every invocation (every 6 s) it:
* 1. Reads the last indexed ledger per contract from `indexer_state`.
* 2. Fetches new Soroban events since that ledger.
* 3. Parses, deduplicates, and persists them to the database.
* 4. Updates the cursor so the next run resumes correctly.
*/
@Processor('blockchain-indexer')
export class BlockchainIndexerProcessor extends WorkerHost {
private readonly logger = new Logger(BlockchainIndexerProcessor.name);
export class IndexerProcessor extends WorkerHost {
private readonly logger = new Logger(IndexerProcessor.name);

private readonly loanContractId: string;
private readonly reputationContractId: string;
Expand All @@ -51,15 +51,15 @@ export class BlockchainIndexerProcessor extends WorkerHost {

async process(_job: Job): Promise<void> {
this.logger.log({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'process',
}, 'Blockchain indexer job started');

try {
await this.indexContract(this.loanContractId, 'loan');
} catch (error) {
this.logger.error({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'indexLoanContract',
error: error.message,
stack: error.stack,
Expand All @@ -70,15 +70,15 @@ export class BlockchainIndexerProcessor extends WorkerHost {
await this.indexContract(this.reputationContractId, 'reputation');
} catch (error) {
this.logger.error({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'indexReputationContract',
error: error.message,
stack: error.stack,
}, 'Failed to index reputation contract events — will retry next cycle');
}

this.logger.log({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'process',
}, 'Blockchain indexer job completed');
}
Expand All @@ -102,8 +102,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
const startLedger = cursor + 1;

this.logger.debug({
context: 'BlockchainIndexerProcessor',
action: 'indexContract',
context: 'IndexerProcessor',
contractId: contractId.slice(0, 8) + '...',
label,
startLedger,
Expand All @@ -117,7 +116,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
}

this.logger.log({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'indexContract',
label,
eventCount: rawEvents.length,
Expand All @@ -140,7 +139,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
}

this.logger.log({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'eventIndexed',
eventType: parsed.type,
eventId: parsed.eventId,
Expand All @@ -151,7 +150,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
} catch (error) {
errorCount++;
this.logger.error({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'persistEvent',
error: error.message,
eventId: rawEvent?.id,
Expand All @@ -165,7 +164,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
}

this.logger.log({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'indexContractComplete',
label,
successCount,
Expand Down Expand Up @@ -385,7 +384,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
if (cacheError) {
// Non-fatal: cache update failure should not block event processing
this.logger.warn({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'updateReputationCache',
error: cacheError.message,
wallet: payload.wallet,
Expand All @@ -398,14 +397,14 @@ export class BlockchainIndexerProcessor extends WorkerHost {
// -------------------------------------------------------------------------

/**
* Reads the last indexed ledger for a contract from `indexer_cursor`.
* Reads the last indexed ledger for a contract from `indexer_state`.
* Returns 0 if no cursor exists (first run).
*/
async getCursor(contractId: string): Promise<number> {
const db = this.supabaseService.getServiceRoleClient();

const { data, error } = await db
.from('indexer_cursor')
.from('indexer_state')
.select('last_ledger')
.eq('contract_id', contractId)
.single();
Expand All @@ -423,7 +422,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {
async updateCursor(contractId: string, ledger: number): Promise<void> {
const db = this.supabaseService.getServiceRoleClient();

const { error } = await db.from('indexer_cursor').upsert(
const { error } = await db.from('indexer_state').upsert(
{
contract_id: contractId,
last_ledger: ledger,
Expand All @@ -434,7 +433,7 @@ export class BlockchainIndexerProcessor extends WorkerHost {

if (error) {
this.logger.error({
context: 'BlockchainIndexerProcessor',
context: 'IndexerProcessor',
action: 'updateCursor',
error: error.message,
contractId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import { Queue } from 'bullmq';
/**
* Schedules the blockchain-indexer repeating job on module initialisation.
*
* The job runs every 30 seconds. If the job already exists (e.g. after a
* The job runs every 6 seconds. If the job already exists (e.g. after a
* hot-reload) BullMQ silently skips the duplicate.
*/
@Injectable()
export class BlockchainIndexerService implements OnModuleInit {
private readonly logger = new Logger(BlockchainIndexerService.name);
export class IndexerService implements OnModuleInit {
private readonly logger = new Logger(IndexerService.name);

constructor(
@InjectQueue('blockchain-indexer')
private readonly indexerQueue: Queue,
) {}

async onModuleInit(): Promise<void> {
// Remove any stale repeatable jobs from a previous run
const existing = await this.indexerQueue.getRepeatableJobs();
for (const job of existing) {
await this.indexerQueue.removeRepeatableByKey(job.key);
Expand All @@ -28,15 +27,18 @@ export class BlockchainIndexerService implements OnModuleInit {
'index-events',
{},
{
repeat: { every: 30_000 }, // 30 seconds
repeat: { every: 6_000 }, // 6 seconds
removeOnComplete: { count: 10 },
removeOnFail: { count: 50 },
},
);

this.logger.log({
context: 'BlockchainIndexerService',
action: 'onModuleInit',
}, 'Blockchain indexer job scheduled — runs every 30 seconds');
this.logger.log(
{
context: 'IndexerService',
action: 'onModuleInit',
},
'Indexer job scheduled — runs every 6 seconds',
);
}
}
23 changes: 0 additions & 23 deletions src/jobs/blockchain-indexer/blockchain-indexer.module.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/modules/health/health.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ export class HealthService {
try {
const db = this.supabaseService.getServiceRoleClient();
const { data } = await db
.from('indexer_cursor')
.from('indexer_state')
.select('last_ledger')
.order('updated_at', { ascending: false })
.limit(1)
Expand Down
2 changes: 1 addition & 1 deletion src/modules/metrics/metrics.updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class MetricsUpdater implements OnModuleInit {
try {
const db = this.supabaseService.getServiceRoleClient();
const { data } = await db
.from('indexer_cursor')
.from('indexer_state')
.select('last_ledger')
.order('updated_at', { ascending: false })
.limit(1)
Expand Down
7 changes: 7 additions & 0 deletions supabase/migrations/20260618000000_create_indexer_state.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE public.indexer_state (
contract_id TEXT PRIMARY KEY,
last_ledger BIGINT NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

ALTER TABLE public.indexer_state ENABLE ROW LEVEL SECURITY;
Loading
Loading