Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## [Unreleased]

### Added

- `zeph-memory`: `MAX_GRAPH_NODES` constant (500) added to A* graph retrieval; `node_map` is
truncated to the highest-scored 500 nodes before the inner loop, bounding worst-case complexity
from O(n²) per seed to O(n log n) (closes #4368).
- `zeph-subagent`: `FleetRegistry` trait and `SqliteFleetRegistry` adapter; sub-agents spawned by
`SubAgentManager` are now registered in the fleet `agent_sessions` table and visible in the fleet
dashboard. `cancel_all` marks all active sub-agent sessions as cancelled on shutdown (closes #4370).

## [0.21.2] - 2026-05-18

### Added
Expand Down
75 changes: 75 additions & 0 deletions crates/zeph-memory/src/graph/retrieval_astar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ use crate::graph::types::{EdgeType, GraphFact};

const ENTITY_COLLECTION: &str = "zeph_graph_entities";

/// Maximum number of graph nodes passed to the A* traversal loop.
///
/// Without a cap the A* inner loop is O(|nodes|²) per seed, which becomes
/// prohibitively slow on large or highly-connected graphs. Nodes are ranked by
/// their seed score before truncation so only the least-relevant nodes are dropped.
const MAX_GRAPH_NODES: usize = 500;

/// Cosine similarity of two equal-length slices. Returns `0.0` when either norm is zero.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
let dot: f32 = a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum();
Expand All @@ -36,6 +43,35 @@ const DEFAULT_COMMUNITY_CAP: usize = 3;
/// Query embedding paired with per-entity embedding map, produced by the PRISM path.
type PrismEmbeddings = Option<(Vec<f32>, HashMap<i64, Vec<f32>>)>;

/// Truncate `node_map` in-place to at most `cap` entries, keeping those with the highest score.
///
/// Score is looked up from `entity_scores`; entities absent from that map receive `0.0`.
/// When `node_map.len() <= cap` the map is unchanged.
pub(crate) fn cap_node_map<V>(
node_map: &mut HashMap<i64, V>,
entity_scores: &HashMap<i64, f32>,
cap: usize,
) {
if node_map.len() <= cap {
return;
}
let mut scored: Vec<(i64, f32)> = node_map
.keys()
.map(|&id| {
let s = entity_scores.get(&id).copied().unwrap_or(0.0);
(id, s)
})
.collect();
scored.sort_unstable_by(|a, b| b.1.total_cmp(&a.1));
scored.truncate(cap);
let keep: HashSet<i64> = scored.into_iter().map(|(id, _)| id).collect();
node_map.retain(|id, _| keep.contains(id));
tracing::debug!(
retained = node_map.len(),
"graph_recall_astar: node_map capped"
);
}

/// Retrieve graph facts using A* shortest-path traversal.
///
/// Algorithm:
Expand Down Expand Up @@ -251,6 +287,9 @@ pub async fn graph_recall_astar(
graph.add_edge(src, tgt, cost);
}

// Cap node_map to the highest-scored nodes to bound A* time complexity.
cap_node_map(&mut node_map, &entity_scores, MAX_GRAPH_NODES);

// Run A* from each seed; collect path node pairs.
let mut path_pairs: HashSet<(NodeIndex, NodeIndex)> = HashSet::new();

Expand Down Expand Up @@ -549,4 +588,40 @@ mod tests {
assert!(!result.is_empty());
assert_eq!(result[0].entity_name, "alice");
}

#[test]
fn cap_node_map_truncates_to_cap_keeping_highest_scored() {
use petgraph::graph::NodeIndex;

let mut node_map: HashMap<i64, NodeIndex> = (0..600i64)
.map(|id| (id, NodeIndex::new(usize::try_from(id).unwrap())))
.collect();
// Assign scores: entity 599 gets highest, 0 gets lowest.
// i16 covers 0..600 exactly; the i16→f32 cast is lossless (f32 has 23 mantissa bits).
let entity_scores: HashMap<i64, f32> = (0..600i64)
.map(|id| (id, f32::from(i16::try_from(id).unwrap()) / 599.0))
.collect();

cap_node_map(&mut node_map, &entity_scores, MAX_GRAPH_NODES);

assert_eq!(node_map.len(), MAX_GRAPH_NODES);
// The 500 retained nodes must all have id >= 100 (top 500 by score are ids 100..=599).
for &id in node_map.keys() {
assert!(id >= 100, "low-scored node {id} should have been dropped");
}
}

#[test]
fn cap_node_map_noop_when_within_limit() {
use petgraph::graph::NodeIndex;

let mut node_map: HashMap<i64, NodeIndex> = (0..10i64)
.map(|id| (id, NodeIndex::new(usize::try_from(id).unwrap())))
.collect();
let entity_scores: HashMap<i64, f32> = HashMap::new();

cap_node_map(&mut node_map, &entity_scores, MAX_GRAPH_NODES);

assert_eq!(node_map.len(), 10);
}
}
68 changes: 68 additions & 0 deletions crates/zeph-subagent/src/fleet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Fleet registry abstraction for sub-agent session tracking.
//!
//! [`FleetRegistry`] is a narrow trait that decouples `zeph-subagent` from the
//! `zeph-memory` `SqliteStore`. The concrete implementation lives in `zeph-core`
//! and is injected via `SubAgentManager::set_fleet_registry`.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Terminal lifecycle status of an agent session visible in the fleet dashboard.
///
/// Only terminal states are represented because [`FleetRegistry::mark_terminal`] is only
/// called when a session ends. Active sessions are registered via
/// [`FleetRegistry::register_active`].
///
/// Mirrors the terminal variants of `zeph_memory::SessionStatus` without creating a
/// dependency on that crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FleetSessionStatus {
/// Session ended normally.
Completed,
/// Session ended due to an unrecoverable error.
Failed,
/// Session was cancelled by the user.
Cancelled,
}

/// Minimal data needed to register a sub-agent session in the fleet dashboard.
#[derive(Debug, Clone)]
pub struct FleetSessionInfo {
/// Stable session ID (UUID string).
pub id: String,
/// Human-readable agent name (from the definition).
pub agent_name: String,
/// ISO-8601 UTC timestamp of session start.
pub started_at: String,
}

/// Trait that abstracts fleet session persistence for `SubAgentManager`.
///
/// Implementors must be `Send + Sync` and handle their own internal error recovery;
/// the manager logs failures at `warn` level but never propagates them to the caller.
pub trait FleetRegistry: Send + Sync {
/// Register or update a sub-agent session as active.
///
/// Called once at sub-agent spawn time. Fire-and-forget: errors are logged
/// by the manager and do not abort the spawn.
fn register_active<'a>(
&'a self,
info: &'a FleetSessionInfo,
) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;

/// Mark a sub-agent session as terminal.
///
/// Called when a sub-agent finishes (collect) or is cancelled.
fn mark_terminal<'a>(
&'a self,
session_id: &'a str,
status: FleetSessionStatus,
) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send + 'a>>;
}

/// Shared, type-erased fleet registry injected into the manager.
pub type SharedFleetRegistry = Arc<dyn FleetRegistry>;
2 changes: 2 additions & 0 deletions crates/zeph-subagent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod command;
pub mod def;
pub mod error;
pub mod filter;
pub mod fleet;
pub mod grants;
pub mod hooks;
pub mod manager;
Expand All @@ -58,6 +59,7 @@ pub use def::{
};
pub use error::SubAgentError;
pub use filter::{FilteredToolExecutor, PlanModeExecutor, filter_skills};
pub use fleet::{FleetRegistry, FleetSessionInfo, FleetSessionStatus, SharedFleetRegistry};
pub use grants::{Grant, GrantKind, PermissionGrants, SecretRequest};
pub use hooks::{
HookAction, HookDef, HookError, HookMatcher, HookOutput, HookRunResult, McpDispatch,
Expand Down
Loading
Loading