Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions qa/cases/playwright/ENV-001.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ test.describe('ENV-001', () => {
})

await test.step('Step 5: whoami matches visible user', async () => {
const { username } = await getWhoami(request)
await expect(page.locator('.sidebar-footer')).toContainText(username)
const { name } = await getWhoami(request)
await expect(page.locator('.sidebar-footer')).toContainText(name)
})

expect(errors, `console errors: ${errors.join('; ')}`).toEqual([])
Expand Down
2 changes: 1 addition & 1 deletion qa/cases/playwright/TSK-001.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ test.describe("TSK-001", () => {
await page.getByRole("button", { name: "Start", exact: true }).click();
// Status pill should flip once the refetch lands.
await expect(
page.locator(".task-detail__status").filter({ hasText: "in_progress" }),
page.locator(".task-detail__status").filter({ hasText: "in progress" }),
).toBeVisible({ timeout: 15_000 });
});

Expand Down
78 changes: 78 additions & 0 deletions qa/cases/playwright/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 28 additions & 9 deletions src/agent/event_forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::agent::drivers::{AgentEventItem, DriverEvent, FinishReason, ProcessSt
use crate::agent::manager::ManagedAgent;
use crate::agent::trace::{self, AgentTraceStore, TraceEvent, TraceEventKind};
use crate::store::Store;
use crate::store::StreamEvent;

/// Extract a short human-readable summary from an ACP tool-call `input`
/// object. Probes the common argument keys drivers use (`file_path`,
Expand Down Expand Up @@ -154,6 +155,7 @@ pub(super) fn spawn_event_forwarder(
activity_logs: Arc<ActivityLogMap>,
trace_store: Arc<AgentTraceStore>,
trace_tx: broadcast::Sender<TraceEvent>,
stream_tx: broadcast::Sender<StreamEvent>,
store: Arc<Store>,
agents: Arc<Mutex<HashMap<String, ManagedAgent>>>,
) -> tokio::task::JoinHandle<()> {
Expand Down Expand Up @@ -434,15 +436,18 @@ pub(super) fn spawn_event_forwarder(
"⚠️ @{} completed a run without replying. Common causes: not authenticated, authentication expired, or a runtime error. Check agent logs for details.",
key
);
if let Err(e) =
store.create_system_message(&channel_id, &warning)
{
warn!(
agent = %key,
channel_id = %channel_id,
error = %e,
"failed to post empty-run warning"
);
match store.create_system_message(&channel_id, &warning) {
Ok((_, event)) => {
let _ = stream_tx.send(event);
}
Err(e) => {
warn!(
agent = %key,
channel_id = %channel_id,
error = %e,
"failed to post empty-run warning"
);
}
}
}
}
Expand Down Expand Up @@ -590,6 +595,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -599,6 +605,7 @@ mod tests {
activity_logs,
trace_store,
trace_tx.clone(),
stream_tx,
store,
agents,
);
Expand Down Expand Up @@ -709,6 +716,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, _trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -718,6 +726,7 @@ mod tests {
activity_logs,
trace_store.clone(),
trace_tx.clone(),
stream_tx,
store.clone(),
agents,
);
Expand Down Expand Up @@ -787,6 +796,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, _trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -796,6 +806,7 @@ mod tests {
activity_logs,
trace_store.clone(),
trace_tx.clone(),
stream_tx,
store.clone(),
agents,
);
Expand Down Expand Up @@ -886,6 +897,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, _trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -910,6 +922,7 @@ mod tests {
activity_logs,
trace_store.clone(),
trace_tx.clone(),
stream_tx,
store.clone(),
agents.clone(),
);
Expand Down Expand Up @@ -988,6 +1001,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, _trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -997,6 +1011,7 @@ mod tests {
activity_logs,
trace_store.clone(),
trace_tx.clone(),
stream_tx,
store.clone(),
agents,
);
Expand Down Expand Up @@ -1065,6 +1080,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, _trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -1074,6 +1090,7 @@ mod tests {
activity_logs,
trace_store.clone(),
trace_tx.clone(),
stream_tx,
store.clone(),
agents,
);
Expand Down Expand Up @@ -1141,6 +1158,7 @@ mod tests {
let activity_logs = Arc::new(ActivityLogMap::default());
let trace_store = Arc::new(AgentTraceStore::new());
let (trace_tx, _trace_rx) = broadcast::channel::<TraceEvent>(64);
let (stream_tx, _stream_rx) = broadcast::channel::<StreamEvent>(64);
let agents: Arc<Mutex<HashMap<String, ManagedAgent>>> =
Arc::new(Mutex::new(HashMap::new()));

Expand All @@ -1150,6 +1168,7 @@ mod tests {
activity_logs,
trace_store.clone(),
trace_tx.clone(),
stream_tx,
store.clone(),
agents,
);
Expand Down
39 changes: 31 additions & 8 deletions src/agent/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::agent::trace::{self, AgentTraceStore, TraceEvent, TraceEventKind};
use crate::agent::AgentLifecycle;
use crate::agent::AgentRuntime;
use crate::store::messages::ReceivedMessage;
use crate::store::stream::StreamEvent;
use crate::store::Store;

/// Managed agent backed by a [`RuntimeDriver`] + [`Session`].
Expand Down Expand Up @@ -99,6 +100,12 @@ pub struct AgentManager {
trace_store: Arc<AgentTraceStore>,
store: Arc<Store>,
data_dir: PathBuf,
/// Broadcast sender for agent trace events (tool calls, thinking, etc.).
/// Decoupled from Store so the persistence layer can be swapped.
trace_tx: tokio::sync::broadcast::Sender<crate::agent::trace::TraceEvent>,
/// Broadcast sender for stream events (messages, channel activity).
/// Passed to event forwarder so it can publish system messages.
stream_tx: tokio::sync::broadcast::Sender<StreamEvent>,
/// Optional explicit bridge endpoint. When `None`, agent startup reads the
/// shared discovery file from `~/.chorus/bridge.json`. Tests set this to a
/// synthetic URL, and `chorus serve` points it at the co-hosted bridge so
Expand All @@ -117,14 +124,21 @@ pub fn build_driver_registry() -> HashMap<AgentRuntime, Arc<dyn RuntimeDriver>>
}

impl AgentManager {
pub fn new(store: Arc<Store>, data_dir: PathBuf) -> Self {
pub fn new(
store: Arc<Store>,
data_dir: PathBuf,
trace_tx: tokio::sync::broadcast::Sender<crate::agent::trace::TraceEvent>,
stream_tx: tokio::sync::broadcast::Sender<StreamEvent>,
) -> Self {
Self {
driver_registry: build_driver_registry(),
agents: Arc::new(Mutex::new(HashMap::new())),
activity_logs: Arc::new(std::sync::Mutex::new(HashMap::new())),
trace_store: Arc::new(AgentTraceStore::new()),
store,
data_dir,
trace_tx,
stream_tx,
bridge_endpoint_override: None,
}
}
Expand Down Expand Up @@ -286,7 +300,8 @@ impl AgentManager {
event_rx,
self.activity_logs.clone(),
self.trace_store.clone(),
self.store.trace_sender(),
self.trace_tx.clone(),
self.stream_tx.clone(),
self.store.clone(),
self.agents.clone(),
);
Expand Down Expand Up @@ -370,7 +385,7 @@ impl AgentManager {
// End any active trace run.
trace::emit_active_event(
&self.trace_store,
&self.store.trace_sender(),
&self.trace_tx.clone(),
agent_name,
TraceEventKind::Error {
message: "Agent stopped".to_string(),
Expand Down Expand Up @@ -479,7 +494,7 @@ impl AgentManager {

let agents_ref = self.agents.clone();
let trace_store = self.trace_store.clone();
let trace_tx = self.store.trace_sender();
let trace_tx = self.trace_tx.clone();
let name = agent_name.to_string();

tokio::spawn(async move {
Expand Down Expand Up @@ -552,7 +567,9 @@ impl AgentManager {
/// or bridge process are required. Register drivers explicitly after
/// construction via [`register_driver`] if the test needs to start agents.
pub fn new_for_test(store: Arc<Store>, data_dir: std::path::PathBuf) -> Self {
let mut mgr = AgentManager::new(store, data_dir);
let (trace_tx, _) = tokio::sync::broadcast::channel(64);
let (stream_tx, _) = tokio::sync::broadcast::channel(64);
let mut mgr = AgentManager::new(store, data_dir, trace_tx, stream_tx);
mgr.bridge_endpoint_override = Some("http://127.0.0.1:1".to_string());
mgr
}
Expand Down Expand Up @@ -792,7 +809,9 @@ mod tests {
use tempfile::tempdir;

fn make_test_manager(store: Arc<Store>, dir: &std::path::Path) -> AgentManager {
let mut manager = AgentManager::new(store, dir.join("agents"));
let (trace_tx, _) = tokio::sync::broadcast::channel(64);
let (stream_tx, _) = tokio::sync::broadcast::channel(64);
let mut manager = AgentManager::new(store, dir.join("agents"), trace_tx, stream_tx);
let fake = Arc::new(FakeDriver::new(AgentRuntime::Codex));
manager.register_driver(AgentRuntime::Codex, fake);
// Tests use a synthetic endpoint — the FakeDriver ignores it, but the
Expand Down Expand Up @@ -968,7 +987,9 @@ mod tests {
fn resolve_bridge_endpoint_returns_override_when_set() {
let dir = tempdir().unwrap();
let store = Arc::new(Store::open(":memory:").unwrap());
let mut manager = AgentManager::new(store, dir.path().join("agents"));
let (trace_tx, _) = tokio::sync::broadcast::channel(64);
let (stream_tx, _) = tokio::sync::broadcast::channel(64);
let mut manager = AgentManager::new(store, dir.path().join("agents"), trace_tx, stream_tx);
manager.set_bridge_endpoint_override("http://127.0.0.1:9999");
let got = manager.resolve_bridge_endpoint().unwrap();
assert_eq!(got, "http://127.0.0.1:9999");
Expand All @@ -990,7 +1011,9 @@ mod tests {
}
let dir = tempdir().unwrap();
let store = Arc::new(Store::open(":memory:").unwrap());
let manager = AgentManager::new(store, dir.path().join("agents"));
let (trace_tx, _) = tokio::sync::broadcast::channel(64);
let (stream_tx, _) = tokio::sync::broadcast::channel(64);
let manager = AgentManager::new(store, dir.path().join("agents"), trace_tx, stream_tx);
let err = manager
.resolve_bridge_endpoint()
.expect_err("must fail when no override and no bridge");
Expand Down
Loading
Loading