Stabilize OpenClaw tool loops through MoA #808
Conversation
Pull request overview
This PR hardens the MoA (Mixture-of-Agents) gateway for OpenClaw-style agent/tool loops by bounding context, improving worker selection (context + latency), normalizing incoming tool schemas into OpenAI function-tool form, and adding resilience paths when reducer tool grammar fails.
Changes:
- Add tool-schema normalization in Session::ingest and introduce direct-tool short-circuiting when a single required tool/arguments can be inferred safely.
- Bound worker/reducer context sizes with compaction and tighter history windows; add an answer-only tool-result fallback packing path.
- Improve mesh worker backend selection (context/latency ranking) and add a hedged remote backend that retries across multiple peer candidates.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| crates/mesh-mixture-of-agents/tests/sim_tool_result_routes_to_reducer.rs | Adds coverage for tool-result reducer failure falling back to answer-only retry. |
| crates/mesh-mixture-of-agents/src/session.rs | Normalizes flat tool schemas into OpenAI type:function tool format during ingest. |
| crates/mesh-mixture-of-agents/src/lib.rs | Adds DirectTool turn kind, inferred required tool selection, improved tool arg inference, and tool-result reducer retry without native tools. |
| crates/mesh-mixture-of-agents/src/context.rs | Bounds/compacts context windows and adds answer-only tool-result context packing. |
| crates/mesh-llm-host-runtime/src/network/openai/moa_gateway/remote_backend.rs | Introduces a multi-peer hedged remote backend over QUIC HTTP tunnels. |
| crates/mesh-llm-host-runtime/src/network/openai/moa_gateway/mod.rs | Integrates context budgeting, ranked worker selection, remote backend hedging, and updated MoA timeouts/grace. |
| crates/mesh-llm-host-runtime/src/network/openai/moa_gateway/context_selection.rs | Expands remote selection to return ordered host lists and adds context/latency helpers. |
| crates/mesh-llm-host-runtime/src/network/openai/moa_gateway/context_budget.rs | Adds MoA-specific required-token estimation + reserves for chat/tools/tool-results. |
| use crate::mesh; | ||
| use mesh_mixture_of_agents as moa; | ||
|
|
||
| type RemoteJoinSet = tokio::task::JoinSet<(iroh::EndpointId, Result<serde_json::Value, String>)>; |
| let marker = format!( | ||
| "\n\n[MoA compacted this message from {} chars. Middle content was omitted. \ | ||
| The text after this notice is the preserved ending of the original message.]\n\n", | ||
| text.len() | ||
| ); |
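The marker above implies a head-plus-tail compaction: the start and end of a long message are kept and the middle is replaced by a notice. The following is a minimal sketch of that idea, not the PR's `compact_text_for_context`; the function name `compact_middle`, the shortened marker text, and the even head/tail split are all assumptions made for illustration.

```rust
/// Hypothetical sketch: keep the head and tail of `text` and replace the
/// middle with a notice once the text exceeds `max_chars` characters.
fn compact_middle(text: &str, max_chars: usize) -> String {
    let total = text.chars().count();
    if total <= max_chars {
        return text.to_string();
    }
    // Shortened stand-in for the PR's marker wording.
    let marker = format!("\n\n[compacted from {total} chars; middle omitted]\n\n");
    // Characters left for original content once the marker is accounted for.
    // If `max_chars` is smaller than the marker, only the marker is returned.
    let budget = max_chars.saturating_sub(marker.chars().count());
    let head_len = budget / 2;
    let tail_len = budget - head_len;
    // Work on chars rather than bytes so multi-byte text is never split mid-character.
    let head: String = text.chars().take(head_len).collect();
    let tail: String = text.chars().skip(total - tail_len).collect();
    format!("{head}{marker}{tail}")
}
```

Counting in characters rather than bytes keeps the slice boundaries valid for non-ASCII content, at the cost of an extra pass over the text.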
| pub(in crate::network::openai) async fn best_remote_latency_ms( | ||
| node: &mesh::Node, | ||
| hosts: &[iroh::EndpointId], | ||
| ) -> Option<u32> { | ||
| let latencies = remote_latency_map(node).await; | ||
| hosts | ||
| .iter() | ||
| .filter_map(|host| latencies.get(host).copied()) | ||
| .min() | ||
| } |
| const MOA_LOCAL_INFLIGHT_SOFT_LIMIT: u64 = 3; | ||
| const MOA_MAX_WORKERS: usize = 3; | ||
| const MOA_REMOTE_PEER_HEDGE_DELAY: std::time::Duration = std::time::Duration::from_secs(3); | ||
| const MOA_WORKER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25); | ||
| const MOA_REDUCER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25); | ||
| const MOA_FIRST_ANSWER_GRACE: std::time::Duration = std::time::Duration::from_secs(2); |
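To illustrate what a hedge delay such as `MOA_REMOTE_PEER_HEDGE_DELAY` is typically used for, here is a std-only sketch of hedged requests: the first candidate starts immediately, and each further candidate starts when the delay passes without a success or when an earlier attempt fails. This is an assumption-laden illustration using threads and channels; the PR itself uses a tokio `JoinSet` over QUIC tunnels, and the name `hedged_first_success` is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical sketch: try candidates in order, launching the next one when
/// `hedge_delay` elapses or an attempt fails; return the first success.
fn hedged_first_success<F>(candidates: Vec<F>, hedge_delay: Duration) -> Option<(usize, String)>
where
    F: FnOnce() -> Result<String, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<(usize, Result<String, String>)>();
    let mut pending = candidates.into_iter().enumerate();
    let mut in_flight = 0usize;
    let mut exhausted = false;

    loop {
        if !exhausted {
            match pending.next() {
                Some((index, call)) => {
                    let tx = tx.clone();
                    thread::spawn(move || {
                        let _ = tx.send((index, call()));
                    });
                    in_flight += 1;
                }
                None => exhausted = true,
            }
        }
        if in_flight == 0 {
            return None; // every candidate failed, or there were none
        }
        // While candidates remain, wait only for the hedge delay; afterwards
        // wait for the stragglers. A production version would also bound the
        // total wait with an overall timeout.
        let received = if exhausted {
            rx.recv().ok()
        } else {
            match rx.recv_timeout(hedge_delay) {
                Ok(message) => Some(message),
                Err(mpsc::RecvTimeoutError::Timeout) => continue,
                Err(mpsc::RecvTimeoutError::Disconnected) => None,
            }
        };
        match received {
            Some((index, Ok(value))) => return Some((index, value)),
            Some((_, Err(_))) => in_flight -= 1,
            None => return None,
        }
    }
}
```

Losing attempts are left to finish on their own here; an async implementation would normally cancel them once a winner is known.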
|
🤖 Additional validation after commit 86037c8: Fixed the remaining OpenClaw Telegram-shaped failure. The failed path was not Telegram transport: MoA was letting Minimax XML tool syntax leak as assistant text after repeated web_search results. The repeated-tool guard disabled all tools, so a legitimate web_fetch proposal was flattened into text and OpenClaw marked replayInvalid=true. Changes added:
Validation:
Mini deploy/proof:
|
| pub(in crate::network::openai) async fn select_remote_hosts( | ||
| node: &mesh::Node, | ||
| model: &str, | ||
| required_tokens: Option<u32>, | ||
| hosts: Vec<iroh::EndpointId>, | ||
| ) -> Option<iroh::EndpointId> { | ||
| let Some(required_tokens) = required_tokens else { | ||
| return hosts.into_iter().next(); | ||
| }; | ||
|
|
||
| let mut unknown = None; | ||
| ) -> Vec<iroh::EndpointId> { | ||
| let latencies = remote_latency_map(node).await; | ||
| let mut adequate = Vec::new(); | ||
| let mut unknown = Vec::new(); |
| '"' | '\'' | '`' | '<' | '>' | '(' | ')' | '[' | ']' | '{' | '}' | ',' | ';' | ':' | ||
| ) | ||
| }) | ||
| .trim_end_matches(['.', ',', ';', ':', ')', ']', '}']) | ||
| .to_string() |
|
🤖 Update after live Telegram/OpenClaw rerun:
Validation:
|
| fn compact_chat_message(msg: &Value, max_chars: usize) -> Value { | ||
| let Some(content) = msg.get("content").and_then(Value::as_str) else { | ||
| return msg.clone(); | ||
| }; | ||
| let compacted = compact_text_for_context(content, max_chars); | ||
| if compacted == content { | ||
| return msg.clone(); | ||
| } | ||
| let mut compact = msg.clone(); | ||
| if let Some(obj) = compact.as_object_mut() { | ||
| obj.insert("content".to_string(), Value::String(compacted)); | ||
| } | ||
| compact | ||
| } |
| fn xml_unescape(value: &str) -> String { | ||
| value | ||
| .replace("&quot;", "\"") | ||
| .replace("&#34;", "\"") | ||
| .replace("&apos;", "'") | ||
| .replace("&#39;", "'") | ||
| .replace("&lt;", "<") | ||
| .replace("&gt;", ">") | ||
| .replace("&amp;", "&") | ||
| } |
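One detail worth spelling out in a chained-`replace` unescape like the one above is ordering: the ampersand entity has to be decoded last, otherwise double-escaped input is decoded twice. A standalone sketch follows, assuming the named and decimal entity spellings; `unescape_basic_xml` is a hypothetical name, not a function from the PR.

```rust
/// Hypothetical sketch: decode the predefined XML entities plus the decimal
/// forms of the two quote characters.
fn unescape_basic_xml(value: &str) -> String {
    value
        .replace("&quot;", "\"")
        .replace("&#34;", "\"")
        .replace("&apos;", "'")
        .replace("&#39;", "'")
        .replace("&lt;", "<")
        .replace("&gt;", ">")
        // `&amp;` goes last so that `&amp;lt;` decodes to the literal text
        // `&lt;` rather than being decoded a second time into `<`.
        .replace("&amp;", "&")
}
```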
ndizazzo
left a comment
Generally this looks like the right direction... but a thing I noticed was that some of the late repair logic is pretty GitHub/web-specific for generic MoA. Hard-coded web_search / web_fetch policy and GitHub answer grounding feel like a one-off tail on this structural change.
Overall quality looks good and well checked, but I’d prefer front-loading our work so that we make some kind of "tool family" or "provider policy" course correction system that we can extend with other specific fixes - I'm sure as we use Hermes and other agents we'll find lots of this stuff and need to have a cohesive place for it.
|
Yes, still working on this to make it not harness-specific. That was a mistake; it should be generic.
…gram-timeouts * origin/main: Reuse Skippy decode wire messages Reduce Skippy decode hot-path overhead
| let recent = session.recent_messages(SPECIALIST_CONTEXT_WINDOW); | ||
| let user_text = session.last_user_text(); | ||
| let mut has_last_user = false; | ||
| for msg in &recent { | ||
| let role = msg.get("role").and_then(|r| r.as_str()).unwrap_or(""); | ||
| if role == "user" || (role == "assistant" && msg.get("tool_calls").is_none()) { | ||
| messages.push(msg.clone()); | ||
| has_last_user |= role == "user" | ||
| && msg.get("content").and_then(Value::as_str) == Some(user_text.as_str()); | ||
| messages.push(compact_chat_message( | ||
| msg, | ||
| SPECIALIST_MESSAGE_CONTEXT_MAX_CHARS, | ||
| )); | ||
| } | ||
| } | ||
|
|
||
| // Ensure the last message is the current user turn | ||
| let user_text = session.last_user_text(); | ||
| if messages | ||
| .last() | ||
| .and_then(|m| m.get("content").and_then(|c| c.as_str())) | ||
| != Some(&user_text) | ||
| { | ||
| messages.push(json!({"role": "user", "content": user_text})); | ||
| if !has_last_user { | ||
| messages.push(json!({ | ||
| "role": "user", | ||
| "content": compact_text_for_context( | ||
| &user_text, | ||
| SPECIALIST_MESSAGE_CONTEXT_MAX_CHARS, | ||
| ), | ||
| })); |
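The hunk above appears to replace a "compare against the last pushed message" check with a `has_last_user` flag computed while iterating, using each message's original content before compaction. A small sketch of why that matters is given below; `Msg`, `truncate_chars`, and `build_window` are hypothetical stand-ins, and plain truncation substitutes for the PR's compaction helper.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    role: &'static str,
    content: String,
}

/// Stand-in for the PR's compaction helper: plain truncation to `max_chars`.
fn truncate_chars(text: &str, max_chars: usize) -> String {
    text.chars().take(max_chars).collect()
}

/// Hypothetical sketch: build a bounded window and append the current user
/// turn only if it was not already seen among the recent messages.
fn build_window(recent: &[Msg], user_text: &str, max_chars: usize) -> Vec<Msg> {
    let mut out = Vec::new();
    let mut has_last_user = false;
    for msg in recent {
        if msg.role == "system" {
            continue;
        }
        // Compare against the original content, before it is shortened;
        // comparing after compaction would miss the match and duplicate the turn.
        has_last_user |= msg.role == "user" && msg.content == user_text;
        out.push(Msg {
            role: msg.role,
            content: truncate_chars(&msg.content, max_chars),
        });
    }
    if !has_last_user {
        out.push(Msg {
            role: "user",
            content: truncate_chars(user_text, max_chars),
        });
    }
    out
}
```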
| let recent = session.recent_messages(STRONG_CONTEXT_WINDOW); | ||
| let user_text = session.last_user_text(); | ||
| let mut has_last_user = false; | ||
| for msg in &recent { | ||
| let role = msg.get("role").and_then(|r| r.as_str()).unwrap_or(""); | ||
| if role != "system" && !role.is_empty() { | ||
| messages.push(msg.clone()); | ||
| has_last_user |= role == "user" | ||
| && msg.get("content").and_then(Value::as_str) == Some(user_text.as_str()); | ||
| messages.push(compact_chat_message(msg, STRONG_MESSAGE_CONTEXT_MAX_CHARS)); | ||
| } | ||
| } | ||
|
|
||
| let user_text = session.last_user_text(); | ||
| if messages | ||
| .last() | ||
| .and_then(|m| m.get("content").and_then(|c| c.as_str())) | ||
| != Some(&user_text) | ||
| { | ||
| messages.push(json!({"role": "user", "content": user_text})); | ||
| if !has_last_user { | ||
| messages.push(json!({ | ||
| "role": "user", | ||
| "content": compact_text_for_context(&user_text, STRONG_MESSAGE_CONTEXT_MAX_CHARS), | ||
| })); | ||
| } |
| while let Some(parameter_start) = cursor.find("<parameter") { | ||
| cursor = &cursor[parameter_start..]; | ||
| let tag_end = cursor.find('>')?; | ||
| let tag = &cursor[..=tag_end]; | ||
| let name = extract_xml_attr(tag, "name")?; | ||
| let value_start = tag_end + 1; | ||
| let value_tail = &cursor[value_start..]; | ||
| let value_end = value_tail.find("</parameter>")?; | ||
| let value = xml_unescape(value_tail[..value_end].trim()); | ||
| arguments.insert(name, Value::String(value)); | ||
| cursor = &value_tail[value_end + "</parameter>".len()..]; | ||
| } |
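For readers who want to run the parameter-scanning loop in isolation, here is a self-contained sketch of the same cursor-based approach. It returns name/value pairs instead of a JSON map and skips entity unescaping; `attr_value` and `parse_parameters` are hypothetical names, and the sample markup in the usage note is made up.

```rust
/// Extract the value of `attr="..."` from a single opening tag.
fn attr_value(tag: &str, attr: &str) -> Option<String> {
    let needle = format!("{attr}=\"");
    let start = tag.find(&needle)? + needle.len();
    let end = tag[start..].find('"')? + start;
    Some(tag[start..end].to_string())
}

/// Hypothetical sketch: collect `<parameter name="...">value</parameter>`
/// pairs in document order. Returns `None` if any parameter is malformed.
fn parse_parameters(body: &str) -> Option<Vec<(String, String)>> {
    const CLOSE: &str = "</parameter>";
    let mut out = Vec::new();
    let mut cursor = body;
    while let Some(start) = cursor.find("<parameter") {
        cursor = &cursor[start..];
        let tag_end = cursor.find('>')?;
        let name = attr_value(&cursor[..=tag_end], "name")?;
        let tail = &cursor[tag_end + 1..];
        let value_end = tail.find(CLOSE)?;
        out.push((name, tail[..value_end].trim().to_string()));
        // Continue scanning after the closing tag.
        cursor = &tail[value_end + CLOSE.len()..];
    }
    Some(out)
}
```

For example, `<parameter name="url">https://example.com</parameter>` yields a single `("url", "https://example.com")` pair, while an unterminated parameter makes the whole parse return `None` rather than a partial argument set.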
| _ => { | ||
| let repaired = repair_tool_result_answer(session, &reduced.payload); | ||
| chat_or_schema_command_tool_response( | ||
| &repaired, | ||
| session.tools(), | ||
| response_allowed_tools, | ||
| response_prompt_profiles, | ||
| Some(&session.last_user_text()), | ||
| ) | ||
| } |
| fn prompt_tool_catalog_source(session: &Session) -> Option<String> { | ||
| if let Some(system) = session | ||
| .system_prompt() | ||
| .filter(|prompt| !prompt.trim().is_empty()) | ||
| { | ||
| return Some(system); | ||
| } | ||
|
|
||
| let parts: Vec<String> = session | ||
| .messages() | ||
| .iter() | ||
| .filter_map(message_text_content) | ||
| .collect(); | ||
| (!parts.is_empty()).then(|| parts.join("\n")) | ||
| } |
| fn xml_attr_value(tag: &str, attr: &str) -> Option<String> { | ||
| let needle = format!("{attr}=\""); | ||
| let value_start = tag.find(&needle)? + needle.len(); | ||
| let value_end = tag[value_start..].find('"')? + value_start; | ||
| Some(xml_text_unescape(&tag[value_start..value_end])) | ||
| } |
| fn xml_text_unescape(text: &str) -> String { | ||
| text.replace("&quot;", "\"") | ||
| .replace("&apos;", "'") | ||
| .replace("&#39;", "'") | ||
| .replace("&lt;", "<") | ||
| .replace("&gt;", ">") | ||
| .replace("&amp;", "&") | ||
| } |
| fn body_has_tool_result(body: &serde_json::Value) -> bool { | ||
| body.get("messages") | ||
| .and_then(serde_json::Value::as_array) | ||
| .map(|messages| { | ||
| messages.iter().any(|message| { | ||
| message.get("role").and_then(serde_json::Value::as_str) == Some("tool") | ||
| }) | ||
| }) | ||
| .unwrap_or(false) |
| async fn remote_latency_map(node: &mesh::Node) -> HashMap<iroh::EndpointId, u32> { | ||
| node.peers() | ||
| .await | ||
| .into_iter() | ||
| .filter_map(|peer| { | ||
| let latency = peer.display_latency().latency_ms?; | ||
| Some((peer.id, latency)) | ||
| }) | ||
| .collect() |
| if should_skip_local_for_inflight(name, resolution.node.inflight_requests()) { | ||
| return false; | ||
| } |
| let mut user = String::new(); | ||
| let last_user = session.last_user_text(); | ||
| if !last_user.trim().is_empty() { | ||
| user.push_str("User request:\n"); | ||
| user.push_str(&compact_text_for_context( | ||
| &last_user, | ||
| REDUCER_USER_CONTEXT_MAX_CHARS / 2, | ||
| )); | ||
| user.push_str("\n\n"); | ||
| } |
| adequate.sort_by_key(|candidate| (Reverse(candidate.1), candidate.2.unwrap_or(u32::MAX))); | ||
| if !adequate.is_empty() { | ||
| return adequate | ||
| .into_iter() | ||
| .map(|(host, _, _)| host) | ||
| .collect::<Vec<_>>(); | ||
| } | ||
|
|
||
| unknown.sort_by_key(|candidate| candidate.1.unwrap_or(u32::MAX)); | ||
| unknown | ||
| .into_iter() | ||
| .map(|(host, _)| host) | ||
| .collect::<Vec<_>>() | ||
| } |
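The two `sort_by_key` calls above can be read as a ranking rule: among hosts known to have enough context, prefer the largest context and break ties by lowest latency; if none qualify, fall back to hosts of unknown capacity ordered by latency. The sketch below works under that reading. The `Candidate` struct, the meaning of the tuple fields, and the decision to drop hosts whose context is known to be too small are assumptions, not confirmed by the hunk.

```rust
use std::cmp::Reverse;

#[derive(Debug, Clone, PartialEq)]
struct Candidate {
    host: &'static str,
    context_length: Option<u32>,
    latency_ms: Option<u32>,
}

/// Hypothetical sketch of context-then-latency ranking.
fn rank_hosts(candidates: &[Candidate], required_tokens: u32) -> Vec<&'static str> {
    let mut adequate = Vec::new();
    let mut unknown = Vec::new();
    for c in candidates {
        match c.context_length {
            Some(ctx) if ctx >= required_tokens => adequate.push((c.host, ctx, c.latency_ms)),
            Some(_) => {} // known to be too small: dropped (assumption)
            None => unknown.push((c.host, c.latency_ms)),
        }
    }
    // Largest context first; a missing latency sorts after any measured one.
    adequate.sort_by_key(|c| (Reverse(c.1), c.2.unwrap_or(u32::MAX)));
    if !adequate.is_empty() {
        return adequate.into_iter().map(|(host, _, _)| host).collect();
    }
    unknown.sort_by_key(|c| c.1.unwrap_or(u32::MAX));
    unknown.into_iter().map(|(host, _)| host).collect()
}
```

Returning the full ordered list rather than a single host is what lets a hedged backend walk down the candidates when the first choice is slow.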
| if let Some(parameters) = tool | ||
| .get("parameters") | ||
| .or_else(|| tool.get("input_schema")) | ||
| .cloned() | ||
| { | ||
| function.insert("parameters".to_string(), parameters); | ||
| } |
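The fallback above accepts either a `parameters` key or an `input_schema` key when building the function-tool form. A std-only sketch of that normalization step follows; the `FlatTool` and `FunctionTool` structs are hypothetical, and schemas are held as raw JSON strings only to avoid pulling in a JSON crate for the illustration.

```rust
#[derive(Debug, Clone, PartialEq)]
struct FlatTool {
    name: String,
    parameters: Option<String>,
    input_schema: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
struct FunctionTool {
    kind: &'static str,
    name: String,
    parameters: Option<String>,
}

/// Hypothetical sketch: wrap a flat tool description in a function-tool shape.
fn normalize_tool(tool: FlatTool) -> FunctionTool {
    FunctionTool {
        kind: "function",
        name: tool.name,
        // Prefer `parameters`; fall back to the `input_schema` spelling.
        parameters: tool.parameters.or(tool.input_schema),
    }
}
```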
OpenClaw/Gateway users can now route Telegram-style agent turns through model=mesh without small-context workers or slow peers turning ordinary tool loops into timeouts.

What changed
- read/search-style requests.

Validation
- cargo fmt --all -- --check
- cargo test -p mesh-mixture-of-agents
- cargo test -p mesh-llm-host-runtime --lib moa_gateway
- just build
- cargo clippy -p mesh-mixture-of-agents --all-targets -- -D warnings
- cargo clippy -p mesh-llm --all-targets -- -D warnings

Lab proof on mini
- 9c882b66a51409cd658334fbf8994d32472692ad to mini as ~/mesh-llm-main and ~/mesh-llm-fastjoin; OpenClaw Gateway remained running.
- /v1/models exposed mesh with context_length=131072 and local unsloth/Qwen3.5-9B-GGUF:Q4_K_M ready.
- agent:main:main turns timing out with FailoverError: LLM request timed out.
- agent:main:main session: SENTINEL_OPENCLAW_TELEGRAM_MAIN_OK_606 in 5.65s.
- agent:main:main session: native read tool call executed and returned SENTINEL_FILE_VALUE_DIRECT_TOOL_606; final answer SENTINEL_OPENCLAW_TELEGRAM_DIRECT_TOOL_OK_606 SENTINEL_FILE_VALUE_DIRECT_TOOL_606 in 14.70s.
- 16:15:13.587, tool result at 16:15:13.610, final answer at 16:15:25.553.

Notes
- --deliver; this exercised OpenClaw Gateway with --channel telegram --reply-channel telegram --reply-to telegram:8454832168 against the real Telegram session without sending a test message back to Telegram.