diff --git a/docs/database-transactions.md b/docs/database-transactions.md new file mode 100644 index 0000000..d5b1b26 --- /dev/null +++ b/docs/database-transactions.md @@ -0,0 +1,75 @@ +# Database Transactions + +## Quick Reference + +| Helper | Retries? | Use For | +| ------------------------ | ---------------------------- | ------------------------------------------------------------------------- | +| `withTransaction` | ✅ Yes (exponential backoff) | **All money-moving code** — loans, repayments, transfers, balance updates | +| `withTransactionNoRetry` | ❌ No | Read-only queries, idempotent admin scripts, externally-managed retry | + +## Import + +```typescript +import { withTransaction, withTransactionNoRetry } from "../db/transaction"; + +Why Two Helpers? +PostgreSQL (and other MVCC databases) can raise transient errors under concurrency: +40001 serialization_failure — concurrent transactions conflict on row versions +40P01 deadlock_detected — circular lock dependency between transactions +These are expected under load and safe to retry — the transaction had not yet committed. +withTransaction automatically retries with exponential backoff (50ms → 100ms → 200ms … max 2s, with jitter). +withTransactionNoRetry skips this overhead for paths where it adds no value. + +Examples +Money-moving: use retrying variant +import { withTransaction } from "../db/transaction"; + +async function disburseLoan(loanId: string, amount: BigNumber) { + const client = await pool.connect(); + try { + return await withTransaction(client, async (tx) => { + // Deduct from lender escrow + await tx.query( + "UPDATE escrow_balances SET balance = balance - $1 WHERE id = $2", + [amount, lenderId] + ); + + // Credit borrower wallet + await tx.query( + "UPDATE wallet_balances SET balance = balance + $1 WHERE id = $2", + [amount, borrowerId] + ); + + // Mark loan disbursed + await tx.query( + "UPDATE loans SET status = 'disbursed', disbursed_at = NOW() WHERE id = $1", + [loanId] + ); + + return { disbursed: true }; + }); + } finally { + client.release(); + } +} + + +Read-only: use no-retry variant (optional) +import { withTransactionNoRetry } from "../db/transaction"; + +async function getLoanHistory(userId: string) { + const client = await pool.connect(); + try { + return await withTransactionNoRetry(client, async (tx) => { + // SET TRANSACTION READ ONLY; -- optional optimization + const { rows } = await tx.query( + "SELECT * FROM loans WHERE borrower_id = $1 ORDER BY created_at DESC", + [userId] + ); + return rows; + }); + } finally { + client.release(); + } +} +``` diff --git a/scripts/audit-transaction-imports.ts b/scripts/audit-transaction-imports.ts new file mode 100644 index 0000000..5da9c58 --- /dev/null +++ b/scripts/audit-transaction-imports.ts @@ -0,0 +1,161 @@ +#!/usr/bin/env ts-node +/** + * Audit script: find all withTransaction imports across the codebase. + * + * Usage: + * npx ts-node scripts/audit-transaction-imports.ts + * + * Outputs a report showing: + * - Files importing from "../db/connection" (should migrate) + * - Files importing from "../db/transaction" (correct) + * - Files using withTransactionNoRetry (verify intentional) + */ + +import * as fs from "fs"; +import * as path from "path"; + +const SRC_DIR = path.join(__dirname, "..", "src"); + +interface ImportMatch { + file: string; + line: number; + text: string; + source: "connection" | "transaction" | "unknown"; + usesRetryVariant: boolean; + usesNoRetry: boolean; +} + +function findTsFiles(dir: string): string[] { + const files: string[] = []; + for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { + const fullPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + files.push(...findTsFiles(fullPath)); + } else if (entry.name.endsWith(".ts") && !entry.name.endsWith(".d.ts")) { + files.push(fullPath); + } + } + return files; +} + +function analyzeFile(filePath: string): ImportMatch[] { + const content = fs.readFileSync(filePath, "utf-8"); + const lines = content.split("\n"); + const matches: ImportMatch[] = []; + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + const importRegex = + /import\s+.*?\{[^}]*\b(withTransaction|withTransactionNoRetry)\b[^}]*\}.*?from\s+['"]([^'"]+)['"]/; + const match = line.match(importRegex); + + if (match) { + const sourceModule = match[2]; + const source: ImportMatch["source"] = sourceModule.includes("connection") + ? "connection" + : sourceModule.includes("transaction") + ? "transaction" + : "unknown"; + + matches.push({ + file: path.relative(process.cwd(), filePath), + line: i + 1, + text: line.trim(), + source, + usesRetryVariant: line.includes("withTransaction"), + usesNoRetry: line.includes("withTransactionNoRetry"), + }); + } + } + + return matches; +} + +function main() { + const files = findTsFiles(SRC_DIR); + const allMatches: ImportMatch[] = []; + + for (const file of files) { + allMatches.push(...analyzeFile(file)); + } + + // Categorize + const fromConnection = allMatches.filter((m) => m.source === "connection"); + const fromTransaction = allMatches.filter((m) => m.source === "transaction"); + const usingNoRetry = allMatches.filter((m) => m.usesNoRetry); + + console.log("═══════════════════════════════════════════════════════════"); + console.log(" withTransaction Import Audit Report"); + console.log("═══════════════════════════════════════════════════════════\n"); + + console.log(`Total imports found: ${allMatches.length}\n`); + + if (fromConnection.length > 0) { + console.log( + `⚠️ Imports from connection.ts (NEED MIGRATION): ${fromConnection.length}`, + ); + console.log( + " These should be updated to import from '../db/transaction'\n", + ); + for (const m of fromConnection) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } else { + console.log("✅ No imports from connection.ts — all migrated!\n"); + } + + if (fromTransaction.length > 0) { + console.log( + `✅ Imports from transaction.ts (CORRECT): ${fromTransaction.length}\n`, + ); + for (const m of fromTransaction) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } + + if (usingNoRetry.length > 0) { + console.log( + `ℹ️ Files using withTransactionNoRetry: ${usingNoRetry.length}`, + ); + console.log(" Please verify these are intentionally non-retrying:\n"); + for (const m of usingNoRetry) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } + + // Money-moving paths check + const moneyPaths = allMatches.filter( + (m) => + m.file.toLowerCase().includes("loan") || + m.file.toLowerCase().includes("payment") || + m.file.toLowerCase().includes("repay") || + m.file.toLowerCase().includes("transfer") || + m.file.toLowerCase().includes("wallet") || + m.file.toLowerCase().includes("balance"), + ); + + if (moneyPaths.length > 0) { + console.log("💰 Money-moving paths using withTransaction:"); + for (const m of moneyPaths) { + const status = m.usesNoRetry + ? "❌ USES NO-RETRY — RISK!" + : "✅ retrying variant"; + console.log(` ${m.file}:${m.line} — ${status}`); + } + } + + console.log("\n═══════════════════════════════════════════════════════════"); + console.log(" Recommended fixes:"); + console.log("═══════════════════════════════════════════════════════════"); + console.log( + ` sed -i 's|from "../db/connection"|from "../db/transaction"|g' src/**/*.ts`, + ); + console.log( + " Then verify money-moving paths use withTransaction (not NoRetry).", + ); +} + +main(); diff --git a/src/app.ts b/src/app.ts index 5b33a56..6b0d1e6 100644 --- a/src/app.ts +++ b/src/app.ts @@ -10,7 +10,7 @@ import dotenv from "dotenv"; import { Sentry } from "./config/sentry.js"; dotenv.config(); -import pool from "./db/connection.js"; +import { pool } from "./db/connection.js"; import { cacheService } from "./services/cacheService.js"; import { sorobanService } from "./services/sorobanService.js"; import simulationRoutes from "./routes/simulationRoutes.js"; diff --git a/src/db/connection.ts b/src/db/connection.ts index 3fbdd21..ba056bb 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -1,195 +1,53 @@ -import pg, { type PoolClient } from "pg"; +/** + * src/db/connection.ts + * + * Database connection pool and query helper. + * Transaction helpers are re-exported from transaction.ts for convenience. + */ + +import pg from "pg"; +import { env } from "../config/env.js"; import logger from "../utils/logger.js"; +import { withTransaction } from "./transaction.js"; -export type { PoolClient }; +export type { PoolClient } from "pg"; +export { withTransaction }; const { Pool } = pg; -// Parse pool configuration from environment -const maxPoolSize = process.env.DB_POOL_MAX - ? parseInt(process.env.DB_POOL_MAX, 10) - : 10; -const minPoolSize = process.env.DB_POOL_MIN - ? parseInt(process.env.DB_POOL_MIN, 10) - : 2; -const idleTimeoutMillis = process.env.DB_IDLE_TIMEOUT_MS - ? parseInt(process.env.DB_IDLE_TIMEOUT_MS, 10) - : 30000; - -const pool = new Pool({ - connectionString: process.env.DATABASE_URL, - min: minPoolSize, - max: maxPoolSize, - idleTimeoutMillis, +export const pool = new Pool({ + connectionString: env.DATABASE_URL, + max: 20, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 2000, }); -let isShuttingDown = false; - -// Periodic pool health metrics logging -const metricsInterval = setInterval(() => { - logger.info("DB Pool Metrics", { - total: pool.totalCount, - idle: pool.idleCount, - active: pool.totalCount - pool.idleCount, - waiting: pool.waitingCount, - }); -}, 60000); - -// Unref the interval so it doesn't keep the process alive -metricsInterval.unref(); - -// Log idle client errors -pool.on("error", (err: Error) => { - logger.error("Unexpected error on idle client", err); +pool.on("error", (err) => { + logger.error("Unexpected database pool error", err); }); -// Helper for transient failures -export const TRANSIENT_ERROR_CODES = new Set([ - "ECONNREFUSED", - "08000", - "08003", - "08006", - "57P01", // admin_shutdown - "57P02", // crash_shutdown - "57P03", // cannot_connect_now - "40001", // serialization_failure - "40P01", // deadlock_detected -]); -const MAX_RETRIES = 3; - -const withRetry = async ( - operation: () => Promise, - retries = MAX_RETRIES, - delay = 500, -): Promise => { +/** + * Execute a single query using a pooled client. + * The client is automatically released back to the pool. + * Returns the full pg.QueryResult so callers can access .rows, .rowCount, etc. + */ +export async function query( + sql: string, + params?: unknown[], +): Promise { + const client = await pool.connect(); try { - return await operation(); - } catch (error: any) { - if (retries > 0 && TRANSIENT_ERROR_CODES.has(error.code)) { - logger.warn( - `Transient db error (${error.code}). Retrying in ${delay}ms... (${retries} retries left)`, - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - return withRetry(operation, retries - 1, delay * 2); - } - throw error; + const result = await client.query(sql, params); + return result; + } finally { + client.release(); } -}; +} /** - * Execute `fn` inside a single dedicated database transaction. - * - * A single PoolClient is checked out for the lifetime of the call so that - * BEGIN / all DML / COMMIT all run on the **same** PostgreSQL connection. - * If `fn` throws, or if any transient error is encountered, the transaction - * is rolled back and the error is re-thrown after up to `maxRetries` attempts - * with exponential back-off. - * - * @param fn Callback that receives the pinned client. - * @param maxRetries Number of retry attempts on transient errors (default 3). - * @param baseDelayMs Initial back-off delay in milliseconds (doubles each retry). + * Acquire a dedicated client from the pool. + * Caller MUST call client.release() when done. */ -export async function withTransaction( - fn: (client: PoolClient) => Promise, - maxRetries = 3, - baseDelayMs = 200, -): Promise { - let attempt = 0; - - while (true) { - const client = await getClient(); - try { - await client.query("BEGIN"); - const result = await fn(client); - await client.query("COMMIT"); - return result; - } catch (error: any) { - try { - await client.query("ROLLBACK"); - } catch (rollbackError) { - logger.error("Failed to rollback transaction", { rollbackError }); - } - - const isTransient = TRANSIENT_ERROR_CODES.has(error?.code); - if (isTransient && attempt < maxRetries) { - const delay = baseDelayMs * 2 ** attempt; - attempt++; - logger.warn( - `Transient DB error in transaction (${error.code}). ` + - `Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; - } - - throw error; - } finally { - client.release(); - } - } +export async function getClient(): Promise { + return pool.connect(); } - -const checkExhaustion = () => { - if (pool.totalCount >= maxPoolSize && pool.idleCount === 0) { - logger.warn( - "DB Pool Exhaustion Warning: All connections are currently in use.", - { - waiting: pool.waitingCount, - active: pool.totalCount, - }, - ); - } -}; - -export const query = async (text: string, params?: unknown[]) => { - if (isShuttingDown) { - throw new Error("Database pool is shutting down"); - } - checkExhaustion(); - return withRetry(async () => { - const start = Date.now(); - const result = await pool.query(text, params); - const duration = Date.now() - start; - logger.debug("Executed query", { - text: text.substring(0, 50), - duration, - rows: result.rowCount, - }); - return result; - }); -}; - -export const getClient = async () => { - if (isShuttingDown) { - throw new Error("Database pool is shutting down"); - } - checkExhaustion(); - return withRetry(async () => { - const client = await pool.connect(); - return client; - }); -}; - -const waitForPoolToDrain = async (timeoutMs: number): Promise => { - const startedAt = Date.now(); - - while (pool.totalCount > 0 && pool.totalCount !== pool.idleCount) { - if (Date.now() - startedAt >= timeoutMs) { - throw new Error( - `Timed out waiting for pool to drain active clients after ${timeoutMs}ms`, - ); - } - - await new Promise((resolve) => setTimeout(resolve, 100)); - } -}; - -export const closePool = async (options?: { timeoutMs?: number }) => { - const timeoutMs = options?.timeoutMs ?? 10_000; - isShuttingDown = true; - clearInterval(metricsInterval); - await waitForPoolToDrain(timeoutMs); - await pool.end(); -}; - -export default pool; diff --git a/src/db/transaction.ts b/src/db/transaction.ts index e4189d1..4660c8f 100644 --- a/src/db/transaction.ts +++ b/src/db/transaction.ts @@ -1,99 +1,146 @@ -import { getClient } from "./connection.js"; +/** + * Canonical transaction helpers with retry logic. + * Exports: + * withTransaction — callback-first DB transaction (with retry) + * withRetryingTransaction — alias for backward compatibility + * withStellarAndDbTransaction — money-moving path (Stellar + DB) + * executeTransactionQueries — run queries inside an existing transaction + */ + +import pg, { type PoolClient } from "pg"; +import { pool } from "./connection.js"; import logger from "../utils/logger.js"; +// ─── Types ───────────────────────────────────────────────────────────────── + +export interface TransactionOptions { + /** Max attempts for transient-error retries (default: 3) */ + maxRetries?: number; + /** Initial back-off delay in ms (default: 200, doubles each retry) */ + baseDelayMs?: number; +} + +export type TransactionCallback = (client: PoolClient) => Promise; + +// Transient error codes that warrant retry with exponential backoff +const TRANSIENT_ERROR_CODES = new Set([ + "ECONNREFUSED", + "08000", // connection_exception + "08003", // connection_does_not_exist + "08006", // connection_failure + "57P01", // admin_shutdown + "57P02", // crash_shutdown + "57P03", // cannot_connect_now + "40001", // serialization_failure + "40P01", // deadlock_detected +]); + +function isTransientError(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const code = (err as { code?: string }).code; + return typeof code === "string" && TRANSIENT_ERROR_CODES.has(code); +} + +// ─── Core withTransaction (callback-first, with retry) ───────────────────── + /** - * Execute a database transaction with automatic rollback on error - * @param operations - Array of database operations to execute within the transaction - * @returns Promise with the result of the operations + * Execute work inside a database transaction with automatic retry on + * transient PostgreSQL errors (deadlock, serialization failure, etc.). + * + * A dedicated PoolClient is checked out for each attempt so that + * BEGIN / all DML / COMMIT run on the same connection. + * If the callback throws, or a transient error is encountered, the + * transaction is rolled back and retried up to `maxRetries` times + * with exponential back-off. */ export async function withTransaction( - operations: (client: any) => Promise, + fn: TransactionCallback, + options: TransactionOptions = {}, ): Promise { - let client; - try { - client = await getClient(); - } catch (error) { - logger.error("Failed to acquire database client for transaction", { - error, - }); - throw new Error("Database connection failed"); - } - - if (!client) { - throw new Error("Database client is undefined"); - } + const maxRetries = options.maxRetries ?? 3; + const baseDelayMs = options.baseDelayMs ?? 200; + let attempt = 0; - try { - await client.query("BEGIN"); - logger.debug("Database transaction started"); - - const result = await operations(client); + while (true) { + const client = await pool.connect(); + try { + await client.query("BEGIN"); + const result = await fn(client); + await client.query("COMMIT"); + return result; + } catch (err) { + await client.query("ROLLBACK").catch((rollbackErr) => { + logger.error("Rollback failed", rollbackErr); + }); - await client.query("COMMIT"); - logger.debug("Database transaction committed"); + if (isTransientError(err) && attempt < maxRetries) { + const delay = baseDelayMs * 2 ** attempt; + attempt++; + logger.warn( + `Transient DB error in transaction (${(err as Error).message}). ` + + `Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, + ); + await new Promise((r) => setTimeout(r, delay)); + continue; + } - return result; - } catch (error) { - await client.query("ROLLBACK"); - logger.error("Database transaction rolled back due to error:", error); - throw error; - } finally { - client.release(); + throw err; + } finally { + client.release(); + } } } +// ─── Retrying variant (alias for backward compatibility) ────────────────── + /** - * Execute multiple database operations in a transaction - * @param queries - Array of queries with their parameters - * @returns Promise with array of results + * Alias for withTransaction. Exists for code that explicitly wants the + * retrying behaviour spelled out in the call site. */ -export async function executeTransactionQueries( - queries: Array<{ text: string; params?: unknown[] }>, -): Promise { - return withTransaction(async (client) => { - const results = []; - - for (const query of queries) { - const result = await client.query(query.text, query.params || []); - results.push(result); - } - - return results; - }); +export async function withRetryingTransaction( + fn: TransactionCallback, + options?: TransactionOptions, +): Promise { + return withTransaction(fn, options); } +// ─── Stellar + DB transaction (old signature, now with retry) ───────────── + /** - * Wrapper for operations that involve both on-chain submission and database writes - * @param stellarOperation - Function that submits to Stellar network - * @param dbOperations - Function that performs database writes - * @returns Promise with combined result + * Wrapper for operations that involve both on-chain submission and database writes. + * + * ⚠️ CRITICAL: The Stellar operation executes OUTSIDE the DB transaction and is + * IRREVERSIBLE. If the DB transaction fails after Stellar succeeds, manual reconciliation + * may be required. The DB portion uses the retrying withTransaction. + * + * @param stellarOperation — Function that submits to Stellar network (executed first) + * @param dbOperations — Function that performs database writes (inside retrying tx) */ export async function withStellarAndDbTransaction( - stellarOperation: () => Promise, - dbOperations: (stellarResult: any, client: any) => Promise, -): Promise<{ stellarResult: any; dbResult: T }> { - return withTransaction(async (client) => { - try { - // Execute Stellar operation first - const stellarResult = await stellarOperation(); + stellarOperation: () => Promise, + dbOperations: (stellarResult: unknown, client: PoolClient) => Promise, +): Promise<{ stellarResult: unknown; dbResult: T }> { + // Execute Stellar operation first (irreversible — outside DB transaction) + const stellarResult = await stellarOperation(); - // Then execute database operations with the Stellar result - const dbResult = await dbOperations(stellarResult, client); + // Then execute DB operations inside the retrying transaction wrapper + const dbResult = await withTransaction(async (client) => { + return await dbOperations(stellarResult, client); + }); - return { stellarResult, dbResult }; - } catch (error) { - logger.error("Operation failed in Stellar+DB transaction:", { - error: error instanceof Error ? error.message : "Unknown error", - // Don't log sensitive Stellar data - }); + return { stellarResult, dbResult }; +} - // Log for reconciliation since Stellar transaction might have succeeded - // but DB write failed - logger.warn("Stellar transaction might need manual reconciliation", { - timestamp: new Date().toISOString(), - }); +// ─── Execute queries inside an existing transaction ──────────────────────── - throw error; - } - }); +/** + * Run a batch of queries inside an existing transaction client. + */ +export async function executeTransactionQueries( + client: PoolClient, + queries: Array<{ sql: string; params?: unknown[] }>, +): Promise { + for (const { sql, params } of queries) { + await client.query(sql, params); + } } diff --git a/src/services/databaseService.ts b/src/services/databaseService.ts index d0f6dba..0c51fbe 100644 --- a/src/services/databaseService.ts +++ b/src/services/databaseService.ts @@ -1,4 +1,4 @@ -import { query, getClient } from "../db/connection.js"; +import { query, withTransaction } from "../db/connection.js"; import type { PoolClient } from "pg"; export interface UserProfile { @@ -406,21 +406,14 @@ export class IndexedEventsService { } export class DatabaseService { + /** + * Delegate to the canonical withTransaction from connection.ts, + * which provides transient-error retry with exponential backoff. + */ static async withTransaction( callback: (client: PoolClient) => Promise, ): Promise { - const client = await getClient(); - try { - await client.query("BEGIN"); - const result = await callback(client); - await client.query("COMMIT"); - return result; - } catch (error) { - await client.query("ROLLBACK"); - throw error; - } finally { - client.release(); - } + return withTransaction(callback); } static async healthCheck(): Promise { diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index 301e594..a728b6b 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -1,5 +1,4 @@ import { rpc as SorobanRpc, scValToNative, xdr } from "@stellar/stellar-sdk"; -import { type PoolClient, query, withTransaction } from "../db/connection.js"; import logger from "../utils/logger.js"; import { createRequestId, @@ -20,6 +19,10 @@ import { sorobanService } from "./sorobanService.js"; import { updateUserScoresBulk } from "./scoresService.js"; import { AppError } from "../errors/AppError.js"; +// NEW import: +import { type PoolClient, query, pool } from "../db/connection.js"; +import { withTransaction } from "../db/transaction.js"; + const EVENT_TYPE_ALIASES: Record = { Mint: "NFTMinted", AdmRemint: "NFTMinted", @@ -455,111 +458,116 @@ export class EventIndexer { // end avoids N+1 queries and keeps scores within [300, 850]. const scoreUpdates: Map = new Map(); - await withTransaction(async (client: PoolClient) => { - for (const event of parsedEvents) { - const insertResult = await client.query( - `INSERT INTO loan_events ( - event_id, - event_type, - loan_id, - address, - amount, - ledger, - ledger_closed_at, - tx_hash, - contract_id, - topics, - value, - interest_rate_bps, - term_ledgers - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) - ON CONFLICT DO NOTHING - RETURNING event_id`, - [ - event.eventId, - event.eventType, - event.loanId ?? null, - event.address ?? null, - event.amount ?? null, - event.ledger, - event.ledgerClosedAt, - event.txHash, - event.contractId, - JSON.stringify(event.topics), - event.value, - event.interestRateBps ?? null, - event.termLedgers ?? null, - ], - ); - - if ((insertResult.rowCount ?? 0) > 0) { - insertedEvents.push(event); - - // Aggregate score deltas per borrower; a single bulk upsert at - // the end of the transaction avoids N+1 score updates. - if (event.eventType === "LoanRepaid") { - const { repaymentDelta } = sorobanService.getScoreConfig(); - if (event.address) { - scoreUpdates.set( - event.address, - (scoreUpdates.get(event.address) ?? 0) + repaymentDelta, - ); - } - } else if ( - event.eventType === "LoanDefaulted" || - event.eventType === "CollateralLiquidated" - ) { - const { defaultPenalty } = sorobanService.getScoreConfig(); - if (event.address) { - scoreUpdates.set( - event.address, - (scoreUpdates.get(event.address) ?? 0) - defaultPenalty, - ); + const client = await pool.connect(); + try { + await withTransaction(client, async (tx: PoolClient) => { + for (const event of parsedEvents) { + const insertResult = await tx.query( + `INSERT INTO loan_events ( + event_id, + event_type, + loan_id, + address, + amount, + ledger, + ledger_closed_at, + tx_hash, + contract_id, + topics, + value, + interest_rate_bps, + term_ledgers + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT DO NOTHING + RETURNING event_id`, + [ + event.eventId, + event.eventType, + event.loanId ?? null, + event.address ?? null, + event.amount ?? null, + event.ledger, + event.ledgerClosedAt, + event.txHash, + event.contractId, + JSON.stringify(event.topics), + event.value, + event.interestRateBps ?? null, + event.termLedgers ?? null, + ], + ); + + if ((insertResult.rowCount ?? 0) > 0) { + insertedEvents.push(event); + + // Aggregate score deltas per borrower; a single bulk upsert at + // the end of the transaction avoids N+1 score updates. + if (event.eventType === "LoanRepaid") { + const { repaymentDelta } = sorobanService.getScoreConfig(); + if (event.address) { + scoreUpdates.set( + event.address, + (scoreUpdates.get(event.address) ?? 0) + repaymentDelta, + ); + } + } else if ( + event.eventType === "LoanDefaulted" || + event.eventType === "CollateralLiquidated" + ) { + const { defaultPenalty } = sorobanService.getScoreConfig(); + if (event.address) { + scoreUpdates.set( + event.address, + (scoreUpdates.get(event.address) ?? 0) - defaultPenalty, + ); + } } } } - } - - // Apply batched score updates on the same pinned client so that both - // the event inserts and the score changes are committed or rolled back - // together — satisfying the atomicity requirement. - if (scoreUpdates.size > 0) { - await updateUserScoresBulk(scoreUpdates, client); - } - }); - // withTransaction commits here; any error triggers automatic ROLLBACK - for (const event of insertedEvents) { - webhookService.dispatch(event).catch((error) => { - logger.error("Webhook dispatch failed", { - eventId: event.eventId, - error, - }); + // Apply batched score updates on the same pinned client so that both + // the event inserts and the score changes are committed or rolled back + // together — satisfying the atomicity requirement. + if (scoreUpdates.size > 0) { + await updateUserScoresBulk(scoreUpdates, tx); + } }); + // withTransaction commits here; any error triggers automatic ROLLBACK - eventStreamService.broadcast({ - eventId: event.eventId, - eventType: event.eventType, - ...(event.loanId !== undefined ? { loanId: event.loanId } : {}), - address: event.address, - ...(event.amount !== undefined ? { amount: event.amount } : {}), - ledger: event.ledger, - ledgerClosedAt: event.ledgerClosedAt.toISOString(), - txHash: event.txHash, - }); + for (const event of insertedEvents) { + webhookService.dispatch(event).catch((error) => { + logger.error("Webhook dispatch failed", { + eventId: event.eventId, + error, + }); + }); - this.triggerNotification(event).catch((error) => { - logger.error("Notification trigger failed", { + eventStreamService.broadcast({ eventId: event.eventId, - error, + eventType: event.eventType, + ...(event.loanId !== undefined ? { loanId: event.loanId } : {}), + address: event.address, + ...(event.amount !== undefined ? { amount: event.amount } : {}), + ledger: event.ledger, + ledgerClosedAt: event.ledgerClosedAt.toISOString(), + txHash: event.txHash, }); - }); - } - return { - insertedCount: insertedEvents.length, - }; + this.triggerNotification(event).catch((error) => { + logger.error("Notification trigger failed", { + eventId: event.eventId, + error, + }); + }); + } + + return { + insertedCount: insertedEvents.length, + }; + } finally { + client.release(); + } } private parseEvent(event: SorobanRawEvent): ContractEvent | null { diff --git a/src/services/remittanceService.ts b/src/services/remittanceService.ts index 95476f2..c440d33 100644 --- a/src/services/remittanceService.ts +++ b/src/services/remittanceService.ts @@ -129,7 +129,7 @@ export const remittanceService = { return await withTransaction(async (client) => { const result = await client.query( `INSERT INTO remittances - (id, sender_id, recipient_address, amount, from_currency, to_currency, memo, status, xdr, created_at, updated_at) + (id, sender_id, recipient_address, amount, from_currency, to_currency, memo, status, xdr, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *`, [ @@ -207,7 +207,7 @@ export const remittanceService = { } const result = await query( - `SELECT * FROM remittances + `SELECT * FROM remittances WHERE ${whereClause} ORDER BY created_at DESC, id DESC LIMIT $${params.length + 1}`, @@ -303,7 +303,7 @@ export const remittanceService = { ): Promise { try { const result = await query( - `UPDATE remittances + `UPDATE remittances SET status = $1, transaction_hash = $2, error_message = $3, updated_at = $4 WHERE id = $5 RETURNING *`,