From b22aa3c4f96ed42e8a13bf6c45756c684637a1ce Mon Sep 17 00:00:00 2001 From: DevALVIN-24 Date: Sat, 30 May 2026 21:35:17 +0100 Subject: [PATCH 1/2] feat: add RelayerBalanceMonitorService to monitor relayer XLM balance - Implement background service checking relayer native XLM balance every 100 ledgers - Trigger a high-priority system warning log if balance falls below 50 XLM - Implement 5-minute fallback polling comparing latest ledger sequence in case stream stalls - Integrate service with server startup and graceful shutdown - Add comprehensive Jest unit test suite in test/relayerBalanceMonitor.jest.test.ts --- src/index.ts | 15 ++ src/services/relayerBalanceMonitor.ts | 201 ++++++++++++++++++++++++ test/relayerBalanceMonitor.jest.test.ts | 127 +++++++++++++++ 3 files changed, 343 insertions(+) create mode 100644 src/services/relayerBalanceMonitor.ts create mode 100644 test/relayerBalanceMonitor.jest.test.ts diff --git a/src/index.ts b/src/index.ts index f2a4e687..9b4506e7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,6 +18,7 @@ import { GasBalanceMonitorService, getGasBalanceMonitorService, } from "./services/gasBalanceMonitorService"; +import { relayerBalanceMonitorService } from "./services/relayerBalanceMonitor"; import { validateEnv } from "./utils/envValidator"; import { enableGlobalLogMasking } from "./utils/logMasker"; import { hourlyAverageService } from "./services/hourlyAverageService"; @@ -293,6 +294,7 @@ const shutdown = async (signal: "SIGINT" | "SIGTERM"): Promise => { multiSigSubmissionService.stop(); // FIX 2: Optional chaining β€” safe to call even if service never started gasBalanceMonitorService?.stop(); + relayerBalanceMonitorService.stop(); hourlyAverageService.stop(); priceAggregatorService.stop(); providerSecretRotationService.stop(); @@ -448,6 +450,19 @@ httpServer.listen(PORT, async () => { err instanceof Error ? err.message : err, ); } + + // Start background relayer balance monitor service + try { + relayerBalanceMonitorService.start().catch((err: Error) => { + console.error("Failed to start relayer balance monitor service:", err); + }); + console.log(`β›½ Relayer balance monitor service started`); + } catch (err) { + console.warn( + "Relayer balance monitor service not started:", + err instanceof Error ? err.message : err, + ); + } }); export default app; diff --git a/src/services/relayerBalanceMonitor.ts b/src/services/relayerBalanceMonitor.ts new file mode 100644 index 00000000..e91d4a65 --- /dev/null +++ b/src/services/relayerBalanceMonitor.ts @@ -0,0 +1,201 @@ +import { Horizon } from "@stellar/stellar-sdk"; +import { signer } from "../signer"; +import stellarProvider from "../lib/stellarProvider"; +import { logger } from "../utils/logger"; +import dotenv from "dotenv"; + +dotenv.config(); + +/** + * RelayerBalanceMonitorService + * Background service that monitors the relayer wallet's XLM balance. + * Checks the balance every 100 ledgers and triggers a high-priority system + * warning log if the balance drops below the threshold (default: 50 XLM). + */ +export class RelayerBalanceMonitorService { + private server: Horizon.Server; + private isRunning: boolean = false; + private closeStream: (() => void) | null = null; + private lastCheckedLedger: number = 0; + private balanceThresholdXLM: number; + private fallbackTimer: ReturnType | null = null; + + constructor() { + this.server = stellarProvider.getServer(); + const threshold = parseFloat(process.env.RELAYER_BALANCE_ALERT_THRESHOLD_XLM || "50"); + this.balanceThresholdXLM = isNaN(threshold) ? 50 : threshold; + } + + /** + * Start the background balance monitor service. + */ + async start(): Promise { + if (this.isRunning) { + logger.warn("[RelayerBalanceMonitor] Service is already running"); + return; + } + + this.isRunning = true; + logger.info(`[RelayerBalanceMonitor] Started with threshold: ${this.balanceThresholdXLM} XLM`); + + // Perform an initial fetch of latest ledger sequence & check balance immediately + try { + this.server = stellarProvider.getServer(); + const root = await this.server.root(); + this.lastCheckedLedger = root.history_latest_ledger_sequence || 0; + logger.info(`[RelayerBalanceMonitor] Initialized latest ledger sequence: ${this.lastCheckedLedger}`); + } catch (err) { + logger.error("[RelayerBalanceMonitor] Failed to fetch initial ledger sequence from Horizon:", err); + } + + await this.checkBalance().catch((err) => { + logger.error("[RelayerBalanceMonitor] Error during initial balance check:", err); + }); + + this.startStream(); + + // Fallback timer: every 5 minutes, query server root to check if ledger has advanced by 100 + this.fallbackTimer = setInterval(() => { + this.runFallbackCheck().catch((err) => { + logger.error("[RelayerBalanceMonitor] Error in fallback check:", err); + }); + }, 5 * 60 * 1000); // 5 minutes + } + + /** + * Stop the background balance monitor service. + */ + stop(): void { + if (this.closeStream) { + try { + this.closeStream(); + } catch {} + this.closeStream = null; + } + if (this.fallbackTimer) { + clearInterval(this.fallbackTimer); + this.fallbackTimer = null; + } + this.isRunning = false; + logger.info("[RelayerBalanceMonitor] Stopped"); + } + + /** + * Start the ledger stream to count and react to closed ledgers. + */ + private startStream(): void { + if (!this.isRunning) return; + + try { + this.server = stellarProvider.getServer(); + this.closeStream = this.server.ledgers() + .cursor("now") + .stream({ + onmessage: (ledger) => { + const currentLedger = ledger.sequence; + if (this.lastCheckedLedger === 0) { + this.lastCheckedLedger = currentLedger; + } else if (currentLedger - this.lastCheckedLedger >= 100) { + logger.info( + `[RelayerBalanceMonitor] 100 ledgers advanced (from ${this.lastCheckedLedger} to ${currentLedger}). Triggering balance check.` + ); + this.lastCheckedLedger = currentLedger; + this.checkBalance().catch((err) => { + logger.error("[RelayerBalanceMonitor] Error in ledger-triggered balance check:", err); + }); + } + }, + onerror: (error) => { + logger.warn("[RelayerBalanceMonitor] Ledger stream error, restarting stream in 10 seconds:", error); + this.restartStream(); + } + }); + } catch (err) { + logger.error("[RelayerBalanceMonitor] Failed to start ledger stream, retrying in 10 seconds:", err); + this.restartStream(); + } + } + + /** + * Safely restarts the ledger stream. + */ + private restartStream(): void { + if (this.closeStream) { + try { + this.closeStream(); + } catch {} + this.closeStream = null; + } + setTimeout(() => { + if (this.isRunning) { + this.startStream(); + } + }, 10000); + } + + /** + * Fallback poll that queries Horizon root to compare ledger sequence numbers. + */ + private async runFallbackCheck(): Promise { + try { + this.server = stellarProvider.getServer(); + const root = await this.server.root(); + const currentLedger = root.history_latest_ledger_sequence; + if (currentLedger && (this.lastCheckedLedger === 0 || currentLedger - this.lastCheckedLedger >= 100)) { + logger.info( + `[RelayerBalanceMonitor] Fallback check: ${currentLedger - this.lastCheckedLedger} ledgers advanced. Triggering balance check.` + ); + this.lastCheckedLedger = currentLedger; + await this.checkBalance(); + } + } catch (err) { + logger.error("[RelayerBalanceMonitor] Failed fallback check Horizon root call:", err); + } + } + + /** + * Check the balance of the relayer and log a high-priority warning if below threshold. + */ + async checkBalance(): Promise { + try { + this.server = stellarProvider.getServer(); + const publicKey = await signer.getPublicKey(); + + const account = await this.server.loadAccount(publicKey); + const xlmBalance = account.balances.find( + (balance) => balance.asset_type === "native" + ); + + if (!xlmBalance) { + logger.warn(`[RelayerBalanceMonitor] No native XLM balance found for relayer account: ${publicKey}`); + return; + } + + const balanceAmount = parseFloat(xlmBalance.balance); + logger.info(`[RelayerBalanceMonitor] Relayer wallet balance: ${balanceAmount} XLM (threshold: ${this.balanceThresholdXLM} XLM)`); + + if (balanceAmount < this.balanceThresholdXLM) { + logger.warn( + `[SYSTEM_WARNING] [RelayerBalanceMonitor] 🚨 HIGH PRIORITY: Relayer wallet (${publicKey}) balance is extremely low! ` + + `Current: ${balanceAmount} XLM, Threshold: ${this.balanceThresholdXLM} XLM. Price updates will stop completely if gas runs out!` + ); + } + } catch (err) { + logger.error("[RelayerBalanceMonitor] Failed to check relayer balance:", err); + } + } + + /** + * Get the current status of the service (useful for monitoring / testing). + */ + getStatus() { + return { + isRunning: this.isRunning, + balanceThresholdXLM: this.balanceThresholdXLM, + lastCheckedLedger: this.lastCheckedLedger, + }; + } +} + +// Export singleton instance +export const relayerBalanceMonitorService = new RelayerBalanceMonitorService(); diff --git a/test/relayerBalanceMonitor.jest.test.ts b/test/relayerBalanceMonitor.jest.test.ts new file mode 100644 index 00000000..d498fa08 --- /dev/null +++ b/test/relayerBalanceMonitor.jest.test.ts @@ -0,0 +1,127 @@ +import { jest, describe, it, expect, beforeEach, afterEach } from "@jest/globals"; +import { Keypair } from "@stellar/stellar-sdk"; + +const sourceKeypair = Keypair.random(); +const fakeServer = { + root: jest.fn(), + loadAccount: jest.fn(), + ledgers: jest.fn(), +}; + +// Stream mock helpers +let streamCallback: ((ledger: any) => void) | null = null; +let streamErrorCallback: ((err: any) => void) | null = null; +const closeStreamSpy = jest.fn(); + +fakeServer.ledgers.mockReturnValue({ + cursor: jest.fn().mockReturnThis(), + stream: jest.fn().mockImplementation((options: any) => { + streamCallback = options.onmessage; + streamErrorCallback = options.onerror; + return closeStreamSpy; + }), +} as any); + +jest.unstable_mockModule("../src/lib/stellarProvider", () => ({ + default: { + getServer: () => fakeServer, + reportFailure: jest.fn(), + }, +})); + +jest.unstable_mockModule("../src/signer", () => ({ + signer: { + getPublicKey: jest.fn(async () => sourceKeypair.publicKey()), + }, +})); + +const warnSpy = jest.fn(); +const infoSpy = jest.fn(); +const errorSpy = jest.fn(); + +jest.unstable_mockModule("../src/utils/logger", () => ({ + logger: { + warn: warnSpy, + info: infoSpy, + error: errorSpy, + }, +})); + +const { RelayerBalanceMonitorService } = await import("../src/services/relayerBalanceMonitor"); + +describe("RelayerBalanceMonitorService", () => { + beforeEach(() => { + jest.clearAllMocks(); + streamCallback = null; + streamErrorCallback = null; + + fakeServer.root.mockResolvedValue({ + history_latest_ledger_sequence: 1000, + } as any); + + fakeServer.loadAccount.mockResolvedValue({ + balances: [ + { asset_type: "native", balance: "100.0" } + ] + } as any); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it("checks balance on startup and logs normally when balance is above threshold", async () => { + const monitor = new RelayerBalanceMonitorService(); + await monitor.start(); + + expect(fakeServer.root).toHaveBeenCalled(); + expect(fakeServer.loadAccount).toHaveBeenCalledWith(sourceKeypair.publicKey()); + expect(infoSpy).toHaveBeenCalledWith(expect.stringContaining("Relayer wallet balance: 100 XLM")); + expect(warnSpy).not.toHaveBeenCalledWith(expect.stringContaining("HIGH PRIORITY")); + expect(monitor.getStatus()).toEqual({ + isRunning: true, + balanceThresholdXLM: 50, + lastCheckedLedger: 1000, + }); + + monitor.stop(); + }); + + it("triggers high-priority warning log when balance is below threshold", async () => { + fakeServer.loadAccount.mockResolvedValue({ + balances: [ + { asset_type: "native", balance: "45.0" } + ] + } as any); + + const monitor = new RelayerBalanceMonitorService(); + await monitor.start(); + + expect(fakeServer.loadAccount).toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("🚨 HIGH PRIORITY: Relayer wallet")); + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("Current: 45 XLM, Threshold: 50 XLM")); + + monitor.stop(); + }); + + it("triggers balance check when ledger sequence advances by 100 ledgers", async () => { + const monitor = new RelayerBalanceMonitorService(); + await monitor.start(); + + // Reset calls count to count only subsequent ones + fakeServer.loadAccount.mockClear(); + + expect(streamCallback).toBeDefined(); + + // Advance by 50 ledgers (from 1000 to 1050) -> should not trigger check + streamCallback!({ sequence: 1050 }); + expect(fakeServer.loadAccount).not.toHaveBeenCalled(); + + // Advance by 100 ledgers (from 1000 to 1100) -> should trigger check + streamCallback!({ sequence: 1100 }); + expect(fakeServer.loadAccount).toHaveBeenCalled(); + + monitor.stop(); + expect(closeStreamSpy).toHaveBeenCalled(); + }); +}); From d0d5f9bd299677706aa907c0e8c4bb79d4e5b117 Mon Sep 17 00:00:00 2001 From: DevALVIN-24 Date: Sat, 30 May 2026 22:53:03 +0100 Subject: [PATCH 2/2] feat: implement proactive relayer ping service with 500ms response limit --- src/metrics /index.js | 3 - src/metrics /metricsRouter.js | 54 ------------- src/metrics /oracleMetrics.js | 69 ----------------- src/services/pingService.ts | 142 ++++++++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 126 deletions(-) delete mode 100644 src/metrics /index.js delete mode 100644 src/metrics /metricsRouter.js delete mode 100644 src/metrics /oracleMetrics.js create mode 100644 src/services/pingService.ts diff --git a/src/metrics /index.js b/src/metrics /index.js deleted file mode 100644 index 4daa0dc3..00000000 --- a/src/metrics /index.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict'; - -module.exports = require('./oracleMetrics'); diff --git a/src/metrics /metricsRouter.js b/src/metrics /metricsRouter.js deleted file mode 100644 index 65d8dd62..00000000 --- a/src/metrics /metricsRouter.js +++ /dev/null @@ -1,54 +0,0 @@ -'use strict'; - -const express = require('express'); -const { registry } = require('./oracleMetrics'); - -const router = express.Router(); - -/** - * Bearer-token middleware - * Reads METRICS_SECRET from env. Rejects with 401 if missing or wrong. - * Returns 403 if the env var itself is not configured (fail-safe). - */ -function requireMetricsToken(req, res, next) { - const secret = process.env.METRICS_SECRET; - - if (!secret) { - console.error('[metrics] METRICS_SECRET env var is not set β€” endpoint disabled'); - return res.status(403).json({ error: 'Metrics endpoint is not configured' }); - } - - const authHeader = req.headers['authorization'] || ''; - const token = authHeader.startsWith('Bearer ') ? authHeader.slice(7) : null; - - if (!token || token !== secret) { - return res.status(401).json({ error: 'Unauthorized' }); - } - - next(); -} - -/** - * GET /metrics - * Returns Prometheus text exposition format. - * Protected by Bearer token (see requireMetricsToken above). - * - * Grafana scrape config example: - * - job_name: stellarflow-oracle - * bearer_token: - * static_configs: - * - targets: ['your-host:3000'] - * metrics_path: /metrics - */ -router.get('/', requireMetricsToken, async (req, res) => { - try { - res.set('Content-Type', registry.contentType); - const output = await registry.metrics(); - res.end(output); - } catch (err) { - console.error('[metrics] Failed to collect metrics:', err); - res.status(500).end(); - } -}); - -module.exports = router; diff --git a/src/metrics /oracleMetrics.js b/src/metrics /oracleMetrics.js deleted file mode 100644 index b9c3ec66..00000000 --- a/src/metrics /oracleMetrics.js +++ /dev/null @@ -1,69 +0,0 @@ -'use strict'; - -const client = require('prom-client'); - -// Create a dedicated registry so we don't bleed into any default register -const registry = new client.Registry(); - -// ── Default Node.js process metrics (CPU, memory, event loop lag) ────────── -client.collectDefaultMetrics({ register: registry, prefix: 'stellarflow_' }); - -// ── Custom oracle metrics ─────────────────────────────────────────────────── - -/** - * successful_submissions_total - * Counter β€” incremented each time an oracle round is submitted successfully. - * Label: asset (e.g. "XLM/USD", "BTC/USD") - */ -const successfulSubmissions = new client.Counter({ - name: 'oracle_successful_submissions_total', - help: 'Total number of successful oracle price submissions', - labelNames: ['asset'], - registers: [registry], -}); - -/** - * failed_submissions_total - * Counter β€” incremented on any submission error (RPC error, timeout, etc.). - * Label: asset, reason (e.g. reason="timeout" | "rpc_error" | "validation") - */ -const failedSubmissions = new client.Counter({ - name: 'oracle_failed_submissions_total', - help: 'Total number of failed oracle price submissions', - labelNames: ['asset', 'reason'], - registers: [registry], -}); - -/** - * gas_usage_per_asset - * Histogram β€” records the fee/stroops used per submission so Grafana can - * show p50/p95/p99 percentiles per asset. - * Buckets are tuned for Stellar stroops (1 XLM = 10_000_000 stroops). - */ -const gasUsagePerAsset = new client.Histogram({ - name: 'oracle_gas_usage_per_asset', - help: 'Stellar transaction fee (in stroops) used per oracle submission', - labelNames: ['asset'], - buckets: [100, 500, 1000, 2500, 5000, 10000, 25000, 50000, 100000], - registers: [registry], -}); - -/** - * submission_duration_seconds - * Histogram β€” end-to-end latency of a submission from signing to ledger confirm. - */ -const submissionDuration = new client.Histogram({ - name: 'oracle_submission_duration_seconds', - help: 'End-to-end duration of an oracle submission in seconds', - labelNames: ['asset'], - buckets: [0.1, 0.5, 1, 2, 5, 10, 30], - registers: [registry], -}); - -module.exports = { - registry, - successfulSubmissions, - failedSubmissions, - gasUsagePerAsset, - submissionDuration, -}; diff --git a/src/services/pingService.ts b/src/services/pingService.ts new file mode 100644 index 00000000..cfb58f8a --- /dev/null +++ b/src/services/pingService.ts @@ -0,0 +1,142 @@ +import axios from "axios"; +import logger from "../utils/logger"; +import dotenv from "dotenv"; + +dotenv.config(); + +export class PingService { + private activePool: Set = new Set(); + private pingInterval: NodeJS.Timeout | null = null; + private isRunning: boolean = false; + + constructor() { + // Populate active pool initially with configured servers to avoid cold start issues + const servers = this.remoteServers; + for (const server of servers) { + this.activePool.add(server); + } + } + + /** + * Dynamically retrieve configured remote servers from environment. + * This ensures we always use fresh configuration. + */ + get remoteServers(): string[] { + const remoteServersEnv = process.env.REMOTE_ORACLE_SERVERS || ""; + return remoteServersEnv + .split(",") + .map((url) => url.trim()) + .filter((url) => url.length > 0); + } + + /** + * Proactively ping a single relayer to check responsiveness. + * Relayer must respond to GET /ping within 500ms. + */ + async pingRelayer(url: string): Promise { + const startTime = Date.now(); + try { + const pingUrl = url.endsWith('/') ? `${url}ping` : `${url}/ping`; + const response = await axios.get(pingUrl, { + timeout: 500, + headers: { + "User-Agent": "StellarFlow-Oracle/1.0", + }, + }); + const latency = Date.now() - startTime; + if (response.status >= 200 && response.status < 300 && latency <= 500) { + this.activePool.add(url); + logger.debug(`[PingService] Relayer ${url} is healthy (${latency}ms)`); + return true; + } + } catch (error) { + // network error, timeout, or non‑2xx response + } + this.activePool.delete(url); + logger.warn(`[PingService] Relayer ${url} is unresponsive or slow (>500ms)`); + return false; + } + + /** + * Ping all configured relayers concurrently and update the active pool. + */ + async pingAll(): Promise { + const servers = this.remoteServers; + if (servers.length === 0) { + return; + } + + logger.debug(`[PingService] Proactively pinging ${servers.length} configured relayers...`); + await Promise.all(servers.map((url) => this.pingRelayer(url))); + } + + /** + * Start the periodic background ping monitoring loop. + */ + async start(intervalMs?: number): Promise { + if (this.isRunning) { + logger.warn("[PingService] Service is already running"); + return; + } + + const defaultInterval = parseInt(process.env.RELAYER_PING_INTERVAL_MS || "30000", 10); + const interval = intervalMs ?? (isNaN(defaultInterval) ? 30000 : defaultInterval); + + this.isRunning = true; + logger.info(`[PingService] Started proactive relayer checks every ${interval}ms`); + + // Perform an initial check immediately + await this.pingAll().catch((err) => { + logger.error("[PingService] Initial ping check failed:", err); + }); + + this.pingInterval = setInterval(async () => { + try { + await this.pingAll(); + } catch (err) { + logger.error("[PingService] Error during periodic ping:", err); + } + }, interval); + } + + /** + * Stop the periodic background ping monitoring loop. + */ + stop(): void { + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + } + this.isRunning = false; + logger.info("[PingService] Stopped"); + } + + /** + * Get the current active pool of responsive relayers. + */ + getActivePool(): string[] { + return Array.from(this.activePool); + } + + /** + * Check if a specific relayer is in the active pool. + */ + isRelayerActive(url: string): boolean { + return this.activePool.has(url); + } + + /** + * Get the current status of the service (for stats / monitoring). + */ + getStatus() { + return { + isRunning: this.isRunning, + activePoolSize: this.activePool.size, + activePool: this.getActivePool(), + configuredRelayers: this.remoteServers, + }; + } +} + +// Export singleton instance +export const pingService = new PingService();