diff --git a/src-rust/crates/query/src/lib.rs b/src-rust/crates/query/src/lib.rs index 926f7ba..0dde6ca 100644 --- a/src-rust/crates/query/src/lib.rs +++ b/src-rust/crates/query/src/lib.rs @@ -1316,37 +1316,19 @@ pub async fn run_query_loop( } }).collect(); - // Execute tools if any tool_use blocks were returned. - // Note: we check the blocks themselves rather than relying - // solely on stop_str == "tool_use" because many OpenAI- - // compatible providers (Ollama, LM Studio, etc.) return - // finish_reason "stop" even when tool calls are present. + // Execute tool-use blocks through the same hook-aware path used by + // Anthropic streaming. Some OpenAI-compatible providers return + // finish_reason "stop" even when tool calls are present, so the + // block presence is authoritative here, but execution must still + // enforce PreToolUse/plugin policy and emit post-hook events. if !tool_use_blocks.is_empty() { - let mut tool_results = Vec::new(); - for (tool_id, tool_name, tool_input) in tool_use_blocks { - // Notify TUI that a tool is starting (matches Anthropic path). - if let Some(ref tx) = event_tx { - let _ = tx.send(QueryEvent::ToolStart { - tool_name: tool_name.clone(), - tool_id: tool_id.clone(), - input_json: tool_input.to_string(), - }); - } - let result = execute_tool(&*tool_name, &tool_input, tools, &tool_ctx).await; - if let Some(ref tx) = event_tx { - let _ = tx.send(QueryEvent::ToolEnd { - tool_name: tool_name.clone(), - tool_id: tool_id.clone(), - result: result.content.clone(), - is_error: result.is_error, - }); - } - tool_results.push(ContentBlock::ToolResult { - tool_use_id: tool_id, - content: claurst_core::types::ToolResultContent::Text(result.content), - is_error: Some(result.is_error), - }); - } + let tool_results = execute_tool_blocks_with_hooks( + tool_use_blocks, + tools, + &tool_ctx, + event_tx.as_ref(), + ) + .await; messages.push(Message { role: claurst_core::types::Role::User, content: claurst_core::types::MessageContent::Blocks(tool_results), @@ -1908,157 +1890,23 @@ pub async fn run_query_loop( }; } - // --------------------------------------------------------------------------- - // Streaming tool executor: parallel non-agent tool dispatch. - // - // Phase 1: Run PreToolUse hooks sequentially (they can block/deny execution - // and may display interactive permission dialogs). - // Phase 2: Dispatch all non-blocked tool executions concurrently via - // futures::future::join_all, preserving original order. - // Phase 3: Fire PostToolUse hooks + emit events, then collect results. - // - // This mirrors the TypeScript StreamingToolExecutor pattern. - // --------------------------------------------------------------------------- - - // Intermediate record produced during Phase 1. - struct PreparedTool { - id: String, - name: String, - input: Value, - /// None means the pre-hook blocked execution; the String is the error reason. - blocked_result: Option, - } - - // Phase 1: sequential pre-hook pass. - let mut prepared: Vec = Vec::with_capacity(tool_blocks.len()); - for block in tool_blocks { - if let ContentBlock::ToolUse { id, name, input } = block { - // Clone from the references returned by get_tool_use_blocks() - let id = id.clone(); - let name = name.clone(); - let input = input.clone(); - - if let Some(ref tx) = event_tx { - let _ = tx.send(QueryEvent::ToolStart { - tool_name: name.clone(), - tool_id: id.clone(), - input_json: input.to_string(), - }); - } - - let hooks = &tool_ctx.config.hooks; - let hook_ctx = claurst_core::hooks::HookContext { - event: "PreToolUse".to_string(), - tool_name: Some(name.clone()), - tool_input: Some(input.clone()), - tool_output: None, - is_error: None, - session_id: Some(tool_ctx.session_id.clone()), - }; - let pre_outcome = claurst_core::hooks::run_hooks( - hooks, - claurst_core::config::HookEvent::PreToolUse, - &hook_ctx, - &tool_ctx.working_dir, - ) - .await; - - let plugin_pre_outcome = - claurst_plugins::run_global_pre_tool_hook(&name, &input); - - let blocked_result = - if let claurst_core::hooks::HookOutcome::Blocked(reason) = pre_outcome { - warn!(tool = %name, reason = %reason, "PreToolUse hook blocked execution"); - Some(claurst_tools::ToolResult::error(format!( - "Blocked by hook: {}", - reason - ))) - } else if let claurst_plugins::HookOutcome::Deny(reason) = plugin_pre_outcome { - warn!(tool = %name, reason = %reason, "Plugin PreToolUse hook blocked execution"); - Some(claurst_tools::ToolResult::error(format!( - "Blocked by plugin hook: {}", - reason - ))) - } else { - None - }; - - prepared.push(PreparedTool { - id, - name, - input, - blocked_result, - }); - } - } - - // Phase 2: build execution futures for non-blocked tools and join them. - // Blocked tools yield a ready future with the pre-computed error result. - // Non-blocked tools execute concurrently via join_all. - // Each async block owns its cloned name/input so there are no lifetime issues. - let exec_futures: Vec<_> = prepared - .iter() - .map(|p| { - if p.blocked_result.is_some() { - let r = p.blocked_result.clone().unwrap(); - futures::future::Either::Left(async move { r }) + let tool_invocations: Vec<_> = tool_blocks + .into_iter() + .filter_map(|block| { + if let ContentBlock::ToolUse { id, name, input } = block { + Some((id.clone(), name.clone(), input.clone())) } else { - let name = p.name.clone(); - let input = p.input.clone(); - futures::future::Either::Right(async move { - execute_tool(&name, &input, tools, tool_ctx).await - }) + None } }) .collect(); - - // Run all tool futures concurrently; join_all preserves order. - let exec_results: Vec = - futures::future::join_all(exec_futures).await; - - // Phase 3: post-hooks, event emission, and result block assembly. - let mut result_blocks: Vec = - Vec::with_capacity(prepared.len()); - for (p, result) in prepared.iter().zip(exec_results.into_iter()) { - let hooks = &tool_ctx.config.hooks; - let post_ctx = claurst_core::hooks::HookContext { - event: "PostToolUse".to_string(), - tool_name: Some(p.name.clone()), - tool_input: Some(p.input.clone()), - tool_output: Some(result.content.clone()), - is_error: Some(result.is_error), - session_id: Some(tool_ctx.session_id.clone()), - }; - claurst_core::hooks::run_hooks( - hooks, - claurst_core::config::HookEvent::PostToolUse, - &post_ctx, - &tool_ctx.working_dir, - ) - .await; - - claurst_plugins::run_global_post_tool_hook( - &p.name, - &p.input, - &result.content, - result.is_error, - ); - - if let Some(ref tx) = event_tx { - let _ = tx.send(QueryEvent::ToolEnd { - tool_name: p.name.clone(), - tool_id: p.id.clone(), - result: result.content.clone(), - is_error: result.is_error, - }); - } - - result_blocks.push(ContentBlock::ToolResult { - tool_use_id: p.id.clone(), - content: ToolResultContent::Text(result.content), - is_error: if result.is_error { Some(true) } else { None }, - }); - } + let result_blocks = execute_tool_blocks_with_hooks( + tool_invocations, + tools, + tool_ctx, + event_tx.as_ref(), + ) + .await; // Append tool results as a user message messages.push(Message::user_blocks(result_blocks)); @@ -2107,6 +1955,137 @@ pub async fn run_query_loop( } } +struct PreparedTool { + id: String, + name: String, + input: Value, + blocked_result: Option, +} + +/// Execute tool invocations through the common policy-aware tool path. +async fn execute_tool_blocks_with_hooks( + tool_blocks: Vec<(String, String, Value)>, + tools: &[Box], + tool_ctx: &ToolContext, + event_tx: Option<&mpsc::UnboundedSender>, +) -> Vec { + // Phase 1: Run PreToolUse hooks sequentially so policy denials can block + // execution before any tool side effects occur. + let mut prepared: Vec = Vec::with_capacity(tool_blocks.len()); + for (id, name, input) in tool_blocks { + if let Some(tx) = event_tx { + let _ = tx.send(QueryEvent::ToolStart { + tool_name: name.clone(), + tool_id: id.clone(), + input_json: input.to_string(), + }); + } + + let hook_ctx = claurst_core::hooks::HookContext { + event: "PreToolUse".to_string(), + tool_name: Some(name.clone()), + tool_input: Some(input.clone()), + tool_output: None, + is_error: None, + session_id: Some(tool_ctx.session_id.clone()), + }; + let pre_outcome = claurst_core::hooks::run_hooks( + &tool_ctx.config.hooks, + claurst_core::config::HookEvent::PreToolUse, + &hook_ctx, + &tool_ctx.working_dir, + ) + .await; + + let plugin_pre_outcome = claurst_plugins::run_global_pre_tool_hook(&name, &input); + + let blocked_result = + if let claurst_core::hooks::HookOutcome::Blocked(reason) = pre_outcome { + warn!(tool = %name, reason = %reason, "PreToolUse hook blocked execution"); + Some(claurst_tools::ToolResult::error(format!( + "Blocked by hook: {}", + reason + ))) + } else if let claurst_plugins::HookOutcome::Deny(reason) = plugin_pre_outcome { + warn!(tool = %name, reason = %reason, "Plugin PreToolUse hook blocked execution"); + Some(claurst_tools::ToolResult::error(format!( + "Blocked by plugin hook: {}", + reason + ))) + } else { + None + }; + + prepared.push(PreparedTool { + id, + name, + input, + blocked_result, + }); + } + + // Phase 2: execute non-blocked tools concurrently. + let exec_futures: Vec<_> = prepared + .iter() + .map(|p| { + if let Some(result) = p.blocked_result.clone() { + futures::future::Either::Left(async move { result }) + } else { + let name = p.name.clone(); + let input = p.input.clone(); + futures::future::Either::Right(async move { + execute_tool(&name, &input, tools, tool_ctx).await + }) + } + }) + .collect(); + let exec_results: Vec = futures::future::join_all(exec_futures).await; + + // Phase 3: run post-hooks, emit completion events, and return result blocks. + let mut result_blocks: Vec = Vec::with_capacity(prepared.len()); + for (p, result) in prepared.iter().zip(exec_results.into_iter()) { + let post_ctx = claurst_core::hooks::HookContext { + event: "PostToolUse".to_string(), + tool_name: Some(p.name.clone()), + tool_input: Some(p.input.clone()), + tool_output: Some(result.content.clone()), + is_error: Some(result.is_error), + session_id: Some(tool_ctx.session_id.clone()), + }; + claurst_core::hooks::run_hooks( + &tool_ctx.config.hooks, + claurst_core::config::HookEvent::PostToolUse, + &post_ctx, + &tool_ctx.working_dir, + ) + .await; + + claurst_plugins::run_global_post_tool_hook( + &p.name, + &p.input, + &result.content, + result.is_error, + ); + + if let Some(tx) = event_tx { + let _ = tx.send(QueryEvent::ToolEnd { + tool_name: p.name.clone(), + tool_id: p.id.clone(), + result: result.content.clone(), + is_error: result.is_error, + }); + } + + result_blocks.push(ContentBlock::ToolResult { + tool_use_id: p.id.clone(), + content: ToolResultContent::Text(result.content), + is_error: if result.is_error { Some(true) } else { None }, + }); + } + + result_blocks +} + /// Execute a single tool invocation. async fn execute_tool( name: &str,