diff --git a/crates/jcode-protocol/src/lib.rs b/crates/jcode-protocol/src/lib.rs index 57880e1c1..a74941c89 100644 --- a/crates/jcode-protocol/src/lib.rs +++ b/crates/jcode-protocol/src/lib.rs @@ -2089,13 +2089,50 @@ fn default_model_direction() -> i8 { 1 } -/// Encode an event as a newline-terminated JSON string +/// Encode an event as a newline-terminated JSON string. +/// +/// Defends the line-delimited protocol against accidental raw-newline +/// injection. A correctly-serialized JSON value contains no unescaped +/// newline bytes; if one slips through (custom Display impl, hand-built +/// JSON, etc.) the receiving side would resync mid-frame and see +/// "invalid type / expected value" errors that crash sessions. Stripping +/// embedded raw newlines here keeps the wire frame intact and surfaces +/// the bug via the eprintln below without taking the client down. pub fn encode_event(event: &ServerEvent) -> String { let mut json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string()); + if json.as_bytes().contains(&b'\n') { + // This should be impossible with serde_json, but if it ever + // happens we'd rather log loudly and ship a still-parseable + // single-line frame than corrupt the wire. + eprintln!( + "jcode-protocol: encode_event produced embedded newline; stripping. event_kind={}", + event_kind_for_debug(event) + ); + json = json.replace('\n', " "); + } + debug_assert!( + !json.as_bytes().contains(&b'\n'), + "encode_event must not produce embedded newlines" + ); json.push('\n'); json } +fn event_kind_for_debug(event: &ServerEvent) -> &'static str { + match event { + ServerEvent::Ack { .. } => "Ack", + ServerEvent::Pong { .. } => "Pong", + ServerEvent::Error { .. } => "Error", + ServerEvent::Done { .. } => "Done", + ServerEvent::Interrupted => "Interrupted", + ServerEvent::TextDelta { .. } => "TextDelta", + ServerEvent::ToolStart { .. } => "ToolStart", + ServerEvent::ToolDone { .. } => "ToolDone", + ServerEvent::SwarmStatus { .. } => "SwarmStatus", + _ => "other", + } +} + /// Decode a request from a JSON string pub fn decode_request(line: &str) -> Result { serde_json::from_str(line) diff --git a/docker-compose.agent-db.yml b/docker-compose.agent-db.yml new file mode 100644 index 000000000..2436b2dde --- /dev/null +++ b/docker-compose.agent-db.yml @@ -0,0 +1,26 @@ +# Agent DB Substrate — local Postgres for agent analytics +# Well-known credentials; only accessible from localhost. +# Start: docker compose -f docker-compose.agent-db.yml up -d +# Stop: docker compose -f docker-compose.agent-db.yml down -v + +services: + postgres: + image: postgres:16-alpine + container_name: jcode-agent-db + restart: unless-stopped + ports: + - "5432:5432" + environment: + POSTGRES_USER: jcode_agent + POSTGRES_PASSWORD: jcode_agent_local + POSTGRES_DB: jcode_agent_workspace + volumes: + - jcode_agent_db_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U jcode_agent -d jcode_agent_workspace"] + interval: 5s + timeout: 3s + retries: 5 + +volumes: + jcode_agent_db_data: diff --git a/src/ambient/runner.rs b/src/ambient/runner.rs index 61ec9112e..d90996347 100644 --- a/src/ambient/runner.rs +++ b/src/ambient/runner.rs @@ -28,6 +28,75 @@ use tokio::sync::{Notify, RwLock}; const MAX_IDLE_POLL_SECS: u64 = 30; +/// Pure helper: given the error returned by an ambient agent turn, decide +/// whether the runner should attempt a provider switch and what label to +/// switch to. Returns `Some(target_provider_id)` when the error carries a +/// failover prompt; `None` otherwise. +/// +/// Split out from [`try_headless_provider_failover`] so it can be unit +/// tested without constructing a real `Agent`/`Provider`. +fn parse_failover_target(run_result: &anyhow::Result) -> Option { + let err = run_result.as_ref().err()?; + let prompt = crate::provider::parse_failover_prompt_message(&err.to_string())?; + Some(prompt.to_provider) +} + +/// Try to recover from a provider-level failure during a headless ambient cycle +/// by switching providers once and re-running the original message. +/// +/// Returns `true` if a switch + retry was attempted (regardless of outcome). +/// The caller is responsible for inspecting `ambient_tools::take_cycle_result()` +/// after this returns; if the retry produced a result, it will be picked up +/// by the same code path that handles a normal successful turn. +/// +/// We intentionally cap this at a single retry per cycle to avoid burning +/// through every configured account/provider in one wake-up when an outage +/// is global rather than per-provider. +async fn try_headless_provider_failover( + provider: &Arc, + agent: &mut Agent, + run_result: &anyhow::Result, + initial_message: &str, +) -> bool { + let Some(target_provider) = parse_failover_target(run_result) else { + return false; + }; + // We re-parse here for the human-readable labels in the log line; the + // pure helper above only returns the machine-readable target id. + let prompt_for_log = run_result + .as_ref() + .err() + .and_then(|e| crate::provider::parse_failover_prompt_message(&e.to_string())); + match provider.switch_active_provider_to(&target_provider) { + Ok(()) => { + if let Some(prompt) = prompt_for_log { + logging::info(&format!( + "Ambient cycle: headless failover {} → {} (reason: {}); retrying initial message", + prompt.from_label, prompt.to_label, prompt.reason + )); + } else { + logging::info(&format!( + "Ambient cycle: headless failover → {}; retrying initial message", + target_provider + )); + } + } + Err(switch_err) => { + logging::warn(&format!( + "Ambient cycle: headless failover to {} failed: {}", + target_provider, switch_err + )); + return false; + } + } + // Re-issue the original user message on the new provider. The agent's + // conversation history already contains the failed turn, which the new + // provider will see as prior assistant context; that's intentional, since + // the failover prompt explicitly notes the input would be resent. + let _ = agent.run_once_capture(initial_message).await; + true +} + /// Shared ambient runner state, accessible from the server, debug socket, and TUI. #[derive(Clone)] pub struct AmbientRunnerHandle { @@ -881,11 +950,20 @@ impl AmbientRunnerHandle { self.set_running_detail("gathering context").await; let (system_prompt, initial_message) = self.build_cycle_context(provider).await?; - // Visible mode: spawn a full TUI instead of running headlessly + // Visible mode: spawn a full TUI instead of running headlessly. + // If the configured terminal is unavailable, fall back to headless so + // external wakeups (Discord/Telegram/email) still produce a cycle. if visible { - return self - .run_cycle_visible(started_at, system_prompt, initial_message) - .await; + match self + .run_cycle_visible(started_at, system_prompt.clone(), initial_message.clone()) + .await + { + Ok(result) => return Ok(result), + Err(e) => logging::warn(&format!( + "Ambient visible cycle failed ({}), falling back to headless", + e + )), + } } // Headless mode: run agent directly @@ -927,6 +1005,27 @@ impl AmbientRunnerHandle { }); } + // If the initial turn errored with a provider-failover prompt (e.g. the + // active provider hit its rate cap), try switching providers once and + // re-running the original message. In headless mode there is no TUI to + // surface the countdown, so this is the analogue of the TUI's auto- + // switch path. A single retry keeps the blast radius bounded. + let attempted_failover = + try_headless_provider_failover(provider, &mut agent, &run_result, &initial_message) + .await; + + if attempted_failover && let Some(result) = ambient_tools::take_cycle_result() { + ambient_tools::unregister_ambient_session(&ambient_session_id); + let conversation = agent.export_conversation_markdown(); + agent.mark_closed(); + return Ok(AmbientCycleResult { + started_at, + ended_at: Utc::now(), + conversation: Some(conversation), + ..result + }); + } + // Agent didn't call end_ambient_cycle - try continuation if run_result.is_err() { logging::warn("Ambient cycle: agent error without calling end_ambient_cycle"); diff --git a/src/ambient/runner_tests.rs b/src/ambient/runner_tests.rs index 94146aaa0..75800c393 100644 --- a/src/ambient/runner_tests.rs +++ b/src/ambient/runner_tests.rs @@ -1,4 +1,5 @@ use super::AmbientRunnerHandle; +use super::parse_failover_target; use crate::ambient::{Priority, ScheduleTarget, ScheduledItem}; use crate::message::{Message, Role, StreamEvent, ToolDefinition}; use crate::provider::{EventStream, Provider}; @@ -15,6 +16,33 @@ struct EnvVarGuard { prev: Option, } +#[test] +fn parse_failover_target_extracts_provider_from_failover_prompt_error() { + let prompt = crate::provider::ProviderFailoverPrompt { + from_provider: "openai".to_string(), + from_label: "OpenAI".to_string(), + to_provider: "claude".to_string(), + to_label: "Anthropic".to_string(), + reason: "rate limit reached".to_string(), + estimated_input_chars: 1234, + estimated_input_tokens: 567, + }; + let err: anyhow::Result = Err(anyhow::anyhow!(prompt.to_error_message())); + assert_eq!(parse_failover_target(&err).as_deref(), Some("claude")); +} + +#[test] +fn parse_failover_target_returns_none_for_unrelated_errors() { + let err: anyhow::Result = Err(anyhow::anyhow!("network timeout after 30s")); + assert!(parse_failover_target(&err).is_none()); +} + +#[test] +fn parse_failover_target_returns_none_for_ok_results() { + let ok: anyhow::Result = Ok("done".to_string()); + assert!(parse_failover_target(&ok).is_none()); +} + impl EnvVarGuard { fn set_path(key: &'static str, value: &std::path::Path) -> Self { let prev = std::env::var_os(key); diff --git a/src/cli/provider_init.rs b/src/cli/provider_init.rs index 09616f367..8b76502f7 100644 --- a/src/cli/provider_init.rs +++ b/src/cli/provider_init.rs @@ -1225,6 +1225,9 @@ async fn init_provider_with_options( crate::env::set_var("JCODE_PROVIDER_PROFILE_ACTIVE", "1"); } + let cfg = crate::config::config(); + crate::provider_catalog::rehydrate_active_named_provider_profile_env_from_config(cfg)?; + if std::env::var_os("JCODE_PROVIDER_PROFILE_ACTIVE").is_none() && std::env::var_os("JCODE_NAMED_PROVIDER_PROFILE").is_none() { diff --git a/src/prompt.rs b/src/prompt.rs index 0431a468d..1cfce0583 100644 --- a/src/prompt.rs +++ b/src/prompt.rs @@ -556,6 +556,7 @@ fn gpu_summary() -> Option { /// Load AGENTS.md files from a specific working directory pub fn load_agents_md_files_from_dir(working_dir: Option<&Path>) -> (Option, ContextInfo) { let mut contents = vec![]; + let mut loaded_paths = Vec::new(); let mut info = ContextInfo::default(); // Helper to load a file if it exists, returns (formatted_content, raw_size) @@ -573,17 +574,23 @@ pub fn load_agents_md_files_from_dir(working_dir: Option<&Path>) -> (Option = None; + match crate::provider_catalog::rehydrate_active_named_provider_profile_env_from_config(cfg) + { + Ok(Some(profile_name)) => default_named_provider_profile = Some(profile_name), + Ok(None) => {} + Err(err) => crate::logging::warn(&format!( + "Failed to rehydrate active provider profile from config: {}", + err + )), + } if std::env::var_os("JCODE_PROVIDER_PROFILE_ACTIVE").is_none() && std::env::var_os("JCODE_NAMED_PROVIDER_PROFILE").is_none() && let Some(pref) = provider_state.default_provider_key() diff --git a/src/provider_catalog.rs b/src/provider_catalog.rs index 61931f1f1..fb87a8f2b 100644 --- a/src/provider_catalog.rs +++ b/src/provider_catalog.rs @@ -480,6 +480,29 @@ pub fn apply_named_provider_profile_env(profile_name: &str) -> anyhow::Result anyhow::Result> { + let profile_name = std::env::var("JCODE_NAMED_PROVIDER_PROFILE") + .ok() + .or_else(|| std::env::var("JCODE_PROVIDER_PROFILE_NAME").ok()) + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()); + + let Some(profile_name) = profile_name else { + return Ok(None); + }; + + if !config.providers.contains_key(&profile_name) { + return Ok(None); + } + + let profile_name = apply_named_provider_profile_env_from_config(&profile_name, config)?; + crate::env::set_var("JCODE_PROVIDER_PROFILE_NAME", &profile_name); + crate::env::set_var("JCODE_PROVIDER_PROFILE_ACTIVE", "1"); + Ok(Some(profile_name)) +} + pub fn apply_named_provider_profile_env_from_config( profile_name: &str, config: &crate::config::Config, diff --git a/src/provider_catalog_tests.rs b/src/provider_catalog_tests.rs index 4c4de00ed..3379327be 100644 --- a/src/provider_catalog_tests.rs +++ b/src/provider_catalog_tests.rs @@ -504,6 +504,64 @@ fn named_provider_profile_maps_to_openai_compatible_runtime_env() { ); } +#[test] +fn active_named_provider_profile_is_rehydrated_from_config_when_env_is_partial() { + let _lock = crate::storage::lock_test_env(); + let _guard = EnvGuard::save(&[ + "JCODE_OPENROUTER_API_BASE", + "JCODE_OPENROUTER_API_KEY_NAME", + "JCODE_OPENROUTER_ENV_FILE", + "JCODE_OPENROUTER_CACHE_NAMESPACE", + "JCODE_OPENROUTER_MODEL", + "JCODE_NAMED_PROVIDER_PROFILE", + "JCODE_PROVIDER_PROFILE_NAME", + "JCODE_PROVIDER_PROFILE_ACTIVE", + ]); + + let cfg: crate::config::Config = toml::from_str( + r#" + [providers.ollama-cloud] + type = "openai-compatible" + base_url = "https://ollama.com/v1" + auth = "bearer" + api_key_env = "OLLAMA_API_KEY" + env_file = "ollama.env" + default_model = "deepseek-v4-pro" + "#, + ) + .expect("config should parse"); + + crate::env::set_var("JCODE_NAMED_PROVIDER_PROFILE", "ollama-cloud"); + crate::env::remove_var("JCODE_OPENROUTER_API_BASE"); + crate::env::remove_var("JCODE_OPENROUTER_API_KEY_NAME"); + crate::env::remove_var("JCODE_OPENROUTER_ENV_FILE"); + + let applied = rehydrate_active_named_provider_profile_env_from_config(&cfg) + .expect("rehydrate should succeed"); + + assert_eq!(applied.as_deref(), Some("ollama-cloud")); + assert_eq!( + std::env::var("JCODE_OPENROUTER_API_BASE").ok().as_deref(), + Some("https://ollama.com/v1") + ); + assert_eq!( + std::env::var("JCODE_OPENROUTER_API_KEY_NAME") + .ok() + .as_deref(), + Some("OLLAMA_API_KEY") + ); + assert_eq!( + std::env::var("JCODE_OPENROUTER_ENV_FILE").ok().as_deref(), + Some("ollama.env") + ); + assert_eq!( + std::env::var("JCODE_PROVIDER_PROFILE_ACTIVE") + .ok() + .as_deref(), + Some("1") + ); +} + #[test] fn named_provider_inline_api_key_is_private_runtime_fallback() { let _lock = crate::storage::lock_test_env(); diff --git a/src/server.rs b/src/server.rs index 714522807..5993ca95c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,6 +15,7 @@ mod comm_control; mod comm_plan; mod comm_session; mod comm_sync; +mod config_watcher; mod debug; mod debug_ambient; mod debug_command_exec; @@ -913,6 +914,12 @@ impl Server { .await; }); + // Watch config and instruction files from inside the shared daemon. New + // client processes often attach to an already-running server, so relying + // on the launcher to re-read config is not enough. A changed config must + // make the daemon exec a fresh copy before future sessions are created. + config_watcher::spawn_config_reload_watcher(self.identity.git_hash.clone()); + // Log when we receive SIGTERM for debugging #[cfg(unix)] { diff --git a/src/server/client_session.rs b/src/server/client_session.rs index 1e6b635d3..b861254f4 100644 --- a/src/server/client_session.rs +++ b/src/server/client_session.rs @@ -470,18 +470,22 @@ pub(super) async fn handle_subscribe( } } if let Some(new_id) = new_coordinator.clone() { - let members = swarm_members.read().await; - if let Some(member) = members.get(&new_id) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: new_id.clone(), - from_name: member.friendly_name.clone(), - notification_type: NotificationType::Message { - scope: Some("swarm".to_string()), - channel: None, - }, - message: "You are now the coordinator for this swarm.".to_string(), - }); - } + let friendly_name = { + let members = swarm_members.read().await; + members + .get(&new_id) + .and_then(|member| member.friendly_name.clone()) + }; + let event = ServerEvent::Notification { + from_session: new_id.clone(), + from_name: friendly_name, + notification_type: NotificationType::Message { + scope: Some("swarm".to_string()), + channel: None, + }, + message: "You are now the coordinator for this swarm.".to_string(), + }; + let _ = super::fanout_session_event(swarm_members, &new_id, event).await; } } } diff --git a/src/server/comm_control.rs b/src/server/comm_control.rs index 594e35f2b..b6995ee51 100644 --- a/src/server/comm_control.rs +++ b/src/server/comm_control.rs @@ -1066,17 +1066,16 @@ pub(super) async fn handle_comm_assign_task( sessions, ) .await; - if let Some(member) = swarm_members.read().await.get(&target_session) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: req_session_id.clone(), - from_name: coordinator_name.clone(), - notification_type: NotificationType::Message { - scope: Some("dm".to_string()), - channel: None, - }, - message: notification, - }); - } + let dm_event = ServerEvent::Notification { + from_session: req_session_id.clone(), + from_name: coordinator_name.clone(), + notification_type: NotificationType::Message { + scope: Some("dm".to_string()), + channel: None, + }, + message: notification, + }; + let _ = super::fanout_session_event(swarm_members, &target_session, dm_event).await; let target_has_client = { let connections = client_connections.read().await; @@ -1116,22 +1115,20 @@ pub(super) async fn handle_comm_assign_task( "Plan updated: task '{}' assigned to {}.", selected_task_id, target_session ); - let members = swarm_members.read().await; for sid in participant_ids { if sid == target_session || sid == req_session_id { continue; } - if let Some(member) = members.get(&sid) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: req_session_id.clone(), - from_name: coordinator_name.clone(), - notification_type: NotificationType::Message { - scope: Some("plan".to_string()), - channel: None, - }, - message: plan_msg.clone(), - }); - } + let plan_event = ServerEvent::Notification { + from_session: req_session_id.clone(), + from_name: coordinator_name.clone(), + notification_type: NotificationType::Message { + scope: Some("plan".to_string()), + channel: None, + }, + message: plan_msg.clone(), + }; + let _ = super::fanout_session_event(swarm_members, &sid, plan_event).await; } finish_swarm_mutation_request( diff --git a/src/server/comm_plan.rs b/src/server/comm_plan.rs index 5a9b9cc70..35f018823 100644 --- a/src/server/comm_plan.rs +++ b/src/server/comm_plan.rs @@ -97,7 +97,6 @@ pub(super) async fn handle_comm_propose_plan( (plan.version, plan.participants.clone()) }; - let members = swarm_members.read().await; let notification_msg = format!( "Plan updated by {} ({} items, v{})", from_label, @@ -108,17 +107,16 @@ pub(super) async fn handle_comm_propose_plan( if sid == req_session_id { continue; } - if let Some(member) = members.get(&sid) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: req_session_id.clone(), - from_name: from_name.clone(), - notification_type: NotificationType::Message { - scope: Some("plan".to_string()), - channel: None, - }, - message: notification_msg.clone(), - }); - } + let plan_event = ServerEvent::Notification { + from_session: req_session_id.clone(), + from_name: from_name.clone(), + notification_type: NotificationType::Message { + scope: Some("plan".to_string()), + channel: None, + }, + message: notification_msg.clone(), + }; + let _ = super::fanout_session_event(swarm_members, &sid, plan_event).await; let _ = queue_soft_interrupt_for_session( &sid, notification_msg.clone(), diff --git a/src/server/config_watcher.rs b/src/server/config_watcher.rs new file mode 100644 index 000000000..1c4a4c07f --- /dev/null +++ b/src/server/config_watcher.rs @@ -0,0 +1,189 @@ +use std::collections::HashSet; +use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +const CONFIG_RELOAD_WATCH_INTERVAL: Duration = Duration::from_secs(2); +const CONFIG_RELOAD_SETTLE_DELAY: Duration = Duration::from_millis(500); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct WatchedFileState { + path: PathBuf, + exists: bool, + modified: Option, + len: Option, +} + +pub(super) fn spawn_config_reload_watcher(server_git_hash: String) { + if config_reload_watcher_disabled_by_env() { + crate::logging::info("Server config reload watcher disabled by JCODE_CONFIG_AUTO_RELOAD=0"); + return; + } + + tokio::spawn(async move { + run_config_reload_watcher(server_git_hash).await; + }); +} + +async fn run_config_reload_watcher(server_git_hash: String) { + let mut previous = watched_config_state(); + let mut interval = tokio::time::interval(CONFIG_RELOAD_WATCH_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + let current = watched_config_state(); + if current == previous { + continue; + } + + tokio::time::sleep(CONFIG_RELOAD_SETTLE_DELAY).await; + let settled = watched_config_state(); + previous = settled.clone(); + + // Force the reloadable config cache to observe the changed file before + // deciding whether process reloads are enabled. This means flipping + // display.auto_server_reload to false takes effect without restarting. + if !config_auto_server_reload_enabled() { + crate::logging::info( + "Jcode config/instruction files changed; server auto reload is disabled by display.auto_server_reload=false", + ); + continue; + } + + let changed_paths = format_watched_paths(&settled); + crate::logging::info(&format!( + "Jcode config/instruction files changed ({}); reloading shared server so future sessions use fresh configuration", + changed_paths + )); + let reload_hash = format!( + "{}:config:{}", + server_git_hash, + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis()) + .unwrap_or_default() + ); + super::send_reload_signal(reload_hash, None, false); + } +} + +fn config_auto_server_reload_enabled() -> bool { + if config_reload_watcher_disabled_by_env() { + return false; + } + crate::config::config().display.auto_server_reload +} + +fn config_reload_watcher_disabled_by_env() -> bool { + std::env::var("JCODE_CONFIG_AUTO_RELOAD") + .ok() + .map(|value| { + let value = value.trim(); + value == "0" || value.eq_ignore_ascii_case("false") || value.eq_ignore_ascii_case("off") + }) + .unwrap_or(false) +} + +fn watched_config_state() -> Vec { + watched_config_paths() + .into_iter() + .map(|path| watched_file_state(path.as_path())) + .collect() +} + +fn watched_config_paths() -> Vec { + let mut paths = Vec::new(); + + if let Some(path) = crate::config::Config::path() { + paths.push(path); + } + + if let Ok(jcode_dir) = crate::storage::jcode_dir() { + paths.push(jcode_dir.join("prompt-overlay.md")); + } + + if let Ok(home_agents) = crate::storage::user_home_path("AGENTS.md") { + paths.push(home_agents); + } + + if let Ok(current_dir) = std::env::current_dir() { + paths.push(current_dir.join("AGENTS.md")); + paths.push(current_dir.join(".jcode").join("prompt-overlay.md")); + } + + dedupe_paths(paths) +} + +fn watched_file_state(path: &Path) -> WatchedFileState { + let metadata = std::fs::metadata(path).ok(); + WatchedFileState { + path: path.to_path_buf(), + exists: metadata.is_some(), + modified: metadata + .as_ref() + .and_then(|metadata| metadata.modified().ok()), + len: metadata.as_ref().map(std::fs::Metadata::len), + } +} + +fn dedupe_paths(paths: Vec) -> Vec { + let mut seen = HashSet::new(); + let mut deduped = Vec::new(); + + for path in paths { + let key = path + .canonicalize() + .unwrap_or_else(|_| path.clone()) + .to_string_lossy() + .to_string(); + if seen.insert(key) { + deduped.push(path); + } + } + + deduped +} + +fn format_watched_paths(states: &[WatchedFileState]) -> String { + states + .iter() + .filter(|state| state.exists) + .map(|state| state.path.display().to_string()) + .collect::>() + .join(", ") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn watched_file_state_changes_when_file_is_created() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("config.toml"); + + let before = watched_file_state(&path); + std::fs::write(&path, "[display]\nauto_server_reload = true\n").expect("write config"); + let after = watched_file_state(&path); + + assert!(!before.exists); + assert!(after.exists); + assert_ne!(before, after); + } + + #[test] + fn dedupe_paths_collapses_symlinked_instruction_files() { + let dir = tempfile::tempdir().expect("tempdir"); + let target = dir.path().join("AGENTS.md"); + let link = dir.path().join(".AGENTS.md"); + std::fs::write(&target, "instructions").expect("write target"); + + #[cfg(unix)] + std::os::unix::fs::symlink(&target, &link).expect("symlink"); + #[cfg(windows)] + std::os::windows::fs::symlink_file(&target, &link).expect("symlink"); + + let deduped = dedupe_paths(vec![target.clone(), link]); + assert_eq!(deduped, vec![target]); + } +} diff --git a/src/server/debug_swarm_write.rs b/src/server/debug_swarm_write.rs index 11883fb25..5b4b396fd 100644 --- a/src/server/debug_swarm_write.rs +++ b/src/server/debug_swarm_write.rs @@ -81,29 +81,43 @@ pub(super) async fn maybe_handle_swarm_write_command( }; if let Some(swarm_id) = swarm_id { - let swarms = ctx.swarms_by_id.read().await; - let members = ctx.swarm_members.read().await; - let current_session = ctx.session_id.read().await; - let from_name = members - .get(&*current_session) - .and_then(|member| member.friendly_name.clone()); + // Snapshot recipient list and the from_name under the read locks, + // then drop the locks before calling fanout (which needs the + // swarm_members write lock). + let (recipients, from_name, from_session) = { + let swarms = ctx.swarms_by_id.read().await; + let members = ctx.swarm_members.read().await; + let current_session = ctx.session_id.read().await; + let from_name = members + .get(&*current_session) + .and_then(|member| member.friendly_name.clone()); + let recipients: Vec = swarms + .get(&swarm_id) + .map(|ids| ids.iter().cloned().collect()) + .unwrap_or_default(); + (recipients, from_name, current_session.clone()) + }; - if let Some(member_ids) = swarms.get(&swarm_id) { + if !recipients.is_empty() { let mut sent_count = 0; - for member_id in member_ids { - if let Some(member) = members.get(member_id) { - let notification = ServerEvent::Notification { - from_session: current_session.clone(), - from_name: from_name.clone(), - notification_type: NotificationType::Message { - scope: Some("broadcast".to_string()), - channel: None, - }, - message: message.clone(), - }; - if member.event_tx.send(notification).is_ok() { - sent_count += 1; - } + for member_id in &recipients { + let notification = ServerEvent::Notification { + from_session: from_session.clone(), + from_name: from_name.clone(), + notification_type: NotificationType::Message { + scope: Some("broadcast".to_string()), + channel: None, + }, + message: message.clone(), + }; + let delivered = super::fanout_session_event( + ctx.swarm_members, + member_id, + notification, + ) + .await; + if delivered > 0 { + sent_count += 1; } } return Ok(Some( @@ -134,27 +148,44 @@ pub(super) async fn maybe_handle_swarm_write_command( return Err(anyhow::anyhow!("swarm:notify requires a message")); } - let members = ctx.swarm_members.read().await; - let current_session = ctx.session_id.read().await; - let from_name = members - .get(&*current_session) - .and_then(|member| member.friendly_name.clone()); + // Snapshot from_name and target friendly_name under the read + // lock, then drop it before fanning out. + let (from_name, from_session, target_exists, target_friendly_name) = { + let members = ctx.swarm_members.read().await; + let current_session = ctx.session_id.read().await; + let from_name = members + .get(&*current_session) + .and_then(|member| member.friendly_name.clone()); + let target = members.get(target_session); + ( + from_name, + current_session.clone(), + target.is_some(), + target.and_then(|m| m.friendly_name.clone()), + ) + }; - if let Some(target) = members.get(target_session) { + if target_exists { let notification = ServerEvent::Notification { - from_session: current_session.clone(), - from_name: from_name.clone(), + from_session, + from_name, notification_type: NotificationType::Message { scope: Some("dm".to_string()), channel: None, }, message: message.to_string(), }; - if target.event_tx.send(notification).is_ok() { + let delivered = super::fanout_session_event( + ctx.swarm_members, + target_session, + notification, + ) + .await; + if delivered > 0 { return Ok(Some( serde_json::json!({ "sent_to": target_session, - "sent_to_name": target.friendly_name.clone(), + "sent_to_name": target_friendly_name, "message": message, }) .to_string(), @@ -226,12 +257,9 @@ pub(super) async fn maybe_handle_swarm_write_command( .map(|sessions| sessions.iter().cloned().collect()) .unwrap_or_default() }; - let members = ctx.swarm_members.read().await; for sid in &swarm_session_ids { - if sid != acting_session - && let Some(member) = members.get(sid) - { - let _ = member.event_tx.send(ServerEvent::Notification { + if sid != acting_session { + let notification = ServerEvent::Notification { from_session: acting_session.to_string(), from_name: friendly_name.clone(), notification_type: NotificationType::SharedContext { @@ -239,7 +267,10 @@ pub(super) async fn maybe_handle_swarm_write_command( value: value.clone(), }, message: format!("Shared context: {} = {}", key, value), - }); + }; + let _ = + super::fanout_session_event(ctx.swarm_members, sid, notification) + .await; } } diff --git a/src/server/state.rs b/src/server/state.rs index ef5ccbf68..153f46a1a 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -298,7 +298,15 @@ pub(super) async fn register_session_event_sender( ) { let mut members = swarm_members.write().await; if let Some(member) = members.get_mut(session_id) { - member.event_tx = event_tx.clone(); + // Only adopt this sender as the singular fallback when the existing + // one is closed (e.g. the headless intake or a prior connection has + // gone away). Overwriting an already-live fallback would shift where + // subsequent direct `member.event_tx` users send their payloads, + // which is one of the root causes of cross-connection wire + // corruption observed in production logs. + if member.event_tx.is_closed() { + member.event_tx = event_tx.clone(); + } member.event_txs.insert(connection_id.to_string(), event_tx); } } @@ -311,9 +319,12 @@ pub(super) async fn unregister_session_event_sender( let mut members = swarm_members.write().await; if let Some(member) = members.get_mut(session_id) { member.event_txs.remove(connection_id); - if let Some((_, tx)) = member.event_txs.iter().next() { - member.event_tx = tx.clone(); - } + // Intentionally do NOT silently re-point `member.event_tx` here. The + // singular `event_tx` is a fallback set at member creation (headless + // spawn intake). Silently swapping it to a surviving connection's + // sender means later code that reads `member.event_tx` can deliver + // events to a different connection's writer than intended, which has + // shown up as cross-connection protocol corruption in the wild. } } @@ -333,9 +344,11 @@ pub(super) async fn fanout_session_event( if member.event_txs.is_empty() { vec![member.event_tx.clone()] } else { - if let Some((_, tx)) = member.event_txs.iter().next() { - member.event_tx = tx.clone(); - } + // Snapshot all live attachments. Do not mutate `member.event_tx` + // here: this function is called from many fanout sites and the + // HashMap iteration order is non-deterministic, so re-pointing + // the singular fallback would silently re-route subsequent + // direct uses of `member.event_tx` to an arbitrary connection. member.event_txs.values().cloned().collect::>() } }; @@ -564,3 +577,231 @@ pub(super) async fn queue_soft_interrupt_for_session( }) } } + +#[cfg(test)] +mod multi_connection_protocol_tests { + //! Regression tests for the multi-session protocol corruption that showed + //! up as truncated/interleaved JSON lines on the wire (e.g. + //! `nt","is_headless":true,...`) and `Remote protocol error is not + //! retryable` reconnect-loop terminations in + //! `~/.jcode/logs/jcode-2026-05-21.log`. + //! + //! Root cause: several server code paths used the singular fallback + //! `member.event_tx` directly instead of fanning out to all + //! `member.event_txs`, AND `register/unregister/fanout_session_event` + //! silently overwrote `member.event_tx` to point at whichever + //! connection's writer happened to be touched last. That meant a `send` + //! intended for one client's writer could land on another client's + //! writer mid-line, splicing event tails into unrelated frames. + + use super::*; + use crate::protocol::ServerEvent; + + fn fresh_member( + session_id: &str, + fallback_tx: mpsc::UnboundedSender, + ) -> SwarmMember { + SwarmMember { + session_id: session_id.to_string(), + event_tx: fallback_tx, + event_txs: HashMap::new(), + working_dir: None, + swarm_id: None, + swarm_enabled: false, + status: "ready".to_string(), + detail: None, + friendly_name: None, + report_back_to_session_id: None, + latest_completion_report: None, + role: "agent".to_string(), + joined_at: Instant::now(), + last_status_change: Instant::now(), + is_headless: true, + } + } + + fn drain(rx: &mut mpsc::UnboundedReceiver) -> Vec { + let mut out = Vec::new(); + while let Ok(v) = rx.try_recv() { + out.push(v); + } + out + } + + /// `register_session_event_sender` must NOT silently overwrite the + /// singular `event_tx` fallback while it is still live. Doing so caused + /// later direct `member.event_tx.send(...)` callers to deliver to a + /// different connection's writer than intended. + #[tokio::test] + async fn register_does_not_overwrite_live_fallback_sender() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + let (conn_a_tx, _conn_a_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + + // Fallback must still point at the original headless intake. + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 7 }) + .expect("fallback sender still alive"); + drop(guard); + + let delivered = drain(&mut fallback_rx); + assert_eq!(delivered.len(), 1, "fallback receiver got the event"); + } + + /// When the live fallback is closed, `register_session_event_sender` is + /// allowed to adopt the new sender so the member is not stranded. + #[tokio::test] + async fn register_adopts_new_sender_when_fallback_is_closed() { + let (fallback_tx, fallback_rx) = mpsc::unbounded_channel::(); + drop(fallback_rx); // closes the channel + + let (conn_a_tx, mut conn_a_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 42 }) + .expect("new fallback should be live"); + drop(guard); + + let delivered = drain(&mut conn_a_rx); + assert_eq!(delivered.len(), 1, "new live conn picked up the fallback"); + } + + /// `unregister_session_event_sender` must not re-point the singular + /// `event_tx` to a surviving connection. The fallback is owned by + /// member-creation (headless intake) and silently swapping it caused + /// cross-wired writes between connections. + #[tokio::test] + async fn unregister_does_not_repoint_fallback_to_survivor() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + let (conn_a_tx, _conn_a_rx) = mpsc::unbounded_channel::(); + let (conn_b_tx, mut conn_b_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + register_session_event_sender(&members, "sess-1", "conn-b", conn_b_tx).await; + + // conn-a goes away. + unregister_session_event_sender(&members, "sess-1", "conn-a").await; + + // Sending to the singular `event_tx` must land on the original + // fallback receiver, NOT on conn-b's writer. + { + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 1 }) + .expect("fallback still alive"); + } + + let fallback_msgs = drain(&mut fallback_rx); + let conn_b_msgs = drain(&mut conn_b_rx); + assert_eq!( + fallback_msgs.len(), + 1, + "fallback receiver should get the event" + ); + assert!( + conn_b_msgs.is_empty(), + "surviving connection must NOT silently absorb fallback traffic" + ); + } + + /// `fanout_session_event` must deliver to every live attachment exactly + /// once and must not mutate the singular `event_tx` to point at one of + /// them. Without this guarantee, subsequent direct `event_tx` users + /// would deliver to an arbitrary connection (HashMap iteration order), + /// which is exactly the wire-corruption pattern observed in production. + #[tokio::test] + async fn fanout_delivers_to_all_connections_and_does_not_mutate_fallback() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + let (conn_a_tx, mut conn_a_rx) = mpsc::unbounded_channel::(); + let (conn_b_tx, mut conn_b_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + register_session_event_sender(&members, "sess-1", "conn-b", conn_b_tx).await; + + let delivered = + fanout_session_event(&members, "sess-1", ServerEvent::Ack { id: 99 }).await; + assert_eq!(delivered, 2, "fanout reaches both live attachments"); + + let a = drain(&mut conn_a_rx); + let b = drain(&mut conn_b_rx); + assert_eq!(a.len(), 1, "conn-a got exactly one copy"); + assert_eq!(b.len(), 1, "conn-b got exactly one copy"); + + // Fallback receiver must NOT have received anything: when live + // attachments exist, fanout snapshots `event_txs` and does NOT + // duplicate to `event_tx`. + let fb = drain(&mut fallback_rx); + assert!( + fb.is_empty(), + "fanout must not duplicate to the singular fallback when live conns exist" + ); + + // And the singular `event_tx` must STILL be the original fallback + // (i.e. fanout did not silently re-point it). The cheapest check is + // to send via `event_tx` and confirm the fallback receiver gets it. + { + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 100 }) + .expect("fallback sender still alive"); + } + let fb_after = drain(&mut fallback_rx); + assert_eq!( + fb_after.len(), + 1, + "singular event_tx must remain pointed at the original fallback" + ); + assert!( + drain(&mut conn_a_rx).is_empty() + && drain(&mut conn_b_rx).is_empty(), + "direct fallback send must NOT bleed into live connections" + ); + } + + /// When no live attachments are registered, fanout falls back to the + /// singular `event_tx` so headless-only sessions still receive events. + #[tokio::test] + async fn fanout_falls_back_to_singular_sender_when_no_live_conns() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + let delivered = + fanout_session_event(&members, "sess-1", ServerEvent::Ack { id: 1 }).await; + assert_eq!(delivered, 1, "fanout used the singular fallback"); + + let msgs = drain(&mut fallback_rx); + assert_eq!(msgs.len(), 1, "fallback receiver got the event"); + } +} diff --git a/src/server/swarm.rs b/src/server/swarm.rs index c21956b66..db760ebc1 100644 --- a/src/server/swarm.rs +++ b/src/server/swarm.rs @@ -432,11 +432,12 @@ pub(super) async fn broadcast_swarm_plan_with_previous( summary: Some(summary), }; - let members = swarm_members.read().await; for sid in participants { - if let Some(member) = members.get(&sid) { - let _ = member.event_tx.send(event.clone()); - } + // Use fanout so every live attachment of this session receives the + // event; falling back to a single `member.event_tx` here can pick a + // stale or wrong-connection sender and drops events for other + // attachments. + let _ = super::fanout_session_event(swarm_members, &sid, event.clone()).await; } } @@ -565,18 +566,24 @@ pub(super) async fn remove_session_from_swarm( if let Some(vp) = plans.get_mut(swarm_id) { vp.participants.insert(new_id.clone()); } - let members = swarm_members.read().await; - if let Some(member) = members.get(&new_id) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: new_id.clone(), - from_name: member.friendly_name.clone(), - notification_type: NotificationType::Message { - scope: Some("swarm".to_string()), - channel: None, - }, - message: "You are now the coordinator for this swarm.".to_string(), - }); - } + drop(plans); + let friendly_name = { + let members = swarm_members.read().await; + members + .get(&new_id) + .and_then(|member| member.friendly_name.clone()) + }; + let notification = ServerEvent::Notification { + from_session: new_id.clone(), + from_name: friendly_name, + notification_type: NotificationType::Message { + scope: Some("swarm".to_string()), + channel: None, + }, + message: "You are now the coordinator for this swarm.".to_string(), + }; + // Fan out to every live attachment of the promoted session. + let _ = super::fanout_session_event(swarm_members, &new_id, notification).await; } } diff --git a/src/server/swarm_persistence_tests.rs b/src/server/swarm_persistence_tests.rs index 2b930d19d..172ec4ef6 100644 --- a/src/server/swarm_persistence_tests.rs +++ b/src/server/swarm_persistence_tests.rs @@ -3,6 +3,13 @@ use std::time::Instant; struct EnvGuard { runtime: Option, + // Hold the global test-env mutex for the lifetime of the guard so that + // concurrent tests cannot race on `JCODE_RUNTIME_DIR`. Previously this + // guard was dropped at the end of `test_env(...)`, allowing other tests + // running in parallel to overwrite the env var mid-test and corrupt + // `load_runtime_state()` results. The field is intentionally read-only; + // we only need RAII. + _lock: std::sync::MutexGuard<'static, ()>, } impl Drop for EnvGuard { @@ -16,10 +23,13 @@ impl Drop for EnvGuard { } fn test_env(dir: &tempfile::TempDir) -> EnvGuard { - let _guard = storage::lock_test_env(); + let lock = storage::lock_test_env(); let previous = std::env::var_os("JCODE_RUNTIME_DIR"); crate::env::set_var("JCODE_RUNTIME_DIR", dir.path()); - EnvGuard { runtime: previous } + EnvGuard { + runtime: previous, + _lock: lock, + } } #[test] diff --git a/src/tool/db_execute.rs b/src/tool/db_execute.rs new file mode 100644 index 000000000..f879b77bd --- /dev/null +++ b/src/tool/db_execute.rs @@ -0,0 +1,161 @@ +use super::{Tool, ToolContext, ToolOutput}; +use anyhow::Result; +use async_trait::async_trait; +use serde::Deserialize; +use serde_json::{Value, json}; +use std::process::Stdio; +use tokio::process::Command as TokioCommand; + +const DB_EXECUTE_DESCRIPTION: &str = "Execute a SQL statement against the agent's local Postgres database. Statements run as a per-session role that owns the session's schema; agents cannot access other sessions' data. Use for CREATE TABLE, INSERT, UPDATE, DELETE, SELECT, DROP TABLE, etc. For queries that may return large results, limit with SQL clauses."; + +pub struct DbExecuteTool; + +impl DbExecuteTool { + pub fn new() -> Self { + Self + } +} + +#[derive(Deserialize)] +struct DbExecuteInput { + sql: String, +} + +/// Build a db-execute tool that scopes SQL to the agent's session schema. +/// The container and credentials are well-known localhost defaults. +fn agent_schema_name(session_id: &str) -> String { + // Sanitize: schema names must start with a letter or underscore, + // contain only lowercase letters, digits, and underscores, and be <= 63 chars. + let sanitized: String = session_id + .chars() + .map(|c| { + if c.is_ascii_alphanumeric() || c == '_' { + c.to_ascii_lowercase() + } else { + '_' + } + }) + .collect(); + // Ensure it starts with a letter + let prefixed = if sanitized.starts_with(|c: char| c.is_ascii_alphabetic()) { + sanitized + } else { + format!("a_{}", sanitized) + }; + // Truncate to 30 chars, then add "agent_" prefix (fits within 63-char limit) + let short: String = prefixed.chars().take(30).collect(); + format!("agent_{}", short) +} + +fn provision_role_and_schema_sql(schema: &str) -> String { + // Creates a NOLOGIN role for the session (if missing), grants it to + // jcode_agent, creates/owns the schema, and sets the effective role + // + search_path. All SQL from the agent runs as this per-session role, + // which owns its schema but has no USAGE on any other agent's schema. + format!( + "DO $$\n\ + BEGIN\n\ + IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{schema}') THEN\n\ + CREATE ROLE {schema} NOLOGIN;\n\ + END IF;\n\ + END\n\ + $$;\n\ + GRANT {schema} TO jcode_agent;\n\ + CREATE SCHEMA IF NOT EXISTS {schema} AUTHORIZATION {schema};\n\ + ALTER SCHEMA {schema} OWNER TO {schema};\n\ + SET ROLE {schema};\n\ + SET search_path TO {schema};" + ) +} + +#[async_trait] +impl Tool for DbExecuteTool { + fn name(&self) -> &str { + "db-execute" + } + + fn description(&self) -> &str { + DB_EXECUTE_DESCRIPTION + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "required": ["sql"], + "properties": { + "intent": super::intent_schema_property(), + "sql": { + "type": "string", + "description": "SQL statement to execute. Scoped to agent's schema." + } + } + }) + } + + async fn execute(&self, input: Value, ctx: ToolContext) -> Result { + let params: DbExecuteInput = serde_json::from_value(input)?; + let schema = agent_schema_name(&ctx.session_id); + + let full_sql = format!( + "{}\n{}", + provision_role_and_schema_sql(&schema), + params.sql.trim() + ); + + let result = run_psql(&full_sql).await?; + Ok(ToolOutput::new(result)) + } +} + +async fn run_psql(sql: &str) -> Result { + let mut child = TokioCommand::new("docker") + .args([ + "exec", + "-i", + "jcode-agent-db", + "psql", + "-U", + "jcode_agent", + "-d", + "jcode_agent_workspace", + "-v", + "ON_ERROR_STOP=1", + "-A", // unaligned output + "-t", // tuples only (no headers) + "-q", // quiet + ]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + // Write SQL to stdin + if let Some(mut stdin) = child.stdin.take() { + use tokio::io::AsyncWriteExt; + stdin.write_all(sql.as_bytes()).await?; + stdin.write_all(b"\n").await?; + // stdin is dropped here, closing the pipe + } + + let output = child.wait_with_output().await?; + + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + if output.status.success() { + if stdout.is_empty() && stderr.is_empty() { + Ok("OK".to_string()) + } else if stdout.is_empty() { + Ok(stderr) + } else { + Ok(stdout) + } + } else { + Err(anyhow::anyhow!( + "psql error (exit {}): {}\n{}", + output.status.code().unwrap_or(-1), + stderr, + stdout + )) + } +} diff --git a/src/tool/mod.rs b/src/tool/mod.rs index 9ce518a8f..c37d419f1 100644 --- a/src/tool/mod.rs +++ b/src/tool/mod.rs @@ -8,6 +8,7 @@ mod browser; mod codesearch; mod communicate; mod conversation_search; +mod db_execute; mod debug_socket; mod edit; mod glob; @@ -186,6 +187,12 @@ impl Registry { Self::insert_tool_timed(&mut m, &mut timings, "gmail", gmail::GmailTool::new); Self::insert_tool_timed(&mut m, &mut timings, "schedule", ambient::ScheduleTool::new); Self::insert_tool_timed(&mut m, &mut timings, "selfdev", selfdev::SelfDevTool::new); + Self::insert_tool_timed( + &mut m, + &mut timings, + "db-execute", + db_execute::DbExecuteTool::new, + ); let nonzero: Vec = timings .iter() .filter(|(_, ms)| *ms > 0) diff --git a/src/tool/todo.rs b/src/tool/todo.rs index d638c611e..021d3474e 100644 --- a/src/tool/todo.rs +++ b/src/tool/todo.rs @@ -3,7 +3,7 @@ use crate::bus::{Bus, BusEvent, TodoEvent}; use crate::todo::{TodoItem, load_todos, save_todos}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use serde_json::{Value, json}; pub struct TodoTool; @@ -16,9 +16,58 @@ impl TodoTool { #[derive(Deserialize)] struct TodoInput { + #[serde(default, deserialize_with = "deserialize_todos_flexible")] todos: Option>, } +/// Accept `todos` as any of: +/// - a real JSON array (the schema-correct form) +/// - a JSON-string-encoded array (some LLMs emit `"todos": "[...]"`) +/// - a single object (auto-wrapped into a one-element vec) +/// - `null` / missing (returns `None` → triggers a read) +/// +/// This makes the todo tool robust to common tool-call wire-format quirks +/// across providers without changing the advertised schema. +fn deserialize_todos_flexible<'de, D>(deserializer: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + use serde::de::Error; + + let value = Value::deserialize(deserializer)?; + parse_todos_value(value).map_err(D::Error::custom) +} + +fn parse_todos_value(value: Value) -> Result>, String> { + match value { + Value::Null => Ok(None), + Value::Array(_) => serde_json::from_value::>(value) + .map(Some) + .map_err(|e| format!("invalid todos array: {}", e)), + Value::Object(_) => serde_json::from_value::(value) + .map(|item| Some(vec![item])) + .map_err(|e| format!("invalid single-todo object: {}", e)), + Value::String(s) => { + let trimmed = s.trim(); + if trimmed.is_empty() { + return Ok(None); + } + let inner: Value = serde_json::from_str(trimmed) + .map_err(|e| format!("todos string was not valid JSON: {}", e))?; + // Recurse to handle array / object / null inside the string. + parse_todos_value(inner) + } + other => Err(format!( + "todos must be an array, object, or JSON-string; got {}", + match other { + Value::Bool(_) => "bool", + Value::Number(_) => "number", + _ => "unknown", + } + )), + } +} + #[async_trait] impl Tool for TodoTool { fn name(&self) -> &str { @@ -107,6 +156,10 @@ impl Tool for TodoTool { mod tests { use super::*; + fn sample_todo_json() -> Value { + json!({"id":"a","content":"do thing","status":"pending","priority":"high"}) + } + #[test] fn tool_is_named_todo() { assert_eq!(TodoTool::new().name(), "todo"); @@ -123,4 +176,69 @@ mod tests { assert!(props.contains_key("intent")); assert!(props.contains_key("todos")); } + + #[test] + fn accepts_native_array() { + let input = json!({"todos": [sample_todo_json()]}); + let parsed: TodoInput = serde_json::from_value(input).expect("native array"); + let todos = parsed.todos.expect("todos present"); + assert_eq!(todos.len(), 1); + assert_eq!(todos[0].id, "a"); + } + + #[test] + fn accepts_json_string_encoded_array() { + // Some LLMs emit `"todos": "[...]"` instead of `"todos": [...]`. + let encoded = serde_json::to_string(&vec![sample_todo_json()]).unwrap(); + let input = json!({"todos": encoded}); + let parsed: TodoInput = + serde_json::from_value(input).expect("string-encoded array should parse"); + let todos = parsed.todos.expect("todos present"); + assert_eq!(todos.len(), 1); + assert_eq!(todos[0].id, "a"); + } + + #[test] + fn accepts_single_object_and_wraps_in_vec() { + let input = json!({"todos": sample_todo_json()}); + let parsed: TodoInput = serde_json::from_value(input).expect("single object"); + let todos = parsed.todos.expect("todos present"); + assert_eq!(todos.len(), 1); + assert_eq!(todos[0].id, "a"); + } + + #[test] + fn missing_todos_field_is_read_operation() { + let input = json!({}); + let parsed: TodoInput = serde_json::from_value(input).expect("empty"); + assert!(parsed.todos.is_none()); + } + + #[test] + fn null_todos_field_is_read_operation() { + let input = json!({"todos": null}); + let parsed: TodoInput = serde_json::from_value(input).expect("null"); + assert!(parsed.todos.is_none()); + } + + #[test] + fn empty_string_todos_is_read_operation() { + let input = json!({"todos": ""}); + let parsed: TodoInput = serde_json::from_value(input).expect("empty string"); + assert!(parsed.todos.is_none()); + } + + #[test] + fn rejects_non_json_string() { + let input = json!({"todos": "not json at all"}); + let result: Result = serde_json::from_value(input); + assert!(result.is_err(), "non-JSON string should be rejected"); + } + + #[test] + fn rejects_number_todos() { + let input = json!({"todos": 42}); + let result: Result = serde_json::from_value(input); + assert!(result.is_err(), "scalar todos should be rejected"); + } } diff --git a/src/tui/backend.rs b/src/tui/backend.rs index b7e942432..4d881e360 100644 --- a/src/tui/backend.rs +++ b/src/tui/backend.rs @@ -803,6 +803,14 @@ impl RemoteConnection { /// Read the next event from the server. pub async fn next_event(&mut self) -> RemoteRead { + // Tolerate up to this many consecutive malformed frames before + // giving up. A single corrupt line on the wire used to take down + // every attached session, but skipping the bad frame and resyncing + // at the next `\n` boundary lets the connection survive a benign + // glitch (e.g. an unescaped newline from a buggy emitter, a stale + // half-buffer from a reload, etc.). + const MAX_CONSECUTIVE_MALFORMED_FRAMES: u32 = 16; + let mut malformed_in_a_row: u32 = 0; loop { self.line_buffer.clear(); match self.reader.read_line(&mut self.line_buffer).await { @@ -822,15 +830,40 @@ impl RemoteConnection { continue; } match serde_json::from_str(&self.line_buffer) { - Ok(event) => return RemoteRead::Event(event), + Ok(event) => { + malformed_in_a_row = 0; + return RemoteRead::Event(event); + } Err(error) => { + malformed_in_a_row = malformed_in_a_row.saturating_add(1); + // Truncate the dump so the log isn't flooded by + // multi-KB SwarmStatus payloads. + let preview: String = self + .line_buffer + .chars() + .take(240) + .collect(); crate::logging::warn(&format!( - "RemoteConnection::next_event: protocol error={} line={:?} (session_id={:?}, client_instance_id={:?})", - error, self.line_buffer, self.session_id, self.client_instance_id - )); - return RemoteRead::Disconnected(RemoteDisconnectReason::Protocol( - error.to_string(), + "RemoteConnection::next_event: protocol error={} (consecutive_malformed={}, len={}) line_preview={:?} (session_id={:?}, client_instance_id={:?})", + error, + malformed_in_a_row, + self.line_buffer.len(), + preview, + self.session_id, + self.client_instance_id )); + if malformed_in_a_row >= MAX_CONSECUTIVE_MALFORMED_FRAMES { + crate::logging::error(&format!( + "RemoteConnection::next_event: {} consecutive malformed frames; giving up to avoid a busy loop", + malformed_in_a_row + )); + return RemoteRead::Disconnected( + RemoteDisconnectReason::Protocol(error.to_string()), + ); + } + // Skip this corrupt frame and resync at the next + // `\n` instead of tearing down the session. + continue; } } }