From d74ecf92e4fe1eb5483d2314504a49e72a4bf547 Mon Sep 17 00:00:00 2001 From: yishuiliunian Date: Tue, 2 Jun 2026 12:35:54 +0800 Subject: [PATCH 1/2] fix(runtime): unfreeze TUI after /compact and resume; observability sweep Two root causes behind a frozen UI cluster (ESC dead, /compact stuck on "Streaming", resumed "continue" never consumed): - Bug A: view-state compact mutator usurped backend-authoritative status by flipping it to Running without restoring it. Status is now backend-only; the compact_banner alone conveys compacting. - Bug D (keystone): resume/cold-start with a User-tail history skipped the idle phase, so no turn record was opened and every append_step hit NoCurrentTurn. ensure_resume_turn_record() now opens the record before the turn runs, letting the loop return to idle and drain queued input (Bug C). - Bug B: post_compact now emits a reliable TokenUsage so the ctx counter refreshes; conversation_display defensively syncs tokens from Compacted. Observability: default log level -> debug; telemetry build_env_filter switches to global-default + third-party blacklist (was a whitelist that dropped tui/session/hub/view-state); empty-LLM-response warn; status-transition and user-message-routing debug anchors. Regression coverage (e2e): resume_then_followup_message_runs_second_turn, resume_user_tail_records_turn_with_llm_step, compact_token_sync, and compact_idle_e2e reproduce the exact frozen-UI event sequences. --- .../src/agent_loop/compaction_run.rs | 12 +++ .../src/agent_loop/llm_record.rs | 7 +- crates/loopal-runtime/src/agent_loop/mod.rs | 1 + crates/loopal-runtime/src/agent_loop/run.rs | 60 ++---------- .../src/agent_loop/runner_transition.rs | 1 + .../src/agent_loop/turn_record.rs | 18 ++++ .../src/agent_loop/turn_recover.rs | 58 +++++++++++ .../agent_loop/compact_token_sync_test.rs | 43 +++++++++ crates/loopal-runtime/tests/agent_loop/mod.rs | 1 + .../tests/agent_loop/resume_invariant_test.rs | 64 +++++++++++-- crates/loopal-telemetry/src/filter.rs | 48 ++++++++++ crates/loopal-telemetry/src/lib.rs | 2 + crates/loopal-tui/src/key_dispatch_ops.rs | 5 + .../src/conversation/conversation_display.rs | 6 ++ .../loopal-view-state/src/mutators/compact.rs | 3 +- crates/loopal-view-state/tests/suite.rs | 2 + .../suite/compact_banner_mutator_test.rs | 39 +++++++- .../tests/suite/compact_idle_e2e_test.rs | 96 +++++++++++++++++++ src/logging.rs | 11 +-- 19 files changed, 405 insertions(+), 72 deletions(-) create mode 100644 crates/loopal-runtime/src/agent_loop/turn_recover.rs create mode 100644 crates/loopal-runtime/tests/agent_loop/compact_token_sync_test.rs create mode 100644 crates/loopal-telemetry/src/filter.rs create mode 100644 crates/loopal-view-state/tests/suite/compact_idle_e2e_test.rs diff --git a/crates/loopal-runtime/src/agent_loop/compaction_run.rs b/crates/loopal-runtime/src/agent_loop/compaction_run.rs index 9ef0c5de..0cdde7c2 100644 --- a/crates/loopal-runtime/src/agent_loop/compaction_run.rs +++ b/crates/loopal-runtime/src/agent_loop/compaction_run.rs @@ -151,6 +151,18 @@ impl AgentLoopRunner { )) .await?; + // Route the post-compact token truth through the canonical token path + // so the status bar's ctx counter refreshes; mirrors resume reset. + self.emit(AgentEventPayload::TokenUsage { + input_tokens: tokens_after, + output_tokens: 0, + context_window: self.turns.view().budget().context_window, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + thinking_tokens: 0, + }) + .await?; + // Done phase closes the progress stream so frontends can collapse // the inline indicator. Emitted last, after `Compacted` so any // listener that wants the final stats has them in hand. diff --git a/crates/loopal-runtime/src/agent_loop/llm_record.rs b/crates/loopal-runtime/src/agent_loop/llm_record.rs index 671de78f..43089ab6 100644 --- a/crates/loopal-runtime/src/agent_loop/llm_record.rs +++ b/crates/loopal-runtime/src/agent_loop/llm_record.rs @@ -3,7 +3,7 @@ use loopal_turn::{ AssistantOutput, ServerToolCall, ServerToolPair, ServerToolResult, StopReason as TurnStopReason, TextBlock, ThinkingBlock, ToolCall, ToolCallId, TurnStep, }; -use tracing::error; +use tracing::{error, warn}; use super::runner::AgentLoopRunner; @@ -23,6 +23,11 @@ impl AgentLoopRunner { let has_tools = !tool_uses.is_empty(); let has_server = !server_blocks.is_empty(); if !has_thinking && !has_text && !has_tools && !has_server { + warn!( + "LLM returned an empty response (no text, tool_use, thinking, or \ + server block); turn ends with no assistant output — check the \ + provider/endpoint for dropped content on large or image requests" + ); return; } let step = build_llm_call_step( diff --git a/crates/loopal-runtime/src/agent_loop/mod.rs b/crates/loopal-runtime/src/agent_loop/mod.rs index dd5d4c82..6ce50113 100644 --- a/crates/loopal-runtime/src/agent_loop/mod.rs +++ b/crates/loopal-runtime/src/agent_loop/mod.rs @@ -70,6 +70,7 @@ pub mod turn_history; pub(crate) mod turn_metrics; mod turn_observer_dispatch; mod turn_record; +mod turn_recover; mod turn_response; mod turn_state; mod turn_telemetry; diff --git a/crates/loopal-runtime/src/agent_loop/run.rs b/crates/loopal-runtime/src/agent_loop/run.rs index 3c75dcf2..4e92cb72 100644 --- a/crates/loopal-runtime/src/agent_loop/run.rs +++ b/crates/loopal-runtime/src/agent_loop/run.rs @@ -1,10 +1,8 @@ use loopal_error::{AgentOutput, LoopalError, Result, TerminateReason}; use loopal_protocol::{AgentEventPayload, AgentStatus}; use loopal_provider_api::MessageRole; -use loopal_provider_api::{ - ContinuationIntent, ContinuationReason, ErrorClass, default_classify_error, -}; -use tracing::{error, info, warn}; +use loopal_provider_api::{ContinuationIntent, ContinuationReason}; +use tracing::{error, info}; pub const CONTEXT_OVERFLOW_BANNER: &str = "Context overflow — compacting and retrying..."; @@ -65,6 +63,11 @@ impl AgentLoopRunner { "turn start" ); self.transition(AgentStatus::Running).await?; + // resume / cold-start with a User-tail history skips the idle + // phase (needs_input=false), so no ingest opened a turn record. + if !self.ensure_resume_turn_record().await? { + break; + } self.emit_inbox_consumed().await; let cancel = TurnCancel::new(self.interrupt.clone(), self.interrupt_tx.clone()); @@ -118,55 +121,6 @@ impl AgentLoopRunner { }) } - fn classify_turn_error(&self, err: &LoopalError) -> ErrorClass { - match self - .params - .deps - .kernel - .resolve_provider(self.params.config.model()) - { - Ok(provider) => provider.classify_error(err), - Err(_) => default_classify_error(err), - } - } - - async fn try_recover( - &mut self, - class: ErrorClass, - server_block_retry: &mut bool, - context_overflow_retry: &mut bool, - ) -> Result { - match class { - ErrorClass::ServerBlockError if !*server_block_retry => { - *server_block_retry = true; - info!("condensing server blocks after API rejection, retrying"); - self.turns - .with_wire_mut(loopal_context::condense_server_blocks); - Ok(true) - } - ErrorClass::ContextOverflow if !*context_overflow_retry => { - *context_overflow_retry = true; - info!("context overflow detected, force-compacting and retrying"); - // Emit banner ONLY after compact succeeded — pre-fix showed - // "retrying" then errored when force_compact silently bailed. - let compacted = self.force_compact(None).await?; - if !compacted { - warn!("force_compact declined; ContextOverflow propagates"); - return Ok(false); - } - self.emit_cosmetic(AgentEventPayload::Error { - message: CONTEXT_OVERFLOW_BANNER.into(), - }) - .await; - Ok(true) - } - // PrefillRejected here means provider's finalize_messages let it - // through — the model catalog has supports_prefill=true wrongly. - // Fail fast: silent retry would loop without state change. - _ => Ok(false), - } - } - pub(super) fn notify_observers_envelope_received( &mut self, source: &loopal_protocol::MessageSource, diff --git a/crates/loopal-runtime/src/agent_loop/runner_transition.rs b/crates/loopal-runtime/src/agent_loop/runner_transition.rs index 0d3165b5..1eb242e8 100644 --- a/crates/loopal-runtime/src/agent_loop/runner_transition.rs +++ b/crates/loopal-runtime/src/agent_loop/runner_transition.rs @@ -30,6 +30,7 @@ impl AgentLoopRunner { } let old = self.status; self.status = new_status; + tracing::debug!(?old, new = ?new_status, "agent status transition"); let result = match new_status { AgentStatus::Starting => Ok(()), AgentStatus::Running => self.emit(AgentEventPayload::Running).await, diff --git a/crates/loopal-runtime/src/agent_loop/turn_record.rs b/crates/loopal-runtime/src/agent_loop/turn_record.rs index cfde0fb6..d0278bba 100644 --- a/crates/loopal-runtime/src/agent_loop/turn_record.rs +++ b/crates/loopal-runtime/src/agent_loop/turn_record.rs @@ -34,6 +34,24 @@ impl AgentLoopRunner { self.turns.try_start_turn(trigger, &logger) } + pub(super) async fn ensure_resume_turn_record(&mut self) -> loopal_error::Result { + if self.turns.current_turn_id().is_some() + || self.start_turn_record(TurnTrigger::Resume).is_some() + { + return Ok(true); + } + tracing::error!("TurnStarted persist failed on resume; cannot execute turn"); + self.emit(loopal_protocol::AgentEventPayload::Error { + message: "Failed to start turn record on resume: persist log unavailable".to_string(), + }) + .await?; + Ok(false) + } + + pub fn recorded_turns(&self) -> &[loopal_turn::Turn] { + self.turns.store().turns() + } + pub fn append_step_record(&mut self, step: TurnStep) -> Result { let logger = make_logger(&self.params.deps.session_manager, &self.params.session.id); self.turns.try_append_step(step, &logger) diff --git a/crates/loopal-runtime/src/agent_loop/turn_recover.rs b/crates/loopal-runtime/src/agent_loop/turn_recover.rs new file mode 100644 index 00000000..73b7a8da --- /dev/null +++ b/crates/loopal-runtime/src/agent_loop/turn_recover.rs @@ -0,0 +1,58 @@ +use loopal_error::{LoopalError, Result}; +use loopal_protocol::AgentEventPayload; +use loopal_provider_api::{ErrorClass, default_classify_error}; +use tracing::{info, warn}; + +use super::run::CONTEXT_OVERFLOW_BANNER; +use super::runner::AgentLoopRunner; + +impl AgentLoopRunner { + pub(super) fn classify_turn_error(&self, err: &LoopalError) -> ErrorClass { + match self + .params + .deps + .kernel + .resolve_provider(self.params.config.model()) + { + Ok(provider) => provider.classify_error(err), + Err(_) => default_classify_error(err), + } + } + + pub(super) async fn try_recover( + &mut self, + class: ErrorClass, + server_block_retry: &mut bool, + context_overflow_retry: &mut bool, + ) -> Result { + match class { + ErrorClass::ServerBlockError if !*server_block_retry => { + *server_block_retry = true; + info!("condensing server blocks after API rejection, retrying"); + self.turns + .with_wire_mut(loopal_context::condense_server_blocks); + Ok(true) + } + ErrorClass::ContextOverflow if !*context_overflow_retry => { + *context_overflow_retry = true; + info!("context overflow detected, force-compacting and retrying"); + // Emit banner ONLY after compact succeeded — pre-fix showed + // "retrying" then errored when force_compact silently bailed. + let compacted = self.force_compact(None).await?; + if !compacted { + warn!("force_compact declined; ContextOverflow propagates"); + return Ok(false); + } + self.emit_cosmetic(AgentEventPayload::Error { + message: CONTEXT_OVERFLOW_BANNER.into(), + }) + .await; + Ok(true) + } + // PrefillRejected here means provider's finalize_messages let it + // through — the model catalog has supports_prefill=true wrongly. + // Fail fast: silent retry would loop without state change. + _ => Ok(false), + } + } +} diff --git a/crates/loopal-runtime/tests/agent_loop/compact_token_sync_test.rs b/crates/loopal-runtime/tests/agent_loop/compact_token_sync_test.rs new file mode 100644 index 00000000..154b77e8 --- /dev/null +++ b/crates/loopal-runtime/tests/agent_loop/compact_token_sync_test.rs @@ -0,0 +1,43 @@ +use loopal_protocol::AgentEventPayload; +use loopal_provider_api::Message; +use loopal_test_support::{HarnessBuilder, chunks}; + +#[tokio::test] +async fn force_compact_emits_token_usage_matching_tokens_after() { + let calls = vec![chunks::text_turn("summary")]; + let mut h = HarnessBuilder::new() + .calls(calls) + .messages(vec![ + Message::user("turn 1 content"), + Message::user("turn 2 content"), + Message::user("turn 3 content"), + Message::user("turn 4 content"), + Message::user("turn 5 content"), + ]) + .build() + .await; + + let _ = h.runner.force_compact(None).await; + + let evts = loopal_test_support::events::drain_pending(&mut h.event_rx).await; + let tokens_after = evts + .iter() + .find_map(|e| match e { + AgentEventPayload::Compacted(s) => Some(s.tokens_after), + _ => None, + }) + .expect("force_compact must emit Compacted"); + let usage_input = evts + .iter() + .find_map(|e| match e { + AgentEventPayload::TokenUsage { input_tokens, .. } => Some(*input_tokens), + _ => None, + }) + .expect("force_compact must emit TokenUsage to refresh the ctx counter"); + + assert_eq!( + usage_input, tokens_after, + "post-compact TokenUsage.input_tokens must equal Compacted.tokens_after \ + so the status bar ctx counter reflects the compacted size", + ); +} diff --git a/crates/loopal-runtime/tests/agent_loop/mod.rs b/crates/loopal-runtime/tests/agent_loop/mod.rs index 1d775552..4ab0ee7f 100644 --- a/crates/loopal-runtime/tests/agent_loop/mod.rs +++ b/crates/loopal-runtime/tests/agent_loop/mod.rs @@ -62,6 +62,7 @@ mod compact_force_e2e_test; mod compact_hooks_e2e_test; mod compact_instructions_e2e_test; mod compact_phases_e2e_test; +mod compact_token_sync_test; mod compaction_run_e2e_test; mod cron_e2e_test; mod degeneration_e2e_test; diff --git a/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs b/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs index 798d7a05..15c621d3 100644 --- a/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs +++ b/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs @@ -1,7 +1,7 @@ use loopal_config::Settings; use loopal_error::LoopalError; use loopal_kernel::Kernel; -use loopal_protocol::{AgentEvent, ControlCommand, Envelope}; +use loopal_protocol::{AgentEvent, ControlCommand, Envelope, MessageSource}; use loopal_provider_api::{ChatParams, ChatStream, Provider, StopReason, StreamChunk}; use loopal_provider_api::{ContentBlock, Message, MessageRole}; use loopal_runtime::agent_loop::AgentLoopRunner; @@ -69,10 +69,11 @@ fn make_runner_with_history( AgentLoopRunner, Arc, mpsc::Receiver, + mpsc::Sender, ) { let fixture = TestFixture::new(); let (event_tx, event_rx) = mpsc::channel::(64); - let (_mbox_tx, mailbox_rx) = mpsc::channel::(16); + let (mbox_tx, mailbox_rx) = mpsc::channel::(16); let (_ctrl_tx, control_rx) = mpsc::channel::(16); let frontend = Arc::new(UnifiedFrontend::new( None, @@ -108,7 +109,7 @@ fn make_runner_with_history( let mut runner = AgentLoopRunner::new(params); let turns = loopal_test_support::seed_history::reverse_project_messages_to_turns(history); runner.seed_test_turns(turns); - (runner, call_count, event_rx) + (runner, call_count, event_rx, mbox_tx) } #[tokio::test] @@ -119,7 +120,7 @@ async fn resume_with_assistant_tail_does_not_call_llm() { // skipped idle phase → ReadyToCall debug_assert panicked / release silently // sent assistant-tailed messages to the LLM. let history = vec![user("hello"), assistant("hi there")]; - let (mut runner, calls, mut rx) = make_runner_with_history(history); + let (mut runner, calls, mut rx, _mbox_tx) = make_runner_with_history(history); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -138,7 +139,7 @@ async fn resume_with_user_tail_calls_llm_immediately() { // Sanity: when last message is a User (e.g. tool_result mid-turn), the // agent should resume the turn without waiting for further input. let history = vec![user("question")]; - let (mut runner, calls, mut rx) = make_runner_with_history(history); + let (mut runner, calls, mut rx, _mbox_tx) = make_runner_with_history(history); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -152,7 +153,7 @@ async fn resume_with_user_tail_calls_llm_immediately() { #[tokio::test] async fn resume_with_empty_store_waits_for_input() { - let (mut runner, calls, mut rx) = make_runner_with_history(vec![]); + let (mut runner, calls, mut rx, _mbox_tx) = make_runner_with_history(vec![]); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -163,3 +164,54 @@ async fn resume_with_empty_store_waits_for_input() { "empty store must wait for user input" ); } + +#[tokio::test] +async fn resume_user_tail_records_turn_with_llm_step() { + // Bug D regression: a User-tail history projects to a Complete turn, so + // `current_turn_id` is None on resume. Pre-fix, run_loop went straight to + // execute_turn without opening a turn record, and every append_step hit + // NoCurrentTurn — the LlmCall step was silently dropped (no turn recorded). + let history = vec![user("question")]; + let (mut runner, _calls, mut rx, _mbox_tx) = make_runner_with_history(history); + tokio::spawn(async move { while rx.recv().await.is_some() {} }); + + let _ = runner.run().await.unwrap(); + + let has_llm_step = runner + .recorded_turns() + .iter() + .flat_map(|t| &t.body.steps) + .any(|s| matches!(s, loopal_turn::TurnStep::LlmCall { .. })); + assert!( + has_llm_step, + "resumed turn must open a turn record so its LlmCall step is persisted, \ + not dropped by NoCurrentTurn", + ); +} + +#[tokio::test] +async fn resume_then_followup_message_runs_second_turn() { + // Bug C end-to-end: after a User-tail cold-start resume runs its turn to + // completion, the Ephemeral loop must return to the idle phase and drain + // queued mailbox input — proving the "continue not consumed" regression is + // gone. Pre-Bug-D-fix the resumed turn never closed cleanly, the loop never + // came back to idle, and the followup Envelope sat forever in the mailbox. + let history = vec![user("question")]; + let (mut runner, calls, mut rx, mbox_tx) = make_runner_with_history(history); + tokio::spawn(async move { while rx.recv().await.is_some() {} }); + + mbox_tx + .send(Envelope::new(MessageSource::Human, "main", "continue")) + .await + .unwrap(); + drop(mbox_tx); + + let _ = runner.run().await.unwrap(); + + assert_eq!( + calls.load(Ordering::SeqCst), + 2, + "resume turn (1) + followup 'continue' turn (2); a count of 1 means the \ + queued message was never consumed after resume", + ); +} diff --git a/crates/loopal-telemetry/src/filter.rs b/crates/loopal-telemetry/src/filter.rs new file mode 100644 index 00000000..b293ac94 --- /dev/null +++ b/crates/loopal-telemetry/src/filter.rs @@ -0,0 +1,48 @@ +use tracing_subscriber::EnvFilter; + +/// Build the tracing filter. `level` is the global default applied to every +/// `loopal-*` crate. Using a global default (rather than a per-crate +/// whitelist) ensures new crates are never silently dropped from logging — +/// the prior whitelist had omitted the tui/session/hub/view-state crates, +/// making frontend/IPC failures invisible. Known-noisy third-party crates are +/// pinned lower so the default `debug` stays readable. +pub fn build_env_filter(level: &str) -> EnvFilter { + EnvFilter::new(filter_directives(level)) +} + +fn filter_directives(level: &str) -> String { + format!( + "{level},hyper=info,hyper_util=info,h2=info,rustls=info,reqwest=info,\ + tokio=info,tokio_util=info,mio=info,want=info,rusqlite=info,rmcp=info,\ + html5ever=info,selectors=info" + ) +} + +#[cfg(test)] +mod tests { + use super::filter_directives; + + #[test] + fn default_level_is_global_not_per_crate() { + let d = filter_directives("debug"); + assert!( + d.starts_with("debug,"), + "global default level must lead the directive list: {d}" + ); + } + + #[test] + fn noisy_third_party_crates_are_pinned() { + let d = filter_directives("debug"); + for noisy in ["hyper=info", "rustls=info", "tokio=info", "rmcp=info"] { + assert!(d.contains(noisy), "missing third-party pin {noisy}: {d}"); + } + } + + #[test] + fn no_loopal_crate_is_whitelisted() { + // Regression: a per-crate whitelist is what dropped frontend crates. + // The global-default design must not reintroduce explicit loopal pins. + assert!(!filter_directives("debug").contains("loopal_")); + } +} diff --git a/crates/loopal-telemetry/src/lib.rs b/crates/loopal-telemetry/src/lib.rs index 91edcca2..ee5f512f 100644 --- a/crates/loopal-telemetry/src/lib.rs +++ b/crates/loopal-telemetry/src/lib.rs @@ -7,6 +7,7 @@ pub(crate) mod file_metric_exporter; pub(crate) mod file_span_exporter; +mod filter; mod logs; mod metrics; mod resource; @@ -14,5 +15,6 @@ mod shutdown; mod subscriber; mod traces; +pub use filter::build_env_filter; pub use shutdown::TelemetryGuard; pub use subscriber::init_subscriber; diff --git a/crates/loopal-tui/src/key_dispatch_ops.rs b/crates/loopal-tui/src/key_dispatch_ops.rs index 5e4e83e9..61ca6fbf 100644 --- a/crates/loopal-tui/src/key_dispatch_ops.rs +++ b/crates/loopal-tui/src/key_dispatch_ops.rs @@ -55,6 +55,11 @@ pub(crate) async fn push_to_inbox(app: &mut App, content: UserContent) { app.history_index = None; // SSOT: do not write the user row locally — `route_to_agent` emits // `UserMessageQueued` so every attached UI sees the same history. + tracing::debug!( + active_view = %app.session.lock().active_view, + text_len = content.text.len(), + "TUI routing user message to agent" + ); app.session.route_message(content).await; } diff --git a/crates/loopal-view-state/src/conversation/conversation_display.rs b/crates/loopal-view-state/src/conversation/conversation_display.rs index 7c696791..bf3da73f 100644 --- a/crates/loopal-view-state/src/conversation/conversation_display.rs +++ b/crates/loopal-view-state/src/conversation/conversation_display.rs @@ -76,4 +76,10 @@ pub fn handle_compaction( {kept} kept. {tokens_before}→{tokens_after} tokens ({pct}% freed).", ), ); + // Self-correct ctx counter from the Compacted event alone, in case the + // paired TokenUsage emit is dropped or reordered. + conv.input_tokens = tokens_after; + conv.output_tokens = 0; + conv.cache_creation_tokens = 0; + conv.cache_read_tokens = 0; } diff --git a/crates/loopal-view-state/src/mutators/compact.rs b/crates/loopal-view-state/src/mutators/compact.rs index 2b0138cf..f0799cbb 100644 --- a/crates/loopal-view-state/src/mutators/compact.rs +++ b/crates/loopal-view-state/src/mutators/compact.rs @@ -1,4 +1,4 @@ -use loopal_protocol::{AgentStatus, CompactPhase}; +use loopal_protocol::CompactPhase; use crate::state::SessionViewState; @@ -17,7 +17,6 @@ pub(super) fn progress( } phase => { conv.compact_banner = Some(format_banner(phase, detail)); - state.agent.observable.status = AgentStatus::Running; } } MutationEffect::Mutated diff --git a/crates/loopal-view-state/tests/suite.rs b/crates/loopal-view-state/tests/suite.rs index d5b48f5b..9a36b7a0 100644 --- a/crates/loopal-view-state/tests/suite.rs +++ b/crates/loopal-view-state/tests/suite.rs @@ -3,6 +3,8 @@ mod classifier_status_mutator_test; #[path = "suite/compact_banner_mutator_test.rs"] mod compact_banner_mutator_test; +#[path = "suite/compact_idle_e2e_test.rs"] +mod compact_idle_e2e_test; #[path = "suite/conversation_serde_test.rs"] mod conversation_serde_test; #[path = "suite/decided_mutators_test.rs"] diff --git a/crates/loopal-view-state/tests/suite/compact_banner_mutator_test.rs b/crates/loopal-view-state/tests/suite/compact_banner_mutator_test.rs index 3871c766..21665af2 100644 --- a/crates/loopal-view-state/tests/suite/compact_banner_mutator_test.rs +++ b/crates/loopal-view-state/tests/suite/compact_banner_mutator_test.rs @@ -1,4 +1,4 @@ -use loopal_protocol::{AgentEventPayload, CompactPhase, CompactionSummary}; +use loopal_protocol::{AgentEventPayload, AgentStatus, CompactPhase, CompactionSummary}; use loopal_view_state::ViewStateReducer; fn progress(phase: CompactPhase, detail: Option<&str>) -> AgentEventPayload { @@ -113,3 +113,40 @@ fn phase_transitions_replace_previous_banner() { let second = banner_of(&r).unwrap(); assert_ne!(first, second, "transition must replace banner text"); } + +#[test] +fn compacted_event_refreshes_ctx() { + let mut r = ViewStateReducer::new("root"); + r.apply(AgentEventPayload::Compacted(CompactionSummary { + kept: 9, + removed: 491, + tokens_before: 259_392, + tokens_after: 6_453, + strategy: "manual".into(), + summary_msg_id: None, + files_rehydrated: 5, + })); + assert_eq!( + r.state().agent.conversation.token_count(), + 6_453, + "Compacted event must refresh ctx token count to tokens_after", + ); +} + +#[test] +fn compact_progress_does_not_touch_status() { + let mut r = ViewStateReducer::new("root"); + r.apply(AgentEventPayload::AwaitingInput); + let before = r.state().agent.observable.status; + assert_eq!(before, AgentStatus::WaitingForInput); + + r.apply(progress(CompactPhase::Summarize, None)); + r.apply(progress(CompactPhase::Rehydrate, Some("5 files"))); + r.apply(progress(CompactPhase::Done, None)); + + assert_eq!( + r.state().agent.observable.status, + AgentStatus::WaitingForInput, + "compaction progress must not mutate backend-authoritative status", + ); +} diff --git a/crates/loopal-view-state/tests/suite/compact_idle_e2e_test.rs b/crates/loopal-view-state/tests/suite/compact_idle_e2e_test.rs new file mode 100644 index 00000000..7cdb6e4c --- /dev/null +++ b/crates/loopal-view-state/tests/suite/compact_idle_e2e_test.rs @@ -0,0 +1,96 @@ +use loopal_protocol::{AgentEventPayload, AgentStatus, CompactPhase, CompactionSummary}; +use loopal_view_state::ViewStateReducer; + +fn idle_reducer() -> ViewStateReducer { + let mut r = ViewStateReducer::new("root"); + r.apply(AgentEventPayload::AwaitingInput); + assert_eq!( + r.state().agent.observable.status, + AgentStatus::WaitingForInput, + "precondition: reducer must start idle" + ); + r +} + +fn summarize(detail: &str) -> AgentEventPayload { + AgentEventPayload::CompactProgress { + phase: CompactPhase::Summarize, + detail: Some(detail.to_string()), + } +} + +fn done() -> AgentEventPayload { + AgentEventPayload::CompactProgress { + phase: CompactPhase::Done, + detail: None, + } +} + +// Reproduces the exact event sequence the backend emits for a manual /compact +// triggered while the agent is idle (no Running/AwaitingInput transitions — +// the agent never leaves WaitingForInput). Pre-fix, Summarize flipped status +// to Running and Done never restored it, leaving the TUI stuck on "Streaming" +// with ESC routing to interrupt and typed input swallowed. +#[test] +fn manual_compact_from_idle_keeps_idle_and_refreshes_ctx() { + let mut r = idle_reducer(); + + r.apply(summarize("259392 tokens before")); + r.apply(AgentEventPayload::Compacted(CompactionSummary { + kept: 9, + removed: 491, + tokens_before: 259_392, + tokens_after: 6_453, + strategy: "manual".into(), + summary_msg_id: None, + files_rehydrated: 5, + })); + r.apply(AgentEventPayload::TokenUsage { + input_tokens: 6_453, + output_tokens: 0, + context_window: 1_000_000, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + thinking_tokens: 0, + }); + r.apply(done()); + + let state = r.state(); + assert_eq!( + state.agent.observable.status, + AgentStatus::WaitingForInput, + "after /compact the agent must remain idle so ESC/input work normally", + ); + assert_eq!( + state.agent.conversation.token_count(), + 6_453, + "ctx counter must reflect the compacted token total", + ); + assert_eq!( + state.agent.conversation.compact_banner, None, + "Done phase must clear the compacting banner", + ); +} + +// Even if the paired TokenUsage emit is dropped, the Compacted event alone +// must leave the reducer self-consistent (idle + refreshed ctx). +#[test] +fn manual_compact_idle_without_token_usage_still_consistent() { + let mut r = idle_reducer(); + + r.apply(summarize("259392 tokens before")); + r.apply(AgentEventPayload::Compacted(CompactionSummary { + kept: 9, + removed: 491, + tokens_before: 259_392, + tokens_after: 6_453, + strategy: "manual".into(), + summary_msg_id: None, + files_rehydrated: 5, + })); + r.apply(done()); + + let state = r.state(); + assert_eq!(state.agent.observable.status, AgentStatus::WaitingForInput); + assert_eq!(state.agent.conversation.token_count(), 6_453); +} diff --git a/src/logging.rs b/src/logging.rs index 52f1d6b4..4f2fb696 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -20,15 +20,8 @@ pub fn init_logging( let writer = crate::log_writer::RotatingFileWriter::new(&log_dir); let log_path = writer.current_path(); - let env_filter = std::env::var("LOOPAL_LOG").unwrap_or_else(|_| "info".to_string()); - let filter_str = format!( - "loopal={env_filter},loopal_runtime={env_filter},\ - loopal_provider={env_filter},loopal_kernel={env_filter},\ - loopal_mcp={env_filter},loopal_tools={env_filter},\ - loopal_context={env_filter},loopal_hooks={env_filter},\ - loopal_storage={env_filter},loopal_config={env_filter}" - ); - let env_filter = tracing_subscriber::EnvFilter::new(filter_str); + let level = std::env::var("LOOPAL_LOG").unwrap_or_else(|_| "debug".to_string()); + let env_filter = loopal_telemetry::build_env_filter(&level); let guard = loopal_telemetry::init_subscriber(telemetry_config, writer, env_filter); From 3d992fb0817b15f0da1dd61d0a293302948e3a30 Mon Sep 17 00:00:00 2001 From: yishuiliunian Date: Tue, 2 Jun 2026 13:06:58 +0800 Subject: [PATCH 2/2] fix: address CI failure - deflake resume_then_followup e2e test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The followup Envelope was pre-sent before run(), landing inside the resume turn's mid-turn inject_pending_messages drain window. Whether it was absorbed into turn 1 or ran as turn 2 depended on scheduling — flaky (failed ~70% on macOS CI, passed locally only when bundled in the full suite). Gate delivery on the agent's own AwaitingInput event: send "continue" only after the resume turn completes and the loop returns to idle, then close both mailbox + control senders on the second idle to end the Persistent loop. The helper now returns control_tx (5-tuple); existing tests drop both senders up front to keep their closed-channel break behavior. Deterministic 15/15. --- .../tests/agent_loop/resume_invariant_test.rs | 69 ++++++++++++++----- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs b/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs index 15c621d3..dea17986 100644 --- a/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs +++ b/crates/loopal-runtime/tests/agent_loop/resume_invariant_test.rs @@ -70,11 +70,12 @@ fn make_runner_with_history( Arc, mpsc::Receiver, mpsc::Sender, + mpsc::Sender, ) { let fixture = TestFixture::new(); let (event_tx, event_rx) = mpsc::channel::(64); let (mbox_tx, mailbox_rx) = mpsc::channel::(16); - let (_ctrl_tx, control_rx) = mpsc::channel::(16); + let (ctrl_tx, control_rx) = mpsc::channel::(16); let frontend = Arc::new(UnifiedFrontend::new( None, event_tx, @@ -109,7 +110,7 @@ fn make_runner_with_history( let mut runner = AgentLoopRunner::new(params); let turns = loopal_test_support::seed_history::reverse_project_messages_to_turns(history); runner.seed_test_turns(turns); - (runner, call_count, event_rx, mbox_tx) + (runner, call_count, event_rx, mbox_tx, ctrl_tx) } #[tokio::test] @@ -120,7 +121,9 @@ async fn resume_with_assistant_tail_does_not_call_llm() { // skipped idle phase → ReadyToCall debug_assert panicked / release silently // sent assistant-tailed messages to the LLM. let history = vec![user("hello"), assistant("hi there")]; - let (mut runner, calls, mut rx, _mbox_tx) = make_runner_with_history(history); + let (mut runner, calls, mut rx, mbox_tx, ctrl_tx) = make_runner_with_history(history); + drop(mbox_tx); + drop(ctrl_tx); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -139,7 +142,9 @@ async fn resume_with_user_tail_calls_llm_immediately() { // Sanity: when last message is a User (e.g. tool_result mid-turn), the // agent should resume the turn without waiting for further input. let history = vec![user("question")]; - let (mut runner, calls, mut rx, _mbox_tx) = make_runner_with_history(history); + let (mut runner, calls, mut rx, mbox_tx, ctrl_tx) = make_runner_with_history(history); + drop(mbox_tx); + drop(ctrl_tx); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -153,7 +158,9 @@ async fn resume_with_user_tail_calls_llm_immediately() { #[tokio::test] async fn resume_with_empty_store_waits_for_input() { - let (mut runner, calls, mut rx, _mbox_tx) = make_runner_with_history(vec![]); + let (mut runner, calls, mut rx, mbox_tx, ctrl_tx) = make_runner_with_history(vec![]); + drop(mbox_tx); + drop(ctrl_tx); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -172,7 +179,9 @@ async fn resume_user_tail_records_turn_with_llm_step() { // execute_turn without opening a turn record, and every append_step hit // NoCurrentTurn — the LlmCall step was silently dropped (no turn recorded). let history = vec![user("question")]; - let (mut runner, _calls, mut rx, _mbox_tx) = make_runner_with_history(history); + let (mut runner, _calls, mut rx, mbox_tx, ctrl_tx) = make_runner_with_history(history); + drop(mbox_tx); + drop(ctrl_tx); tokio::spawn(async move { while rx.recv().await.is_some() {} }); let _ = runner.run().await.unwrap(); @@ -192,19 +201,41 @@ async fn resume_user_tail_records_turn_with_llm_step() { #[tokio::test] async fn resume_then_followup_message_runs_second_turn() { // Bug C end-to-end: after a User-tail cold-start resume runs its turn to - // completion, the Ephemeral loop must return to the idle phase and drain - // queued mailbox input — proving the "continue not consumed" regression is - // gone. Pre-Bug-D-fix the resumed turn never closed cleanly, the loop never - // came back to idle, and the followup Envelope sat forever in the mailbox. + // completion, the loop must return to idle and consume queued input — + // proving the "continue not consumed" regression is gone. Pre-Bug-D-fix the + // resumed turn never closed cleanly, the loop never came back to idle, and + // a followup never ran. + // + // Delivery is gated on the agent's own AwaitingInput event so the followup + // is never injected during the resume turn's mid-turn drain window (which + // would absorb it into turn 1 and make the count ambiguous). First idle → + // send "continue"; second idle (turn 2 done) → drop both senders so + // wait_for_input observes a closed channel and the loop exits. let history = vec![user("question")]; - let (mut runner, calls, mut rx, mbox_tx) = make_runner_with_history(history); - tokio::spawn(async move { while rx.recv().await.is_some() {} }); - - mbox_tx - .send(Envelope::new(MessageSource::Human, "main", "continue")) - .await - .unwrap(); - drop(mbox_tx); + let (mut runner, calls, mut rx, mbox_tx, ctrl_tx) = make_runner_with_history(history); + + tokio::spawn(async move { + let mut idle_count = 0; + while let Some(ev) = rx.recv().await { + if matches!( + ev.payload, + loopal_protocol::AgentEventPayload::AwaitingInput + ) { + idle_count += 1; + if idle_count == 1 { + mbox_tx + .send(Envelope::new(MessageSource::Human, "main", "continue")) + .await + .ok(); + } else { + drop(mbox_tx); + drop(ctrl_tx); + break; + } + } + } + while rx.recv().await.is_some() {} + }); let _ = runner.run().await.unwrap(); @@ -212,6 +243,6 @@ async fn resume_then_followup_message_runs_second_turn() { calls.load(Ordering::SeqCst), 2, "resume turn (1) + followup 'continue' turn (2); a count of 1 means the \ - queued message was never consumed after resume", + queued message was never consumed after the loop returned to idle", ); }