Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions .github/workflows/observability-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: Observability CI

# Validates the Prometheus alert rules and synthetic canary on every PR/push.
# Fails fast if alerting rules are invalid or unit tests regress.

on:
pull_request:
paths:
- 'monitoring/**'
- 'scripts/canary.mjs'
- '.github/workflows/observability-ci.yml'
push:
branches: [main]

jobs:
# ── promtool: validate + unit-test alert rules ──────────────────────────────
alert-rules:
name: promtool — lint & test alert rules
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- name: Install Prometheus (for promtool)
run: |
PROM_VERSION=2.51.0
curl -fsSL "https://github.com/prometheus/prometheus/releases/download/v${PROM_VERSION}/prometheus-${PROM_VERSION}.linux-amd64.tar.gz" \
| tar xz --strip-components=1 -C /tmp "prometheus-${PROM_VERSION}.linux-amd64/promtool"
sudo mv /tmp/promtool /usr/local/bin/promtool
promtool --version

- name: Validate alert rule syntax
run: promtool check rules monitoring/alerting/alerting_rules.yml

- name: Run alert rule unit tests
run: promtool test rules monitoring/alerting/alerting_rules_test.yml

# ── Canary script: syntax check (no live testnet in CI) ────────────────────
canary-lint:
name: Canary script lint
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- uses: actions/setup-node@v4
with:
node-version: 20

- name: Check canary script syntax
run: node --check scripts/canary.mjs

- name: Dry-run canary (no network, expect fast fail)
run: |
timeout 10 node scripts/canary.mjs || true
env:
CANARY_API_URL: http://localhost:9999 # unreachable → fast fail
CANARY_TIMEOUT_MS: 2000

# ── Backend tests (timeout middleware + rpcPool) ────────────────────────────
backend-reliability:
name: Backend reliability unit tests
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- uses: actions/setup-node@v4
with:
node-version: 20
cache: npm

- run: npm ci

- name: Run backend unit tests
run: npx turbo run test --filter=backend
104 changes: 103 additions & 1 deletion backend/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import { createVariantRoutes } from './routes/variants.js';
import { createVariantService } from './services/variantService.js';
import { createCohortRoutes } from './routes/cohorts.js';
import { createCohortService } from './services/cohortService.js';
import { requestTimeout } from './middleware/timeout.js';
import { PoolSaturatedError } from './rpcPool.js';

const DEFAULT_PORT = 3001;
const DEFAULT_RATE_LIMIT_WINDOW_MS = 60_000;
Expand All @@ -63,6 +65,7 @@ const DEFAULT_AUTH_LOCKOUT_BASE_LOCKOUT_MS = 60_000;
const DEFAULT_SHORT_CACHE_TTL_MS = 5_000;
const DEFAULT_JSON_BODY_LIMIT = '100kb';
const DEFAULT_RPC_POLL_INTERVAL_MS = 60_000;
const DEFAULT_REQUEST_TIMEOUT_MS = 30_000;
const LEGACY_API_PREFIX = '/api';
const API_V1_PREFIX = '/api/v1';
const CONTRACT_ID_PATTERN = /^C[A-Z2-7]{55}$/;
Expand Down Expand Up @@ -284,7 +287,22 @@ export async function createApp(options = {}) {
routeHits: new Map(),
authFailures: 0,
authLockouts: 0,
// p95 latency histogram — 12 buckets (ms): 50,100,200,500,1000,2000,5000,...
latencyBuckets: [50, 100, 200, 500, 1_000, 2_000, 5_000, 10_000, 30_000, Infinity],
latencyCounts: /** @type {number[]} */ ([]),
latencyTotal: 0,
latencySum: 0,
};
// Initialise bucket counters to 0.
metrics.latencyCounts = metrics.latencyBuckets.map(() => 0);

// Apply global request deadline so every route self-defends against slow
// upstreams. The timeout is configurable via REQUEST_TIMEOUT_MS.
const requestTimeoutMs = normalizePositiveInteger(
options.requestTimeoutMs ?? process.env.REQUEST_TIMEOUT_MS,
DEFAULT_REQUEST_TIMEOUT_MS,
);
app.use(requestTimeout(requestTimeoutMs));

/**
* Compatibility shim: ?api_version=v0 rewrites v1 routes to legacy patterns
Expand Down Expand Up @@ -412,12 +430,23 @@ export async function createApp(options = {}) {
/** @type {import('express').NextFunction} */ next,
) => {
metrics.requestTotal += 1;
const _reqStart = Date.now();
res.on('finish', () => {
const routeKey = `${req.method} ${req.path}`;
metrics.routeHits.set(routeKey, (metrics.routeHits.get(routeKey) ?? 0) + 1);
if (res.statusCode >= 400) {
metrics.requestErrors += 1;
}
// Record request duration into the latency histogram.
const durationMs = Date.now() - _reqStart;
metrics.latencySum += durationMs;
metrics.latencyTotal += 1;
for (let _bi = 0; _bi < metrics.latencyBuckets.length; _bi++) {
if (durationMs <= metrics.latencyBuckets[_bi]) {
metrics.latencyCounts[_bi] += 1;
break;
}
}
});
next();
},
Expand Down Expand Up @@ -574,6 +603,18 @@ export async function createApp(options = {}) {
})
.join('\n');

// Latency histogram — cumulative buckets (le = upper bound in ms).
const latencyBucketLines = metrics.latencyBuckets
.map((le, i) => {
const cumulative = metrics.latencyCounts.slice(0, i + 1).reduce((a, b) => a + b, 0);
const leLabel = le === Infinity ? '+Inf' : String(le);
return `trivela_http_request_duration_ms_bucket{le="${leLabel}"} ${cumulative}`;
})
.join('\n');

// RPC pool saturation metrics.
const poolStatus = rpcPool.getStatus();

const payload = [
'# HELP trivela_requests_total Total HTTP requests handled.',
'# TYPE trivela_requests_total counter',
Expand All @@ -593,6 +634,28 @@ export async function createApp(options = {}) {
'# HELP trivela_route_hits_total Route-level request counts.',
'# TYPE trivela_route_hits_total counter',
routeLines,
// Request latency histogram (issue #650 — p95 latency SLO).
'# HELP trivela_http_request_duration_ms HTTP request duration in milliseconds.',
'# TYPE trivela_http_request_duration_ms histogram',
latencyBucketLines,
`trivela_http_request_duration_ms_count ${metrics.latencyTotal}`,
`trivela_http_request_duration_ms_sum ${metrics.latencySum}`,
// RPC pool saturation (issue #650 — pool saturation safety).
'# HELP trivela_rpc_pool_in_use RPC pool slots currently in use.',
'# TYPE trivela_rpc_pool_in_use gauge',
`trivela_rpc_pool_in_use ${poolStatus.in_use}`,
'# HELP trivela_rpc_pool_idle RPC pool slots immediately available.',
'# TYPE trivela_rpc_pool_idle gauge',
`trivela_rpc_pool_idle ${poolStatus.idle}`,
'# HELP trivela_rpc_pool_waiting Callers queued waiting for a pool slot.',
'# TYPE trivela_rpc_pool_waiting gauge',
`trivela_rpc_pool_waiting ${poolStatus.waiting}`,
'# HELP trivela_rpc_pool_healthy Healthy RPC endpoints in the pool.',
'# TYPE trivela_rpc_pool_healthy gauge',
`trivela_rpc_pool_healthy ${poolStatus.healthy}`,
'# HELP trivela_rpc_pool_unhealthy Unhealthy RPC endpoints in the pool.',
'# TYPE trivela_rpc_pool_unhealthy gauge',
`trivela_rpc_pool_unhealthy ${poolStatus.unhealthy}`,
]
.filter(Boolean)
.join('\n');
Expand Down Expand Up @@ -1557,9 +1620,48 @@ export async function startServer(options = {}) {
const app = await createApp(options);
const port = options.port ?? process.env.PORT ?? DEFAULT_PORT;

return app.listen(port, () => {
const server = app.listen(port, () => {
log.info({ port }, 'Trivela API running');
});

// ── Graceful shutdown (issue #650) ─────────────────────────────────────────
// On SIGTERM / SIGINT:
// 1. Stop accepting new connections (server.close).
// 2. Allow in-flight HTTP requests to finish for up to SHUTDOWN_GRACE_MS.
// 3. Send "Connection: close / will-reconnect" hint to open SSE/WS streams.
// 4. Flush OTel spans.
// 5. Exit 0 once everything is drained (or force-exit after the grace window).
const SHUTDOWN_GRACE_MS = normalizePositiveInteger(process.env.SHUTDOWN_GRACE_MS, 15_000);

let shuttingDown = false;

async function gracefulShutdown(signal) {
if (shuttingDown) return;
shuttingDown = true;
log.info({ signal, graceMs: SHUTDOWN_GRACE_MS }, 'graceful shutdown started');

// Force exit after the grace window so a stuck handler never blocks a deploy.
const forceTimer = setTimeout(() => {
log.error('graceful shutdown timed out — forcing exit');
process.exit(1);
}, SHUTDOWN_GRACE_MS);
if (typeof forceTimer.unref === 'function') forceTimer.unref();

// Stop accepting new connections; drain in-flight HTTP requests.
await new Promise((resolve) => server.close(resolve));

// Flush OTel exporter.
await shutdownTracing().catch((err) => log.warn({ err }, 'OTel shutdown warning'));

log.info('graceful shutdown complete');
clearTimeout(forceTimer);
process.exit(0);
}

process.once('SIGTERM', () => gracefulShutdown('SIGTERM'));
process.once('SIGINT', () => gracefulShutdown('SIGINT'));

return server;
}

const isExecutedDirectly =
Expand Down
16 changes: 16 additions & 0 deletions backend/src/middleware/errorHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,28 @@ const isProd = process.env.NODE_ENV === 'production';
* in production. Sanitizes error details to prevent log injection and
* sensitive data leakage.
*
* Special cases:
* - PoolSaturatedError (code POOL_SATURATED) → 503 with typed code.
*
* @param {unknown} err
* @param {import('express').Request} _req
* @param {import('express').Response} res
* @param {import('express').NextFunction} _next
*/
export default function errorHandler(err, _req, res, _next) {
// Typed 503 for RPC pool saturation (issue #650 — pool saturation safety).
if (
err != null &&
typeof err === 'object' &&
/** @type {any} */ (err).code === 'POOL_SATURATED'
) {
log.warn({ err: { message: /** @type {any} */ (err).message } }, 'RPC pool saturated');
if (!res.headersSent) {
res.status(503).json({ error: 'Service temporarily unavailable', code: 'POOL_SATURATED' });
}
return;
}

const statusCode =
err != null &&
typeof err === 'object' &&
Expand Down
60 changes: 60 additions & 0 deletions backend/src/middleware/timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Per-route request deadline middleware (issue #650 — request deadlines).
*
* Attaches an AbortSignal to `req.signal` that fires after `ms` milliseconds.
* When the deadline elapses the signal is aborted, the response is flushed
* with 504 Gateway Timeout, and subsequent handler writes are suppressed.
*
* When the client disconnects before the deadline the signal is also aborted
* so DB/RPC work queued downstream can short-circuit.
*
* Usage (per-route):
* import { requestTimeout } from './middleware/timeout.js';
* app.get('/expensive', requestTimeout(10_000), handler);
*
* Usage (global default — applied in index.js):
* app.use(requestTimeout(Number(process.env.REQUEST_TIMEOUT_MS ?? 30_000)));
*
* Downstream handlers that do async work should check `req.signal.aborted`
* before each expensive step, or pass req.signal to fetch() / pool.acquire().
*/

/**
* @param {number} ms Deadline in milliseconds.
* @returns {import('express').RequestHandler}
*/
export function requestTimeout(ms) {
return function timeoutMiddleware(req, res, next) {
const ac = new AbortController();

// Wire client-disconnect → abort so downstream work cancels early.
function onClose() {
if (!ac.signal.aborted) ac.abort(new Error('client disconnected'));
}
res.on('close', onClose);

const timer = setTimeout(() => {
if (res.headersSent) return;
ac.abort(new Error(`request timed out after ${ms}ms`));
res
.status(504)
.set('Content-Type', 'application/json')
.end(JSON.stringify({ error: 'Request timeout', code: 'REQUEST_TIMEOUT' }));
}, ms);

// Don't hold the event loop open past the response.
if (typeof timer.unref === 'function') timer.unref();

// Attach signal so downstream middleware/handlers can observe it.
req.signal = ac.signal;

res.on('finish', () => {
clearTimeout(timer);
res.off('close', onClose);
// Abort so any still-pending downstream fetch/acquire calls cancel.
if (!ac.signal.aborted) ac.abort(new Error('response finished'));
});

next();
};
}
Loading
Loading