From 2648674b89c35e0071d032e6f1300ee1e62ad3da Mon Sep 17 00:00:00 2001 From: yishuiliunian Date: Tue, 2 Jun 2026 09:56:51 +0800 Subject: [PATCH] fix(runtime): close turn-lifecycle cancellation contract gaps A real session showed the agent silently "hanging": the goal-continuation consistency gate ran on EVERY turn (not just continuations), swallowing real user input; turn cancellation had multiple entry points with no shared invariant; and the loop detector keyed only on tool input, so a tool re-reading a mutating path (ReadImage on an overwritten screenshot) was falsely aborted. Root causes addressed: - A. Continuation gate now keys on TurnTrigger::is_goal_continuation(), so non-continuation turns always reach the LLM. Stale continuations are skipped observably (ContinuationSkipped) and rewound by identity (current_turn_index), not by len-1 position. - B. All user-level cancellation funnels through finalize_turn_cancellation: pairs the open tool batch, resets continuation state, notifies governance on_turn_cancelled, emits TurnCancelled, ends the record. end_turn_record is pub(crate) so Cancelled construction is single-sourced; the two intentional bypasses (compaction host, governance abort) are documented. - C. LoopDetector counts CONSECUTIVE identical (input, output) in on_after_tools, hashing content + is_error + image content_key + metadata; resets on on_turn_cancelled / task boundary / compaction. A cancelled turn no longer emits both TurnCompleted and TurnCancelled, and no longer feeds the degeneration/loop detectors. Every fix carries unit + e2e regression coverage. 
--- crates/loopal-acp/src/translate/mod.rs | 2 + crates/loopal-context/src/turn_store/query.rs | 5 + .../src/turn_tracker/tool_batch.rs | 36 ++- crates/loopal-context/tests/suite.rs | 2 + .../tests/suite/cancel_open_batch_test.rs | 82 ++++++ .../tests/suite/turn_store_test.rs | 20 ++ .../prompts/tools/usage-policy.md | 2 + crates/loopal-protocol/src/event_payload.rs | 9 +- .../tests/suite/turn_projection_test.rs | 19 ++ .../src/anthropic/request_turns/to_json.rs | 1 + .../src/agent_loop/goal_consistency.rs | 53 +++- .../src/agent_loop/goal_control.rs | 2 +- .../src/agent_loop/governance/traits.rs | 19 ++ .../loopal-runtime/src/agent_loop/ingest.rs | 5 +- .../src/agent_loop/loop_detector.rs | 165 ++++++++---- crates/loopal-runtime/src/agent_loop/mod.rs | 1 + .../src/agent_loop/resume_session.rs | 2 +- crates/loopal-runtime/src/agent_loop/run.rs | 17 +- .../src/agent_loop/turn_cancel_finalize.rs | 62 +++++ .../src/agent_loop/turn_exec.rs | 9 +- .../src/agent_loop/turn_observer_dispatch.rs | 1 + .../src/agent_loop/turn_record.rs | 14 ++ .../src/agent_loop/turn_telemetry.rs | 54 ++-- .../src/agent_loop/turn_tool_phase.rs | 8 + .../agent_loop/cancel_finalize_e2e_test.rs | 148 +++++++++++ .../agent_loop/continuation_bypass_test.rs | 40 +++ .../agent_loop/governance_cancel_e2e_test.rs | 141 +++++++++++ crates/loopal-runtime/tests/agent_loop/mod.rs | 3 + crates/loopal-runtime/tests/suite.rs | 2 + .../tests/suite/loop_detector_digest_test.rs | 157 ++++++++++++ .../tests/suite/loop_detector_edge_test.rs | 163 ++++++------ .../tests/suite/loop_detector_test.rs | 237 +++++++----------- crates/loopal-session/src/event_handler.rs | 2 + crates/loopal-tool-invocation/src/image.rs | 10 + crates/loopal-turn/src/step.rs | 2 + crates/loopal-turn/src/turn.rs | 9 + crates/loopal-turn/tests/suite.rs | 2 + .../tests/suite/turn_trigger_test.rs | 47 ++++ crates/loopal-view-state/src/mutators/mod.rs | 2 + 39 files changed, 1259 insertions(+), 296 deletions(-) create mode 100644 
crates/loopal-context/tests/suite/cancel_open_batch_test.rs create mode 100644 crates/loopal-runtime/src/agent_loop/turn_cancel_finalize.rs create mode 100644 crates/loopal-runtime/tests/agent_loop/cancel_finalize_e2e_test.rs create mode 100644 crates/loopal-runtime/tests/agent_loop/continuation_bypass_test.rs create mode 100644 crates/loopal-runtime/tests/agent_loop/governance_cancel_e2e_test.rs create mode 100644 crates/loopal-runtime/tests/suite/loop_detector_digest_test.rs create mode 100644 crates/loopal-turn/tests/suite/turn_trigger_test.rs diff --git a/crates/loopal-acp/src/translate/mod.rs b/crates/loopal-acp/src/translate/mod.rs index 1e6d3791..878cb078 100644 --- a/crates/loopal-acp/src/translate/mod.rs +++ b/crates/loopal-acp/src/translate/mod.rs @@ -170,6 +170,8 @@ pub fn translate_event(payload: &AgentEventPayload, session_id: &str) -> Option< | AgentEventPayload::HubDegraded { .. } | AgentEventPayload::HubRecovered { .. } | AgentEventPayload::DegenerationDetected(_) + | AgentEventPayload::ContinuationSkipped { .. } + | AgentEventPayload::TurnCancelled { .. 
} | AgentEventPayload::ContinuationGateChanged(_) => None, AgentEventPayload::ToolPermissionResolved { id } => Some(AcpNotification::Extension { method: "_loopal/permission_resolved".into(), diff --git a/crates/loopal-context/src/turn_store/query.rs b/crates/loopal-context/src/turn_store/query.rs index 5ac3db3e..033ed1fa 100644 --- a/crates/loopal-context/src/turn_store/query.rs +++ b/crates/loopal-context/src/turn_store/query.rs @@ -18,6 +18,11 @@ impl TurnStore { self.current_turn_id.as_ref() } + pub fn current_turn_index(&self) -> Option<usize> { + let id = self.current_turn_id.as_ref()?; + self.turns.iter().position(|t| &t.id == id) + } + pub fn len(&self) -> usize { self.turns.len() } diff --git a/crates/loopal-context/src/turn_tracker/tool_batch.rs b/crates/loopal-context/src/turn_tracker/tool_batch.rs index ff3113fa..a51d5db8 100644 --- a/crates/loopal-context/src/turn_tracker/tool_batch.rs +++ b/crates/loopal-context/src/turn_tracker/tool_batch.rs @@ -1,4 +1,4 @@ -use loopal_turn::{ToolExecState, TurnEvent, TurnStep}; +use loopal_turn::{CancelCause, ToolExecState, TurnEvent, TurnStep}; use super::TurnTracker; use super::error::TurnTrackerError; @@ -13,6 +13,40 @@ impl TurnTracker { self.current_tool_batch_step = None; } + // Cancel every not-yet-terminal item in the open ToolBatch, then close it. + // reason: turn-level cancellation (parent abort) leaves spawned tools in + // Pending/Running; without this the wire serializes them as the bogus + // "Pending — runtime invariant violated" tool_result. 
+ pub fn cancel_open_tool_batch(&mut self, cause: CancelCause, logger: &dyn TurnEventLogger) { + let Some(step_index) = self.current_tool_batch_step else { + return; + }; + let pending: Vec<u32> = self + .store + .current_turn() + .and_then(|t| t.body.steps.get(step_index as usize)) + .and_then(|s| match s { + TurnStep::ToolBatch(b) => Some(b), + _ => None, + }) + .map(|b| { + b.items + .iter() + .enumerate() + .filter(|(_, it)| { + matches!(it.state, ToolExecState::Pending | ToolExecState::Running) + }) + .map(|(i, _)| i as u32) + .collect() + }) + .unwrap_or_default(); + for idx in pending { + let _ = + self.try_update_tool_state(idx, ToolExecState::Cancelled(cause.clone()), logger); + } + self.close_tool_batch(); + } + pub fn try_update_tool_state( &mut self, item_index: u32, diff --git a/crates/loopal-context/tests/suite.rs b/crates/loopal-context/tests/suite.rs index 499f325c..28dff2d4 100644 --- a/crates/loopal-context/tests/suite.rs +++ b/crates/loopal-context/tests/suite.rs @@ -1,4 +1,6 @@ // Single test binary — includes all test modules +#[path = "suite/cancel_open_batch_test.rs"] +mod cancel_open_batch_test; #[path = "suite/compaction_pair_test.rs"] mod compaction_pair_test; #[path = "suite/file_snapshot_test.rs"] diff --git a/crates/loopal-context/tests/suite/cancel_open_batch_test.rs b/crates/loopal-context/tests/suite/cancel_open_batch_test.rs new file mode 100644 index 00000000..71f039fb --- /dev/null +++ b/crates/loopal-context/tests/suite/cancel_open_batch_test.rs @@ -0,0 +1,82 @@ +use loopal_context::{ContextBudget, PersistError, TurnEventLogger, TurnStore, TurnTracker}; +use loopal_turn::{ + CancelCause, OrderedToolBatch, ToolBatchItem, ToolCall, ToolCallId, ToolExecState, TurnEvent, + TurnStep, TurnTrigger, +}; + +struct InMemoryLogger; +impl TurnEventLogger for InMemoryLogger { + fn persist(&self, _event: &TurnEvent) -> Result<(), PersistError> { + Ok(()) + } +} + +fn budget() -> ContextBudget { + ContextBudget { + context_window: 200_000, 
system_tokens: 0, + tool_tokens: 0, + output_reserve: 16_384, + safety_margin: 10_000, + message_budget: 173_616, + max_output_tokens: 64_000, + } +} + +fn pending_item(id: &str) -> ToolBatchItem { + ToolBatchItem { + call: ToolCall { + id: ToolCallId::new(id), + name: "Bash".into(), + input: serde_json::Value::Null, + }, + state: ToolExecState::Pending, + } +} + +#[test] +fn cancel_open_tool_batch_marks_all_pending_as_cancelled() { + let mut t = TurnTracker::new(TurnStore::new(), budget()); + t.try_start_turn( + TurnTrigger::UserInput { + envelope_id: "e".into(), + content: "hi".into(), + images: Vec::new(), + }, + &InMemoryLogger, + ) + .unwrap(); + let step_index = t + .try_append_step( + TurnStep::ToolBatch(OrderedToolBatch { + items: vec![pending_item("t1"), pending_item("t2")], + }), + &InMemoryLogger, + ) + .unwrap(); + t.mark_tool_batch_open(step_index); + + t.cancel_open_tool_batch(CancelCause::CrashRecovery, &InMemoryLogger); + + let turn = t.store().current_turn().unwrap(); + let TurnStep::ToolBatch(b) = &turn.body.steps[step_index as usize] else { + panic!("expected ToolBatch step"); + }; + for item in &b.items { + assert!( + matches!( + item.state, + ToolExecState::Cancelled(CancelCause::CrashRecovery) + ), + "pending item must become Cancelled, got {:?}", + item.state + ); + } +} + +#[test] +fn cancel_open_tool_batch_is_noop_without_open_batch() { + let mut t = TurnTracker::new(TurnStore::new(), budget()); + // no open batch → must not panic + t.cancel_open_tool_batch(CancelCause::UserInterrupt, &InMemoryLogger); +} diff --git a/crates/loopal-context/tests/suite/turn_store_test.rs b/crates/loopal-context/tests/suite/turn_store_test.rs index 8686b3ab..e7f86b65 100644 --- a/crates/loopal-context/tests/suite/turn_store_test.rs +++ b/crates/loopal-context/tests/suite/turn_store_test.rs @@ -55,6 +55,26 @@ fn tracker() -> TurnTracker { TurnTracker::new(TurnStore::new(), budget()) } +#[test] +fn current_turn_index_points_at_in_progress_turn() { + let mut t = 
tracker(); + for i in 0..3 { + t.try_start_turn( + TurnTrigger::UserInput { + envelope_id: format!("env-{i}"), + content: format!("msg-{i}"), + images: Vec::new(), + }, + &InMemoryLogger, + ) + .unwrap(); + t.end_turn(TurnOutcome::Complete, &InMemoryLogger).unwrap(); + } + assert!(t.store().current_turn_index().is_none()); + t.try_start_turn(user_trigger(), &InMemoryLogger).unwrap(); + assert_eq!(t.store().current_turn_index(), Some(3)); +} + #[test] fn empty_store_has_no_current_turn() { let t = tracker(); diff --git a/crates/loopal-prompt-system/prompts/tools/usage-policy.md b/crates/loopal-prompt-system/prompts/tools/usage-policy.md index a25f1880..3eb7a352 100644 --- a/crates/loopal-prompt-system/prompts/tools/usage-policy.md +++ b/crates/loopal-prompt-system/prompts/tools/usage-policy.md @@ -35,3 +35,5 @@ If unsure whether a dedicated tool exists, use the dedicated tool — do NOT fal ## Parallel Calls You can call multiple tools in a single response. If you intend to call multiple tools and there are no dependencies between them, make all independent tool calls in parallel. Maximize use of parallel tool calls where possible to increase efficiency. However, if some tool calls depend on previous calls to inform dependent values, do NOT call these tools in parallel and instead call them sequentially. For instance, if one operation must complete before another starts, run these operations sequentially instead. + +A parallel batch has NO ordering guarantee. In particular, when you Write/Edit a file and then need a Bash command that reads, executes, or `chmod`s that SAME file, these are dependent — put the Bash command in a LATER turn, never in the same batch as the Write. Otherwise the Bash command may run before the file exists and fail (e.g. `chmod: No such file or directory`). 
diff --git a/crates/loopal-protocol/src/event_payload.rs b/crates/loopal-protocol/src/event_payload.rs index 74523651..e0c6fa44 100644 --- a/crates/loopal-protocol/src/event_payload.rs +++ b/crates/loopal-protocol/src/event_payload.rs @@ -14,10 +14,7 @@ use crate::task_snapshot::TaskSnapshot; use crate::thread_goal::{GoalTransitionReason, ThreadGoal}; /// Event payload. Runner/LLM/Tools only construct this enum. -/// -/// `#[rustfmt::skip]` keeps single-field variants on one line so the file -/// fits the 200-line budget; expanded form (`{ text: String }` on three -/// lines) would push it well past with no readability gain. +/// `#[rustfmt::skip]` keeps single-field variants on one line (200-line budget). #[rustfmt::skip] #[derive(Debug, Clone, Serialize, Deserialize)] pub enum AgentEventPayload { @@ -196,4 +193,8 @@ pub enum AgentEventPayload { DegenerationDetected(DegenerationSummary), /// Continuation gate opened or closed. Drives UI status indicators. ContinuationGateChanged(ContinuationGateSummary), + /// A goal-continuation turn was skipped (goal changed) — keeps the skip observable, not silent. + ContinuationSkipped { reason: String }, + /// A turn was cancelled (parent abort / governance / interrupt). `cause` is the rendered CancelledCause. 
+ TurnCancelled { cause: String }, } diff --git a/crates/loopal-provider-api/tests/suite/turn_projection_test.rs b/crates/loopal-provider-api/tests/suite/turn_projection_test.rs index 15b5e742..4dab1fc7 100644 --- a/crates/loopal-provider-api/tests/suite/turn_projection_test.rs +++ b/crates/loopal-provider-api/tests/suite/turn_projection_test.rs @@ -74,6 +74,25 @@ fn user_trigger_emits_human_user_message() { assert!(matches!(msgs[0].origin, Some(MessageOrigin::Human))); } +#[test] +fn goal_continuation_trigger_projects_to_user_message() { + // The continuation skip-gate relies on this: a GoalContinuation turn + // projects to a User message, so goal_continuation_check (last_role != User) + // won't re-inject and loop after a skip. + let t = turn_with( + TurnTrigger::GoalContinuation { + envelope_id: "env-g".into(), + content: "keep going".into(), + }, + vec![], + ); + let msgs = project_turn_to_messages(&t); + assert_eq!(msgs.len(), 1); + // The skip-gate keys on view().last_role(); assert last (not first) to pin + // the actual load-bearing property even if projection grows more messages. 
+ assert_eq!(msgs.last().unwrap().role, MessageRole::User); +} + #[test] fn cron_trigger_prefixed_with_scheduled() { let t = turn_with( diff --git a/crates/loopal-provider/src/anthropic/request_turns/to_json.rs b/crates/loopal-provider/src/anthropic/request_turns/to_json.rs index 2893a84e..8753d7e3 100644 --- a/crates/loopal-provider/src/anthropic/request_turns/to_json.rs +++ b/crates/loopal-provider/src/anthropic/request_turns/to_json.rs @@ -55,6 +55,7 @@ fn cancel_reason(c: &CancelCause) -> &'static str { CancelCause::GovernanceAbort => "Aborted by governance", CancelCause::CrashRecovery => "Cancelled (crash recovery)", CancelCause::Timeout => "Timed out", + CancelCause::ParentTurnAborted => "Cancelled (superseded by new input)", } } diff --git a/crates/loopal-runtime/src/agent_loop/goal_consistency.rs b/crates/loopal-runtime/src/agent_loop/goal_consistency.rs index c2347c74..099229fb 100644 --- a/crates/loopal-runtime/src/agent_loop/goal_consistency.rs +++ b/crates/loopal-runtime/src/agent_loop/goal_consistency.rs @@ -1,4 +1,11 @@ +use loopal_protocol::AgentEventPayload; +use loopal_provider_api::{ContinuationIntent, ContinuationReason}; +use tracing::warn; + use super::runner::AgentLoopRunner; +use super::turn_context::TurnContext; + +const CONTINUATION_SKIPPED_REASON: &str = "goal changed before continuation turn started"; impl AgentLoopRunner { pub(super) async fn continuation_still_consistent(&self) -> bool { @@ -12,11 +19,55 @@ impl AgentLoopRunner { }; let goal = match session.snapshot().await { Ok(Some(g)) => g, - _ => return false, + // Transient read error: no evidence the goal changed → stay + // consistent (don't skip) rather than drop the continuation. + Err(_) => return true, + // Goal genuinely gone → inconsistent. + Ok(None) => return false, }; if &goal.goal_id != goal_id { return false; } goal.status.participates_in_continuation() } + + // A goal-continuation turn whose goal changed before it started has nothing + // to do. 
Recovery retries (RecoveryRetry) are not continuations and are + // never skipped. Skipping ends the turn and returns true WITHOUT recording + // it into turn_history — so it cannot pollute degeneration/loop counters. + pub(super) async fn skip_stale_continuation_turn(&mut self, turn_ctx: &TurnContext) -> bool { + if matches!( + turn_ctx.pending_continuation, + Some(ContinuationIntent::AutoContinue { + reason: ContinuationReason::RecoveryRetry + }) + ) { + return false; + } + let is_continuation = self + .turns + .store() + .current_turn() + .is_some_and(|t| t.trigger.is_goal_continuation()); + if !is_continuation || self.continuation_still_consistent().await { + return false; + } + warn!("{CONTINUATION_SKIPPED_REASON}"); + self.reset_continuation_state(); + self.emit_cosmetic(AgentEventPayload::ContinuationSkipped { + reason: CONTINUATION_SKIPPED_REASON.into(), + }) + .await; + // Drop the stale turn by IDENTITY (its index), not position: the + // GoalContinuation trigger would otherwise project as a dangling + // "continue" User message into the wire context. Keying on the current + // turn's index (vs len-1) does not assume it is the last turn. rewind is + // event-sourced (survives resume). 
+ let keep = match self.turns.store().current_turn_index() { + Some(idx) => idx, + None => return true, + }; + self.rewind_turns_record(keep); + true + } } diff --git a/crates/loopal-runtime/src/agent_loop/goal_control.rs b/crates/loopal-runtime/src/agent_loop/goal_control.rs index d14499a9..75bb0706 100644 --- a/crates/loopal-runtime/src/agent_loop/goal_control.rs +++ b/crates/loopal-runtime/src/agent_loop/goal_control.rs @@ -68,7 +68,7 @@ impl AgentLoopRunner { return Ok(false); } if is_clear { - self.last_continuation_goal_id = None; + self.reset_continuation_state(); } if kickoff_eligible { return self.goal_continuation_check().await; diff --git a/crates/loopal-runtime/src/agent_loop/governance/traits.rs b/crates/loopal-runtime/src/agent_loop/governance/traits.rs index 72f5cc91..fe2521a9 100644 --- a/crates/loopal-runtime/src/agent_loop/governance/traits.rs +++ b/crates/loopal-runtime/src/agent_loop/governance/traits.rs @@ -46,6 +46,19 @@ pub trait Governance: Send + Sync { // not pre-classify so each Governance can apply its own semantics. fn on_envelope_received(&mut self, _source: &MessageSource) {} + // Post-execution observation for decision-makers that need tool OUTPUT to + // decide (e.g. LoopDetector keys its signature on input+output so that a + // tool re-reading a mutating path — same args, different result — is not + // flagged as a loop). `results[i]` is index-matched to `tool_uses[i]`. + // Cannot veto (the batch already ran); it feeds the next on_before_tools. + fn on_after_tools( + &mut self, + _ctx: &mut TurnContext, + _tool_uses: &[(String, String, serde_json::Value)], + _results: &[ContentBlock], + ) { + } + // Compaction just rewrote earlier history: any cross-turn state derived // from the pre-compact conversation (e.g. signature counters that index // tool calls now absent from the store) is stale and must reset. @@ -54,6 +67,12 @@ pub trait Governance: Send + Sync { // cross-turn state. 
fn on_compact_completed(&mut self) {} + // A turn was cancelled (user interrupt / parent abort). Cross-turn state + // accrued from its batches is not a valid sample — a user interrupt should + // reset a loop streak rather than let it span the cancellation. Default + // no-op for governances whose state is not turn-scoped. + fn on_turn_cancelled(&mut self) {} + /// Inspect the completed turn alongside trailing history; default /// returns `PostTurnAction::None`. Cross-turn safety nets (degeneration /// detector, repetition guard) override this. diff --git a/crates/loopal-runtime/src/agent_loop/ingest.rs b/crates/loopal-runtime/src/agent_loop/ingest.rs index 8bcd9401..4a85e56a 100644 --- a/crates/loopal-runtime/src/agent_loop/ingest.rs +++ b/crates/loopal-runtime/src/agent_loop/ingest.rs @@ -27,9 +27,8 @@ impl AgentLoopRunner { } } if self.turns.current_turn_id().is_some() { - self.end_turn_record(loopal_turn::TurnOutcome::Cancelled { - cause: loopal_turn::CancelledCause::ParentTurnAborted, - }); + self.finalize_turn_cancellation(loopal_turn::CancelledCause::ParentTurnAborted) + .await; } let Some(_turn_id) = self.start_turn_record(envelope_to_trigger(env)) else { error!( diff --git a/crates/loopal-runtime/src/agent_loop/loop_detector.rs b/crates/loopal-runtime/src/agent_loop/loop_detector.rs index 4b421bca..19589df6 100644 --- a/crates/loopal-runtime/src/agent_loop/loop_detector.rs +++ b/crates/loopal-runtime/src/agent_loop/loop_detector.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; +use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use loopal_protocol::MessageSource; +use loopal_provider_api::ContentBlock; use super::governance::traits::{Governance, Verdict}; use super::turn_context::TurnContext; @@ -9,14 +11,22 @@ use super::turn_context::TurnContext; const WARN_THRESHOLD: u32 = 3; const ABORT_THRESHOLD: u32 = 5; -/// Tracks tool call signatures and their cumulative occurrence count. 
+// Tracks, per (tool, input), how many CONSECUTIVE times it produced the SAME +// output. A tool re-reading a mutating path (e.g. ReadImage on a screenshot +// path that is overwritten every call — same args, fresh pixels each time) +// keeps resetting the streak and is never flagged. Only stationary repetition +// (same input → same output) accrues toward warn/abort. pub struct LoopDetector { - /// (signature → cumulative count across the turn) - signatures: HashMap<String, u32>, + repeats: HashMap<u64, Repeat>, warn_threshold: u32, abort_threshold: u32, } +struct Repeat { + output: u64, + count: u32, +} + impl Default for LoopDetector { fn default() -> Self { Self::new() @@ -28,20 +38,15 @@ impl LoopDetector { Self::with_thresholds(WARN_THRESHOLD, ABORT_THRESHOLD) } - /// Create a detector with custom thresholds (from HarnessConfig). pub fn with_thresholds(warn: u32, abort: u32) -> Self { Self { - signatures: HashMap::new(), + repeats: HashMap::new(), warn_threshold: warn, abort_threshold: abort, } } } -// Resets cumulative tool-call signatures on task boundaries. Delegates the -// predicate to `MessageSource::is_task_boundary` (hot path, no allocation); -// SSOT parity with `MessageOrigin::is_task_boundary` enforced by -// `loopal-protocol/tests/suite/task_boundary_test.rs`. 
impl Governance for LoopDetector { fn on_before_tools( &mut self, @@ -49,67 +54,135 @@ impl Governance for LoopDetector { tool_uses: &[(String, String, serde_json::Value)], ) -> Verdict { let mut worst = Verdict::Continue; - for (_, name, input) in tool_uses { - let sig = tool_signature(name, input); - let count = self.signatures.entry(sig).or_insert(0); - *count += 1; - - if *count >= self.abort_threshold { + let count = self + .repeats + .get(&input_signature(name, input)) + .map(|r| r.count) + .unwrap_or(0); + if count >= self.abort_threshold { tracing::warn!(tool = name, count, "loop detected, aborting turn"); return Verdict::AbortTurn { reason: format!( - "Loop detected: tool '{name}' called {count} cumulative times \ - with similar arguments. Aborting to prevent waste.", + "Loop detected: tool '{name}' produced identical output \ + {count} consecutive times. Aborting to prevent waste.", ), feedback_to_model: format!( - "Your previous `{name}` call was aborted by the runtime loop \ - detector: the same call has been issued {count} times in this \ - thread with no new information. Stop retrying with the same \ - arguments. Either change strategy (different tool, different \ - inputs, or ask the user for help) or report what you've learned \ - so far and pause." + "Your `{name}` call was aborted by the runtime loop detector: \ + the same call returned identical output {count} times in a row \ + with no new information. Stop retrying with the same arguments. \ + Change strategy (different tool, different inputs, or ask the \ + user) or report what you've learned and pause." ), }; } - if *count >= self.warn_threshold { + if count >= self.warn_threshold { tracing::warn!(tool = name, count, "possible loop detected"); worst = Verdict::InjectWarning(format!( - "[WARNING: Tool '{name}' has been called {count} times with similar \ - arguments. You may be stuck in a loop. 
Try a different \ - approach or ask the user for help.]", + "[WARNING: Tool '{name}' returned identical output {count} times in a \ + row. You may be stuck in a loop. Try a different approach or ask the \ + user for help.]", )); } } - worst } + // Counting happens here (post-execution), not in on_before_tools, because + // the streak keys on OUTPUT. Consequence: if a tool batch aborts with a + // turn-level Err before this runs (execute_tools returns Err in + // turn_tool_phase), that batch is not counted — such error-loops are bounded + // by try_recover's retry caps / transition_error instead of by this detector. + fn on_after_tools( + &mut self, + _ctx: &mut TurnContext, + tool_uses: &[(String, String, serde_json::Value)], + results: &[ContentBlock], + ) { + let mut counted = std::collections::HashSet::new(); + for (id, name, input) in tool_uses { + let Some(out) = output_digest_for(results, id) else { + continue; + }; + let sig = input_signature(name, input); + if !counted.insert(sig) { + continue; + } + match self.repeats.get_mut(&sig) { + Some(r) if r.output == out => r.count += 1, + _ => { + self.repeats.insert( + sig, + Repeat { + output: out, + count: 1, + }, + ); + } + } + } + } + fn on_envelope_received(&mut self, source: &MessageSource) { if source.is_task_boundary() { - self.signatures.clear(); + self.repeats.clear(); } } - // Compaction discards earlier turns: the tool calls that fed our - // signature counter are no longer in the store, so the counter is - // measuring history that no longer exists. Reset to avoid carrying - // pre-compact counts into the post-compact window. + // Compaction discards earlier turns: the calls that fed our counter are no + // longer in the store, so the streak is measuring history that no longer + // exists. Reset to avoid carrying pre-compact counts forward. 
fn on_compact_completed(&mut self) { - self.signatures.clear(); + self.repeats.clear(); } + + // A user interrupt resets the loop streak: counts accrued before the + // cancellation must not span it into the next turn. + fn on_turn_cancelled(&mut self) { + self.repeats.clear(); + } +} + +fn input_signature(name: &str, input: &serde_json::Value) -> u64 { + let mut h = DefaultHasher::new(); + name.hash(&mut h); + // serde_json::Value::Object is a BTreeMap → deterministic serialization. + input.to_string().hash(&mut h); + h.finish() } -/// Build a stable signature from tool name + full input JSON. -/// -/// We hash the **entire** serialized JSON (not a byte prefix). Prefix-based -/// hashing collided when distinguishing fields sorted late in the JSON -/// (e.g. `to` in `SendMessage`) were pushed past the cutoff by long -/// earlier fields — causing legitimate fan-out calls to be flagged as -/// loops. `serde_json::Value::Object` uses `BTreeMap`, so full-JSON -/// serialization is deterministic across equivalent inputs. -fn tool_signature(name: &str, input: &serde_json::Value) -> String { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - input.to_string().hash(&mut hasher); - format!("{name}|{:x}", hasher.finish()) +// Digest the tool_result paired (by id) with this call: content + error flag +// + image identity + metadata. Returns None when no matching result is present, +// so the caller skips the streak update (an absent result is NOT "identical +// output" — folding all missing results to one empty hash would falsely accrue +// loops). 
+fn output_digest_for(results: &[ContentBlock], id: &str) -> Option { + for b in results { + if let ContentBlock::ToolResult { + tool_use_id, + content, + is_error, + images, + metadata, + } = b + && tool_use_id == id + { + let mut h = DefaultHasher::new(); + content.hash(&mut h); + is_error.hash(&mut h); + for img in images { + img.media_type().hash(&mut h); + img.content_key().hash(&mut h); + } + // Metadata distinguishes results with identical content (e.g. a + // Write returning a fixed banner but a changing byte count). + if let Some(md) = metadata + && let Ok(s) = serde_json::to_string(md) + { + s.hash(&mut h); + } + return Some(h.finish()); + } + } + None } diff --git a/crates/loopal-runtime/src/agent_loop/mod.rs b/crates/loopal-runtime/src/agent_loop/mod.rs index 1dcbe391..dd5d4c82 100644 --- a/crates/loopal-runtime/src/agent_loop/mod.rs +++ b/crates/loopal-runtime/src/agent_loop/mod.rs @@ -63,6 +63,7 @@ mod tools_phase; pub(crate) mod tools_plan; mod tools_plan_exit; mod tools_resolve; +mod turn_cancel_finalize; pub mod turn_context; mod turn_exec; pub mod turn_history; diff --git a/crates/loopal-runtime/src/agent_loop/resume_session.rs b/crates/loopal-runtime/src/agent_loop/resume_session.rs index 9d44eabe..5e1901c1 100644 --- a/crates/loopal-runtime/src/agent_loop/resume_session.rs +++ b/crates/loopal-runtime/src/agent_loop/resume_session.rs @@ -40,7 +40,7 @@ impl AgentLoopRunner { self.pending_consumed_ids.clear(); self.turn_history.clear(); self.continuation_gate = super::continuation_gate::ContinuationGate::new(); - self.last_continuation_goal_id = None; + self.reset_continuation_state(); if let Some(goal_session) = self.params.goal_session.as_ref() && let Err(err) = goal_session .set_session_id(self.params.session.id.clone()) diff --git a/crates/loopal-runtime/src/agent_loop/run.rs b/crates/loopal-runtime/src/agent_loop/run.rs index 34f06e61..3c75dcf2 100644 --- a/crates/loopal-runtime/src/agent_loop/run.rs +++ 
b/crates/loopal-runtime/src/agent_loop/run.rs @@ -84,7 +84,11 @@ impl AgentLoopRunner { } self.turn_count += 1; if self.interrupt.take() { - self.emit_interrupted().await?; + self.collect_interrupted_turn().await?; + } else if self.turns.current_turn_id().is_some() { + // is_some guard: skip_stale_continuation_turn already + // rewound the turn, leaving no current turn to end. + self.end_turn_record(loopal_turn::TurnOutcome::Complete); } } Err(e) => { @@ -97,7 +101,7 @@ impl AgentLoopRunner { continue; } if self.interrupt.take() { - self.emit_interrupted().await?; + self.collect_interrupted_turn().await?; continue; } error!(error = %e, "LLM request failed"); @@ -178,6 +182,15 @@ impl AgentLoopRunner { self.emit(AgentEventPayload::Interrupted).await } + // finalize runs even if the Interrupted emit fails, so the turn is never + // left InProgress. + async fn collect_interrupted_turn(&mut self) -> Result<()> { + let emit_result = self.emit_interrupted().await; + self.finalize_turn_cancellation(loopal_turn::CancelledCause::UserInterrupt) + .await; + emit_result + } + pub async fn emit_inbox_consumed(&mut self) { let ids = std::mem::take(&mut self.pending_consumed_ids); for message_id in ids { diff --git a/crates/loopal-runtime/src/agent_loop/turn_cancel_finalize.rs b/crates/loopal-runtime/src/agent_loop/turn_cancel_finalize.rs new file mode 100644 index 00000000..6ddbce45 --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/turn_cancel_finalize.rs @@ -0,0 +1,62 @@ +use loopal_protocol::AgentEventPayload; +use loopal_turn::{CancelCause, CancelledCause, TurnOutcome}; + +use super::runner::AgentLoopRunner; + +impl AgentLoopRunner { + pub(super) async fn finalize_turn_cancellation(&mut self, cause: CancelledCause) { + if self.turns.current_turn_id().is_none() { + return; + } + self.cancel_open_tool_batch_record(cancelled_to_cancel_cause(&cause)); + self.reset_continuation_state(); + for g in &mut self.governance { + g.on_turn_cancelled(); + } + 
self.emit_cosmetic(AgentEventPayload::TurnCancelled { + cause: cancelled_cause_wire(&cause).into(), + }) + .await; + self.end_turn_record(TurnOutcome::Cancelled { cause }); + } + + pub(super) fn reset_continuation_state(&mut self) { + self.last_continuation_goal_id = None; + } +} + +fn cancelled_to_cancel_cause(c: &CancelledCause) -> CancelCause { + match c { + CancelledCause::UserInterrupt => CancelCause::UserInterrupt, + CancelledCause::CrashRecovery => CancelCause::CrashRecovery, + CancelledCause::ParentTurnAborted => CancelCause::ParentTurnAborted, + } +} + +// Explicit wire string for the TurnCancelled event, NOT the derived Debug: +// renaming a CancelledCause variant then fails to compile here instead of +// silently shifting the IPC/ACP contract. +pub(super) fn cancelled_cause_wire(c: &CancelledCause) -> &'static str { + match c { + CancelledCause::UserInterrupt => "UserInterrupt", + CancelledCause::CrashRecovery => "CrashRecovery", + CancelledCause::ParentTurnAborted => "ParentTurnAborted", + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn wire_strings_are_pinned_for_every_variant() { + let cases = [ + (CancelledCause::UserInterrupt, "UserInterrupt"), + (CancelledCause::CrashRecovery, "CrashRecovery"), + (CancelledCause::ParentTurnAborted, "ParentTurnAborted"), + ]; + for (cause, wire) in cases { + assert_eq!(cancelled_cause_wire(&cause), wire); + } + } +} diff --git a/crates/loopal-runtime/src/agent_loop/turn_exec.rs b/crates/loopal-runtime/src/agent_loop/turn_exec.rs index 7d1dee88..3cb36eaa 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_exec.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_exec.rs @@ -1,7 +1,7 @@ use loopal_error::Result; use loopal_provider_api::ContinuationIntent; use loopal_provider_api::MessageRole; -use tracing::{info, warn}; +use tracing::info; use super::TurnOutput; use super::runner::AgentLoopRunner; @@ -14,13 +14,6 @@ impl AgentLoopRunner { &mut self, turn_ctx: &mut TurnContext, ) -> Result { - if 
!self.continuation_still_consistent().await { - warn!("skipping continuation turn: goal changed before turn started"); - self.last_continuation_goal_id = None; - return Ok(TurnOutput { - output: String::new(), - }); - } let mut c = TurnLoopCounters { last_text: String::new(), continuation_count: 0, diff --git a/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs b/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs index 74365f86..157777e9 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs @@ -31,6 +31,7 @@ impl AgentLoopRunner { feedback_to_model, } => { warn!(%reason, "governance aborted turn"); + self.reset_continuation_state(); self.emit_in_turn(AgentEventPayload::Error { message: reason.clone(), }) diff --git a/crates/loopal-runtime/src/agent_loop/turn_record.rs b/crates/loopal-runtime/src/agent_loop/turn_record.rs index d2a23e55..cfde0fb6 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_record.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_record.rs @@ -81,6 +81,20 @@ impl AgentLoopRunner { self.turns.close_tool_batch(); } + pub(super) fn cancel_open_tool_batch_record(&mut self, cause: loopal_turn::CancelCause) { + let logger = make_logger(&self.params.deps.session_manager, &self.params.session.id); + self.turns.cancel_open_tool_batch(cause, &logger); + } + + // reason: user-level cancellation MUST go through finalize_turn_cancellation + // (pairs tool_use/tool_result, resets continuation, emits TurnCancelled). + // Two paths intentionally bypass finalize and end here directly: + // 1. compaction's synthetic host turn (compaction/host.rs) — owns no tool + // batch and no continuation state. + // 2. 
governance abort (turn_observer_dispatch.rs) — the turn made a real + // LlmCall and ends Complete with Cancelled compensation items; it must + // NOT call on_turn_cancelled (that would clear the very loop/degeneration + // streak that triggered the abort) and must NOT emit TurnCancelled. pub fn end_turn_record(&mut self, outcome: TurnOutcome) { let logger = make_logger(&self.params.deps.session_manager, &self.params.session.id); if let Err(e) = self.turns.end_turn(outcome, &logger) { diff --git a/crates/loopal-runtime/src/agent_loop/turn_telemetry.rs b/crates/loopal-runtime/src/agent_loop/turn_telemetry.rs index 9e35e259..6c5c38fa 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_telemetry.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_telemetry.rs @@ -31,6 +31,14 @@ impl AgentLoopRunner { async fn execute_turn_body(&mut self, turn_ctx: &mut TurnContext) -> Result { crate::otel_metrics::active_turns().add(1, &[]); + // Skip before on_turn_start/record so a stale continuation never enters + // turn_history (zero metrics would pollute degeneration streaks). 
+ if self.skip_stale_continuation_turn(turn_ctx).await { + crate::otel_metrics::active_turns().add(-1, &[]); + return Ok(TurnOutput { + output: String::new(), + }); + } for h in &mut self.hooks { h.on_turn_start(turn_ctx); } @@ -76,24 +84,28 @@ impl AgentLoopRunner { let attrs = &[KeyValue::new("gen_ai.token.type", "output"), model_attr]; crate::otel_metrics::token_usage().add(m.tokens_out as u64, attrs); - if let Err(emit_err) = self - .emit_in_turn(AgentEventPayload::TurnCompleted( - loopal_protocol::TurnSummary { - turn_id: turn_ctx.turn_id, - duration_ms, - llm_calls: m.llm_calls, - tool_calls_requested: m.tool_calls_requested, - tool_calls_approved: m.tool_calls_approved, - tool_calls_denied: m.tool_calls_denied, - tool_errors: m.tool_errors, - auto_continuations: m.auto_continuations, - warnings_injected: m.warnings_injected, - tokens_in: m.tokens_in, - tokens_out: m.tokens_out, - modified_files: files, - }, - )) - .await + // A cancelled turn is reported via TurnCancelled (finalize), not + // TurnCompleted — emitting both would give the same turn_id divergent + // terminal states. OTel duration/token above still record the real cost. 
+ if !turn_ctx.cancel.is_cancelled() + && let Err(emit_err) = self + .emit_in_turn(AgentEventPayload::TurnCompleted( + loopal_protocol::TurnSummary { + turn_id: turn_ctx.turn_id, + duration_ms, + llm_calls: m.llm_calls, + tool_calls_requested: m.tool_calls_requested, + tool_calls_approved: m.tool_calls_approved, + tool_calls_denied: m.tool_calls_denied, + tool_errors: m.tool_errors, + auto_continuations: m.auto_continuations, + warnings_injected: m.warnings_injected, + tokens_in: m.tokens_in, + tokens_out: m.tokens_out, + modified_files: files, + }, + )) + .await { tracing::error!( error = %emit_err, @@ -108,6 +120,12 @@ impl AgentLoopRunner { } async fn record_turn_completion(&mut self, turn_ctx: &TurnContext) { + // A cancelled turn is not a degeneration/loop sample: it was interrupted, + // not the agent making (or failing to make) progress. tool_calls_approved + // counts permitted-not-completed tools, so it cannot stand in for progress. + if turn_ctx.cancel.is_cancelled() { + return; + } let record = TurnRecord { metrics: turn_ctx.metrics.clone(), text_hash: turn_ctx.metrics.text_hash, diff --git a/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs b/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs index d2b80f4d..65192ca8 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_tool_phase.rs @@ -72,6 +72,14 @@ impl AgentLoopRunner { } self.inject_pending_messages().await; + // A cancelled batch's results are synthetic "Interrupted by user" errors; + // feeding them to LoopDetector would let repeated interrupts accrue a + // false loop streak. Hooks (DiffTracker) still run on partial writes. 
+ if !turn_ctx.cancel.is_cancelled() { + for g in &mut self.governance { + g.on_after_tools(turn_ctx, &tool_uses, &result_blocks); + } + } for h in &mut self.hooks { h.on_after_tools(turn_ctx, &tool_uses, &result_blocks); } diff --git a/crates/loopal-runtime/tests/agent_loop/cancel_finalize_e2e_test.rs b/crates/loopal-runtime/tests/agent_loop/cancel_finalize_e2e_test.rs new file mode 100644 index 00000000..7bae0b43 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/cancel_finalize_e2e_test.rs @@ -0,0 +1,148 @@ +use std::time::Duration; + +use loopal_protocol::{AgentEventPayload, Envelope, MessageSource}; +use loopal_runtime::agent_loop::WaitResult; +use loopal_test_support::{HarnessBuilder, TestFixture, chunks}; +use loopal_turn::{ + AssistantOutput, OrderedToolBatch, StopReason, ToolBatchItem, ToolCall, ToolCallId, + ToolExecState, Turn, TurnStep, TurnTrigger, +}; + +use super::e2e_event_waiters::{wait_for_interrupted_event, wait_for_stream_event}; +use super::goal_e2e_test::make_goal_session; + +fn in_progress_turn_with_open_tool() -> Turn { + let call = ToolCall { + id: ToolCallId::new("t1"), + name: "Bash".into(), + input: serde_json::Value::Null, + }; + let mut turn = Turn::new(TurnTrigger::UserInput { + envelope_id: "e1".into(), + content: "do work".into(), + images: vec![], + }); + turn.body.steps = vec![ + TurnStep::LlmCall { + model: "m".into(), + response: AssistantOutput { + thinking: None, + text_blocks: vec![], + tool_calls: vec![call.clone()], + server_blocks: vec![], + stop_reason: StopReason::ToolUse, + }, + }, + TurnStep::ToolBatch(OrderedToolBatch { + items: vec![ToolBatchItem { + call, + state: ToolExecState::Pending, + }], + }), + ]; + turn +} + +// Verifies the ingest→finalize path: ingesting a new envelope while a turn is +// still in-progress emits an observable TurnCancelled and resets continuation +// state (vs the old silent end_turn_record). 
The pending→Cancelled pairing is +// NOT exercised here (seed_test_turns leaves the tool batch unopened); that is +// covered by the cancel_open_tool_batch_marks_all_pending_as_cancelled unit test. +#[tokio::test] +async fn ingest_while_turn_in_progress_emits_turn_cancelled() { + let inner = HarnessBuilder::new() + .calls(vec![chunks::text_turn("x")]) + .messages(vec![]) + .lifecycle(loopal_runtime::LifecycleMode::Persistent) + .build() + .await; + let mut runner = inner.runner; + let mut event_rx = inner.event_rx; + + runner.seed_test_turns(vec![in_progress_turn_with_open_tool()]); + runner.last_continuation_goal_id = Some("stale-goal".into()); + + inner + .mailbox_tx + .send(Envelope::new(MessageSource::Human, "main", "next")) + .await + .unwrap(); + + let r = tokio::time::timeout(Duration::from_secs(2), runner.wait_for_input()) + .await + .expect("wait_for_input must not hang") + .unwrap(); + assert!(matches!(r, Some(WaitResult::MessageAdded))); + + // finalize must have reset the stale continuation goal. + assert!( + runner.last_continuation_goal_id.is_none(), + "cancellation must reset continuation state" + ); + + let mut saw_cancelled = false; + while let Ok(ev) = event_rx.try_recv() { + if let AgentEventPayload::TurnCancelled { cause } = &ev.payload { + assert_eq!(cause, "ParentTurnAborted"); + saw_cancelled = true; + } + } + assert!( + saw_cancelled, + "in-progress turn cancelled by new envelope must emit TurnCancelled" + ); + + drop(inner.mailbox_tx); + drop(inner.control_tx); +} + +// Interrupt mid-stream must finalize the turn: emit TurnCancelled (after the +// Interrupted event) so the interrupted turn is collected as +// Cancelled{UserInterrupt}, not left InProgress to be mislabeled later. 
+#[tokio::test] +async fn interrupt_mid_stream_finalizes_turn_as_cancelled() { + let fixture = TestFixture::new(); + let (_tmp, session, _log) = make_goal_session(&fixture.test_session("interrupt-finalize").id); + session.create("ongoing".into()).await.unwrap(); + let mut harness = HarnessBuilder::new() + .calls(vec![chunks::text_turn("streaming...")]) + .messages(vec![]) + .goal_session(session.clone()) + .llm_chunk_delay(Duration::from_millis(80)) + .build_spawned() + .await; + + wait_for_stream_event(&mut harness.event_rx).await; + harness.interrupt.signal(); + wait_for_interrupted_event(&mut harness.event_rx).await; + + let mut saw_cancelled = false; + let mut saw_completed = false; + let deadline = tokio::time::Instant::now() + Duration::from_secs(2); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + match tokio::time::timeout(remaining, harness.event_rx.recv()).await { + Ok(Some(ev)) => match &ev.payload { + AgentEventPayload::TurnCancelled { cause } => { + assert_eq!(cause, "UserInterrupt"); + saw_cancelled = true; + break; + } + AgentEventPayload::TurnCompleted(_) => saw_completed = true, + _ => {} + }, + Ok(None) | Err(_) => break, + } + } + assert!( + saw_cancelled, + "interrupt must finalize the turn and emit TurnCancelled" + ); + assert!( + !saw_completed, + "a cancelled turn must NOT also emit TurnCompleted (divergent terminal states)" + ); + + drop(harness.control_tx); + drop(harness.mailbox_tx); +} diff --git a/crates/loopal-runtime/tests/agent_loop/continuation_bypass_test.rs b/crates/loopal-runtime/tests/agent_loop/continuation_bypass_test.rs new file mode 100644 index 00000000..1aca59d6 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/continuation_bypass_test.rs @@ -0,0 +1,40 @@ +use std::time::Duration; + +use loopal_protocol::{Envelope, MessageSource}; +use loopal_test_support::{HarnessBuilder, chunks}; + +use 
super::e2e_event_waiters::wait_for_call_count; + +// Regression: a stale `last_continuation_goal_id` + an inconsistent goal made +// the continuation gate fire for EVERY turn, silently swallowing real user +// input (TurnStarted→TurnEnded{Complete}, zero LlmCall). The gate must now +// only apply to GoalContinuation turns. +#[tokio::test] +async fn user_input_reaches_llm_despite_stale_continuation_goal() { + let inner = HarnessBuilder::new() + .calls(vec![chunks::text_turn("ok")]) + .messages(vec![]) + .lifecycle(loopal_runtime::LifecycleMode::Persistent) + .build() + .await; + let recorded = inner.recorded_messages.clone(); + let mut runner = inner.runner; + // No goal_session → continuation_still_consistent() returns false; the + // stale id is what the buggy gate keyed on. + runner.last_continuation_goal_id = Some("stale-goal".into()); + let task = tokio::spawn(async move { runner.run().await }); + + inner + .mailbox_tx + .send(Envelope::new(MessageSource::Human, "main", "hi")) + .await + .unwrap(); + + // Pre-fix this times out at 0 calls; post-fix the UserInput turn reaches + // the LLM exactly as a non-continuation turn must. 
+ wait_for_call_count(&recorded, 1, Duration::from_secs(3)).await; + + drop(inner.mailbox_tx); + drop(inner.control_tx); + let _ = tokio::time::timeout(Duration::from_secs(1), task).await; +} diff --git a/crates/loopal-runtime/tests/agent_loop/governance_cancel_e2e_test.rs b/crates/loopal-runtime/tests/agent_loop/governance_cancel_e2e_test.rs new file mode 100644 index 00000000..61fcb117 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/governance_cancel_e2e_test.rs @@ -0,0 +1,141 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Duration; + +use loopal_protocol::{AgentEventPayload, Envelope, MessageSource}; +use loopal_runtime::agent_loop::governance::{Governance, PostTurnAction}; +use loopal_runtime::agent_loop::turn_history::{TurnHistory, TurnRecord}; +use loopal_test_support::{HarnessBuilder, TestFixture, chunks}; +use loopal_turn::{Turn, TurnTrigger}; + +use super::e2e_event_waiters::{wait_for_interrupted_event, wait_for_stream_event}; +use super::goal_e2e_test::make_goal_session; + +struct OnAfterTurnSpy { + count: Arc, + cancelled: Arc, +} + +impl Governance for OnAfterTurnSpy { + fn on_after_turn(&mut self, _record: &TurnRecord, _history: &TurnHistory) -> PostTurnAction { + self.count.fetch_add(1, Ordering::Relaxed); + PostTurnAction::None + } + fn on_turn_cancelled(&mut self) { + self.cancelled.fetch_add(1, Ordering::Relaxed); + } +} + +// Control for interrupted_turn_skips_governance_after_turn: a normally completed +// turn DOES feed governance on_after_turn (count==1) and does NOT trigger +// on_turn_cancelled. Together the two pin the contract bidirectionally. 
+#[tokio::test] +async fn completed_turn_feeds_governance_after_turn() { + let inner = HarnessBuilder::new() + .calls(vec![chunks::text_turn("done")]) + .build() + .await; + let count = Arc::new(AtomicU32::new(0)); + let cancelled = Arc::new(AtomicU32::new(0)); + let mut runner = inner.runner; + runner.governance.push(Box::new(OnAfterTurnSpy { + count: count.clone(), + cancelled: cancelled.clone(), + })); + runner.run().await.unwrap(); + assert_eq!(count.load(Ordering::Relaxed), 1); + assert_eq!(cancelled.load(Ordering::Relaxed), 0); +} + +// A cancelled turn must NOT feed governance on_after_turn, but MUST trigger +// on_turn_cancelled (so LoopDetector resets its streak across the interrupt). +#[tokio::test] +async fn interrupted_turn_skips_governance_after_turn() { + let inner = HarnessBuilder::new() + .calls(vec![chunks::text_turn("streaming...")]) + .messages(vec![]) + .lifecycle(loopal_runtime::LifecycleMode::Persistent) + .llm_chunk_delay(Duration::from_millis(80)) + .build() + .await; + let count = Arc::new(AtomicU32::new(0)); + let cancelled = Arc::new(AtomicU32::new(0)); + let mut runner = inner.runner; + runner.governance.push(Box::new(OnAfterTurnSpy { + count: count.clone(), + cancelled: cancelled.clone(), + })); + let interrupt = inner.interrupt.clone(); + let mailbox = inner.mailbox_tx.clone(); + let mut event_rx = inner.event_rx; + let task = tokio::spawn(async move { runner.run().await }); + + mailbox + .send(Envelope::new(MessageSource::Human, "main", "go")) + .await + .unwrap(); + wait_for_stream_event(&mut event_rx).await; + interrupt.signal(); + wait_for_interrupted_event(&mut event_rx).await; + + drop(mailbox); + drop(inner.mailbox_tx); + drop(inner.control_tx); + let _ = tokio::time::timeout(Duration::from_secs(1), task).await; + + assert_eq!(count.load(Ordering::Relaxed), 0); + assert!(cancelled.load(Ordering::Relaxed) >= 1); +} + +fn goal_continuation_turn() -> Turn { + Turn::new(TurnTrigger::GoalContinuation { + envelope_id: "g1".into(), + 
content: "keep going".into(), + }) +} + +// A GoalContinuation turn whose goal changed before it started is skipped: +// emits ContinuationSkipped and is rewound (not executed). Regression for the +// round-7 skip path, which otherwise had no end-to-end coverage. +#[tokio::test] +async fn stale_continuation_turn_is_skipped() { + let fixture = TestFixture::new(); + let (_tmp, session, _log) = make_goal_session(&fixture.test_session("stale-skip").id); + session.create("ongoing".into()).await.unwrap(); + let inner = HarnessBuilder::new() + .calls(vec![chunks::text_turn("unused")]) + .messages(vec![]) + .goal_session(session.clone()) + .lifecycle(loopal_runtime::LifecycleMode::Persistent) + .build() + .await; + let mut runner = inner.runner; + // Stale id != the live goal's id → continuation_still_consistent is false. + runner.last_continuation_goal_id = Some("stale-goal-id".into()); + runner.seed_test_turns(vec![goal_continuation_turn()]); + let mut event_rx = inner.event_rx; + let task = tokio::spawn(async move { runner.run().await }); + + let mut saw_skipped = false; + let deadline = tokio::time::Instant::now() + Duration::from_secs(2); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + match tokio::time::timeout(remaining, event_rx.recv()).await { + Ok(Some(ev)) => { + if matches!(ev.payload, AgentEventPayload::ContinuationSkipped { .. 
}) { + saw_skipped = true; + break; + } + } + Ok(None) | Err(_) => break, + } + } + drop(inner.mailbox_tx); + drop(inner.control_tx); + let _ = tokio::time::timeout(Duration::from_secs(1), task).await; + + assert!( + saw_skipped, + "stale continuation turn must emit ContinuationSkipped" + ); +} diff --git a/crates/loopal-runtime/tests/agent_loop/mod.rs b/crates/loopal-runtime/tests/agent_loop/mod.rs index 7ba78956..1d775552 100644 --- a/crates/loopal-runtime/tests/agent_loop/mod.rs +++ b/crates/loopal-runtime/tests/agent_loop/mod.rs @@ -71,6 +71,7 @@ mod goal_e2e_test; mod goal_kickoff_edge_test; mod goal_kickoff_runner_test; mod goal_kickoff_test; +mod governance_cancel_e2e_test; mod idle_e2e_test; mod inbox_event_test; mod input_edge_test; @@ -87,8 +88,10 @@ mod microcompact_e2e_test; pub mod mock_provider; mod rehydrate_e2e_test; pub use mock_provider::make_runner_with_mock_provider; +mod cancel_finalize_e2e_test; mod cancel_test; mod context_budget_test; +mod continuation_bypass_test; mod dispatch_test; mod model_routing_test; mod params_builder_test; diff --git a/crates/loopal-runtime/tests/suite.rs b/crates/loopal-runtime/tests/suite.rs index fc087772..9bc0e5ee 100644 --- a/crates/loopal-runtime/tests/suite.rs +++ b/crates/loopal-runtime/tests/suite.rs @@ -45,6 +45,8 @@ mod goal_session_test; mod governance_bridge_test; #[path = "suite/hydrate_test.rs"] mod hydrate_test; +#[path = "suite/loop_detector_digest_test.rs"] +mod loop_detector_digest_test; #[path = "suite/loop_detector_edge_test.rs"] mod loop_detector_edge_test; #[path = "suite/loop_detector_test.rs"] diff --git a/crates/loopal-runtime/tests/suite/loop_detector_digest_test.rs b/crates/loopal-runtime/tests/suite/loop_detector_digest_test.rs new file mode 100644 index 00000000..21461eaa --- /dev/null +++ b/crates/loopal-runtime/tests/suite/loop_detector_digest_test.rs @@ -0,0 +1,157 @@ +use loopal_protocol::InterruptSignal; +use loopal_provider_api::ContentBlock; +use 
loopal_runtime::agent_loop::cancel::TurnCancel; +use loopal_runtime::agent_loop::governance::{Governance, Verdict}; +use loopal_runtime::agent_loop::loop_detector::LoopDetector; +use loopal_runtime::agent_loop::turn_context::TurnContext; +use loopal_tool_invocation::{ToolImageBlock, ToolResultMetadata}; +use serde_json::json; +use std::sync::Arc; + +fn make_ctx() -> TurnContext { + let cancel = TurnCancel::new( + InterruptSignal::new(), + Arc::new(tokio::sync::watch::channel(0u64).0), + ); + TurnContext::new(0, cancel) +} + +fn image_result(data: &str) -> Vec { + vec![ContentBlock::ToolResult { + tool_use_id: "id".into(), + content: String::new(), + images: vec![ToolImageBlock::inline("image/png", data.to_string())], + is_error: false, + metadata: None, + }] +} + +#[test] +fn changing_image_content_never_aborts() { + // Same path (same input), different image bytes each call. content is empty + // so only the image distinguishes calls — pins #A: the digest must hash + // image content (data/id), not just byte_size. "frame-N" strings share the + // same byte_size, so a byte_size-only digest would falsely abort. + let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = [( + "id".into(), + "ReadImage".into(), + json!({"file": "/tmp/c.png"}), + )]; + for i in 0..8 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &image_result(&format!("frame-{i}"))); + } + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::Continue + )); +} + +#[test] +fn identical_image_content_still_aborts() { + // Same path AND identical image bytes → genuine loop → must abort. 
+ let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = [( + "id".into(), + "ReadImage".into(), + json!({"file": "/tmp/c.png"}), + )]; + for _ in 0..5 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &image_result("same-bytes")); + } + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::AbortTurn { .. } + )); +} + +#[test] +fn absent_result_does_not_accrue() { + // output_digest_for returns None when no ToolResult matches the id; the + // streak must not accrue (None must not fold to one shared empty digest). + let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = [("id".into(), "Read".into(), json!({"file": "/tmp/x.rs"}))]; + let unrelated = vec![ContentBlock::ToolResult { + tool_use_id: "other".into(), + content: "x".into(), + images: vec![], + is_error: false, + metadata: None, + }]; + for _ in 0..7 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &unrelated); + } + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::Continue + )); +} + +fn write_result(count: u64) -> Vec { + vec![ContentBlock::ToolResult { + tool_use_id: "id".into(), + content: "File written".into(), + images: vec![], + is_error: false, + metadata: Some(ToolResultMetadata::bytes_written(count)), + }] +} + +#[test] +fn changing_metadata_never_aborts() { + // Fixed content + is_error, only the metadata (byte count) changes — must + // not be flagged as a loop, since the result genuinely differs each call. + let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = [("id".into(), "Write".into(), json!({"file": "/tmp/x"}))]; + for i in 0..8 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &write_result(i)); + } + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::Continue + )); +} + +#[test] +fn on_turn_cancelled_resets_streak() { + // A user interrupt resets the loop streak. 
Prove a FULL reset (not a + // decrement): post-cancel the next identical call is Continue (count back + // to 0, below warn threshold), AND a fresh run of 5 identical calls accrues + // from zero to AbortTurn again — a decrement-by-one would leave the streak + // hot and either warn immediately or abort one call early. + let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = [( + "id".into(), + "ReadImage".into(), + json!({"file": "/tmp/c.png"}), + )]; + for _ in 0..5 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &image_result("same-bytes")); + } + det.on_turn_cancelled(); + assert!( + matches!(det.on_before_tools(&mut ctx, &calls), Verdict::Continue), + "first post-cancel call must be Continue (streak cleared, not warning)" + ); + for _ in 0..5 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &image_result("same-bytes")); + } + assert!( + matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::AbortTurn { .. 
} + ), + "streak must re-accrue from zero after the reset" + ); +} diff --git a/crates/loopal-runtime/tests/suite/loop_detector_edge_test.rs b/crates/loopal-runtime/tests/suite/loop_detector_edge_test.rs index ae6c4efc..38d5a8ce 100644 --- a/crates/loopal-runtime/tests/suite/loop_detector_edge_test.rs +++ b/crates/loopal-runtime/tests/suite/loop_detector_edge_test.rs @@ -1,4 +1,5 @@ use loopal_protocol::{InterruptSignal, MessageSource, QualifiedAddress}; +use loopal_provider_api::ContentBlock; use loopal_runtime::agent_loop::cancel::TurnCancel; use loopal_runtime::agent_loop::governance::{Governance, Verdict}; use loopal_runtime::agent_loop::loop_detector::LoopDetector; @@ -14,51 +15,47 @@ fn make_ctx() -> TurnContext { TurnContext::new(0, cancel) } -// --- Regression: fan-out with long shared prefix must not collide --- +fn res(id: &str, content: &str) -> Vec { + vec![ContentBlock::ToolResult { + tool_use_id: id.into(), + content: content.into(), + images: vec![], + is_error: false, + metadata: None, + }] +} #[test] -fn loop_detector_fanout_different_targets_does_not_trigger() { - // Regression for prefix-hash collision. When the signature was built - // from the first 200 bytes of the serialized JSON, and `serde_json` - // ordered keys alphabetically (BTreeMap), a SendMessage call with - // {"message": , "summary": …, "to": } would hash away - // the `to` field entirely — so 5 messages to 5 distinct recipients - // looked identical and tripped the abort threshold. - // - // With full-JSON hashing, each distinct `to` yields a distinct signature. +fn fanout_different_targets_does_not_trigger() { + // Each distinct `to` yields a distinct input signature, so fanning out to + // many recipients never accrues a shared streak (regression for an old + // prefix-hash collision). 
let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let long_msg = "你好。我是 hub-83e6571f 的 agent。用户给我布置了一个任务:".repeat(6); - let targets = [ - "hub-6d7d3682", - "hub-0d7124fc", - "hub-9b54624e", - "hub-4809c5a6", - "hub-f117ce0b", - ]; - let calls: Vec<(String, String, serde_json::Value)> = targets - .iter() - .map(|t| { - ( - format!("id-{t}"), - "SendMessage".into(), - json!({"to": *t, "message": long_msg, "summary": "intro ping"}), - ) - }) - .collect(); - - let action = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(action, Verdict::Continue), - "fan-out to 5 distinct targets must not trigger loop detector, got {action:?}" - ); + let long_msg = "你好。我是 hub 的 agent。用户给我布置了任务:".repeat(6); + let targets = ["hub-a", "hub-b", "hub-c", "hub-d", "hub-e"]; + for t in targets { + let calls = vec![( + format!("id-{t}"), + "SendMessage".into(), + json!({"to": t, "message": long_msg, "summary": "intro"}), + )]; + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &res(&format!("id-{t}"), "ok")); + } + let calls = vec![( + "id-hub-a".into(), + "SendMessage".into(), + json!({"to": "hub-a", "message": long_msg, "summary": "intro"}), + )]; + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::Continue + )); } #[test] -fn loop_detector_fanout_with_identical_payload_still_triggers() { - // Sanity check: the fix must not mask genuine loops. Repeating the - // *exact same* call (identical `to` + `message`) 5 times should still - // abort — this is the behavior the detector was designed to protect. 
+fn identical_payload_and_output_still_triggers() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); let call = vec![( @@ -66,24 +63,23 @@ fn loop_detector_fanout_with_identical_payload_still_triggers() { "SendMessage".into(), json!({"to": "hub-a", "message": "hello", "summary": "s"}), )]; - for _ in 0..4 { + for _ in 0..5 { det.on_before_tools(&mut ctx, &call); + det.on_after_tools(&mut ctx, &call, &res("id", "same")); } - let action = det.on_before_tools(&mut ctx, &call); - assert!( - matches!(action, Verdict::AbortTurn { .. }), - "identical SendMessage repeated 5 times should still abort, got {action:?}" - ); + assert!(matches!( + det.on_before_tools(&mut ctx, &call), + Verdict::AbortTurn { .. } + )); } -// --- Reset-on-envelope: end-to-end table sentinel --- - fn ready_to_abort_detector() -> (LoopDetector, TurnContext) { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); let calls = [("id".into(), "Read".into(), json!({"file": "/tmp/x.rs"}))]; - for _ in 0..4 { + for _ in 0..5 { det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &res("id", "same")); } (det, ctx) } @@ -111,10 +107,7 @@ fn assert_reset_outcome(source: MessageSource, should_reset: bool, label: &str) } #[test] -fn loop_detector_envelope_reset_table() { - // Each MessageSource variant is paired with its expected reset behavior. - // When MessageSource (or the System-kind set) grows, the author must - // extend this table and make an explicit decision. 
+fn envelope_reset_table() { let cases: Vec<(MessageSource, bool, &str)> = vec![ (MessageSource::Human, true, "Human"), (MessageSource::Scheduled, true, "Scheduled"), @@ -132,31 +125,11 @@ fn loop_detector_envelope_reset_table() { false, "System:goal_continuation", ), - ( - MessageSource::System("governance_compensation".into()), - false, - "System:governance_compensation", - ), ( MessageSource::System("governance_feedback".into()), false, "System:governance_feedback", ), - ( - MessageSource::System("stop_feedback".into()), - false, - "System:stop_feedback", - ), - ( - MessageSource::System("config_refresh".into()), - false, - "System:config_refresh", - ), - ( - MessageSource::System("compaction_summary".into()), - false, - "System:compaction_summary", - ), ( MessageSource::System("future_unknown".into()), false, @@ -167,3 +140,51 @@ fn loop_detector_envelope_reset_table() { assert_reset_outcome(src, expected_reset, label); } } + +#[test] +fn repeated_identical_error_aborts() { + // A tool that keeps returning the SAME error (is_error + same content) is a + // real loop — must still abort after the threshold. + let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = [("id".into(), "Bash".into(), json!({"cmd": "x"}))]; + let err = vec![ContentBlock::ToolResult { + tool_use_id: "id".into(), + content: "boom".into(), + images: vec![], + is_error: true, + metadata: None, + }]; + for _ in 0..5 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &err); + } + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::AbortTurn { .. } + )); +} + +#[test] +fn parallel_identical_calls_count_once_per_batch() { + // Two identical (name,input) calls in one batch must bump the streak by 1, + // not 2 — a batch is a single decision point. 
+ let mut det = LoopDetector::new(); + let mut ctx = make_ctx(); + let calls = vec![ + ("id1".into(), "Read".into(), json!({"file": "/tmp/x.rs"})), + ("id2".into(), "Read".into(), json!({"file": "/tmp/x.rs"})), + ]; + let results = [res("id1", "same"), res("id2", "same")].concat(); + // With per-batch dedup, 4 batches → count 4 → InjectWarning (≥WARN 3, not + // yet ABORT 5). Without dedup, 4 × 2 = 8 would abort; if accrual broke + // entirely, count would stay 0 → Continue. Asserting Warning pins both. + for _ in 0..4 { + det.on_before_tools(&mut ctx, &calls); + det.on_after_tools(&mut ctx, &calls, &results); + } + assert!(matches!( + det.on_before_tools(&mut ctx, &calls), + Verdict::InjectWarning(_) + )); +} diff --git a/crates/loopal-runtime/tests/suite/loop_detector_test.rs b/crates/loopal-runtime/tests/suite/loop_detector_test.rs index efeaf99f..052b3399 100644 --- a/crates/loopal-runtime/tests/suite/loop_detector_test.rs +++ b/crates/loopal-runtime/tests/suite/loop_detector_test.rs @@ -1,4 +1,5 @@ use loopal_protocol::{InterruptSignal, MessageSource}; +use loopal_provider_api::ContentBlock; use loopal_runtime::agent_loop::cancel::TurnCancel; use loopal_runtime::agent_loop::governance::{Governance, TurnHook, Verdict}; use loopal_runtime::agent_loop::loop_detector::LoopDetector; @@ -18,27 +19,45 @@ fn tool(name: &str) -> (String, String, serde_json::Value) { ("id".into(), name.into(), json!({"file": "/tmp/x.rs"})) } -// --- Governance trait defaults --- +fn result(content: &str) -> Vec<ContentBlock> { + vec![ContentBlock::ToolResult { + tool_use_id: "id".into(), + content: content.into(), + images: vec![], + is_error: false, + metadata: None, + }] +} + +// One before→execute→after cycle; returns the pre-execution verdict and feeds +// `content` as this call's output so the detector can track output stability. 
+fn cycle(det: &mut LoopDetector, ctx: &mut TurnContext, name: &str, content: &str) -> Verdict { + let calls = [tool(name)]; + let v = det.on_before_tools(ctx, &calls); + det.on_after_tools(ctx, &calls, &result(content)); + v +} + +// --- trait defaults --- #[test] fn governance_defaults_are_continue() { struct NoopGovernance; impl Governance for NoopGovernance {} - let mut g = NoopGovernance; let mut ctx = make_ctx(); - let action = g.on_before_tools(&mut ctx, &[tool("Read")]); - assert!(matches!(action, Verdict::Continue)); + assert!(matches!( + g.on_before_tools(&mut ctx, &[tool("Read")]), + Verdict::Continue + )); + g.on_after_tools(&mut ctx, &[tool("Read")], &[]); g.on_envelope_received(&MessageSource::Human); } -// --- TurnHook trait defaults --- - #[test] fn turn_hook_defaults_are_noop() { struct NoopHook; impl TurnHook for NoopHook {} - let mut h = NoopHook; let mut ctx = make_ctx(); h.on_turn_start(&mut ctx); @@ -46,193 +65,131 @@ fn turn_hook_defaults_are_noop() { h.on_turn_end(&ctx); } -// --- LoopDetector direct tests --- - -#[test] -fn loop_detector_no_repeat_returns_continue() { - let mut det = LoopDetector::new(); - let mut ctx = make_ctx(); - let action = det.on_before_tools(&mut ctx, &[tool("Read")]); - assert!(matches!(action, Verdict::Continue)); -} +// --- stationary repetition (same input → same output) trips the detector --- #[test] -fn loop_detector_three_repeats_warns() { +fn identical_output_three_times_warns() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let calls = [tool("Read")]; - det.on_before_tools(&mut ctx, &calls); - det.on_before_tools(&mut ctx, &calls); - let action = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(action, Verdict::InjectWarning(_)), - "expected InjectWarning after 3 repeats, got {action:?}" - ); + for _ in 0..3 { + cycle(&mut det, &mut ctx, "Read", "same"); + } + assert!(matches!( + cycle(&mut det, &mut ctx, "Read", "same"), + Verdict::InjectWarning(_) + )); } #[test] -fn 
loop_detector_five_repeats_aborts() { +fn identical_output_five_times_aborts() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let calls = [tool("Read")]; - for _ in 0..4 { - det.on_before_tools(&mut ctx, &calls); + for _ in 0..5 { + cycle(&mut det, &mut ctx, "Read", "same"); } - let action = det.on_before_tools(&mut ctx, &calls); let Verdict::AbortTurn { reason, feedback_to_model, - } = action + } = cycle(&mut det, &mut ctx, "Read", "same") else { - panic!("expected AbortTurn after 5 repeats, got {action:?}"); + panic!("expected AbortTurn after 5 identical outputs"); }; assert!(reason.contains("Loop detected")); - assert!( - !feedback_to_model.is_empty(), - "AbortTurn must carry a non-empty feedback_to_model so the model sees why" - ); - assert!( - feedback_to_model.contains("Read"), - "feedback_to_model should mention the offending tool name" - ); + assert!(feedback_to_model.contains("Read")); } +// --- the ReadImage regression: same args, fresh output each call --- + #[test] -fn loop_detector_human_envelope_resets() { +fn changing_output_never_aborts() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let calls = [tool("Read")]; - for _ in 0..4 { - det.on_before_tools(&mut ctx, &calls); + // Same tool + same args (e.g. ReadImage on an overwritten screenshot path) + // but a different result every call — must never be flagged. 
+ for i in 0..8 { + let v = cycle(&mut det, &mut ctx, "ReadImage", &format!("frame-{i}")); + assert!( + matches!(v, Verdict::Continue), + "fresh output must reset the streak at iteration {i}, got {v:?}" + ); } - det.on_envelope_received(&MessageSource::Human); - let action = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(action, Verdict::Continue), - "expected Continue after Human envelope reset, got {action:?}" - ); } #[test] -fn loop_detector_scheduled_envelope_resets() { +fn single_cycle_returns_continue() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let calls = [tool("Read")]; - for _ in 0..4 { - det.on_before_tools(&mut ctx, &calls); - } - det.on_envelope_received(&MessageSource::Scheduled); - let action = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(action, Verdict::Continue), - "expected Continue after Scheduled envelope reset, got {action:?}" - ); + assert!(matches!( + cycle(&mut det, &mut ctx, "Read", "x"), + Verdict::Continue + )); } -#[test] -fn loop_detector_system_envelope_does_not_reset() { - let mut det = LoopDetector::new(); - let mut ctx = make_ctx(); - let calls = [tool("Read")]; - for _ in 0..4 { - det.on_before_tools(&mut ctx, &calls); +// --- resets --- + +fn prime_to_abort(det: &mut LoopDetector, ctx: &mut TurnContext) { + for _ in 0..5 { + cycle(det, ctx, "Read", "same"); } - // System-injected envelopes (continuation, hook rewake) must NOT reset — - // they extend the current loop rather than mark a new task boundary. - det.on_envelope_received(&MessageSource::System("goal_continuation".into())); - let action = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(action, Verdict::AbortTurn { .. 
}), - "expected AbortTurn (signatures preserved) after System envelope, got {action:?}" - ); } #[test] -fn loop_detector_different_tools_independent() { +fn human_envelope_resets() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - // Read x2, Write x2 — neither reaches threshold - det.on_before_tools(&mut ctx, &[tool("Read")]); - det.on_before_tools(&mut ctx, &[tool("Write")]); - det.on_before_tools(&mut ctx, &[tool("Read")]); - let action = det.on_before_tools(&mut ctx, &[tool("Write")]); - assert!( - matches!(action, Verdict::Continue), - "different tools should not trigger loop: {action:?}" - ); + prime_to_abort(&mut det, &mut ctx); + det.on_envelope_received(&MessageSource::Human); + assert!(matches!( + cycle(&mut det, &mut ctx, "Read", "same"), + Verdict::Continue + )); } #[test] -fn loop_detector_different_inputs_independent() { +fn scheduled_envelope_resets() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - // Same tool, different inputs — different signatures - for i in 0..5 { - let call = vec![( - "id".into(), - "Read".into(), - json!({"file": format!("/tmp/{i}.rs")}), - )]; - let action = det.on_before_tools(&mut ctx, &call); - assert!( - matches!(action, Verdict::Continue), - "different inputs should not trigger loop at iteration {i}" - ); - } + prime_to_abort(&mut det, &mut ctx); + det.on_envelope_received(&MessageSource::Scheduled); + assert!(matches!( + cycle(&mut det, &mut ctx, "Read", "same"), + Verdict::Continue + )); } #[test] -fn loop_detector_multibyte_utf8_input_does_not_panic() { +fn system_envelope_does_not_reset() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - // Large CJK input — we hash full JSON, so this only exercises UTF-8 - // safety of the serialized string. Must not panic. 
- let cjk = "中".repeat(200); // 600 bytes - let call = vec![("id".into(), "Write".into(), json!({"result": cjk}))]; - let action = det.on_before_tools(&mut ctx, &call); - assert!(matches!(action, Verdict::Continue)); + prime_to_abort(&mut det, &mut ctx); + det.on_envelope_received(&MessageSource::System("goal_continuation".into())); + assert!(matches!( + cycle(&mut det, &mut ctx, "Read", "same"), + Verdict::AbortTurn { .. } + )); } #[test] -fn loop_detector_on_compact_completed_resets_signatures() { +fn compact_completed_resets() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let calls = [tool("Read")]; - for _ in 0..4 { - det.on_before_tools(&mut ctx, &calls); - } + prime_to_abort(&mut det, &mut ctx); det.on_compact_completed(); - let action = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(action, Verdict::Continue), - "compact completion must reset signature counter; got {action:?}", - ); + assert!(matches!( + cycle(&mut det, &mut ctx, "Read", "same"), + Verdict::Continue + )); } #[test] -fn loop_detector_compact_reset_independent_from_envelope_reset() { +fn different_tools_are_independent() { let mut det = LoopDetector::new(); let mut ctx = make_ctx(); - let calls = [tool("Bash")]; - for _ in 0..2 { - det.on_before_tools(&mut ctx, &calls); - } - det.on_compact_completed(); - // After compact reset, three more calls should not yet abort - // (would only hit the WARN_THRESHOLD on the 3rd post-reset call). 
- let a1 = det.on_before_tools(&mut ctx, &calls); - let a2 = det.on_before_tools(&mut ctx, &calls); - let a3 = det.on_before_tools(&mut ctx, &calls); - assert!( - matches!(a1, Verdict::Continue), - "first post-compact call must Continue, got {a1:?}", - ); - assert!( - matches!(a2, Verdict::Continue), - "second post-compact call must Continue, got {a2:?}", - ); - assert!( - matches!(a3, Verdict::InjectWarning(_)), - "third post-compact call hits WARN_THRESHOLD as if starting fresh, got {a3:?}", - ); + cycle(&mut det, &mut ctx, "Read", "a"); + cycle(&mut det, &mut ctx, "Write", "a"); + cycle(&mut det, &mut ctx, "Read", "a"); + assert!(matches!( + cycle(&mut det, &mut ctx, "Write", "a"), + Verdict::Continue + )); } diff --git a/crates/loopal-session/src/event_handler.rs b/crates/loopal-session/src/event_handler.rs index fc100369..4fcf7b86 100644 --- a/crates/loopal-session/src/event_handler.rs +++ b/crates/loopal-session/src/event_handler.rs @@ -81,6 +81,8 @@ pub fn apply_event(state: &mut SessionState, event: AgentEvent) { | AgentEventPayload::HubDegraded { .. } | AgentEventPayload::HubRecovered { .. } | AgentEventPayload::DegenerationDetected(_) + | AgentEventPayload::ContinuationSkipped { .. } + | AgentEventPayload::TurnCancelled { .. } | AgentEventPayload::ContinuationGateChanged(_) => {} } } diff --git a/crates/loopal-tool-invocation/src/image.rs b/crates/loopal-tool-invocation/src/image.rs index 6abeffc7..e116ac77 100644 --- a/crates/loopal-tool-invocation/src/image.rs +++ b/crates/loopal-tool-invocation/src/image.rs @@ -75,6 +75,16 @@ impl ToolImageBlock { } } + // Content identity: Inline → base64 data, SessionResource → content-addressed + // id. Distinguishes different images at the same path (byte_size alone would + // collide two same-length-but-different images). + pub fn content_key(&self) -> &str { + match self { + Self::Inline { data, .. } => data, + Self::SessionResource { id, .. 
} => id, + } + } + pub fn is_inline(&self) -> bool { matches!(self, Self::Inline { .. }) } diff --git a/crates/loopal-turn/src/step.rs b/crates/loopal-turn/src/step.rs index e8d9afb5..1929d4fa 100644 --- a/crates/loopal-turn/src/step.rs +++ b/crates/loopal-turn/src/step.rs @@ -77,6 +77,8 @@ pub enum CancelCause { GovernanceAbort, CrashRecovery, Timeout, + // Superseded: a new envelope arrived and aborted the in-progress turn. + ParentTurnAborted, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/loopal-turn/src/turn.rs b/crates/loopal-turn/src/turn.rs index fb0349f4..3816f5de 100644 --- a/crates/loopal-turn/src/turn.rs +++ b/crates/loopal-turn/src/turn.rs @@ -118,6 +118,15 @@ pub enum TurnTrigger { Resume, } +impl TurnTrigger { + // reason: only goal-continuation turns are subject to the continuation + // consistency gate; every other trigger (real user input, cron, agent, + // channel, resume) must reach the LLM unconditionally. + pub fn is_goal_continuation(&self) -> bool { + matches!(self, TurnTrigger::GoalContinuation { .. 
}) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum TurnOutcome { InProgress, diff --git a/crates/loopal-turn/tests/suite.rs b/crates/loopal-turn/tests/suite.rs index 7fdf2097..191eb096 100644 --- a/crates/loopal-turn/tests/suite.rs +++ b/crates/loopal-turn/tests/suite.rs @@ -4,3 +4,5 @@ mod event_test; mod repo_test; #[path = "suite/turn_basic_test.rs"] mod turn_basic_test; +#[path = "suite/turn_trigger_test.rs"] +mod turn_trigger_test; diff --git a/crates/loopal-turn/tests/suite/turn_trigger_test.rs b/crates/loopal-turn/tests/suite/turn_trigger_test.rs new file mode 100644 index 00000000..4a166336 --- /dev/null +++ b/crates/loopal-turn/tests/suite/turn_trigger_test.rs @@ -0,0 +1,47 @@ +use loopal_turn::TurnTrigger; + +fn env(id: &str) -> String { + id.to_string() +} + +#[test] +fn only_goal_continuation_is_goal_continuation() { + let user = TurnTrigger::UserInput { + envelope_id: env("e"), + content: "hi".into(), + images: vec![], + }; + let cron = TurnTrigger::Cron { + envelope_id: env("e"), + content: "tick".into(), + }; + let agent = TurnTrigger::Agent { + envelope_id: env("e"), + from: "a".into(), + content: "c".into(), + }; + let channel = TurnTrigger::Channel { + envelope_id: env("e"), + channel: "ch".into(), + from: "a".into(), + content: "c".into(), + }; + let goal = TurnTrigger::GoalContinuation { + envelope_id: env("e"), + content: "keep going".into(), + }; + let hook = TurnTrigger::BackgroundHook { + envelope_id: env("e"), + hook_kind: "stop_feedback".into(), + content: "c".into(), + }; + let resume = TurnTrigger::Resume; + + assert!(goal.is_goal_continuation()); + for t in [user, cron, agent, channel, hook, resume] { + assert!( + !t.is_goal_continuation(), + "non-GoalContinuation trigger must not be flagged: {t:?}" + ); + } +} diff --git a/crates/loopal-view-state/src/mutators/mod.rs b/crates/loopal-view-state/src/mutators/mod.rs index d460916d..572fb5df 100644 --- a/crates/loopal-view-state/src/mutators/mod.rs +++ 
b/crates/loopal-view-state/src/mutators/mod.rs @@ -165,6 +165,8 @@ pub(crate) fn mutate(state: &mut SessionViewState, event: &AgentEventPayload) -> | SessionResumeWarnings { .. } | QuestionDecided { .. } | DegenerationDetected(_) + | ContinuationSkipped { .. } + | TurnCancelled { .. } | ContinuationGateChanged(_) => MutationEffect::NoOp, CompactProgress { phase, detail } => compact::progress(state, *phase, detail.as_deref()), }