Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 156 additions & 177 deletions src-rust/crates/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1316,37 +1316,19 @@ pub async fn run_query_loop(
}
}).collect();

// Execute tools if any tool_use blocks were returned.
// Note: we check the blocks themselves rather than relying
// solely on stop_str == "tool_use" because many OpenAI-
// compatible providers (Ollama, LM Studio, etc.) return
// finish_reason "stop" even when tool calls are present.
// Execute tool-use blocks through the same hook-aware path used by
// Anthropic streaming. Some OpenAI-compatible providers return
// finish_reason "stop" even when tool calls are present, so the
// block presence is authoritative here, but execution must still
// enforce PreToolUse/plugin policy and emit post-hook events.
if !tool_use_blocks.is_empty() {
let mut tool_results = Vec::new();
for (tool_id, tool_name, tool_input) in tool_use_blocks {
// Notify TUI that a tool is starting (matches Anthropic path).
if let Some(ref tx) = event_tx {
let _ = tx.send(QueryEvent::ToolStart {
tool_name: tool_name.clone(),
tool_id: tool_id.clone(),
input_json: tool_input.to_string(),
});
}
let result = execute_tool(&*tool_name, &tool_input, tools, &tool_ctx).await;
if let Some(ref tx) = event_tx {
let _ = tx.send(QueryEvent::ToolEnd {
tool_name: tool_name.clone(),
tool_id: tool_id.clone(),
result: result.content.clone(),
is_error: result.is_error,
});
}
tool_results.push(ContentBlock::ToolResult {
tool_use_id: tool_id,
content: claurst_core::types::ToolResultContent::Text(result.content),
is_error: Some(result.is_error),
});
}
let tool_results = execute_tool_blocks_with_hooks(
tool_use_blocks,
tools,
&tool_ctx,
event_tx.as_ref(),
)
.await;
messages.push(Message {
role: claurst_core::types::Role::User,
content: claurst_core::types::MessageContent::Blocks(tool_results),
Expand Down Expand Up @@ -1908,157 +1890,23 @@ pub async fn run_query_loop(
};
}

// ---------------------------------------------------------------------------
// Streaming tool executor: parallel non-agent tool dispatch.
//
// Phase 1: Run PreToolUse hooks sequentially (they can block/deny execution
// and may display interactive permission dialogs).
// Phase 2: Dispatch all non-blocked tool executions concurrently via
// futures::future::join_all, preserving original order.
// Phase 3: Fire PostToolUse hooks + emit events, then collect results.
//
// This mirrors the TypeScript StreamingToolExecutor pattern.
// ---------------------------------------------------------------------------

// Intermediate record produced during Phase 1.
struct PreparedTool {
id: String,
name: String,
input: Value,
/// None means the pre-hook blocked execution; the String is the error reason.
blocked_result: Option<ToolResult>,
}

// Phase 1: sequential pre-hook pass.
let mut prepared: Vec<PreparedTool> = Vec::with_capacity(tool_blocks.len());
for block in tool_blocks {
if let ContentBlock::ToolUse { id, name, input } = block {
// Clone from the references returned by get_tool_use_blocks()
let id = id.clone();
let name = name.clone();
let input = input.clone();

if let Some(ref tx) = event_tx {
let _ = tx.send(QueryEvent::ToolStart {
tool_name: name.clone(),
tool_id: id.clone(),
input_json: input.to_string(),
});
}

let hooks = &tool_ctx.config.hooks;
let hook_ctx = claurst_core::hooks::HookContext {
event: "PreToolUse".to_string(),
tool_name: Some(name.clone()),
tool_input: Some(input.clone()),
tool_output: None,
is_error: None,
session_id: Some(tool_ctx.session_id.clone()),
};
let pre_outcome = claurst_core::hooks::run_hooks(
hooks,
claurst_core::config::HookEvent::PreToolUse,
&hook_ctx,
&tool_ctx.working_dir,
)
.await;

let plugin_pre_outcome =
claurst_plugins::run_global_pre_tool_hook(&name, &input);

let blocked_result =
if let claurst_core::hooks::HookOutcome::Blocked(reason) = pre_outcome {
warn!(tool = %name, reason = %reason, "PreToolUse hook blocked execution");
Some(claurst_tools::ToolResult::error(format!(
"Blocked by hook: {}",
reason
)))
} else if let claurst_plugins::HookOutcome::Deny(reason) = plugin_pre_outcome {
warn!(tool = %name, reason = %reason, "Plugin PreToolUse hook blocked execution");
Some(claurst_tools::ToolResult::error(format!(
"Blocked by plugin hook: {}",
reason
)))
} else {
None
};

prepared.push(PreparedTool {
id,
name,
input,
blocked_result,
});
}
}

// Phase 2: build execution futures for non-blocked tools and join them.
// Blocked tools yield a ready future with the pre-computed error result.
// Non-blocked tools execute concurrently via join_all.
// Each async block owns its cloned name/input so there are no lifetime issues.
let exec_futures: Vec<_> = prepared
.iter()
.map(|p| {
if p.blocked_result.is_some() {
let r = p.blocked_result.clone().unwrap();
futures::future::Either::Left(async move { r })
let tool_invocations: Vec<_> = tool_blocks
.into_iter()
.filter_map(|block| {
if let ContentBlock::ToolUse { id, name, input } = block {
Some((id.clone(), name.clone(), input.clone()))
} else {
let name = p.name.clone();
let input = p.input.clone();
futures::future::Either::Right(async move {
execute_tool(&name, &input, tools, tool_ctx).await
})
None
}
})
.collect();

// Run all tool futures concurrently; join_all preserves order.
let exec_results: Vec<ToolResult> =
futures::future::join_all(exec_futures).await;

// Phase 3: post-hooks, event emission, and result block assembly.
let mut result_blocks: Vec<ContentBlock> =
Vec::with_capacity(prepared.len());
for (p, result) in prepared.iter().zip(exec_results.into_iter()) {
let hooks = &tool_ctx.config.hooks;
let post_ctx = claurst_core::hooks::HookContext {
event: "PostToolUse".to_string(),
tool_name: Some(p.name.clone()),
tool_input: Some(p.input.clone()),
tool_output: Some(result.content.clone()),
is_error: Some(result.is_error),
session_id: Some(tool_ctx.session_id.clone()),
};
claurst_core::hooks::run_hooks(
hooks,
claurst_core::config::HookEvent::PostToolUse,
&post_ctx,
&tool_ctx.working_dir,
)
.await;

claurst_plugins::run_global_post_tool_hook(
&p.name,
&p.input,
&result.content,
result.is_error,
);

if let Some(ref tx) = event_tx {
let _ = tx.send(QueryEvent::ToolEnd {
tool_name: p.name.clone(),
tool_id: p.id.clone(),
result: result.content.clone(),
is_error: result.is_error,
});
}

result_blocks.push(ContentBlock::ToolResult {
tool_use_id: p.id.clone(),
content: ToolResultContent::Text(result.content),
is_error: if result.is_error { Some(true) } else { None },
});
}
let result_blocks = execute_tool_blocks_with_hooks(
tool_invocations,
tools,
tool_ctx,
event_tx.as_ref(),
)
.await;

// Append tool results as a user message
messages.push(Message::user_blocks(result_blocks));
Expand Down Expand Up @@ -2107,6 +1955,137 @@ pub async fn run_query_loop(
}
}

struct PreparedTool {
id: String,
name: String,
input: Value,
blocked_result: Option<ToolResult>,
}

/// Execute tool invocations through the common policy-aware tool path.
async fn execute_tool_blocks_with_hooks(
tool_blocks: Vec<(String, String, Value)>,
tools: &[Box<dyn Tool>],
tool_ctx: &ToolContext,
event_tx: Option<&mpsc::UnboundedSender<QueryEvent>>,
) -> Vec<ContentBlock> {
Comment on lines +1965 to +1971
// Phase 1: Run PreToolUse hooks sequentially so policy denials can block
// execution before any tool side effects occur.
let mut prepared: Vec<PreparedTool> = Vec::with_capacity(tool_blocks.len());
for (id, name, input) in tool_blocks {
if let Some(tx) = event_tx {
let _ = tx.send(QueryEvent::ToolStart {
tool_name: name.clone(),
tool_id: id.clone(),
input_json: input.to_string(),
});
}

let hook_ctx = claurst_core::hooks::HookContext {
event: "PreToolUse".to_string(),
tool_name: Some(name.clone()),
tool_input: Some(input.clone()),
tool_output: None,
is_error: None,
session_id: Some(tool_ctx.session_id.clone()),
};
let pre_outcome = claurst_core::hooks::run_hooks(
&tool_ctx.config.hooks,
claurst_core::config::HookEvent::PreToolUse,
&hook_ctx,
&tool_ctx.working_dir,
)
.await;

let plugin_pre_outcome = claurst_plugins::run_global_pre_tool_hook(&name, &input);

let blocked_result =
if let claurst_core::hooks::HookOutcome::Blocked(reason) = pre_outcome {
warn!(tool = %name, reason = %reason, "PreToolUse hook blocked execution");
Some(claurst_tools::ToolResult::error(format!(
"Blocked by hook: {}",
reason
)))
} else if let claurst_plugins::HookOutcome::Deny(reason) = plugin_pre_outcome {
warn!(tool = %name, reason = %reason, "Plugin PreToolUse hook blocked execution");
Some(claurst_tools::ToolResult::error(format!(
"Blocked by plugin hook: {}",
reason
)))
} else {
None
};

prepared.push(PreparedTool {
id,
name,
input,
blocked_result,
});
}

// Phase 2: execute non-blocked tools concurrently.
let exec_futures: Vec<_> = prepared
.iter()
.map(|p| {
if let Some(result) = p.blocked_result.clone() {
futures::future::Either::Left(async move { result })
} else {
let name = p.name.clone();
let input = p.input.clone();
futures::future::Either::Right(async move {
execute_tool(&name, &input, tools, tool_ctx).await
})
}
})
.collect();
let exec_results: Vec<ToolResult> = futures::future::join_all(exec_futures).await;

// Phase 3: run post-hooks, emit completion events, and return result blocks.
let mut result_blocks: Vec<ContentBlock> = Vec::with_capacity(prepared.len());
for (p, result) in prepared.iter().zip(exec_results.into_iter()) {
let post_ctx = claurst_core::hooks::HookContext {
event: "PostToolUse".to_string(),
tool_name: Some(p.name.clone()),
tool_input: Some(p.input.clone()),
tool_output: Some(result.content.clone()),
is_error: Some(result.is_error),
session_id: Some(tool_ctx.session_id.clone()),
};
claurst_core::hooks::run_hooks(
&tool_ctx.config.hooks,
claurst_core::config::HookEvent::PostToolUse,
&post_ctx,
&tool_ctx.working_dir,
)
.await;

claurst_plugins::run_global_post_tool_hook(
&p.name,
&p.input,
&result.content,
result.is_error,
);

if let Some(tx) = event_tx {
let _ = tx.send(QueryEvent::ToolEnd {
tool_name: p.name.clone(),
tool_id: p.id.clone(),
result: result.content.clone(),
is_error: result.is_error,
});
}

result_blocks.push(ContentBlock::ToolResult {
tool_use_id: p.id.clone(),
content: ToolResultContent::Text(result.content),
is_error: if result.is_error { Some(true) } else { None },
});
}

result_blocks
}

/// Execute a single tool invocation.
async fn execute_tool(
name: &str,
Expand Down