Production-grade Rust SDK for building autonomous AI agents over the MXP protocol.
MXP Agents Runtime is the production runtime for agents that speak MXP natively. It focuses on predictable behavior, governance, and interoperability.
- Protocol-native: handles MXP `Call`, `Response`, `Event`, `Stream*`, `AgentRegister`, and `AgentHeartbeat`.
- Deterministic lifecycle and concurrency: explicit `Lifecycle` transitions plus a bounded `TaskScheduler`.
- Governance-first: policy checks for tools, inference, and memory with audit emission hooks.
- Capability-scoped tools: a typed registry with metadata and versioning.
- Operations-ready: graceful shutdown and checkpoint-ready recovery.
LangChain/LangGraph are great for Python-first prototyping and graph orchestration. MXP Agents Runtime targets protocol-native, production-grade agent systems.

| Need | MXP Agents Runtime | Chain frameworks |
|---|---|---|
| Protocol-native A2A | MXP message types and transport | Typically HTTP/JSON adapters |
| Runtime lifecycle | Explicit lifecycle + bounded scheduler | Orchestration focused |
| Governance | Policy engine + audit emission | Usually external or custom |
| Operations | Graceful shutdown + recovery hooks | Often add-on |
| Language strategy | Rust core; JS/Py SDKs planned | Python-first |

Example projects:

- Customer support agent with tools and MXP responses: `examples/customer-support-agent`
- Enterprise setup with policy + telemetry: `examples/enterprise-agent`
- Multi-agent pipeline and real-world demos: `examples/real-world`
- MXP agent mesh over UDP (coordinator + agents + test client): `examples/RUN_AGENTS.md`
- Aviation Flight Pulse (DXB live feed + weather): `examples/aviation-flight-pulse`

Documentation and project resources:

- Positioning and why we built this: `docs/positioning.md`
- Comparison and when to use MXP vs chain frameworks: `docs/comparison.md`
- Demo guide with real MXP flows: `docs/demos.md`
- Project updates and release notes: `docs/updates.md`
- Community channels: `docs/community.md`
- Roadmap and milestones: `ROADMAP.md`
- Contributing guidelines: `CONTRIBUTING.md`

Add the runtime to your `Cargo.toml`:

```toml
[dependencies]
mxp-agents = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
futures = "0.3"Small end-to-end flow: build a kernel, send an MXP Call, and read the outcome.
use mxp::{Message, MessageType};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::kernel::{AgentKernel, CollectingSink, KernelMessageHandler, TaskScheduler};
use mxp_agents::primitives::AgentId;
use mxp_agents::tools::registry::{ToolMetadata, ToolRegistry};
use serde_json::{json, Value};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let adapter = Arc::new(GeminiAdapter::new(
GeminiConfig::from_env("gemini-2.0-flash").with_stream(false),
)?);
let tools = Arc::new(ToolRegistry::new());
tools.register_tool(
ToolMetadata::new("lookup_order", "1.0.0")?,
|input: Value| async move { Ok(json!({ "status": "shipped", "input": input })) },
)?;
let sink = CollectingSink::new();
let handler = Arc::new(KernelMessageHandler::new(adapter, tools, sink.clone()));
let kernel = AgentKernel::new(AgentId::random(), handler, TaskScheduler::default());
let payload = json!({
"messages": [
{ "role": "user", "content": "Look up order ORD-12345" }
],
"tools": [
{ "name": "lookup_order", "input": { "order_id": "ORD-12345" } }
]
});
kernel
.handle_message(Message::new(MessageType::Call, serde_json::to_vec(&payload)?))
.await?;
let outcome = sink.drain().pop().expect("call outcome");
println!("Response: {}", outcome.response());
Ok(())
}
```

Get a complete response in one shot:

```rust
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create adapter (streaming disabled by default)
let adapter = GeminiAdapter::new(
GeminiConfig::from_env("gemini-2.0-flash")
)?;
let request = InferenceRequest::new(vec![
PromptMessage::new(MessageRole::User, "What is Rust?"),
])?;
let mut stream = adapter.infer(request).await?;
// Single chunk with complete response
while let Some(chunk) = stream.next().await {
println!("{}", chunk?.delta);
}
Ok(())
}
```

Stream tokens as they're generated:

```rust
use std::io::{Write, stdout};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Enable streaming with .with_stream(true)
let adapter = GeminiAdapter::new(
GeminiConfig::from_env("gemini-2.0-flash")
.with_stream(true)
)?;
let request = InferenceRequest::new(vec![
PromptMessage::new(MessageRole::User, "Write a haiku about Rust."),
])?;
let mut stream = adapter.infer(request).await?;
// Tokens arrive incrementally
while let Some(chunk) = stream.next().await {
print!("{}", chunk?.delta);
stdout().flush()?; // IMPORTANT: flush to see tokens immediately
}
Ok(())
}
```

Swap in any supported provider with the same adapter interface:

```rust
use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::anthropic::{AnthropicAdapter, AnthropicConfig};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};
// OpenAI (requires OPENAI_API_KEY env var)
let openai = OpenAiAdapter::new(
OpenAiConfig::from_env("gpt-4o")
.with_stream(true)
)?;
// Anthropic (requires ANTHROPIC_API_KEY env var)
let anthropic = AnthropicAdapter::new(
AnthropicConfig::from_env("claude-sonnet-4-20250514")
.with_stream(true)
)?;
// Google Gemini (requires GEMINI_API_KEY env var)
let gemini = GeminiAdapter::new(
GeminiConfig::from_env("gemini-2.0-flash")
.with_stream(true)
)?;
// Ollama (local, no API key needed)
let ollama = OllamaAdapter::new(
OllamaConfig::new("llama3.2")
.with_stream(true)
)?;
```

Tune requests with a system prompt, temperature, and an output-token limit:

```rust
use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
let adapter = OpenAiAdapter::new(OpenAiConfig::from_env("gpt-4o"))?;
let request = InferenceRequest::new(vec![
PromptMessage::new(MessageRole::User, "Explain quantum computing"),
])?
.with_system_prompt("You are a physics professor. Explain concepts simply.")
.with_temperature(0.7) // 0.0 = deterministic, 1.0 = creative
.with_max_output_tokens(500); // Limit response length
let mut stream = adapter.infer(request).await?;
```

Carry multi-turn context by passing the conversation history in order:

```rust
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
let adapter = OllamaAdapter::new(OllamaConfig::new("llama3.2"))?;
// Build conversation history
let request = InferenceRequest::new(vec![
PromptMessage::new(MessageRole::User, "My name is Alice."),
PromptMessage::new(MessageRole::Assistant, "Hello Alice! How can I help you today?"),
PromptMessage::new(MessageRole::User, "What's my name?"),
])?;
let mut stream = adapter.infer(request).await?;
// Response: "Your name is Alice."Production-grade error handling:
use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::resilience::{
ResilientAdapter, CircuitBreakerConfig, RetryConfig, BackoffStrategy,
};
use std::time::Duration;
let base = OpenAiAdapter::new(OpenAiConfig::from_env("gpt-4o"))?;
let resilient = ResilientAdapter::builder(base)
// Circuit breaker: stop calling after 5 failures, wait 30s before retry
.with_circuit_breaker(CircuitBreakerConfig {
failure_threshold: 5,
cooldown: Duration::from_secs(30),
success_threshold: 2,
})
// Retry: exponential backoff with jitter
.with_retry(RetryConfig {
max_attempts: 3,
backoff: BackoffStrategy::Exponential {
base: Duration::from_millis(100),
max: Duration::from_secs(10),
jitter: true,
},
..Default::default()
})
// Timeout: fail if request takes > 30s
.with_timeout_duration(Duration::from_secs(30))
.build();
// Use exactly like any other adapter
let mut stream = resilient.infer(request).await?;
```

Export Prometheus-format metrics for the runtime and adapters:

```rust
use mxp_agents::telemetry::PrometheusExporter;
let exporter = PrometheusExporter::new();
// Register metrics for runtime and adapters
exporter.register_runtime();
exporter.register_adapter("openai");
exporter.register_adapter("ollama");
// Export Prometheus-format metrics
let metrics = exporter.export();
println!("{}", metrics);
// Metrics include:
// - mxp_adapter_requests_total{adapter="openai"}
// - mxp_adapter_request_duration_seconds{adapter="openai"}
// - mxp_adapter_errors_total{adapter="openai", error_type="timeout"}
// - mxp_circuit_breaker_state{adapter="openai"}
```

Expose health checks for Kubernetes readiness and liveness probes:

```rust
use mxp_agents::telemetry::{HealthReporter, HealthStatus, ComponentHealth};
use std::time::Duration;
let reporter = HealthReporter::new();
// Register health checks
reporter.register("database", || async {
// Your health check logic
ComponentHealth::healthy("connected")
});
reporter.register("llm_adapter", || async {
ComponentHealth::healthy("openai responding")
});
// Kubernetes endpoints
let readiness = reporter.readiness().await; // /readyz
let liveness = reporter.liveness().await; // /healthz
match readiness.status {
HealthStatus::Healthy => println!("Ready to serve traffic"),
HealthStatus::Degraded => println!("Partially available"),
HealthStatus::Unhealthy => println!("Not ready"),
}
```

Load secrets with automatic redaction and chainable providers:

```rust
use mxp_agents::config::{Secret, EnvSecretProvider, FileSecretProvider, ChainedSecretProvider};
// Load from environment (redacted in Debug output)
let provider = EnvSecretProvider::new();
let api_key: Secret<String> = provider.get("OPENAI_API_KEY")?;
// Safe to log - shows "[REDACTED]"
println!("API Key: {:?}", api_key);
// Access the actual value when needed
let actual_key = api_key.expose();
// Chain multiple providers (env -> file -> vault)
let chain = ChainedSecretProvider::new(vec![
Box::new(EnvSecretProvider::new()),
Box::new(FileSecretProvider::new("/run/secrets")),
]);
```

Apply per-adapter and per-agent rate limits:

```rust
use mxp_agents::primitives::{RateLimiter, AgentRateLimiter, AgentId};
use std::time::Duration;
// Per-adapter rate limiter
let limiter = RateLimiter::new(
100.0, // 100 requests per second
200, // burst capacity
);
if limiter.try_acquire() {
// Proceed with request
} else {
// Rate limited, back off
}
// Per-agent rate limiting
let agent_limiter = AgentRateLimiter::new(10.0, 20); // 10 req/s per agent
let agent_id = AgentId::random();
if agent_limiter.try_acquire(&agent_id) {
// This agent can proceed
}
```

Build agents that communicate over the MXP protocol:

```rust
use mxp_agents::kernel::{AgentKernel, AgentMessageHandler, HandlerContext, TaskScheduler, LifecycleEvent};
use mxp_agents::primitives::AgentId;
use async_trait::async_trait;
use std::sync::Arc;
struct MyHandler;
#[async_trait]
impl AgentMessageHandler for MyHandler {
async fn handle_call(&self, ctx: HandlerContext) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Received: {:?}", ctx.message());
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let agent_id = AgentId::random();
let handler = Arc::new(MyHandler);
let scheduler = TaskScheduler::default();
let mut kernel = AgentKernel::new(agent_id, handler, scheduler);
// Lifecycle: Created -> Ready -> Active
kernel.transition(LifecycleEvent::Boot)?;
kernel.transition(LifecycleEvent::Activate)?;
println!("Agent {} is active", agent_id);
Ok(())
}
```

Coordinate graceful shutdown while draining in-flight work:

```rust
use mxp_agents::kernel::{ShutdownCoordinator, WorkGuard};
use std::sync::Arc;
use std::time::Duration;
let coordinator = Arc::new(ShutdownCoordinator::new(Duration::from_secs(30)));
// Register in-flight work
let guard: WorkGuard = coordinator.register_work()?;
// Do work...
tokio::spawn({
let coord = coordinator.clone();
async move {
// Work completes, guard drops automatically
drop(guard);
}
});
// Initiate shutdown (waits for in-flight work)
coordinator.shutdown().await?;
```

Define tools that LLMs can call:

```rust
use mxp_agents::tools::{ToolRegistry, ToolMetadata, ToolBinding};
use serde_json::{json, Value};
let mut registry = ToolRegistry::new();
// Register a tool
registry.register(ToolBinding {
metadata: ToolMetadata {
name: "get_weather".into(),
description: "Get current weather for a city".into(),
parameters: json!({
"type": "object",
"properties": {
"city": { "type": "string" }
},
"required": ["city"]
}),
},
executor: Box::new(|args: Value| {
Box::pin(async move {
let city = args["city"].as_str().unwrap_or("unknown");
Ok(json!({ "temp": 72, "city": city }))
})
}),
})?;
// Invoke tool
let result = registry.invoke("get_weather", json!({"city": "Seattle"})).await?;
```

Enforce governance with policy rules evaluated before tool execution:

```rust
use mxp_agents::policy::{PolicyEngine, PolicyRule, PolicyDecision, PolicyRequest};
let mut engine = PolicyEngine::new();
// Add rules
engine.add_rule(PolicyRule {
name: "block_dangerous_tools".into(),
condition: |req: &PolicyRequest| req.tool_name == "delete_all",
decision: PolicyDecision::Deny("Dangerous operation blocked".into()),
});
// Check policy before tool execution
let request = PolicyRequest { tool_name: "delete_all".into(), ..Default::default() };
match engine.evaluate(&request) {
PolicyDecision::Allow => { /* proceed */ },
PolicyDecision::Deny(reason) => println!("Blocked: {}", reason),
PolicyDecision::Escalate => { /* require human approval */ },
}
```
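
To connect governance to execution, a policy check can gate each tool call before it runs. The sketch below is illustrative only: it reuses the `engine` and `registry` values from the snippets above and omits audit emission and error handling.

```rust
// Illustrative sketch: evaluate the policy for a tool call, then invoke the
// tool only on Allow. Reuses `engine` (PolicyEngine) and `registry`
// (ToolRegistry) from the previous examples; audit emission is omitted.
let request = PolicyRequest { tool_name: "get_weather".into(), ..Default::default() };
match engine.evaluate(&request) {
    PolicyDecision::Allow => {
        let result = registry.invoke("get_weather", json!({ "city": "Seattle" })).await?;
        println!("Tool result: {result}");
    }
    PolicyDecision::Deny(reason) => println!("Blocked by policy: {reason}"),
    PolicyDecision::Escalate => println!("Escalation required before running this tool"),
}
```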

Each cloud provider reads its API key from the environment:

| Variable | Provider | Required |
|---|---|---|
| `OPENAI_API_KEY` | OpenAI | Yes |
| `ANTHROPIC_API_KEY` | Anthropic | Yes |
| `GEMINI_API_KEY` | Google Gemini | Yes |
| (none) | Ollama | No (local) |
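
One pattern these variables enable is falling back to local Ollama when no cloud key is present. The sketch below is an assumption-laden illustration: it presumes the `ModelAdapter` trait can be used as a generic bound, as the adapter examples above suggest, and the `run` helper is hypothetical.

```rust
use futures::StreamExt;
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};

// Hypothetical helper: run one prompt against any adapter implementing ModelAdapter.
async fn run(adapter: impl ModelAdapter) -> Result<(), Box<dyn std::error::Error>> {
    let request = InferenceRequest::new(vec![
        PromptMessage::new(MessageRole::User, "Summarize the MXP protocol in one sentence."),
    ])?;
    let mut stream = adapter.infer(request).await?;
    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?.delta);
    }
    println!();
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Prefer Gemini when its key is set; otherwise use the local Ollama model.
    if std::env::var("GEMINI_API_KEY").is_ok() {
        run(GeminiAdapter::new(GeminiConfig::from_env("gemini-2.0-flash"))?).await
    } else {
        run(OllamaAdapter::new(OllamaConfig::new("llama3.2"))?).await
    }
}
```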

The workspace is organized as a thin facade over focused crates:

```text
mxp-agents (facade)
├── agent-adapters    # LLM providers (OpenAI, Anthropic, Gemini, Ollama)
├── agent-kernel      # Agent lifecycle, MXP handlers, scheduler
├── agent-primitives  # Core types (AgentId, Capability, RateLimiter)
├── agent-tools       # Tool registry and execution
├── agent-policy      # Governance and policy engine
├── agent-memory      # Memory bus, journal, embeddings
├── agent-prompts     # System prompts, context window management
├── agent-config      # Configuration loading, secrets
└── agent-telemetry   # Metrics, health checks, tracing
```
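
For orientation, the module paths used throughout this README are re-exported by the `mxp-agents` facade; the crate-to-module pairing below is inferred from the tree above.

```rust
// Inferred mapping from facade modules to workspace crates (see tree above).
// agent-memory and agent-prompts are not exercised in this README.
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};  // agent-adapters
use mxp_agents::kernel::{AgentKernel, TaskScheduler};             // agent-kernel
use mxp_agents::primitives::AgentId;                              // agent-primitives
use mxp_agents::tools::registry::ToolRegistry;                    // agent-tools
use mxp_agents::policy::PolicyEngine;                             // agent-policy
use mxp_agents::config::EnvSecretProvider;                        // agent-config
use mxp_agents::telemetry::{HealthReporter, PrometheusExporter};  // agent-telemetry
```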

The repository ships several runnable examples:

| Example | Description |
|---|---|
| `examples/real-world` | Real-world agents - customer support, data analysis, code review, multi-agent pipelines |
| `examples/RUN_AGENTS.md` | MXP agent mesh - coordinator plus agents over MXP transport |
| `examples/aviation-flight-pulse` | Aviation Flight Pulse - DXB region feed + weather with MXP events |
| `examples/cookbook` | SDK features demo - resilience, metrics, health checks, rate limiting |
| `examples/streaming-test` | Token-by-token streaming demo |
| `examples/basic-agent` | Simple agent with LLM adapter |
| `examples/enterprise-agent` | Production setup with resilience & metrics |

Run the interactive real-world examples:

```sh
GEMINI_API_KEY=your_key cargo run -p real-world
```

1. Customer Support Agent - Handles queries, looks up orders, processes refunds with policy checks
2. Data Analysis Agent - Queries structured data, generates insights, creates reports
3. Code Review Agent - Reviews code for bugs, security issues, performance problems
4. Multi-Agent Pipeline - Research → Write → Edit workflow with 3 agents collaborating
Sample output (Multi-Agent Pipeline):

```text
STAGE 1: Research Agent
🔬 Research Agent working...
⏱️ Completed in 4.1s
STAGE 2: Writer Agent
✍️ Writer Agent working...
⏱️ Completed in 2.5s
STAGE 3: Editor Agent
📝 Editor Agent working...
⏱️ Completed in 1.7s
📊 Pipeline Summary
Total pipeline time: 8.4s
Agents involved: 3 (Research, Writer, Editor)
Data passed between agents: 3216 bytes
```

Run the cookbook demo:

```sh
GEMINI_API_KEY=your_key cargo run -p cookbook
```

Sample output:

```text
[1/6] SECRETS MANAGEMENT
API Key (debug): [REDACTED]
✓ Secrets are automatically redacted in logs
[2/6] RATE LIMITING
Attempted 25 requests, 20 allowed (burst limit)
✓ Rate limiting prevents resource exhaustion
[3/6] HEALTH CHECKS (Kubernetes Ready)
Readiness: Healthy
Liveness: Healthy
✓ Health checks ready for Kubernetes probes
[4/6] PROMETHEUS METRICS
Metrics exported (2258 bytes)
✓ Metrics ready for Prometheus scraping
[5/6] RESILIENT STREAMING (Circuit Breaker + Retry)
Response (streaming): Rust achieves memory safety through...
Chunks received: 5
Time to first token: 1.3s
✓ Resilient streaming completed successfully
[6/6] GRACEFUL SHUTDOWN
Shutdown completed gracefully
✓ Graceful shutdown with work draining
```

Requirements:

- Rust 1.85+ (MSRV)
- Tokio async runtime
- API keys for cloud providers (or Ollama for local)

License: MIT OR Apache-2.0
