feat(orchestrator): add conversation_events table, entity, and storage#1167
feat(orchestrator): add conversation_events table, entity, and storage#1167
Conversation
geoffjay
left a comment
There was a problem hiding this comment.
Review: feat(orchestrator): add conversation_events table, entity, and storage
Reviewed against the acceptance criteria in #1159. The migration, entity, domain types, and storage structure are all correct. Two issues must be fixed before merge, plus one strong recommendation.
🔴 Fix required — silent metadata loss in insert_conversation_event
storage.rs, insert_conversation_event:
// Current
metadata: Set(event
.metadata
.as_ref()
.map(|v| serde_json::to_string(v).unwrap_or_default())),unwrap_or_default() silently drops the metadata field if serde_json::to_string fails (e.g. a Value containing f64::NAN). The read path in model_to_conversation_event correctly propagates errors with .transpose()?. The write path should match:
// Fix
let metadata_str = event
.metadata
.as_ref()
.map(serde_json::to_string)
.transpose()
.map_err(|e| anyhow::anyhow!("Failed to serialize event metadata: {}", e))?;
// In the ActiveModel:
metadata: Set(metadata_str),This is consistent with anyhow-based error propagation used everywhere else in AgentStorage.
🔴 Fix required — since/until filter path is untested
The PR adds since and until to ConversationQuery and wires them into list_conversation_events, but no test exercises that code path. The 7 tests cover insert/count, list-all, type filtering, limit+offset, delete isolation, metadata roundtrip, and all 8 event type variants — but not time-range filtering.
Add a test:
#[tokio::test]
async fn test_conv_list_time_range() {
let (storage, _tmp) = create_test_storage().await;
let agent_id = Uuid::new_v4();
storage
.insert_conversation_event(&make_event(agent_id, ConversationEventType::Output))
.await
.unwrap();
let boundary = Utc::now();
storage
.insert_conversation_event(&make_event(agent_id, ConversationEventType::Output))
.await
.unwrap();
storage
.insert_conversation_event(&make_event(agent_id, ConversationEventType::Output))
.await
.unwrap();
// Only events after boundary
let opts = ConversationQuery { since: Some(boundary), ..Default::default() };
let events = storage.list_conversation_events(agent_id, &opts).await.unwrap();
assert_eq!(events.len(), 2);
// Only events before boundary
let opts = ConversationQuery { until: Some(boundary), ..Default::default() };
let events = storage.list_conversation_events(agent_id, &opts).await.unwrap();
assert_eq!(events.len(), 1);
}🟡 Recommendation — add session_number filter to ConversationQuery
The REST API (#1161) and UI (#1162) will need to replay a single session (e.g. "all events from session 3"). ConversationQuery is the right place for this and the fix is trivial while it's still a new type:
// types.rs
pub session_number: Option<i64>,// storage.rs list_conversation_events
if let Some(sn) = opts.session_number {
condition = condition.add(conv_entity::Column::SessionNumber.eq(sn));
}Adding this after #1161 lands would require a ConversationQuery change that forces updates in all callers. Cheap to do now.
✅ What's correct
- Migration uses
if_not_exists(), creates both composite indexes, anddown()correctly drops indexes before the table — idempotent and reversible #[allow(dead_code)]on the entity file with a clear follow-up comment is the right patternmodel_to_conversation_eventuses?throughout — correcttest_conv_event_type_roundtripverifies all 8 variants survive a DB round-trip — thoroughtest_conv_delete_for_agentcorrectly verifies cross-agent isolationapi.rsandclient.rschanges are cargo-fmt reformats only, no logic changes — acceptablesession_numbercorrectly typed asi64/big_integerthroughout
|
This change is part of the following stack: Change managed by git-spice. |
- Fix silent metadata loss in insert_conversation_event: replace unwrap_or_default() with transpose()?.map_err(...) so serialization failures propagate as errors rather than writing empty strings - Add test_conv_list_time_range covering since/until filter paths - Add test_conv_list_filter_by_session covering session_number filter - Add session_number: Option<i64> to ConversationQuery with DB-level filter in list_conversation_events (recommended while type is new) - Add #[allow(dead_code)] to conversation types and storage methods consumed by stacked branches (#1160, #1161, #1163) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Address two blocking issues from PR review: 1. Session number off-by-one on reconnect - replace get_usage_stats().session_count (which is N+1 when active) with a new get_max_conversation_session_number() storage method that queries MAX(session_number) directly from conversation_events. This is always correct regardless of usage-session semantics. 2. Add persistence tests - 9 new tests covering: session counter init without storage, session counter removed on unregister, storage-backed session init reads MAX(session_number), persist_context_cleared updates counter and writes event, send_user_message persists prompt_sent + activity_changed, handle_incoming_message persists output/tool_use/thinking/result events. 3. Fix race condition in register(): async storage init now uses max(storage, current) to avoid overwriting a newer value set by persist_context_cleared between register() and the spawned lookup completing. 4. Add #[allow(dead_code)] to storage query helpers and ConversationQuery struct that are used only in test code (clippy -D warnings). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ation, cursor encoding - Push session_number filter to DB in ConversationQuery/list_conversation_events so has_more/total reflect the filtered result set (removes in-memory retain) - Replace get_conversation_summary full table scan with two SQL aggregation queries (GROUP BY event_type COUNT + COUNT DISTINCT session_number) - URL-encode before/after cursor values in client.rs to handle RFC 3339 timestamps with + offsets correctly - Add #[allow(dead_code)] to methods used by stacked integration test branch - Add urlencoding = "2.1" dependency to orchestrator Cargo.toml Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… persistence
Adds tests/conversation_persistence.rs with 26 integration tests covering:
- Storage layer: since/until cursor filtering, combined time windows,
ascending ordering, session isolation, cross-agent delete isolation,
all-event-type round-trips, and metadata round-trips.
- REST API: GET /agents/{id}/conversation (events, limit/has_more,
event_type filter, session filter, after/before cursors, 404, empty),
GET summary, single-event retrieval with wrong-agent/unknown-id 404s,
DELETE /agents/{id}/conversation (204, 404, cross-agent isolation,
idempotent), and count accuracy.
Also adds DELETE /agents/{id}/conversation endpoint (204 No Content) to
api.rs and urlencoding dev-dependency to Cargo.toml.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eanup (address review feedback) - Fix counter: .absolute(n) -> .increment(n) so the Prometheus counter accumulates across cleanup cycles instead of being reset each run - Wire prune_excess_conversation_events into the periodic cleanup loop; now iterates all known agents and enforces max_events_per_agent each cycle - Guard cleanup_interval_secs, retention_days, and max_events_per_agent against zero in RetentionConfig::from_env (zero interval panics tokio::interval; zero retention/max would delete all events silently) - Fix Default impl: hardcode safe values instead of delegating to from_env(), so tests get predictable defaults regardless of environment variables - Add DEFAULT_* constants to RetentionConfig for the hardcoded fallbacks - Fix test_prune_excess_removes_oldest_first: use explicit timestamps spaced 1 s apart to prevent flakiness on fast machines - Update doc comments: max_events_per_agent now documents periodic enforcement Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1167 +/- ##
==========================================
+ Coverage 58.94% 60.33% +1.39%
==========================================
Files 226 228 +2
Lines 24467 25489 +1022
==========================================
+ Hits 14422 15380 +958
- Misses 10045 10109 +64 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Adds the persistence foundation for agent conversation history — the
conversation_eventsSQLite table, SeaORM entity, domain types, and four CRUD methods onAgentStorage.Changes
m20260417_000016_add_conversation_events.rs) — createsconversation_eventstable with two composite indexes (agent_id+created_at,agent_id+event_type);down()correctly drops indexes before the tableentity/conversation_event.rs) — SeaORMModelwith 7 fields matching the schema;#![allow(dead_code)]suppresses expected unused-derive warnings until Persist agent stream events in orchestrator WebSocket handler #1160 wires it uptypes.rs) —ConversationEventTypeenum (8 variants,Display/FromStr),ConversationEventstruct withnew()helper,ConversationQueryfor filtering/paginationstorage.rs) —insert_conversation_event,list_conversation_events(with type/time/pagination filters),count_conversation_events,delete_conversation_events_for_agent; plusmodel_to_conversation_eventhelperTest plan
storage::tests: insert+count, list all, filter by type, limit+offset, delete (cross-agent isolation), metadata JSON roundtrip, all 8 event-type variantscargo test -p orchestrator— 413 tests pass (pre-existinglinear_fetch_httpnetwork tests ignored)cargo fmt --check -p orchestrator— cleancargo clippy -p orchestrator— no errorsDependencies
Closes #1159
🤖 Generated with Claude Code