Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/loopal-acp/src/translate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub fn translate_event(payload: &AgentEventPayload, session_id: &str) -> Option<
| AgentEventPayload::HubDegraded { .. }
| AgentEventPayload::HubRecovered { .. }
| AgentEventPayload::DegenerationDetected(_)
| AgentEventPayload::ContinuationSkipped { .. }
| AgentEventPayload::TurnCancelled { .. }
| AgentEventPayload::ContinuationGateChanged(_) => None,
AgentEventPayload::ToolPermissionResolved { id } => Some(AcpNotification::Extension {
method: "_loopal/permission_resolved".into(),
Expand Down
5 changes: 5 additions & 0 deletions crates/loopal-context/src/turn_store/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ impl TurnStore {
self.current_turn_id.as_ref()
}

pub fn current_turn_index(&self) -> Option<usize> {
let id = self.current_turn_id.as_ref()?;
self.turns.iter().position(|t| &t.id == id)
}

pub fn len(&self) -> usize {
self.turns.len()
}
Expand Down
36 changes: 35 additions & 1 deletion crates/loopal-context/src/turn_tracker/tool_batch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use loopal_turn::{ToolExecState, TurnEvent, TurnStep};
use loopal_turn::{CancelCause, ToolExecState, TurnEvent, TurnStep};

use super::TurnTracker;
use super::error::TurnTrackerError;
Expand All @@ -13,6 +13,40 @@ impl TurnTracker {
self.current_tool_batch_step = None;
}

// Cancel every not-yet-terminal item in the open ToolBatch, then close it.
// reason: turn-level cancellation (parent abort) leaves spawned tools in
// Pending/Running; without this the wire serializes them as the bogus
// "Pending — runtime invariant violated" tool_result.
pub fn cancel_open_tool_batch(&mut self, cause: CancelCause, logger: &dyn TurnEventLogger) {
let Some(step_index) = self.current_tool_batch_step else {
return;
};
let pending: Vec<u32> = self
.store
.current_turn()
.and_then(|t| t.body.steps.get(step_index as usize))
.and_then(|s| match s {
TurnStep::ToolBatch(b) => Some(b),
_ => None,
})
.map(|b| {
b.items
.iter()
.enumerate()
.filter(|(_, it)| {
matches!(it.state, ToolExecState::Pending | ToolExecState::Running)
})
.map(|(i, _)| i as u32)
.collect()
})
.unwrap_or_default();
for idx in pending {
let _ =
self.try_update_tool_state(idx, ToolExecState::Cancelled(cause.clone()), logger);
}
self.close_tool_batch();
}

pub fn try_update_tool_state(
&mut self,
item_index: u32,
Expand Down
2 changes: 2 additions & 0 deletions crates/loopal-context/tests/suite.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Single test binary — includes all test modules
#[path = "suite/cancel_open_batch_test.rs"]
mod cancel_open_batch_test;
#[path = "suite/compaction_pair_test.rs"]
mod compaction_pair_test;
#[path = "suite/file_snapshot_test.rs"]
Expand Down
82 changes: 82 additions & 0 deletions crates/loopal-context/tests/suite/cancel_open_batch_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use loopal_context::{ContextBudget, PersistError, TurnEventLogger, TurnStore, TurnTracker};
use loopal_turn::{
CancelCause, OrderedToolBatch, ToolBatchItem, ToolCall, ToolCallId, ToolExecState, TurnEvent,
TurnStep, TurnTrigger,
};

struct InMemoryLogger;
impl TurnEventLogger for InMemoryLogger {
fn persist(&self, _event: &TurnEvent) -> Result<(), PersistError> {
Ok(())
}
}

fn budget() -> ContextBudget {
ContextBudget {
context_window: 200_000,
system_tokens: 0,
tool_tokens: 0,
output_reserve: 16_384,
safety_margin: 10_000,
message_budget: 173_616,
max_output_tokens: 64_000,
}
}

fn pending_item(id: &str) -> ToolBatchItem {
ToolBatchItem {
call: ToolCall {
id: ToolCallId::new(id),
name: "Bash".into(),
input: serde_json::Value::Null,
},
state: ToolExecState::Pending,
}
}

#[test]
fn cancel_open_tool_batch_marks_all_pending_as_cancelled() {
let mut t = TurnTracker::new(TurnStore::new(), budget());
t.try_start_turn(
TurnTrigger::UserInput {
envelope_id: "e".into(),
content: "hi".into(),
images: Vec::new(),
},
&InMemoryLogger,
)
.unwrap();
let step_index = t
.try_append_step(
TurnStep::ToolBatch(OrderedToolBatch {
items: vec![pending_item("t1"), pending_item("t2")],
}),
&InMemoryLogger,
)
.unwrap();
t.mark_tool_batch_open(step_index);

t.cancel_open_tool_batch(CancelCause::CrashRecovery, &InMemoryLogger);

let turn = t.store().current_turn().unwrap();
let TurnStep::ToolBatch(b) = &turn.body.steps[step_index as usize] else {
panic!("expected ToolBatch step");
};
for item in &b.items {
assert!(
matches!(
item.state,
ToolExecState::Cancelled(CancelCause::CrashRecovery)
),
"pending item must become Cancelled, got {:?}",
item.state
);
}
}

#[test]
fn cancel_open_tool_batch_is_noop_without_open_batch() {
let mut t = TurnTracker::new(TurnStore::new(), budget());
// no open batch → must not panic
t.cancel_open_tool_batch(CancelCause::UserInterrupt, &InMemoryLogger);
}
20 changes: 20 additions & 0 deletions crates/loopal-context/tests/suite/turn_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ fn tracker() -> TurnTracker {
TurnTracker::new(TurnStore::new(), budget())
}

#[test]
fn current_turn_index_points_at_in_progress_turn() {
let mut t = tracker();
for i in 0..3 {
t.try_start_turn(
TurnTrigger::UserInput {
envelope_id: format!("env-{i}"),
content: format!("msg-{i}"),
images: Vec::new(),
},
&InMemoryLogger,
)
.unwrap();
t.end_turn(TurnOutcome::Complete, &InMemoryLogger).unwrap();
}
assert!(t.store().current_turn_index().is_none());
t.try_start_turn(user_trigger(), &InMemoryLogger).unwrap();
assert_eq!(t.store().current_turn_index(), Some(3));
}

#[test]
fn empty_store_has_no_current_turn() {
let t = tracker();
Expand Down
2 changes: 2 additions & 0 deletions crates/loopal-prompt-system/prompts/tools/usage-policy.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ If unsure whether a dedicated tool exists, use the dedicated tool — do NOT fal
## Parallel Calls

You can call multiple tools in a single response. If you intend to call multiple tools and there are no dependencies between them, make all independent tool calls in parallel. Maximize use of parallel tool calls where possible to increase efficiency. However, if some tool calls depend on previous calls to inform dependent values, do NOT call these tools in parallel and instead call them sequentially. For instance, if one operation must complete before another starts, run these operations sequentially instead.

A parallel batch has NO ordering guarantee. In particular, when you Write/Edit a file and then need a Bash command that reads, executes, or `chmod`s that SAME file, these are dependent — put the Bash command in a LATER turn, never in the same batch as the Write. Otherwise the Bash command may run before the file exists and fail (e.g. `chmod: No such file or directory`).
9 changes: 5 additions & 4 deletions crates/loopal-protocol/src/event_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ use crate::task_snapshot::TaskSnapshot;
use crate::thread_goal::{GoalTransitionReason, ThreadGoal};

/// Event payload. Runner/LLM/Tools only construct this enum.
///
/// `#[rustfmt::skip]` keeps single-field variants on one line so the file
/// fits the 200-line budget; expanded form (`{ text: String }` on three
/// lines) would push it well past with no readability gain.
/// `#[rustfmt::skip]` keeps single-field variants on one line (200-line budget).
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentEventPayload {
Expand Down Expand Up @@ -196,4 +193,8 @@ pub enum AgentEventPayload {
DegenerationDetected(DegenerationSummary),
/// Continuation gate opened or closed. Drives UI status indicators.
ContinuationGateChanged(ContinuationGateSummary),
/// A goal-continuation turn was skipped (goal changed) — keeps the skip observable, not silent.
ContinuationSkipped { reason: String },
/// A turn was cancelled (parent abort / governance / interrupt). `cause` is the rendered CancelledCause.
TurnCancelled { cause: String },
}
19 changes: 19 additions & 0 deletions crates/loopal-provider-api/tests/suite/turn_projection_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,25 @@ fn user_trigger_emits_human_user_message() {
assert!(matches!(msgs[0].origin, Some(MessageOrigin::Human)));
}

#[test]
fn goal_continuation_trigger_projects_to_user_message() {
// The continuation skip-gate relies on this: a GoalContinuation turn
// projects to a User message, so goal_continuation_check (last_role != User)
// won't re-inject and loop after a skip.
let t = turn_with(
TurnTrigger::GoalContinuation {
envelope_id: "env-g".into(),
content: "keep going".into(),
},
vec![],
);
let msgs = project_turn_to_messages(&t);
assert_eq!(msgs.len(), 1);
// The skip-gate keys on view().last_role(); assert last (not first) to pin
// the actual load-bearing property even if projection grows more messages.
assert_eq!(msgs.last().unwrap().role, MessageRole::User);
}

#[test]
fn cron_trigger_prefixed_with_scheduled() {
let t = turn_with(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fn cancel_reason(c: &CancelCause) -> &'static str {
CancelCause::GovernanceAbort => "Aborted by governance",
CancelCause::CrashRecovery => "Cancelled (crash recovery)",
CancelCause::Timeout => "Timed out",
CancelCause::ParentTurnAborted => "Cancelled (superseded by new input)",
}
}

Expand Down
53 changes: 52 additions & 1 deletion crates/loopal-runtime/src/agent_loop/goal_consistency.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use loopal_protocol::AgentEventPayload;
use loopal_provider_api::{ContinuationIntent, ContinuationReason};
use tracing::warn;

use super::runner::AgentLoopRunner;
use super::turn_context::TurnContext;

const CONTINUATION_SKIPPED_REASON: &str = "goal changed before continuation turn started";

impl AgentLoopRunner {
pub(super) async fn continuation_still_consistent(&self) -> bool {
Expand All @@ -12,11 +19,55 @@ impl AgentLoopRunner {
};
let goal = match session.snapshot().await {
Ok(Some(g)) => g,
_ => return false,
// Transient read error: no evidence the goal changed → stay
// consistent (don't skip) rather than drop the continuation.
Err(_) => return true,
// Goal genuinely gone → inconsistent.
Ok(None) => return false,
};
if &goal.goal_id != goal_id {
return false;
}
goal.status.participates_in_continuation()
}

// A goal-continuation turn whose goal changed before it started has nothing
// to do. Recovery retries (RecoveryRetry) are not continuations and are
// never skipped. Skipping ends the turn and returns true WITHOUT recording
// it into turn_history — so it cannot pollute degeneration/loop counters.
pub(super) async fn skip_stale_continuation_turn(&mut self, turn_ctx: &TurnContext) -> bool {
if matches!(
turn_ctx.pending_continuation,
Some(ContinuationIntent::AutoContinue {
reason: ContinuationReason::RecoveryRetry
})
) {
return false;
}
let is_continuation = self
.turns
.store()
.current_turn()
.is_some_and(|t| t.trigger.is_goal_continuation());
if !is_continuation || self.continuation_still_consistent().await {
return false;
}
warn!("{CONTINUATION_SKIPPED_REASON}");
self.reset_continuation_state();
self.emit_cosmetic(AgentEventPayload::ContinuationSkipped {
reason: CONTINUATION_SKIPPED_REASON.into(),
})
.await;
// Drop the stale turn by IDENTITY (its index), not position: the
// GoalContinuation trigger would otherwise project as a dangling
// "continue" User message into the wire context. Keying on the current
// turn's index (vs len-1) does not assume it is the last turn. rewind is
// event-sourced (survives resume).
let keep = match self.turns.store().current_turn_index() {
Some(idx) => idx,
None => return true,
};
self.rewind_turns_record(keep);
true
}
}
2 changes: 1 addition & 1 deletion crates/loopal-runtime/src/agent_loop/goal_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl AgentLoopRunner {
return Ok(false);
}
if is_clear {
self.last_continuation_goal_id = None;
self.reset_continuation_state();
}
if kickoff_eligible {
return self.goal_continuation_check().await;
Expand Down
19 changes: 19 additions & 0 deletions crates/loopal-runtime/src/agent_loop/governance/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ pub trait Governance: Send + Sync {
// not pre-classify so each Governance can apply its own semantics.
fn on_envelope_received(&mut self, _source: &MessageSource) {}

// Post-execution observation for decision-makers that need tool OUTPUT to
// decide (e.g. LoopDetector keys its signature on input+output so that a
// tool re-reading a mutating path — same args, different result — is not
// flagged as a loop). `results[i]` is index-matched to `tool_uses[i]`.
// Cannot veto (the batch already ran); it feeds the next on_before_tools.
fn on_after_tools(
&mut self,
_ctx: &mut TurnContext,
_tool_uses: &[(String, String, serde_json::Value)],
_results: &[ContentBlock],
) {
}

// Compaction just rewrote earlier history: any cross-turn state derived
// from the pre-compact conversation (e.g. signature counters that index
// tool calls now absent from the store) is stale and must reset.
Expand All @@ -54,6 +67,12 @@ pub trait Governance: Send + Sync {
// cross-turn state.
fn on_compact_completed(&mut self) {}

// A turn was cancelled (user interrupt / parent abort). Cross-turn state
// accrued from its batches is not a valid sample — a user interrupt should
// reset a loop streak rather than let it span the cancellation. Default
// no-op for governances whose state is not turn-scoped.
fn on_turn_cancelled(&mut self) {}

/// Inspect the completed turn alongside trailing history; default
/// returns `PostTurnAction::None`. Cross-turn safety nets (degeneration
/// detector, repetition guard) override this.
Expand Down
5 changes: 2 additions & 3 deletions crates/loopal-runtime/src/agent_loop/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ impl AgentLoopRunner {
}
}
if self.turns.current_turn_id().is_some() {
self.end_turn_record(loopal_turn::TurnOutcome::Cancelled {
cause: loopal_turn::CancelledCause::ParentTurnAborted,
});
self.finalize_turn_cancellation(loopal_turn::CancelledCause::ParentTurnAborted)
.await;
}
let Some(_turn_id) = self.start_turn_record(envelope_to_trigger(env)) else {
error!(
Expand Down
Loading
Loading