From 07b1517a8560ed6c3a32c43126db56dcacc06cf2 Mon Sep 17 00:00:00 2001 From: Peolite001 Date: Fri, 19 Jun 2026 00:06:39 +0100 Subject: [PATCH 1/3] fix(db): consolidate withTransaction helpers, eliminate retry ambiguity - Rewrite src/db/transaction.ts as canonical transaction helper - withTransaction: retrying by default (exponential backoff + jitter) - withTransactionNoRetry: explicit opt-out for read-only paths - Detects PostgreSQL transient errors: 40001, 40P01, 08006, etc. - Deprecate withTransaction re-export from src/db/connection.ts - Add @deprecated JSDoc pointing to transaction.ts - Add audit script: scripts/audit-transaction-imports.ts - Scans all .ts files for withTransaction imports - Flags imports from connection.ts (need migration) - Flags withTransactionNoRetry in money-moving paths (risk) - Add documentation: docs/database-transactions.md - Usage guide with examples - Decision log - Migration instructions Closes #20 --- docs/database-transactions.md | 74 +++++++++++ scripts/audit-transaction-imports.ts | 143 +++++++++++++++++++++ src/db/connection.ts | 29 ++++- src/db/transaction.ts | 182 +++++++++++++++------------ 4 files changed, 347 insertions(+), 81 deletions(-) create mode 100644 docs/database-transactions.md create mode 100644 scripts/audit-transaction-imports.ts diff --git a/docs/database-transactions.md b/docs/database-transactions.md new file mode 100644 index 0000000..2db5e26 --- /dev/null +++ b/docs/database-transactions.md @@ -0,0 +1,74 @@ +# Database Transactions + +## Quick Reference + +| Helper | Retries? | Use For | +|--------|----------|---------| +| `withTransaction` | ✅ Yes (exponential backoff) | **All money-moving code** — loans, repayments, transfers, balance updates | +| `withTransactionNoRetry` | ❌ No | Read-only queries, idempotent admin scripts, externally-managed retry | + +## Import + +```typescript +import { withTransaction, withTransactionNoRetry } from "../db/transaction"; + +Why Two Helpers? +PostgreSQL (and other MVCC databases) can raise transient errors under concurrency: +40001 serialization_failure — concurrent transactions conflict on row versions +40P01 deadlock_detected — circular lock dependency between transactions +These are expected under load and safe to retry — the transaction had not yet committed. +withTransaction automatically retries with exponential backoff (50ms → 100ms → 200ms … max 2s, with jitter). +withTransactionNoRetry skips this overhead for paths where it adds no value. + +Examples +Money-moving: use retrying variant +import { withTransaction } from "../db/transaction"; + +async function disburseLoan(loanId: string, amount: BigNumber) { + const client = await pool.connect(); + try { + return await withTransaction(client, async (tx) => { + // Deduct from lender escrow + await tx.query( + "UPDATE escrow_balances SET balance = balance - $1 WHERE id = $2", + [amount, lenderId] + ); + + // Credit borrower wallet + await tx.query( + "UPDATE wallet_balances SET balance = balance + $1 WHERE id = $2", + [amount, borrowerId] + ); + + // Mark loan disbursed + await tx.query( + "UPDATE loans SET status = 'disbursed', disbursed_at = NOW() WHERE id = $1", + [loanId] + ); + + return { disbursed: true }; + }); + } finally { + client.release(); + } +} + + +Read-only: use no-retry variant (optional) +import { withTransactionNoRetry } from "../db/transaction"; + +async function getLoanHistory(userId: string) { + const client = await pool.connect(); + try { + return await withTransactionNoRetry(client, async (tx) => { + // SET TRANSACTION READ ONLY; -- optional optimization + const { rows } = await tx.query( + "SELECT * FROM loans WHERE borrower_id = $1 ORDER BY created_at DESC", + [userId] + ); + return rows; + }); + } finally { + client.release(); + } +} \ No newline at end of file diff --git a/scripts/audit-transaction-imports.ts b/scripts/audit-transaction-imports.ts new file mode 100644 index 0000000..05f2bb4 --- /dev/null +++ b/scripts/audit-transaction-imports.ts @@ -0,0 +1,143 @@ +#!/usr/bin/env ts-node +/** + * Audit script: find all withTransaction imports across the codebase. + * + * Usage: + * npx ts-node scripts/audit-transaction-imports.ts + * + * Outputs a report showing: + * - Files importing from "../db/connection" (should migrate) + * - Files importing from "../db/transaction" (correct) + * - Files using withTransactionNoRetry (verify intentional) + */ + +import * as fs from "fs"; +import * as path from "path"; + +const SRC_DIR = path.join(__dirname, "..", "src"); + +interface ImportMatch { + file: string; + line: number; + text: string; + source: "connection" | "transaction" | "unknown"; + usesRetryVariant: boolean; + usesNoRetry: boolean; +} + +function findTsFiles(dir: string): string[] { + const files: string[] = []; + for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { + const fullPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + files.push(...findTsFiles(fullPath)); + } else if (entry.name.endsWith(".ts") && !entry.name.endsWith(".d.ts")) { + files.push(fullPath); + } + } + return files; +} + +function analyzeFile(filePath: string): ImportMatch[] { + const content = fs.readFileSync(filePath, "utf-8"); + const lines = content.split("\n"); + const matches: ImportMatch[] = []; + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + const importRegex = /import\s+.*?\{[^}]*\b(withTransaction|withTransactionNoRetry)\b[^}]*\}.*?from\s+['"]([^'"]+)['"]/; + const match = line.match(importRegex); + + if (match) { + const sourceModule = match[2]; + const source: ImportMatch["source"] = + sourceModule.includes("connection") ? "connection" : + sourceModule.includes("transaction") ? "transaction" : "unknown"; + + matches.push({ + file: path.relative(process.cwd(), filePath), + line: i + 1, + text: line.trim(), + source, + usesRetryVariant: line.includes("withTransaction"), + usesNoRetry: line.includes("withTransactionNoRetry"), + }); + } + } + + return matches; +} + +function main() { + const files = findTsFiles(SRC_DIR); + const allMatches: ImportMatch[] = []; + + for (const file of files) { + allMatches.push(...analyzeFile(file)); + } + + // Categorize + const fromConnection = allMatches.filter((m) => m.source === "connection"); + const fromTransaction = allMatches.filter((m) => m.source === "transaction"); + const usingNoRetry = allMatches.filter((m) => m.usesNoRetry); + + console.log("═══════════════════════════════════════════════════════════"); + console.log(" withTransaction Import Audit Report"); + console.log("═══════════════════════════════════════════════════════════\n"); + + console.log(`Total imports found: ${allMatches.length}\n`); + + if (fromConnection.length > 0) { + console.log(`⚠️ Imports from connection.ts (NEED MIGRATION): ${fromConnection.length}`); + console.log(" These should be updated to import from '../db/transaction'\n"); + for (const m of fromConnection) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } else { + console.log("✅ No imports from connection.ts — all migrated!\n"); + } + + if (fromTransaction.length > 0) { + console.log(`✅ Imports from transaction.ts (CORRECT): ${fromTransaction.length}\n`); + for (const m of fromTransaction) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } + + if (usingNoRetry.length > 0) { + console.log(`ℹ️ Files using withTransactionNoRetry: ${usingNoRetry.length}`); + console.log(" Please verify these are intentionally non-retrying:\n"); + for (const m of usingNoRetry) { + console.log(` ${m.file}:${m.line}`); + console.log(` → ${m.text}\n`); + } + } + + // Money-moving paths check + const moneyPaths = allMatches.filter((m) => + m.file.toLowerCase().includes("loan") || + m.file.toLowerCase().includes("payment") || + m.file.toLowerCase().includes("repay") || + m.file.toLowerCase().includes("transfer") || + m.file.toLowerCase().includes("wallet") || + m.file.toLowerCase().includes("balance") + ); + + if (moneyPaths.length > 0) { + console.log("💰 Money-moving paths using withTransaction:"); + for (const m of moneyPaths) { + const status = m.usesNoRetry ? "❌ USES NO-RETRY — RISK!" : "✅ retrying variant"; + console.log(` ${m.file}:${m.line} — ${status}`); + } + } + + console.log("\n═══════════════════════════════════════════════════════════"); + console.log(" Recommended fixes:"); + console.log("═══════════════════════════════════════════════════════════"); + console.log(` sed -i 's|from "../db/connection"|from "../db/transaction"|g' src/**/*.ts`); + console.log(" Then verify money-moving paths use withTransaction (not NoRetry)."); +} + +main(); \ No newline at end of file diff --git a/src/db/connection.ts b/src/db/connection.ts index 3fbdd21..5c5f7f5 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -1,7 +1,30 @@ -import pg, { type PoolClient } from "pg"; -import logger from "../utils/logger.js"; +/** + * @deprecated Database connection pool and legacy transaction helpers. + * + * This module previously exported a separate withTransaction implementation. + * All transaction helpers have been consolidated into src/db/transaction.ts. + * + * Migration: + * OLD: import { withTransaction } from "../db/connection"; + * NEW: import { withTransaction } from "../db/transaction"; + */ + +import { Pool } from "pg"; +import { withTransaction, withTransactionNoRetry } from "../db/transaction"; + +export const pool = new Pool({ + connectionString: process.env.DATABASE_URL, + // ... your existing pool config +}); + +/** + * @deprecated Import from "../db/transaction" instead. + * This re-export will be removed in a future release. + */ +export { withTransaction, withTransactionNoRetry }; -export type { PoolClient }; +// Re-export pool types for convenience +export type { PoolClient } from "pg"; const { Pool } = pg; diff --git a/src/db/transaction.ts b/src/db/transaction.ts index e4189d1..16c77f2 100644 --- a/src/db/transaction.ts +++ b/src/db/transaction.ts @@ -1,99 +1,125 @@ -import { getClient } from "./connection.js"; -import logger from "../utils/logger.js"; - /** - * Execute a database transaction with automatic rollback on error - * @param operations - Array of database operations to execute within the transaction - * @returns Promise with the result of the operations + * Canonical transaction helper with configurable retry semantics. + * + * Money-moving code MUST use withTransaction (retrying variant). + * Read-only or idempotent non-critical paths MAY use withTransactionNoRetry. + * + * @see docs/database-transactions.md */ -export async function withTransaction( - operations: (client: any) => Promise, -): Promise { - let client; - try { - client = await getClient(); - } catch (error) { - logger.error("Failed to acquire database client for transaction", { - error, - }); - throw new Error("Database connection failed"); - } - if (!client) { - throw new Error("Database client is undefined"); - } +import { PoolClient } from "pg"; // adjust for your driver (pg, mysql2, etc.) - try { - await client.query("BEGIN"); - logger.debug("Database transaction started"); +// ─── Retry configuration ────────────────────────────────────────────────────── - const result = await operations(client); +const RETRY_CONFIG = { + maxRetries: 5, + baseDelayMs: 50, + maxDelayMs: 2000, + transientErrorCodes: new Set([ + "40001", // serialization_failure + "40P01", // deadlock_detected + "08006", // connection_failure + "08003", // connection_does_not_exist + "08001", // sqlclient_unable_to_establish_sqlconnection + ]), +}; - await client.query("COMMIT"); - logger.debug("Database transaction committed"); +// ─── Types ────────────────────────────────────────────────────────────────── - return result; - } catch (error) { - await client.query("ROLLBACK"); - logger.error("Database transaction rolled back due to error:", error); - throw error; - } finally { - client.release(); - } +export type TransactionFn = (client: PoolClient) => Promise; + +export interface TransactionOptions { + /** Retry on transient errors (deadlock, serialization failure). Default: true */ + retry?: boolean; + /** Override max retry attempts. Only used when retry=true */ + maxRetries?: number; } -/** - * Execute multiple database operations in a transaction - * @param queries - Array of queries with their parameters - * @returns Promise with array of results - */ -export async function executeTransactionQueries( - queries: Array<{ text: string; params?: unknown[] }>, -): Promise { - return withTransaction(async (client) => { - const results = []; - - for (const query of queries) { - const result = await client.query(query.text, query.params || []); - results.push(result); - } +// ─── Helpers ──────────────────────────────────────────────────────────────── + +function isTransientError(error: unknown): boolean { + if (!(error instanceof Error)) return false; + const code = (error as any).code; + return typeof code === "string" && RETRY_CONFIG.transientErrorCodes.has(code); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} - return results; - }); +function exponentialBackoff(attempt: number): number { + const delay = Math.min( + RETRY_CONFIG.baseDelayMs * 2 ** attempt, + RETRY_CONFIG.maxDelayMs + ); + // Add jitter to prevent thundering herd + return delay + Math.random() * delay * 0.5; } +// ─── Core implementation ──────────────────────────────────────────────────── + /** - * Wrapper for operations that involve both on-chain submission and database writes - * @param stellarOperation - Function that submits to Stellar network - * @param dbOperations - Function that performs database writes - * @returns Promise with combined result + * Execute work inside a database transaction. + * + * By default (retry=true) this retries on transient errors + * (deadlock, serialization failure) with exponential backoff. + * Use this for ALL money-moving or state-mutating operations. */ -export async function withStellarAndDbTransaction( - stellarOperation: () => Promise, - dbOperations: (stellarResult: any, client: any) => Promise, -): Promise<{ stellarResult: any; dbResult: T }> { - return withTransaction(async (client) => { - try { - // Execute Stellar operation first - const stellarResult = await stellarOperation(); - - // Then execute database operations with the Stellar result - const dbResult = await dbOperations(stellarResult, client); +export async function withTransaction( + client: PoolClient, + fn: TransactionFn, + options: TransactionOptions = {} +): Promise { + const { retry = true, maxRetries = RETRY_CONFIG.maxRetries } = options; - return { stellarResult, dbResult }; + async function attempt(attemptNumber: number): Promise { + await client.query("BEGIN"); + try { + const result = await fn(client); + await client.query("COMMIT"); + return result; } catch (error) { - logger.error("Operation failed in Stellar+DB transaction:", { - error: error instanceof Error ? error.message : "Unknown error", - // Don't log sensitive Stellar data - }); + await client.query("ROLLBACK").catch(() => {}); // ignore rollback errors - // Log for reconciliation since Stellar transaction might have succeeded - // but DB write failed - logger.warn("Stellar transaction might need manual reconciliation", { - timestamp: new Date().toISOString(), - }); + const shouldRetry = + retry && + attemptNumber < maxRetries && + isTransientError(error); + + if (shouldRetry) { + const delay = exponentialBackoff(attemptNumber); + console.warn( + `[withTransaction] Transient error (attempt ${attemptNumber + 1}/${maxRetries + 1}), ` + + `retrying in ${Math.round(delay)}ms: ${(error as Error).message}` + ); + await sleep(delay); + return attempt(attemptNumber + 1); + } throw error; } - }); + } + + return attempt(0); } + +/** + * Execute work inside a database transaction WITHOUT retry. + * + * Use ONLY for: + * - Read-only transactions where retry adds no value + * - Idempotent admin/ops scripts + * - Cases where the caller handles retry externally + * + * ⚠️ NEVER use this for money-moving code. + */ +export async function withTransactionNoRetry( + client: PoolClient, + fn: TransactionFn +): Promise { + return withTransaction(client, fn, { retry: false }); +} + +// ─── Re-export for backward compatibility during migration ──────────────────── +// TODO: Remove after all imports are migrated +export { withTransaction as withTransactionRetry }; \ No newline at end of file From 3030a03b298b5aefab6d8dcc5d677903740b000f Mon Sep 17 00:00:00 2001 From: Peolite001 Date: Fri, 19 Jun 2026 15:14:22 +0100 Subject: [PATCH 2/3] fix(db): consolidate withTransaction helpers, eliminate retry ambiguity - Clean up connection.ts: single pool declaration, re-export withTransaction - Restore transaction.ts exports: withStellarAndDbTransaction, executeTransactionQueries - Fix withRetryingTransaction: re-acquire client on 08006/08003 connection failures - Update all call sites to canonical withTransaction(client, fn, options) signature - Update eventIndexer.ts import to use transaction.ts - Run prettier on all changed files Addresses review feedback on #26 --- src/db/connection.ts | 231 ++++--------------------- src/db/transaction.ts | 273 +++++++++++++++++++++--------- src/services/eventIndexer.ts | 203 +++++++++++----------- src/services/remittanceService.ts | 21 +++ 4 files changed, 345 insertions(+), 383 deletions(-) diff --git a/src/db/connection.ts b/src/db/connection.ts index 5c5f7f5..b683ce4 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -1,218 +1,45 @@ /** - * @deprecated Database connection pool and legacy transaction helpers. + * src/db/connection.ts * - * This module previously exported a separate withTransaction implementation. - * All transaction helpers have been consolidated into src/db/transaction.ts. - * - * Migration: - * OLD: import { withTransaction } from "../db/connection"; - * NEW: import { withTransaction } from "../db/transaction"; + * Database connection pool and query helper. + * Transaction helpers live in transaction.ts and are re-exported here + * for backward compatibility. */ -import { Pool } from "pg"; -import { withTransaction, withTransactionNoRetry } from "../db/transaction"; +import pg from "pg"; +import { env } from "../config/env.js"; +import logger from "../utils/logger.js"; +import { withTransaction } from "./transaction.js"; -export const pool = new Pool({ - connectionString: process.env.DATABASE_URL, - // ... your existing pool config -}); - -/** - * @deprecated Import from "../db/transaction" instead. - * This re-export will be removed in a future release. - */ -export { withTransaction, withTransactionNoRetry }; - -// Re-export pool types for convenience export type { PoolClient } from "pg"; +export { withTransaction }; const { Pool } = pg; -// Parse pool configuration from environment -const maxPoolSize = process.env.DB_POOL_MAX - ? parseInt(process.env.DB_POOL_MAX, 10) - : 10; -const minPoolSize = process.env.DB_POOL_MIN - ? parseInt(process.env.DB_POOL_MIN, 10) - : 2; -const idleTimeoutMillis = process.env.DB_IDLE_TIMEOUT_MS - ? parseInt(process.env.DB_IDLE_TIMEOUT_MS, 10) - : 30000; - -const pool = new Pool({ - connectionString: process.env.DATABASE_URL, - min: minPoolSize, - max: maxPoolSize, - idleTimeoutMillis, +export const pool = new Pool({ + connectionString: env.DATABASE_URL, + max: 20, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 2000, }); -let isShuttingDown = false; - -// Periodic pool health metrics logging -const metricsInterval = setInterval(() => { - logger.info("DB Pool Metrics", { - total: pool.totalCount, - idle: pool.idleCount, - active: pool.totalCount - pool.idleCount, - waiting: pool.waitingCount, - }); -}, 60000); - -// Unref the interval so it doesn't keep the process alive -metricsInterval.unref(); - -// Log idle client errors -pool.on("error", (err: Error) => { - logger.error("Unexpected error on idle client", err); +pool.on("error", (err) => { + logger.error("Unexpected database pool error", err); }); -// Helper for transient failures -export const TRANSIENT_ERROR_CODES = new Set([ - "ECONNREFUSED", - "08000", - "08003", - "08006", - "57P01", // admin_shutdown - "57P02", // crash_shutdown - "57P03", // cannot_connect_now - "40001", // serialization_failure - "40P01", // deadlock_detected -]); -const MAX_RETRIES = 3; - -const withRetry = async ( - operation: () => Promise, - retries = MAX_RETRIES, - delay = 500, -): Promise => { - try { - return await operation(); - } catch (error: any) { - if (retries > 0 && TRANSIENT_ERROR_CODES.has(error.code)) { - logger.warn( - `Transient db error (${error.code}). Retrying in ${delay}ms... (${retries} retries left)`, - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - return withRetry(operation, retries - 1, delay * 2); - } - throw error; - } -}; - /** - * Execute `fn` inside a single dedicated database transaction. - * - * A single PoolClient is checked out for the lifetime of the call so that - * BEGIN / all DML / COMMIT all run on the **same** PostgreSQL connection. - * If `fn` throws, or if any transient error is encountered, the transaction - * is rolled back and the error is re-thrown after up to `maxRetries` attempts - * with exponential back-off. - * - * @param fn Callback that receives the pinned client. - * @param maxRetries Number of retry attempts on transient errors (default 3). - * @param baseDelayMs Initial back-off delay in milliseconds (doubles each retry). + * Execute a single query using a pooled client. + * The client is automatically released back to the pool. */ -export async function withTransaction( - fn: (client: PoolClient) => Promise, - maxRetries = 3, - baseDelayMs = 200, -): Promise { - let attempt = 0; - - while (true) { - const client = await getClient(); - try { - await client.query("BEGIN"); - const result = await fn(client); - await client.query("COMMIT"); - return result; - } catch (error: any) { - try { - await client.query("ROLLBACK"); - } catch (rollbackError) { - logger.error("Failed to rollback transaction", { rollbackError }); - } - - const isTransient = TRANSIENT_ERROR_CODES.has(error?.code); - if (isTransient && attempt < maxRetries) { - const delay = baseDelayMs * 2 ** attempt; - attempt++; - logger.warn( - `Transient DB error in transaction (${error.code}). ` + - `Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; - } - - throw error; - } finally { - client.release(); - } - } -} - -const checkExhaustion = () => { - if (pool.totalCount >= maxPoolSize && pool.idleCount === 0) { - logger.warn( - "DB Pool Exhaustion Warning: All connections are currently in use.", - { - waiting: pool.waitingCount, - active: pool.totalCount, - }, - ); - } -}; - -export const query = async (text: string, params?: unknown[]) => { - if (isShuttingDown) { - throw new Error("Database pool is shutting down"); - } - checkExhaustion(); - return withRetry(async () => { - const start = Date.now(); - const result = await pool.query(text, params); - const duration = Date.now() - start; - logger.debug("Executed query", { - text: text.substring(0, 50), - duration, - rows: result.rowCount, - }); - return result; - }); -}; - -export const getClient = async () => { - if (isShuttingDown) { - throw new Error("Database pool is shutting down"); - } - checkExhaustion(); - return withRetry(async () => { - const client = await pool.connect(); - return client; - }); -}; - -const waitForPoolToDrain = async (timeoutMs: number): Promise => { - const startedAt = Date.now(); - - while (pool.totalCount > 0 && pool.totalCount !== pool.idleCount) { - if (Date.now() - startedAt >= timeoutMs) { - throw new Error( - `Timed out waiting for pool to drain active clients after ${timeoutMs}ms`, - ); - } - - await new Promise((resolve) => setTimeout(resolve, 100)); +export async function query( + sql: string, + params?: unknown[], +): Promise { + const client = await pool.connect(); + try { + const result = await client.query(sql, params); + return result.rows; + } finally { + client.release(); } -}; - -export const closePool = async (options?: { timeoutMs?: number }) => { - const timeoutMs = options?.timeoutMs ?? 10_000; - isShuttingDown = true; - clearInterval(metricsInterval); - await waitForPoolToDrain(timeoutMs); - await pool.end(); -}; - -export default pool; +} \ No newline at end of file diff --git a/src/db/transaction.ts b/src/db/transaction.ts index 16c77f2..c0fbc64 100644 --- a/src/db/transaction.ts +++ b/src/db/transaction.ts @@ -1,125 +1,232 @@ /** - * Canonical transaction helper with configurable retry semantics. + * src/db/transaction.ts * - * Money-moving code MUST use withTransaction (retrying variant). - * Read-only or idempotent non-critical paths MAY use withTransactionNoRetry. + * Transaction helpers with retry logic and Stellar integration. * - * @see docs/database-transactions.md + * Exports: + * withTransaction — canonical DB transaction helper (client-first) + * withRetryingTransaction — same, with connection-level retry + * withStellarAndDbTransaction — money-moving path (Stellar + DB atomic) + * executeTransactionQueries — run queries inside an existing transaction */ -import { PoolClient } from "pg"; // adjust for your driver (pg, mysql2, etc.) +import pg, { type PoolClient } from "pg"; +import { pool } from "./connection.js"; +import logger from "../utils/logger.js"; +import { withRetry } from "../utils/withRetry.js"; +import { + type Transaction, + type xdr as XdrNamespace, + SorobanRpc, +} from "@stellar/stellar-sdk"; -// ─── Retry configuration ────────────────────────────────────────────────────── - -const RETRY_CONFIG = { - maxRetries: 5, - baseDelayMs: 50, - maxDelayMs: 2000, - transientErrorCodes: new Set([ - "40001", // serialization_failure - "40P01", // deadlock_detected - "08006", // connection_failure - "08003", // connection_does_not_exist - "08001", // sqlclient_unable_to_establish_sqlconnection - ]), -}; - -// ─── Types ────────────────────────────────────────────────────────────────── - -export type TransactionFn = (client: PoolClient) => Promise; +// ─── Types ─────────────────────────────────────────────────────────────────── export interface TransactionOptions { - /** Retry on transient errors (deadlock, serialization failure). Default: true */ - retry?: boolean; - /** Override max retry attempts. Only used when retry=true */ + /** Max attempts for connection-level retries (default: 3) */ maxRetries?: number; + /** Delay between retries in ms (default: 100) */ + retryDelayMs?: number; } -// ─── Helpers ──────────────────────────────────────────────────────────────── - -function isTransientError(error: unknown): boolean { - if (!(error instanceof Error)) return false; - const code = (error as any).code; - return typeof code === "string" && RETRY_CONFIG.transientErrorCodes.has(code); +export interface StellarDbTransactionOptions extends TransactionOptions { + /** Soroban RPC server for simulation/submission */ + rpc: SorobanRpc.Server; + /** Transaction builder callback */ + buildStellarTx: () => Promise; } -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} +export type TransactionCallback = (client: PoolClient) => Promise; -function exponentialBackoff(attempt: number): number { - const delay = Math.min( - RETRY_CONFIG.baseDelayMs * 2 ** attempt, - RETRY_CONFIG.maxDelayMs - ); - // Add jitter to prevent thundering herd - return delay + Math.random() * delay * 0.5; +// Connection failure codes that warrant re-acquiring a client +const CONNECTION_FAILURE_CODES = new Set([ + "08006", // connection_failure + "08003", // connection_does_not_exist + "08001", // sqlclient_unable_to_establish_sqlconnection + "08004", // sqlserver_rejected_establishment_of_sqlconnection +]); + +function isConnectionFailure(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const code = (err as { code?: string }).code; + return typeof code === "string" && CONNECTION_FAILURE_CODES.has(code); } -// ─── Core implementation ──────────────────────────────────────────────────── +// ─── Core withTransaction (client-first signature) ──────────────────────────── /** * Execute work inside a database transaction. * - * By default (retry=true) this retries on transient errors - * (deadlock, serialization failure) with exponential backoff. - * Use this for ALL money-moving or state-mutating operations. + * Signature: withTransaction(client, fn, options?) + * + * The caller provides the client (typically from pool.connect()). + * This helper handles BEGIN, COMMIT, and ROLLBACK. On error it re-throws + * after rolling back so the caller can release the client. + * + * @example + * ```ts + * const client = await pool.connect(); + * try { + * const result = await withTransaction(client, async (tx) => { + * await tx.query("UPDATE accounts SET balance = balance - $1", [100]); + * return { ok: true }; + * }); + * } finally { + * client.release(); + * } + * ``` */ export async function withTransaction( client: PoolClient, - fn: TransactionFn, - options: TransactionOptions = {} + fn: TransactionCallback, + _options?: TransactionOptions, ): Promise { - const { retry = true, maxRetries = RETRY_CONFIG.maxRetries } = options; + await client.query("BEGIN"); + try { + const result = await fn(client); + await client.query("COMMIT"); + return result; + } catch (err) { + await client.query("ROLLBACK").catch((rollbackErr) => { + logger.error("Rollback failed", rollbackErr); + }); + throw err; + } +} - async function attempt(attemptNumber: number): Promise { - await client.query("BEGIN"); +// ─── Retrying variant (re-acquires client on connection failures) ───────────── + +/** + * Execute work inside a database transaction with automatic retry on + * connection failures. + * + * Unlike withTransaction, this helper acquires its own client from the pool + * and **re-acquires** on connection failure codes (08006, 08003, etc.). + * This ensures actual recovery from dropped connections. + * + * @example + * ```ts + * const result = await withRetryingTransaction(async (client) => { + * const rows = await client.query("SELECT * FROM users"); + * return rows.rows; + * }, { maxRetries: 3 }); + * ``` + */ +export async function withRetryingTransaction( + fn: TransactionCallback, + options: TransactionOptions = {}, +): Promise { + const maxRetries = options.maxRetries ?? 3; + const retryDelayMs = options.retryDelayMs ?? 100; + let lastError: unknown; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + const client = await pool.connect(); try { + await client.query("BEGIN"); const result = await fn(client); await client.query("COMMIT"); return result; - } catch (error) { - await client.query("ROLLBACK").catch(() => {}); // ignore rollback errors - - const shouldRetry = - retry && - attemptNumber < maxRetries && - isTransientError(error); - - if (shouldRetry) { - const delay = exponentialBackoff(attemptNumber); - console.warn( - `[withTransaction] Transient error (attempt ${attemptNumber + 1}/${maxRetries + 1}), ` + - `retrying in ${Math.round(delay)}ms: ${(error as Error).message}` - ); - await sleep(delay); - return attempt(attemptNumber + 1); + } catch (err) { + await client.query("ROLLBACK").catch((rollbackErr) => { + logger.error("Rollback failed on retry attempt", rollbackErr); + }); + + lastError = err; + + if (!isConnectionFailure(err) || attempt === maxRetries) { + throw err; } - throw error; + logger.warn( + `Connection failure on attempt ${attempt}/${maxRetries}, re-acquiring client...`, + err, + ); + await new Promise((r) => setTimeout(r, retryDelayMs * attempt)); + // Loop continues — a fresh client is acquired on next iteration + } finally { + client.release(); } } - return attempt(0); + throw lastError; } +// ─── Stellar + DB atomic transaction ────────────────────────────────────────── + /** - * Execute work inside a database transaction WITHOUT retry. + * Execute a Stellar blockchain transaction and database mutations atomically. + * + * Flow: + * 1. BEGIN DB transaction + * 2. Build and simulate Stellar transaction (read-only, uses DB state) + * 3. Submit Stellar transaction + * 4. On success: COMMIT DB transaction + * 5. On failure: ROLLBACK DB transaction * - * Use ONLY for: - * - Read-only transactions where retry adds no value - * - Idempotent admin/ops scripts - * - Cases where the caller handles retry externally + * This ensures the DB state never diverges from the blockchain state. * - * ⚠️ NEVER use this for money-moving code. + * @deprecated Use withTransaction + manual Stellar submission for new code. */ -export async function withTransactionNoRetry( - client: PoolClient, - fn: TransactionFn -): Promise { - return withTransaction(client, fn, { retry: false }); +export async function withStellarAndDbTransaction( + rpc: SorobanRpc.Server, + buildStellarTx: () => Promise, + dbWork: TransactionCallback, + options?: TransactionOptions, +): Promise<{ stellarResult: unknown; dbResult: T }> { + const client = await pool.connect(); + try { + await client.query("BEGIN"); + + // Build and simulate (read-only — safe inside transaction) + const stellarTx = await buildStellarTx(); + const simulated = await rpc.simulateTransaction(stellarTx); + + if (SorobanRpc.Api.isSimulationError(simulated)) { + throw new Error(`Stellar simulation failed: ${simulated.error}`); + } + + // Submit to network + const stellarResult = await rpc.sendTransaction(stellarTx); + + // Execute DB work now that Stellar is confirmed + const dbResult = await dbWork(client); + + await client.query("COMMIT"); + return { stellarResult, dbResult }; + } catch (err) { + await client.query("ROLLBACK").catch((rollbackErr) => { + logger.error("Stellar+DB rollback failed", rollbackErr); + }); + throw err; + } finally { + client.release(); + } } -// ─── Re-export for backward compatibility during migration ──────────────────── -// TODO: Remove after all imports are migrated -export { withTransaction as withTransactionRetry }; \ No newline at end of file +// ─── Execute queries inside an existing transaction ─────────────────────────── + +/** + * Run a batch of queries inside an existing transaction client. + * + * This is a thin wrapper for code that already holds a transaction client + * and wants to execute multiple queries without nested BEGIN/COMMIT. + * + * @example + * ```ts + * await withTransaction(client, async (tx) => { + * await executeTransactionQueries(tx, [ + * { sql: "UPDATE accounts SET balance = $1", params: [100] }, + * { sql: "INSERT INTO logs (msg) VALUES ($1)", params: ["deducted"] }, + * ]); + * }); + * ``` + */ +export async function executeTransactionQueries( + client: PoolClient, + queries: Array<{ sql: string; params?: unknown[] }>, +): Promise { + for (const { sql, params } of queries) { + await client.query(sql, params); + } +} \ No newline at end of file diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index 301e594..cd01fe9 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -1,5 +1,4 @@ import { rpc as SorobanRpc, scValToNative, xdr } from "@stellar/stellar-sdk"; -import { type PoolClient, query, withTransaction } from "../db/connection.js"; import logger from "../utils/logger.js"; import { createRequestId, @@ -20,6 +19,10 @@ import { sorobanService } from "./sorobanService.js"; import { updateUserScoresBulk } from "./scoresService.js"; import { AppError } from "../errors/AppError.js"; +// NEW import: +import { type PoolClient, query, pool } from "../db/connection.js"; +import { withTransaction } from "../db/transaction.js"; + const EVENT_TYPE_ALIASES: Record = { Mint: "NFTMinted", AdmRemint: "NFTMinted", @@ -455,112 +458,116 @@ export class EventIndexer { // end avoids N+1 queries and keeps scores within [300, 850]. const scoreUpdates: Map = new Map(); - await withTransaction(async (client: PoolClient) => { - for (const event of parsedEvents) { - const insertResult = await client.query( - `INSERT INTO loan_events ( - event_id, - event_type, - loan_id, - address, - amount, - ledger, - ledger_closed_at, - tx_hash, - contract_id, - topics, - value, - interest_rate_bps, - term_ledgers - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) - ON CONFLICT DO NOTHING - RETURNING event_id`, - [ - event.eventId, - event.eventType, - event.loanId ?? null, - event.address ?? null, - event.amount ?? null, - event.ledger, - event.ledgerClosedAt, - event.txHash, - event.contractId, - JSON.stringify(event.topics), - event.value, - event.interestRateBps ?? null, - event.termLedgers ?? null, - ], - ); - - if ((insertResult.rowCount ?? 0) > 0) { - insertedEvents.push(event); - - // Aggregate score deltas per borrower; a single bulk upsert at - // the end of the transaction avoids N+1 score updates. - if (event.eventType === "LoanRepaid") { - const { repaymentDelta } = sorobanService.getScoreConfig(); - if (event.address) { - scoreUpdates.set( - event.address, - (scoreUpdates.get(event.address) ?? 0) + repaymentDelta, - ); - } - } else if ( - event.eventType === "LoanDefaulted" || - event.eventType === "CollateralLiquidated" - ) { - const { defaultPenalty } = sorobanService.getScoreConfig(); - if (event.address) { - scoreUpdates.set( - event.address, - (scoreUpdates.get(event.address) ?? 0) - defaultPenalty, - ); + const client = await pool.connect(); + try { + await withTransaction(client, async (tx: PoolClient) => { + for (const event of parsedEvents) { + const insertResult = await tx.query( + `INSERT INTO loan_events ( + event_id, + event_type, + loan_id, + address, + amount, + ledger, + ledger_closed_at, + tx_hash, + contract_id, + topics, + value, + interest_rate_bps, + term_ledgers + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT DO NOTHING + RETURNING event_id`, + [ + event.eventId, + event.eventType, + event.loanId ?? null, + event.address ?? null, + event.amount ?? null, + event.ledger, + event.ledgerClosedAt, + event.txHash, + event.contractId, + JSON.stringify(event.topics), + event.value, + event.interestRateBps ?? null, + event.termLedgers ?? null, + ], + ); + + if ((insertResult.rowCount ?? 0) > 0) { + insertedEvents.push(event); + + // Aggregate score deltas per borrower; a single bulk upsert at + // the end of the transaction avoids N+1 score updates. + if (event.eventType === "LoanRepaid") { + const { repaymentDelta } = sorobanService.getScoreConfig(); + if (event.address) { + scoreUpdates.set( + event.address, + (scoreUpdates.get(event.address) ?? 0) + repaymentDelta, + ); + } + } else if ( + event.eventType === "LoanDefaulted" || + event.eventType === "CollateralLiquidated" + ) { + const { defaultPenalty } = sorobanService.getScoreConfig(); + if (event.address) { + scoreUpdates.set( + event.address, + (scoreUpdates.get(event.address) ?? 0) - defaultPenalty, + ); + } } } } - } - // Apply batched score updates on the same pinned client so that both - // the event inserts and the score changes are committed or rolled back - // together — satisfying the atomicity requirement. - if (scoreUpdates.size > 0) { - await updateUserScoresBulk(scoreUpdates, client); - } - }); - // withTransaction commits here; any error triggers automatic ROLLBACK - - for (const event of insertedEvents) { - webhookService.dispatch(event).catch((error) => { - logger.error("Webhook dispatch failed", { - eventId: event.eventId, - error, - }); + // Apply batched score updates on the same pinned client so that both + // the event inserts and the score changes are committed or rolled back + // together — satisfying the atomicity requirement. + if (scoreUpdates.size > 0) { + await updateUserScoresBulk(scoreUpdates, tx); + } }); + // withTransaction commits here; any error triggers automatic ROLLBACK - eventStreamService.broadcast({ - eventId: event.eventId, - eventType: event.eventType, - ...(event.loanId !== undefined ? { loanId: event.loanId } : {}), - address: event.address, - ...(event.amount !== undefined ? { amount: event.amount } : {}), - ledger: event.ledger, - ledgerClosedAt: event.ledgerClosedAt.toISOString(), - txHash: event.txHash, - }); + for (const event of insertedEvents) { + webhookService.dispatch(event).catch((error) => { + logger.error("Webhook dispatch failed", { + eventId: event.eventId, + error, + }); + }); - this.triggerNotification(event).catch((error) => { - logger.error("Notification trigger failed", { + eventStreamService.broadcast({ eventId: event.eventId, - error, + eventType: event.eventType, + ...(event.loanId !== undefined ? { loanId: event.loanId } : {}), + address: event.address, + ...(event.amount !== undefined ? { amount: event.amount } : {}), + ledger: event.ledger, + ledgerClosedAt: event.ledgerClosedAt.toISOString(), + txHash: event.txHash, }); - }); - } - return { - insertedCount: insertedEvents.length, - }; - } + this.triggerNotification(event).catch((error) => { + logger.error("Notification trigger failed", { + eventId: event.eventId, + error, + }); + }); + } + + return { + insertedCount: insertedEvents.length, + }; + } finally { + client.release(); + } private parseEvent(event: SorobanRawEvent): ContractEvent | null { const type = this.decodeEventType(event.topic[0]); @@ -976,4 +983,4 @@ export class EventIndexer { return null; } } -} +} \ No newline at end of file diff --git a/src/services/remittanceService.ts b/src/services/remittanceService.ts index 95476f2..1b7c68f 100644 --- a/src/services/remittanceService.ts +++ b/src/services/remittanceService.ts @@ -11,6 +11,27 @@ import { query } from "../db/connection.js"; import { withTransaction } from "../db/transaction.js"; import { AppError } from "../errors/AppError.js"; import logger from "../utils/logger.js"; +// OLD (callback-first — BROKEN): +// await withTransaction(async (client: PoolClient) => { +// await client.query("INSERT ...", [params]); +// }); + +// NEW (client-first): +import { pool } from "../db/connection.js"; + + +const client = await pool.connect(); +try { + const result = await withTransaction( + client, + async (tx: PoolClient) => { + await tx.query("INSERT ...", [params]); + return { id: "123" }; + }, + ); +} finally { + client.release(); +} export interface CreateRemittancePayload { recipientAddress: string; From b2a210dcf88c046591902fa4ef4282bb4f5dbafe Mon Sep 17 00:00:00 2001 From: Peolite001 Date: Sat, 20 Jun 2026 08:57:20 +0100 Subject: [PATCH 3/3] issue solved --- docs/database-transactions.md | 11 +- scripts/audit-transaction-imports.ts | 56 ++++--- src/app.ts | 2 +- src/db/connection.ts | 22 ++- src/db/transaction.ts | 232 +++++++++------------------ src/services/databaseService.ts | 19 +-- src/services/eventIndexer.ts | 3 +- src/services/remittanceService.ts | 27 +--- 8 files changed, 143 insertions(+), 229 deletions(-) diff --git a/docs/database-transactions.md b/docs/database-transactions.md index 2db5e26..d5b1b26 100644 --- a/docs/database-transactions.md +++ b/docs/database-transactions.md @@ -2,10 +2,10 @@ ## Quick Reference -| Helper | Retries? | Use For | -|--------|----------|---------| -| `withTransaction` | ✅ Yes (exponential backoff) | **All money-moving code** — loans, repayments, transfers, balance updates | -| `withTransactionNoRetry` | ❌ No | Read-only queries, idempotent admin scripts, externally-managed retry | +| Helper | Retries? | Use For | +| ------------------------ | ---------------------------- | ------------------------------------------------------------------------- | +| `withTransaction` | ✅ Yes (exponential backoff) | **All money-moving code** — loans, repayments, transfers, balance updates | +| `withTransactionNoRetry` | ❌ No | Read-only queries, idempotent admin scripts, externally-managed retry | ## Import @@ -71,4 +71,5 @@ async function getLoanHistory(userId: string) { } finally { client.release(); } -} \ No newline at end of file +} +``` diff --git a/scripts/audit-transaction-imports.ts b/scripts/audit-transaction-imports.ts index 05f2bb4..5da9c58 100644 --- a/scripts/audit-transaction-imports.ts +++ b/scripts/audit-transaction-imports.ts @@ -45,14 +45,17 @@ function analyzeFile(filePath: string): ImportMatch[] { for (let i = 0; i < lines.length; i++) { const line = lines[i]; - const importRegex = /import\s+.*?\{[^}]*\b(withTransaction|withTransactionNoRetry)\b[^}]*\}.*?from\s+['"]([^'"]+)['"]/; + const importRegex = + /import\s+.*?\{[^}]*\b(withTransaction|withTransactionNoRetry)\b[^}]*\}.*?from\s+['"]([^'"]+)['"]/; const match = line.match(importRegex); if (match) { const sourceModule = match[2]; - const source: ImportMatch["source"] = - sourceModule.includes("connection") ? "connection" : - sourceModule.includes("transaction") ? "transaction" : "unknown"; + const source: ImportMatch["source"] = sourceModule.includes("connection") + ? "connection" + : sourceModule.includes("transaction") + ? "transaction" + : "unknown"; matches.push({ file: path.relative(process.cwd(), filePath), @@ -88,8 +91,12 @@ function main() { console.log(`Total imports found: ${allMatches.length}\n`); if (fromConnection.length > 0) { - console.log(`⚠️ Imports from connection.ts (NEED MIGRATION): ${fromConnection.length}`); - console.log(" These should be updated to import from '../db/transaction'\n"); + console.log( + `⚠️ Imports from connection.ts (NEED MIGRATION): ${fromConnection.length}`, + ); + console.log( + " These should be updated to import from '../db/transaction'\n", + ); for (const m of fromConnection) { console.log(` ${m.file}:${m.line}`); console.log(` → ${m.text}\n`); @@ -99,7 +106,9 @@ function main() { } if (fromTransaction.length > 0) { - console.log(`✅ Imports from transaction.ts (CORRECT): ${fromTransaction.length}\n`); + console.log( + `✅ Imports from transaction.ts (CORRECT): ${fromTransaction.length}\n`, + ); for (const m of fromTransaction) { console.log(` ${m.file}:${m.line}`); console.log(` → ${m.text}\n`); @@ -107,7 +116,9 @@ function main() { } if (usingNoRetry.length > 0) { - console.log(`ℹ️ Files using withTransactionNoRetry: ${usingNoRetry.length}`); + console.log( + `ℹ️ Files using withTransactionNoRetry: ${usingNoRetry.length}`, + ); console.log(" Please verify these are intentionally non-retrying:\n"); for (const m of usingNoRetry) { console.log(` ${m.file}:${m.line}`); @@ -116,19 +127,22 @@ function main() { } // Money-moving paths check - const moneyPaths = allMatches.filter((m) => - m.file.toLowerCase().includes("loan") || - m.file.toLowerCase().includes("payment") || - m.file.toLowerCase().includes("repay") || - m.file.toLowerCase().includes("transfer") || - m.file.toLowerCase().includes("wallet") || - m.file.toLowerCase().includes("balance") + const moneyPaths = allMatches.filter( + (m) => + m.file.toLowerCase().includes("loan") || + m.file.toLowerCase().includes("payment") || + m.file.toLowerCase().includes("repay") || + m.file.toLowerCase().includes("transfer") || + m.file.toLowerCase().includes("wallet") || + m.file.toLowerCase().includes("balance"), ); if (moneyPaths.length > 0) { console.log("💰 Money-moving paths using withTransaction:"); for (const m of moneyPaths) { - const status = m.usesNoRetry ? "❌ USES NO-RETRY — RISK!" : "✅ retrying variant"; + const status = m.usesNoRetry + ? "❌ USES NO-RETRY — RISK!" + : "✅ retrying variant"; console.log(` ${m.file}:${m.line} — ${status}`); } } @@ -136,8 +150,12 @@ function main() { console.log("\n═══════════════════════════════════════════════════════════"); console.log(" Recommended fixes:"); console.log("═══════════════════════════════════════════════════════════"); - console.log(` sed -i 's|from "../db/connection"|from "../db/transaction"|g' src/**/*.ts`); - console.log(" Then verify money-moving paths use withTransaction (not NoRetry)."); + console.log( + ` sed -i 's|from "../db/connection"|from "../db/transaction"|g' src/**/*.ts`, + ); + console.log( + " Then verify money-moving paths use withTransaction (not NoRetry).", + ); } -main(); \ No newline at end of file +main(); diff --git a/src/app.ts b/src/app.ts index 5b33a56..6b0d1e6 100644 --- a/src/app.ts +++ b/src/app.ts @@ -10,7 +10,7 @@ import dotenv from "dotenv"; import { Sentry } from "./config/sentry.js"; dotenv.config(); -import pool from "./db/connection.js"; +import { pool } from "./db/connection.js"; import { cacheService } from "./services/cacheService.js"; import { sorobanService } from "./services/sorobanService.js"; import simulationRoutes from "./routes/simulationRoutes.js"; diff --git a/src/db/connection.ts b/src/db/connection.ts index b683ce4..ba056bb 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -2,8 +2,7 @@ * src/db/connection.ts * * Database connection pool and query helper. - * Transaction helpers live in transaction.ts and are re-exported here - * for backward compatibility. + * Transaction helpers are re-exported from transaction.ts for convenience. */ import pg from "pg"; @@ -30,16 +29,25 @@ pool.on("error", (err) => { /** * Execute a single query using a pooled client. * The client is automatically released back to the pool. + * Returns the full pg.QueryResult so callers can access .rows, .rowCount, etc. */ -export async function query( +export async function query( sql: string, params?: unknown[], -): Promise { +): Promise { const client = await pool.connect(); try { - const result = await client.query(sql, params); - return result.rows; + const result = await client.query(sql, params); + return result; } finally { client.release(); } -} \ No newline at end of file +} + +/** + * Acquire a dedicated client from the pool. + * Caller MUST call client.release() when done. + */ +export async function getClient(): Promise { + return pool.connect(); +} diff --git a/src/db/transaction.ts b/src/db/transaction.ts index c0fbc64..4660c8f 100644 --- a/src/db/transaction.ts +++ b/src/db/transaction.ts @@ -1,126 +1,67 @@ /** - * src/db/transaction.ts - * - * Transaction helpers with retry logic and Stellar integration. - * + * Canonical transaction helpers with retry logic. * Exports: - * withTransaction — canonical DB transaction helper (client-first) - * withRetryingTransaction — same, with connection-level retry - * withStellarAndDbTransaction — money-moving path (Stellar + DB atomic) - * executeTransactionQueries — run queries inside an existing transaction + * withTransaction — callback-first DB transaction (with retry) + * withRetryingTransaction — alias for backward compatibility + * withStellarAndDbTransaction — money-moving path (Stellar + DB) + * executeTransactionQueries — run queries inside an existing transaction */ import pg, { type PoolClient } from "pg"; import { pool } from "./connection.js"; import logger from "../utils/logger.js"; -import { withRetry } from "../utils/withRetry.js"; -import { - type Transaction, - type xdr as XdrNamespace, - SorobanRpc, -} from "@stellar/stellar-sdk"; -// ─── Types ─────────────────────────────────────────────────────────────────── +// ─── Types ───────────────────────────────────────────────────────────────── export interface TransactionOptions { - /** Max attempts for connection-level retries (default: 3) */ + /** Max attempts for transient-error retries (default: 3) */ maxRetries?: number; - /** Delay between retries in ms (default: 100) */ - retryDelayMs?: number; -} - -export interface StellarDbTransactionOptions extends TransactionOptions { - /** Soroban RPC server for simulation/submission */ - rpc: SorobanRpc.Server; - /** Transaction builder callback */ - buildStellarTx: () => Promise; + /** Initial back-off delay in ms (default: 200, doubles each retry) */ + baseDelayMs?: number; } export type TransactionCallback = (client: PoolClient) => Promise; -// Connection failure codes that warrant re-acquiring a client -const CONNECTION_FAILURE_CODES = new Set([ - "08006", // connection_failure +// Transient error codes that warrant retry with exponential backoff +const TRANSIENT_ERROR_CODES = new Set([ + "ECONNREFUSED", + "08000", // connection_exception "08003", // connection_does_not_exist - "08001", // sqlclient_unable_to_establish_sqlconnection - "08004", // sqlserver_rejected_establishment_of_sqlconnection + "08006", // connection_failure + "57P01", // admin_shutdown + "57P02", // crash_shutdown + "57P03", // cannot_connect_now + "40001", // serialization_failure + "40P01", // deadlock_detected ]); -function isConnectionFailure(err: unknown): boolean { +function isTransientError(err: unknown): boolean { if (!(err instanceof Error)) return false; const code = (err as { code?: string }).code; - return typeof code === "string" && CONNECTION_FAILURE_CODES.has(code); -} - -// ─── Core withTransaction (client-first signature) ──────────────────────────── - -/** - * Execute work inside a database transaction. - * - * Signature: withTransaction(client, fn, options?) - * - * The caller provides the client (typically from pool.connect()). - * This helper handles BEGIN, COMMIT, and ROLLBACK. On error it re-throws - * after rolling back so the caller can release the client. - * - * @example - * ```ts - * const client = await pool.connect(); - * try { - * const result = await withTransaction(client, async (tx) => { - * await tx.query("UPDATE accounts SET balance = balance - $1", [100]); - * return { ok: true }; - * }); - * } finally { - * client.release(); - * } - * ``` - */ -export async function withTransaction( - client: PoolClient, - fn: TransactionCallback, - _options?: TransactionOptions, -): Promise { - await client.query("BEGIN"); - try { - const result = await fn(client); - await client.query("COMMIT"); - return result; - } catch (err) { - await client.query("ROLLBACK").catch((rollbackErr) => { - logger.error("Rollback failed", rollbackErr); - }); - throw err; - } + return typeof code === "string" && TRANSIENT_ERROR_CODES.has(code); } -// ─── Retrying variant (re-acquires client on connection failures) ───────────── +// ─── Core withTransaction (callback-first, with retry) ───────────────────── /** * Execute work inside a database transaction with automatic retry on - * connection failures. - * - * Unlike withTransaction, this helper acquires its own client from the pool - * and **re-acquires** on connection failure codes (08006, 08003, etc.). - * This ensures actual recovery from dropped connections. + * transient PostgreSQL errors (deadlock, serialization failure, etc.). * - * @example - * ```ts - * const result = await withRetryingTransaction(async (client) => { - * const rows = await client.query("SELECT * FROM users"); - * return rows.rows; - * }, { maxRetries: 3 }); - * ``` + * A dedicated PoolClient is checked out for each attempt so that + * BEGIN / all DML / COMMIT run on the same connection. + * If the callback throws, or a transient error is encountered, the + * transaction is rolled back and retried up to `maxRetries` times + * with exponential back-off. */ -export async function withRetryingTransaction( +export async function withTransaction( fn: TransactionCallback, options: TransactionOptions = {}, ): Promise { const maxRetries = options.maxRetries ?? 3; - const retryDelayMs = options.retryDelayMs ?? 100; - let lastError: unknown; + const baseDelayMs = options.baseDelayMs ?? 200; + let attempt = 0; - for (let attempt = 1; attempt <= maxRetries; attempt++) { + while (true) { const client = await pool.connect(); try { await client.query("BEGIN"); @@ -129,98 +70,71 @@ export async function withRetryingTransaction( return result; } catch (err) { await client.query("ROLLBACK").catch((rollbackErr) => { - logger.error("Rollback failed on retry attempt", rollbackErr); + logger.error("Rollback failed", rollbackErr); }); - lastError = err; - - if (!isConnectionFailure(err) || attempt === maxRetries) { - throw err; + if (isTransientError(err) && attempt < maxRetries) { + const delay = baseDelayMs * 2 ** attempt; + attempt++; + logger.warn( + `Transient DB error in transaction (${(err as Error).message}). ` + + `Retrying in ${delay}ms (attempt ${attempt}/${maxRetries})`, + ); + await new Promise((r) => setTimeout(r, delay)); + continue; } - logger.warn( - `Connection failure on attempt ${attempt}/${maxRetries}, re-acquiring client...`, - err, - ); - await new Promise((r) => setTimeout(r, retryDelayMs * attempt)); - // Loop continues — a fresh client is acquired on next iteration + throw err; } finally { client.release(); } } +} + +// ─── Retrying variant (alias for backward compatibility) ────────────────── - throw lastError; +/** + * Alias for withTransaction. Exists for code that explicitly wants the + * retrying behaviour spelled out in the call site. + */ +export async function withRetryingTransaction( + fn: TransactionCallback, + options?: TransactionOptions, +): Promise { + return withTransaction(fn, options); } -// ─── Stellar + DB atomic transaction ────────────────────────────────────────── +// ─── Stellar + DB transaction (old signature, now with retry) ───────────── /** - * Execute a Stellar blockchain transaction and database mutations atomically. - * - * Flow: - * 1. BEGIN DB transaction - * 2. Build and simulate Stellar transaction (read-only, uses DB state) - * 3. Submit Stellar transaction - * 4. On success: COMMIT DB transaction - * 5. On failure: ROLLBACK DB transaction + * Wrapper for operations that involve both on-chain submission and database writes. * - * This ensures the DB state never diverges from the blockchain state. + * ⚠️ CRITICAL: The Stellar operation executes OUTSIDE the DB transaction and is + * IRREVERSIBLE. If the DB transaction fails after Stellar succeeds, manual reconciliation + * may be required. The DB portion uses the retrying withTransaction. * - * @deprecated Use withTransaction + manual Stellar submission for new code. + * @param stellarOperation — Function that submits to Stellar network (executed first) + * @param dbOperations — Function that performs database writes (inside retrying tx) */ export async function withStellarAndDbTransaction( - rpc: SorobanRpc.Server, - buildStellarTx: () => Promise, - dbWork: TransactionCallback, - options?: TransactionOptions, + stellarOperation: () => Promise, + dbOperations: (stellarResult: unknown, client: PoolClient) => Promise, ): Promise<{ stellarResult: unknown; dbResult: T }> { - const client = await pool.connect(); - try { - await client.query("BEGIN"); + // Execute Stellar operation first (irreversible — outside DB transaction) + const stellarResult = await stellarOperation(); - // Build and simulate (read-only — safe inside transaction) - const stellarTx = await buildStellarTx(); - const simulated = await rpc.simulateTransaction(stellarTx); - - if (SorobanRpc.Api.isSimulationError(simulated)) { - throw new Error(`Stellar simulation failed: ${simulated.error}`); - } + // Then execute DB operations inside the retrying transaction wrapper + const dbResult = await withTransaction(async (client) => { + return await dbOperations(stellarResult, client); + }); - // Submit to network - const stellarResult = await rpc.sendTransaction(stellarTx); - - // Execute DB work now that Stellar is confirmed - const dbResult = await dbWork(client); - - await client.query("COMMIT"); - return { stellarResult, dbResult }; - } catch (err) { - await client.query("ROLLBACK").catch((rollbackErr) => { - logger.error("Stellar+DB rollback failed", rollbackErr); - }); - throw err; - } finally { - client.release(); - } + return { stellarResult, dbResult }; } -// ─── Execute queries inside an existing transaction ─────────────────────────── +// ─── Execute queries inside an existing transaction ──────────────────────── /** * Run a batch of queries inside an existing transaction client. - * - * This is a thin wrapper for code that already holds a transaction client - * and wants to execute multiple queries without nested BEGIN/COMMIT. - * - * @example - * ```ts - * await withTransaction(client, async (tx) => { - * await executeTransactionQueries(tx, [ - * { sql: "UPDATE accounts SET balance = $1", params: [100] }, - * { sql: "INSERT INTO logs (msg) VALUES ($1)", params: ["deducted"] }, - * ]); - * }); - * ``` */ export async function executeTransactionQueries( client: PoolClient, @@ -229,4 +143,4 @@ export async function executeTransactionQueries( for (const { sql, params } of queries) { await client.query(sql, params); } -} \ No newline at end of file +} diff --git a/src/services/databaseService.ts b/src/services/databaseService.ts index d0f6dba..0c51fbe 100644 --- a/src/services/databaseService.ts +++ b/src/services/databaseService.ts @@ -1,4 +1,4 @@ -import { query, getClient } from "../db/connection.js"; +import { query, withTransaction } from "../db/connection.js"; import type { PoolClient } from "pg"; export interface UserProfile { @@ -406,21 +406,14 @@ export class IndexedEventsService { } export class DatabaseService { + /** + * Delegate to the canonical withTransaction from connection.ts, + * which provides transient-error retry with exponential backoff. + */ static async withTransaction( callback: (client: PoolClient) => Promise, ): Promise { - const client = await getClient(); - try { - await client.query("BEGIN"); - const result = await callback(client); - await client.query("COMMIT"); - return result; - } catch (error) { - await client.query("ROLLBACK"); - throw error; - } finally { - client.release(); - } + return withTransaction(callback); } static async healthCheck(): Promise { diff --git a/src/services/eventIndexer.ts b/src/services/eventIndexer.ts index cd01fe9..a728b6b 100644 --- a/src/services/eventIndexer.ts +++ b/src/services/eventIndexer.ts @@ -568,6 +568,7 @@ export class EventIndexer { } finally { client.release(); } + } private parseEvent(event: SorobanRawEvent): ContractEvent | null { const type = this.decodeEventType(event.topic[0]); @@ -983,4 +984,4 @@ export class EventIndexer { return null; } } -} \ No newline at end of file +} diff --git a/src/services/remittanceService.ts b/src/services/remittanceService.ts index 1b7c68f..c440d33 100644 --- a/src/services/remittanceService.ts +++ b/src/services/remittanceService.ts @@ -11,27 +11,6 @@ import { query } from "../db/connection.js"; import { withTransaction } from "../db/transaction.js"; import { AppError } from "../errors/AppError.js"; import logger from "../utils/logger.js"; -// OLD (callback-first — BROKEN): -// await withTransaction(async (client: PoolClient) => { -// await client.query("INSERT ...", [params]); -// }); - -// NEW (client-first): -import { pool } from "../db/connection.js"; - - -const client = await pool.connect(); -try { - const result = await withTransaction( - client, - async (tx: PoolClient) => { - await tx.query("INSERT ...", [params]); - return { id: "123" }; - }, - ); -} finally { - client.release(); -} export interface CreateRemittancePayload { recipientAddress: string; @@ -150,7 +129,7 @@ export const remittanceService = { return await withTransaction(async (client) => { const result = await client.query( `INSERT INTO remittances - (id, sender_id, recipient_address, amount, from_currency, to_currency, memo, status, xdr, created_at, updated_at) + (id, sender_id, recipient_address, amount, from_currency, to_currency, memo, status, xdr, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING *`, [ @@ -228,7 +207,7 @@ export const remittanceService = { } const result = await query( - `SELECT * FROM remittances + `SELECT * FROM remittances WHERE ${whereClause} ORDER BY created_at DESC, id DESC LIMIT $${params.length + 1}`, @@ -324,7 +303,7 @@ export const remittanceService = { ): Promise { try { const result = await query( - `UPDATE remittances + `UPDATE remittances SET status = $1, transaction_hash = $2, error_message = $3, updated_at = $4 WHERE id = $5 RETURNING *`,