Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/zeph-agent-context/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,19 +969,19 @@ pub async fn fetch_semantic_recall(

fn format_plain_recall_entry(item: &zeph_memory::RecalledMessage) -> String {
let role_label = match item.message.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
};
format!("- [{}] {}\n", role_label, item.message.content)
}

#[allow(clippy::map_unwrap_or)]
fn format_structured_recall_entry(item: &zeph_memory::RecalledMessage) -> String {
let source = match item.message.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
};
// Use compacted_at as a proxy for message age when available; otherwise "unknown".
// A full timestamp lookup from SQLite would require an async DB call in the assembler
Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-agent-context/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ fn map_correction(c: zeph_memory::UserCorrectionRow) -> MemCorrection {
fn map_recalled_message(r: zeph_memory::RecalledMessage) -> MemRecalledMessage {
use zeph_llm::provider::Role;
let role = match r.message.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
}
.to_owned();
MemRecalledMessage {
Expand Down
4 changes: 2 additions & 2 deletions crates/zeph-agent-context/src/summarization/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,9 @@ fn spawn_task_goal_extraction(
let mut context_text = String::new();
for (role, content) in &recent {
let role_str = match role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
};
let preview = if content.len() > 300 {
let end = content.floor_char_boundary(300);
Expand Down Expand Up @@ -460,9 +460,9 @@ fn spawn_subgoal_extraction(
let mut context_text = String::new();
for (role, content) in &recent {
let role_str = match role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
};
let preview = if content.len() > 300 {
let end = content.floor_char_boundary(300);
Expand Down
6 changes: 3 additions & 3 deletions crates/zeph-context/src/fidelity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ fn role_weight(role: Role) -> f32 {
match role {
Role::System => 1.0,
Role::User => 0.8,
Role::Assistant => 0.6,
Role::Assistant | _ => 0.6,
}
}

Expand Down Expand Up @@ -785,8 +785,8 @@ fn truncate_to_tokens(content: &mut String, max_tokens: usize, tc: &dyn TokenCou
fn render_placeholder(msg: &mut Message, score: f32, original_tokens: u32) {
let role_str = match msg.role {
Role::System => "system",
Role::User => "user",
Role::Assistant => "assistant",
Role::User | _ => "user",
};
msg.content = format!(
"[placeholder: role={role_str}, original_tokens={original_tokens}, importance={score:.2}]"
Expand Down Expand Up @@ -836,8 +836,8 @@ fn merge_consecutive_placeholders(messages: &mut Vec<Message>) -> usize {
};
let role_str = match role {
Role::System => "system",
Role::User => "user",
Role::Assistant => "assistant",
Role::User | _ => "user",
};
let merged_content = format!(
"[placeholder: {count} messages, role={role_str}, total_tokens={total_tokens}, avg_importance={avg_importance:.2}]"
Expand Down
3 changes: 2 additions & 1 deletion crates/zeph-context/src/summarization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ pub fn build_metadata_summary(messages: &[Message], truncate: fn(&str, usize) ->
}
}
Role::System => system_count += 1,
_ => {}
}
}

Expand Down Expand Up @@ -649,9 +650,9 @@ fn format_history(messages: &[Message]) -> String {
history_text.push_str("\n\n");
}
let role = match m.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
};
let _ = write!(history_text, "[{role}]: {}", m.content);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-core/src/agent/magic_docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ mod tests {
}
}
}
Role::System => {}
_ => {}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-core/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,9 +610,9 @@ impl<C: Channel> Agent<C> {
.iter()
.map(|m| {
let role = match m.role {
Role::User => "user".to_owned(),
Role::Assistant => "assistant".to_owned(),
Role::System => "system".to_owned(),
Role::User | _ => "user".to_owned(),
};
(zeph_memory::MessageId(0), role, m.content.clone())
})
Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-core/src/agent/session_digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ fn format_and_sanitize_conversation(
let mut result = String::new();
for msg in messages {
let role = match msg.role {
Role::User => "User",
Role::Assistant => "Assistant",
Role::System => "System",
Role::User | _ => "User",
};
// Redact credentials first, then sanitize for injection patterns.
let redacted: Cow<'_, str> = scrub_content(&msg.content);
Expand Down
5 changes: 2 additions & 3 deletions crates/zeph-core/src/agent/speculative/stream_drainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ impl SpeculativeStreamDrainer {
ToolSseEvent::ThinkingBlockDone(block) => {
thinking_blocks.push(block);
}
ToolSseEvent::ThinkingChunk(_) => {
// Pass-through; full block assembled via ThinkingBlockDone.
}
ToolSseEvent::ContentChunk(text) => {
text_buf.push_str(&text);
}
Expand All @@ -140,6 +137,8 @@ impl SpeculativeStreamDrainer {
ToolSseEvent::Error(e) => {
return Err(e);
}
// ThinkingChunk pass-through; full block assembled via ThinkingBlockDone.
ToolSseEvent::ThinkingChunk(_) | _ => {}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-core/src/agent/tool_execution/focus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ fn build_compression_prompt(
to_compress: &[zeph_llm::provider::Message],
) -> Vec<zeph_llm::provider::Message> {
let role_label = |role: &zeph_llm::provider::Role| match role {
zeph_llm::provider::Role::User => "user",
zeph_llm::provider::Role::Assistant => "assistant",
zeph_llm::provider::Role::System => "system",
zeph_llm::provider::Role::User | _ => "user",
};
let bullet_list: String = to_compress
.iter()
Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-core/src/debug_dump/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ fn messages_to_api_value(messages: &[Message]) -> serde_json::Value {
let role = match m.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => return None,
Role::System | _ => return None,
};
let is_assistant = m.role == Role::Assistant;
let has_structured = m.parts.iter().any(|p| {
Expand Down
2 changes: 1 addition & 1 deletion crates/zeph-core/src/memory_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ impl ToolExecutor for MemoryToolExecutor {
let _ = writeln!(output, "## Recalled Messages ({} results)", recalled.len());
for r in &recalled {
let role = match r.message.role {
zeph_llm::provider::Role::User => "user",
zeph_llm::provider::Role::Assistant => "assistant",
zeph_llm::provider::Role::System => "system",
zeph_llm::provider::Role::User | _ => "user",
};
let content = r.message.content.trim();
let _ = writeln!(output, "[score: {:.2}] {role}: {content}", r.score);
Expand Down
1 change: 1 addition & 0 deletions crates/zeph-llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ parking_lot.workspace = true
rand.workspace = true
rand_distr.workspace = true
reqwest = { workspace = true, features = ["json", "multipart", "rustls", "stream"] }
reqwest012 = { package = "reqwest", version = "0.12", default-features = false, features = ["rustls-tls"] }
ripemd = { workspace = true, optional = true }
rubato = { workspace = true, optional = true }
schemars = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/zeph-llm/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ macro_rules! delegate_provider {
/// [`Arc<dyn LlmProviderDyn>`](crate::provider_dyn::LlmProviderDyn).
///
/// The `Candle` variant is only available when the `candle` feature is enabled.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum AnyProvider {
Ollama(OllamaProvider),
Expand Down
1 change: 1 addition & 0 deletions crates/zeph-llm/src/candle_provider/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use candle_core::quantized::gguf_file;
use candle_transformers::models::quantized_llama::ModelWeights;
use tokenizers::Tokenizer;

#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum ModelSource {
Local {
Expand Down
1 change: 1 addition & 0 deletions crates/zeph-llm/src/candle_provider/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::provider::{Message, Role};

#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum ChatTemplate {
Llama3,
Expand Down
21 changes: 20 additions & 1 deletion crates/zeph-llm/src/ollama.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ use crate::provider::{
};
use crate::usage::UsageTracker;

/// Construct the reqwest 0.12 HTTP client (the reqwest version ollama-rs uses).
///
/// The client is configured with:
/// - a 30-second connect timeout,
/// - a 10-minute (600-second) overall request timeout acting as a hard backstop,
/// - a `zeph/<crate version>` user agent.
///
/// # Panics
///
/// Panics if the underlying client builder fails to produce a client.
fn ollama_reqwest_client() -> reqwest012::Client {
    use std::time::Duration;

    let connect_limit = Duration::from_secs(30);
    let request_limit = Duration::from_mins(10);
    let agent = concat!("zeph/", env!("CARGO_PKG_VERSION"));

    let builder = reqwest012::Client::builder()
        .connect_timeout(connect_limit)
        .timeout(request_limit)
        .user_agent(agent);

    builder
        .build()
        .expect("Ollama HTTP client construction must not fail")
}

/// Metadata returned by `/api/show` for the configured chat model.
#[derive(Debug)]
pub struct ModelInfo {
Expand Down Expand Up @@ -114,7 +125,7 @@ impl OllamaProvider {
pub fn new(base_url: &str, model: String, embedding_model: String) -> Self {
let (host, port) = parse_host_port(base_url);
Self {
client: Ollama::new(host, port),
client: Ollama::new_with_client(host, port, ollama_reqwest_client()),
model,
embedding_model,
context_window_size: None,
Expand Down Expand Up @@ -647,6 +658,14 @@ mod tests {
std::env::var("OLLAMA_EMBED_MODEL").unwrap_or_else(|_| "qwen3-embedding".into())
}

// --- #4729: Ollama HTTP client smoke test ---

#[test]
fn ollama_reqwest_client_builds_without_panicking() {
    // Smoke test: building the timeout-configured client must complete without a panic.
    // The resulting client is not inspected, only constructed and released.
    drop(ollama_reqwest_client());
}

#[test]
fn context_length_error_keywords_are_detected() {
// Verify the helper used at each chat/stream/tools call site works for Ollama error strings.
Expand Down
1 change: 1 addition & 0 deletions crates/zeph-llm/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub struct GenerationOverrides {
/// - `System` — global instructions prepended before the conversation
/// - `User` — human turn input
/// - `Assistant` — previous model output (used for multi-turn context)
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Role {
Expand Down
81 changes: 79 additions & 2 deletions crates/zeph-llm/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::provider::{ChatStream, StreamChunk, ThinkingBlock, ToolUseRequest};
/// Never exposed as a `StreamChunk` downstream — the drainer consumes these and
/// surfaces only `StreamChunk` variants, keeping `StreamChunk` exhaustiveness stable
/// (critic M3). `pub` so `zeph-core` can use the drainer without feature-gating.
#[non_exhaustive]
#[derive(Debug)]
pub enum ToolSseEvent {
/// Tool block opened: id and name are now known, before any `InputJsonDelta` for this index.
Expand Down Expand Up @@ -320,7 +321,12 @@ fn parse_tool_delta_event(state: &mut ClaudeSseState, data: &str) -> Vec<ToolSse
}
"input_json_delta" if !delta.partial_json.is_empty() => {
if let Some((_, _, _, ref mut json)) = state.tool_block {
json.push_str(&delta.partial_json);
const MAX_TOOL_JSON_BUF: usize = 4 * 1024 * 1024;
if json.len() >= MAX_TOOL_JSON_BUF {
tracing::warn!("tool JSON buffer exceeded 4 MiB cap; discarding excess");
} else {
json.push_str(&delta.partial_json);
}
}
let index = state.current_block_index;
vec![ToolSseEvent::InputJsonDelta {
Expand All @@ -330,7 +336,12 @@ fn parse_tool_delta_event(state: &mut ClaudeSseState, data: &str) -> Vec<ToolSse
}
"thinking_delta" if !delta.thinking.is_empty() => {
if let Some((ref mut t, _)) = state.thinking_block {
t.push_str(&delta.thinking);
const MAX_THINKING_BUF: usize = 1024 * 1024;
if t.len() >= MAX_THINKING_BUF {
tracing::warn!("thinking buffer exceeded 1 MiB cap; discarding excess");
} else {
t.push_str(&delta.thinking);
}
}
vec![ToolSseEvent::ThinkingChunk(delta.thinking)]
}
Expand Down Expand Up @@ -995,4 +1006,70 @@ mod tests {
let chunk = result.unwrap().unwrap();
assert!(matches!(chunk, StreamChunk::Content(s) if s == "Hello world"));
}

// --- #4727: SSE buffer cap tests ---

#[test]
fn tool_json_buf_capped_at_4mib() {
    // Size at which the accumulated tool JSON is expected to stop growing.
    const CAP: usize = 4 * 1024 * 1024;

    // Begin with a tool block whose JSON buffer is already exactly at the cap.
    let prefilled = "x".repeat(CAP);
    let mut sse_state = ClaudeSseState {
        tool_block: Some((0, "toolu_01".into(), "bash".into(), prefilled)),
        current_block_index: 0,
        ..Default::default()
    };

    // Feed one more delta; its payload must not be appended.
    let payload = r#"{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"OVERFLOW"}}"#;
    parse_tool_delta_event(&mut sse_state, payload);

    let (_, _, _, json_buf) = sse_state.tool_block.as_ref().unwrap();
    assert_eq!(
        json_buf.len(),
        CAP,
        "tool JSON buffer must not grow beyond 4 MiB"
    );
}

#[test]
fn tool_json_buf_accepts_data_below_cap() {
    // An empty buffer is far below the cap, so the delta must be appended verbatim.
    let empty_tool_block = (0, "toolu_01".into(), "bash".into(), String::new());
    let mut sse_state = ClaudeSseState {
        tool_block: Some(empty_tool_block),
        current_block_index: 0,
        ..Default::default()
    };

    let payload = r#"{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"{\"k\":1}"}}"#;
    parse_tool_delta_event(&mut sse_state, payload);

    let (_, _, _, json_buf) = sse_state.tool_block.as_ref().unwrap();
    assert_eq!(json_buf.as_str(), r#"{"k":1}"#);
}

#[test]
fn thinking_buf_capped_at_1mib() {
    // Size at which the accumulated thinking text is expected to stop growing.
    const CAP: usize = 1024 * 1024;

    // Begin with a thinking block whose text is already exactly at the cap.
    let prefilled = "t".repeat(CAP);
    let mut sse_state = ClaudeSseState {
        thinking_block: Some((prefilled, String::new())),
        in_thinking_block: true,
        ..Default::default()
    };

    // Feed one more thinking delta; its text must not be appended.
    let payload = r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"OVERFLOW"}}"#;
    parse_tool_delta_event(&mut sse_state, payload);

    let (thinking, _) = sse_state.thinking_block.as_ref().unwrap();
    assert_eq!(
        thinking.len(),
        CAP,
        "thinking buffer must not grow beyond 1 MiB"
    );
}

#[test]
fn thinking_buf_accepts_data_below_cap() {
    // An empty thinking buffer is below the cap, so the delta text must be stored as-is.
    let empty_thinking_block = (String::new(), String::new());
    let mut sse_state = ClaudeSseState {
        thinking_block: Some(empty_thinking_block),
        in_thinking_block: true,
        ..Default::default()
    };

    let payload = r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"step one"}}"#;
    parse_tool_delta_event(&mut sse_state, payload);

    let (thinking, _) = sse_state.thinking_block.as_ref().unwrap();
    assert_eq!(thinking, "step one");
}
}
2 changes: 1 addition & 1 deletion crates/zeph-memory/src/compaction_probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ pub async fn generate_probe_questions(
let mut history = String::new();
for msg in &truncated {
let role = match msg.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::User | _ => "user",
};
history.push_str(role);
history.push_str(": ");
Expand Down
Loading
Loading