Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/loopal-agent-hub/tests/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mod event_router_test;
mod hub_integration_test;
#[path = "suite/hub_lifecycle_test.rs"]
mod hub_lifecycle_test;
#[path = "suite/hub_secret_client_test.rs"]
mod hub_secret_client_test;
#[path = "suite/hub_shutdown_test.rs"]
mod hub_shutdown_test;
#[path = "suite/multi_agent_test.rs"]
Expand Down
118 changes: 118 additions & 0 deletions crates/loopal-agent-hub/tests/suite/hub_secret_client_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//! HubSecretClient behaviors that aren't covered by `e2e_real_vault_test` or
//! `e2e_secret_ipc_test`: IpcBudget gating, HubHealth state transitions, and
//! retry-policy interaction.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use loopal_agent_hub::Hub;
use loopal_ipc::{Connection, IpcBudget, duplex_pair};
use loopal_secret_client::{HUB_RPC_BUDGET, HubSecretClient, SecretClient, SecretError};
use tokio::sync::Mutex;
use tokio::sync::mpsc;

use super::secret_test_helpers::spawn_hub_dispatch_loop;

fn make_client_and_hub() -> (HubSecretClient, Arc<Mutex<Hub>>) {
let (client_t, hub_t) = duplex_pair();
let (client_conn, _client_rx) = Connection::new(client_t).into_listening();
let (hub_conn, hub_rx) = Connection::new(hub_t).into_listening();

let (event_tx, _event_rx) = mpsc::channel(64);
let hub = Arc::new(Mutex::new(Hub::new(event_tx)));
spawn_hub_dispatch_loop(hub.clone(), hub_conn, hub_rx, "test-client".into());

let client = HubSecretClient::new(
client_conn,
PathBuf::from("/nonexistent-test-cwd"),
"test-agent".into(),
0,
);
(client, hub)
}

#[tokio::test]
async fn forbidden_budget_rejects_get_synchronously_without_ipc() {
// IpcBudget::Forbidden marks critical paths that must NOT issue hub RPC.
// The client must reject these synchronously with a clear error rather
// than letting the call go through and time out.
let (client, _hub) = make_client_and_hub();
let result = client
.get("api_key", IpcBudget::Forbidden)
.await
.unwrap_err();
let SecretError::Ipc(msg) = &result else {
panic!("expected Ipc error for Forbidden budget, got: {result:?}");
};
assert!(
msg.contains("Forbidden") || msg.contains("forbidden"),
"error must reference Forbidden, got: {msg:?}"
);
}

#[tokio::test]
async fn forbidden_budget_rejects_list_names_synchronously() {
let (client, _hub) = make_client_and_hub();
let result = client.list_names(IpcBudget::Forbidden).await.unwrap_err();
let SecretError::Ipc(msg) = &result else {
panic!("expected Ipc error, got: {result:?}");
};
assert!(msg.contains("Forbidden") || msg.contains("forbidden"));
}

#[tokio::test]
async fn health_starts_in_healthy_state() {
let (client, _hub) = make_client_and_hub();
// The `health` accessor returns the inner Arc; pre-IPC it must be
// healthy (no failures recorded yet).
let health = client.health();
assert!(
!health.is_degraded(),
"fresh HubSecretClient must start healthy, not degraded"
);
assert!(
health.degraded_at_unix_ms().is_none(),
"no degradation timestamp on a brand-new client"
);
}

#[tokio::test]
async fn health_degrades_after_consecutive_failures() {
// Hub side is wired but the cwd points at a directory with no vault →
// every call returns an Ipc/NotFound error. Enough consecutive failures
// must flip the health to degraded.
let (client, _hub) = make_client_and_hub();
let health = client.health();
assert!(!health.is_degraded());

// Fire enough requests to cross the degradation threshold (default 3).
for _ in 0..5 {
let _ = tokio::time::timeout(
Duration::from_secs(3),
client.get("missing", HUB_RPC_BUDGET),
)
.await
.expect("each get must return within budget");
}

assert!(
health.is_degraded(),
"health must transition to degraded after consecutive failures"
);
assert!(
health.degraded_at_unix_ms().is_some(),
"degraded state must carry a timestamp"
);
}

#[tokio::test]
async fn health_observer_is_shared_across_clones() {
// health() returns Arc<HubHealth>; clones must observe the same state
// — degradation seen by one clone must be visible to others. This is
// load-bearing for the agent-server settle-poll listener.
let (client, _hub) = make_client_and_hub();
let h1 = client.health();
let h2 = client.health();
assert!(Arc::ptr_eq(&h1, &h2), "health() must return the same Arc");
}
1 change: 1 addition & 0 deletions crates/loopal-agent-server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ rust_test(
"@crates//:serde_json",
"@crates//:tempfile",
"@crates//:tokio",
"@crates//:tokio-util",
"@crates//:uuid",
],
proc_macro_deps = ["@crates//:async-trait"],
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-agent-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub mod testing {
pub use crate::cron_bridge::spawn as cron_bridge_spawn;
pub use crate::cron_bridge::spawn_with_receiver as cron_bridge_spawn_with_receiver;
pub use crate::ipc_handlers::SessionRef;
pub use crate::memory_consolidation::trigger_consolidation;
pub use crate::params::AgentSetupResult;
pub use crate::params::{StartParams, apply_start_overrides, build_kernel_with_provider};
pub use crate::session_handlers_factory::build_session_handlers;
Expand Down
2 changes: 2 additions & 0 deletions crates/loopal-agent-server/tests/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ mod hub_interaction_edge_test;
mod hub_interaction_test;
#[path = "suite/interrupt_filter_test.rs"]
mod interrupt_filter_test;
#[path = "suite/memory_consolidation_test.rs"]
mod memory_consolidation_test;
#[path = "suite/observer_join_edge_test.rs"]
mod observer_join_edge_test;
#[path = "suite/observer_join_test.rs"]
Expand Down
146 changes: 146 additions & 0 deletions crates/loopal-agent-server/tests/suite/memory_consolidation_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Lock-protocol behavior of `memory_consolidation::trigger_consolidation`.
//! The lock keeps concurrent consolidations from spawning duplicate sub-agents
//! — when `.consolidation_lock` exists with a fresh timestamp, the function
//! must short-circuit without spawning.

use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use loopal_agent::shared::{AgentShared, SchedulerHandle};
use loopal_agent::task_store::TaskStore;
use loopal_agent_server::testing::trigger_consolidation;
use loopal_config::Settings;
use loopal_ipc::Connection;
use loopal_kernel::Kernel;
use loopal_scheduler::CronScheduler;
use loopal_test_support::TestFixture;
use tokio_util::sync::CancellationToken;

fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}

fn build_shared(fixture: &TestFixture) -> Arc<AgentShared> {
let kernel = Arc::new(Kernel::new(Settings::default()).unwrap());
let cwd = fixture
.path()
.canonicalize()
.unwrap_or_else(|_| fixture.path().to_path_buf());
// Hub side dropped — spawn_agent will fail; that's intentional: we only
// care about the lock-protocol observable in the lock-held branch, and
// the spawn-failure path still exercises release_lock.
let (conn, _peer) = loopal_test_support::make_duplex_pair();
let (hub_connection, _rx) = Connection::new(conn).into_listening();
let scheduler_handle =
SchedulerHandle::new(Arc::new(CronScheduler::new()), CancellationToken::new());
Arc::new(AgentShared {
kernel,
task_store: Arc::new(TaskStore::with_sessions_root(fixture.path().join("tasks"))),
hub_connection,
cwd,
depth: 0,
agent_name: "consolidation-test".into(),
parent_event_tx: None,
cancel_token: None,
scheduler_handle,
message_snapshot: Arc::new(std::sync::RwLock::new(Vec::new())),
goal_session: None,
})
}

#[tokio::test]
async fn trigger_consolidation_skips_when_fresh_lock_exists() {
let fixture = TestFixture::new();
let shared = build_shared(&fixture);
let memory_dir = shared.cwd.join(".loopal/memory");
std::fs::create_dir_all(&memory_dir).unwrap();

// Pre-create a fresh lock (timestamp = now). trigger_consolidation must
// refuse to acquire and return without spawning.
let lock_path = memory_dir.join(".consolidation_lock");
let original_ts = now_secs();
std::fs::write(&lock_path, original_ts.to_string()).unwrap();

trigger_consolidation(&shared, "test-model");

// Lock unchanged: function early-returned, never wrote its own timestamp.
let actual = std::fs::read_to_string(&lock_path).unwrap();
assert_eq!(
actual.trim(),
original_ts.to_string(),
"lock content must be unchanged when trigger short-circuited"
);

// .last_consolidation must NOT be touched: the success path never ran.
assert!(
!memory_dir.join(".last_consolidation").exists(),
"marker file must not be written when trigger short-circuited"
);
}

#[tokio::test]
async fn trigger_consolidation_acquires_lock_when_free() {
let fixture = TestFixture::new();
let shared = build_shared(&fixture);
let memory_dir = shared.cwd.join(".loopal/memory");

let lock_path = memory_dir.join(".consolidation_lock");
assert!(
!lock_path.exists(),
"precondition: no lock prior to trigger"
);

// try_acquire_lock writes the lock file synchronously before tokio::spawn
// returns. Read immediately so the spawn-failure path can't have released
// it yet.
trigger_consolidation(&shared, "test-model");
assert!(
lock_path.exists(),
"trigger must acquire the lock synchronously before the spawned task can release it"
);

// Wait for the spawn-task to fail (hub side is dropped) and release the
// lock via the warn-path. spawn_agent errors immediately on the closed
// connection.
let deadline = std::time::Instant::now() + Duration::from_secs(8);
while std::time::Instant::now() < deadline {
if !lock_path.exists() {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
!lock_path.exists(),
"lock must be released after spawn_agent fails (connection dropped)"
);
}

#[tokio::test]
async fn trigger_consolidation_skips_then_unlocked_caller_succeeds() {
// Sequential trigger: first call holds, releases; second call sees a clean
// dir again and acquires freshly.
let fixture = TestFixture::new();
let shared = build_shared(&fixture);
let memory_dir = shared.cwd.join(".loopal/memory");
let lock_path = memory_dir.join(".consolidation_lock");

trigger_consolidation(&shared, "test-model");

let deadline = std::time::Instant::now() + Duration::from_secs(8);
while std::time::Instant::now() < deadline {
if !lock_path.exists() {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(!lock_path.exists());

trigger_consolidation(&shared, "test-model");
assert!(
lock_path.exists(),
"second trigger must re-acquire the lock synchronously"
);
}
4 changes: 4 additions & 0 deletions crates/loopal-tui/src/input/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub enum InputAction {
ToolApprove,
/// User denied tool use
ToolDeny,
/// Toggle the permission cursor between Allow/Deny (arrow-key nav).
ToolPermissionToggle,
/// Commit whichever option the permission cursor points at.
ToolPermissionConfirm,
/// Interrupt the agent's current work (ESC while busy)
Interrupt,
/// User wants to switch mode (from Shift+Tab shortcut)
Expand Down
5 changes: 5 additions & 0 deletions crates/loopal-tui/src/input/modal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ pub(super) fn handle_modal_keys(app: &mut App, key: &KeyEvent) -> Option<InputAc
return Some(match key.code {
KeyCode::Char('y') | KeyCode::Char('Y') => InputAction::ToolApprove,
KeyCode::Char('n') | KeyCode::Char('N') => InputAction::ToolDeny,
KeyCode::Left | KeyCode::Right | KeyCode::Up | KeyCode::Down => {
InputAction::ToolPermissionToggle
}
KeyCode::Tab => InputAction::ToolPermissionToggle,
KeyCode::Enter => InputAction::ToolPermissionConfirm,
KeyCode::Esc => InputAction::ToolDeny,
_ if is_ctrl_c => InputAction::ToolDeny,
_ => InputAction::None,
Expand Down
8 changes: 8 additions & 0 deletions crates/loopal-tui/src/key_dispatch_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ pub(crate) async fn apply_action(app: &mut App, action: InputAction) -> Dispatch
crate::key_dispatch_ops::tool_deny(app).await;
DispatchOutcome::Continue
}
InputAction::ToolPermissionToggle => {
crate::key_dispatch_ops::tool_permission_toggle(app);
DispatchOutcome::Continue
}
InputAction::ToolPermissionConfirm => {
crate::key_dispatch_ops::tool_permission_confirm(app).await;
DispatchOutcome::Continue
}
InputAction::Interrupt => {
app.session.interrupt();
DispatchOutcome::Continue
Expand Down
18 changes: 18 additions & 0 deletions crates/loopal-tui/src/key_dispatch_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ pub(crate) async fn tool_deny(app: &mut App) {
}
}

pub(crate) fn tool_permission_toggle(app: &mut App) {
app.with_active_conversation_mut(|conv| {
if let Some(p) = conv.pending_permission.as_mut() {
p.cursor = p.cursor.toggle();
}
});
}

pub(crate) async fn tool_permission_confirm(app: &mut App) {
let cursor =
app.with_active_conversation(|conv| conv.pending_permission.as_ref().map(|p| p.cursor));
match cursor {
Some(loopal_view_state::PermissionChoice::Allow) => tool_approve(app).await,
Some(loopal_view_state::PermissionChoice::Deny) => tool_deny(app).await,
None => {}
}
}

pub(crate) async fn push_to_inbox(app: &mut App, content: UserContent) {
let history_text = match &content.skill_info {
Some(si) if si.user_args.is_empty() => si.name.clone(),
Expand Down
Loading
Loading