From 98c27111f8463a4167fffdad9bac833819f3a031 Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Thu, 14 May 2026 02:13:21 -0230 Subject: [PATCH 1/9] =?UTF-8?q?feat:=20agent=20DB=20substrate=20first=20sl?= =?UTF-8?q?ice=20=E2=80=94=20docker-compose=20+=20db-execute=20tool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - docker-compose.agent-db.yml: postgres:16-alpine on port 5432, well-known localhost creds - db-execute tool: agents execute scoped SQL via docker exec psql into their session schema - Schema-per-session isolation (agent_ convention) - Session-id sanitization: lowercase alphanumeric with leading-letter guarantee - Compiles clean, passes end-to-end smoke test with CRUD + cross-schema namespace isolation Part of goal: autonomous-local-neon-postgres-analytics-substrate-for-agents --- docker-compose.agent-db.yml | 26 +++++++ src/tool/db_execute.rs | 146 ++++++++++++++++++++++++++++++++++++ src/tool/mod.rs | 7 ++ 3 files changed, 179 insertions(+) create mode 100644 docker-compose.agent-db.yml create mode 100644 src/tool/db_execute.rs diff --git a/docker-compose.agent-db.yml b/docker-compose.agent-db.yml new file mode 100644 index 000000000..2436b2dde --- /dev/null +++ b/docker-compose.agent-db.yml @@ -0,0 +1,26 @@ +# Agent DB Substrate — local Postgres for agent analytics +# Well-known credentials; only accessible from localhost. +# Start: docker compose -f docker-compose.agent-db.yml up -d +# Stop: docker compose -f docker-compose.agent-db.yml down -v + +services: + postgres: + image: postgres:16-alpine + container_name: jcode-agent-db + restart: unless-stopped + ports: + - "5432:5432" + environment: + POSTGRES_USER: jcode_agent + POSTGRES_PASSWORD: jcode_agent_local + POSTGRES_DB: jcode_agent_workspace + volumes: + - jcode_agent_db_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U jcode_agent -d jcode_agent_workspace"] + interval: 5s + timeout: 3s + retries: 5 + +volumes: + jcode_agent_db_data: diff --git a/src/tool/db_execute.rs b/src/tool/db_execute.rs new file mode 100644 index 000000000..cc342b05b --- /dev/null +++ b/src/tool/db_execute.rs @@ -0,0 +1,146 @@ +use super::{Tool, ToolContext, ToolOutput}; +use anyhow::Result; +use async_trait::async_trait; +use serde::Deserialize; +use serde_json::{Value, json}; +use std::process::Stdio; +use tokio::process::Command as TokioCommand; + +const DB_EXECUTE_DESCRIPTION: &str = "Execute a SQL statement against the agent's local Postgres database. The statement is scoped to the agent's own schema. Use for CREATE TABLE, INSERT, UPDATE, DELETE, SELECT, DROP TABLE, etc. For queries that may return large results, limit with SQL clauses."; + +pub struct DbExecuteTool; + +impl DbExecuteTool { + pub fn new() -> Self { + Self + } +} + +#[derive(Deserialize)] +struct DbExecuteInput { + sql: String, +} + +/// Build a db-execute tool that scopes SQL to the agent's session schema. +/// The container and credentials are well-known localhost defaults. +fn agent_schema_name(session_id: &str) -> String { + // Sanitize: schema names must start with a letter or underscore, + // contain only lowercase letters, digits, and underscores, and be <= 63 chars. + let sanitized: String = session_id + .chars() + .map(|c| { + if c.is_ascii_alphanumeric() || c == '_' { + c.to_ascii_lowercase() + } else { + '_' + } + }) + .collect(); + // Ensure it starts with a letter + let prefixed = if sanitized.starts_with(|c: char| c.is_ascii_alphabetic()) { + sanitized + } else { + format!("a_{}", sanitized) + }; + // Truncate to 30 chars, then add "agent_" prefix (fits within 63-char limit) + let short: String = prefixed.chars().take(30).collect(); + format!("agent_{}", short) +} + +fn provision_schema_sql(schema: &str) -> String { + format!( + "CREATE SCHEMA IF NOT EXISTS {schema};\nSET search_path TO {schema};" + ) +} + +#[async_trait] +impl Tool for DbExecuteTool { + fn name(&self) -> &str { + "db-execute" + } + + fn description(&self) -> &str { + DB_EXECUTE_DESCRIPTION + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "required": ["sql"], + "properties": { + "intent": super::intent_schema_property(), + "sql": { + "type": "string", + "description": "SQL statement to execute. Scoped to agent's schema." + } + } + }) + } + + async fn execute(&self, input: Value, ctx: ToolContext) -> Result { + let params: DbExecuteInput = serde_json::from_value(input)?; + let schema = agent_schema_name(&ctx.session_id); + + let full_sql = format!( + "{}\n{}", + provision_schema_sql(&schema), + params.sql.trim() + ); + + let result = run_psql(&full_sql).await?; + Ok(ToolOutput::new(result)) + } +} + +async fn run_psql(sql: &str) -> Result { + let mut child = TokioCommand::new("docker") + .args([ + "exec", + "-i", + "jcode-agent-db", + "psql", + "-U", + "jcode_agent", + "-d", + "jcode_agent_workspace", + "-v", + "ON_ERROR_STOP=1", + "-A", // unaligned output + "-t", // tuples only (no headers) + "-q", // quiet + ]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + // Write SQL to stdin + if let Some(mut stdin) = child.stdin.take() { + use tokio::io::AsyncWriteExt; + stdin.write_all(sql.as_bytes()).await?; + stdin.write_all(b"\n").await?; + // stdin is dropped here, closing the pipe + } + + let output = child.wait_with_output().await?; + + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + if output.status.success() { + if stdout.is_empty() && stderr.is_empty() { + Ok("OK".to_string()) + } else if stdout.is_empty() { + Ok(stderr) + } else { + Ok(stdout) + } + } else { + Err(anyhow::anyhow!( + "psql error (exit {}): {}\n{}", + output.status.code().unwrap_or(-1), + stderr, + stdout + )) + } +} diff --git a/src/tool/mod.rs b/src/tool/mod.rs index 9ce518a8f..63d5a1b3d 100644 --- a/src/tool/mod.rs +++ b/src/tool/mod.rs @@ -7,6 +7,7 @@ mod bg; mod browser; mod codesearch; mod communicate; +mod db_execute; mod conversation_search; mod debug_socket; mod edit; @@ -186,6 +187,12 @@ impl Registry { Self::insert_tool_timed(&mut m, &mut timings, "gmail", gmail::GmailTool::new); Self::insert_tool_timed(&mut m, &mut timings, "schedule", ambient::ScheduleTool::new); Self::insert_tool_timed(&mut m, &mut timings, "selfdev", selfdev::SelfDevTool::new); + Self::insert_tool_timed( + &mut m, + &mut timings, + "db-execute", + db_execute::DbExecuteTool::new, + ); let nonzero: Vec = timings .iter() .filter(|(_, ms)| *ms > 0) From 93af153692e35fe068742fb74f42894a8a34b01c Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Thu, 14 May 2026 23:18:43 -0230 Subject: [PATCH 2/9] fix: isolate agent db execute schemas --- src/tool/db_execute.rs | 29 ++++++++++++++++++++++------- src/tool/mod.rs | 2 +- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/src/tool/db_execute.rs b/src/tool/db_execute.rs index cc342b05b..f879b77bd 100644 --- a/src/tool/db_execute.rs +++ b/src/tool/db_execute.rs @@ -6,7 +6,7 @@ use serde_json::{Value, json}; use std::process::Stdio; use tokio::process::Command as TokioCommand; -const DB_EXECUTE_DESCRIPTION: &str = "Execute a SQL statement against the agent's local Postgres database. The statement is scoped to the agent's own schema. Use for CREATE TABLE, INSERT, UPDATE, DELETE, SELECT, DROP TABLE, etc. For queries that may return large results, limit with SQL clauses."; +const DB_EXECUTE_DESCRIPTION: &str = "Execute a SQL statement against the agent's local Postgres database. Statements run as a per-session role that owns the session's schema; agents cannot access other sessions' data. Use for CREATE TABLE, INSERT, UPDATE, DELETE, SELECT, DROP TABLE, etc. For queries that may return large results, limit with SQL clauses."; pub struct DbExecuteTool; @@ -47,9 +47,24 @@ fn agent_schema_name(session_id: &str) -> String { format!("agent_{}", short) } -fn provision_schema_sql(schema: &str) -> String { +fn provision_role_and_schema_sql(schema: &str) -> String { + // Creates a NOLOGIN role for the session (if missing), grants it to + // jcode_agent, creates/owns the schema, and sets the effective role + // + search_path. All SQL from the agent runs as this per-session role, + // which owns its schema but has no USAGE on any other agent's schema. format!( - "CREATE SCHEMA IF NOT EXISTS {schema};\nSET search_path TO {schema};" + "DO $$\n\ + BEGIN\n\ + IF NOT EXISTS (SELECT FROM pg_catalog.pg_roles WHERE rolname = '{schema}') THEN\n\ + CREATE ROLE {schema} NOLOGIN;\n\ + END IF;\n\ + END\n\ + $$;\n\ + GRANT {schema} TO jcode_agent;\n\ + CREATE SCHEMA IF NOT EXISTS {schema} AUTHORIZATION {schema};\n\ + ALTER SCHEMA {schema} OWNER TO {schema};\n\ + SET ROLE {schema};\n\ + SET search_path TO {schema};" ) } @@ -83,7 +98,7 @@ impl Tool for DbExecuteTool { let full_sql = format!( "{}\n{}", - provision_schema_sql(&schema), + provision_role_and_schema_sql(&schema), params.sql.trim() ); @@ -105,9 +120,9 @@ async fn run_psql(sql: &str) -> Result { "jcode_agent_workspace", "-v", "ON_ERROR_STOP=1", - "-A", // unaligned output - "-t", // tuples only (no headers) - "-q", // quiet + "-A", // unaligned output + "-t", // tuples only (no headers) + "-q", // quiet ]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) diff --git a/src/tool/mod.rs b/src/tool/mod.rs index 63d5a1b3d..c37d419f1 100644 --- a/src/tool/mod.rs +++ b/src/tool/mod.rs @@ -7,8 +7,8 @@ mod bg; mod browser; mod codesearch; mod communicate; -mod db_execute; mod conversation_search; +mod db_execute; mod debug_socket; mod edit; mod glob; From b8407c64eed481138d27aaab882355f7cdd44b3f Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Fri, 15 May 2026 14:28:06 -0230 Subject: [PATCH 3/9] ambient: fall back to headless when visible terminal is missing External wakeups (Discord/Telegram/email injection) previously aborted the whole cycle when [ambient].visible = true but the configured terminal (kitty) was not installed. The error bubbled out of run_cycle() and the ambient runner re-scheduled 2h out, so no response was ever produced. Now run_cycle() wraps run_cycle_visible() in a match, logs a warning, and continues into the headless agent path on failure. This keeps the 'visible by default' UX while guaranteeing forward progress whenever an external directive demands a response. Validated: triggered an ambient cycle with kitty absent; runner logged "Ambient visible: failed to spawn kitty ..., falling back to headless" and the headless agent ran against the Discord directive. --- src/ambient/runner.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/ambient/runner.rs b/src/ambient/runner.rs index 61ec9112e..d6eafd039 100644 --- a/src/ambient/runner.rs +++ b/src/ambient/runner.rs @@ -881,11 +881,20 @@ impl AmbientRunnerHandle { self.set_running_detail("gathering context").await; let (system_prompt, initial_message) = self.build_cycle_context(provider).await?; - // Visible mode: spawn a full TUI instead of running headlessly + // Visible mode: spawn a full TUI instead of running headlessly. + // If the configured terminal is unavailable, fall back to headless so + // external wakeups (Discord/Telegram/email) still produce a cycle. if visible { - return self - .run_cycle_visible(started_at, system_prompt, initial_message) - .await; + match self + .run_cycle_visible(started_at, system_prompt.clone(), initial_message.clone()) + .await + { + Ok(result) => return Ok(result), + Err(e) => logging::warn(&format!( + "Ambient visible cycle failed ({}), falling back to headless", + e + )), + } } // Headless mode: run agent directly From 076225e803f1c6165f6d906cc84db06e3edb9a86 Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Fri, 15 May 2026 15:25:38 -0230 Subject: [PATCH 4/9] ambient: headless cross-provider failover after agent error Previously, when an ambient cycle's agent turn errored with a provider failover prompt (e.g. OpenAI rate-limit caused the multi-provider stack to surface a ProviderFailoverPrompt-as-error), the headless runner had no equivalent of the TUI's auto-switch countdown. It just fell into the continuation-prompt path, retrying against the same exhausted provider and ultimately force-ending the cycle without progress. This adds a single-shot headless failover step between the initial agent turn and the continuation prompt: 1. Inspect the run_result Err for a ProviderFailoverPrompt (cheap pure helper, unit tested). 2. If present, call Provider::switch_active_provider_to(prompt.to_provider). 3. Re-run the original initial_message on the new active provider. 4. If that produces a cycle_result, return it; otherwise fall through to the existing continuation logic. Bounded blast radius: we attempt at most one switch per cycle. The existing same-provider account failover inside MultiProvider already runs first, so this only triggers after the provider stack has already exhausted account rotation and decided to surface a cross-provider prompt. Tests: - parse_failover_target_extracts_provider_from_failover_prompt_error - parse_failover_target_returns_none_for_unrelated_errors - parse_failover_target_returns_none_for_ok_results - ambient::runner tests still pass (5/5) - jcode-provider-core failover tests still pass (4/4) Validation: - cargo build --release: clean - cargo test --lib ambient::runner: 5/5 pass - cargo test parse_failover_target: 3/3 pass --- src/ambient/runner.rs | 90 +++++++++++++++++++++++++++++++++++++ src/ambient/runner_tests.rs | 28 ++++++++++++ 2 files changed, 118 insertions(+) diff --git a/src/ambient/runner.rs b/src/ambient/runner.rs index d6eafd039..d90996347 100644 --- a/src/ambient/runner.rs +++ b/src/ambient/runner.rs @@ -28,6 +28,75 @@ use tokio::sync::{Notify, RwLock}; const MAX_IDLE_POLL_SECS: u64 = 30; +/// Pure helper: given the error returned by an ambient agent turn, decide +/// whether the runner should attempt a provider switch and what label to +/// switch to. Returns `Some(target_provider_id)` when the error carries a +/// failover prompt; `None` otherwise. +/// +/// Split out from [`try_headless_provider_failover`] so it can be unit +/// tested without constructing a real `Agent`/`Provider`. +fn parse_failover_target(run_result: &anyhow::Result) -> Option { + let err = run_result.as_ref().err()?; + let prompt = crate::provider::parse_failover_prompt_message(&err.to_string())?; + Some(prompt.to_provider) +} + +/// Try to recover from a provider-level failure during a headless ambient cycle +/// by switching providers once and re-running the original message. +/// +/// Returns `true` if a switch + retry was attempted (regardless of outcome). +/// The caller is responsible for inspecting `ambient_tools::take_cycle_result()` +/// after this returns; if the retry produced a result, it will be picked up +/// by the same code path that handles a normal successful turn. +/// +/// We intentionally cap this at a single retry per cycle to avoid burning +/// through every configured account/provider in one wake-up when an outage +/// is global rather than per-provider. +async fn try_headless_provider_failover( + provider: &Arc, + agent: &mut Agent, + run_result: &anyhow::Result, + initial_message: &str, +) -> bool { + let Some(target_provider) = parse_failover_target(run_result) else { + return false; + }; + // We re-parse here for the human-readable labels in the log line; the + // pure helper above only returns the machine-readable target id. + let prompt_for_log = run_result + .as_ref() + .err() + .and_then(|e| crate::provider::parse_failover_prompt_message(&e.to_string())); + match provider.switch_active_provider_to(&target_provider) { + Ok(()) => { + if let Some(prompt) = prompt_for_log { + logging::info(&format!( + "Ambient cycle: headless failover {} → {} (reason: {}); retrying initial message", + prompt.from_label, prompt.to_label, prompt.reason + )); + } else { + logging::info(&format!( + "Ambient cycle: headless failover → {}; retrying initial message", + target_provider + )); + } + } + Err(switch_err) => { + logging::warn(&format!( + "Ambient cycle: headless failover to {} failed: {}", + target_provider, switch_err + )); + return false; + } + } + // Re-issue the original user message on the new provider. The agent's + // conversation history already contains the failed turn, which the new + // provider will see as prior assistant context; that's intentional, since + // the failover prompt explicitly notes the input would be resent. + let _ = agent.run_once_capture(initial_message).await; + true +} + /// Shared ambient runner state, accessible from the server, debug socket, and TUI. #[derive(Clone)] pub struct AmbientRunnerHandle { @@ -936,6 +1005,27 @@ impl AmbientRunnerHandle { }); } + // If the initial turn errored with a provider-failover prompt (e.g. the + // active provider hit its rate cap), try switching providers once and + // re-running the original message. In headless mode there is no TUI to + // surface the countdown, so this is the analogue of the TUI's auto- + // switch path. A single retry keeps the blast radius bounded. + let attempted_failover = + try_headless_provider_failover(provider, &mut agent, &run_result, &initial_message) + .await; + + if attempted_failover && let Some(result) = ambient_tools::take_cycle_result() { + ambient_tools::unregister_ambient_session(&ambient_session_id); + let conversation = agent.export_conversation_markdown(); + agent.mark_closed(); + return Ok(AmbientCycleResult { + started_at, + ended_at: Utc::now(), + conversation: Some(conversation), + ..result + }); + } + // Agent didn't call end_ambient_cycle - try continuation if run_result.is_err() { logging::warn("Ambient cycle: agent error without calling end_ambient_cycle"); diff --git a/src/ambient/runner_tests.rs b/src/ambient/runner_tests.rs index 94146aaa0..75800c393 100644 --- a/src/ambient/runner_tests.rs +++ b/src/ambient/runner_tests.rs @@ -1,4 +1,5 @@ use super::AmbientRunnerHandle; +use super::parse_failover_target; use crate::ambient::{Priority, ScheduleTarget, ScheduledItem}; use crate::message::{Message, Role, StreamEvent, ToolDefinition}; use crate::provider::{EventStream, Provider}; @@ -15,6 +16,33 @@ struct EnvVarGuard { prev: Option, } +#[test] +fn parse_failover_target_extracts_provider_from_failover_prompt_error() { + let prompt = crate::provider::ProviderFailoverPrompt { + from_provider: "openai".to_string(), + from_label: "OpenAI".to_string(), + to_provider: "claude".to_string(), + to_label: "Anthropic".to_string(), + reason: "rate limit reached".to_string(), + estimated_input_chars: 1234, + estimated_input_tokens: 567, + }; + let err: anyhow::Result = Err(anyhow::anyhow!(prompt.to_error_message())); + assert_eq!(parse_failover_target(&err).as_deref(), Some("claude")); +} + +#[test] +fn parse_failover_target_returns_none_for_unrelated_errors() { + let err: anyhow::Result = Err(anyhow::anyhow!("network timeout after 30s")); + assert!(parse_failover_target(&err).is_none()); +} + +#[test] +fn parse_failover_target_returns_none_for_ok_results() { + let ok: anyhow::Result = Ok("done".to_string()); + assert!(parse_failover_target(&ok).is_none()); +} + impl EnvVarGuard { fn set_path(key: &'static str, value: &std::path::Path) -> Self { let prev = std::env::var_os(key); From 386d083a43d550a5d1a190a8b5a64af62741ec58 Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Fri, 15 May 2026 15:49:34 -0230 Subject: [PATCH 5/9] tool/todo: accept array, JSON-string-encoded array, or single object Some LLM providers serialize array-typed tool arguments as JSON strings (`"todos": "[...]"`) instead of native arrays. The previous strict deserialize_with caused these calls to fail with a confusing `expected a sequence` error and silently no-op the todo update. Add a flexible deserializer that accepts: - a real JSON array (schema-correct) - a JSON-string-encoded array (LLM quirk) - a single object (auto-wrapped into a one-element vec) - null / missing / empty string (treated as a read) Reject non-JSON strings and scalars with a clear error. Tests: 8 new cases covering each branch (10 total in tool::todo). --- src/tool/todo.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) diff --git a/src/tool/todo.rs b/src/tool/todo.rs index d638c611e..021d3474e 100644 --- a/src/tool/todo.rs +++ b/src/tool/todo.rs @@ -3,7 +3,7 @@ use crate::bus::{Bus, BusEvent, TodoEvent}; use crate::todo::{TodoItem, load_todos, save_todos}; use anyhow::Result; use async_trait::async_trait; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use serde_json::{Value, json}; pub struct TodoTool; @@ -16,9 +16,58 @@ impl TodoTool { #[derive(Deserialize)] struct TodoInput { + #[serde(default, deserialize_with = "deserialize_todos_flexible")] todos: Option>, } +/// Accept `todos` as any of: +/// - a real JSON array (the schema-correct form) +/// - a JSON-string-encoded array (some LLMs emit `"todos": "[...]"`) +/// - a single object (auto-wrapped into a one-element vec) +/// - `null` / missing (returns `None` → triggers a read) +/// +/// This makes the todo tool robust to common tool-call wire-format quirks +/// across providers without changing the advertised schema. +fn deserialize_todos_flexible<'de, D>(deserializer: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + use serde::de::Error; + + let value = Value::deserialize(deserializer)?; + parse_todos_value(value).map_err(D::Error::custom) +} + +fn parse_todos_value(value: Value) -> Result>, String> { + match value { + Value::Null => Ok(None), + Value::Array(_) => serde_json::from_value::>(value) + .map(Some) + .map_err(|e| format!("invalid todos array: {}", e)), + Value::Object(_) => serde_json::from_value::(value) + .map(|item| Some(vec![item])) + .map_err(|e| format!("invalid single-todo object: {}", e)), + Value::String(s) => { + let trimmed = s.trim(); + if trimmed.is_empty() { + return Ok(None); + } + let inner: Value = serde_json::from_str(trimmed) + .map_err(|e| format!("todos string was not valid JSON: {}", e))?; + // Recurse to handle array / object / null inside the string. + parse_todos_value(inner) + } + other => Err(format!( + "todos must be an array, object, or JSON-string; got {}", + match other { + Value::Bool(_) => "bool", + Value::Number(_) => "number", + _ => "unknown", + } + )), + } +} + #[async_trait] impl Tool for TodoTool { fn name(&self) -> &str { @@ -107,6 +156,10 @@ impl Tool for TodoTool { mod tests { use super::*; + fn sample_todo_json() -> Value { + json!({"id":"a","content":"do thing","status":"pending","priority":"high"}) + } + #[test] fn tool_is_named_todo() { assert_eq!(TodoTool::new().name(), "todo"); @@ -123,4 +176,69 @@ mod tests { assert!(props.contains_key("intent")); assert!(props.contains_key("todos")); } + + #[test] + fn accepts_native_array() { + let input = json!({"todos": [sample_todo_json()]}); + let parsed: TodoInput = serde_json::from_value(input).expect("native array"); + let todos = parsed.todos.expect("todos present"); + assert_eq!(todos.len(), 1); + assert_eq!(todos[0].id, "a"); + } + + #[test] + fn accepts_json_string_encoded_array() { + // Some LLMs emit `"todos": "[...]"` instead of `"todos": [...]`. + let encoded = serde_json::to_string(&vec![sample_todo_json()]).unwrap(); + let input = json!({"todos": encoded}); + let parsed: TodoInput = + serde_json::from_value(input).expect("string-encoded array should parse"); + let todos = parsed.todos.expect("todos present"); + assert_eq!(todos.len(), 1); + assert_eq!(todos[0].id, "a"); + } + + #[test] + fn accepts_single_object_and_wraps_in_vec() { + let input = json!({"todos": sample_todo_json()}); + let parsed: TodoInput = serde_json::from_value(input).expect("single object"); + let todos = parsed.todos.expect("todos present"); + assert_eq!(todos.len(), 1); + assert_eq!(todos[0].id, "a"); + } + + #[test] + fn missing_todos_field_is_read_operation() { + let input = json!({}); + let parsed: TodoInput = serde_json::from_value(input).expect("empty"); + assert!(parsed.todos.is_none()); + } + + #[test] + fn null_todos_field_is_read_operation() { + let input = json!({"todos": null}); + let parsed: TodoInput = serde_json::from_value(input).expect("null"); + assert!(parsed.todos.is_none()); + } + + #[test] + fn empty_string_todos_is_read_operation() { + let input = json!({"todos": ""}); + let parsed: TodoInput = serde_json::from_value(input).expect("empty string"); + assert!(parsed.todos.is_none()); + } + + #[test] + fn rejects_non_json_string() { + let input = json!({"todos": "not json at all"}); + let result: Result = serde_json::from_value(input); + assert!(result.is_err(), "non-JSON string should be rejected"); + } + + #[test] + fn rejects_number_todos() { + let input = json!({"todos": 42}); + let result: Result = serde_json::from_value(input); + assert!(result.is_err(), "scalar todos should be rejected"); + } } From e1f5e4e31c71642998705fc6292d8eee27dc907c Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Fri, 15 May 2026 19:02:36 -0230 Subject: [PATCH 6/9] fix config profile rehydration and agents dedupe --- src/cli/provider_init.rs | 3 ++ src/prompt.rs | 15 ++++++--- src/prompt_tests.rs | 24 +++++++++++++++ src/provider/startup.rs | 9 ++++++ src/provider_catalog.rs | 23 ++++++++++++++ src/provider_catalog_tests.rs | 58 +++++++++++++++++++++++++++++++++++ 6 files changed, 128 insertions(+), 4 deletions(-) diff --git a/src/cli/provider_init.rs b/src/cli/provider_init.rs index 09616f367..8b76502f7 100644 --- a/src/cli/provider_init.rs +++ b/src/cli/provider_init.rs @@ -1225,6 +1225,9 @@ async fn init_provider_with_options( crate::env::set_var("JCODE_PROVIDER_PROFILE_ACTIVE", "1"); } + let cfg = crate::config::config(); + crate::provider_catalog::rehydrate_active_named_provider_profile_env_from_config(cfg)?; + if std::env::var_os("JCODE_PROVIDER_PROFILE_ACTIVE").is_none() && std::env::var_os("JCODE_NAMED_PROVIDER_PROFILE").is_none() { diff --git a/src/prompt.rs b/src/prompt.rs index 0431a468d..1cfce0583 100644 --- a/src/prompt.rs +++ b/src/prompt.rs @@ -556,6 +556,7 @@ fn gpu_summary() -> Option { /// Load AGENTS.md files from a specific working directory pub fn load_agents_md_files_from_dir(working_dir: Option<&Path>) -> (Option, ContextInfo) { let mut contents = vec![]; + let mut loaded_paths = Vec::new(); let mut info = ContextInfo::default(); // Helper to load a file if it exists, returns (formatted_content, raw_size) @@ -573,17 +574,23 @@ pub fn load_agents_md_files_from_dir(working_dir: Option<&Path>) -> (Option = None; + match crate::provider_catalog::rehydrate_active_named_provider_profile_env_from_config(cfg) + { + Ok(Some(profile_name)) => default_named_provider_profile = Some(profile_name), + Ok(None) => {} + Err(err) => crate::logging::warn(&format!( + "Failed to rehydrate active provider profile from config: {}", + err + )), + } if std::env::var_os("JCODE_PROVIDER_PROFILE_ACTIVE").is_none() && std::env::var_os("JCODE_NAMED_PROVIDER_PROFILE").is_none() && let Some(pref) = provider_state.default_provider_key() diff --git a/src/provider_catalog.rs b/src/provider_catalog.rs index 61931f1f1..fb87a8f2b 100644 --- a/src/provider_catalog.rs +++ b/src/provider_catalog.rs @@ -480,6 +480,29 @@ pub fn apply_named_provider_profile_env(profile_name: &str) -> anyhow::Result anyhow::Result> { + let profile_name = std::env::var("JCODE_NAMED_PROVIDER_PROFILE") + .ok() + .or_else(|| std::env::var("JCODE_PROVIDER_PROFILE_NAME").ok()) + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()); + + let Some(profile_name) = profile_name else { + return Ok(None); + }; + + if !config.providers.contains_key(&profile_name) { + return Ok(None); + } + + let profile_name = apply_named_provider_profile_env_from_config(&profile_name, config)?; + crate::env::set_var("JCODE_PROVIDER_PROFILE_NAME", &profile_name); + crate::env::set_var("JCODE_PROVIDER_PROFILE_ACTIVE", "1"); + Ok(Some(profile_name)) +} + pub fn apply_named_provider_profile_env_from_config( profile_name: &str, config: &crate::config::Config, diff --git a/src/provider_catalog_tests.rs b/src/provider_catalog_tests.rs index 4c4de00ed..3379327be 100644 --- a/src/provider_catalog_tests.rs +++ b/src/provider_catalog_tests.rs @@ -504,6 +504,64 @@ fn named_provider_profile_maps_to_openai_compatible_runtime_env() { ); } +#[test] +fn active_named_provider_profile_is_rehydrated_from_config_when_env_is_partial() { + let _lock = crate::storage::lock_test_env(); + let _guard = EnvGuard::save(&[ + "JCODE_OPENROUTER_API_BASE", + "JCODE_OPENROUTER_API_KEY_NAME", + "JCODE_OPENROUTER_ENV_FILE", + "JCODE_OPENROUTER_CACHE_NAMESPACE", + "JCODE_OPENROUTER_MODEL", + "JCODE_NAMED_PROVIDER_PROFILE", + "JCODE_PROVIDER_PROFILE_NAME", + "JCODE_PROVIDER_PROFILE_ACTIVE", + ]); + + let cfg: crate::config::Config = toml::from_str( + r#" + [providers.ollama-cloud] + type = "openai-compatible" + base_url = "https://ollama.com/v1" + auth = "bearer" + api_key_env = "OLLAMA_API_KEY" + env_file = "ollama.env" + default_model = "deepseek-v4-pro" + "#, + ) + .expect("config should parse"); + + crate::env::set_var("JCODE_NAMED_PROVIDER_PROFILE", "ollama-cloud"); + crate::env::remove_var("JCODE_OPENROUTER_API_BASE"); + crate::env::remove_var("JCODE_OPENROUTER_API_KEY_NAME"); + crate::env::remove_var("JCODE_OPENROUTER_ENV_FILE"); + + let applied = rehydrate_active_named_provider_profile_env_from_config(&cfg) + .expect("rehydrate should succeed"); + + assert_eq!(applied.as_deref(), Some("ollama-cloud")); + assert_eq!( + std::env::var("JCODE_OPENROUTER_API_BASE").ok().as_deref(), + Some("https://ollama.com/v1") + ); + assert_eq!( + std::env::var("JCODE_OPENROUTER_API_KEY_NAME") + .ok() + .as_deref(), + Some("OLLAMA_API_KEY") + ); + assert_eq!( + std::env::var("JCODE_OPENROUTER_ENV_FILE").ok().as_deref(), + Some("ollama.env") + ); + assert_eq!( + std::env::var("JCODE_PROVIDER_PROFILE_ACTIVE") + .ok() + .as_deref(), + Some("1") + ); +} + #[test] fn named_provider_inline_api_key_is_private_runtime_fallback() { let _lock = crate::storage::lock_test_env(); From 3281e099a2b6fd395981ee4eacb133ff9ddcc578 Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Fri, 15 May 2026 19:25:27 -0230 Subject: [PATCH 7/9] auto reload server on config changes --- src/server.rs | 7 ++ src/server/config_watcher.rs | 189 +++++++++++++++++++++++++++++++++++ 2 files changed, 196 insertions(+) create mode 100644 src/server/config_watcher.rs diff --git a/src/server.rs b/src/server.rs index 714522807..5993ca95c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,6 +15,7 @@ mod comm_control; mod comm_plan; mod comm_session; mod comm_sync; +mod config_watcher; mod debug; mod debug_ambient; mod debug_command_exec; @@ -913,6 +914,12 @@ impl Server { .await; }); + // Watch config and instruction files from inside the shared daemon. New + // client processes often attach to an already-running server, so relying + // on the launcher to re-read config is not enough. A changed config must + // make the daemon exec a fresh copy before future sessions are created. + config_watcher::spawn_config_reload_watcher(self.identity.git_hash.clone()); + // Log when we receive SIGTERM for debugging #[cfg(unix)] { diff --git a/src/server/config_watcher.rs b/src/server/config_watcher.rs new file mode 100644 index 000000000..1c4a4c07f --- /dev/null +++ b/src/server/config_watcher.rs @@ -0,0 +1,189 @@ +use std::collections::HashSet; +use std::path::{Path, PathBuf}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +const CONFIG_RELOAD_WATCH_INTERVAL: Duration = Duration::from_secs(2); +const CONFIG_RELOAD_SETTLE_DELAY: Duration = Duration::from_millis(500); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct WatchedFileState { + path: PathBuf, + exists: bool, + modified: Option, + len: Option, +} + +pub(super) fn spawn_config_reload_watcher(server_git_hash: String) { + if config_reload_watcher_disabled_by_env() { + crate::logging::info("Server config reload watcher disabled by JCODE_CONFIG_AUTO_RELOAD=0"); + return; + } + + tokio::spawn(async move { + run_config_reload_watcher(server_git_hash).await; + }); +} + +async fn run_config_reload_watcher(server_git_hash: String) { + let mut previous = watched_config_state(); + let mut interval = tokio::time::interval(CONFIG_RELOAD_WATCH_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick().await; + let current = watched_config_state(); + if current == previous { + continue; + } + + tokio::time::sleep(CONFIG_RELOAD_SETTLE_DELAY).await; + let settled = watched_config_state(); + previous = settled.clone(); + + // Force the reloadable config cache to observe the changed file before + // deciding whether process reloads are enabled. This means flipping + // display.auto_server_reload to false takes effect without restarting. + if !config_auto_server_reload_enabled() { + crate::logging::info( + "Jcode config/instruction files changed; server auto reload is disabled by display.auto_server_reload=false", + ); + continue; + } + + let changed_paths = format_watched_paths(&settled); + crate::logging::info(&format!( + "Jcode config/instruction files changed ({}); reloading shared server so future sessions use fresh configuration", + changed_paths + )); + let reload_hash = format!( + "{}:config:{}", + server_git_hash, + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis()) + .unwrap_or_default() + ); + super::send_reload_signal(reload_hash, None, false); + } +} + +fn config_auto_server_reload_enabled() -> bool { + if config_reload_watcher_disabled_by_env() { + return false; + } + crate::config::config().display.auto_server_reload +} + +fn config_reload_watcher_disabled_by_env() -> bool { + std::env::var("JCODE_CONFIG_AUTO_RELOAD") + .ok() + .map(|value| { + let value = value.trim(); + value == "0" || value.eq_ignore_ascii_case("false") || value.eq_ignore_ascii_case("off") + }) + .unwrap_or(false) +} + +fn watched_config_state() -> Vec { + watched_config_paths() + .into_iter() + .map(|path| watched_file_state(path.as_path())) + .collect() +} + +fn watched_config_paths() -> Vec { + let mut paths = Vec::new(); + + if let Some(path) = crate::config::Config::path() { + paths.push(path); + } + + if let Ok(jcode_dir) = crate::storage::jcode_dir() { + paths.push(jcode_dir.join("prompt-overlay.md")); + } + + if let Ok(home_agents) = crate::storage::user_home_path("AGENTS.md") { + paths.push(home_agents); + } + + if let Ok(current_dir) = std::env::current_dir() { + paths.push(current_dir.join("AGENTS.md")); + paths.push(current_dir.join(".jcode").join("prompt-overlay.md")); + } + + dedupe_paths(paths) +} + +fn watched_file_state(path: &Path) -> WatchedFileState { + let metadata = std::fs::metadata(path).ok(); + WatchedFileState { + path: path.to_path_buf(), + exists: metadata.is_some(), + modified: metadata + .as_ref() + .and_then(|metadata| metadata.modified().ok()), + len: metadata.as_ref().map(std::fs::Metadata::len), + } +} + +fn dedupe_paths(paths: Vec) -> Vec { + let mut seen = HashSet::new(); + let mut deduped = Vec::new(); + + for path in paths { + let key = path + .canonicalize() + .unwrap_or_else(|_| path.clone()) + .to_string_lossy() + .to_string(); + if seen.insert(key) { + deduped.push(path); + } + } + + deduped +} + +fn format_watched_paths(states: &[WatchedFileState]) -> String { + states + .iter() + .filter(|state| state.exists) + .map(|state| state.path.display().to_string()) + .collect::>() + .join(", ") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn watched_file_state_changes_when_file_is_created() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("config.toml"); + + let before = watched_file_state(&path); + std::fs::write(&path, "[display]\nauto_server_reload = true\n").expect("write config"); + let after = watched_file_state(&path); + + assert!(!before.exists); + assert!(after.exists); + assert_ne!(before, after); + } + + #[test] + fn dedupe_paths_collapses_symlinked_instruction_files() { + let dir = tempfile::tempdir().expect("tempdir"); + let target = dir.path().join("AGENTS.md"); + let link = dir.path().join(".AGENTS.md"); + std::fs::write(&target, "instructions").expect("write target"); + + #[cfg(unix)] + std::os::unix::fs::symlink(&target, &link).expect("symlink"); + #[cfg(windows)] + std::os::windows::fs::symlink_file(&target, &link).expect("symlink"); + + let deduped = dedupe_paths(vec![target.clone(), link]); + assert_eq!(deduped, vec![target]); + } +} From 266e8759cbbd94af56080f8bce9a2c35f6724a36 Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Thu, 21 May 2026 10:56:38 -0230 Subject: [PATCH 8/9] fix(server): eliminate multi-session protocol corruption Root cause: several server code paths used the singular fallback member.event_tx directly instead of fanning out to all member.event_txs, AND register/unregister/fanout_session_event silently overwrote member.event_tx to point at whichever connection's writer happened to be touched last. With multiple attached clients on a shared-server (swarm + multi-window), a send intended for one client's writer could land on another client's writer mid-line, splicing event tails into unrelated frames. The receiving client saw 'expected value at line 1 column 1' protocol errors and tore the session down with a non-retryable error. Symptoms in production logs (~/.jcode/logs/jcode-2026-05-21.log): - 'Remote protocol error is not retryable; stopping reconnect loop' - truncated/interleaved JSON: line="role\":\"agent\"... - mass session crashes within ~100ms of each other - repeated Claude OAuth preflight after every forced reconnect Fix: - register_session_event_sender: only adopt new sender as singular fallback when existing fallback is closed. - unregister_session_event_sender: do not silently re-point member.event_tx to a surviving connection. - fanout_session_event: snapshot all live attachments without mutating the singular fallback. - comm_plan/comm_control/debug_swarm_write/swarm/client_session: route via super::fanout_session_event instead of direct member.event_tx.send, dropping read locks before fanout acquires the write lock. Regression tests added in src/server/state.rs:: multi_connection_protocol_tests (5 tests, all green) cover register/unregister/fanout semantics that previously caused the cross-connection writer corruption. Full suite: 188 passed, 0 failed (server::), including the swarm_persistence tests that were failing on the prior commit. --- src/server/client_session.rs | 28 +-- src/server/comm_control.rs | 43 ++--- src/server/comm_plan.rs | 22 +-- src/server/debug_swarm_write.rs | 105 +++++++---- src/server/state.rs | 255 +++++++++++++++++++++++++- src/server/swarm.rs | 39 ++-- src/server/swarm_persistence_tests.rs | 14 +- 7 files changed, 397 insertions(+), 109 deletions(-) diff --git a/src/server/client_session.rs b/src/server/client_session.rs index 1e6b635d3..b861254f4 100644 --- a/src/server/client_session.rs +++ b/src/server/client_session.rs @@ -470,18 +470,22 @@ pub(super) async fn handle_subscribe( } } if let Some(new_id) = new_coordinator.clone() { - let members = swarm_members.read().await; - if let Some(member) = members.get(&new_id) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: new_id.clone(), - from_name: member.friendly_name.clone(), - notification_type: NotificationType::Message { - scope: Some("swarm".to_string()), - channel: None, - }, - message: "You are now the coordinator for this swarm.".to_string(), - }); - } + let friendly_name = { + let members = swarm_members.read().await; + members + .get(&new_id) + .and_then(|member| member.friendly_name.clone()) + }; + let event = ServerEvent::Notification { + from_session: new_id.clone(), + from_name: friendly_name, + notification_type: NotificationType::Message { + scope: Some("swarm".to_string()), + channel: None, + }, + message: "You are now the coordinator for this swarm.".to_string(), + }; + let _ = super::fanout_session_event(swarm_members, &new_id, event).await; } } } diff --git a/src/server/comm_control.rs b/src/server/comm_control.rs index 594e35f2b..b6995ee51 100644 --- a/src/server/comm_control.rs +++ b/src/server/comm_control.rs @@ -1066,17 +1066,16 @@ pub(super) async fn handle_comm_assign_task( sessions, ) .await; - if let Some(member) = swarm_members.read().await.get(&target_session) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: req_session_id.clone(), - from_name: coordinator_name.clone(), - notification_type: NotificationType::Message { - scope: Some("dm".to_string()), - channel: None, - }, - message: notification, - }); - } + let dm_event = ServerEvent::Notification { + from_session: req_session_id.clone(), + from_name: coordinator_name.clone(), + notification_type: NotificationType::Message { + scope: Some("dm".to_string()), + channel: None, + }, + message: notification, + }; + let _ = super::fanout_session_event(swarm_members, &target_session, dm_event).await; let target_has_client = { let connections = client_connections.read().await; @@ -1116,22 +1115,20 @@ pub(super) async fn handle_comm_assign_task( "Plan updated: task '{}' assigned to {}.", selected_task_id, target_session ); - let members = swarm_members.read().await; for sid in participant_ids { if sid == target_session || sid == req_session_id { continue; } - if let Some(member) = members.get(&sid) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: req_session_id.clone(), - from_name: coordinator_name.clone(), - notification_type: NotificationType::Message { - scope: Some("plan".to_string()), - channel: None, - }, - message: plan_msg.clone(), - }); - } + let plan_event = ServerEvent::Notification { + from_session: req_session_id.clone(), + from_name: coordinator_name.clone(), + notification_type: NotificationType::Message { + scope: Some("plan".to_string()), + channel: None, + }, + message: plan_msg.clone(), + }; + let _ = super::fanout_session_event(swarm_members, &sid, plan_event).await; } finish_swarm_mutation_request( diff --git a/src/server/comm_plan.rs b/src/server/comm_plan.rs index 5a9b9cc70..35f018823 100644 --- a/src/server/comm_plan.rs +++ b/src/server/comm_plan.rs @@ -97,7 +97,6 @@ pub(super) async fn handle_comm_propose_plan( (plan.version, plan.participants.clone()) }; - let members = swarm_members.read().await; let notification_msg = format!( "Plan updated by {} ({} items, v{})", from_label, @@ -108,17 +107,16 @@ pub(super) async fn handle_comm_propose_plan( if sid == req_session_id { continue; } - if let Some(member) = members.get(&sid) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: req_session_id.clone(), - from_name: from_name.clone(), - notification_type: NotificationType::Message { - scope: Some("plan".to_string()), - channel: None, - }, - message: notification_msg.clone(), - }); - } + let plan_event = ServerEvent::Notification { + from_session: req_session_id.clone(), + from_name: from_name.clone(), + notification_type: NotificationType::Message { + scope: Some("plan".to_string()), + channel: None, + }, + message: notification_msg.clone(), + }; + let _ = super::fanout_session_event(swarm_members, &sid, plan_event).await; let _ = queue_soft_interrupt_for_session( &sid, notification_msg.clone(), diff --git a/src/server/debug_swarm_write.rs b/src/server/debug_swarm_write.rs index 11883fb25..5b4b396fd 100644 --- a/src/server/debug_swarm_write.rs +++ b/src/server/debug_swarm_write.rs @@ -81,29 +81,43 @@ pub(super) async fn maybe_handle_swarm_write_command( }; if let Some(swarm_id) = swarm_id { - let swarms = ctx.swarms_by_id.read().await; - let members = ctx.swarm_members.read().await; - let current_session = ctx.session_id.read().await; - let from_name = members - .get(&*current_session) - .and_then(|member| member.friendly_name.clone()); + // Snapshot recipient list and the from_name under the read locks, + // then drop the locks before calling fanout (which needs the + // swarm_members write lock). + let (recipients, from_name, from_session) = { + let swarms = ctx.swarms_by_id.read().await; + let members = ctx.swarm_members.read().await; + let current_session = ctx.session_id.read().await; + let from_name = members + .get(&*current_session) + .and_then(|member| member.friendly_name.clone()); + let recipients: Vec = swarms + .get(&swarm_id) + .map(|ids| ids.iter().cloned().collect()) + .unwrap_or_default(); + (recipients, from_name, current_session.clone()) + }; - if let Some(member_ids) = swarms.get(&swarm_id) { + if !recipients.is_empty() { let mut sent_count = 0; - for member_id in member_ids { - if let Some(member) = members.get(member_id) { - let notification = ServerEvent::Notification { - from_session: current_session.clone(), - from_name: from_name.clone(), - notification_type: NotificationType::Message { - scope: Some("broadcast".to_string()), - channel: None, - }, - message: message.clone(), - }; - if member.event_tx.send(notification).is_ok() { - sent_count += 1; - } + for member_id in &recipients { + let notification = ServerEvent::Notification { + from_session: from_session.clone(), + from_name: from_name.clone(), + notification_type: NotificationType::Message { + scope: Some("broadcast".to_string()), + channel: None, + }, + message: message.clone(), + }; + let delivered = super::fanout_session_event( + ctx.swarm_members, + member_id, + notification, + ) + .await; + if delivered > 0 { + sent_count += 1; } } return Ok(Some( @@ -134,27 +148,44 @@ pub(super) async fn maybe_handle_swarm_write_command( return Err(anyhow::anyhow!("swarm:notify requires a message")); } - let members = ctx.swarm_members.read().await; - let current_session = ctx.session_id.read().await; - let from_name = members - .get(&*current_session) - .and_then(|member| member.friendly_name.clone()); + // Snapshot from_name and target friendly_name under the read + // lock, then drop it before fanning out. + let (from_name, from_session, target_exists, target_friendly_name) = { + let members = ctx.swarm_members.read().await; + let current_session = ctx.session_id.read().await; + let from_name = members + .get(&*current_session) + .and_then(|member| member.friendly_name.clone()); + let target = members.get(target_session); + ( + from_name, + current_session.clone(), + target.is_some(), + target.and_then(|m| m.friendly_name.clone()), + ) + }; - if let Some(target) = members.get(target_session) { + if target_exists { let notification = ServerEvent::Notification { - from_session: current_session.clone(), - from_name: from_name.clone(), + from_session, + from_name, notification_type: NotificationType::Message { scope: Some("dm".to_string()), channel: None, }, message: message.to_string(), }; - if target.event_tx.send(notification).is_ok() { + let delivered = super::fanout_session_event( + ctx.swarm_members, + target_session, + notification, + ) + .await; + if delivered > 0 { return Ok(Some( serde_json::json!({ "sent_to": target_session, - "sent_to_name": target.friendly_name.clone(), + "sent_to_name": target_friendly_name, "message": message, }) .to_string(), @@ -226,12 +257,9 @@ pub(super) async fn maybe_handle_swarm_write_command( .map(|sessions| sessions.iter().cloned().collect()) .unwrap_or_default() }; - let members = ctx.swarm_members.read().await; for sid in &swarm_session_ids { - if sid != acting_session - && let Some(member) = members.get(sid) - { - let _ = member.event_tx.send(ServerEvent::Notification { + if sid != acting_session { + let notification = ServerEvent::Notification { from_session: acting_session.to_string(), from_name: friendly_name.clone(), notification_type: NotificationType::SharedContext { @@ -239,7 +267,10 @@ pub(super) async fn maybe_handle_swarm_write_command( value: value.clone(), }, message: format!("Shared context: {} = {}", key, value), - }); + }; + let _ = + super::fanout_session_event(ctx.swarm_members, sid, notification) + .await; } } diff --git a/src/server/state.rs b/src/server/state.rs index ef5ccbf68..153f46a1a 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -298,7 +298,15 @@ pub(super) async fn register_session_event_sender( ) { let mut members = swarm_members.write().await; if let Some(member) = members.get_mut(session_id) { - member.event_tx = event_tx.clone(); + // Only adopt this sender as the singular fallback when the existing + // one is closed (e.g. the headless intake or a prior connection has + // gone away). Overwriting an already-live fallback would shift where + // subsequent direct `member.event_tx` users send their payloads, + // which is one of the root causes of cross-connection wire + // corruption observed in production logs. + if member.event_tx.is_closed() { + member.event_tx = event_tx.clone(); + } member.event_txs.insert(connection_id.to_string(), event_tx); } } @@ -311,9 +319,12 @@ pub(super) async fn unregister_session_event_sender( let mut members = swarm_members.write().await; if let Some(member) = members.get_mut(session_id) { member.event_txs.remove(connection_id); - if let Some((_, tx)) = member.event_txs.iter().next() { - member.event_tx = tx.clone(); - } + // Intentionally do NOT silently re-point `member.event_tx` here. The + // singular `event_tx` is a fallback set at member creation (headless + // spawn intake). Silently swapping it to a surviving connection's + // sender means later code that reads `member.event_tx` can deliver + // events to a different connection's writer than intended, which has + // shown up as cross-connection protocol corruption in the wild. } } @@ -333,9 +344,11 @@ pub(super) async fn fanout_session_event( if member.event_txs.is_empty() { vec![member.event_tx.clone()] } else { - if let Some((_, tx)) = member.event_txs.iter().next() { - member.event_tx = tx.clone(); - } + // Snapshot all live attachments. Do not mutate `member.event_tx` + // here: this function is called from many fanout sites and the + // HashMap iteration order is non-deterministic, so re-pointing + // the singular fallback would silently re-route subsequent + // direct uses of `member.event_tx` to an arbitrary connection. member.event_txs.values().cloned().collect::>() } }; @@ -564,3 +577,231 @@ pub(super) async fn queue_soft_interrupt_for_session( }) } } + +#[cfg(test)] +mod multi_connection_protocol_tests { + //! Regression tests for the multi-session protocol corruption that showed + //! up as truncated/interleaved JSON lines on the wire (e.g. + //! `nt","is_headless":true,...`) and `Remote protocol error is not + //! retryable` reconnect-loop terminations in + //! `~/.jcode/logs/jcode-2026-05-21.log`. + //! + //! Root cause: several server code paths used the singular fallback + //! `member.event_tx` directly instead of fanning out to all + //! `member.event_txs`, AND `register/unregister/fanout_session_event` + //! silently overwrote `member.event_tx` to point at whichever + //! connection's writer happened to be touched last. That meant a `send` + //! intended for one client's writer could land on another client's + //! writer mid-line, splicing event tails into unrelated frames. + + use super::*; + use crate::protocol::ServerEvent; + + fn fresh_member( + session_id: &str, + fallback_tx: mpsc::UnboundedSender, + ) -> SwarmMember { + SwarmMember { + session_id: session_id.to_string(), + event_tx: fallback_tx, + event_txs: HashMap::new(), + working_dir: None, + swarm_id: None, + swarm_enabled: false, + status: "ready".to_string(), + detail: None, + friendly_name: None, + report_back_to_session_id: None, + latest_completion_report: None, + role: "agent".to_string(), + joined_at: Instant::now(), + last_status_change: Instant::now(), + is_headless: true, + } + } + + fn drain(rx: &mut mpsc::UnboundedReceiver) -> Vec { + let mut out = Vec::new(); + while let Ok(v) = rx.try_recv() { + out.push(v); + } + out + } + + /// `register_session_event_sender` must NOT silently overwrite the + /// singular `event_tx` fallback while it is still live. Doing so caused + /// later direct `member.event_tx.send(...)` callers to deliver to a + /// different connection's writer than intended. + #[tokio::test] + async fn register_does_not_overwrite_live_fallback_sender() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + let (conn_a_tx, _conn_a_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + + // Fallback must still point at the original headless intake. + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 7 }) + .expect("fallback sender still alive"); + drop(guard); + + let delivered = drain(&mut fallback_rx); + assert_eq!(delivered.len(), 1, "fallback receiver got the event"); + } + + /// When the live fallback is closed, `register_session_event_sender` is + /// allowed to adopt the new sender so the member is not stranded. + #[tokio::test] + async fn register_adopts_new_sender_when_fallback_is_closed() { + let (fallback_tx, fallback_rx) = mpsc::unbounded_channel::(); + drop(fallback_rx); // closes the channel + + let (conn_a_tx, mut conn_a_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 42 }) + .expect("new fallback should be live"); + drop(guard); + + let delivered = drain(&mut conn_a_rx); + assert_eq!(delivered.len(), 1, "new live conn picked up the fallback"); + } + + /// `unregister_session_event_sender` must not re-point the singular + /// `event_tx` to a surviving connection. The fallback is owned by + /// member-creation (headless intake) and silently swapping it caused + /// cross-wired writes between connections. + #[tokio::test] + async fn unregister_does_not_repoint_fallback_to_survivor() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + let (conn_a_tx, _conn_a_rx) = mpsc::unbounded_channel::(); + let (conn_b_tx, mut conn_b_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + register_session_event_sender(&members, "sess-1", "conn-b", conn_b_tx).await; + + // conn-a goes away. + unregister_session_event_sender(&members, "sess-1", "conn-a").await; + + // Sending to the singular `event_tx` must land on the original + // fallback receiver, NOT on conn-b's writer. + { + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 1 }) + .expect("fallback still alive"); + } + + let fallback_msgs = drain(&mut fallback_rx); + let conn_b_msgs = drain(&mut conn_b_rx); + assert_eq!( + fallback_msgs.len(), + 1, + "fallback receiver should get the event" + ); + assert!( + conn_b_msgs.is_empty(), + "surviving connection must NOT silently absorb fallback traffic" + ); + } + + /// `fanout_session_event` must deliver to every live attachment exactly + /// once and must not mutate the singular `event_tx` to point at one of + /// them. Without this guarantee, subsequent direct `event_tx` users + /// would deliver to an arbitrary connection (HashMap iteration order), + /// which is exactly the wire-corruption pattern observed in production. + #[tokio::test] + async fn fanout_delivers_to_all_connections_and_does_not_mutate_fallback() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + let (conn_a_tx, mut conn_a_rx) = mpsc::unbounded_channel::(); + let (conn_b_tx, mut conn_b_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + register_session_event_sender(&members, "sess-1", "conn-a", conn_a_tx).await; + register_session_event_sender(&members, "sess-1", "conn-b", conn_b_tx).await; + + let delivered = + fanout_session_event(&members, "sess-1", ServerEvent::Ack { id: 99 }).await; + assert_eq!(delivered, 2, "fanout reaches both live attachments"); + + let a = drain(&mut conn_a_rx); + let b = drain(&mut conn_b_rx); + assert_eq!(a.len(), 1, "conn-a got exactly one copy"); + assert_eq!(b.len(), 1, "conn-b got exactly one copy"); + + // Fallback receiver must NOT have received anything: when live + // attachments exist, fanout snapshots `event_txs` and does NOT + // duplicate to `event_tx`. + let fb = drain(&mut fallback_rx); + assert!( + fb.is_empty(), + "fanout must not duplicate to the singular fallback when live conns exist" + ); + + // And the singular `event_tx` must STILL be the original fallback + // (i.e. fanout did not silently re-point it). The cheapest check is + // to send via `event_tx` and confirm the fallback receiver gets it. + { + let guard = members.read().await; + let member = guard.get("sess-1").expect("member exists"); + member + .event_tx + .send(ServerEvent::Ack { id: 100 }) + .expect("fallback sender still alive"); + } + let fb_after = drain(&mut fallback_rx); + assert_eq!( + fb_after.len(), + 1, + "singular event_tx must remain pointed at the original fallback" + ); + assert!( + drain(&mut conn_a_rx).is_empty() + && drain(&mut conn_b_rx).is_empty(), + "direct fallback send must NOT bleed into live connections" + ); + } + + /// When no live attachments are registered, fanout falls back to the + /// singular `event_tx` so headless-only sessions still receive events. + #[tokio::test] + async fn fanout_falls_back_to_singular_sender_when_no_live_conns() { + let (fallback_tx, mut fallback_rx) = mpsc::unbounded_channel::(); + + let mut map = HashMap::new(); + map.insert("sess-1".to_string(), fresh_member("sess-1", fallback_tx)); + let members = Arc::new(RwLock::new(map)); + + let delivered = + fanout_session_event(&members, "sess-1", ServerEvent::Ack { id: 1 }).await; + assert_eq!(delivered, 1, "fanout used the singular fallback"); + + let msgs = drain(&mut fallback_rx); + assert_eq!(msgs.len(), 1, "fallback receiver got the event"); + } +} diff --git a/src/server/swarm.rs b/src/server/swarm.rs index c21956b66..db760ebc1 100644 --- a/src/server/swarm.rs +++ b/src/server/swarm.rs @@ -432,11 +432,12 @@ pub(super) async fn broadcast_swarm_plan_with_previous( summary: Some(summary), }; - let members = swarm_members.read().await; for sid in participants { - if let Some(member) = members.get(&sid) { - let _ = member.event_tx.send(event.clone()); - } + // Use fanout so every live attachment of this session receives the + // event; falling back to a single `member.event_tx` here can pick a + // stale or wrong-connection sender and drops events for other + // attachments. + let _ = super::fanout_session_event(swarm_members, &sid, event.clone()).await; } } @@ -565,18 +566,24 @@ pub(super) async fn remove_session_from_swarm( if let Some(vp) = plans.get_mut(swarm_id) { vp.participants.insert(new_id.clone()); } - let members = swarm_members.read().await; - if let Some(member) = members.get(&new_id) { - let _ = member.event_tx.send(ServerEvent::Notification { - from_session: new_id.clone(), - from_name: member.friendly_name.clone(), - notification_type: NotificationType::Message { - scope: Some("swarm".to_string()), - channel: None, - }, - message: "You are now the coordinator for this swarm.".to_string(), - }); - } + drop(plans); + let friendly_name = { + let members = swarm_members.read().await; + members + .get(&new_id) + .and_then(|member| member.friendly_name.clone()) + }; + let notification = ServerEvent::Notification { + from_session: new_id.clone(), + from_name: friendly_name, + notification_type: NotificationType::Message { + scope: Some("swarm".to_string()), + channel: None, + }, + message: "You are now the coordinator for this swarm.".to_string(), + }; + // Fan out to every live attachment of the promoted session. + let _ = super::fanout_session_event(swarm_members, &new_id, notification).await; } } diff --git a/src/server/swarm_persistence_tests.rs b/src/server/swarm_persistence_tests.rs index 2b930d19d..172ec4ef6 100644 --- a/src/server/swarm_persistence_tests.rs +++ b/src/server/swarm_persistence_tests.rs @@ -3,6 +3,13 @@ use std::time::Instant; struct EnvGuard { runtime: Option, + // Hold the global test-env mutex for the lifetime of the guard so that + // concurrent tests cannot race on `JCODE_RUNTIME_DIR`. Previously this + // guard was dropped at the end of `test_env(...)`, allowing other tests + // running in parallel to overwrite the env var mid-test and corrupt + // `load_runtime_state()` results. The field is intentionally read-only; + // we only need RAII. + _lock: std::sync::MutexGuard<'static, ()>, } impl Drop for EnvGuard { @@ -16,10 +23,13 @@ impl Drop for EnvGuard { } fn test_env(dir: &tempfile::TempDir) -> EnvGuard { - let _guard = storage::lock_test_env(); + let lock = storage::lock_test_env(); let previous = std::env::var_os("JCODE_RUNTIME_DIR"); crate::env::set_var("JCODE_RUNTIME_DIR", dir.path()); - EnvGuard { runtime: previous } + EnvGuard { + runtime: previous, + _lock: lock, + } } #[test] From d3e3b7532bb8654f3d1abea3f8c5654635257bf3 Mon Sep 17 00:00:00 2001 From: Jager Cooper <100608609+Zephyr709@users.noreply.github.com> Date: Thu, 21 May 2026 11:33:04 -0230 Subject: [PATCH 9/9] fix(protocol): tolerate corrupt wire frames instead of crashing Two complementary defenses against the multi-session protocol corruption seen in production: 1. `encode_event` now refuses to emit a JSON frame containing raw newlines. If serialization ever produces one (custom Display impls, hand-built JSON, etc.) we log the kind and strip the byte instead of shipping a frame that would split into two on the receiver and crash every attached client. 2. `RemoteConnection::next_event` no longer treats a single malformed frame as fatal. It logs a truncated preview and resyncs at the next newline, giving up only after 16 consecutive corrupt frames to avoid busy looping. A single bad write on the wire used to take down every session attached to the shared server; now those sessions stay alive while we hunt the upstream cause. Together these prevent the cross-session crash cascades observed when multiple TUI sessions were attached to the same shared-server PID. --- crates/jcode-protocol/src/lib.rs | 39 ++++++++++++++++++++++++++- src/tui/backend.rs | 45 +++++++++++++++++++++++++++----- 2 files changed, 77 insertions(+), 7 deletions(-) diff --git a/crates/jcode-protocol/src/lib.rs b/crates/jcode-protocol/src/lib.rs index 57880e1c1..a74941c89 100644 --- a/crates/jcode-protocol/src/lib.rs +++ b/crates/jcode-protocol/src/lib.rs @@ -2089,13 +2089,50 @@ fn default_model_direction() -> i8 { 1 } -/// Encode an event as a newline-terminated JSON string +/// Encode an event as a newline-terminated JSON string. +/// +/// Defends the line-delimited protocol against accidental raw-newline +/// injection. A correctly-serialized JSON value contains no unescaped +/// newline bytes; if one slips through (custom Display impl, hand-built +/// JSON, etc.) the receiving side would resync mid-frame and see +/// "invalid type / expected value" errors that crash sessions. Stripping +/// embedded raw newlines here keeps the wire frame intact and surfaces +/// the bug via the eprintln below without taking the client down. pub fn encode_event(event: &ServerEvent) -> String { let mut json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string()); + if json.as_bytes().contains(&b'\n') { + // This should be impossible with serde_json, but if it ever + // happens we'd rather log loudly and ship a still-parseable + // single-line frame than corrupt the wire. + eprintln!( + "jcode-protocol: encode_event produced embedded newline; stripping. event_kind={}", + event_kind_for_debug(event) + ); + json = json.replace('\n', " "); + } + debug_assert!( + !json.as_bytes().contains(&b'\n'), + "encode_event must not produce embedded newlines" + ); json.push('\n'); json } +fn event_kind_for_debug(event: &ServerEvent) -> &'static str { + match event { + ServerEvent::Ack { .. } => "Ack", + ServerEvent::Pong { .. } => "Pong", + ServerEvent::Error { .. } => "Error", + ServerEvent::Done { .. } => "Done", + ServerEvent::Interrupted => "Interrupted", + ServerEvent::TextDelta { .. } => "TextDelta", + ServerEvent::ToolStart { .. } => "ToolStart", + ServerEvent::ToolDone { .. } => "ToolDone", + ServerEvent::SwarmStatus { .. } => "SwarmStatus", + _ => "other", + } +} + /// Decode a request from a JSON string pub fn decode_request(line: &str) -> Result { serde_json::from_str(line) diff --git a/src/tui/backend.rs b/src/tui/backend.rs index b7e942432..4d881e360 100644 --- a/src/tui/backend.rs +++ b/src/tui/backend.rs @@ -803,6 +803,14 @@ impl RemoteConnection { /// Read the next event from the server. pub async fn next_event(&mut self) -> RemoteRead { + // Tolerate up to this many consecutive malformed frames before + // giving up. A single corrupt line on the wire used to take down + // every attached session, but skipping the bad frame and resyncing + // at the next `\n` boundary lets the connection survive a benign + // glitch (e.g. an unescaped newline from a buggy emitter, a stale + // half-buffer from a reload, etc.). + const MAX_CONSECUTIVE_MALFORMED_FRAMES: u32 = 16; + let mut malformed_in_a_row: u32 = 0; loop { self.line_buffer.clear(); match self.reader.read_line(&mut self.line_buffer).await { @@ -822,15 +830,40 @@ impl RemoteConnection { continue; } match serde_json::from_str(&self.line_buffer) { - Ok(event) => return RemoteRead::Event(event), + Ok(event) => { + malformed_in_a_row = 0; + return RemoteRead::Event(event); + } Err(error) => { + malformed_in_a_row = malformed_in_a_row.saturating_add(1); + // Truncate the dump so the log isn't flooded by + // multi-KB SwarmStatus payloads. + let preview: String = self + .line_buffer + .chars() + .take(240) + .collect(); crate::logging::warn(&format!( - "RemoteConnection::next_event: protocol error={} line={:?} (session_id={:?}, client_instance_id={:?})", - error, self.line_buffer, self.session_id, self.client_instance_id - )); - return RemoteRead::Disconnected(RemoteDisconnectReason::Protocol( - error.to_string(), + "RemoteConnection::next_event: protocol error={} (consecutive_malformed={}, len={}) line_preview={:?} (session_id={:?}, client_instance_id={:?})", + error, + malformed_in_a_row, + self.line_buffer.len(), + preview, + self.session_id, + self.client_instance_id )); + if malformed_in_a_row >= MAX_CONSECUTIVE_MALFORMED_FRAMES { + crate::logging::error(&format!( + "RemoteConnection::next_event: {} consecutive malformed frames; giving up to avoid a busy loop", + malformed_in_a_row + )); + return RemoteRead::Disconnected( + RemoteDisconnectReason::Protocol(error.to_string()), + ); + } + // Skip this corrupt frame and resync at the next + // `\n` instead of tearing down the session. + continue; } } }