feat(orchestrator): add conversation retention policy and periodic cleanup #1168
geoffjay merged 2 commits into issue-1159
Conversation
geoffjay
left a comment
Review: feat(orchestrator): add conversation retention policy and periodic cleanup
Stack note: This PR is stacked on issue-1159 (PR #1167), which currently has needs-rework. These changes cannot merge until #1167 is fixed and this branch is restacked on the updated parent. The code review below is independent of that dependency.
🔴 Fix required — wrong Prometheus counter method (.absolute vs .increment)
main.rs, periodic cleanup task:
```rust
// Current — WRONG
metrics::counter!("agentd_conversation_events_pruned_total").absolute(n);
```

`.absolute(n)` sets the counter to `n`, overwriting the accumulated total on every prune run. After two prune cycles of 100 events each the counter reads 100, not 200.
Every other counter in the codebase uses .increment():
```rust
// api.rs
metrics::counter!("agents_created_total").increment(1);
metrics::counter!("agents_terminated_total").increment(1);

// message_bridge.rs — .absolute(0) only used for initialisation to zero at startup
metrics::counter!("messages_delivered_to_agents").increment(1);

// main.rs (existing)
metrics::counter!("context_clears_total", "trigger" => "auto").increment(1);
```

Fix:
metrics::counter!("agentd_conversation_events_pruned_total").increment(n);🔴 Fix required — zero cleanup_interval_secs panics at startup
types.rs, RetentionConfig::from_env():
```rust
cleanup_interval_secs: std::env::var("AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS")
    .ok()
    .and_then(|v| v.parse().ok())
    .unwrap_or(21_600),
```

If the variable is set to `"0"`, `parse::<u64>().ok()` returns `Some(0)`, so `unwrap_or(21_600)` never fires. Then:
```rust
tokio::time::interval(tokio::time::Duration::from_secs(0)) // panics: "Period is zero"
```

This is the same class of bug flagged in PR #1166 (`AGENTD_WRAP_CHANNEL_CAPACITY=0`). Fix with a validation guard:
```rust
cleanup_interval_secs: std::env::var("AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS")
    .ok()
    .and_then(|v| v.parse().ok())
    .filter(|&v| v >= 60) // reject values too small to be intentional
    .unwrap_or(21_600),
```

Or fail fast at startup:
```rust
let interval_secs: u64 = std::env::var("AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS")
    .ok()
    .and_then(|v| v.parse().ok())
    .unwrap_or(21_600);
anyhow::ensure!(interval_secs > 0,
    "AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS must be > 0, got {}", interval_secs);
```

The same validation should be applied to `retention_days` (zero would delete ALL events — the cutoff becomes now - 0 days = now) and to `max_events_per_agent` (zero would delete all events for every connected agent).
🟡 Important gap — prune_excess_conversation_events is never called
max_events_per_agent is configured, logged at startup, and the pruning method exists — but the periodic cleanup task only calls prune_old_conversation_events. The per-agent cap is never enforced:
```rust
// main.rs cleanup loop — only age-based pruning fires
match storage_cleanup.prune_old_conversation_events(retention.retention_days).await {
    // prune_excess_conversation_events is never called here
```

If this is intentional (cap enforcement is on-terminate only), the doc comment on `RetentionConfig::max_events_per_agent` should say so:
```rust
/// Hard cap per agent; oldest events are evicted first **on agent termination when
/// `cleanup_on_terminate = true`**. The periodic cleanup task only enforces `retention_days`.
/// (env: `AGENTD_CONVERSATION_MAX_EVENTS_PER_AGENT`, default: 50000).
pub max_events_per_agent: u64,
```

If the cap should be enforced periodically, the loop needs to fetch all agent IDs from storage and call `prune_excess_conversation_events` for each, similar to how the reconciliation loop works.
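If periodic enforcement is the intent, a rough sketch of the extra pass could look like the following — note that `list_agent_ids()` is a hypothetical accessor used only for illustration, not an existing storage method:

```rust
// Hypothetical sketch: after the age-based prune, also enforce the per-agent
// cap each cycle. `list_agent_ids()` is an assumed accessor; substitute
// whatever the storage layer actually exposes for enumerating agents.
match storage_cleanup.list_agent_ids().await {
    Ok(agent_ids) => {
        for agent_id in agent_ids {
            match storage_cleanup
                .prune_excess_conversation_events(&agent_id, retention.max_events_per_agent)
                .await
            {
                Ok(n) if n > 0 => {
                    metrics::counter!("agentd_conversation_events_pruned_total").increment(n);
                }
                Ok(_) => {}
                Err(e) => tracing::warn!(error = %e, %agent_id, "failed to prune excess conversation events"),
            }
        }
    }
    Err(e) => tracing::warn!(error = %e, "failed to list agents for cap enforcement"),
}
```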
✅ What's correct
- FK ordering in `terminate_agent()`: `delete_conversation_events_for_agent()` at line 267 fires before `storage.delete(id)` at line 278 — correct order, no orphaned events
- Builder pattern: `with_retention()` follows the existing `with_event_bus()` pattern; `new()` correctly delegates to it with env-derived defaults
- Cleanup task structure: `interval.tick().await` before the loop (skip startup), then `loop { interval.tick().await; ... }` — matches the reconciliation loop pattern exactly (see the sketch after this list)
- Error handling: cleanup errors logged at `warn`, don't interrupt the prune loop or the termination path
- Test coverage: 4 tests correctly exercise old-row pruning, no-op on nothing old, excess pruning (oldest-first), and no-op under cap — all with cross-agent isolation
- `prune_excess_conversation_events` two-phase approach: fetch IDs then delete by `is_in()` is the correct SQLite workaround for correlated subqueries
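For reference, the skip-first-tick pattern called out above looks roughly like this — a generic sketch, not the PR's exact code:

```rust
// Generic sketch of the skip-first-tick pattern used by the cleanup task:
// tokio's interval completes its first tick immediately, so consuming it
// before the loop means the first prune pass runs one full interval after
// startup rather than right away.
tokio::spawn(async move {
    let mut interval =
        tokio::time::interval(std::time::Duration::from_secs(cleanup_interval_secs));
    interval.tick().await; // first tick completes immediately; consume it here
    loop {
        interval.tick().await; // from now on, each tick waits a full period
        // ... run the prune pass, log errors at `warn`, never break the loop ...
    }
});
```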
ShawnSunClio
left a comment
There was a problem hiding this comment.
Review — feat(orchestrator): add conversation retention policy and periodic cleanup
Stack note: This PR is stacked on #1167 (issue-1159 → issue-1163), which is still open. The diff is clean relative to its parent — no merge conflicts detected — but this branch cannot be merged until #1167 lands first.
Blocking issues (3)
1. metrics::counter!(...).absolute(n) resets the counter instead of incrementing it (main.rs:447)
```rust
// current — WRONG
metrics::counter!("agentd_conversation_events_pruned_total").absolute(n);

// fix
metrics::counter!("agentd_conversation_events_pruned_total").increment(n);
```

`Counter::absolute(n)` sets the counter to `n`; it does not add `n` to the running total. Every cleanup cycle would overwrite the counter with the number pruned in that single run. After two cycles pruning 100 and 50 events respectively, the counter would read 50, not 150. Every other counter in this file uses `.increment()`; this should too.
2. max_events_per_agent is configured and logged but never enforced
AGENTD_CONVERSATION_MAX_EVENTS_PER_AGENT is documented in the PR description as a working config knob, and RetentionConfig logs its value at startup. But the periodic cleanup loop (main.rs ~line 440–450) only calls prune_old_conversation_events — prune_excess_conversation_events is never invoked at the service level. The per-agent cap is dead code at runtime.
Either wire prune_excess_conversation_events into the cleanup loop (iterating over active agents), or explicitly document that this field is reserved for a future phase and remove it from the startup log until it is wired up. Logging a value that has no runtime effect is misleading.
3. cleanup_interval_secs = 0 panics the service (main.rs:439)
```rust
tokio::time::interval(tokio::time::Duration::from_secs(cleanup_interval))
```

`tokio::time::interval` panics with "period must be positive" when given `Duration::ZERO`. A user who sets `AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS=0` (or an env var that parses to 0) will crash the orchestrator at startup. Add a minimum bound — either clamp to `max(cleanup_interval, 1)` or return an explicit startup error with a descriptive message.
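Either remedy is small; a sketch of both options, with the local variable name assumed:

```rust
// Sketch only; `cleanup_interval` is the parsed env value (name assumed).

// Option A: clamp to a floor of 1 second so interval() can never see zero.
let cleanup_interval = cleanup_interval.max(1);
let mut interval =
    tokio::time::interval(tokio::time::Duration::from_secs(cleanup_interval));

// Option B: refuse to start, with a descriptive error, instead of clamping.
anyhow::ensure!(
    cleanup_interval > 0,
    "AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS must be > 0, got {cleanup_interval}"
);
```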
Non-blocking suggestions
4. impl Default delegates to from_env() — surprising in tests (types.rs:2137)
```rust
impl Default for RetentionConfig {
    fn default() -> Self { Self::from_env() }
}
```

`Default` conventionally yields hardcoded safe values with no side effects. Delegating to `from_env()` means any test that constructs `AgentManager::new()` (which calls `RetentionConfig::from_env()` internally) will silently inherit whatever env vars are set in the test runner. The better pattern is to implement `Default` with hardcoded values and have `from_env()` build from `Self::default()` as a baseline — that way tests get predictable defaults unless they explicitly set env vars.
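A sketch of that split, using the defaults from this PR (field names and types are assumed from the description):

```rust
// Sketch of the suggested split: Default is pure and side-effect free,
// from_env() starts from the defaults and only overrides fields whose
// env vars parse to a usable value.
impl Default for RetentionConfig {
    fn default() -> Self {
        Self {
            retention_days: 30,
            max_events_per_agent: 50_000,
            cleanup_on_terminate: false,
            cleanup_interval_secs: 21_600,
        }
    }
}

impl RetentionConfig {
    pub fn from_env() -> Self {
        let mut cfg = Self::default();
        if let Some(v) = std::env::var("AGENTD_CONVERSATION_RETENTION_DAYS")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .filter(|&v| v > 0)
        {
            cfg.retention_days = v;
        }
        // ... same pattern for the other three variables ...
        cfg
    }
}
```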
5. test_prune_excess_removes_oldest_first may be flaky on fast machines (storage.rs:1857)
Five events are inserted in a tight loop using ConversationEvent::new(), which stamps each with Utc::now(). On a fast machine, multiple inserts may land within the same millisecond, giving identical created_at values. ORDER BY created_at ASC is then non-deterministic, and the assertion sessions.iter().all(|&s| s >= 2) could fail intermittently.
Fix by constructing events with explicitly distinct timestamps:
```rust
for i in 0..5i64 {
    let mut ev = ConversationEvent::new(...);
    ev.created_at = Utc::now() - chrono::Duration::seconds(5 - i); // oldest first
    storage.insert_conversation_event(&ev).await.unwrap();
}
```

6. retention_days = 0 silently deletes everything (storage.rs:prune_old_conversation_events)
prune_old_conversation_events(0) computes cutoff = Utc::now() - Duration::days(0) ≈ Utc::now(), which deletes all events created before the current instant — effectively wiping the table. A failed parse falling back to the default of 30 is fine, but a user who intentionally or accidentally sets the variable to 0 gets silent data loss. Worth a guard (if retention_days == 0 { return Ok(0); }) or a startup validation error.
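A sketch of the guard option, with the signature and return type assumed to be a pruned-row count:

```rust
// Sketch: treat retention_days == 0 as "age-based pruning disabled" instead
// of "delete everything older than now". Signature and return type assumed.
pub async fn prune_old_conversation_events(&self, retention_days: u64) -> anyhow::Result<u64> {
    if retention_days == 0 {
        return Ok(0); // nothing pruned; prevents silently wiping the table
    }
    let cutoff = chrono::Utc::now() - chrono::Duration::days(retention_days as i64);
    // ... existing DELETE of rows with created_at < cutoff goes here ...
    todo!("delete rows with created_at < {cutoff} and return the count")
}
```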
Summary
Three issues require fixes before merge: the counter semantics (absolute → increment), the unenforced per-agent cap, and the zero-interval panic. Suggestions 4–6 are lower priority but worth addressing to avoid future pain.
This change is part of the following stack (change managed by git-spice).
…eanup (address review feedback)

- Fix counter: .absolute(n) -> .increment(n) so the Prometheus counter accumulates across cleanup cycles instead of being reset each run
- Wire prune_excess_conversation_events into the periodic cleanup loop; now iterates all known agents and enforces max_events_per_agent each cycle
- Guard cleanup_interval_secs, retention_days, and max_events_per_agent against zero in RetentionConfig::from_env (zero interval panics tokio::interval; zero retention/max would delete all events silently)
- Fix Default impl: hardcode safe values instead of delegating to from_env(), so tests get predictable defaults regardless of environment variables
- Add DEFAULT_* constants to RetentionConfig for the hardcoded fallbacks
- Fix test_prune_excess_removes_oldest_first: use explicit timestamps spaced 1 s apart to prevent flakiness on fast machines
- Update doc comments: max_events_per_agent now documents periodic enforcement

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds configurable retention policy for conversation events to prevent unbounded database growth. Stacked on #1167 (issue #1159).
Changes
- types.rs — `RetentionConfig` struct with `from_env()` constructor reading four env vars (a struct sketch follows this list):
  - `AGENTD_CONVERSATION_RETENTION_DAYS` (default: 30)
  - `AGENTD_CONVERSATION_MAX_EVENTS_PER_AGENT` (default: 50000)
  - `AGENTD_CONVERSATION_CLEANUP_ON_TERMINATE` (default: false)
  - `AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS` (default: 21600 / 6 h)
- storage.rs — Two new pruning methods on `AgentStorage`:
  - `prune_old_conversation_events(retention_days)` — deletes rows older than the cutoff timestamp
  - `prune_excess_conversation_events(agent_id, max_events)` — fetches oldest IDs beyond the cap and deletes them
- manager.rs — `AgentManager` gains a `retention_config: Arc<RetentionConfig>` field. `with_retention()` constructor for explicit config (testing/main). `terminate_agent()` calls `delete_conversation_events_for_agent()` when `cleanup_on_terminate = true`.
- main.rs — Reads `RetentionConfig::from_env()` at startup, passes it to `AgentManager::with_retention()`, logs resolved values, and spawns a periodic cleanup task (alongside the reconciliation loop) that runs `prune_old_conversation_events` on the configured interval and increments the `agentd_conversation_events_pruned_total` Prometheus counter.
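For orientation, the shape of the config struct implied by the list above — a sketch only; field types and ordering are inferred from the description, not copied from the diff:

```rust
// Sketch of RetentionConfig as described above; field names mirror the env
// vars, types are assumptions (u64/bool) rather than the actual diff.
#[derive(Debug, Clone)]
pub struct RetentionConfig {
    /// AGENTD_CONVERSATION_RETENTION_DAYS (default: 30)
    pub retention_days: u64,
    /// AGENTD_CONVERSATION_MAX_EVENTS_PER_AGENT (default: 50000)
    pub max_events_per_agent: u64,
    /// AGENTD_CONVERSATION_CLEANUP_ON_TERMINATE (default: false)
    pub cleanup_on_terminate: bool,
    /// AGENTD_CONVERSATION_CLEANUP_INTERVAL_SECS (default: 21600 / 6 h)
    pub cleanup_interval_secs: u64,
}
```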
Test plan

- `test_prune_old_events_removes_old_rows`, `test_prune_old_events_no_op_when_nothing_old`, `test_prune_excess_removes_oldest_first`, `test_prune_excess_no_op_under_cap`
- `cargo test -p orchestrator` — 417 tests pass
- `cargo fmt --check -p orchestrator` — clean
- `cargo clippy -p orchestrator` — no errors

Closes #1163
🤖 Generated with Claude Code