Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crates/loopal-runtime/src/agent_loop/compaction_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ impl AgentLoopRunner {
))
.await?;

// Route the post-compact token truth through the canonical token path
// so the status bar's ctx counter refreshes; mirrors resume reset.
self.emit(AgentEventPayload::TokenUsage {
input_tokens: tokens_after,
output_tokens: 0,
context_window: self.turns.view().budget().context_window,
cache_creation_input_tokens: 0,
cache_read_input_tokens: 0,
thinking_tokens: 0,
})
.await?;

// Done phase closes the progress stream so frontends can collapse
// the inline indicator. Emitted last, after `Compacted` so any
// listener that wants the final stats has them in hand.
Expand Down
7 changes: 6 additions & 1 deletion crates/loopal-runtime/src/agent_loop/llm_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use loopal_turn::{
AssistantOutput, ServerToolCall, ServerToolPair, ServerToolResult,
StopReason as TurnStopReason, TextBlock, ThinkingBlock, ToolCall, ToolCallId, TurnStep,
};
use tracing::error;
use tracing::{error, warn};

use super::runner::AgentLoopRunner;

Expand All @@ -23,6 +23,11 @@ impl AgentLoopRunner {
let has_tools = !tool_uses.is_empty();
let has_server = !server_blocks.is_empty();
if !has_thinking && !has_text && !has_tools && !has_server {
warn!(
"LLM returned an empty response (no text, tool_use, thinking, or \
server block); turn ends with no assistant output — check the \
provider/endpoint for dropped content on large or image requests"
);
return;
}
let step = build_llm_call_step(
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-runtime/src/agent_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub mod turn_history;
pub(crate) mod turn_metrics;
mod turn_observer_dispatch;
mod turn_record;
mod turn_recover;
mod turn_response;
mod turn_state;
mod turn_telemetry;
Expand Down
60 changes: 7 additions & 53 deletions crates/loopal-runtime/src/agent_loop/run.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use loopal_error::{AgentOutput, LoopalError, Result, TerminateReason};
use loopal_protocol::{AgentEventPayload, AgentStatus};
use loopal_provider_api::MessageRole;
use loopal_provider_api::{
ContinuationIntent, ContinuationReason, ErrorClass, default_classify_error,
};
use tracing::{error, info, warn};
use loopal_provider_api::{ContinuationIntent, ContinuationReason};
use tracing::{error, info};

pub const CONTEXT_OVERFLOW_BANNER: &str = "Context overflow — compacting and retrying...";

Expand Down Expand Up @@ -65,6 +63,11 @@ impl AgentLoopRunner {
"turn start"
);
self.transition(AgentStatus::Running).await?;
// resume / cold-start with a User-tail history skips the idle
// phase (needs_input=false), so no ingest opened a turn record.
if !self.ensure_resume_turn_record().await? {
break;
}
self.emit_inbox_consumed().await;

let cancel = TurnCancel::new(self.interrupt.clone(), self.interrupt_tx.clone());
Expand Down Expand Up @@ -118,55 +121,6 @@ impl AgentLoopRunner {
})
}

fn classify_turn_error(&self, err: &LoopalError) -> ErrorClass {
match self
.params
.deps
.kernel
.resolve_provider(self.params.config.model())
{
Ok(provider) => provider.classify_error(err),
Err(_) => default_classify_error(err),
}
}

async fn try_recover(
&mut self,
class: ErrorClass,
server_block_retry: &mut bool,
context_overflow_retry: &mut bool,
) -> Result<bool> {
match class {
ErrorClass::ServerBlockError if !*server_block_retry => {
*server_block_retry = true;
info!("condensing server blocks after API rejection, retrying");
self.turns
.with_wire_mut(loopal_context::condense_server_blocks);
Ok(true)
}
ErrorClass::ContextOverflow if !*context_overflow_retry => {
*context_overflow_retry = true;
info!("context overflow detected, force-compacting and retrying");
// Emit banner ONLY after compact succeeded — pre-fix showed
// "retrying" then errored when force_compact silently bailed.
let compacted = self.force_compact(None).await?;
if !compacted {
warn!("force_compact declined; ContextOverflow propagates");
return Ok(false);
}
self.emit_cosmetic(AgentEventPayload::Error {
message: CONTEXT_OVERFLOW_BANNER.into(),
})
.await;
Ok(true)
}
// PrefillRejected here means provider's finalize_messages let it
// through — the model catalog has supports_prefill=true wrongly.
// Fail fast: silent retry would loop without state change.
_ => Ok(false),
}
}

pub(super) fn notify_observers_envelope_received(
&mut self,
source: &loopal_protocol::MessageSource,
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-runtime/src/agent_loop/runner_transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl AgentLoopRunner {
}
let old = self.status;
self.status = new_status;
tracing::debug!(?old, new = ?new_status, "agent status transition");
let result = match new_status {
AgentStatus::Starting => Ok(()),
AgentStatus::Running => self.emit(AgentEventPayload::Running).await,
Expand Down
18 changes: 18 additions & 0 deletions crates/loopal-runtime/src/agent_loop/turn_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ impl AgentLoopRunner {
self.turns.try_start_turn(trigger, &logger)
}

pub(super) async fn ensure_resume_turn_record(&mut self) -> loopal_error::Result<bool> {
if self.turns.current_turn_id().is_some()
|| self.start_turn_record(TurnTrigger::Resume).is_some()
{
return Ok(true);
}
tracing::error!("TurnStarted persist failed on resume; cannot execute turn");
self.emit(loopal_protocol::AgentEventPayload::Error {
message: "Failed to start turn record on resume: persist log unavailable".to_string(),
})
.await?;
Ok(false)
}

pub fn recorded_turns(&self) -> &[loopal_turn::Turn] {
self.turns.store().turns()
}

pub fn append_step_record(&mut self, step: TurnStep) -> Result<u32, TurnTrackerError> {
let logger = make_logger(&self.params.deps.session_manager, &self.params.session.id);
self.turns.try_append_step(step, &logger)
Expand Down
58 changes: 58 additions & 0 deletions crates/loopal-runtime/src/agent_loop/turn_recover.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use loopal_error::{LoopalError, Result};
use loopal_protocol::AgentEventPayload;
use loopal_provider_api::{ErrorClass, default_classify_error};
use tracing::{info, warn};

use super::run::CONTEXT_OVERFLOW_BANNER;
use super::runner::AgentLoopRunner;

impl AgentLoopRunner {
pub(super) fn classify_turn_error(&self, err: &LoopalError) -> ErrorClass {
match self
.params
.deps
.kernel
.resolve_provider(self.params.config.model())
{
Ok(provider) => provider.classify_error(err),
Err(_) => default_classify_error(err),
}
}

pub(super) async fn try_recover(
&mut self,
class: ErrorClass,
server_block_retry: &mut bool,
context_overflow_retry: &mut bool,
) -> Result<bool> {
match class {
ErrorClass::ServerBlockError if !*server_block_retry => {
*server_block_retry = true;
info!("condensing server blocks after API rejection, retrying");
self.turns
.with_wire_mut(loopal_context::condense_server_blocks);
Ok(true)
}
ErrorClass::ContextOverflow if !*context_overflow_retry => {
*context_overflow_retry = true;
info!("context overflow detected, force-compacting and retrying");
// Emit banner ONLY after compact succeeded — pre-fix showed
// "retrying" then errored when force_compact silently bailed.
let compacted = self.force_compact(None).await?;
if !compacted {
warn!("force_compact declined; ContextOverflow propagates");
return Ok(false);
}
self.emit_cosmetic(AgentEventPayload::Error {
message: CONTEXT_OVERFLOW_BANNER.into(),
})
.await;
Ok(true)
}
// PrefillRejected here means provider's finalize_messages let it
// through — the model catalog has supports_prefill=true wrongly.
// Fail fast: silent retry would loop without state change.
_ => Ok(false),
}
}
}
43 changes: 43 additions & 0 deletions crates/loopal-runtime/tests/agent_loop/compact_token_sync_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use loopal_protocol::AgentEventPayload;
use loopal_provider_api::Message;
use loopal_test_support::{HarnessBuilder, chunks};

#[tokio::test]
async fn force_compact_emits_token_usage_matching_tokens_after() {
let calls = vec![chunks::text_turn("<summary>summary</summary>")];
let mut h = HarnessBuilder::new()
.calls(calls)
.messages(vec![
Message::user("turn 1 content"),
Message::user("turn 2 content"),
Message::user("turn 3 content"),
Message::user("turn 4 content"),
Message::user("turn 5 content"),
])
.build()
.await;

let _ = h.runner.force_compact(None).await;

let evts = loopal_test_support::events::drain_pending(&mut h.event_rx).await;
let tokens_after = evts
.iter()
.find_map(|e| match e {
AgentEventPayload::Compacted(s) => Some(s.tokens_after),
_ => None,
})
.expect("force_compact must emit Compacted");
let usage_input = evts
.iter()
.find_map(|e| match e {
AgentEventPayload::TokenUsage { input_tokens, .. } => Some(*input_tokens),
_ => None,
})
.expect("force_compact must emit TokenUsage to refresh the ctx counter");

assert_eq!(
usage_input, tokens_after,
"post-compact TokenUsage.input_tokens must equal Compacted.tokens_after \
so the status bar ctx counter reflects the compacted size",
);
}
1 change: 1 addition & 0 deletions crates/loopal-runtime/tests/agent_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ mod compact_force_e2e_test;
mod compact_hooks_e2e_test;
mod compact_instructions_e2e_test;
mod compact_phases_e2e_test;
mod compact_token_sync_test;
mod compaction_run_e2e_test;
mod cron_e2e_test;
mod degeneration_e2e_test;
Expand Down
Loading
Loading