Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,13 @@ ADMIN_DASHBOARD_URL=https://admin.neurowealth.io
# Graceful shutdown
# Grace period (ms) for in-flight requests to complete before force-exit
SHUTDOWN_DRAIN_TIMEOUT_MS=30000

# Data retention (all optional — defaults shown)
# Days to retain processed_events rows before deletion (default: 90)
RETENTION_PROCESSED_EVENTS_DAYS=90
# Days to retain RESOLVED dead_letter_events rows before deletion (default: 30)
RETENTION_DEAD_LETTER_EVENTS_DAYS=30
# Days to retain agent_logs rows before deletion (default: 60)
RETENTION_AGENT_LOGS_DAYS=60
# Interval between retention job runs in ms (default: 86400000 = 24 h)
RETENTION_INTERVAL_MS=86400000
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Indexes backing the data-retention cleanup jobs.
-- Each index matches the filter columns of one retention delete query.
-- Every statement uses IF NOT EXISTS, so an index that is already present
-- is left as it is.

-- auth_nonces: the schema already declares @@index([expiresAt]); make sure
-- the index is actually there.
CREATE INDEX IF NOT EXISTS "auth_nonces_expiresAt_idx"
  ON "auth_nonces" ("expiresAt");

-- processed_events: time-based pruning filters on processedAt.
CREATE INDEX IF NOT EXISTS "processed_events_processedAt_idx"
  ON "processed_events" ("processedAt");

-- dead_letter_events: cleanup filters on status = RESOLVED plus row age,
-- hence the composite (status, createdAt) index.
CREATE INDEX IF NOT EXISTS "dead_letter_events_status_createdAt_idx"
  ON "dead_letter_events" ("status", "createdAt");

-- agent_logs: the schema already declares @@index([createdAt]); make sure
-- the index is actually there.
CREATE INDEX IF NOT EXISTS "agent_logs_createdAt_idx"
  ON "agent_logs" ("createdAt");
10 changes: 10 additions & 0 deletions src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,14 @@ export const config = {
/** Grace period (ms) for in-flight requests to complete before force-exit */
drainTimeoutMs: parseInt(process.env.SHUTDOWN_DRAIN_TIMEOUT_MS || '30000'),
},
retention: {
/** How many days to keep processed_events rows (default: 90 days) */
processedEventsDays: parseInt(process.env.RETENTION_PROCESSED_EVENTS_DAYS || '90'),
/** How many days to keep RESOLVED dead_letter_events (default: 30 days) */
deadLetterEventsDays: parseInt(process.env.RETENTION_DEAD_LETTER_EVENTS_DAYS || '30'),
/** How many days to keep agent_logs rows (default: 60 days) */
agentLogsDays: parseInt(process.env.RETENTION_AGENT_LOGS_DAYS || '60'),
/** Interval between retention job runs in ms (default: 24 hours) */
intervalMs: parseInt(process.env.RETENTION_INTERVAL_MS || '86400000'),
},
}
10 changes: 10 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from './utils/logger'
import { startAgentLoop, stopAgentLoop } from './agent/loop'
import { connectDb } from './db'
import { scheduleSessionCleanup } from './jobs/sessionCleanup'
import { scheduleDataRetention } from './jobs/dataRetention'
import { startEventListener, stopEventListener } from './stellar/events'
import healthRouter from './routes/health'
import agentRouter from './routes/agent'
Expand Down Expand Up @@ -46,6 +47,7 @@ const serviceStatus: Record<string, ServiceStatus> = {
let isShuttingDown = false
let httpServer: Server | null = null
let sessionCleanupHandle: NodeJS.Timeout | null = null
let dataRetentionHandle: NodeJS.Timeout | null = null

function allServicesReady(): boolean {
return Object.values(serviceStatus).every(s => s.ready)
Expand Down Expand Up @@ -156,6 +158,13 @@ async function gracefulShutdown(signal: string): Promise<void> {
logger.info('[Shutdown] Session cleanup timer cleared')
}

// Stop the data retention interval
if (dataRetentionHandle) {
clearInterval(dataRetentionHandle)
dataRetentionHandle = null
logger.info('[Shutdown] Data retention timer cleared')
}

if (!httpServer) {
logger.warn('[Shutdown] No HTTP server to close')
process.exit(0)
Expand Down Expand Up @@ -287,6 +296,7 @@ async function main(): Promise<void> {

// Non-critical jobs start after the server is up
sessionCleanupHandle = scheduleSessionCleanup()
dataRetentionHandle = scheduleDataRetention()
}

// ── Process-level error guards ────────────────────────────────────────────────
Expand Down
145 changes: 145 additions & 0 deletions src/jobs/dataRetention.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import db from '../db';
import { logger } from '../utils/logger';
import { config } from '../config/env';
import { recordBackgroundJob, recordRetentionDeletes } from '../utils/metrics';

/**
 * Compute the cutoff instant for a retention window.
 *
 * Callers delete rows whose timestamp is strictly earlier than the returned
 * date. A day is treated as a fixed 24-hour span, so the result does not
 * depend on the server's time zone or on daylight-saving transitions.
 *
 * @param retentionDays - Length of the retention window in days. Must be a
 *   finite number >= 0 (0 means "now", i.e. nothing is retained).
 * @returns The instant `retentionDays` days before the current time.
 * @throws RangeError when `retentionDays` is NaN, infinite, negative, or so
 *   large that the result is not a representable date. A negative value would
 *   otherwise place the cutoff in the future and make every row eligible for
 *   deletion.
 */
function cutoffDate(retentionDays: number): Date {
  const msPerDay = 24 * 60 * 60 * 1000;

  if (!Number.isFinite(retentionDays) || retentionDays < 0) {
    throw new RangeError(`Invalid retention period: ${retentionDays} day(s)`);
  }

  const cutoff = new Date(Date.now() - retentionDays * msPerDay);

  // Extremely large windows overflow the Date range and yield an Invalid Date.
  if (Number.isNaN(cutoff.getTime())) {
    throw new RangeError(`Retention period out of range: ${retentionDays} day(s)`);
  }

  return cutoff;
}

/**
 * Delete expired auth_nonces (expiresAt < now).
 *
 * Errors are caught here: a failure is logged and recorded as a `failed`
 * background-job run instead of being rethrown to the caller.
 */
export async function cleanupAuthNonces(): Promise<void> {
  const start = Date.now();
  const jobName = 'retention_auth_nonces';
  try {
    const result = await db.authNonce.deleteMany({
      where: { expiresAt: { lt: new Date() } },
    });
    const duration = (Date.now() - start) / 1000;
    if (result.count > 0) {
      logger.info(`[DataRetention] auth_nonces: removed ${result.count} expired row(s)`);
    }
    // Record every successful run, including ones that deleted nothing, so the
    // per-table retention metrics reflect that the job ran (adding 0 leaves the
    // delete counter unchanged).
    recordRetentionDeletes('auth_nonces', result.count);
    recordBackgroundJob(jobName, 'success', duration);
  } catch (error) {
    const duration = (Date.now() - start) / 1000;
    logger.error('[DataRetention] auth_nonces cleanup failed:', error);
    recordBackgroundJob(jobName, 'failed', duration);
  }
}

/**
 * Prune processed_events older than RETENTION_PROCESSED_EVENTS_DAYS (default 90).
 *
 * Rows are selected by `processedAt` being earlier than the computed cutoff.
 * Errors are caught here: a failure is logged and recorded as a `failed`
 * background-job run instead of being rethrown to the caller.
 */
export async function cleanupProcessedEvents(): Promise<void> {
  const start = Date.now();
  const jobName = 'retention_processed_events';
  try {
    const cutoff = cutoffDate(config.retention.processedEventsDays);
    const result = await db.processedEvent.deleteMany({
      where: { processedAt: { lt: cutoff } },
    });
    const duration = (Date.now() - start) / 1000;
    if (result.count > 0) {
      logger.info(
        `[DataRetention] processed_events: removed ${result.count} row(s) older than ${config.retention.processedEventsDays}d`,
      );
    }
    // Record every successful run, including ones that deleted nothing, so the
    // per-table retention metrics reflect that the job ran (adding 0 leaves the
    // delete counter unchanged).
    recordRetentionDeletes('processed_events', result.count);
    recordBackgroundJob(jobName, 'success', duration);
  } catch (error) {
    const duration = (Date.now() - start) / 1000;
    logger.error('[DataRetention] processed_events cleanup failed:', error);
    recordBackgroundJob(jobName, 'failed', duration);
  }
}

/**
 * Prune RESOLVED dead_letter_events older than RETENTION_DEAD_LETTER_EVENTS_DAYS (default 30).
 * PENDING and RETRIED records are left untouched so they remain actionable.
 *
 * Row age is measured from `createdAt`. Errors are caught here: a failure is
 * logged and recorded as a `failed` background-job run instead of being
 * rethrown to the caller.
 */
export async function cleanupDeadLetterEvents(): Promise<void> {
  const start = Date.now();
  const jobName = 'retention_dead_letter_events';
  try {
    const cutoff = cutoffDate(config.retention.deadLetterEventsDays);
    const result = await db.deadLetterEvent.deleteMany({
      where: {
        status: 'RESOLVED',
        createdAt: { lt: cutoff },
      },
    });
    const duration = (Date.now() - start) / 1000;
    if (result.count > 0) {
      logger.info(
        `[DataRetention] dead_letter_events: removed ${result.count} RESOLVED row(s) older than ${config.retention.deadLetterEventsDays}d`,
      );
    }
    // Record every successful run, including ones that deleted nothing, so the
    // per-table retention metrics reflect that the job ran (adding 0 leaves the
    // delete counter unchanged).
    recordRetentionDeletes('dead_letter_events', result.count);
    recordBackgroundJob(jobName, 'success', duration);
  } catch (error) {
    const duration = (Date.now() - start) / 1000;
    logger.error('[DataRetention] dead_letter_events cleanup failed:', error);
    recordBackgroundJob(jobName, 'failed', duration);
  }
}

/**
 * Prune agent_logs older than RETENTION_AGENT_LOGS_DAYS (default 60).
 *
 * Row age is measured from `createdAt`. Errors are caught here: a failure is
 * logged and recorded as a `failed` background-job run instead of being
 * rethrown to the caller.
 */
export async function cleanupAgentLogs(): Promise<void> {
  const start = Date.now();
  const jobName = 'retention_agent_logs';
  try {
    const cutoff = cutoffDate(config.retention.agentLogsDays);
    const result = await db.agentLog.deleteMany({
      where: { createdAt: { lt: cutoff } },
    });
    const duration = (Date.now() - start) / 1000;
    if (result.count > 0) {
      logger.info(
        `[DataRetention] agent_logs: removed ${result.count} row(s) older than ${config.retention.agentLogsDays}d`,
      );
    }
    // Record every successful run, including ones that deleted nothing, so the
    // per-table retention metrics reflect that the job ran (adding 0 leaves the
    // delete counter unchanged).
    recordRetentionDeletes('agent_logs', result.count);
    recordBackgroundJob(jobName, 'success', duration);
  } catch (error) {
    const duration = (Date.now() - start) / 1000;
    logger.error('[DataRetention] agent_logs cleanup failed:', error);
    recordBackgroundJob(jobName, 'failed', duration);
  }
}

/**
 * Execute every retention cleanup, one after another, in a fixed order:
 * auth_nonces, processed_events, dead_letter_events, agent_logs.
 * A job is only started once the previous one has settled.
 */
export async function runAllRetentionJobs(): Promise<void> {
  logger.info('[DataRetention] Starting all retention cleanup jobs');

  const orderedJobs: ReadonlyArray<() => Promise<void>> = [
    cleanupAuthNonces,
    cleanupProcessedEvents,
    cleanupDeadLetterEvents,
    cleanupAgentLogs,
  ];
  for (const job of orderedJobs) {
    await job();
  }

  logger.info('[DataRetention] All retention cleanup jobs complete');
}

/**
 * Schedule the retention cleanup jobs.
 * Runs once on startup then on the configured interval (default 24 h).
 *
 * The configured interval is validated before use: Node coerces a timer delay
 * that is NaN, below 1, or above 2^31 - 1 to 1 ms, which would run the delete
 * jobs continuously. An unusable value falls back to 24 h; an oversized value
 * is clamped to the largest delay a timer supports.
 *
 * A rejection from a run is caught and logged so it cannot surface as an
 * unhandled promise rejection.
 *
 * @returns NodeJS.Timeout handle — pass to clearInterval() on shutdown.
 */
export function scheduleDataRetention(): NodeJS.Timeout {
  const defaultIntervalMs = 24 * 60 * 60 * 1000;
  const maxTimerDelayMs = 2 ** 31 - 1;

  const configuredMs = config.retention.intervalMs;
  let intervalMs = configuredMs;
  if (!Number.isFinite(configuredMs) || configuredMs < 1) {
    logger.warn(
      `[DataRetention] Invalid retention interval (${configuredMs} ms); falling back to ${defaultIntervalMs} ms`,
    );
    intervalMs = defaultIntervalMs;
  } else if (configuredMs > maxTimerDelayMs) {
    logger.warn(
      `[DataRetention] Retention interval ${configuredMs} ms exceeds the timer maximum; clamping to ${maxTimerDelayMs} ms`,
    );
    intervalMs = maxTimerDelayMs;
  }

  // Fire-and-forget wrapper: the timer ignores the returned promise, so any
  // rejection has to be handled here.
  const runSafely = (): void => {
    runAllRetentionJobs().catch((error: unknown) => {
      logger.error('[DataRetention] Retention run failed unexpectedly:', error);
    });
  };

  runSafely();
  const handle = setInterval(runSafely, intervalMs);
  logger.info(
    `[DataRetention] Retention jobs scheduled every ${intervalMs / 3600000}h` +
      ` (processed_events=${config.retention.processedEventsDays}d,` +
      ` dlq=${config.retention.deadLetterEventsDays}d,` +
      ` agent_logs=${config.retention.agentLogsDays}d)`,
  );
  return handle;
}
24 changes: 24 additions & 0 deletions src/utils/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,22 @@ export const backgroundJobDuration = new client.Histogram({
registers: [register],
})

// ── Data Retention Metrics ───────────────────────────────────────────────────

/**
 * Counter of rows removed by the retention cleanup jobs,
 * with one series per `table` label value.
 */
export const retentionDeletesTotal = new client.Counter({
  registers: [register],
  labelNames: ['table'] as const,
  name: 'retention_deletes_total',
  help: 'Total number of rows deleted by retention cleanup jobs',
})

/**
 * Gauge holding a Unix timestamp (seconds) per `table` label value.
 * It is written by `recordRetentionDeletes` each time that function is called.
 */
export const retentionLastRunTimestamp = new client.Gauge({
  registers: [register],
  labelNames: ['table'] as const,
  name: 'retention_last_run_timestamp_seconds',
  help: 'Unix timestamp of the last successful retention job run per table',
})

// ── External Service Error Metrics ──────────────────────────────────────────

export const externalServiceErrorsTotal = new client.Counter({
Expand Down Expand Up @@ -329,6 +345,14 @@ export function recordBackgroundJob(
backgroundJobDuration.observe({ job }, durationSeconds)
}

/**
 * Report the outcome of a retention delete for one table.
 *
 * Adds `count` to the per-table delete counter and stamps the per-table
 * last-run gauge with the current time in Unix seconds.
 *
 * @param table - Value for the `table` label on both metrics.
 * @param count - Number of rows the job removed.
 */
export function recordRetentionDeletes(table: string, count: number): void {
  const labels = { table }
  retentionDeletesTotal.inc(labels, count)

  const nowSeconds = Date.now() / 1000
  retentionLastRunTimestamp.set(labels, nowSeconds)
}

/**
* Record an external service error
*/
Expand Down
Loading