diff --git a/.env.example b/.env.example index 4ee8386..cfa060f 100644 --- a/.env.example +++ b/.env.example @@ -91,3 +91,13 @@ ADMIN_DASHBOARD_URL=https://admin.neurowealth.io # Graceful shutdown # Grace period (ms) for in-flight requests to complete before force-exit SHUTDOWN_DRAIN_TIMEOUT_MS=30000 + +# Data retention (all optional — defaults shown) +# Days to retain processed_events rows before deletion (default: 90) +RETENTION_PROCESSED_EVENTS_DAYS=90 +# Days to retain RESOLVED dead_letter_events rows before deletion (default: 30) +RETENTION_DEAD_LETTER_EVENTS_DAYS=30 +# Days to retain agent_logs rows before deletion (default: 60) +RETENTION_AGENT_LOGS_DAYS=60 +# Interval between retention job runs in ms (default: 86400000 = 24 h) +RETENTION_INTERVAL_MS=86400000 diff --git a/prisma/migrations/20260618000000_add_retention_indexes/migration.sql b/prisma/migrations/20260618000000_add_retention_indexes/migration.sql new file mode 100644 index 0000000..3c02097 --- /dev/null +++ b/prisma/migrations/20260618000000_add_retention_indexes/migration.sql @@ -0,0 +1,14 @@ +-- Add indexes to support efficient retention/cleanup queries +-- These indexes cover the WHERE clauses used by the data retention jobs + +-- auth_nonces: already has @@index([expiresAt]) in schema, ensure it exists +CREATE INDEX IF NOT EXISTS "auth_nonces_expiresAt_idx" ON "auth_nonces"("expiresAt"); + +-- processed_events: index on processedAt for time-based pruning +CREATE INDEX IF NOT EXISTS "processed_events_processedAt_idx" ON "processed_events"("processedAt"); + +-- dead_letter_events: index on (status, createdAt) for RESOLVED+age cleanup +CREATE INDEX IF NOT EXISTS "dead_letter_events_status_createdAt_idx" ON "dead_letter_events"("status", "createdAt"); + +-- agent_logs: already has @@index([createdAt]), ensure it exists +CREATE INDEX IF NOT EXISTS "agent_logs_createdAt_idx" ON "agent_logs"("createdAt"); diff --git a/src/config/env.ts b/src/config/env.ts index de71282..1ee0087 100644 --- 
a/src/config/env.ts +++ b/src/config/env.ts @@ -278,4 +278,14 @@ export const config = { /** Grace period (ms) for in-force requests to complete before force-exit */ drainTimeoutMs: parseInt(process.env.SHUTDOWN_DRAIN_TIMEOUT_MS || '30000'), }, + retention: { + /** How many days to keep processed_events rows (default: 90 days) */ + processedEventsDays: parseInt(process.env.RETENTION_PROCESSED_EVENTS_DAYS || '90'), + /** How many days to keep RESOLVED dead_letter_events (default: 30 days) */ + deadLetterEventsDays: parseInt(process.env.RETENTION_DEAD_LETTER_EVENTS_DAYS || '30'), + /** How many days to keep agent_logs rows (default: 60 days) */ + agentLogsDays: parseInt(process.env.RETENTION_AGENT_LOGS_DAYS || '60'), + /** Interval between retention job runs in ms (default: 24 hours) */ + intervalMs: parseInt(process.env.RETENTION_INTERVAL_MS || '86400000'), + }, } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index a9add22..159af23 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,6 +10,7 @@ import { logger } from './utils/logger' import { startAgentLoop, stopAgentLoop } from './agent/loop' import { connectDb } from './db' import { scheduleSessionCleanup } from './jobs/sessionCleanup' +import { scheduleDataRetention } from './jobs/dataRetention' import { startEventListener, stopEventListener } from './stellar/events' import healthRouter from './routes/health' import agentRouter from './routes/agent' @@ -46,6 +47,7 @@ const serviceStatus: Record = { let isShuttingDown = false let httpServer: Server | null = null let sessionCleanupHandle: NodeJS.Timeout | null = null +let dataRetentionHandle: NodeJS.Timeout | null = null function allServicesReady(): boolean { return Object.values(serviceStatus).every(s => s.ready) @@ -156,6 +158,13 @@ async function gracefulShutdown(signal: string): Promise { logger.info('[Shutdown] Session cleanup timer cleared') } + // Stop the data retention interval + if (dataRetentionHandle) { + 
clearInterval(dataRetentionHandle)
+    dataRetentionHandle = null
+    logger.info('[Shutdown] Data retention timer cleared')
+  }
+
   if (!httpServer) {
     logger.warn('[Shutdown] No HTTP server to close')
     process.exit(0)
@@ -287,6 +296,7 @@ async function main(): Promise {
   // Non-critical jobs start after the server is up
   sessionCleanupHandle = scheduleSessionCleanup()
+  dataRetentionHandle = scheduleDataRetention()
 }
 
 // ── Process-level error guards ────────────────────────────────────────────────
diff --git a/src/jobs/dataRetention.ts b/src/jobs/dataRetention.ts
new file mode 100644
index 0000000..390dd9b
--- /dev/null
+++ b/src/jobs/dataRetention.ts
@@ -0,0 +1,145 @@
+import db from '../db';
+import { logger } from '../utils/logger';
+import { config } from '../config/env';
+import { recordBackgroundJob, recordRetentionDeletes } from '../utils/metrics';
+
+/**
+ * Compute the cutoff timestamp for a retention window.
+ *
+ * Subtracts `retentionDays` from the current day-of-month via `Date#setDate`,
+ * leaving the time of day unchanged.
+ *
+ * @param retentionDays - Number of days of data to keep; must be finite and >= 0.
+ * @returns The date before which rows are treated as expired by the callers.
+ * @throws RangeError if `retentionDays` is NaN, infinite, or negative.
+ */
+function cutoffDate(retentionDays: number): Date {
+  // The retention settings come from parseInt() on env values, which yields NaN
+  // for malformed input; a negative window would place the cutoff in the future
+  // and match every row. Refuse both instead of building a bad filter.
+  if (!Number.isFinite(retentionDays) || retentionDays < 0) {
+    throw new RangeError(`Invalid retention window: ${retentionDays} day(s)`);
+  }
+  const d = new Date();
+  d.setDate(d.getDate() - retentionDays);
+  return d;
+}
+
+/**
+ * Delete expired auth_nonces (expiresAt < now).
+ *
+ * Failures are logged and recorded as a failed background job; they are not
+ * rethrown.
+ */
+export async function cleanupAuthNonces(): Promise<void> {
+  const start = Date.now();
+  const jobName = 'retention_auth_nonces';
+  try {
+    const result = await db.authNonce.deleteMany({
+      where: { expiresAt: { lt: new Date() } },
+    });
+    const duration = (Date.now() - start) / 1000;
+    if (result.count > 0) {
+      logger.info(`[DataRetention] auth_nonces: removed ${result.count} expired row(s)`);
+    }
+    // Recorded on every successful run (count may be 0) so the last-run
+    // timestamp metric advances even when nothing had expired.
+    recordRetentionDeletes('auth_nonces', result.count);
+    recordBackgroundJob(jobName, 'success', duration);
+  } catch (error) {
+    const duration = (Date.now() - start) / 1000;
+    logger.error('[DataRetention] auth_nonces cleanup failed:', error);
+    recordBackgroundJob(jobName, 'failed', duration);
+  }
+}
+
+/**
+ * Prune processed_events older than RETENTION_PROCESSED_EVENTS_DAYS (default 90).
+ *
+ * Failures are logged and recorded as a failed background job; they are not
+ * rethrown.
+ */
+export async function cleanupProcessedEvents(): Promise<void> {
+  const start = Date.now();
+  const jobName = 'retention_processed_events';
+  try {
+    const cutoff = cutoffDate(config.retention.processedEventsDays);
+    const result = await db.processedEvent.deleteMany({
+      where: { processedAt: { lt: cutoff } },
+    });
+    const duration = (Date.now() - start) / 1000;
+    if (result.count > 0) {
+      logger.info(
+        `[DataRetention] processed_events: removed ${result.count} row(s) older than ${config.retention.processedEventsDays}d`,
+      );
+    }
+    // Recorded on every successful run (count may be 0) so the last-run
+    // timestamp metric advances even when no rows were old enough to prune.
+    recordRetentionDeletes('processed_events', result.count);
+    recordBackgroundJob(jobName, 'success', duration);
+  } catch (error) {
+    const duration = (Date.now() - start) / 1000;
+    logger.error('[DataRetention] processed_events cleanup failed:', error);
+    recordBackgroundJob(jobName, 'failed', duration);
+  }
+}
+
+/**
+ * Prune RESOLVED dead_letter_events older than RETENTION_DEAD_LETTER_EVENTS_DAYS (default 30).
+ * PENDING and RETRIED records are left untouched so they remain actionable.
+ *
+ * Failures are logged and recorded as a failed background job; they are not
+ * rethrown.
+ */
+export async function cleanupDeadLetterEvents(): Promise<void> {
+  const start = Date.now();
+  const jobName = 'retention_dead_letter_events';
+  try {
+    const cutoff = cutoffDate(config.retention.deadLetterEventsDays);
+    const result = await db.deadLetterEvent.deleteMany({
+      where: {
+        status: 'RESOLVED',
+        createdAt: { lt: cutoff },
+      },
+    });
+    const duration = (Date.now() - start) / 1000;
+    if (result.count > 0) {
+      logger.info(
+        `[DataRetention] dead_letter_events: removed ${result.count} RESOLVED row(s) older than ${config.retention.deadLetterEventsDays}d`,
+      );
+    }
+    // Recorded on every successful run (count may be 0) so the last-run
+    // timestamp metric advances even when no rows matched.
+    recordRetentionDeletes('dead_letter_events', result.count);
+    recordBackgroundJob(jobName, 'success', duration);
+  } catch (error) {
+    const duration = (Date.now() - start) / 1000;
+    logger.error('[DataRetention] dead_letter_events cleanup failed:', error);
+    recordBackgroundJob(jobName, 'failed', duration);
+  }
+}
+
+/**
+ * Prune agent_logs older than RETENTION_AGENT_LOGS_DAYS (default 60).
+ *
+ * Failures are logged and recorded as a failed background job; they are not
+ * rethrown.
+ */
+export async function cleanupAgentLogs(): Promise<void> {
+  const start = Date.now();
+  const jobName = 'retention_agent_logs';
+  try {
+    const cutoff = cutoffDate(config.retention.agentLogsDays);
+    const result = await db.agentLog.deleteMany({
+      where: { createdAt: { lt: cutoff } },
+    });
+    const duration = (Date.now() - start) / 1000;
+    if (result.count > 0) {
+      logger.info(
+        `[DataRetention] agent_logs: removed ${result.count} row(s) older than ${config.retention.agentLogsDays}d`,
+      );
+    }
+    // Recorded on every successful run (count may be 0) so the last-run
+    // timestamp metric advances even when no rows matched.
+    recordRetentionDeletes('agent_logs', result.count);
+    recordBackgroundJob(jobName, 'success', duration);
+  } catch (error) {
+    const duration = (Date.now() - start) / 1000;
+    logger.error('[DataRetention] agent_logs cleanup failed:', error);
+    recordBackgroundJob(jobName, 'failed', duration);
+  }
+}
+
+/**
+ * Run all retention jobs sequentially.
+ *
+ * Order: auth_nonces, processed_events, dead_letter_events, agent_logs. Each
+ * job is awaited before the next one starts.
+ */
+export async function runAllRetentionJobs(): Promise<void> {
+  logger.info('[DataRetention] Starting all retention cleanup jobs');
+  await cleanupAuthNonces();
+  await cleanupProcessedEvents();
+  await cleanupDeadLetterEvents();
+  await cleanupAgentLogs();
+  logger.info('[DataRetention] All retention cleanup jobs complete');
+}
+
+/**
+ * Schedule the retention cleanup jobs.
+ * Runs once on startup then on the configured interval (default 24 h).
+ *
+ * A non-finite or non-positive RETENTION_INTERVAL_MS falls back to 24 h, and a
+ * value above the largest timer delay Node accepts is clamped to that maximum.
+ *
+ * @returns NodeJS.Timeout handle — pass to clearInterval() on shutdown.
+ */
+export function scheduleDataRetention(): NodeJS.Timeout {
+  const defaultIntervalMs = 24 * 60 * 60 * 1000;
+  // Node timers coerce delays above 2^31-1 ms (and NaN / < 1) to 1 ms, which
+  // would turn a misconfigured interval into back-to-back delete runs.
+  const maxIntervalMs = 2 ** 31 - 1;
+
+  const configuredMs = config.retention.intervalMs;
+  let intervalMs = configuredMs;
+  if (!Number.isFinite(configuredMs) || configuredMs <= 0) {
+    logger.warn(
+      `[DataRetention] Invalid RETENTION_INTERVAL_MS (${configuredMs}); falling back to ${defaultIntervalMs}ms`,
+    );
+    intervalMs = defaultIntervalMs;
+  } else if (configuredMs > maxIntervalMs) {
+    logger.warn(
+      `[DataRetention] RETENTION_INTERVAL_MS (${configuredMs}) exceeds the maximum timer delay; clamping to ${maxIntervalMs}ms`,
+    );
+    intervalMs = maxIntervalMs;
+  }
+
+  // Fire-and-forget wrapper: the individual jobs catch their own errors, but
+  // attach a handler anyway so an unexpected rejection is logged rather than
+  // surfacing as an unhandled promise rejection.
+  const run = (): void => {
+    runAllRetentionJobs().catch((error: unknown) => {
+      logger.error('[DataRetention] Retention run failed unexpectedly:', error);
+    });
+  };
+
+  run();
+  const handle = setInterval(run, intervalMs);
+  logger.info(
+    `[DataRetention] Retention jobs scheduled every ${intervalMs / 3600000}h` +
+      ` (processed_events=${config.retention.processedEventsDays}d,` +
+      ` dlq=${config.retention.deadLetterEventsDays}d,` +
+      ` agent_logs=${config.retention.agentLogsDays}d)`,
+  );
+  return handle;
+}
diff --git a/src/utils/metrics.ts b/src/utils/metrics.ts
index 063f584..eb5879a 100644
--- a/src/utils/metrics.ts
+++ b/src/utils/metrics.ts
@@ -199,6 +199,22 @@ export const backgroundJobDuration = new client.Histogram({
   registers: [register],
 })
 
+// ── Data Retention Metrics ───────────────────────────────────────────────────
+
+export const retentionDeletesTotal = new client.Counter({
+  name: 'retention_deletes_total',
+  help: 'Total number of rows deleted by retention cleanup jobs',
+  labelNames: ['table'] as const,
+  registers: [register],
+})
+
+export const retentionLastRunTimestamp = new client.Gauge({
+  name: 'retention_last_run_timestamp_seconds',
+  help: 'Unix timestamp of the last successful retention job run per table',
+  labelNames: ['table'] as const,
+  registers: [register],
+})
+
 // ── External Service Error Metrics ──────────────────────────────────────────
 
 export const externalServiceErrorsTotal = new client.Counter({
@@ -329,6 +345,14 @@ export function 
recordBackgroundJob(
   backgroundJobDuration.observe({ job }, durationSeconds)
 }
 
+/**
+ * Record rows deleted by a retention job.
+ *
+ * Adds `count` to `retentionDeletesTotal` and sets `retentionLastRunTimestamp`
+ * to the current time in Unix seconds, both labelled with `table`.
+ *
+ * @param table - Label value naming the table that was cleaned up.
+ * @param count - Number of rows deleted in this run.
+ */
+export function recordRetentionDeletes(table: string, count: number): void {
+  retentionDeletesTotal.inc({ table }, count)
+  retentionLastRunTimestamp.set({ table }, Date.now() / 1000)
+}
+
 /**
  * Record an external service error
  */