diff --git a/Cargo.lock b/Cargo.lock index a1020b14..c3b84574 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -509,7 +509,7 @@ checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "cli-sub-agent" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -701,7 +701,7 @@ dependencies = [ [[package]] name = "csa-acp" -version = "0.1.736" +version = "0.1.737" dependencies = [ "agent-client-protocol", "anyhow", @@ -721,7 +721,7 @@ dependencies = [ [[package]] name = "csa-config" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -739,7 +739,7 @@ dependencies = [ [[package]] name = "csa-core" -version = "0.1.736" +version = "0.1.737" dependencies = [ "agent-teams", "chrono", @@ -755,7 +755,7 @@ dependencies = [ [[package]] name = "csa-eval" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -769,7 +769,7 @@ dependencies = [ [[package]] name = "csa-executor" -version = "0.1.736" +version = "0.1.737" dependencies = [ "agent-teams", "anyhow", @@ -797,7 +797,7 @@ dependencies = [ [[package]] name = "csa-hooks" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -816,7 +816,7 @@ dependencies = [ [[package]] name = "csa-lock" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -830,7 +830,7 @@ dependencies = [ [[package]] name = "csa-mcp-hub" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "axum", @@ -853,7 +853,7 @@ dependencies = [ [[package]] name = "csa-memory" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "async-trait", @@ -872,7 +872,7 @@ dependencies = [ [[package]] name = "csa-process" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -891,7 +891,7 @@ dependencies = [ [[package]] name = "csa-resource" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "csa-core", @@ -907,7 +907,7 @@ dependencies = [ [[package]] name = "csa-scheduler" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -925,7 +925,7 @@ dependencies = [ [[package]] name = "csa-session" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -951,7 +951,7 @@ dependencies = [ [[package]] name = "csa-todo" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "chrono", @@ -4516,7 +4516,7 @@ dependencies = [ [[package]] name = "weave" -version = "0.1.736" +version = "0.1.737" dependencies = [ "anyhow", "clap", diff --git a/Cargo.toml b/Cargo.toml index 9c2a8be9..840868de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ default-members = [ resolver = "2" [workspace.package] -version = "0.1.736" +version = "0.1.737" edition = "2024" rust-version = "1.88" license = "Apache-2.0" diff --git a/crates/cli-sub-agent/src/pipeline_post_exec.rs b/crates/cli-sub-agent/src/pipeline_post_exec.rs index 27c3d987..600fcc74 100644 --- a/crates/cli-sub-agent/src/pipeline_post_exec.rs +++ b/crates/cli-sub-agent/src/pipeline_post_exec.rs @@ -56,6 +56,11 @@ pub(crate) struct PostExecContext<'a> { pub pre_exec_snapshot: Option, /// Whether the transport observed any tool calls during execution. pub has_tool_calls: bool, + /// Number of agent conversation turns observed in this run (one per + /// `AgentMessage` event). `0` means the transport did not parse streaming + /// events; `process_execution_result` falls back to `+= 1` to preserve the + /// legacy "one increment per csa run" semantics. + pub turn_count: u32, /// Whether this session is running in SA (sub-agent / autonomous) mode. pub sa_mode: bool, } @@ -109,8 +114,11 @@ pub(crate) async fn process_execution_result( } } - // Increment turn count - session.turn_count += 1; + // Increment turn count. Transports that parse streaming events (claude-code + // CLI, codex/gemini ACP) populate `ctx.turn_count` with the number of + // observed agent conversation turns; legacy transports leave it at `0` and + // fall back to the historical `+= 1` per-invocation contract (#1438). + session.turn_count = session.turn_count.saturating_add(ctx.turn_count.max(1)); // Update cumulative token usage update_cumulative_tokens(session, token_usage); diff --git a/crates/cli-sub-agent/src/pipeline_session_exec.rs b/crates/cli-sub-agent/src/pipeline_session_exec.rs index 5efce0c5..25988a71 100644 --- a/crates/cli-sub-agent/src/pipeline_session_exec.rs +++ b/crates/cli-sub-agent/src/pipeline_session_exec.rs @@ -751,8 +751,6 @@ pub(crate) async fn execute_with_session_and_meta_with_parent_source( execute_events_observed, ); } - let has_tool_calls = transport_result.metadata.has_tool_calls - || transport_result.metadata.has_execute_tool_calls; let sa_mode = std::env::var(crate::pipeline::prompt_guard::PROMPT_GUARD_CALLER_INJECTION_ENV) .ok() .map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1")) @@ -776,7 +774,9 @@ pub(crate) async fn execute_with_session_and_meta_with_parent_source( transcript_artifacts, changed_paths, pre_exec_snapshot, - has_tool_calls, + has_tool_calls: transport_result.metadata.has_tool_calls + || transport_result.metadata.has_execute_tool_calls, + turn_count: transport_result.metadata.turn_count, sa_mode, }; if let Err(err) = diff --git a/crates/cli-sub-agent/src/pipeline_tests_no_op_gate.rs b/crates/cli-sub-agent/src/pipeline_tests_no_op_gate.rs index c5eb9172..41164c3a 100644 --- a/crates/cli-sub-agent/src/pipeline_tests_no_op_gate.rs +++ b/crates/cli-sub-agent/src/pipeline_tests_no_op_gate.rs @@ -35,6 +35,7 @@ fn build_test_ctx<'a>( changed_paths: vec![], pre_exec_snapshot: None, has_tool_calls, + turn_count: 0, sa_mode, } } diff --git a/crates/cli-sub-agent/src/pipeline_tests_no_progress.rs b/crates/cli-sub-agent/src/pipeline_tests_no_progress.rs index d16660a5..e626733d 100644 --- a/crates/cli-sub-agent/src/pipeline_tests_no_progress.rs +++ b/crates/cli-sub-agent/src/pipeline_tests_no_progress.rs @@ -32,6 +32,7 @@ fn build_test_ctx<'a>( changed_paths: vec![], pre_exec_snapshot: None, has_tool_calls: true, + turn_count: 0, sa_mode: false, } } diff --git a/crates/cli-sub-agent/src/pipeline_tests_post_exec.rs b/crates/cli-sub-agent/src/pipeline_tests_post_exec.rs index 03881b4d..8cfdc2e9 100644 --- a/crates/cli-sub-agent/src/pipeline_tests_post_exec.rs +++ b/crates/cli-sub-agent/src/pipeline_tests_post_exec.rs @@ -270,6 +270,7 @@ async fn process_execution_result_respects_vcs_auto_snapshot_gate_for_colocated_ changed_paths: changed_paths.clone(), pre_exec_snapshot: None, has_tool_calls: true, + turn_count: 0, sa_mode: false, }; let mut disabled_result = csa_process::ExecutionResult { @@ -312,6 +313,7 @@ async fn process_execution_result_respects_vcs_auto_snapshot_gate_for_colocated_ changed_paths, pre_exec_snapshot: None, has_tool_calls: true, + turn_count: 0, sa_mode: false, }; let mut enabled_result = csa_process::ExecutionResult { @@ -513,6 +515,7 @@ auto_capture = true changed_paths: vec![], pre_exec_snapshot: None, has_tool_calls: true, + turn_count: 0, sa_mode: false, }; let mut result = csa_process::ExecutionResult { diff --git a/crates/cli-sub-agent/src/pipeline_tests_post_exec_audit.rs b/crates/cli-sub-agent/src/pipeline_tests_post_exec_audit.rs index 91c94c7f..3f3224c1 100644 --- a/crates/cli-sub-agent/src/pipeline_tests_post_exec_audit.rs +++ b/crates/cli-sub-agent/src/pipeline_tests_post_exec_audit.rs @@ -203,6 +203,7 @@ fn pre_execution_audit_baseline_returns_none_for_legacy_sessions_without_snapsho changed_paths: vec![], pre_exec_snapshot: None, has_tool_calls: false, + turn_count: 0, sa_mode: false, }; @@ -269,6 +270,7 @@ fn pre_execution_audit_baseline_prefers_per_execution_snapshot() { porcelain: Some(" M fresh.txt\0".to_string()), }), has_tool_calls: false, + turn_count: 0, sa_mode: false, }; @@ -362,6 +364,7 @@ fn audit_failure_does_not_fail_execution() { changed_paths: vec![], pre_exec_snapshot: None, has_tool_calls: false, + turn_count: 0, sa_mode: false, }; let session = MetaSessionState { @@ -484,6 +487,7 @@ fn reused_session_audit_uses_per_execution_baseline_not_session_creation() { changed_paths: vec![], pre_exec_snapshot: Some(turn_two_snapshot), has_tool_calls: false, + turn_count: 0, sa_mode: false, }; let session = MetaSessionState { @@ -585,6 +589,7 @@ fn first_execution_falls_back_to_session_creation_baseline_when_per_exec_capture changed_paths: vec![], pre_exec_snapshot: None, has_tool_calls: false, + turn_count: 0, sa_mode: false, }; let session = MetaSessionState { diff --git a/crates/csa-acp/src/client.rs b/crates/csa-acp/src/client.rs index 14876fc7..18e2aa3b 100644 --- a/crates/csa-acp/src/client.rs +++ b/crates/csa-acp/src/client.rs @@ -23,6 +23,12 @@ const MAX_EXTRACTED_COMMANDS: usize = 100; pub struct StreamingMetadata { /// Total number of events seen across the entire prompt turn, including dropped events. pub total_events_count: usize, + /// Number of agent conversation turns observed (one per `AgentMessage`). + /// + /// Mirrors `csa_core::transport_events::StreamingMetadata::turn_count`; copied + /// through `convert_acp_metadata` in the executor crate. See that field's + /// docstring for the legacy fallback contract. + pub turn_count: u32, /// Whether any `ToolCallStarted` event was observed. pub has_tool_calls: bool, /// Whether any execute `ToolCallStarted` event was observed. @@ -60,6 +66,7 @@ pub struct StreamingMetadata { impl StreamingMetadata { pub(crate) fn sync_from_store(&mut self, store: &SessionEventStore) { self.total_events_count = store.total_events_count(); + self.turn_count = store.turn_count(); self.has_tool_calls = store.has_tool_calls(); self.has_execute_tool_calls = store.has_execute_tool_calls(); self.has_no_verify_commit = store.has_no_verify_commit(); @@ -143,6 +150,7 @@ pub(crate) fn event_counts_as_initial_response(event: &SessionEvent) -> bool { pub(crate) struct SessionEventStore { events: VecDeque, total_events_count: usize, + turn_count: u32, has_tool_calls: bool, has_execute_tool_calls: bool, has_no_verify_commit: bool, @@ -191,6 +199,10 @@ impl SessionEventStore { self.total_events_count } + pub(crate) fn turn_count(&self) -> u32 { + self.turn_count + } + pub(crate) fn has_tool_calls(&self) -> bool { self.has_tool_calls } @@ -229,8 +241,10 @@ impl SessionEventStore { SessionEvent::PlanUpdate(_) => { self.has_plan_updates = true; } - SessionEvent::AgentMessage(_) - | SessionEvent::AgentThought(_) + SessionEvent::AgentMessage(_) => { + self.turn_count = self.turn_count.saturating_add(1); + } + SessionEvent::AgentThought(_) | SessionEvent::ToolCallCompleted { .. } | SessionEvent::Other(_) => {} } diff --git a/crates/csa-acp/src/client/tests.rs b/crates/csa-acp/src/client/tests.rs index c232bc7e..b8838dfc 100644 --- a/crates/csa-acp/src/client/tests.rs +++ b/crates/csa-acp/src/client/tests.rs @@ -204,6 +204,43 @@ fn session_event_store_bounds_retained_events_and_metadata() { ); } +/// `SessionEventStore` MUST count every `AgentMessage` as one conversation +/// turn so the ACP transport can populate `StreamingMetadata.turn_count` +/// with real turn counts rather than `csa run` invocations (#1438). The +/// count must survive ring-buffer eviction (it is metadata, not an event). +#[test] +fn session_event_store_turn_count_tracks_agent_messages_and_survives_eviction() { + let mut store = SessionEventStore::default(); + for i in 0..3 { + store.push(SessionEvent::AgentMessage(format!("turn-{i}"))); + store.push(SessionEvent::ToolCallStarted { + id: format!("call-{i}"), + title: format!("cmd-{i}"), + kind: "Execute".to_string(), + }); + store.push(SessionEvent::ToolCallCompleted { + id: format!("call-{i}"), + status: "success".to_string(), + }); + } + store.push(SessionEvent::AgentThought("internal".to_string())); + assert_eq!( + store.turn_count(), + 3, + "three AgentMessage events => three observed turns; thought/tool events do not count" + ); + + for i in 0..(MAX_RETAINED_EVENTS + 25) { + store.push(SessionEvent::AgentMessage(format!("flood-{i}"))); + } + let expected_turns = 3 + (MAX_RETAINED_EVENTS as u32) + 25; + assert_eq!( + store.turn_count(), + expected_turns, + "turn_count is metadata; ring-buffer eviction must not lose it" + ); +} + #[test] fn session_event_store_keeps_no_verify_sticky_after_command_ring_eviction() { let mut store = SessionEventStore::default(); diff --git a/crates/csa-core/src/transport_events.rs b/crates/csa-core/src/transport_events.rs index 045592c1..d733baca 100644 --- a/crates/csa-core/src/transport_events.rs +++ b/crates/csa-core/src/transport_events.rs @@ -3,6 +3,15 @@ pub struct StreamingMetadata { /// Total number of events seen across the entire prompt turn, including dropped events. pub total_events_count: usize, + /// Number of agent conversation turns observed in this prompt run. + /// + /// Counted by transports as the number of `AgentMessage` events emitted by + /// the underlying tool. A single `csa run` invocation may span many turns + /// when the agent reads tool results, deliberates, and responds again. + /// Transports that do not parse streaming events leave this at `0`; callers + /// must treat `0` as "unknown" and fall back to a `+= 1` increment for + /// `MetaSessionState.turn_count` to preserve the legacy counting contract. + pub turn_count: u32, /// Whether any `ToolCallStarted` event was observed. pub has_tool_calls: bool, /// Whether any execute `ToolCallStarted` event was observed. diff --git a/crates/csa-executor/src/transport.rs b/crates/csa-executor/src/transport.rs index 84c5cba6..51415dde 100644 --- a/crates/csa-executor/src/transport.rs +++ b/crates/csa-executor/src/transport.rs @@ -741,6 +741,7 @@ fn convert_acp_metadata( ) -> csa_core::transport_events::StreamingMetadata { csa_core::transport_events::StreamingMetadata { total_events_count: metadata.total_events_count, + turn_count: metadata.turn_count, has_tool_calls: metadata.has_tool_calls, has_execute_tool_calls: metadata.has_execute_tool_calls, has_no_verify_commit: metadata.has_no_verify_commit, diff --git a/crates/csa-executor/src/transport_cli.rs b/crates/csa-executor/src/transport_cli.rs index e74ca2c0..d2358dfa 100644 --- a/crates/csa-executor/src/transport_cli.rs +++ b/crates/csa-executor/src/transport_cli.rs @@ -562,6 +562,7 @@ fn parse_stream_json(buffer: &str) -> StreamParseResult { } SessionEvent::AgentMessage(text) => { result.metadata.message_text.push_str(text); + result.metadata.turn_count = result.metadata.turn_count.saturating_add(1); } SessionEvent::AgentThought(text) => { result.metadata.thought_text.push_str(text); diff --git a/crates/csa-executor/src/transport_cli_tests.rs b/crates/csa-executor/src/transport_cli_tests.rs index 41042a0f..b442a6d8 100644 --- a/crates/csa-executor/src/transport_cli_tests.rs +++ b/crates/csa-executor/src/transport_cli_tests.rs @@ -358,6 +358,42 @@ fn parse_stream_json_happy_path_emits_events() { assert!(parsed.metadata.has_execute_tool_calls); assert_eq!(parsed.metadata.total_events_count, 5); assert_eq!(parsed.metadata.message_text, "Hello"); + assert_eq!( + parsed.metadata.turn_count, 1, + "one assistant envelope => one observed turn (#1438)" + ); +} + +/// `parse_stream_json` MUST count every `assistant` envelope as one +/// observed conversation turn so `MetaSessionState.turn_count` reflects +/// actual turns rather than `csa run` invocations (#1438). Before the +/// fix, multi-turn streams (read tool result → think → respond again) +/// left `turn_count` at `0` and downstream pipeline added a single +/// `+= 1` per invocation regardless of how many turns the agent took. +#[test] +fn parse_stream_json_counts_each_assistant_message_as_turn() { + let stream = concat!( + r#"{"type":"system","session_id":"sess-multi","subtype":"init"}"#, + "\n", + r#"{"type":"assistant","session_id":"sess-multi","message":{"content":[{"type":"text","text":"first"}]}}"#, + "\n", + r#"{"type":"tool_use","session_id":"sess-multi","tool_use_id":"tu-1","name":"Read"}"#, + "\n", + r#"{"type":"tool_result","session_id":"sess-multi","tool_use_id":"tu-1","status":"success"}"#, + "\n", + r#"{"type":"assistant","session_id":"sess-multi","message":{"content":[{"type":"text","text":"second"}]}}"#, + "\n", + r#"{"type":"assistant","session_id":"sess-multi","message":{"content":[{"type":"text","text":"third"}]}}"#, + "\n", + r#"{"type":"result","session_id":"sess-multi","subtype":"final"}"#, + "\n", + ); + let parsed = parse_stream_json(stream); + assert_eq!( + parsed.metadata.turn_count, 3, + "three assistant envelopes => three observed turns" + ); + assert_eq!(parsed.metadata.message_text, "firstsecondthird"); } #[test]