Skip to content
39 changes: 38 additions & 1 deletion crates/jcode-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2089,13 +2089,50 @@ fn default_model_direction() -> i8 {
1
}

/// Encode an event as a newline-terminated JSON string
/// Encode an event as a newline-terminated JSON string.
///
/// Defends the line-delimited protocol against accidental raw-newline
/// injection. A correctly-serialized JSON value contains no unescaped
/// newline bytes; if one slips through (custom Display impl, hand-built
/// JSON, etc.) the receiving side would resync mid-frame and see
/// "invalid type / expected value" errors that crash sessions. Stripping
/// embedded raw newlines here keeps the wire frame intact and surfaces
/// the bug via the eprintln below without taking the client down.
pub fn encode_event(event: &ServerEvent) -> String {
let mut json = serde_json::to_string(event).unwrap_or_else(|_| "{}".to_string());
if json.as_bytes().contains(&b'\n') {
// This should be impossible with serde_json, but if it ever
// happens we'd rather log loudly and ship a still-parseable
// single-line frame than corrupt the wire.
eprintln!(
"jcode-protocol: encode_event produced embedded newline; stripping. event_kind={}",
event_kind_for_debug(event)
);
json = json.replace('\n', " ");
}
debug_assert!(
!json.as_bytes().contains(&b'\n'),
"encode_event must not produce embedded newlines"
);
json.push('\n');
json
}

fn event_kind_for_debug(event: &ServerEvent) -> &'static str {
match event {
ServerEvent::Ack { .. } => "Ack",
ServerEvent::Pong { .. } => "Pong",
ServerEvent::Error { .. } => "Error",
ServerEvent::Done { .. } => "Done",
ServerEvent::Interrupted => "Interrupted",
ServerEvent::TextDelta { .. } => "TextDelta",
ServerEvent::ToolStart { .. } => "ToolStart",
ServerEvent::ToolDone { .. } => "ToolDone",
ServerEvent::SwarmStatus { .. } => "SwarmStatus",
_ => "other",
}
}

/// Decode a request from a JSON string
pub fn decode_request(line: &str) -> Result<Request, serde_json::Error> {
serde_json::from_str(line)
Expand Down
26 changes: 26 additions & 0 deletions docker-compose.agent-db.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Agent DB Substrate — local Postgres for agent analytics
# Well-known credentials; only accessible from localhost.
# Start: docker compose -f docker-compose.agent-db.yml up -d
# Stop: docker compose -f docker-compose.agent-db.yml down -v

services:
postgres:
image: postgres:16-alpine
container_name: jcode-agent-db
restart: unless-stopped
ports:
- "5432:5432"
environment:
POSTGRES_USER: jcode_agent
POSTGRES_PASSWORD: jcode_agent_local
POSTGRES_DB: jcode_agent_workspace
volumes:
- jcode_agent_db_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U jcode_agent -d jcode_agent_workspace"]
interval: 5s
timeout: 3s
retries: 5

volumes:
jcode_agent_db_data:
107 changes: 103 additions & 4 deletions src/ambient/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,75 @@ use tokio::sync::{Notify, RwLock};

const MAX_IDLE_POLL_SECS: u64 = 30;

/// Pure helper: given the error returned by an ambient agent turn, decide
/// whether the runner should attempt a provider switch and what label to
/// switch to. Returns `Some(target_provider_id)` when the error carries a
/// failover prompt; `None` otherwise.
///
/// Split out from [`try_headless_provider_failover`] so it can be unit
/// tested without constructing a real `Agent`/`Provider`.
fn parse_failover_target(run_result: &anyhow::Result<String>) -> Option<String> {
let err = run_result.as_ref().err()?;
let prompt = crate::provider::parse_failover_prompt_message(&err.to_string())?;
Some(prompt.to_provider)
}

/// Try to recover from a provider-level failure during a headless ambient cycle
/// by switching providers once and re-running the original message.
///
/// Returns `true` if a switch + retry was attempted (regardless of outcome).
/// The caller is responsible for inspecting `ambient_tools::take_cycle_result()`
/// after this returns; if the retry produced a result, it will be picked up
/// by the same code path that handles a normal successful turn.
///
/// We intentionally cap this at a single retry per cycle to avoid burning
/// through every configured account/provider in one wake-up when an outage
/// is global rather than per-provider.
async fn try_headless_provider_failover(
provider: &Arc<dyn Provider>,
agent: &mut Agent,
run_result: &anyhow::Result<String>,
initial_message: &str,
) -> bool {
let Some(target_provider) = parse_failover_target(run_result) else {
return false;
};
// We re-parse here for the human-readable labels in the log line; the
// pure helper above only returns the machine-readable target id.
let prompt_for_log = run_result
.as_ref()
.err()
.and_then(|e| crate::provider::parse_failover_prompt_message(&e.to_string()));
match provider.switch_active_provider_to(&target_provider) {
Ok(()) => {
if let Some(prompt) = prompt_for_log {
logging::info(&format!(
"Ambient cycle: headless failover {} → {} (reason: {}); retrying initial message",
prompt.from_label, prompt.to_label, prompt.reason
));
} else {
logging::info(&format!(
"Ambient cycle: headless failover → {}; retrying initial message",
target_provider
));
}
}
Err(switch_err) => {
logging::warn(&format!(
"Ambient cycle: headless failover to {} failed: {}",
target_provider, switch_err
));
return false;
}
}
// Re-issue the original user message on the new provider. The agent's
// conversation history already contains the failed turn, which the new
// provider will see as prior assistant context; that's intentional, since
// the failover prompt explicitly notes the input would be resent.
let _ = agent.run_once_capture(initial_message).await;
true
}

/// Shared ambient runner state, accessible from the server, debug socket, and TUI.
#[derive(Clone)]
pub struct AmbientRunnerHandle {
Expand Down Expand Up @@ -881,11 +950,20 @@ impl AmbientRunnerHandle {
self.set_running_detail("gathering context").await;
let (system_prompt, initial_message) = self.build_cycle_context(provider).await?;

// Visible mode: spawn a full TUI instead of running headlessly
// Visible mode: spawn a full TUI instead of running headlessly.
// If the configured terminal is unavailable, fall back to headless so
// external wakeups (Discord/Telegram/email) still produce a cycle.
if visible {
return self
.run_cycle_visible(started_at, system_prompt, initial_message)
.await;
match self
.run_cycle_visible(started_at, system_prompt.clone(), initial_message.clone())
.await
{
Ok(result) => return Ok(result),
Err(e) => logging::warn(&format!(
"Ambient visible cycle failed ({}), falling back to headless",
e
)),
}
}

// Headless mode: run agent directly
Expand Down Expand Up @@ -927,6 +1005,27 @@ impl AmbientRunnerHandle {
});
}

// If the initial turn errored with a provider-failover prompt (e.g. the
// active provider hit its rate cap), try switching providers once and
// re-running the original message. In headless mode there is no TUI to
// surface the countdown, so this is the analogue of the TUI's auto-
// switch path. A single retry keeps the blast radius bounded.
let attempted_failover =
try_headless_provider_failover(provider, &mut agent, &run_result, &initial_message)
.await;

if attempted_failover && let Some(result) = ambient_tools::take_cycle_result() {
ambient_tools::unregister_ambient_session(&ambient_session_id);
let conversation = agent.export_conversation_markdown();
agent.mark_closed();
return Ok(AmbientCycleResult {
started_at,
ended_at: Utc::now(),
conversation: Some(conversation),
..result
});
}

// Agent didn't call end_ambient_cycle - try continuation
if run_result.is_err() {
logging::warn("Ambient cycle: agent error without calling end_ambient_cycle");
Expand Down
28 changes: 28 additions & 0 deletions src/ambient/runner_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::AmbientRunnerHandle;
use super::parse_failover_target;
use crate::ambient::{Priority, ScheduleTarget, ScheduledItem};
use crate::message::{Message, Role, StreamEvent, ToolDefinition};
use crate::provider::{EventStream, Provider};
Expand All @@ -15,6 +16,33 @@ struct EnvVarGuard {
prev: Option<std::ffi::OsString>,
}

#[test]
fn parse_failover_target_extracts_provider_from_failover_prompt_error() {
let prompt = crate::provider::ProviderFailoverPrompt {
from_provider: "openai".to_string(),
from_label: "OpenAI".to_string(),
to_provider: "claude".to_string(),
to_label: "Anthropic".to_string(),
reason: "rate limit reached".to_string(),
estimated_input_chars: 1234,
estimated_input_tokens: 567,
};
let err: anyhow::Result<String> = Err(anyhow::anyhow!(prompt.to_error_message()));
assert_eq!(parse_failover_target(&err).as_deref(), Some("claude"));
}

#[test]
fn parse_failover_target_returns_none_for_unrelated_errors() {
let err: anyhow::Result<String> = Err(anyhow::anyhow!("network timeout after 30s"));
assert!(parse_failover_target(&err).is_none());
}

#[test]
fn parse_failover_target_returns_none_for_ok_results() {
let ok: anyhow::Result<String> = Ok("done".to_string());
assert!(parse_failover_target(&ok).is_none());
}

impl EnvVarGuard {
fn set_path(key: &'static str, value: &std::path::Path) -> Self {
let prev = std::env::var_os(key);
Expand Down
3 changes: 3 additions & 0 deletions src/cli/provider_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,9 @@ async fn init_provider_with_options(
crate::env::set_var("JCODE_PROVIDER_PROFILE_ACTIVE", "1");
}

let cfg = crate::config::config();
crate::provider_catalog::rehydrate_active_named_provider_profile_env_from_config(cfg)?;

if std::env::var_os("JCODE_PROVIDER_PROFILE_ACTIVE").is_none()
&& std::env::var_os("JCODE_NAMED_PROVIDER_PROFILE").is_none()
{
Expand Down
15 changes: 11 additions & 4 deletions src/prompt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ fn gpu_summary() -> Option<String> {
/// Load AGENTS.md files from a specific working directory
pub fn load_agents_md_files_from_dir(working_dir: Option<&Path>) -> (Option<String>, ContextInfo) {
let mut contents = vec![];
let mut loaded_paths = Vec::new();
let mut info = ContextInfo::default();

// Helper to load a file if it exists, returns (formatted_content, raw_size)
Expand All @@ -573,17 +574,23 @@ pub fn load_agents_md_files_from_dir(working_dir: Option<&Path>) -> (Option<Stri

// Project-level files (from specified working directory or current directory)
let project_dir = working_dir.unwrap_or(Path::new("."));
if let Some((content, size)) = load_file(
&project_dir.join("AGENTS.md"),
"Project Instructions (AGENTS.md)",
) {
let project_agents_md = project_dir.join("AGENTS.md");
if let Some((content, size)) = load_file(&project_agents_md, "Project Instructions (AGENTS.md)")
{
info.has_project_agents_md = true;
info.project_agents_md_chars = size;
if let Ok(canonical) = project_agents_md.canonicalize() {
loaded_paths.push(canonical);
}
contents.push(content);
}

// Home directory files
if let Ok(global_agents_md) = crate::storage::user_home_path("AGENTS.md")
&& global_agents_md
.canonicalize()
.map(|canonical| !loaded_paths.iter().any(|loaded| loaded == &canonical))
.unwrap_or(true)
&& let Some((content, size)) =
load_file(&global_agents_md, "Global Instructions (~/.AGENTS.md)")
{
Expand Down
24 changes: 24 additions & 0 deletions src/prompt_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,30 @@ fn test_load_agents_md_files_uses_sandboxed_global_files() {
}
}

#[test]
fn test_load_agents_md_files_deduplicates_home_project_file() {
let _guard = crate::storage::lock_test_env();
let prev_home = std::env::var_os("JCODE_HOME");
let temp = tempfile::TempDir::new().unwrap();
crate::env::set_var("JCODE_HOME", temp.path());
let external = temp.path().join("external");
std::fs::create_dir_all(&external).unwrap();
std::fs::write(external.join("AGENTS.md"), "shared home instructions").unwrap();

let (content, info) = load_agents_md_files_from_dir(Some(&external));

assert!(info.has_project_agents_md);
assert!(!info.has_global_agents_md);
let content = content.expect("project instructions content");
assert_eq!(content.matches("shared home instructions").count(), 1);

if let Some(prev_home) = prev_home {
crate::env::set_var("JCODE_HOME", prev_home);
} else {
crate::env::remove_var("JCODE_HOME");
}
}

#[test]
fn test_session_context_includes_time_timezone_and_system_info() {
let context = build_session_context(None);
Expand Down
9 changes: 9 additions & 0 deletions src/provider/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ impl MultiProvider {
let cfg = crate::config::config();
let provider_state = ProviderState::from_parts(cfg, &auth_status);
let mut default_named_provider_profile: Option<String> = None;
match crate::provider_catalog::rehydrate_active_named_provider_profile_env_from_config(cfg)
{
Ok(Some(profile_name)) => default_named_provider_profile = Some(profile_name),
Ok(None) => {}
Err(err) => crate::logging::warn(&format!(
"Failed to rehydrate active provider profile from config: {}",
err
)),
}
if std::env::var_os("JCODE_PROVIDER_PROFILE_ACTIVE").is_none()
&& std::env::var_os("JCODE_NAMED_PROVIDER_PROFILE").is_none()
&& let Some(pref) = provider_state.default_provider_key()
Expand Down
23 changes: 23 additions & 0 deletions src/provider_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,29 @@ pub fn apply_named_provider_profile_env(profile_name: &str) -> anyhow::Result<St
apply_named_provider_profile_env_from_config(profile_name, config)
}

pub fn rehydrate_active_named_provider_profile_env_from_config(
config: &crate::config::Config,
) -> anyhow::Result<Option<String>> {
let profile_name = std::env::var("JCODE_NAMED_PROVIDER_PROFILE")
.ok()
.or_else(|| std::env::var("JCODE_PROVIDER_PROFILE_NAME").ok())
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty());

let Some(profile_name) = profile_name else {
return Ok(None);
};

if !config.providers.contains_key(&profile_name) {
return Ok(None);
}

let profile_name = apply_named_provider_profile_env_from_config(&profile_name, config)?;
crate::env::set_var("JCODE_PROVIDER_PROFILE_NAME", &profile_name);
crate::env::set_var("JCODE_PROVIDER_PROFILE_ACTIVE", "1");
Ok(Some(profile_name))
}

pub fn apply_named_provider_profile_env_from_config(
profile_name: &str,
config: &crate::config::Config,
Expand Down
Loading