diff --git a/README.md b/README.md
index 7e575e7..15464da 100644
--- a/README.md
+++ b/README.md
@@ -5,46 +5,60 @@
[](https://pypi.org/project/milo-cli/)
[](https://opensource.org/licenses/MIT)
-**Template-driven CLI applications for free-threaded Python**
+**Build CLIs that humans and AI agents both use natively**
```python
-from milo import App, Action
+from milo import CLI
-def reducer(state, action):
- if state is None:
- return {"count": 0}
- if action.type == "@@KEY" and action.payload.char == " ":
- return {**state, "count": state["count"] + 1}
- return state
+cli = CLI(name="deployer", description="Deploy services to environments")
-app = App(template="counter.kida", reducer=reducer, initial_state=None)
-final_state = app.run()
+@cli.command("deploy", description="Deploy a service", annotations={"destructiveHint": True})
+def deploy(environment: str, service: str, version: str = "latest") -> dict:
+ """Deploy a service to the specified environment."""
+ return {"status": "deployed", "environment": environment, "service": service, "version": version}
+
+cli.run()
+```
+
+Three protocols from one decorator:
+
+```bash
+# Human CLI
+deployer deploy --environment production --service api
+
+# MCP tool (AI agent calls this via JSON-RPC)
+echo '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"deploy","arguments":{"environment":"staging","service":"api"}}}' \
+ | deployer --mcp
+
+# AI-readable discovery document
+deployer --llms-txt
```
---
## What is Milo?
-Milo is a framework for building interactive terminal applications in Python 3.14t. It uses the Elm Architecture (Model-View-Update) — an immutable state tree managed by pure reducer functions, a view layer driven by Kida templates, and generator-based sagas for side effects. The result is CLI apps that are predictable, testable, and free-threading ready.
+Milo is a Python framework where every CLI is simultaneously a terminal app, a command-line tool, and an MCP server. Write one function with type annotations and a docstring — Milo generates the argparse subcommand, the MCP tool schema, and the llms.txt entry automatically.
**Why people pick it:**
+- **Every CLI is an MCP server** — `@cli.command` produces an argparse subcommand, MCP tool, and llms.txt entry from one function. AI agents discover and call your tools with zero extra code.
+- **Dual-mode commands** — The same command shows an interactive UI when a human runs it, and returns structured JSON when an AI calls it via MCP.
+- **Annotated schemas** — Type hints + `Annotated` constraints generate rich JSON Schema. Agents validate inputs before calling.
+- **Streaming progress** — Commands that yield `Progress` objects stream notifications to MCP clients in real time.
- **Elm Architecture** — Immutable state, pure reducers, declarative views. Every state transition is explicit and testable.
-- **Template-driven UI** — Render terminal output with Kida templates. Same syntax you use for HTML, now for CLI.
- **Free-threading ready** — Built for Python 3.14t (PEP 703). Sagas run on `ThreadPoolExecutor` with no GIL contention.
-- **Declarative flows** — Chain multi-screen state machines with the `>>` operator. No manual navigation plumbing.
-- **Built-in forms** — Text, select, confirm, and password fields with validation, keyboard navigation, and TTY fallback.
- **One runtime dependency** — Just `kida-templates`. No click, no rich, no curses.
## Use Milo For
+- **AI agent toolchains** — Every CLI doubles as an MCP server; register multiple CLIs behind a single gateway
- **Interactive CLI tools** — Wizards, installers, configuration prompts, and guided workflows
+- **Dual-mode commands** — Interactive when a human runs them, structured when an AI calls them
- **Multi-screen terminal apps** — Declarative flows with `>>` operator for screen-to-screen navigation
- **Forms and data collection** — Text, select, confirm, and password fields with validation
- **Dev tools with hot reload** — `milo dev` watches templates and live-reloads on change
- **Session recording and replay** — Record user sessions to JSONL, replay for debugging or CI regression tests
-- **Styled terminal output** — Kida terminal templates with ANSI colors, progress bars, and live rendering
-- **AI agent integration** — Every CLI is an MCP server; register multiple CLIs behind a single gateway
---
@@ -62,23 +76,33 @@ Requires Python 3.14+
## Quick Start
+### AI-Native CLI
+
+| Function | Description |
+|----------|-------------|
+| `CLI(name, description, version)` | Create a CLI application |
+| `@cli.command(name, description)` | Register a typed command |
+| `cli.group(name, description)` | Create a command group |
+| `cli.run()` | Parse args and dispatch |
+| `cli.call("cmd", **kwargs)` | Programmatic invocation |
+| `--mcp` | Run as MCP server |
+| `--llms-txt` | Generate AI discovery doc |
+| `--mcp-install` | Register in gateway |
+| `annotations={...}` | MCP behavioral hints |
+| `Annotated[str, MinLen(1)]` | Schema constraints |
+
+### Interactive Apps
+
| Function | Description |
|----------|-------------|
| `App(template, reducer, initial_state)` | Create a single-screen app |
| `App.from_flow(flow)` | Create a multi-screen app from a `Flow` |
-| `app.run()` | Run the event loop, return final state |
-| `Store(reducer, initial_state)` | Standalone state container |
-| `combine_reducers(**reducers)` | Compose slice-based reducers |
| `form(*specs)` | Run an interactive form, return `{field: value}` |
| `FlowScreen(name, template, reducer)` | Define a named screen |
| `flow = screen_a >> screen_b` | Chain screens into a flow |
-| `render_html(state, template)` | One-shot static HTML render |
-| `App.from_dir(__file__, ...)` | Auto-discover template directory |
| `ctx.run_app(reducer, template, state)` | Bridge CLI commands to interactive apps |
| `quit_on`, `with_cursor`, `with_confirm` | Reducer combinator decorators |
-| `Cmd(fn)` | Lightweight side effect (runs on thread pool) |
-| `Batch(cmds)`, `Sequence(cmds)` | Concurrent / serial command combinators |
-| `TickCmd(interval)` | Self-sustaining tick (return another to keep ticking) |
+| `Cmd(fn)`, `Batch(cmds)`, `Sequence(cmds)` | Side effects on thread pool |
| `ViewState(cursor_visible=True, ...)` | Declarative terminal state |
| `DevServer(app, watch_dirs)` | Hot-reload dev server |
@@ -88,6 +112,14 @@ Requires Python 3.14+
| Feature | Description | Docs |
|---------|-------------|------|
+| **MCP Server** | Every CLI doubles as an MCP server — AI agents discover and call commands via JSON-RPC | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
+| **MCP Gateway** | Single gateway aggregates all registered Milo CLIs for unified AI agent access | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
+| **Tool Annotations** | Declare `readOnlyHint`, `destructiveHint`, `idempotentHint` per MCP spec | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
+| **Streaming Progress** | Commands yield `Progress` objects; MCP clients receive real-time notifications | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
+| **Schema Constraints** | `Annotated[str, MinLen(1), MaxLen(100)]` generates rich JSON Schema | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
+| **llms.txt** | Generate AI-readable discovery documents from CLI command definitions | [llms.txt →](https://lbliii.github.io/milo-cli/docs/usage/llms/) |
+| **Middleware** | Intercept MCP calls and CLI commands for logging, auth, and transformation | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
+| **Observability** | Built-in request logging with latency stats (`milo://stats` resource) | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
| **State Management** | Redux-style `Store` with dispatch, listeners, middleware, and saga scheduling | [State →](https://lbliii.github.io/milo-cli/docs/usage/state/) |
| **Commands** | Lightweight `Cmd` thunks, `Batch`, `Sequence`, `TickCmd` for one-shot effects | [Commands →](https://lbliii.github.io/milo-cli/docs/usage/commands-effects/) |
| **Sagas** | Generator-based side effects: `Call`, `Put`, `Select`, `Fork`, `Delay`, `Retry` | [Sagas →](https://lbliii.github.io/milo-cli/docs/usage/sagas/) |
@@ -98,24 +130,107 @@ Requires Python 3.14+
| **Templates** | Kida-powered terminal rendering with built-in form, field, help, and progress templates | [Templates →](https://lbliii.github.io/milo-cli/docs/usage/templates/) |
| **Dev Server** | `milo dev` with filesystem polling and `@@HOT_RELOAD` dispatch | [Dev →](https://lbliii.github.io/milo-cli/docs/usage/dev/) |
| **Session Recording** | JSONL action log with state hashes for debugging and regression testing | [Testing →](https://lbliii.github.io/milo-cli/docs/usage/testing/) |
-| **Replay** | Time-travel debugging, speed control, step-by-step mode, CI hash assertions | [Testing →](https://lbliii.github.io/milo-cli/docs/usage/testing/) |
| **Snapshot Testing** | `assert_renders`, `assert_state`, `assert_saga` for deterministic test coverage | [Testing →](https://lbliii.github.io/milo-cli/docs/usage/testing/) |
| **Help Rendering** | `HelpRenderer` — drop-in `argparse.HelpFormatter` using Kida templates | [Help →](https://lbliii.github.io/milo-cli/docs/usage/help/) |
-| **MCP Server** | Every CLI doubles as an MCP server — AI agents discover and call commands via JSON-RPC | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
-| **MCP Gateway** | Single gateway aggregates all registered Milo CLIs for unified AI agent access | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) |
-| **llms.txt** | Generate AI-readable discovery documents from CLI command definitions | [llms.txt →](https://lbliii.github.io/milo-cli/docs/usage/llms/) |
-| **Error System** | Structured error hierarchy with namespaced codes (`M-INP-001`, `M-STA-003`) | [Errors →](https://lbliii.github.io/milo-cli/docs/reference/errors/) |
-| **Reducer Combinators** | `quit_on`, `with_cursor`, `with_confirm` decorators eliminate boilerplate key handling | [State →](https://lbliii.github.io/milo-cli/docs/usage/state/) |
-| **Shell Completions** | Generate bash/zsh/fish completions from CLI definitions | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
-| **Doctor Diagnostics** | `run_doctor()` validates environment, dependencies, and config health | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
-| **Version Checking** | Automatic PyPI upgrade notices with `check_version()` | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
| **Context** | Execution context with verbosity, output format, global options, and `run_app()` bridge | [Context →](https://lbliii.github.io/milo-cli/docs/usage/context/) |
| **Configuration** | `Config` with validation, init scaffolding, and profile support | [Config →](https://lbliii.github.io/milo-cli/docs/usage/config/) |
+| **Shell Completions** | Generate bash/zsh/fish completions from CLI definitions | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
+| **Doctor Diagnostics** | `run_doctor()` validates environment, dependencies, and config health | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) |
---
## Usage
+
+Dual-Mode Commands — Interactive for humans, structured for AI
+
+```python
+from milo import CLI, Context, Action, Quit, SpecialKey
+from milo.streaming import Progress
+from typing import Annotated
+from milo import MinLen
+
+cli = CLI(name="deployer", description="Deploy services")
+
+@cli.command("deploy", description="Deploy a service", annotations={"destructiveHint": True})
+def deploy(
+ environment: Annotated[str, MinLen(1)],
+ service: Annotated[str, MinLen(1)],
+ ctx: Context = None,
+) -> dict:
+ """Deploy a service to an environment."""
+ # Interactive mode: show confirmation UI
+ if ctx and ctx.is_interactive:
+ if not ctx.confirm(f"Deploy {service} to {environment}?"):
+ return {"status": "cancelled"}
+
+ # Stream progress (MCP clients see real-time notifications)
+ yield Progress(status=f"Deploying {service}", step=0, total=2)
+ yield Progress(status="Verifying health", step=1, total=2)
+
+ return {"status": "deployed", "environment": environment, "service": service}
+```
+
+Run by a human: interactive confirmation, then progress output.
+Called via MCP: progress notifications stream, then structured JSON result.
+
+
+
+
+MCP Server & Gateway — AI agent integration
+
+Every Milo CLI is automatically an MCP server:
+
+```bash
+# Run as MCP server (stdin/stdout JSON-RPC)
+myapp --mcp
+
+# Register with an AI host directly
+claude mcp add myapp -- uv run python examples/deploy/app.py --mcp
+```
+
+For multiple CLIs, register them and run a single gateway:
+
+```bash
+# Register CLIs
+taskman --mcp-install
+deployer --mcp-install
+
+# Run the unified gateway
+uv run python -m milo.gateway --mcp
+
+# Or register the gateway with your AI host
+claude mcp add milo -- uv run python -m milo.gateway --mcp
+```
+
+The gateway namespaces tools automatically: `taskman.add`, `deployer.deploy`, etc. Implements MCP 2025-11-25 with `outputSchema`, `structuredContent`, tool `annotations`, and streaming `Progress` notifications.
+
+Built-in `milo://stats` resource exposes request latency, error counts, and throughput.
+
+
+
+
+Schema Constraints — Rich validation from type hints
+
+```python
+from typing import Annotated
+from milo import CLI, MinLen, MaxLen, Gt, Lt, Pattern, Description
+
+cli = CLI(name="app")
+
+@cli.command("create-user", description="Create a user account")
+def create_user(
+ name: Annotated[str, MinLen(1), MaxLen(100), Description("Full name")],
+ age: Annotated[int, Gt(0), Lt(200)],
+ email: Annotated[str, Pattern(r"^[^@]+@[^@]+$")],
+) -> dict:
+ return {"name": name, "age": age, "email": email}
+```
+
+Generates JSON Schema with `minLength`, `maxLength`, `exclusiveMinimum`, `exclusiveMaximum`, `pattern`, and `description` — AI agents validate inputs before calling.
+
+
+
Single-Screen App — Counter with keyboard input
@@ -218,64 +333,6 @@ def reducer(state, action):
-
-Middleware — Intercept and transform dispatches
-
-```python
-def logging_middleware(dispatch, get_state):
- def wrapper(action):
- print(f"Action: {action.type}")
- return dispatch(action)
- return wrapper
-
-app = App(
- template="app.kida",
- reducer=reducer,
- initial_state=None,
- middleware=[logging_middleware],
-)
-```
-
-
-
-
-Dev Server — Hot reload templates
-
-```bash
-# Watch templates and reload on change
-milo dev myapp:app --watch ./templates --poll 0.25
-```
-
-```python
-from milo import App, DevServer
-
-app = App(template="dashboard.kida", reducer=reducer, initial_state=None)
-server = DevServer(app, watch_dirs=("./templates",), poll_interval=0.5)
-server.run()
-```
-
-
-
-
-Session Recording & Replay — Debug and regression testing
-
-```python
-# Record a session
-app = App(template="app.kida", reducer=reducer, initial_state=None, record=True)
-app.run() # Writes to session.jsonl
-
-# Replay for debugging
-milo replay session.jsonl --speed 2.0 --diff
-
-# CI regression: assert state hashes match
-milo replay session.jsonl --assert --reducer myapp:reducer
-
-# Step-by-step interactive replay
-milo replay session.jsonl --step
-```
-
-
-
Testing Utilities — Snapshot, state, and saga assertions
@@ -297,37 +354,6 @@ Set `MILO_UPDATE_SNAPSHOTS=1` to regenerate snapshot files.
-
-MCP Server & Gateway — AI agent integration
-
-Every Milo CLI is automatically an MCP server:
-
-```bash
-# Run as MCP server (stdin/stdout JSON-RPC)
-myapp --mcp
-
-# Register with an AI host directly
-claude mcp add myapp -- uv run python examples/taskman/app.py --mcp
-```
-
-For multiple CLIs, register them and run a single gateway:
-
-```bash
-# Register CLIs
-taskman --mcp-install
-ghub --mcp-install
-
-# Run the unified gateway
-uv run python -m milo.gateway --mcp
-
-# Or register the gateway with your AI host
-claude mcp add milo -- uv run python -m milo.gateway --mcp
-```
-
-The gateway namespaces tools automatically: `taskman.add`, `ghub.repo.list`, etc. Implements MCP 2025-11-25 with `outputSchema`, `structuredContent`, and tool `title` fields.
-
-
-
---
## Architecture
@@ -402,9 +428,9 @@ App.run()
| Section | Description |
|---------|-------------|
| [Get Started](https://lbliii.github.io/milo-cli/docs/get-started/) | Installation and quickstart |
+| [MCP & AI](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | MCP server, gateway, annotations, streaming, and llms.txt |
| [Usage](https://lbliii.github.io/milo-cli/docs/usage/) | State, sagas, flows, forms, templates |
| [Testing](https://lbliii.github.io/milo-cli/docs/usage/testing/) | Snapshots, recording, replay |
-| [MCP & AI](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | MCP server, gateway, and llms.txt |
| [Reference](https://lbliii.github.io/milo-cli/docs/reference/) | Complete API documentation |
---
diff --git a/examples/deploy/app.py b/examples/deploy/app.py
new file mode 100644
index 0000000..dbebd43
--- /dev/null
+++ b/examples/deploy/app.py
@@ -0,0 +1,230 @@
+"""Deploy — flagship dual-mode example for milo.
+
+Demonstrates the core milo idea: one command that works as both an
+interactive terminal app (when run by a human) and a structured MCP tool
+(when called by an AI agent).
+
+Human usage (interactive confirmation flow):
+
+ uv run python examples/deploy/app.py deploy --environment production --service api
+
+AI usage (structured JSON via MCP):
+
+ echo '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"deploy","arguments":{"environment":"staging","service":"api"}}}' \
+ | uv run python examples/deploy/app.py --mcp
+
+Discovery:
+
+ uv run python examples/deploy/app.py --llms-txt
+ uv run python examples/deploy/app.py --mcp # then send initialize + tools/list
+"""
+
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass, replace
+from typing import Annotated
+
+from milo import (
+ CLI,
+ Action,
+ App,
+ Context,
+ Gt,
+ MaxLen,
+ MinLen,
+ Quit,
+ SpecialKey,
+)
+from milo.streaming import Progress
+
+# ---------------------------------------------------------------------------
+# Interactive confirmation state
+# ---------------------------------------------------------------------------
+
+
+@dataclass(frozen=True, slots=True)
+class ConfirmState:
+ environment: str = ""
+ service: str = ""
+ version: str = ""
+ confirmed: bool = False
+
+
+def confirm_reducer(state: ConfirmState | None, action: Action) -> ConfirmState | Quit:
+ if state is None:
+ return ConfirmState()
+ if action.type == "@@KEY":
+ key = action.payload
+ if key.name == SpecialKey.ENTER:
+ return Quit(state=replace(state, confirmed=True))
+ if key.name == SpecialKey.ESCAPE or (key.char == "q"):
+ return Quit(state=replace(state, confirmed=False), code=1)
+ return state
+
+
+# ---------------------------------------------------------------------------
+# CLI definition
+# ---------------------------------------------------------------------------
+
+cli = CLI(
+ name="deployer",
+ description="Deploy services to environments. Works as both a human CLI and an AI tool.",
+ version="0.2.0",
+)
+
+
+@cli.command(
+ "deploy",
+ description="Deploy a service to an environment",
+ annotations={"destructiveHint": True},
+)
+def deploy(
+ environment: Annotated[str, MinLen(1), MaxLen(50)],
+ service: Annotated[str, MinLen(1)],
+ version: str = "latest",
+ ctx: Context = None,
+) -> dict:
+ """Deploy a service to the specified environment.
+
+ Args:
+ environment: Target environment (dev, staging, production).
+ service: Service name to deploy.
+ version: Version tag to deploy (default: latest).
+ """
+ # Interactive mode: show confirmation UI
+ if ctx and ctx.is_interactive:
+ initial = ConfirmState(
+ environment=environment,
+ service=service,
+ version=version,
+ )
+ final = ctx.run_app(
+ reducer=confirm_reducer,
+ template="confirm.kida",
+ initial_state=initial,
+ )
+ if not final.confirmed:
+ return {"status": "cancelled", "environment": environment, "service": service}
+
+ # Simulate deployment with progress
+ yield Progress(status=f"Preparing {service}", step=0, total=3)
+ time.sleep(0.3)
+
+ yield Progress(status=f"Deploying {service} to {environment}", step=1, total=3)
+ time.sleep(0.5)
+
+ yield Progress(status="Verifying health checks", step=2, total=3)
+ time.sleep(0.2)
+
+ return {
+ "status": "deployed",
+ "environment": environment,
+ "service": service,
+ "version": version,
+ }
+
+
+@cli.command(
+ "status",
+ description="Check deployment status",
+ annotations={"readOnlyHint": True},
+)
+def status(
+ environment: Annotated[str, MinLen(1)],
+ service: Annotated[str, MinLen(1)],
+) -> dict:
+ """Check the current deployment status of a service.
+
+ Args:
+ environment: Target environment to check.
+ service: Service name to check.
+ """
+ # Simulated status
+ return {
+ "environment": environment,
+ "service": service,
+ "version": "latest",
+ "status": "healthy",
+ "uptime": "2h 15m",
+ "replicas": 3,
+ }
+
+
+@cli.command(
+ "rollback",
+ description="Rollback to previous version",
+ annotations={"destructiveHint": True, "idempotentHint": True},
+)
+def rollback(
+ environment: Annotated[str, MinLen(1)],
+ service: Annotated[str, MinLen(1)],
+ target_version: str = "previous",
+ ctx: Context = None,
+) -> dict:
+ """Rollback a service to a previous version.
+
+ Args:
+ environment: Target environment.
+ service: Service name to rollback.
+ target_version: Version to rollback to (default: previous).
+ """
+ if ctx and ctx.is_interactive and not ctx.confirm(
+ f"Rollback {service} in {environment} to {target_version}?"
+ ):
+ return {"status": "cancelled"}
+
+ yield Progress(status=f"Rolling back {service}", step=0, total=2)
+ time.sleep(0.3)
+ yield Progress(status="Verifying rollback", step=1, total=2)
+ time.sleep(0.2)
+
+ return {
+ "status": "rolled_back",
+ "environment": environment,
+ "service": service,
+ "version": target_version,
+ }
+
+
+@cli.command(
+ "environments",
+ description="List available environments",
+ annotations={"readOnlyHint": True},
+)
+def environments() -> list[dict]:
+ """List all available deployment environments."""
+ return [
+ {"name": "dev", "status": "active", "region": "us-east-1"},
+ {"name": "staging", "status": "active", "region": "us-east-1"},
+ {"name": "production", "status": "active", "region": "us-east-1,eu-west-1"},
+ ]
+
+
+@cli.resource("deploy://environments", description="Available deployment environments")
+def env_resource() -> list[dict]:
+ return environments()
+
+
+@cli.prompt("deploy-checklist", description="Pre-deployment verification checklist")
+def deploy_checklist(environment: str) -> list[dict]:
+ return [
+ {
+ "role": "user",
+ "content": {
+ "type": "text",
+ "text": (
+ f"Before deploying to {environment}, verify:\n"
+ f"1. All tests pass on the target branch\n"
+ f"2. Database migrations are ready\n"
+ f"3. Feature flags are configured for {environment}\n"
+ f"4. Monitoring dashboards are set up\n"
+ f"5. Rollback plan is documented"
+ ),
+ },
+ }
+ ]
+
+
+if __name__ == "__main__":
+ cli.run()
diff --git a/examples/deploy/templates/confirm.kida b/examples/deploy/templates/confirm.kida
new file mode 100644
index 0000000..04fbbdc
--- /dev/null
+++ b/examples/deploy/templates/confirm.kida
@@ -0,0 +1,18 @@
+{% from "components/_defs.kida" import header, status_line, kv_pair, key_hints %}
+{{ header("Deploy") }}
+{{ hr() }}
+
+ {{ kv_pair("Environment", state.environment | bold) }}
+ {{ kv_pair("Service", state.service | bold) }}
+ {{ kv_pair("Version", state.version | bold) }}
+
+{{ hr() }}
+
+{% if state.confirmed %}
+ {{ status_line("success", "Deploying...") }}
+{% else %}
+ Press {{ "Enter" | green }} to confirm or {{ "Esc" | red }} to cancel.
+{% endif %}
+
+{{ hr() }}
+{{ key_hints([{"key": "enter", "action": "confirm"}, {"key": "esc", "action": "cancel"}]) }}
diff --git a/src/milo/__init__.py b/src/milo/__init__.py
index 134d167..2502e16 100644
--- a/src/milo/__init__.py
+++ b/src/milo/__init__.py
@@ -89,6 +89,14 @@ def __getattr__(name: str):
# Plugins
"HookRegistry": "plugins",
"function_to_schema": "schema",
+ "MinLen": "schema",
+ "MaxLen": "schema",
+ "Gt": "schema",
+ "Lt": "schema",
+ "Ge": "schema",
+ "Le": "schema",
+ "Pattern": "schema",
+ "Description": "schema",
"format_output": "output",
"write_output": "output",
"generate_llms_txt": "llms",
@@ -150,6 +158,7 @@ def _Py_mod_gil() -> int: # noqa: N802
"ConfigSpec",
"Context",
"Delay",
+ "Description",
"DevServer",
"DoctorReport",
"ErrorCode",
@@ -163,18 +172,25 @@ def _Py_mod_gil() -> int: # noqa: N802
"Fork",
"FormError",
"FormState",
+ "Ge",
"GlobalOption",
"Group",
"GroupDef",
+ "Gt",
"HelpRenderer",
"HookRegistry",
"InputError",
"InvokeResult",
"Key",
"LazyCommandDef",
+ "Le",
+ "Lt",
"MCPCall",
+ "MaxLen",
"MiddlewareStack",
"MiloError",
+ "MinLen",
+ "Pattern",
"Phase",
"PhaseStatus",
"Pipeline",
diff --git a/src/milo/_command_defs.py b/src/milo/_command_defs.py
index 734b4a9..cae1c31 100644
--- a/src/milo/_command_defs.py
+++ b/src/milo/_command_defs.py
@@ -6,7 +6,7 @@
import inspect
import threading
from collections.abc import Callable
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from typing import Any
@@ -57,6 +57,8 @@ class CommandDef:
examples: tuple[dict[str, Any], ...] = ()
confirm: str = ""
"""If non-empty, prompt for confirmation before running."""
+ annotations: dict[str, Any] = field(default_factory=dict)
+ """MCP tool annotations (readOnlyHint, destructiveHint, etc.)."""
class LazyCommandDef:
@@ -75,6 +77,7 @@ class LazyCommandDef:
"_resolved",
"_schema",
"aliases",
+ "annotations",
"confirm",
"description",
"examples",
@@ -96,6 +99,7 @@ def __init__(
hidden: bool = False,
examples: tuple[dict[str, Any], ...] | list[dict[str, Any]] = (),
confirm: str = "",
+ annotations: dict[str, Any] | None = None,
) -> None:
self.name = name
self.description = description
@@ -105,6 +109,7 @@ def __init__(
self.hidden = hidden
self.examples = tuple(examples)
self.confirm = confirm
+ self.annotations = annotations or {}
self._schema = schema
self._resolved: CommandDef | None = None
self._lock = threading.Lock()
@@ -153,6 +158,7 @@ def resolve(self) -> CommandDef:
hidden=self.hidden,
examples=self.examples,
confirm=self.confirm,
+ annotations=self.annotations,
)
return self._resolved
@@ -178,6 +184,7 @@ def _make_command_def(
hidden: bool = False,
examples: tuple[dict[str, Any], ...] = (),
confirm: str = "",
+ annotations: dict[str, Any] | None = None,
) -> CommandDef:
"""Build a CommandDef from a function and decorator kwargs."""
from milo.schema import function_to_schema
@@ -196,6 +203,7 @@ def _make_command_def(
hidden=hidden,
examples=examples,
confirm=confirm,
+ annotations=annotations or {},
)
diff --git a/src/milo/_jsonrpc.py b/src/milo/_jsonrpc.py
index b41081c..6624599 100644
--- a/src/milo/_jsonrpc.py
+++ b/src/milo/_jsonrpc.py
@@ -25,6 +25,13 @@ def _write_error(req_id: Any, code: int, message: str) -> None:
sys.stdout.flush()
+def _write_notification(method: str, params: dict[str, Any]) -> None:
+ """Write a JSON-RPC notification (no id field, no response expected)."""
+ notification = {"jsonrpc": "2.0", "method": method, "params": params}
+ sys.stdout.write(json.dumps(notification) + "\n")
+ sys.stdout.flush()
+
+
def _stderr(message: str) -> None:
sys.stderr.write(message + "\n")
sys.stderr.flush()
diff --git a/src/milo/commands.py b/src/milo/commands.py
index ceaa4be..4201868 100644
--- a/src/milo/commands.py
+++ b/src/milo/commands.py
@@ -185,6 +185,7 @@ def command(
hidden: bool = False,
examples: tuple[dict[str, Any], ...] | list[dict[str, Any]] = (),
confirm: str = "",
+ annotations: dict[str, Any] | None = None,
) -> Callable:
"""Register a function as a CLI command.
@@ -195,6 +196,8 @@ def command(
Args:
confirm: If set, prompt user with this message before executing.
+ annotations: MCP tool annotations (readOnlyHint, destructiveHint,
+ idempotentHint, openWorldHint).
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@@ -207,6 +210,7 @@ def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
hidden=hidden,
examples=tuple(examples),
confirm=confirm,
+ annotations=annotations,
)
self._commands[name] = cmd
for alias in aliases:
@@ -228,6 +232,7 @@ def lazy_command(
hidden: bool = False,
examples: tuple[dict[str, Any], ...] | list[dict[str, Any]] = (),
confirm: str = "",
+ annotations: dict[str, Any] | None = None,
) -> LazyCommandDef:
"""Register a lazy-loaded command.
@@ -244,6 +249,7 @@ def lazy_command(
hidden=hidden,
examples=examples,
confirm=confirm,
+ annotations=annotations,
)
self._commands[name] = cmd
for alias in aliases:
@@ -1003,6 +1009,39 @@ def call(self, command_name: str, **kwargs: Any) -> Any:
return result
+ def call_raw(self, command_name: str, **kwargs: Any) -> Any:
+ """Call a command without consuming generators.
+
+ Like :meth:`call`, but returns the raw result — if the handler
+ returns a generator, it is *not* consumed. The MCP server uses
+ this to stream ``Progress`` yields as notifications.
+ """
+ found = self.get_command(command_name)
+ if not found:
+ suggestion = self.suggest_command(command_name)
+ msg = f"Unknown command: {command_name!r}"
+ if suggestion:
+ msg += f". Did you mean {suggestion!r}?"
+ raise ValueError(msg)
+
+ cmd = found.resolve() if isinstance(found, LazyCommandDef) else found
+
+ sig = inspect.signature(cmd.handler)
+ valid = {
+ k: v
+ for k, v in kwargs.items()
+ if k in sig.parameters and not _is_context_param(sig.parameters[k])
+ }
+
+ if self._middleware:
+ from milo.context import Context as ContextClass
+ from milo.middleware import MCPCall
+
+ ctx = ContextClass()
+ call = MCPCall(method="tools/call", name=command_name, arguments=valid)
+ return self._middleware.execute(ctx, call, lambda c: cmd.handler(**c.arguments))
+ return cmd.handler(**valid)
+
def suggest_command(self, name: str) -> str | None:
"""Suggest the closest command name for typo correction."""
all_names = [path for path, _ in self.walk_commands()]
diff --git a/src/milo/llms.py b/src/milo/llms.py
index 60516ac..27068c6 100644
--- a/src/milo/llms.py
+++ b/src/milo/llms.py
@@ -129,6 +129,21 @@ def _format_command(cmd: CommandDef | LazyCommandDef) -> str:
parts.append(f": {cmd.description}" if cmd.description else "")
+ # Annotations (behavioral hints)
+ annotations = getattr(cmd, "annotations", {})
+ if annotations:
+ hints = []
+ if annotations.get("readOnlyHint"):
+ hints.append("read-only")
+ if annotations.get("destructiveHint"):
+ hints.append("destructive")
+ if annotations.get("idempotentHint"):
+ hints.append("idempotent")
+ if annotations.get("openWorldHint"):
+ hints.append("open-world")
+ if hints:
+ parts.append(f" [{', '.join(hints)}]")
+
# Parameter summary
props = cmd.schema.get("properties", {})
required = set(cmd.schema.get("required", []))
diff --git a/src/milo/mcp.py b/src/milo/mcp.py
index 2cfcdcf..0eb4b27 100644
--- a/src/milo/mcp.py
+++ b/src/milo/mcp.py
@@ -4,12 +4,14 @@
import json
import sys
+import time
from typing import TYPE_CHECKING, Any
from milo import __version__ as _server_version
from milo._jsonrpc import MCP_VERSION as _MCP_VERSION
-from milo._jsonrpc import _stderr, _write_error, _write_result
+from milo._jsonrpc import _stderr, _write_error, _write_notification, _write_result
from milo._mcp_router import dispatch
+from milo.observability import RequestLogger, log_request, new_correlation_id
if TYPE_CHECKING:
from milo.commands import CLI, CommandDef, LazyCommandDef
@@ -28,6 +30,7 @@ class _CLIHandler:
def __init__(self, cli: CLI, cached_tools: list[dict[str, Any]] | None = None) -> None:
self._cli = cli
self._cached_tools = cached_tools
+ self._logger = RequestLogger()
def initialize(self, params: dict[str, Any]) -> dict[str, Any]:
return {
@@ -42,23 +45,68 @@ def initialize(self, params: dict[str, Any]) -> dict[str, Any]:
}
def list_tools(self, params: dict[str, Any]) -> dict[str, Any]:
+ new_correlation_id()
+ start = time.monotonic()
tools = self._cached_tools if self._cached_tools is not None else _list_tools(self._cli)
+ log_request(self._logger, "tools/list", "", start)
return {"tools": tools}
def call_tool(self, params: dict[str, Any]) -> dict[str, Any]:
- return _call_tool(self._cli, params)
+ new_correlation_id()
+ start = time.monotonic()
+ result = _call_tool(self._cli, params)
+ error = "" if not result.get("isError") else result["content"][0].get("text", "")
+ log_request(
+ self._logger,
+ "tools/call",
+ params.get("name", ""),
+ start,
+ error=error,
+ )
+ return result
def list_resources(self, params: dict[str, Any]) -> dict[str, Any]:
- return {"resources": _list_resources(self._cli)}
+ new_correlation_id()
+ start = time.monotonic()
+ resources = _list_resources(self._cli) + _builtin_resources()
+ log_request(self._logger, "resources/list", "", start)
+ return {"resources": resources}
def read_resource(self, params: dict[str, Any]) -> dict[str, Any]:
- return _read_resource(self._cli, params)
+ uri = params.get("uri", "")
+ if uri == "milo://stats":
+ return _stats_resource(self._logger)
+ new_correlation_id()
+ start = time.monotonic()
+ try:
+ result = _read_resource(self._cli, params)
+ except Exception as e:
+ log_request(self._logger, "resources/read", uri, start, error=str(e))
+ raise
+ log_request(self._logger, "resources/read", uri, start)
+ return result
def list_prompts(self, params: dict[str, Any]) -> dict[str, Any]:
- return {"prompts": _list_prompts(self._cli)}
+ new_correlation_id()
+ start = time.monotonic()
+ prompts = _list_prompts(self._cli)
+ log_request(self._logger, "prompts/list", "", start)
+ return {"prompts": prompts}
def get_prompt(self, params: dict[str, Any]) -> dict[str, Any]:
- return _get_prompt(self._cli, params)
+ new_correlation_id()
+ start = time.monotonic()
+ result = _get_prompt(self._cli, params)
+ # Detect errors returned as message payloads
+ error = ""
+ for message in result.get("messages", []):
+ content = message.get("content", {})
+ text = content.get("text", "") if isinstance(content, dict) else ""
+ if text.startswith("Error:"):
+ error = text
+ break
+ log_request(self._logger, "prompts/get", params.get("name", ""), start, error=error)
+ return result
def run_mcp_server(cli: CLI) -> None:
@@ -119,6 +167,25 @@ def run_mcp_server(cli: CLI) -> None:
_write_error(req_id, -32603, str(e))
+def _builtin_resources() -> list[dict[str, Any]]:
+ """Built-in MCP resources provided by the milo runtime."""
+ return [
+ {
+ "uri": "milo://stats",
+ "name": "Server Statistics",
+ "description": "Request latency, error counts, and throughput for this MCP server",
+ "mimeType": "application/json",
+ },
+ ]
+
+
+def _stats_resource(logger: RequestLogger) -> dict[str, Any]:
+ """Return server statistics as an MCP resource."""
+ stats = logger.stats()
+ text = json.dumps(stats, indent=2)
+ return {"contents": [{"uri": "milo://stats", "text": text, "mimeType": "application/json"}]}
+
+
def _list_tools(cli: CLI) -> list[dict[str, Any]]:
"""Generate MCP tools/list response from all commands including groups.
@@ -145,6 +212,10 @@ def _list_tools(cli: CLI) -> list[dict[str, Any]]:
if output_schema:
tool["outputSchema"] = output_schema
+ # annotations: MCP behavioral hints (readOnlyHint, destructiveHint, etc.)
+ if cmd.annotations:
+ tool["annotations"] = cmd.annotations
+
tools.append(tool)
return tools
@@ -186,12 +257,43 @@ def _output_schema(cmd: CommandDef | LazyCommandDef) -> dict[str, Any] | None:
def _call_tool(cli: CLI, params: dict[str, Any]) -> dict[str, Any]:
- """Handle tools/call by dispatching to the command handler."""
+ """Handle tools/call by dispatching to the command handler.
+
+ Routes through the CLI's middleware stack when present, so middleware
+ can intercept MCP-originated calls just like CLI-originated ones.
+
+ If the handler returns a generator yielding Progress objects, each
+ Progress is emitted as a ``notifications/progress`` JSON-RPC
+ notification before the final result is returned.
+ """
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
try:
- result = cli.call(tool_name, **arguments)
+ result = cli.call_raw(tool_name, **arguments)
+
+ # Stream progress notifications for generator results
+ from milo.streaming import Progress, is_generator_result
+
+ if is_generator_result(result):
+ final_value = None
+ try:
+ while True:
+ value = next(result)
+ if isinstance(value, Progress):
+ _write_notification(
+ "notifications/progress",
+ {
+ "progressToken": tool_name,
+ "progress": value.step,
+ "total": value.total or None,
+ "message": value.status,
+ },
+ )
+ except StopIteration as e:
+ final_value = e.value
+ result = final_value
+
except Exception as e:
return {
"content": [{"type": "text", "text": f"Error: {e}"}],
@@ -235,7 +337,15 @@ def _read_resource(cli: CLI, params: dict[str, Any]) -> dict[str, Any]:
return {"contents": []}
try:
- result = res.handler()
+ if cli._middleware:
+ from milo.context import Context as ContextClass
+ from milo.middleware import MCPCall
+
+ ctx = ContextClass()
+ call = MCPCall(method="resources/read", name=uri, arguments={})
+ result = cli._middleware.execute(ctx, call, lambda _c: res.handler())
+ else:
+ result = res.handler()
except Exception as e:
return {"contents": [{"uri": uri, "text": f"Error: {e}", "mimeType": "text/plain"}]}
@@ -268,7 +378,15 @@ def _get_prompt(cli: CLI, params: dict[str, Any]) -> dict[str, Any]:
return {"messages": []}
try:
- result = p.handler(**arguments)
+ if cli._middleware:
+ from milo.context import Context as ContextClass
+ from milo.middleware import MCPCall
+
+ ctx = ContextClass()
+ call = MCPCall(method="prompts/get", name=name, arguments=arguments)
+ result = cli._middleware.execute(ctx, call, lambda c: p.handler(**c.arguments))
+ else:
+ result = p.handler(**arguments)
except Exception as e:
return {"messages": [{"role": "user", "content": {"type": "text", "text": f"Error: {e}"}}]}
diff --git a/src/milo/schema.py b/src/milo/schema.py
index b41a5f0..03acdbd 100644
--- a/src/milo/schema.py
+++ b/src/milo/schema.py
@@ -11,6 +11,79 @@
from collections.abc import Callable
from typing import Any, Literal, Union, get_args, get_origin
+# ---------------------------------------------------------------------------
+# Annotated constraint markers
+# ---------------------------------------------------------------------------
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class MinLen:
+ """Minimum length for strings (minLength) or items for arrays (minItems)."""
+
+ value: int
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class MaxLen:
+ """Maximum length for strings (maxLength) or items for arrays (maxItems)."""
+
+ value: int
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class Gt:
+ """Exclusive minimum constraint for numbers."""
+
+ value: int | float
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class Lt:
+ """Exclusive maximum constraint for numbers."""
+
+ value: int | float
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class Ge:
+ """Inclusive minimum constraint for numbers."""
+
+ value: int | float
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class Le:
+ """Inclusive maximum constraint for numbers."""
+
+ value: int | float
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class Pattern:
+ """Regex pattern constraint for strings."""
+
+ value: str
+
+
+@dataclasses.dataclass(frozen=True, slots=True)
+class Description:
+ """Override or supplement the parameter description."""
+
+ value: str
+
+
+_CONSTRAINT_MAP: dict[type, str] = {
+ MinLen: "minLength",
+ MaxLen: "maxLength",
+ Gt: "exclusiveMinimum",
+ Lt: "exclusiveMaximum",
+ Ge: "minimum",
+ Le: "maximum",
+ Pattern: "pattern",
+ Description: "description",
+}
+
+
_TYPE_MAP: dict[type, str] = {
str: "string",
int: "integer",
@@ -33,8 +106,9 @@ def function_to_schema(func: Callable[..., Any]) -> dict[str, Any]:
"""
sig = inspect.signature(func)
# Resolve string annotations (from __future__ import annotations)
+ # include_extras=True preserves Annotated metadata for constraint extraction
try:
- hints = typing.get_type_hints(func)
+ hints = typing.get_type_hints(func, include_extras=True)
except Exception:
hints = {}
@@ -57,9 +131,22 @@ def function_to_schema(func: Callable[..., Any]) -> dict[str, Any]:
if _is_context_type(annotation, name):
continue
- is_optional = _is_optional(annotation)
+ # Unwrap Annotated to check optional underneath
+ bare = annotation
+ annotated_meta: tuple = ()
+ if get_origin(bare) is typing.Annotated:
+ annotated_args = get_args(bare)
+ bare = annotated_args[0]
+ annotated_meta = annotated_args[1:]
+
+ is_optional = _is_optional(bare)
if is_optional:
- annotation = _unwrap_optional(annotation)
+ unwrapped = _unwrap_optional(bare)
+ if annotated_meta:
+ # Re-wrap: Annotated[unwrapped_type, *meta]
+ annotation = typing.Annotated[(unwrapped, *annotated_meta)]
+ else:
+ annotation = unwrapped
prop = _type_to_schema(annotation)
@@ -81,6 +168,24 @@ def function_to_schema(func: Callable[..., Any]) -> dict[str, Any]:
def _type_to_schema(annotation: Any, _seen: set[int] | None = None) -> dict[str, Any]:
"""Convert Python type annotation to JSON Schema fragment."""
+ # Annotated[T, constraints...] — unwrap and apply constraints
+ origin = get_origin(annotation)
+ if origin is typing.Annotated:
+ args = get_args(annotation)
+ base_type = args[0]
+ schema = _type_to_schema(base_type, _seen)
+ is_array = schema.get("type") == "array"
+ for meta in args[1:]:
+ key = _CONSTRAINT_MAP.get(type(meta))
+ if key:
+ # MinLen/MaxLen map to minItems/maxItems for arrays
+ if is_array and key == "minLength":
+ key = "minItems"
+ elif is_array and key == "maxLength":
+ key = "maxItems"
+ schema[key] = meta.value
+ return schema
+
# Primitive types
if annotation in _TYPE_MAP:
return {"type": _TYPE_MAP[annotation]}
diff --git a/tests/test_ai_native.py b/tests/test_ai_native.py
index 7d9807b..ca4af83 100644
--- a/tests/test_ai_native.py
+++ b/tests/test_ai_native.py
@@ -973,6 +973,226 @@ def deploy(env: str = "local") -> str:
assert "Deploy to prod" in md
+# ---------------------------------------------------------------------------
+# Phase 1: Annotations, middleware in MCP, observability
+# ---------------------------------------------------------------------------
+
+
+class TestMCPAnnotations:
+ def test_annotations_in_tools_list(self):
+ cli = CLI(name="test")
+
+ @cli.command(
+ "delete",
+ description="Delete a resource",
+ annotations={"destructiveHint": True, "idempotentHint": True},
+ )
+ def delete(name: str) -> str:
+ return f"Deleted {name}"
+
+ tools = _list_tools(cli)
+ tool = next(t for t in tools if t["name"] == "delete")
+ assert tool["annotations"]["destructiveHint"] is True
+ assert tool["annotations"]["idempotentHint"] is True
+
+ def test_no_annotations_when_empty(self):
+ cli = CLI(name="test")
+
+ @cli.command("list", description="List items")
+ def list_cmd() -> str:
+ return "items"
+
+ tools = _list_tools(cli)
+ tool = next(t for t in tools if t["name"] == "list")
+ assert "annotations" not in tool
+
+ def test_annotations_in_llms_txt(self):
+ cli = CLI(name="app")
+
+ @cli.command(
+ "rm",
+ description="Remove files",
+ annotations={"destructiveHint": True, "readOnlyHint": False},
+ )
+ def rm(path: str) -> str:
+ return f"Removed {path}"
+
+ txt = generate_llms_txt(cli)
+ assert "[destructive]" in txt
+
+ def test_read_only_annotation_in_llms_txt(self):
+ cli = CLI(name="app")
+
+ @cli.command(
+ "status",
+ description="Show status",
+ annotations={"readOnlyHint": True},
+ )
+ def status() -> str:
+ return "ok"
+
+ txt = generate_llms_txt(cli)
+ assert "[read-only]" in txt
+
+
+class TestMCPMiddleware:
+ def test_middleware_fires_on_tool_call(self):
+ cli = CLI(name="test")
+ calls = []
+
+ @cli.middleware
+ def track(ctx, call, next_fn):
+ calls.append(call.method)
+ return next_fn(call)
+
+ @cli.command("greet", description="Say hello")
+ def greet(name: str) -> str:
+ return f"Hello, {name}!"
+
+ result = _call_tool(cli, {"name": "greet", "arguments": {"name": "Agent"}})
+ assert result["content"][0]["text"] == "Hello, Agent!"
+ assert "tools/call" in calls
+
+ def test_middleware_can_transform_args(self):
+ cli = CLI(name="test")
+
+ @cli.middleware
+ def inject_default(ctx, call, next_fn):
+ from milo.middleware import MCPCall
+
+ if call.name == "greet" and "name" not in call.arguments:
+ call = MCPCall(
+ method=call.method,
+ name=call.name,
+ arguments={**call.arguments, "name": "Default"},
+ )
+ return next_fn(call)
+
+ @cli.command("greet", description="Say hello")
+ def greet(name: str) -> str:
+ return f"Hello, {name}!"
+
+ result = _call_tool(cli, {"name": "greet", "arguments": {}})
+ assert "Default" in result["content"][0]["text"]
+
+
+class TestMCPObservability:
+ def test_stats_resource_exists(self):
+ cli = CLI(name="test")
+
+ @cli.command("ping", description="Ping")
+ def ping() -> str:
+ return "pong"
+
+ handler = _CLIHandler(cli)
+ resources = handler.list_resources({})["resources"]
+ uris = [r["uri"] for r in resources]
+ assert "milo://stats" in uris
+
+ def test_stats_populated_after_calls(self):
+ cli = CLI(name="test")
+
+ @cli.command("ping", description="Ping")
+ def ping() -> str:
+ return "pong"
+
+ handler = _CLIHandler(cli)
+ handler.call_tool({"name": "ping", "arguments": {}})
+ handler.call_tool({"name": "ping", "arguments": {}})
+
+ stats_result = handler.read_resource({"uri": "milo://stats"})
+ stats = json.loads(stats_result["contents"][0]["text"])
+ assert stats["total"] == 2
+ assert stats["errors"] == 0
+ assert stats["avg_latency_ms"] >= 0
+
+ def test_stats_tracks_errors(self):
+ cli = CLI(name="test")
+
+ @cli.command("fail", description="Fail")
+ def fail() -> str:
+ raise RuntimeError("boom")
+
+ handler = _CLIHandler(cli)
+ handler.call_tool({"name": "fail", "arguments": {}})
+
+ stats_result = handler.read_resource({"uri": "milo://stats"})
+ stats = json.loads(stats_result["contents"][0]["text"])
+ assert stats["total"] == 1
+ assert stats["errors"] == 1
+
+
+class TestMCPStreaming:
+ def test_streaming_progress_notifications(self):
+ from milo.streaming import Progress
+
+ cli = CLI(name="test")
+
+ @cli.command("deploy", description="Deploy")
+ def deploy(env: str = "dev"):
+ yield Progress(status="Starting", step=0, total=2)
+ yield Progress(status="Deploying", step=1, total=2)
+ return f"Deployed to {env}"
+
+ # Capture stdout to see progress notifications
+ captured = io.StringIO()
+ with patch("sys.stdout", captured):
+ result = _call_tool(cli, {"name": "deploy", "arguments": {"env": "prod"}})
+
+ # Final result should be correct
+ assert result["content"][0]["text"] == "Deployed to prod"
+ assert "isError" not in result
+
+ # Progress notifications should have been written
+ lines = captured.getvalue().strip().split("\n")
+ notifications = [json.loads(line) for line in lines]
+ assert len(notifications) == 2
+ assert notifications[0]["method"] == "notifications/progress"
+ assert notifications[0]["params"]["message"] == "Starting"
+ assert notifications[1]["params"]["message"] == "Deploying"
+ assert notifications[1]["params"]["progress"] == 1
+
+ def test_streaming_with_no_progress(self):
+ cli = CLI(name="test")
+
+ @cli.command("simple", description="Simple")
+ def simple() -> str:
+ return "done"
+
+ # Non-generator commands should work normally (no notifications)
+ captured = io.StringIO()
+ with patch("sys.stdout", captured):
+ result = _call_tool(cli, {"name": "simple", "arguments": {}})
+
+ assert result["content"][0]["text"] == "done"
+ assert captured.getvalue() == ""
+
+ def test_call_raw_returns_generator(self):
+ from milo.streaming import Progress, is_generator_result
+
+ cli = CLI(name="test")
+
+ @cli.command("work", description="Work")
+ def work():
+ yield Progress(status="Working", step=1, total=1)
+ return "finished"
+
+ result = cli.call_raw("work")
+ assert is_generator_result(result)
+
+ # Consume it manually
+ values = []
+ try:
+ while True:
+ values.append(next(result))
+ except StopIteration as e:
+ final = e.value
+
+ assert len(values) == 1
+ assert values[0].status == "Working"
+ assert final == "finished"
+
+
class TestGenerateHelpAllBacktickFix:
def test_global_option_short_flag_formatting(self):
cli = CLI(name="myapp")
diff --git a/tests/test_mcp_resources.py b/tests/test_mcp_resources.py
index dc34c4c..786e424 100644
--- a/tests/test_mcp_resources.py
+++ b/tests/test_mcp_resources.py
@@ -44,9 +44,11 @@ class TestMCPResourcesList:
def test_list_resources(self, cli: CLI) -> None:
client = MCPClient(cli)
resources = client.list_resources()
- assert len(resources) == 2
+ # 2 user resources + 1 built-in (milo://stats)
+ assert len(resources) == 3
names = [r["name"] for r in resources]
assert "get_config" in names
+ assert "Server Statistics" in names
def test_resource_fields(self, cli: CLI) -> None:
client = MCPClient(cli)
diff --git a/tests/test_schema_v2.py b/tests/test_schema_v2.py
index 8a5c5d5..8cd2208 100644
--- a/tests/test_schema_v2.py
+++ b/tests/test_schema_v2.py
@@ -4,12 +4,24 @@
import enum
from dataclasses import dataclass, field
-from typing import Literal, TypedDict
+from typing import Annotated, Literal, TypedDict
import pytest
from milo.commands import CLI
-from milo.schema import _parse_param_docs, _type_to_schema, function_to_schema
+from milo.schema import (
+ Description,
+ Ge,
+ Gt,
+ Le,
+ Lt,
+ MaxLen,
+ MinLen,
+ Pattern,
+ _parse_param_docs,
+ _type_to_schema,
+ function_to_schema,
+)
# --- Test Enum ---
@@ -314,3 +326,91 @@ def test_parse_param_docs_sphinx(self):
""")
assert result["name"] == "The user's name."
assert result["count"] == "How many times."
+
+
+# ---------------------------------------------------------------------------
+# Annotated constraint tests
+# ---------------------------------------------------------------------------
+
+
+class TestAnnotatedConstraints:
+ def test_min_max_length(self):
+
+ def func(name: Annotated[str, MinLen(1), MaxLen(100)]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["name"]
+ assert prop["type"] == "string"
+ assert prop["minLength"] == 1
+ assert prop["maxLength"] == 100
+
+ def test_gt_lt(self):
+
+ def func(age: Annotated[int, Gt(0), Lt(200)]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["age"]
+ assert prop["type"] == "integer"
+ assert prop["exclusiveMinimum"] == 0
+ assert prop["exclusiveMaximum"] == 200
+
+ def test_ge_le(self):
+
+ def func(score: Annotated[float, Ge(0.0), Le(100.0)]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["score"]
+ assert prop["type"] == "number"
+ assert prop["minimum"] == 0.0
+ assert prop["maximum"] == 100.0
+
+ def test_pattern(self):
+
+ def func(email: Annotated[str, Pattern(r"^[^@]+@[^@]+$")]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["email"]
+ assert prop["type"] == "string"
+ assert prop["pattern"] == r"^[^@]+@[^@]+$"
+
+ def test_description_override(self):
+
+ def func(name: Annotated[str, Description("The user's full name")]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["name"]
+ assert prop["description"] == "The user's full name"
+
+ def test_unknown_annotations_ignored(self):
+ def func(x: Annotated[str, "some random metadata", 42]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["x"]
+ assert prop == {"type": "string"}
+
+ def test_annotated_optional(self):
+
+ def func(name: Annotated[str | None, MinLen(1)] = None):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["name"]
+ assert prop["type"] == "string"
+ assert prop["minLength"] == 1
+ assert "required" not in schema
+
+ def test_annotated_with_list(self):
+
+ def func(tags: Annotated[list[str], MinLen(1)]):
+ pass
+
+ schema = function_to_schema(func)
+ prop = schema["properties"]["tags"]
+ assert prop["type"] == "array"
+ assert prop["minItems"] == 1