diff --git a/Cargo.lock b/Cargo.lock index 4fb0f9106..041a86617 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11029,6 +11029,7 @@ dependencies = [ "proptest", "rand 0.10.1", "rand_distr 0.6.0", + "reqwest 0.12.28", "reqwest 0.13.3", "ripemd", "rubato", diff --git a/crates/zeph-agent-context/src/helpers.rs b/crates/zeph-agent-context/src/helpers.rs index ad62717b4..560986d05 100644 --- a/crates/zeph-agent-context/src/helpers.rs +++ b/crates/zeph-agent-context/src/helpers.rs @@ -969,9 +969,9 @@ pub async fn fetch_semantic_recall( fn format_plain_recall_entry(item: &zeph_memory::RecalledMessage) -> String { let role_label = match item.message.role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", }; format!("- [{}] {}\n", role_label, item.message.content) } @@ -979,9 +979,9 @@ fn format_plain_recall_entry(item: &zeph_memory::RecalledMessage) -> String { #[allow(clippy::map_unwrap_or)] fn format_structured_recall_entry(item: &zeph_memory::RecalledMessage) -> String { let source = match item.message.role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", }; // Use compacted_at as a proxy for message age when available; otherwise "unknown". // A full timestamp lookup from SQLite would require an async DB call in the assembler diff --git a/crates/zeph-agent-context/src/memory_backend.rs b/crates/zeph-agent-context/src/memory_backend.rs index 0fbd36c85..9946c430f 100644 --- a/crates/zeph-agent-context/src/memory_backend.rs +++ b/crates/zeph-agent-context/src/memory_backend.rs @@ -69,9 +69,9 @@ fn map_correction(c: zeph_memory::UserCorrectionRow) -> MemCorrection { fn map_recalled_message(r: zeph_memory::RecalledMessage) -> MemRecalledMessage { use zeph_llm::provider::Role; let role = match r.message.role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", } .to_owned(); MemRecalledMessage { diff --git a/crates/zeph-agent-context/src/summarization/scheduling.rs b/crates/zeph-agent-context/src/summarization/scheduling.rs index 839ee3454..4577d9291 100644 --- a/crates/zeph-agent-context/src/summarization/scheduling.rs +++ b/crates/zeph-agent-context/src/summarization/scheduling.rs @@ -383,9 +383,9 @@ fn spawn_task_goal_extraction( let mut context_text = String::new(); for (role, content) in &recent { let role_str = match role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", }; let preview = if content.len() > 300 { let end = content.floor_char_boundary(300); @@ -460,9 +460,9 @@ fn spawn_subgoal_extraction( let mut context_text = String::new(); for (role, content) in &recent { let role_str = match role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", }; let preview = if content.len() > 300 { let end = content.floor_char_boundary(300); diff --git a/crates/zeph-context/src/fidelity.rs b/crates/zeph-context/src/fidelity.rs index 4023b00a3..d0abf334c 100644 --- a/crates/zeph-context/src/fidelity.rs +++ b/crates/zeph-context/src/fidelity.rs @@ -525,7 +525,7 @@ fn role_weight(role: Role) -> f32 { match role { Role::System => 1.0, Role::User => 0.8, - Role::Assistant => 0.6, + Role::Assistant | _ => 0.6, } } @@ -785,8 +785,8 @@ fn truncate_to_tokens(content: &mut String, max_tokens: usize, tc: &dyn TokenCou fn render_placeholder(msg: &mut Message, score: f32, original_tokens: u32) { let role_str = match msg.role { Role::System => "system", - Role::User => "user", Role::Assistant => "assistant", + Role::User | _ => "user", }; msg.content = format!( "[placeholder: role={role_str}, original_tokens={original_tokens}, importance={score:.2}]" @@ -836,8 +836,8 @@ fn merge_consecutive_placeholders(messages: &mut Vec) -> usize { }; let role_str = match role { Role::System => "system", - Role::User => "user", Role::Assistant => "assistant", + Role::User | _ => "user", }; let merged_content = format!( "[placeholder: {count} messages, role={role_str}, total_tokens={total_tokens}, avg_importance={avg_importance:.2}]" diff --git a/crates/zeph-context/src/summarization.rs b/crates/zeph-context/src/summarization.rs index 82214821c..44be33150 100644 --- a/crates/zeph-context/src/summarization.rs +++ b/crates/zeph-context/src/summarization.rs @@ -456,6 +456,7 @@ pub fn build_metadata_summary(messages: &[Message], truncate: fn(&str, usize) -> } } Role::System => system_count += 1, + _ => {} } } @@ -649,9 +650,9 @@ fn format_history(messages: &[Message]) -> String { history_text.push_str("\n\n"); } let role = match m.role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", }; let _ = write!(history_text, "[{role}]: {}", m.content); } diff --git a/crates/zeph-core/src/agent/magic_docs.rs b/crates/zeph-core/src/agent/magic_docs.rs index c8afe7da5..e58cf6d10 100644 --- a/crates/zeph-core/src/agent/magic_docs.rs +++ b/crates/zeph-core/src/agent/magic_docs.rs @@ -439,7 +439,7 @@ mod tests { } } } - Role::System => {} + _ => {} } } diff --git a/crates/zeph-core/src/agent/mod.rs b/crates/zeph-core/src/agent/mod.rs index 1c4aef753..8cfe34fbd 100644 --- a/crates/zeph-core/src/agent/mod.rs +++ b/crates/zeph-core/src/agent/mod.rs @@ -610,9 +610,9 @@ impl Agent { .iter() .map(|m| { let role = match m.role { - Role::User => "user".to_owned(), Role::Assistant => "assistant".to_owned(), Role::System => "system".to_owned(), + Role::User | _ => "user".to_owned(), }; (zeph_memory::MessageId(0), role, m.content.clone()) }) diff --git a/crates/zeph-core/src/agent/session_digest.rs b/crates/zeph-core/src/agent/session_digest.rs index 8ddb15065..9ebd52cd0 100644 --- a/crates/zeph-core/src/agent/session_digest.rs +++ b/crates/zeph-core/src/agent/session_digest.rs @@ -94,9 +94,9 @@ fn format_and_sanitize_conversation( let mut result = String::new(); for msg in messages { let role = match msg.role { - Role::User => "User", Role::Assistant => "Assistant", Role::System => "System", + Role::User | _ => "User", }; // Redact credentials first, then sanitize for injection patterns. let redacted: Cow<'_, str> = scrub_content(&msg.content); diff --git a/crates/zeph-core/src/agent/speculative/stream_drainer.rs b/crates/zeph-core/src/agent/speculative/stream_drainer.rs index e703daff5..68e2b4cb6 100644 --- a/crates/zeph-core/src/agent/speculative/stream_drainer.rs +++ b/crates/zeph-core/src/agent/speculative/stream_drainer.rs @@ -125,9 +125,6 @@ impl SpeculativeStreamDrainer { ToolSseEvent::ThinkingBlockDone(block) => { thinking_blocks.push(block); } - ToolSseEvent::ThinkingChunk(_) => { - // Pass-through; full block assembled via ThinkingBlockDone. - } ToolSseEvent::ContentChunk(text) => { text_buf.push_str(&text); } @@ -140,6 +137,8 @@ impl SpeculativeStreamDrainer { ToolSseEvent::Error(e) => { return Err(e); } + // ThinkingChunk pass-through; full block assembled via ThinkingBlockDone. + ToolSseEvent::ThinkingChunk(_) | _ => {} } } diff --git a/crates/zeph-core/src/agent/tool_execution/focus.rs b/crates/zeph-core/src/agent/tool_execution/focus.rs index ffa70c29b..1e120907e 100644 --- a/crates/zeph-core/src/agent/tool_execution/focus.rs +++ b/crates/zeph-core/src/agent/tool_execution/focus.rs @@ -421,9 +421,9 @@ fn build_compression_prompt( to_compress: &[zeph_llm::provider::Message], ) -> Vec { let role_label = |role: &zeph_llm::provider::Role| match role { - zeph_llm::provider::Role::User => "user", zeph_llm::provider::Role::Assistant => "assistant", zeph_llm::provider::Role::System => "system", + zeph_llm::provider::Role::User | _ => "user", }; let bullet_list: String = to_compress .iter() diff --git a/crates/zeph-core/src/debug_dump/mod.rs b/crates/zeph-core/src/debug_dump/mod.rs index b308bf303..a0f323e89 100644 --- a/crates/zeph-core/src/debug_dump/mod.rs +++ b/crates/zeph-core/src/debug_dump/mod.rs @@ -480,7 +480,7 @@ fn messages_to_api_value(messages: &[Message]) -> serde_json::Value { let role = match m.role { Role::User => "user", Role::Assistant => "assistant", - Role::System => return None, + Role::System | _ => return None, }; let is_assistant = m.role == Role::Assistant; let has_structured = m.parts.iter().any(|p| { diff --git a/crates/zeph-core/src/memory_tools.rs b/crates/zeph-core/src/memory_tools.rs index 67b47bfaf..3880c4b15 100644 --- a/crates/zeph-core/src/memory_tools.rs +++ b/crates/zeph-core/src/memory_tools.rs @@ -147,9 +147,9 @@ impl ToolExecutor for MemoryToolExecutor { let _ = writeln!(output, "## Recalled Messages ({} results)", recalled.len()); for r in &recalled { let role = match r.message.role { - zeph_llm::provider::Role::User => "user", zeph_llm::provider::Role::Assistant => "assistant", zeph_llm::provider::Role::System => "system", + zeph_llm::provider::Role::User | _ => "user", }; let content = r.message.content.trim(); let _ = writeln!(output, "[score: {:.2}] {role}: {content}", r.score); diff --git a/crates/zeph-llm/Cargo.toml b/crates/zeph-llm/Cargo.toml index 18ec262fc..597beb6e3 100644 --- a/crates/zeph-llm/Cargo.toml +++ b/crates/zeph-llm/Cargo.toml @@ -42,6 +42,7 @@ parking_lot.workspace = true rand.workspace = true rand_distr.workspace = true reqwest = { workspace = true, features = ["json", "multipart", "rustls", "stream"] } +reqwest012 = { package = "reqwest", version = "0.12", default-features = false, features = ["rustls-tls"] } ripemd = { workspace = true, optional = true } rubato = { workspace = true, optional = true } schemars = { workspace = true } diff --git a/crates/zeph-llm/src/any.rs b/crates/zeph-llm/src/any.rs index 05a9eae9a..541050a79 100644 --- a/crates/zeph-llm/src/any.rs +++ b/crates/zeph-llm/src/any.rs @@ -76,6 +76,7 @@ macro_rules! delegate_provider { /// [`Arc`](crate::provider_dyn::LlmProviderDyn). /// /// The `Candle` variant is only available when the `candle` feature is enabled. +#[non_exhaustive] #[derive(Debug, Clone)] pub enum AnyProvider { Ollama(OllamaProvider), diff --git a/crates/zeph-llm/src/candle_provider/loader.rs b/crates/zeph-llm/src/candle_provider/loader.rs index 3bbf7444f..0ed3f4d89 100644 --- a/crates/zeph-llm/src/candle_provider/loader.rs +++ b/crates/zeph-llm/src/candle_provider/loader.rs @@ -10,6 +10,7 @@ use candle_core::quantized::gguf_file; use candle_transformers::models::quantized_llama::ModelWeights; use tokenizers::Tokenizer; +#[non_exhaustive] #[derive(Debug, Clone)] pub enum ModelSource { Local { diff --git a/crates/zeph-llm/src/candle_provider/template.rs b/crates/zeph-llm/src/candle_provider/template.rs index cc68fd7cb..cf6f25340 100644 --- a/crates/zeph-llm/src/candle_provider/template.rs +++ b/crates/zeph-llm/src/candle_provider/template.rs @@ -3,6 +3,7 @@ use crate::provider::{Message, Role}; +#[non_exhaustive] #[derive(Debug, Clone, Copy)] pub enum ChatTemplate { Llama3, diff --git a/crates/zeph-llm/src/ollama.rs b/crates/zeph-llm/src/ollama.rs index 9ec1eb470..98d8554c6 100644 --- a/crates/zeph-llm/src/ollama.rs +++ b/crates/zeph-llm/src/ollama.rs @@ -55,6 +55,17 @@ use crate::provider::{ }; use crate::usage::UsageTracker; +/// Build a reqwest 0.12 HTTP client (the version used by ollama-rs) with a 600-second hard +/// backstop timeout and a 30-second connect timeout. +fn ollama_reqwest_client() -> reqwest012::Client { + reqwest012::Client::builder() + .connect_timeout(std::time::Duration::from_secs(30)) + .timeout(std::time::Duration::from_mins(10)) + .user_agent(concat!("zeph/", env!("CARGO_PKG_VERSION"))) + .build() + .expect("Ollama HTTP client construction must not fail") +} + /// Metadata returned by `/api/show` for the configured chat model. #[derive(Debug)] pub struct ModelInfo { @@ -114,7 +125,7 @@ impl OllamaProvider { pub fn new(base_url: &str, model: String, embedding_model: String) -> Self { let (host, port) = parse_host_port(base_url); Self { - client: Ollama::new(host, port), + client: Ollama::new_with_client(host, port, ollama_reqwest_client()), model, embedding_model, context_window_size: None, @@ -647,6 +658,14 @@ mod tests { std::env::var("OLLAMA_EMBED_MODEL").unwrap_or_else(|_| "qwen3-embedding".into()) } + // --- #4729: Ollama HTTP client smoke test --- + + #[test] + fn ollama_reqwest_client_builds_without_panicking() { + // Verify that constructing the timeout-configured reqwest client does not panic. + let _client = ollama_reqwest_client(); + } + #[test] fn context_length_error_keywords_are_detected() { // Verify the helper used at each chat/stream/tools call site works for Ollama error strings. diff --git a/crates/zeph-llm/src/provider.rs b/crates/zeph-llm/src/provider.rs index d299ae41b..36030339b 100644 --- a/crates/zeph-llm/src/provider.rs +++ b/crates/zeph-llm/src/provider.rs @@ -246,6 +246,7 @@ pub struct GenerationOverrides { /// - `System` — global instructions prepended before the conversation /// - `User` — human turn input /// - `Assistant` — previous model output (used for multi-turn context) +#[non_exhaustive] #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum Role { diff --git a/crates/zeph-llm/src/sse.rs b/crates/zeph-llm/src/sse.rs index 188164bd3..ad19e134c 100644 --- a/crates/zeph-llm/src/sse.rs +++ b/crates/zeph-llm/src/sse.rs @@ -13,6 +13,7 @@ use crate::provider::{ChatStream, StreamChunk, ThinkingBlock, ToolUseRequest}; /// Never exposed as a `StreamChunk` downstream — the drainer consumes these and /// surfaces only `StreamChunk` variants, keeping `StreamChunk` exhaustiveness stable /// (critic M3). `pub` so `zeph-core` can use the drainer without feature-gating. +#[non_exhaustive] #[derive(Debug)] pub enum ToolSseEvent { /// Tool block opened: id and name are now known, before any `InputJsonDelta` for this index. @@ -320,7 +321,12 @@ fn parse_tool_delta_event(state: &mut ClaudeSseState, data: &str) -> Vec { if let Some((_, _, _, ref mut json)) = state.tool_block { - json.push_str(&delta.partial_json); + const MAX_TOOL_JSON_BUF: usize = 4 * 1024 * 1024; + if json.len() >= MAX_TOOL_JSON_BUF { + tracing::warn!("tool JSON buffer exceeded 4 MiB cap; discarding excess"); + } else { + json.push_str(&delta.partial_json); + } } let index = state.current_block_index; vec![ToolSseEvent::InputJsonDelta { @@ -330,7 +336,12 @@ fn parse_tool_delta_event(state: &mut ClaudeSseState, data: &str) -> Vec { if let Some((ref mut t, _)) = state.thinking_block { - t.push_str(&delta.thinking); + const MAX_THINKING_BUF: usize = 1024 * 1024; + if t.len() >= MAX_THINKING_BUF { + tracing::warn!("thinking buffer exceeded 1 MiB cap; discarding excess"); + } else { + t.push_str(&delta.thinking); + } } vec![ToolSseEvent::ThinkingChunk(delta.thinking)] } @@ -995,4 +1006,70 @@ mod tests { let chunk = result.unwrap().unwrap(); assert!(matches!(chunk, StreamChunk::Content(s) if s == "Hello world")); } + + // --- #4727: SSE buffer cap tests --- + + #[test] + fn tool_json_buf_capped_at_4mib() { + let mut state = ClaudeSseState { + tool_block: Some(( + 0, + "toolu_01".into(), + "bash".into(), + "x".repeat(4 * 1024 * 1024), + )), + current_block_index: 0, + ..Default::default() + }; + let data = r#"{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"OVERFLOW"}}"#; + parse_tool_delta_event(&mut state, data); + let buf_len = state.tool_block.as_ref().unwrap().3.len(); + assert_eq!( + buf_len, + 4 * 1024 * 1024, + "tool JSON buffer must not grow beyond 4 MiB" + ); + } + + #[test] + fn tool_json_buf_accepts_data_below_cap() { + let mut state = ClaudeSseState { + tool_block: Some((0, "toolu_01".into(), "bash".into(), String::new())), + current_block_index: 0, + ..Default::default() + }; + let data = r#"{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"k\":1}"}}"#; + parse_tool_delta_event(&mut state, data); + let buf = &state.tool_block.as_ref().unwrap().3; + assert_eq!(buf, r#"{"k":1}"#); + } + + #[test] + fn thinking_buf_capped_at_1mib() { + let mut state = ClaudeSseState { + thinking_block: Some(("t".repeat(1024 * 1024), String::new())), + in_thinking_block: true, + ..Default::default() + }; + let data = r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"OVERFLOW"}}"#; + parse_tool_delta_event(&mut state, data); + let thinking_len = state.thinking_block.as_ref().unwrap().0.len(); + assert_eq!( + thinking_len, + 1024 * 1024, + "thinking buffer must not grow beyond 1 MiB" + ); + } + + #[test] + fn thinking_buf_accepts_data_below_cap() { + let mut state = ClaudeSseState { + thinking_block: Some((String::new(), String::new())), + in_thinking_block: true, + ..Default::default() + }; + let data = r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"step one"}}"#; + parse_tool_delta_event(&mut state, data); + assert_eq!(state.thinking_block.as_ref().unwrap().0, "step one"); + } } diff --git a/crates/zeph-memory/src/compaction_probe.rs b/crates/zeph-memory/src/compaction_probe.rs index b1d9d177a..4f67de4d5 100644 --- a/crates/zeph-memory/src/compaction_probe.rs +++ b/crates/zeph-memory/src/compaction_probe.rs @@ -335,9 +335,9 @@ pub async fn generate_probe_questions( let mut history = String::new(); for msg in &truncated { let role = match msg.role { - Role::User => "user", Role::Assistant => "assistant", Role::System => "system", + Role::User | _ => "user", }; history.push_str(role); history.push_str(": "); diff --git a/crates/zeph-memory/src/store/messages/mod.rs b/crates/zeph-memory/src/store/messages/mod.rs index 511db8575..930f536a1 100644 --- a/crates/zeph-memory/src/store/messages/mod.rs +++ b/crates/zeph-memory/src/store/messages/mod.rs @@ -27,8 +27,8 @@ fn parse_role(s: &str) -> Role { pub fn role_str(role: Role) -> &'static str { match role { Role::System => "system", - Role::User => "user", Role::Assistant => "assistant", + Role::User | _ => "user", } }