Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/rig-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = { workspace = true }
publish = false

[dependencies]
rig-core = "0.15.1"
rig-core = "0.28.0"
tokio = { version = "1", features = ["full"] }
rmcp = { workspace = true, features = [
"client",
Expand Down
94 changes: 49 additions & 45 deletions examples/rig-integration/src/chat.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use futures::StreamExt;
use rig::{
agent::Agent,
completion::{AssistantContent, CompletionModel},
message::Message,
streaming::StreamingChat,
agent::{Agent, MultiTurnStreamItem},
completion::CompletionModel,
message::{Message, Text},
streaming::{StreamedAssistantContent, StreamingChat},
};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};

pub async fn cli_chatbot<M>(chatbot: Agent<M>) -> anyhow::Result<()>
where
M: CompletionModel,
M: CompletionModel + 'static,
M::StreamingResponse: Send,
{
let mut chat_log = vec![];

Expand All @@ -28,49 +29,52 @@ where
if input == ":q" {
break;
}
match chatbot.stream_chat(input, chat_log.clone()).await {
Ok(mut response) => {
tracing::info!(%input);
chat_log.push(Message::user(input));
stream_output_agent_start(&mut output).await?;
let mut message_buf = String::new();
while let Some(message) = response.next().await {
match message {
Ok(AssistantContent::Text(text)) => {
message_buf.push_str(&text.text);
output_agent(text.text, &mut output).await?;
}
Ok(AssistantContent::ToolCall(tool_call)) => {
let name = tool_call.function.name;
let arguments = tool_call.function.arguments;
chat_log.push(Message::assistant(format!(
"Calling tool: {name} with args: {arguments}"
)));
let result = chatbot.tools.call(&name, arguments.to_string()).await;
match result {
Ok(tool_call_result) => {
stream_output_agent_finished(&mut output).await?;
stream_output_toolcall(&tool_call_result, &mut output).await?;
stream_output_agent_start(&mut output).await?;
chat_log.push(Message::user(tool_call_result));
}
Err(e) => {
output_error(e, &mut output).await?;
}
}
}
Err(error) => {
output_error(error, &mut output).await?;
}
}

tracing::info!(%input);
chat_log.push(Message::user(input));

let mut response = chatbot.stream_chat(input, chat_log.clone()).await;
stream_output_agent_start(&mut output).await?;
let mut message_buf = String::new();

while let Some(message) = response.next().await {
match message {
Ok(MultiTurnStreamItem::StreamAssistantItem(StreamedAssistantContent::Text(
Text { text },
))) => {
message_buf.push_str(&text);
output_agent(&text, &mut output).await?;
}
Ok(MultiTurnStreamItem::StreamAssistantItem(
StreamedAssistantContent::ToolCall(tool_call),
)) => {
let name = &tool_call.function.name;
let arguments = &tool_call.function.arguments;
stream_output_toolcall(
format!("Calling tool: {name} with args: {arguments}"),
&mut output,
)
.await?;
}
Ok(MultiTurnStreamItem::StreamUserItem(user_content)) => {
// Tool results are streamed back as user items
stream_output_toolcall(format!("Tool result: {:?}", user_content), &mut output)
.await?;
}
Ok(MultiTurnStreamItem::FinalResponse(final_response)) => {
tracing::info!("Final response received: {:?}", final_response);
}
Ok(_) => {
// Handle other stream items (reasoning, deltas, etc.)
}
Err(error) => {
output_error(error, &mut output).await?;
}
chat_log.push(Message::assistant(message_buf));
stream_output_agent_finished(&mut output).await?;
}
Err(error) => {
output_error(error, &mut output).await?;
}
}

chat_log.push(Message::assistant(message_buf));
stream_output_agent_finished(&mut output).await?;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions examples/rig-integration/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ async fn main() -> anyhow::Result<()> {
let config = config::Config::retrieve("config.toml").await?;
let deepseek_client = {
if let Some(key) = config.deepseek_key {
deepseek::Client::new(&key)
deepseek::Client::new(&key)?
} else {
deepseek::Client::from_env()
}
};
let cohere_client = {
if let Some(key) = config.cohere_key {
cohere::Client::new(&key)
cohere::Client::new(&key)?
} else {
cohere::Client::from_env()
}
Expand Down
8 changes: 3 additions & 5 deletions examples/rig-integration/src/mcp_adaptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ impl RigTool for McpToolAdaptor {
fn definition(
&self,
_prompt: String,
) -> std::pin::Pin<Box<dyn Future<Output = rig::completion::ToolDefinition> + Send + Sync + '_>>
{
) -> std::pin::Pin<Box<dyn Future<Output = rig::completion::ToolDefinition> + Send + '_>> {
Box::pin(std::future::ready(rig::completion::ToolDefinition {
name: self.name(),
description: self
Expand All @@ -37,9 +36,8 @@ impl RigTool for McpToolAdaptor {
fn call(
&self,
args: String,
) -> std::pin::Pin<
Box<dyn Future<Output = Result<String, rig::tool::ToolError>> + Send + Sync + '_>,
> {
) -> std::pin::Pin<Box<dyn Future<Output = Result<String, rig::tool::ToolError>> + Send + '_>>
{
let server = self.server.clone();
Box::pin(async move {
let call_mcp_tool_result = server
Expand Down