Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ default-members = [
resolver = "2"

[workspace.package]
version = "0.1.736"
version = "0.1.737"
edition = "2024"
rust-version = "1.88"
license = "Apache-2.0"
Expand Down
12 changes: 10 additions & 2 deletions crates/cli-sub-agent/src/pipeline_post_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ pub(crate) struct PostExecContext<'a> {
pub pre_exec_snapshot: Option<PreExecutionSnapshot>,
/// Whether the transport observed any tool calls during execution.
pub has_tool_calls: bool,
/// Number of agent conversation turns observed in this run (one per
/// `AgentMessage` event). `0` means no turns were observed, for example
/// because the transport does not parse streaming events; in that case
/// `process_execution_result` adds `1` instead, preserving the legacy
/// "one increment per csa run" semantics.
pub turn_count: u32,
/// Whether this session is running in SA (sub-agent / autonomous) mode.
pub sa_mode: bool,
}
Expand Down Expand Up @@ -109,8 +114,11 @@ pub(crate) async fn process_execution_result(
}
}

// Increment turn count
session.turn_count += 1;
// Increment turn count. Transports that parse streaming events (claude-code
// CLI, codex/gemini ACP) populate `ctx.turn_count` with the number of
// observed agent conversation turns; legacy transports leave it at `0` and
// fall back to the historical `+= 1` per-invocation contract (#1438).
session.turn_count = session.turn_count.saturating_add(ctx.turn_count.max(1));

// Update cumulative token usage
update_cumulative_tokens(session, token_usage);
Expand Down
6 changes: 3 additions & 3 deletions crates/cli-sub-agent/src/pipeline_session_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,8 +751,6 @@ pub(crate) async fn execute_with_session_and_meta_with_parent_source(
execute_events_observed,
);
}
let has_tool_calls = transport_result.metadata.has_tool_calls
|| transport_result.metadata.has_execute_tool_calls;
let sa_mode = std::env::var(crate::pipeline::prompt_guard::PROMPT_GUARD_CALLER_INJECTION_ENV)
.ok()
.map(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"))
Expand All @@ -776,7 +774,9 @@ pub(crate) async fn execute_with_session_and_meta_with_parent_source(
transcript_artifacts,
changed_paths,
pre_exec_snapshot,
has_tool_calls,
has_tool_calls: transport_result.metadata.has_tool_calls
|| transport_result.metadata.has_execute_tool_calls,
turn_count: transport_result.metadata.turn_count,
sa_mode,
};
if let Err(err) =
Expand Down
1 change: 1 addition & 0 deletions crates/cli-sub-agent/src/pipeline_tests_no_op_gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fn build_test_ctx<'a>(
changed_paths: vec![],
pre_exec_snapshot: None,
has_tool_calls,
turn_count: 0,
sa_mode,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/cli-sub-agent/src/pipeline_tests_no_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fn build_test_ctx<'a>(
changed_paths: vec![],
pre_exec_snapshot: None,
has_tool_calls: true,
turn_count: 0,
sa_mode: false,
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/cli-sub-agent/src/pipeline_tests_post_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ async fn process_execution_result_respects_vcs_auto_snapshot_gate_for_colocated_
changed_paths: changed_paths.clone(),
pre_exec_snapshot: None,
has_tool_calls: true,
turn_count: 0,
sa_mode: false,
};
let mut disabled_result = csa_process::ExecutionResult {
Expand Down Expand Up @@ -312,6 +313,7 @@ async fn process_execution_result_respects_vcs_auto_snapshot_gate_for_colocated_
changed_paths,
pre_exec_snapshot: None,
has_tool_calls: true,
turn_count: 0,
sa_mode: false,
};
let mut enabled_result = csa_process::ExecutionResult {
Expand Down Expand Up @@ -513,6 +515,7 @@ auto_capture = true
changed_paths: vec![],
pre_exec_snapshot: None,
has_tool_calls: true,
turn_count: 0,
sa_mode: false,
};
let mut result = csa_process::ExecutionResult {
Expand Down
5 changes: 5 additions & 0 deletions crates/cli-sub-agent/src/pipeline_tests_post_exec_audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ fn pre_execution_audit_baseline_returns_none_for_legacy_sessions_without_snapsho
changed_paths: vec![],
pre_exec_snapshot: None,
has_tool_calls: false,
turn_count: 0,
sa_mode: false,
};

Expand Down Expand Up @@ -269,6 +270,7 @@ fn pre_execution_audit_baseline_prefers_per_execution_snapshot() {
porcelain: Some(" M fresh.txt\0".to_string()),
}),
has_tool_calls: false,
turn_count: 0,
sa_mode: false,
};

Expand Down Expand Up @@ -362,6 +364,7 @@ fn audit_failure_does_not_fail_execution() {
changed_paths: vec![],
pre_exec_snapshot: None,
has_tool_calls: false,
turn_count: 0,
sa_mode: false,
};
let session = MetaSessionState {
Expand Down Expand Up @@ -484,6 +487,7 @@ fn reused_session_audit_uses_per_execution_baseline_not_session_creation() {
changed_paths: vec![],
pre_exec_snapshot: Some(turn_two_snapshot),
has_tool_calls: false,
turn_count: 0,
sa_mode: false,
};
let session = MetaSessionState {
Expand Down Expand Up @@ -585,6 +589,7 @@ fn first_execution_falls_back_to_session_creation_baseline_when_per_exec_capture
changed_paths: vec![],
pre_exec_snapshot: None,
has_tool_calls: false,
turn_count: 0,
sa_mode: false,
};
let session = MetaSessionState {
Expand Down
18 changes: 16 additions & 2 deletions crates/csa-acp/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ const MAX_EXTRACTED_COMMANDS: usize = 100;
pub struct StreamingMetadata {
/// Total number of events seen across the entire prompt turn, including dropped events.
pub total_events_count: usize,
/// Number of agent conversation turns observed (one per `AgentMessage`).
///
/// Mirrors `csa_core::transport_events::StreamingMetadata::turn_count`; copied
/// through `convert_acp_metadata` in the executor crate. See that field's
/// docstring for the legacy fallback contract.
pub turn_count: u32,
/// Whether any `ToolCallStarted` event was observed.
pub has_tool_calls: bool,
/// Whether any execute `ToolCallStarted` event was observed.
Expand Down Expand Up @@ -60,6 +66,7 @@ pub struct StreamingMetadata {
impl StreamingMetadata {
pub(crate) fn sync_from_store(&mut self, store: &SessionEventStore) {
self.total_events_count = store.total_events_count();
self.turn_count = store.turn_count();
self.has_tool_calls = store.has_tool_calls();
self.has_execute_tool_calls = store.has_execute_tool_calls();
self.has_no_verify_commit = store.has_no_verify_commit();
Expand Down Expand Up @@ -143,6 +150,7 @@ pub(crate) fn event_counts_as_initial_response(event: &SessionEvent) -> bool {
pub(crate) struct SessionEventStore {
events: VecDeque<SessionEvent>,
total_events_count: usize,
turn_count: u32,
has_tool_calls: bool,
has_execute_tool_calls: bool,
has_no_verify_commit: bool,
Expand Down Expand Up @@ -191,6 +199,10 @@ impl SessionEventStore {
self.total_events_count
}

/// Returns the number of `AgentMessage` events recorded by this store so far.
///
/// The counter is bumped with `saturating_add`, so it stops at `u32::MAX`
/// instead of wrapping. It is tracked as a separate field rather than derived
/// from the retained `events` queue.
///
/// NOTE(review): if the transport emits one `AgentMessage` per streamed chunk,
/// this value counts chunks rather than conversation turns — confirm against
/// the code that produces `SessionEvent::AgentMessage`.
pub(crate) fn turn_count(&self) -> u32 {
self.turn_count
}

pub(crate) fn has_tool_calls(&self) -> bool {
self.has_tool_calls
}
Expand Down Expand Up @@ -229,8 +241,10 @@ impl SessionEventStore {
SessionEvent::PlanUpdate(_) => {
self.has_plan_updates = true;
}
SessionEvent::AgentMessage(_)
| SessionEvent::AgentThought(_)
SessionEvent::AgentMessage(_) => {
self.turn_count = self.turn_count.saturating_add(1);
}
Comment on lines +244 to +246
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The turn_count is incremented for every AgentMessage event. In the ACP transport, AgentMessage events are generated from AgentMessageChunk updates (see line 312). This means turn_count will reflect the number of streaming chunks received rather than the number of conversation turns. This over-counting will cause the "no-op exit gate" in pipeline_post_exec.rs (which checks session.turn_count <= 1) to be bypassed incorrectly even when no real work was performed. Consider only incrementing the count when a new message sequence begins.

Suggested change
SessionEvent::AgentMessage(_) => {
self.turn_count = self.turn_count.saturating_add(1);
}
SessionEvent::AgentMessage(_) => {
if !self.events.back().map_or(false, |e| matches!(e, SessionEvent::AgentMessage(_))) {
self.turn_count = self.turn_count.saturating_add(1);
}
}

SessionEvent::AgentThought(_)
| SessionEvent::ToolCallCompleted { .. }
| SessionEvent::Other(_) => {}
}
Expand Down
37 changes: 37 additions & 0 deletions crates/csa-acp/src/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,43 @@ fn session_event_store_bounds_retained_events_and_metadata() {
);
}

/// Each `AgentMessage` pushed into a `SessionEventStore` must add exactly one
/// to `turn_count()`, so the ACP transport can report observed conversation
/// turns in `StreamingMetadata.turn_count` instead of `csa run` invocations
/// (#1438). Thought and tool-call events must not contribute, and the total
/// must stay intact after far more events are pushed than
/// `MAX_RETAINED_EVENTS`, because the counter is metadata rather than a
/// retained event.
#[test]
fn session_event_store_turn_count_tracks_agent_messages_and_survives_eviction() {
    let mut store = SessionEventStore::default();

    // Phase 1: three rounds of message -> tool start -> tool completion,
    // followed by a single thought that must not be counted.
    for round in 0..3 {
        let message = SessionEvent::AgentMessage(format!("turn-{round}"));
        store.push(message);

        let started = SessionEvent::ToolCallStarted {
            id: format!("call-{round}"),
            title: format!("cmd-{round}"),
            kind: "Execute".to_string(),
        };
        store.push(started);

        let completed = SessionEvent::ToolCallCompleted {
            id: format!("call-{round}"),
            status: "success".to_string(),
        };
        store.push(completed);
    }
    store.push(SessionEvent::AgentThought("internal".to_string()));

    let turns_after_rounds = store.turn_count();
    assert_eq!(
        turns_after_rounds,
        3,
        "three AgentMessage events => three observed turns; thought/tool events do not count"
    );

    // Phase 2: push more messages than the retention limit and check that the
    // running total still reflects every one of them.
    let flood_len = MAX_RETAINED_EVENTS + 25;
    let mut flood_index = 0;
    while flood_index < flood_len {
        store.push(SessionEvent::AgentMessage(format!("flood-{flood_index}")));
        flood_index += 1;
    }

    let expected_turns = 3 + (MAX_RETAINED_EVENTS as u32) + 25;
    let turns_after_flood = store.turn_count();
    assert_eq!(
        turns_after_flood,
        expected_turns,
        "turn_count is metadata; ring-buffer eviction must not lose it"
    );
}

#[test]
fn session_event_store_keeps_no_verify_sticky_after_command_ring_eviction() {
let mut store = SessionEventStore::default();
Expand Down
9 changes: 9 additions & 0 deletions crates/csa-core/src/transport_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
pub struct StreamingMetadata {
/// Total number of events seen across the entire prompt turn, including dropped events.
pub total_events_count: usize,
/// Number of agent conversation turns observed in this prompt run.
///
/// Counted by transports as the number of `AgentMessage` events emitted by
/// the underlying tool. A single `csa run` invocation may span many turns
/// when the agent reads tool results, deliberates, and responds again.
/// Transports that do not parse streaming events leave this at `0`; callers
/// must treat `0` as "unknown" and fall back to a `+= 1` increment for
/// `MetaSessionState.turn_count` to preserve the legacy counting contract.
pub turn_count: u32,
/// Whether any `ToolCallStarted` event was observed.
pub has_tool_calls: bool,
/// Whether any execute `ToolCallStarted` event was observed.
Expand Down
1 change: 1 addition & 0 deletions crates/csa-executor/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,7 @@ fn convert_acp_metadata(
) -> csa_core::transport_events::StreamingMetadata {
csa_core::transport_events::StreamingMetadata {
total_events_count: metadata.total_events_count,
turn_count: metadata.turn_count,
has_tool_calls: metadata.has_tool_calls,
has_execute_tool_calls: metadata.has_execute_tool_calls,
has_no_verify_commit: metadata.has_no_verify_commit,
Expand Down
Loading
Loading