diff --git a/.github/workflows/k8s-validate.yml b/.github/workflows/k8s-validate.yml new file mode 100644 index 0000000..3247695 --- /dev/null +++ b/.github/workflows/k8s-validate.yml @@ -0,0 +1,27 @@ +name: Validate Kubernetes manifests + +on: + push: + paths: + - 'deploy/k8s/**' + - '.github/workflows/k8s-validate.yml' + pull_request: + paths: + - 'deploy/k8s/**' + - '.github/workflows/k8s-validate.yml' + +jobs: + kubeconform: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install kubeconform + run: | + curl -sSL https://github.com/yannh/kubeconform/releases/download/v0.6.7/kubeconform-linux-amd64.tar.gz \ + | tar xz + sudo mv kubeconform /usr/local/bin/ + + - name: Validate manifests + run: | + kubeconform -summary -ignore-missing-schemas deploy/k8s/*.yaml diff --git a/deploy/k8s/configmap.yaml b/deploy/k8s/configmap.yaml new file mode 100644 index 0000000..72d4d1e --- /dev/null +++ b/deploy/k8s/configmap.yaml @@ -0,0 +1,34 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: neurowealth-config + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend +data: + NODE_ENV: "production" + PORT: "3001" + LOG_LEVEL: "info" + TRUST_PROXY: "1" + # Stellar network configuration (non-secret) + STELLAR_NETWORK: "mainnet" + STELLAR_RPC_URL: "https://soroban-rpc.mainnet.stellar.org" + # Contract addresses — override per environment + VAULT_CONTRACT_ID: "REPLACE_WITH_VAULT_CONTRACT_ID" + USDC_TOKEN_ADDRESS: "REPLACE_WITH_USDC_TOKEN_ADDRESS" + # CORS — must be explicit in production + CORS_ORIGINS: "https://app.neurowealth.io" + ALLOWED_ORIGINS: "https://app.neurowealth.io" + # Rate limits (tune per environment) + RATE_LIMIT_MAX: "100" + RATE_LIMIT_WINDOW_MS: "900000" + AUTH_RATE_LIMIT_MAX: "20" + ADMIN_RATE_LIMIT_MAX: "10" + # DLQ alerting + DLQ_ALERT_THRESHOLD: "50" + DLQ_ALERT_COOLDOWN_MS: "900000" + # Agent thresholds + REBALANCE_THRESHOLD_PERCENT: "0.5" + MAX_GAS_PERCENT: "0.1" + # Admin dashboard link for alerts + ADMIN_DASHBOARD_URL: "https://admin.neurowealth.io" diff --git a/deploy/k8s/deployment.yaml b/deploy/k8s/deployment.yaml new file mode 100644 index 0000000..b348ffd --- /dev/null +++ b/deploy/k8s/deployment.yaml @@ -0,0 +1,92 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: neurowealth-backend + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend + app.kubernetes.io/component: api +spec: + # IMPORTANT: The monolith starts an in-process Stellar event listener and + # agent cron loop. Multiple replicas will duplicate event processing and + # scheduled jobs. Keep replicas at 1 until worker/API split is implemented. + # See docs/DEPLOYMENT.md for scaling guidance. + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/name: neurowealth-backend + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + template: + metadata: + labels: + app.kubernetes.io/name: neurowealth-backend + app.kubernetes.io/component: api + spec: + serviceAccountName: neurowealth-backend + securityContext: + runAsNonRoot: true + fsGroup: 1000 + terminationGracePeriodSeconds: 35 + initContainers: + - name: migrate + image: neurowealth-backend:latest + imagePullPolicy: IfNotPresent + command: ["npx", "prisma", "migrate", "deploy"] + envFrom: + - secretRef: + name: neurowealth-secrets + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 200m + memory: 256Mi + containers: + - name: api + image: neurowealth-backend:latest + imagePullPolicy: IfNotPresent + # Do NOT run migrations in the app container — initContainer handles that. + command: ["node", "dist/index.js"] + ports: + - name: http + containerPort: 3001 + protocol: TCP + envFrom: + - configMapRef: + name: neurowealth-config + - secretRef: + name: neurowealth-secrets + livenessProbe: + httpGet: + path: /health/live + port: http + initialDelaySeconds: 10 + periodSeconds: 15 + timeoutSeconds: 5 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /health/ready + port: http + initialDelaySeconds: 15 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + securityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: false + capabilities: + drop: + - ALL diff --git a/deploy/k8s/hpa.yaml b/deploy/k8s/hpa.yaml new file mode 100644 index 0000000..e735706 --- /dev/null +++ b/deploy/k8s/hpa.yaml @@ -0,0 +1,24 @@ +# Horizontal Pod Autoscaler — disabled by default because the monolith +# requires a single active consumer for the Stellar event listener and agent cron. +# Enable only after splitting API and worker deployments (see docs/DEPLOYMENT.md). +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: neurowealth-backend + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: neurowealth-backend + minReplicas: 1 + maxReplicas: 1 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 diff --git a/deploy/k8s/ingress.yaml b/deploy/k8s/ingress.yaml new file mode 100644 index 0000000..182eba4 --- /dev/null +++ b/deploy/k8s/ingress.yaml @@ -0,0 +1,31 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: neurowealth-backend + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend + annotations: + # TLS — adjust for your ingress controller (nginx, ALB, etc.) + cert-manager.io/cluster-issuer: "letsencrypt-prod" + nginx.ingress.kubernetes.io/ssl-redirect: "true" + nginx.ingress.kubernetes.io/proxy-body-size: "100k" + # Optional WAF — enable when your provider supports it + # nginx.ingress.kubernetes.io/enable-modsecurity: "true" +spec: + ingressClassName: nginx + tls: + - hosts: + - api.neurowealth.io + secretName: neurowealth-api-tls + rules: + - host: api.neurowealth.io + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: neurowealth-backend + port: + number: 3001 diff --git a/deploy/k8s/migration-job.yaml b/deploy/k8s/migration-job.yaml new file mode 100644 index 0000000..4b6a77a --- /dev/null +++ b/deploy/k8s/migration-job.yaml @@ -0,0 +1,39 @@ +# Pre-deploy migration job — run before rolling out a new image version. +# Usage: +# kubectl apply -f deploy/k8s/migration-job.yaml +# kubectl wait --for=condition=complete job/neurowealth-migrate -n neurowealth --timeout=300s +apiVersion: batch/v1 +kind: Job +metadata: + name: neurowealth-migrate + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend + app.kubernetes.io/component: migration +spec: + ttlSecondsAfterFinished: 3600 + backoffLimit: 2 + template: + metadata: + labels: + app.kubernetes.io/name: neurowealth-backend + app.kubernetes.io/component: migration + spec: + restartPolicy: Never + securityContext: + runAsNonRoot: true + containers: + - name: migrate + image: neurowealth-backend:latest + imagePullPolicy: IfNotPresent + command: ["npx", "prisma", "migrate", "deploy"] + envFrom: + - secretRef: + name: neurowealth-secrets + resources: + requests: + cpu: 50m + memory: 128Mi + limits: + cpu: 200m + memory: 256Mi diff --git a/deploy/k8s/namespace.yaml b/deploy/k8s/namespace.yaml new file mode 100644 index 0000000..dc839fc --- /dev/null +++ b/deploy/k8s/namespace.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: neurowealth + labels: + app.kubernetes.io/name: neurowealth + app.kubernetes.io/part-of: neurowealth diff --git a/deploy/k8s/secret.yaml.example b/deploy/k8s/secret.yaml.example new file mode 100644 index 0000000..ef857a5 --- /dev/null +++ b/deploy/k8s/secret.yaml.example @@ -0,0 +1,29 @@ +# Template only — NEVER commit real secret values. +# Create the live Secret with your secrets manager or: +# kubectl create secret generic neurowealth-secrets \ +# --namespace=neurowealth \ +# --from-literal=DATABASE_URL='postgresql://...' \ +# --from-literal=JWT_SEED='...' \ +# ... +apiVersion: v1 +kind: Secret +metadata: + name: neurowealth-secrets + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend +type: Opaque +stringData: + DATABASE_URL: "postgresql://USER:PASSWORD@HOST:5432/neurowealth" + JWT_SEED: "REPLACE_WITH_64_HEX_CHARS" + WALLET_ENCRYPTION_KEY: "REPLACE_WITH_32_BYTE_HEX" + STELLAR_AGENT_SECRET_KEY: "REPLACE_WITH_STELLAR_SECRET_KEY" + ANTHROPIC_API_KEY: "REPLACE_WITH_ANTHROPIC_KEY" + ADMIN_API_TOKEN: "REPLACE_WITH_ADMIN_TOKEN" + TWILIO_AUTH_TOKEN: "REPLACE_WITH_TWILIO_AUTH_TOKEN" + # Optional — include when using WhatsApp integration + TWILIO_ACCOUNT_SID: "" + # Optional alerting / internal service auth + INTERNAL_SERVICE_TOKEN: "" + SLACK_WEBHOOK_URL: "" + PAGERDUTY_ROUTING_KEY: "" diff --git a/deploy/k8s/service.yaml b/deploy/k8s/service.yaml new file mode 100644 index 0000000..e4a7209 --- /dev/null +++ b/deploy/k8s/service.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + name: neurowealth-backend + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend +spec: + type: ClusterIP + selector: + app.kubernetes.io/name: neurowealth-backend + ports: + - name: http + port: 3001 + targetPort: http + protocol: TCP diff --git a/deploy/k8s/serviceaccount.yaml b/deploy/k8s/serviceaccount.yaml new file mode 100644 index 0000000..1e2aed6 --- /dev/null +++ b/deploy/k8s/serviceaccount.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: neurowealth-backend + namespace: neurowealth + labels: + app.kubernetes.io/name: neurowealth-backend diff --git a/docs/DEPLOYMENT.md b/docs/DEPLOYMENT.md new file mode 100644 index 0000000..37f9df0 --- /dev/null +++ b/docs/DEPLOYMENT.md @@ -0,0 +1,236 @@ +# NeuroWealth — Production Deployment Runbook + +End-to-end guide for deploying the NeuroWealth backend to Kubernetes with safe migrations, health checks, and secrets management. + +For Docker image build details see also `docs/PRODUCTION_DEPLOYMENT.md`. + +--- + +## Prerequisites + +| Requirement | Notes | +|-------------|-------| +| Kubernetes 1.25+ | Any managed cluster (EKS, GKE, AKS) | +| PostgreSQL 14+ | Managed database (RDS, Cloud SQL, etc.) — **not** the in-repo `docker-compose.yml` Postgres | +| Container registry | Push the image built from the root `Dockerfile` | +| Stellar Soroban RPC | `STELLAR_RPC_URL` or comma-separated `STELLAR_RPC_URLS` for failover | +| TLS certificate | cert-manager, cloud LB, or manual `Secret` for ingress | +| Secrets store | External Secrets Operator, Sealed Secrets, or `kubectl create secret` | + +--- + +## Manifest layout + +All manifests live under `deploy/k8s/`: + +| File | Purpose | +|------|---------| +| `namespace.yaml` | `neurowealth` namespace | +| `configmap.yaml` | Non-secret environment (CORS, rate limits, RPC URLs, contract IDs) | +| `secret.yaml.example` | **Template only** — copy values into a real Secret; never commit plaintext | +| `serviceaccount.yaml` | Pod service account | +| `deployment.yaml` | App Deployment with initContainer migration + probes | +| `service.yaml` | ClusterIP on port 3001 | +| `ingress.yaml` | TLS termination (adjust host / ingress class) | +| `migration-job.yaml` | Standalone pre-deploy migration Job | +| `hpa.yaml` | HPA pinned to 1 replica (see scaling constraints) | + +--- + +## Environment matrix + +| Setting | Staging | Production | +|---------|---------|------------| +| `NODE_ENV` | `staging` | `production` | +| `STELLAR_NETWORK` | `testnet` | `mainnet` | +| `STELLAR_RPC_URL` | Testnet Soroban RPC | Mainnet Soroban RPC | +| `CORS_ORIGINS` | Staging frontend URL | Production frontend URL | +| `LOG_LEVEL` | `debug` | `info` | +| `replicas` | `1` | `1` (until worker split) | +| Secrets | Staging Secret / external store | Production Secret / external store | + +Override `configmap.yaml` values per environment (separate ConfigMaps or Kustomize overlays recommended). + +--- + +## Secrets + +Create the live Secret from the template — **do not** apply `secret.yaml.example` with real values: + +```bash +kubectl create namespace neurowealth + +kubectl create secret generic neurowealth-secrets \ + --namespace=neurowealth \ + --from-literal=DATABASE_URL='postgresql://...' \ + --from-literal=JWT_SEED='...' \ + --from-literal=WALLET_ENCRYPTION_KEY='...' \ + --from-literal=STELLAR_AGENT_SECRET_KEY='...' \ + --from-literal=ANTHROPIC_API_KEY='...' \ + --from-literal=ADMIN_API_TOKEN='...' \ + --from-literal=TWILIO_AUTH_TOKEN='...' +``` + +Required keys match `src/config/env.ts` startup validation. Optional keys: `TWILIO_ACCOUNT_SID`, `INTERNAL_SERVICE_TOKEN`, `SLACK_WEBHOOK_URL`, `PAGERDUTY_ROUTING_KEY`. + +--- + +## Build and push image + +```bash +docker build -t /neurowealth-backend: . +docker push /neurowealth-backend: +``` + +Update the `image:` field in `deployment.yaml` and `migration-job.yaml` to your registry tag. + +**Migration strategy:** The default `Dockerfile` CMD runs `prisma migrate deploy && node dist/index.js`. In Kubernetes, the Deployment **overrides** the command to `node dist/index.js` only. Migrations run in the **initContainer** (or standalone Job) so a failed migration blocks the rollout instead of leaving a half-started pod serving traffic. + +--- + +## Rollout procedure + +### 1. Migrate + +**Option A — initContainer (default in `deployment.yaml`):** migrations run automatically before each pod starts. + +**Option B — standalone Job (recommended for large migrations):** + +```bash +# Update image tag in migration-job.yaml, then: +kubectl apply -f deploy/k8s/migration-job.yaml +kubectl wait --for=condition=complete job/neurowealth-migrate -n neurowealth --timeout=300s +``` + +### 2. Deploy + +```bash +kubectl apply -f deploy/k8s/namespace.yaml +kubectl apply -f deploy/k8s/serviceaccount.yaml +kubectl apply -f deploy/k8s/configmap.yaml +# secrets already created above +kubectl apply -f deploy/k8s/deployment.yaml +kubectl apply -f deploy/k8s/service.yaml +kubectl apply -f deploy/k8s/ingress.yaml +``` + +### 3. Verify readiness + +```bash +kubectl rollout status deployment/neurowealth-backend -n neurowealth + +# Port-forward for local check: +kubectl port-forward svc/neurowealth-backend 3001:3001 -n neurowealth + +curl -s http://localhost:3001/health/live +curl -s http://localhost:3001/health/ready +``` + +Readiness returns **200** only when database, event listener, and agent loop are all healthy. During rollout or shutdown it returns **503**. + +### 4. Smoke test + +```bash +curl -s -o /dev/null -w "%{http_code}" https://api.neurowealth.io/health +``` + +--- + +## Health probes + +Configured in `deployment.yaml` to match existing endpoints in `src/index.ts`: + +| Probe | Path | Purpose | +|-------|------|---------| +| Liveness | `GET /health/live` | Process is running — always 200 | +| Readiness | `GET /health/ready` | DB + event listener + agent loop ready | + +`terminationGracePeriodSeconds: 35` — the app drains in-flight requests for up to 30 s on `SIGTERM` before stopping background services. + +--- + +## Rollback procedure + +```bash +# Roll back to previous ReplicaSet +kubectl rollout undo deployment/neurowealth-backend -n neurowealth + +# Or pin a known-good image: +kubectl set image deployment/neurowealth-backend \ + api=/neurowealth-backend: \ + -n neurowealth + +kubectl rollout status deployment/neurowealth-backend -n neurowealth +``` + +**Database rollback:** Prisma migrations are forward-only. If a migration introduced a breaking schema change, restore from a database backup or deploy a hotfix migration — do not rely on `migrate reset` in production. + +--- + +## Scaling guidance + +### Current constraint: single active consumer + +The monolith starts three subsystems in every pod (`src/index.ts`): + +1. HTTP API +2. **Stellar event listener** — polls Soroban RPC every 5 s, persists cursor to `event_cursors` +3. **Agent cron loop** — hourly rebalance, snapshots, daily protocol scan + +There is **no leader election**. Running multiple replicas will: + +- Duplicate event processing (mitigated by `processed_events` idempotency, but wastes RPC quota and risks race conditions) +- Run duplicate cron jobs (rebalance checks, snapshots) + +**Recommendation:** keep `replicas: 1` until the architecture is split. + +### Future scaling path + +1. Add feature flags: `ENABLE_EVENT_LISTENER`, `ENABLE_AGENT_LOOP` +2. Split deployments: + - `neurowealth-api` — stateless HTTP, `replicas: N`, HPA enabled + - `neurowealth-worker` — listener + agent, `replicas: 1` +3. Optional: K8s Lease or Postgres advisory lock for worker leader election before scaling workers beyond 1 + +### HPA + +`deploy/k8s/hpa.yaml` is pinned to `minReplicas: 1` / `maxReplicas: 1`. Re-enable scaling only after the worker/API split. + +--- + +## Observability + +- **Metrics:** `GET /metrics` on port 3001 (Prometheus) +- **Request tracing:** clients may send `X-Request-ID` or `X-Correlation-ID`; the server echoes `X-Request-ID` on every response and includes `correlationId` in structured logs +- **DLQ:** monitor `dead_letter_events` count and `event_cursors.lastProcessedLedger` lag — see `docs/OBSERVABILITY.md` and `docs/RUNBOOK.md` + +--- + +## CI validation + +Manifests are validated in CI with `kubeconform` (see `.github/workflows/k8s-validate.yml`). Run locally: + +```bash +kubeconform -summary deploy/k8s/*.yaml +``` + +--- + +## Troubleshooting + +| Symptom | Check | +|---------|-------| +| Pod `CrashLoopBackOff` | `kubectl logs deployment/neurowealth-backend -n neurowealth`; verify all required secrets | +| Readiness 503 | `kubectl logs` — DB connection, RPC URL, or background service startup failure | +| Migration initContainer failed | `kubectl logs -c migrate -n neurowealth` | +| Events not processing | `SELECT * FROM event_cursors;` — cursor lag; ensure only one replica runs the listener | +| Duplicate rebalances | Confirm `replicas: 1`; check agent cron is not running on multiple pods | + +--- + +## Related docs + +- `docs/PRODUCTION_DEPLOYMENT.md` — Docker build, secret rotation +- `docs/RUNBOOK.md` — DLQ replay, cursor management +- `docs/OBSERVABILITY.md` — alerting and metrics +- `readme.md` — API request tracing headers diff --git a/readme.md b/readme.md index d7a58a1..32ab87c 100644 --- a/readme.md +++ b/readme.md @@ -109,7 +109,24 @@ Public unauthenticated read endpoints (`/api/protocols/*`, `/api/vault/state`, ` **Bypass (trusted services only):** set `TRUSTED_IPS` to a comma-separated allowlist of IPs, or send the shared secret in the `X-Internal-Token` header (`INTERNAL_SERVICE_TOKEN`). Mount order matters: the bypass middleware runs before limiters in `src/index.ts`. -For production secret handling, migrations, and rollback steps see `docs/DEPLOYMENT_PRODUCTION.md`. +For production secret handling, migrations, and rollback steps see `docs/DEPLOYMENT.md` and `docs/DEPLOYMENT_PRODUCTION.md`. + +Request tracing +--------------- +Every API response includes an `X-Request-ID` header for correlating logs across HTTP handlers, Stellar event processing, DLQ entries, and agent actions. + +- **Send your own ID:** include `X-Request-ID` or `X-Correlation-ID` on any request (alphanumeric, hyphen, underscore; max 128 characters). Invalid values are replaced with a server-generated UUID. +- **Response header:** `X-Request-ID` is always returned on success and error responses. +- **Error JSON:** 500 responses include `"requestId": ""` alongside the error message. +- **Logs:** structured log entries include `correlationId` when a request or background job context is active. + +Example: + +```bash +curl -v -H "X-Request-ID: my-deposit-attempt-001" \ + https://api.neurowealth.io/api/deposit ... +# Response header: X-Request-ID: my-deposit-attempt-001 +``` Testing ------- diff --git a/src/agent/loop.ts b/src/agent/loop.ts index bd9fb78..8d3ec25 100644 --- a/src/agent/loop.ts +++ b/src/agent/loop.ts @@ -4,6 +4,7 @@ import cron, { ScheduledTask } from 'node-cron'; import { logger } from '../utils/logger'; +import { generateCorrelationId, runWithCorrelationIdAsync } from '../utils/correlation'; import { scanAllProtocols } from './scanner'; import { executeRebalanceIfNeeded, getThresholds, logAgentAction } from './router'; import { captureAllUserBalances, cleanupOldSnapshots } from './snapshotter'; @@ -64,11 +65,13 @@ function getNextCheckTime(): Date { * Main rebalance check job - runs every hour at :00 */ async function rebalanceCheckJob(): Promise { + const correlationId = generateCorrelationId(); + return runWithCorrelationIdAsync(correlationId, async () => { const jobName = 'Hourly Rebalance Check'; const startTime = Date.now(); try { - logger.info(`${jobName} started`); + logger.info(`${jobName} started`, { correlationId }); // Update heartbeat updateAgentHeartbeat(); @@ -125,9 +128,7 @@ async function rebalanceCheckJob(): Promise { const duration = Date.now() - startTime; await logAgentAction('ANALYZE', 'SUCCESS', { - positionsChecked: positions.length, - rebalancesTriggered, - duration, + input: { correlationId, positionsChecked: positions.length, rebalancesTriggered, duration }, }); // Record Prometheus metrics @@ -156,20 +157,24 @@ async function rebalanceCheckJob(): Promise { recordDbOperation('rebalance_check', duration / 1000); await logAgentAction('ANALYZE', 'FAILED', { + input: { correlationId }, error: errorMessage, }); } + }); } /** * Snapshot job - runs every hour at :30 */ async function snapshotJob(): Promise { + const correlationId = generateCorrelationId(); + return runWithCorrelationIdAsync(correlationId, async () => { const jobName = 'Hourly Balance Snapshot'; const startTime = Date.now(); try { - logger.info(`${jobName} started`); + logger.info(`${jobName} started`, { correlationId }); // Update heartbeat updateAgentHeartbeat(); @@ -204,6 +209,7 @@ async function snapshotJob(): Promise { // Record Prometheus metrics recordDbOperation('snapshot_job', duration / 1000); } + }); } /** @@ -243,18 +249,30 @@ export async function startAgentLoop(): Promise { // Daily protocol scan at 2 AM const scanJob = cron.schedule('0 2 * * *', async () => { + const correlationId = generateCorrelationId(); + return runWithCorrelationIdAsync(correlationId, async () => { try { - logger.info('Daily protocol scan started'); + logger.info('Daily protocol scan started', { correlationId }); updateAgentHeartbeat(); const protocols = await scanAllProtocols(); + await logAgentAction('SCAN', 'SUCCESS', { + input: { correlationId, protocolsScanned: protocols.length }, + }); logger.info('Daily protocol scan complete', { + correlationId, protocolsScanned: protocols.length, }); } catch (error) { logger.error('Daily protocol scan failed', { + correlationId, + error: error instanceof Error ? error.message : 'Unknown error', + }); + await logAgentAction('SCAN', 'FAILED', { + input: { correlationId }, error: error instanceof Error ? error.message : 'Unknown error', }); } + }); }); cronJobs.push(scanJob); logger.info('✓ Daily protocol scan scheduled: Daily at 2 AM'); diff --git a/src/agent/router.ts b/src/agent/router.ts index 6c910c2..7ce9034 100644 --- a/src/agent/router.ts +++ b/src/agent/router.ts @@ -3,6 +3,7 @@ */ import { logger } from '../utils/logger'; +import { getCorrelationId } from '../utils/correlation'; import { ProtocolComparison, RebalanceDetails, RebalanceThresholds } from './types'; import { scanAllProtocols, getCurrentOnChainApy } from './scanner'; import { triggerRebalance as submitRebalance } from '../stellar/contract'; @@ -312,6 +313,15 @@ export async function logAgentAction( userId?: string, positionId?: string, ): Promise { + const correlationId = getCorrelationId(); + const inputWithCorrelation = + data?.input || correlationId + ? { + ...(typeof data?.input === 'object' && data.input !== null ? data.input : {}), + ...(correlationId ? { correlationId } : {}), + } + : undefined; + try { await db.agentLog.create({ data: { @@ -319,7 +329,7 @@ export async function logAgentAction( positionId: positionId ?? null, action: action as any, status: status as any, - inputData: data?.input ? JSON.stringify(data.input) : undefined, + inputData: inputWithCorrelation ? JSON.stringify(inputWithCorrelation) : data?.input ? JSON.stringify(data.input) : undefined, outputData: data?.output ? JSON.stringify(data.output) : undefined, reasoning: data?.reasoning as string | undefined, errorMessage: data?.error as string | undefined, diff --git a/src/controllers/transaction-controller.ts b/src/controllers/transaction-controller.ts index ca7e165..11c29ef 100644 --- a/src/controllers/transaction-controller.ts +++ b/src/controllers/transaction-controller.ts @@ -3,6 +3,7 @@ import db from '../db' import { depositForUser, withdrawForUser } from '../stellar/contract' import { formatDepositReply, formatWithdrawReply } from '../whatsapp/formatters' import { sendNotFound, sendConflict, sendUnauthorized } from '../utils/errors' +import { logger } from '../utils/logger' export async function processOnChainTransaction( req: Request, @@ -24,6 +25,15 @@ export async function processOnChainTransaction( } const onChainFn = type === 'DEPOSIT' ? depositForUser : withdrawForUser + + logger.info('Submitting on-chain transaction', { + correlationId: req.correlationId, + type, + userId, + amount, + assetSymbol, + }) + const onChainTransaction = await onChainFn( userId, req.auth!.walletAddress, @@ -31,6 +41,14 @@ export async function processOnChainTransaction( assetSymbol ) + logger.info('On-chain transaction completed', { + correlationId: req.correlationId, + type, + userId, + txHash: onChainTransaction.hash, + status: onChainTransaction.status, + }) + const transactionStatus = onChainTransaction.status === 'success' ? 'CONFIRMED' : 'FAILED' const existing = await db.transaction.findUnique({ diff --git a/src/index.ts b/src/index.ts index ba07f80..26c87c8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ import { type Server } from 'node:http' import express from 'express' import { config } from './config/env' import { errorHandler } from './middleware/errorHandler' +import { correlationIdMiddleware } from './middleware/correlationId' import { requestLogger } from './middleware/logger' import { rateLimiter, authRateLimiter, adminRateLimiter, internalRateLimiter, webhookRateLimiter, trustedIpBypass } from './middleware/rateLimiter' import { configureTrustProxy, securityHeaders } from './middleware/security' @@ -67,6 +68,9 @@ app.use(corsMiddleware) app.use(jsonBodyParser) app.use(urlencodedBodyParser) +// Request correlation ID — must run before requestLogger +app.use(correlationIdMiddleware) + // Logging + rate limiting app.use(requestLogger) // Trusted-IP / service-token bypass must run before any rate limiter diff --git a/src/middleware/correlationId.ts b/src/middleware/correlationId.ts new file mode 100644 index 0000000..9d1c739 --- /dev/null +++ b/src/middleware/correlationId.ts @@ -0,0 +1,22 @@ +import { Request, Response, NextFunction } from 'express' +import { resolveCorrelationId, runWithCorrelationId } from '../utils/correlation' + +export const REQUEST_ID_HEADER = 'X-Request-ID' + +/** + * Assigns a request-scoped correlation ID from incoming headers or a new UUID. + * Propagates the ID through AsyncLocalStorage for downstream logging. + */ +export function correlationIdMiddleware( + req: Request, + res: Response, + next: NextFunction +): void { + const correlationId = resolveCorrelationId(req.headers as Record) + + req.correlationId = correlationId + res.locals.correlationId = correlationId + res.setHeader(REQUEST_ID_HEADER, correlationId) + + runWithCorrelationId(correlationId, () => next()) +} diff --git a/src/middleware/corsandbody.ts b/src/middleware/corsandbody.ts index d3b6d4f..98dbb34 100644 --- a/src/middleware/corsandbody.ts +++ b/src/middleware/corsandbody.ts @@ -62,7 +62,7 @@ function buildCorsOptions(): CorsOptions { // Standard safe headers; expand as your API needs grow methods: ['GET', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], - allowedHeaders: ['Content-Type', 'Authorization', 'X-Admin-Token', 'X-Request-ID'], + allowedHeaders: ['Content-Type', 'Authorization', 'X-Admin-Token', 'X-Request-ID', 'X-Correlation-ID'], exposedHeaders: ['X-Request-ID'], credentials: true, // Pre-flight cache: 2 hours in production, no cache in dev diff --git a/src/middleware/errorHandler.ts b/src/middleware/errorHandler.ts index 95f43e8..ddbfcb3 100644 --- a/src/middleware/errorHandler.ts +++ b/src/middleware/errorHandler.ts @@ -7,14 +7,18 @@ export function errorHandler( res: Response, next: NextFunction ) { + const requestId = req.correlationId + logger.error(`Unhandled error: ${err.message}`, { + correlationId: requestId, stack: err.stack, path: req.path, - method: req.method + method: req.method, }) res.status(500).json({ error: 'Internal server error', - message: process.env.NODE_ENV === 'development' ? err.message : undefined + requestId, + message: process.env.NODE_ENV === 'development' ? err.message : undefined, }) } \ No newline at end of file diff --git a/src/middleware/logger.ts b/src/middleware/logger.ts index d4bbcf1..8d24a60 100644 --- a/src/middleware/logger.ts +++ b/src/middleware/logger.ts @@ -7,9 +7,10 @@ export function requestLogger(req: Request, res: Response, next: NextFunction) { res.on('finish', () => { const duration = Date.now() - start logger.info(`${req.method} ${req.path}`, { + correlationId: req.correlationId, status: res.statusCode, duration: `${duration}ms`, - ip: req.ip + ip: req.ip, }) }) diff --git a/src/stellar/dlq.ts b/src/stellar/dlq.ts index 93039a4..f33d1c3 100644 --- a/src/stellar/dlq.ts +++ b/src/stellar/dlq.ts @@ -15,6 +15,7 @@ import db from '../db' import { updateDlqSize } from '../utils/metrics' import { config } from '../config' import { alertingService, type DLQAlertPayload } from '../services/alerting' +import { getCorrelationId } from '../utils/correlation' export type DeadLetterEventStatus = 'PENDING' | 'RETRIED' | 'RESOLVED' @@ -63,6 +64,18 @@ function serializePayload(event: any): any { } } +function buildPayload(event: any): any { + const correlationId = getCorrelationId() ?? event?.correlationId + const serialized = serializePayload(event) + if (correlationId) { + return { + ...serialized, + _metadata: { correlationId }, + } + } + return serialized +} + function deserializePayload(event: any): any { return { ...event, @@ -118,7 +131,7 @@ export class DeadLetterQueue { eventType: event?.type ?? 'unknown', ledger: typeof event?.ledger === 'number' ? event.ledger : 0, error: errorMsg, - payload: serializePayload(event), + payload: buildPayload(event), status: 'PENDING' as const, retryCount: 0, }, diff --git a/src/stellar/events.ts b/src/stellar/events.ts index 31ec37c..fb6147a 100644 --- a/src/stellar/events.ts +++ b/src/stellar/events.ts @@ -7,6 +7,7 @@ import { ContractEvent, DepositEvent, WithdrawEvent, RebalanceEvent, EventMetric import { logger } from '../utils/logger'; import { config } from '../config'; import { DeadLetterQueue } from './dlq'; +import { generateCorrelationId, runWithCorrelationIdAsync } from '../utils/correlation'; import { ContractEventSchema, DepositEventSchema, @@ -341,9 +342,14 @@ async function handleRebalanceEvent(rebalanceData: RebalanceEvent, event: Contra * Handle contract event with persistence, idempotency, and validation (Issue #53) */ export async function handleEvent(event: ContractEvent, tx: any = db): Promise { - const startTime = Date.now(); - try { - logger.info(`[Event] ${event.type} detected at ledger ${event.ledger}, tx: ${event.txHash}`); + const correlationId = generateCorrelationId(); + return runWithCorrelationIdAsync(correlationId, async () => { + const eventWithCorrelation = { ...event, correlationId }; + const startTime = Date.now(); + try { + logger.info(`[Event] ${event.type} detected at ledger ${event.ledger}, tx: ${event.txHash}`, { + correlationId, + }); // Issue #53: Event validation ContractEventSchema.parse(event); @@ -409,7 +415,7 @@ export async function handleEvent(event: ContractEvent, tx: any = db): Promise() + +/** Max length for client-supplied request IDs. */ +export const MAX_CORRELATION_ID_LENGTH = 128 + +/** UUID v4 or common request-id tokens (alphanumeric, hyphen, underscore). */ +const VALID_CORRELATION_ID = /^[A-Za-z0-9_-]{1,128}$/ + +export function generateCorrelationId(): string { + return randomUUID() +} + +export function isValidCorrelationId(value: string): boolean { + return VALID_CORRELATION_ID.test(value) && value.length <= MAX_CORRELATION_ID_LENGTH +} + +export function resolveCorrelationId( + headers: Record +): string { + const raw = + headers['x-request-id'] ?? + headers['x-correlation-id'] ?? + headers['X-Request-ID'] ?? + headers['X-Correlation-ID'] + + const candidate = Array.isArray(raw) ? raw[0] : raw + if (candidate && isValidCorrelationId(candidate.trim())) { + return candidate.trim() + } + + return generateCorrelationId() +} + +export function getCorrelationId(): string | undefined { + return correlationStorage.getStore() +} + +export function runWithCorrelationId(correlationId: string, fn: () => T): T { + return correlationStorage.run(correlationId, fn) +} + +export async function runWithCorrelationIdAsync( + correlationId: string, + fn: () => Promise +): Promise { + return correlationStorage.run(correlationId, fn) +} diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 654ec98..bd80224 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -1,6 +1,7 @@ import winston from 'winston' import * as fs from 'fs' import * as path from 'path' +import { getCorrelationId } from './correlation' // Ensure logs directory exists with fail-safe handling const logsDir = path.join(process.cwd(), 'logs') @@ -38,6 +39,15 @@ function redactSensitiveData(message: string): string { return redacted } +// Inject correlation ID from AsyncLocalStorage when present +const correlationFormat = winston.format((info) => { + const correlationId = getCorrelationId() + if (correlationId && !info.correlationId) { + info.correlationId = correlationId + } + return info +}) + // Custom format that redacts sensitive data const redactFormat = winston.format.printf(({ timestamp, level, message, ...meta }) => { const safeMessage = typeof message === 'string' ? redactSensitiveData(message) : message @@ -98,10 +108,19 @@ if (fs.existsSync(logsDir) && fs.statSync(logsDir).isDirectory()) { export const logger = winston.createLogger({ level: logLevel, - format: winston.format.combine(winston.format.timestamp(), redactFormat), + format: winston.format.combine( + winston.format.timestamp(), + correlationFormat(), + redactFormat + ), transports, }) +/** Child logger with a fixed correlationId (e.g. background jobs). */ +export function createCorrelatedLogger(correlationId: string): winston.Logger { + return logger.child({ correlationId }) +} + // Optional cloud logging adapters (disabled by default) export function addCloudLoggingAdapter(adapter: winston.transport): void { logger.add(adapter) diff --git a/tests/integration/correlationId.integration.test.ts b/tests/integration/correlationId.integration.test.ts new file mode 100644 index 0000000..970d5db --- /dev/null +++ b/tests/integration/correlationId.integration.test.ts @@ -0,0 +1,50 @@ +import express from 'express' +import request from 'supertest' +import { correlationIdMiddleware } from '../../src/middleware/correlationId' +import { requestLogger } from '../../src/middleware/logger' +import { logger } from '../../src/utils/logger' + +jest.mock('../../src/utils/logger', () => ({ + logger: { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + child: jest.fn(), + }, + createCorrelatedLogger: jest.fn(), +})) + +describe('correlation ID integration', () => { + const clientId = '550e8400-e29b-41d4-a716-446655440000' + + beforeEach(() => { + jest.clearAllMocks() + }) + + it('propagates a single request ID through response header, handler, and logs', async () => { + const app = express() + app.use(correlationIdMiddleware) + app.use(requestLogger) + + const downstreamHandler = jest.fn((req, res) => { + res.json({ requestId: req.correlationId }) + }) + + app.get('/test', downstreamHandler) + + const res = await request(app).get('/test').set('X-Request-ID', clientId) + + expect(res.status).toBe(200) + expect(res.headers['x-request-id']).toBe(clientId) + expect(res.body.requestId).toBe(clientId) + expect(downstreamHandler).toHaveBeenCalledTimes(1) + + expect(logger.info).toHaveBeenCalledWith( + 'GET /test', + expect.objectContaining({ + correlationId: clientId, + status: 200, + }) + ) + }) +}) diff --git a/tests/unit/middleware/correlationId.test.ts b/tests/unit/middleware/correlationId.test.ts new file mode 100644 index 0000000..37a9e40 --- /dev/null +++ b/tests/unit/middleware/correlationId.test.ts @@ -0,0 +1,80 @@ +import { Request, Response, NextFunction } from 'express' +import { correlationIdMiddleware, REQUEST_ID_HEADER } from '../../../src/middleware/correlationId' +import { isValidCorrelationId, generateCorrelationId } from '../../../src/utils/correlation' + +describe('correlation utilities', () => { + it('accepts UUID-shaped IDs', () => { + expect(isValidCorrelationId('550e8400-e29b-41d4-a716-446655440000')).toBe(true) + }) + + it('accepts alphanumeric request IDs up to 128 chars', () => { + expect(isValidCorrelationId('req-abc_123')).toBe(true) + }) + + it('rejects IDs over 128 characters', () => { + expect(isValidCorrelationId('a'.repeat(129))).toBe(false) + }) + + it('rejects IDs with invalid characters', () => { + expect(isValidCorrelationId('bad id with spaces')).toBe(false) + }) + + it('generates valid UUIDs', () => { + const id = generateCorrelationId() + expect(isValidCorrelationId(id)).toBe(true) + }) +}) + +describe('correlationIdMiddleware', () => { + let req: Partial + let res: Partial + let next: NextFunction + + beforeEach(() => { + req = { headers: {} } + res = { + locals: {}, + setHeader: jest.fn(), + } + next = jest.fn() + }) + + it('preserves a client-supplied X-Request-ID', () => { + const clientId = '550e8400-e29b-41d4-a716-446655440000' + req.headers = { 'x-request-id': clientId } + + correlationIdMiddleware(req as Request, res as Response, next) + + expect(req.correlationId).toBe(clientId) + expect(res.locals?.correlationId).toBe(clientId) + expect(res.setHeader).toHaveBeenCalledWith(REQUEST_ID_HEADER, clientId) + expect(next).toHaveBeenCalled() + }) + + it('accepts X-Correlation-ID when X-Request-ID is absent', () => { + const clientId = 'client-correlation-001' + req.headers = { 'x-correlation-id': clientId } + + correlationIdMiddleware(req as Request, res as Response, next) + + expect(req.correlationId).toBe(clientId) + expect(res.setHeader).toHaveBeenCalledWith(REQUEST_ID_HEADER, clientId) + }) + + it('generates a UUID when no header is provided', () => { + correlationIdMiddleware(req as Request, res as Response, next) + + expect(req.correlationId).toBeDefined() + expect(isValidCorrelationId(req.correlationId!)).toBe(true) + expect(res.setHeader).toHaveBeenCalledWith(REQUEST_ID_HEADER, req.correlationId) + }) + + it('generates a new ID when the client header is invalid', () => { + req.headers = { 'x-request-id': 'not valid!!!' } + + correlationIdMiddleware(req as Request, res as Response, next) + + expect(req.correlationId).not.toBe('not valid!!!') + expect(isValidCorrelationId(req.correlationId!)).toBe(true) + }) +}) diff --git a/tests/unit/stellar/dlq-correlation.test.ts b/tests/unit/stellar/dlq-correlation.test.ts new file mode 100644 index 0000000..06ae2dd --- /dev/null +++ b/tests/unit/stellar/dlq-correlation.test.ts @@ -0,0 +1,65 @@ +import { DeadLetterQueue } from '../../../src/stellar/dlq' +import { runWithCorrelationIdAsync } from '../../../src/utils/correlation' + +jest.mock('../../../src/db', () => ({ + __esModule: true, + default: { + deadLetterEvent: { + create: jest.fn().mockResolvedValue({ + id: 'dlq-1', + contractId: 'contract-1', + txHash: 'tx-abc', + eventType: 'deposit', + ledger: 100, + error: 'test error', + payload: {}, + status: 'PENDING', + retryCount: 0, + createdAt: new Date(), + updatedAt: new Date(), + }), + count: jest.fn().mockResolvedValue(1), + }, + }, +})) + +jest.mock('../../../src/utils/logger', () => ({ + logger: { warn: jest.fn(), info: jest.fn(), error: jest.fn() }, +})) + +jest.mock('../../../src/utils/metrics', () => ({ + updateDlqSize: jest.fn(), +})) + +jest.mock('../../../src/services/alerting', () => ({ + alertingService: { clearDLQAlertState: jest.fn(), emitDLQAlert: jest.fn() }, +})) + +jest.mock('../../../src/config', () => ({ + config: { dlq: { alertThreshold: 50 } }, +})) + +import db from '../../../src/db' + +describe('DeadLetterQueue correlation ID', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + + it('stores correlationId in payload metadata when available', async () => { + const correlationId = 'event-correlation-123' + const event = { + contractId: 'contract-1', + txHash: 'tx-abc', + type: 'deposit', + ledger: 100, + } + + await runWithCorrelationIdAsync(correlationId, async () => { + await DeadLetterQueue.add(event, 'processing failed') + }) + + const createCall = (db as any).deadLetterEvent.create.mock.calls[0][0] + expect(createCall.data.payload._metadata).toEqual({ correlationId }) + }) +})