MXP Agents Runtime SDK


Production-grade Rust SDK for building autonomous AI agents over the MXP protocol.

Why MXP Agents Runtime

MXP Agents Runtime is the production runtime for agents that speak MXP natively. It focuses on predictable behavior, governance, and interoperability.

  • Protocol-native: handles MXP Call, Response, Event, Stream*, AgentRegister, and AgentHeartbeat.
  • Deterministic lifecycle and concurrency: explicit Lifecycle transitions plus bounded TaskScheduler.
  • Governance-first: policy checks for tools, inference, and memory with audit emission hooks.
  • Capability-scoped tools: a typed registry with metadata and versioning.
  • Operations-ready: graceful shutdown and checkpoint-ready recovery.

Compared to chain frameworks

LangChain/LangGraph are great for Python-first prototyping and graph orchestration. MXP Agents Runtime targets protocol-native, production-grade agent systems.

Need                   MXP Agents Runtime                       Chain frameworks
Protocol-native A2A    MXP message types and transport          Typically HTTP/JSON adapters
Runtime lifecycle      Explicit lifecycle + bounded scheduler   Orchestration focused
Governance             Policy engine + audit emission           Usually external or custom
Operations             Graceful shutdown + recovery hooks       Often add-on
Language strategy      Rust core; JS/Py SDKs planned            Python-first

MXP-native real-world examples

  • Customer support agent with tools and MXP responses: examples/customer-support-agent
  • Enterprise setup with policy + telemetry: examples/enterprise-agent
  • Multi-agent pipeline and real-world demos: examples/real-world
  • MXP agent mesh over UDP (coordinator + agents + test client): examples/RUN_AGENTS.md
  • Aviation Flight Pulse (DXB live feed + weather): examples/aviation-flight-pulse

Community and roadmap

  • Positioning and why we built this: docs/positioning.md
  • Comparison and when to use MXP vs chain frameworks: docs/comparison.md
  • Demo guide with real MXP flows: docs/demos.md
  • Project updates and release notes: docs/updates.md
  • Community channels: docs/community.md
  • Roadmap and milestones: ROADMAP.md
  • Contributing guidelines: CONTRIBUTING.md

Installation

Add the SDK and Tokio to your Cargo.toml:

[dependencies]
mxp-agents = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
futures = "0.3"

MXP-native quickstart

Small end-to-end flow: build a kernel, send an MXP Call, and read the outcome.

use mxp::{Message, MessageType};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::kernel::{AgentKernel, CollectingSink, KernelMessageHandler, TaskScheduler};
use mxp_agents::primitives::AgentId;
use mxp_agents::tools::registry::{ToolMetadata, ToolRegistry};
use serde_json::{json, Value};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let adapter = Arc::new(GeminiAdapter::new(
        GeminiConfig::from_env("gemini-2.0-flash").with_stream(false),
    )?);

    let tools = Arc::new(ToolRegistry::new());
    tools.register_tool(
        ToolMetadata::new("lookup_order", "1.0.0")?,
        |input: Value| async move { Ok(json!({ "status": "shipped", "input": input })) },
    )?;

    let sink = CollectingSink::new();
    let handler = Arc::new(KernelMessageHandler::new(adapter, tools, sink.clone()));
    let kernel = AgentKernel::new(AgentId::random(), handler, TaskScheduler::default());

    let payload = json!({
        "messages": [
            { "role": "user", "content": "Look up order ORD-12345" }
        ],
        "tools": [
            { "name": "lookup_order", "input": { "order_id": "ORD-12345" } }
        ]
    });

    kernel
        .handle_message(Message::new(MessageType::Call, serde_json::to_vec(&payload)?))
        .await?;

    let outcome = sink.drain().pop().expect("call outcome");
    println!("Response: {}", outcome.response());
    Ok(())
}

Cookbook

1. Basic LLM Inference (Non-Streaming)

Get a complete response in one shot:

use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create adapter (streaming disabled by default)
    let adapter = GeminiAdapter::new(
        GeminiConfig::from_env("gemini-2.0-flash")
    )?;

    let request = InferenceRequest::new(vec![
        PromptMessage::new(MessageRole::User, "What is Rust?"),
    ])?;

    let mut stream = adapter.infer(request).await?;
    
    // Single chunk with complete response
    while let Some(chunk) = stream.next().await {
        println!("{}", chunk?.delta);
    }
    
    Ok(())
}

2. Streaming LLM Responses (Token-by-Token)

Stream tokens as they're generated:

use std::io::{Write, stdout};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Enable streaming with .with_stream(true)
    let adapter = GeminiAdapter::new(
        GeminiConfig::from_env("gemini-2.0-flash")
            .with_stream(true)
    )?;

    let request = InferenceRequest::new(vec![
        PromptMessage::new(MessageRole::User, "Write a haiku about Rust."),
    ])?;

    let mut stream = adapter.infer(request).await?;
    
    // Tokens arrive incrementally
    while let Some(chunk) = stream.next().await {
        print!("{}", chunk?.delta);
        stdout().flush()?;  // IMPORTANT: flush to see tokens immediately
    }
    
    Ok(())
}

3. All Supported LLM Providers

use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::anthropic::{AnthropicAdapter, AnthropicConfig};
use mxp_agents::adapters::gemini::{GeminiAdapter, GeminiConfig};
use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};

// OpenAI (requires OPENAI_API_KEY env var)
let openai = OpenAiAdapter::new(
    OpenAiConfig::from_env("gpt-4o")
        .with_stream(true)
)?;

// Anthropic (requires ANTHROPIC_API_KEY env var)
let anthropic = AnthropicAdapter::new(
    AnthropicConfig::from_env("claude-sonnet-4-20250514")
        .with_stream(true)
)?;

// Google Gemini (requires GEMINI_API_KEY env var)
let gemini = GeminiAdapter::new(
    GeminiConfig::from_env("gemini-2.0-flash")
        .with_stream(true)
)?;

// Ollama (local, no API key needed)
let ollama = OllamaAdapter::new(
    OllamaConfig::new("llama3.2")
        .with_stream(true)
)?;

4. System Prompts & Temperature

use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};

let adapter = OpenAiAdapter::new(OpenAiConfig::from_env("gpt-4o"))?;

let request = InferenceRequest::new(vec![
    PromptMessage::new(MessageRole::User, "Explain quantum computing"),
])?
.with_system_prompt("You are a physics professor. Explain concepts simply.")
.with_temperature(0.7)        // 0.0 = deterministic, 1.0 = creative
.with_max_output_tokens(500); // Limit response length

let mut stream = adapter.infer(request).await?;

5. Multi-Turn Conversations

use mxp_agents::adapters::ollama::{OllamaAdapter, OllamaConfig};
use mxp_agents::adapters::traits::{InferenceRequest, MessageRole, ModelAdapter, PromptMessage};

let adapter = OllamaAdapter::new(OllamaConfig::new("llama3.2"))?;

// Build conversation history
let request = InferenceRequest::new(vec![
    PromptMessage::new(MessageRole::User, "My name is Alice."),
    PromptMessage::new(MessageRole::Assistant, "Hello Alice! How can I help you today?"),
    PromptMessage::new(MessageRole::User, "What's my name?"),
])?;

let mut stream = adapter.infer(request).await?;
// Response: "Your name is Alice."

6. Resilient Adapter (Circuit Breaker + Retry)

Production-grade error handling:

use mxp_agents::adapters::openai::{OpenAiAdapter, OpenAiConfig};
use mxp_agents::adapters::resilience::{
    ResilientAdapter, CircuitBreakerConfig, RetryConfig, BackoffStrategy,
};
use std::time::Duration;

let base = OpenAiAdapter::new(OpenAiConfig::from_env("gpt-4o"))?;

let resilient = ResilientAdapter::builder(base)
    // Circuit breaker: stop calling after 5 failures, wait 30s before retry
    .with_circuit_breaker(CircuitBreakerConfig {
        failure_threshold: 5,
        cooldown: Duration::from_secs(30),
        success_threshold: 2,
    })
    // Retry: exponential backoff with jitter
    .with_retry(RetryConfig {
        max_attempts: 3,
        backoff: BackoffStrategy::Exponential {
            base: Duration::from_millis(100),
            max: Duration::from_secs(10),
            jitter: true,
        },
        ..Default::default()
    })
    // Timeout: fail if request takes > 30s
    .with_timeout_duration(Duration::from_secs(30))
    .build();

// Use exactly like any other adapter
let mut stream = resilient.infer(request).await?;

7. Prometheus Metrics

use mxp_agents::telemetry::PrometheusExporter;

let exporter = PrometheusExporter::new();

// Register metrics for runtime and adapters
exporter.register_runtime();
exporter.register_adapter("openai");
exporter.register_adapter("ollama");

// Export Prometheus-format metrics
let metrics = exporter.export();
println!("{}", metrics);

// Metrics include:
// - mxp_adapter_requests_total{adapter="openai"}
// - mxp_adapter_request_duration_seconds{adapter="openai"}
// - mxp_adapter_errors_total{adapter="openai", error_type="timeout"}
// - mxp_circuit_breaker_state{adapter="openai"}
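
The exporter output is Prometheus text format, so it can be served from any HTTP framework for scraping. A minimal sketch using axum (axum is not a dependency of this SDK; it is assumed here purely for illustration):

use std::sync::Arc;
use axum::{routing::get, Router};
use mxp_agents::telemetry::PrometheusExporter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let exporter = Arc::new(PrometheusExporter::new());
    exporter.register_runtime();
    exporter.register_adapter("openai");

    // Serve the exporter output at /metrics for Prometheus to scrape.
    let app = Router::new().route("/metrics", get({
        let exporter = exporter.clone();
        move || async move { format!("{}", exporter.export()) }
    }));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await?;
    axum::serve(listener, app).await?;
    Ok(())
}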

8. Health Checks (Kubernetes Ready)

use mxp_agents::telemetry::{HealthReporter, HealthStatus, ComponentHealth};
use std::time::Duration;

let reporter = HealthReporter::new();

// Register health checks
reporter.register("database", || async {
    // Your health check logic
    ComponentHealth::healthy("connected")
});

reporter.register("llm_adapter", || async {
    ComponentHealth::healthy("openai responding")
});

// Kubernetes endpoints
let readiness = reporter.readiness().await;  // /readyz
let liveness = reporter.liveness().await;    // /healthz

match readiness.status {
    HealthStatus::Healthy => println!("Ready to serve traffic"),
    HealthStatus::Degraded => println!("Partially available"),
    HealthStatus::Unhealthy => println!("Not ready"),
}

9. Secrets Management

use mxp_agents::config::{Secret, EnvSecretProvider, FileSecretProvider, ChainedSecretProvider};

// Load from environment (redacted in Debug output)
let provider = EnvSecretProvider::new();
let api_key: Secret<String> = provider.get("OPENAI_API_KEY")?;

// Safe to log - shows "[REDACTED]"
println!("API Key: {:?}", api_key);

// Access the actual value when needed
let actual_key = api_key.expose();

// Chain multiple providers (env -> file -> vault)
let chain = ChainedSecretProvider::new(vec![
    Box::new(EnvSecretProvider::new()),
    Box::new(FileSecretProvider::new("/run/secrets")),
]);

10. Rate Limiting

use mxp_agents::primitives::{RateLimiter, AgentRateLimiter, AgentId};
use std::time::Duration;

// Per-adapter rate limiter
let limiter = RateLimiter::new(
    100.0,  // 100 requests per second
    200,    // burst capacity
);

if limiter.try_acquire() {
    // Proceed with request
} else {
    // Rate limited, back off
}

// Per-agent rate limiting
let agent_limiter = AgentRateLimiter::new(10.0, 20); // 10 req/s per agent
let agent_id = AgentId::random();

if agent_limiter.try_acquire(&agent_id) {
    // This agent can proceed
}
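
When a caller would rather wait than drop the request, the token-bucket check can be wrapped in a small async backoff loop (a hypothetical helper, not part of the SDK):

use std::time::Duration;
use mxp_agents::primitives::RateLimiter;

// Hypothetical helper: poll the limiter until it admits the request,
// sleeping briefly between attempts instead of dropping the call.
async fn acquire_with_backoff(limiter: &RateLimiter) {
    while !limiter.try_acquire() {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}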

11. MXP Agent Kernel

Build agents that communicate over the MXP protocol:

use mxp_agents::kernel::{AgentKernel, AgentMessageHandler, HandlerContext, TaskScheduler, LifecycleEvent};
use mxp_agents::primitives::AgentId;
use async_trait::async_trait;
use std::sync::Arc;

struct MyHandler;

#[async_trait]
impl AgentMessageHandler for MyHandler {
    async fn handle_call(&self, ctx: HandlerContext) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        println!("Received: {:?}", ctx.message());
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let agent_id = AgentId::random();
    let handler = Arc::new(MyHandler);
    let scheduler = TaskScheduler::default();

    let mut kernel = AgentKernel::new(agent_id, handler, scheduler);
    
    // Lifecycle: Created -> Ready -> Active
    kernel.transition(LifecycleEvent::Boot)?;
    kernel.transition(LifecycleEvent::Activate)?;

    println!("Agent {} is active", agent_id);
    Ok(())
}

12. Graceful Shutdown

use mxp_agents::kernel::{ShutdownCoordinator, WorkGuard};
use std::sync::Arc;
use std::time::Duration;

let coordinator = Arc::new(ShutdownCoordinator::new(Duration::from_secs(30)));

// Register in-flight work
let guard: WorkGuard = coordinator.register_work()?;

// Do work...
tokio::spawn({
    let coord = coordinator.clone();
    async move {
        // Work completes, guard drops automatically
        drop(guard);
    }
});

// Initiate shutdown (waits for in-flight work)
coordinator.shutdown().await?;
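
In a long-running service, shutdown is usually triggered by a signal. A sketch wiring the coordinator to Ctrl-C (requires tokio's signal feature, which the Installation snippet above does not enable):

// Wait for Ctrl-C, then drain in-flight work before exiting.
tokio::signal::ctrl_c().await?;
coordinator.shutdown().await?;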

13. Tool Registration

Define tools that LLMs can call:

use mxp_agents::tools::{ToolRegistry, ToolMetadata, ToolBinding};
use serde_json::{json, Value};

let mut registry = ToolRegistry::new();

// Register a tool
registry.register(ToolBinding {
    metadata: ToolMetadata {
        name: "get_weather".into(),
        description: "Get current weather for a city".into(),
        parameters: json!({
            "type": "object",
            "properties": {
                "city": { "type": "string" }
            },
            "required": ["city"]
        }),
    },
    executor: Box::new(|args: Value| {
        Box::pin(async move {
            let city = args["city"].as_str().unwrap_or("unknown");
            Ok(json!({ "temp": 72, "city": city }))
        })
    }),
})?;

// Invoke tool
let result = registry.invoke("get_weather", json!({"city": "Seattle"})).await?;

14. Policy Enforcement

use mxp_agents::policy::{PolicyEngine, PolicyRule, PolicyDecision, PolicyRequest};

let mut engine = PolicyEngine::new();

// Add rules
engine.add_rule(PolicyRule {
    name: "block_dangerous_tools".into(),
    condition: |req: &PolicyRequest| req.tool_name == "delete_all",
    decision: PolicyDecision::Deny("Dangerous operation blocked".into()),
});

// Check policy before tool execution
let request = PolicyRequest { tool_name: "delete_all".into(), ..Default::default() };
match engine.evaluate(&request) {
    PolicyDecision::Allow => { /* proceed */ },
    PolicyDecision::Deny(reason) => println!("Blocked: {}", reason),
    PolicyDecision::Escalate => { /* require human approval */ },
}

Environment Variables

Variable             Provider         Required
OPENAI_API_KEY       OpenAI           Yes
ANTHROPIC_API_KEY    Anthropic        Yes
GEMINI_API_KEY       Google Gemini    Yes
(none)               Ollama           No (local)
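
Export the relevant key before running any example, for instance:

export GEMINI_API_KEY=your_key
cargo run -p cookbook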

Crate Structure

mxp-agents (facade)
├── agent-adapters     # LLM providers (OpenAI, Anthropic, Gemini, Ollama)
├── agent-kernel       # Agent lifecycle, MXP handlers, scheduler
├── agent-primitives   # Core types (AgentId, Capability, RateLimiter)
├── agent-tools        # Tool registry and execution
├── agent-policy       # Governance and policy engine
├── agent-memory       # Memory bus, journal, embeddings
├── agent-prompts      # System prompts, context window management
├── agent-config       # Configuration loading, secrets
└── agent-telemetry    # Metrics, health checks, tracing
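
The mxp-agents facade re-exports these subcrates as modules, which is why the cookbook imports use paths such as mxp_agents::adapters and mxp_agents::kernel:

use mxp_agents::adapters::openai::OpenAiAdapter;   // agent-adapters
use mxp_agents::config::Secret;                    // agent-config
use mxp_agents::kernel::AgentKernel;               // agent-kernel
use mxp_agents::policy::PolicyEngine;              // agent-policy
use mxp_agents::primitives::AgentId;               // agent-primitives
use mxp_agents::telemetry::PrometheusExporter;     // agent-telemetry
use mxp_agents::tools::ToolRegistry;               // agent-tools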

Examples

Example                           Description
examples/real-world               Real-world agents: customer support, data analysis, code review, multi-agent pipelines
examples/RUN_AGENTS.md            MXP agent mesh: coordinator plus agents over MXP transport
examples/aviation-flight-pulse    Aviation Flight Pulse: DXB region feed + weather with MXP events
examples/cookbook                 SDK features demo: resilience, metrics, health checks, rate limiting
examples/streaming-test           Token-by-token streaming demo
examples/basic-agent              Simple agent with LLM adapter
examples/enterprise-agent         Production setup with resilience and metrics

Real-World Examples

Run the interactive real-world examples:

GEMINI_API_KEY=your_key cargo run -p real-world

1. Customer Support Agent - Handles queries, looks up orders, processes refunds with policy checks

2. Data Analysis Agent - Queries structured data, generates insights, creates reports

3. Code Review Agent - Reviews code for bugs, security issues, performance problems

4. Multi-Agent Pipeline - Research → Write → Edit workflow with 3 agents collaborating

Sample output (Multi-Agent Pipeline):

STAGE 1: Research Agent
  🔬 Research Agent working...
  ⏱️  Completed in 4.1s

STAGE 2: Writer Agent  
  ✍️  Writer Agent working...
  ⏱️  Completed in 2.5s

STAGE 3: Editor Agent
  📝 Editor Agent working...
  ⏱️  Completed in 1.7s

📊 Pipeline Summary
  Total pipeline time: 8.4s
  Agents involved: 3 (Research, Writer, Editor)
  Data passed between agents: 3216 bytes

SDK Features Demo

Run the cookbook demo:

GEMINI_API_KEY=your_key cargo run -p cookbook

Sample output:

[1/6] SECRETS MANAGEMENT
API Key (debug): [REDACTED]
✓ Secrets are automatically redacted in logs

[2/6] RATE LIMITING
Attempted 25 requests, 20 allowed (burst limit)
✓ Rate limiting prevents resource exhaustion

[3/6] HEALTH CHECKS (Kubernetes Ready)
Readiness: Healthy
Liveness:  Healthy
✓ Health checks ready for Kubernetes probes

[4/6] PROMETHEUS METRICS
Metrics exported (2258 bytes)
✓ Metrics ready for Prometheus scraping

[5/6] RESILIENT STREAMING (Circuit Breaker + Retry)
Response (streaming): Rust achieves memory safety through...
  Chunks received: 5
  Time to first token: 1.3s
✓ Resilient streaming completed successfully

[6/6] GRACEFUL SHUTDOWN
Shutdown completed gracefully
✓ Graceful shutdown with work draining

Requirements

  • Rust 1.85+ (MSRV)
  • Tokio async runtime
  • API keys for cloud providers (or Ollama for local)

License

MIT OR Apache-2.0

