From ef009e8ffc24f6f14946aca97ab4bd00f5f07bea Mon Sep 17 00:00:00 2001 From: Lawrence Lane Date: Fri, 3 Apr 2026 15:57:31 -0400 Subject: [PATCH 1/4] Unlock AI-native potential: annotations, middleware, observability, streaming, constraints Wire disconnected systems and add missing MCP protocol features: - Add tool annotations (readOnlyHint, destructiveHint, etc.) to @cli.command - Route MCP tool/resource/prompt calls through middleware stack - Instrument MCP server with RequestLogger and milo://stats resource - Add Annotated type constraints (MinLen, MaxLen, Gt, Lt, Ge, Le, Pattern, Description) to schema generation - Stream Progress yields as MCP notifications/progress in real time - Add CLI.call_raw() for raw generator access - Add flagship deploy example demonstrating dual-mode commands - Rewrite README to lead with AI-native CLI story Co-Authored-By: Claude Opus 4.6 --- README.md | 272 ++++++++++++++----------- examples/deploy/app.py | 230 +++++++++++++++++++++ examples/deploy/templates/confirm.kida | 18 ++ src/milo/__init__.py | 16 ++ src/milo/_command_defs.py | 10 +- src/milo/_jsonrpc.py | 7 + src/milo/commands.py | 39 ++++ src/milo/llms.py | 15 ++ src/milo/mcp.py | 109 +++++++++- src/milo/schema.py | 105 +++++++++- tests/test_ai_native.py | 220 ++++++++++++++++++++ tests/test_mcp_resources.py | 4 +- tests/test_schema_v2.py | 92 ++++++++- 13 files changed, 999 insertions(+), 138 deletions(-) create mode 100644 examples/deploy/app.py create mode 100644 examples/deploy/templates/confirm.kida diff --git a/README.md b/README.md index 7e575e7..15464da 100644 --- a/README.md +++ b/README.md @@ -5,46 +5,60 @@ [![Python 3.14+](https://img.shields.io/badge/python-3.14+-blue.svg)](https://pypi.org/project/milo-cli/) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT) -**Template-driven CLI applications for free-threaded Python** +**Build CLIs that humans and AI agents both use natively** ```python -from milo import App, Action +from milo import CLI -def reducer(state, action): - if state is None: - return {"count": 0} - if action.type == "@@KEY" and action.payload.char == " ": - return {**state, "count": state["count"] + 1} - return state +cli = CLI(name="deployer", description="Deploy services to environments") -app = App(template="counter.kida", reducer=reducer, initial_state=None) -final_state = app.run() +@cli.command("deploy", description="Deploy a service", annotations={"destructiveHint": True}) +def deploy(environment: str, service: str, version: str = "latest") -> dict: + """Deploy a service to the specified environment.""" + return {"status": "deployed", "environment": environment, "service": service, "version": version} + +cli.run() +``` + +Three protocols from one decorator: + +```bash +# Human CLI +deployer deploy --environment production --service api + +# MCP tool (AI agent calls this via JSON-RPC) +echo '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"deploy","arguments":{"environment":"staging","service":"api"}}}' \ + | deployer --mcp + +# AI-readable discovery document +deployer --llms-txt ``` --- ## What is Milo? -Milo is a framework for building interactive terminal applications in Python 3.14t. It uses the Elm Architecture (Model-View-Update) — an immutable state tree managed by pure reducer functions, a view layer driven by Kida templates, and generator-based sagas for side effects. The result is CLI apps that are predictable, testable, and free-threading ready. +Milo is a Python framework where every CLI is simultaneously a terminal app, a command-line tool, and an MCP server. Write one function with type annotations and a docstring — Milo generates the argparse subcommand, the MCP tool schema, and the llms.txt entry automatically. **Why people pick it:** +- **Every CLI is an MCP server** — `@cli.command` produces an argparse subcommand, MCP tool, and llms.txt entry from one function. AI agents discover and call your tools with zero extra code. +- **Dual-mode commands** — The same command shows an interactive UI when a human runs it, and returns structured JSON when an AI calls it via MCP. +- **Annotated schemas** — Type hints + `Annotated` constraints generate rich JSON Schema. Agents validate inputs before calling. +- **Streaming progress** — Commands that yield `Progress` objects stream notifications to MCP clients in real time. - **Elm Architecture** — Immutable state, pure reducers, declarative views. Every state transition is explicit and testable. -- **Template-driven UI** — Render terminal output with Kida templates. Same syntax you use for HTML, now for CLI. - **Free-threading ready** — Built for Python 3.14t (PEP 703). Sagas run on `ThreadPoolExecutor` with no GIL contention. -- **Declarative flows** — Chain multi-screen state machines with the `>>` operator. No manual navigation plumbing. -- **Built-in forms** — Text, select, confirm, and password fields with validation, keyboard navigation, and TTY fallback. - **One runtime dependency** — Just `kida-templates`. No click, no rich, no curses. ## Use Milo For +- **AI agent toolchains** — Every CLI doubles as an MCP server; register multiple CLIs behind a single gateway - **Interactive CLI tools** — Wizards, installers, configuration prompts, and guided workflows +- **Dual-mode commands** — Interactive when a human runs them, structured when an AI calls them - **Multi-screen terminal apps** — Declarative flows with `>>` operator for screen-to-screen navigation - **Forms and data collection** — Text, select, confirm, and password fields with validation - **Dev tools with hot reload** — `milo dev` watches templates and live-reloads on change - **Session recording and replay** — Record user sessions to JSONL, replay for debugging or CI regression tests -- **Styled terminal output** — Kida terminal templates with ANSI colors, progress bars, and live rendering -- **AI agent integration** — Every CLI is an MCP server; register multiple CLIs behind a single gateway --- @@ -62,23 +76,33 @@ Requires Python 3.14+ ## Quick Start +### AI-Native CLI + +| Function | Description | +|----------|-------------| +| `CLI(name, description, version)` | Create a CLI application | +| `@cli.command(name, description)` | Register a typed command | +| `cli.group(name, description)` | Create a command group | +| `cli.run()` | Parse args and dispatch | +| `cli.call("cmd", **kwargs)` | Programmatic invocation | +| `--mcp` | Run as MCP server | +| `--llms-txt` | Generate AI discovery doc | +| `--mcp-install` | Register in gateway | +| `annotations={...}` | MCP behavioral hints | +| `Annotated[str, MinLen(1)]` | Schema constraints | + +### Interactive Apps + | Function | Description | |----------|-------------| | `App(template, reducer, initial_state)` | Create a single-screen app | | `App.from_flow(flow)` | Create a multi-screen app from a `Flow` | -| `app.run()` | Run the event loop, return final state | -| `Store(reducer, initial_state)` | Standalone state container | -| `combine_reducers(**reducers)` | Compose slice-based reducers | | `form(*specs)` | Run an interactive form, return `{field: value}` | | `FlowScreen(name, template, reducer)` | Define a named screen | | `flow = screen_a >> screen_b` | Chain screens into a flow | -| `render_html(state, template)` | One-shot static HTML render | -| `App.from_dir(__file__, ...)` | Auto-discover template directory | | `ctx.run_app(reducer, template, state)` | Bridge CLI commands to interactive apps | | `quit_on`, `with_cursor`, `with_confirm` | Reducer combinator decorators | -| `Cmd(fn)` | Lightweight side effect (runs on thread pool) | -| `Batch(cmds)`, `Sequence(cmds)` | Concurrent / serial command combinators | -| `TickCmd(interval)` | Self-sustaining tick (return another to keep ticking) | +| `Cmd(fn)`, `Batch(cmds)`, `Sequence(cmds)` | Side effects on thread pool | | `ViewState(cursor_visible=True, ...)` | Declarative terminal state | | `DevServer(app, watch_dirs)` | Hot-reload dev server | @@ -88,6 +112,14 @@ Requires Python 3.14+ | Feature | Description | Docs | |---------|-------------|------| +| **MCP Server** | Every CLI doubles as an MCP server — AI agents discover and call commands via JSON-RPC | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | +| **MCP Gateway** | Single gateway aggregates all registered Milo CLIs for unified AI agent access | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | +| **Tool Annotations** | Declare `readOnlyHint`, `destructiveHint`, `idempotentHint` per MCP spec | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | +| **Streaming Progress** | Commands yield `Progress` objects; MCP clients receive real-time notifications | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | +| **Schema Constraints** | `Annotated[str, MinLen(1), MaxLen(100)]` generates rich JSON Schema | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | +| **llms.txt** | Generate AI-readable discovery documents from CLI command definitions | [llms.txt →](https://lbliii.github.io/milo-cli/docs/usage/llms/) | +| **Middleware** | Intercept MCP calls and CLI commands for logging, auth, and transformation | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | +| **Observability** | Built-in request logging with latency stats (`milo://stats` resource) | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | | **State Management** | Redux-style `Store` with dispatch, listeners, middleware, and saga scheduling | [State →](https://lbliii.github.io/milo-cli/docs/usage/state/) | | **Commands** | Lightweight `Cmd` thunks, `Batch`, `Sequence`, `TickCmd` for one-shot effects | [Commands →](https://lbliii.github.io/milo-cli/docs/usage/commands-effects/) | | **Sagas** | Generator-based side effects: `Call`, `Put`, `Select`, `Fork`, `Delay`, `Retry` | [Sagas →](https://lbliii.github.io/milo-cli/docs/usage/sagas/) | @@ -98,24 +130,107 @@ Requires Python 3.14+ | **Templates** | Kida-powered terminal rendering with built-in form, field, help, and progress templates | [Templates →](https://lbliii.github.io/milo-cli/docs/usage/templates/) | | **Dev Server** | `milo dev` with filesystem polling and `@@HOT_RELOAD` dispatch | [Dev →](https://lbliii.github.io/milo-cli/docs/usage/dev/) | | **Session Recording** | JSONL action log with state hashes for debugging and regression testing | [Testing →](https://lbliii.github.io/milo-cli/docs/usage/testing/) | -| **Replay** | Time-travel debugging, speed control, step-by-step mode, CI hash assertions | [Testing →](https://lbliii.github.io/milo-cli/docs/usage/testing/) | | **Snapshot Testing** | `assert_renders`, `assert_state`, `assert_saga` for deterministic test coverage | [Testing →](https://lbliii.github.io/milo-cli/docs/usage/testing/) | | **Help Rendering** | `HelpRenderer` — drop-in `argparse.HelpFormatter` using Kida templates | [Help →](https://lbliii.github.io/milo-cli/docs/usage/help/) | -| **MCP Server** | Every CLI doubles as an MCP server — AI agents discover and call commands via JSON-RPC | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | -| **MCP Gateway** | Single gateway aggregates all registered Milo CLIs for unified AI agent access | [MCP →](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | -| **llms.txt** | Generate AI-readable discovery documents from CLI command definitions | [llms.txt →](https://lbliii.github.io/milo-cli/docs/usage/llms/) | -| **Error System** | Structured error hierarchy with namespaced codes (`M-INP-001`, `M-STA-003`) | [Errors →](https://lbliii.github.io/milo-cli/docs/reference/errors/) | -| **Reducer Combinators** | `quit_on`, `with_cursor`, `with_confirm` decorators eliminate boilerplate key handling | [State →](https://lbliii.github.io/milo-cli/docs/usage/state/) | -| **Shell Completions** | Generate bash/zsh/fish completions from CLI definitions | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | -| **Doctor Diagnostics** | `run_doctor()` validates environment, dependencies, and config health | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | -| **Version Checking** | Automatic PyPI upgrade notices with `check_version()` | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | | **Context** | Execution context with verbosity, output format, global options, and `run_app()` bridge | [Context →](https://lbliii.github.io/milo-cli/docs/usage/context/) | | **Configuration** | `Config` with validation, init scaffolding, and profile support | [Config →](https://lbliii.github.io/milo-cli/docs/usage/config/) | +| **Shell Completions** | Generate bash/zsh/fish completions from CLI definitions | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | +| **Doctor Diagnostics** | `run_doctor()` validates environment, dependencies, and config health | [CLI →](https://lbliii.github.io/milo-cli/docs/usage/cli/) | --- ## Usage +
+Dual-Mode Commands — Interactive for humans, structured for AI + +```python +from milo import CLI, Context, Action, Quit, SpecialKey +from milo.streaming import Progress +from typing import Annotated +from milo import MinLen + +cli = CLI(name="deployer", description="Deploy services") + +@cli.command("deploy", description="Deploy a service", annotations={"destructiveHint": True}) +def deploy( + environment: Annotated[str, MinLen(1)], + service: Annotated[str, MinLen(1)], + ctx: Context = None, +) -> dict: + """Deploy a service to an environment.""" + # Interactive mode: show confirmation UI + if ctx and ctx.is_interactive: + if not ctx.confirm(f"Deploy {service} to {environment}?"): + return {"status": "cancelled"} + + # Stream progress (MCP clients see real-time notifications) + yield Progress(status=f"Deploying {service}", step=0, total=2) + yield Progress(status="Verifying health", step=1, total=2) + + return {"status": "deployed", "environment": environment, "service": service} +``` + +Run by a human: interactive confirmation, then progress output. +Called via MCP: progress notifications stream, then structured JSON result. + +
+ +
+MCP Server & Gateway — AI agent integration + +Every Milo CLI is automatically an MCP server: + +```bash +# Run as MCP server (stdin/stdout JSON-RPC) +myapp --mcp + +# Register with an AI host directly +claude mcp add myapp -- uv run python examples/deploy/app.py --mcp +``` + +For multiple CLIs, register them and run a single gateway: + +```bash +# Register CLIs +taskman --mcp-install +deployer --mcp-install + +# Run the unified gateway +uv run python -m milo.gateway --mcp + +# Or register the gateway with your AI host +claude mcp add milo -- uv run python -m milo.gateway --mcp +``` + +The gateway namespaces tools automatically: `taskman.add`, `deployer.deploy`, etc. Implements MCP 2025-11-25 with `outputSchema`, `structuredContent`, tool `annotations`, and streaming `Progress` notifications. + +Built-in `milo://stats` resource exposes request latency, error counts, and throughput. + +
+ +
+Schema Constraints — Rich validation from type hints + +```python +from typing import Annotated +from milo import CLI, MinLen, MaxLen, Gt, Lt, Pattern, Description + +cli = CLI(name="app") + +@cli.command("create-user", description="Create a user account") +def create_user( + name: Annotated[str, MinLen(1), MaxLen(100), Description("Full name")], + age: Annotated[int, Gt(0), Lt(200)], + email: Annotated[str, Pattern(r"^[^@]+@[^@]+$")], +) -> dict: + return {"name": name, "age": age, "email": email} +``` + +Generates JSON Schema with `minLength`, `maxLength`, `exclusiveMinimum`, `exclusiveMaximum`, `pattern`, and `description` — AI agents validate inputs before calling. + +
+
Single-Screen App — Counter with keyboard input @@ -218,64 +333,6 @@ def reducer(state, action):
-
-Middleware — Intercept and transform dispatches - -```python -def logging_middleware(dispatch, get_state): - def wrapper(action): - print(f"Action: {action.type}") - return dispatch(action) - return wrapper - -app = App( - template="app.kida", - reducer=reducer, - initial_state=None, - middleware=[logging_middleware], -) -``` - -
- -
-Dev Server — Hot reload templates - -```bash -# Watch templates and reload on change -milo dev myapp:app --watch ./templates --poll 0.25 -``` - -```python -from milo import App, DevServer - -app = App(template="dashboard.kida", reducer=reducer, initial_state=None) -server = DevServer(app, watch_dirs=("./templates",), poll_interval=0.5) -server.run() -``` - -
- -
-Session Recording & Replay — Debug and regression testing - -```python -# Record a session -app = App(template="app.kida", reducer=reducer, initial_state=None, record=True) -app.run() # Writes to session.jsonl - -# Replay for debugging -milo replay session.jsonl --speed 2.0 --diff - -# CI regression: assert state hashes match -milo replay session.jsonl --assert --reducer myapp:reducer - -# Step-by-step interactive replay -milo replay session.jsonl --step -``` - -
-
Testing Utilities — Snapshot, state, and saga assertions @@ -297,37 +354,6 @@ Set `MILO_UPDATE_SNAPSHOTS=1` to regenerate snapshot files.
-
-MCP Server & Gateway — AI agent integration - -Every Milo CLI is automatically an MCP server: - -```bash -# Run as MCP server (stdin/stdout JSON-RPC) -myapp --mcp - -# Register with an AI host directly -claude mcp add myapp -- uv run python examples/taskman/app.py --mcp -``` - -For multiple CLIs, register them and run a single gateway: - -```bash -# Register CLIs -taskman --mcp-install -ghub --mcp-install - -# Run the unified gateway -uv run python -m milo.gateway --mcp - -# Or register the gateway with your AI host -claude mcp add milo -- uv run python -m milo.gateway --mcp -``` - -The gateway namespaces tools automatically: `taskman.add`, `ghub.repo.list`, etc. Implements MCP 2025-11-25 with `outputSchema`, `structuredContent`, and tool `title` fields. - -
- --- ## Architecture @@ -402,9 +428,9 @@ App.run() | Section | Description | |---------|-------------| | [Get Started](https://lbliii.github.io/milo-cli/docs/get-started/) | Installation and quickstart | +| [MCP & AI](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | MCP server, gateway, annotations, streaming, and llms.txt | | [Usage](https://lbliii.github.io/milo-cli/docs/usage/) | State, sagas, flows, forms, templates | | [Testing](https://lbliii.github.io/milo-cli/docs/usage/testing/) | Snapshots, recording, replay | -| [MCP & AI](https://lbliii.github.io/milo-cli/docs/usage/mcp/) | MCP server, gateway, and llms.txt | | [Reference](https://lbliii.github.io/milo-cli/docs/reference/) | Complete API documentation | --- diff --git a/examples/deploy/app.py b/examples/deploy/app.py new file mode 100644 index 0000000..dbebd43 --- /dev/null +++ b/examples/deploy/app.py @@ -0,0 +1,230 @@ +"""Deploy — flagship dual-mode example for milo. + +Demonstrates the core milo idea: one command that works as both an +interactive terminal app (when run by a human) and a structured MCP tool +(when called by an AI agent). + +Human usage (interactive confirmation flow): + + uv run python examples/deploy/app.py deploy --environment production --service api + +AI usage (structured JSON via MCP): + + echo '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"deploy","arguments":{"environment":"staging","service":"api"}}}' \ + | uv run python examples/deploy/app.py --mcp + +Discovery: + + uv run python examples/deploy/app.py --llms-txt + uv run python examples/deploy/app.py --mcp # then send initialize + tools/list +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, replace +from typing import Annotated + +from milo import ( + CLI, + Action, + App, + Context, + Gt, + MaxLen, + MinLen, + Quit, + SpecialKey, +) +from milo.streaming import Progress + +# --------------------------------------------------------------------------- +# Interactive confirmation state +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True, slots=True) +class ConfirmState: + environment: str = "" + service: str = "" + version: str = "" + confirmed: bool = False + + +def confirm_reducer(state: ConfirmState | None, action: Action) -> ConfirmState | Quit: + if state is None: + return ConfirmState() + if action.type == "@@KEY": + key = action.payload + if key.name == SpecialKey.ENTER: + return Quit(state=replace(state, confirmed=True)) + if key.name == SpecialKey.ESCAPE or (key.char == "q"): + return Quit(state=replace(state, confirmed=False), code=1) + return state + + +# --------------------------------------------------------------------------- +# CLI definition +# --------------------------------------------------------------------------- + +cli = CLI( + name="deployer", + description="Deploy services to environments. Works as both a human CLI and an AI tool.", + version="0.2.0", +) + + +@cli.command( + "deploy", + description="Deploy a service to an environment", + annotations={"destructiveHint": True}, +) +def deploy( + environment: Annotated[str, MinLen(1), MaxLen(50)], + service: Annotated[str, MinLen(1)], + version: str = "latest", + ctx: Context = None, +) -> dict: + """Deploy a service to the specified environment. + + Args: + environment: Target environment (dev, staging, production). + service: Service name to deploy. + version: Version tag to deploy (default: latest). + """ + # Interactive mode: show confirmation UI + if ctx and ctx.is_interactive: + initial = ConfirmState( + environment=environment, + service=service, + version=version, + ) + final = ctx.run_app( + reducer=confirm_reducer, + template="confirm.kida", + initial_state=initial, + ) + if not final.confirmed: + return {"status": "cancelled", "environment": environment, "service": service} + + # Simulate deployment with progress + yield Progress(status=f"Preparing {service}", step=0, total=3) + time.sleep(0.3) + + yield Progress(status=f"Deploying {service} to {environment}", step=1, total=3) + time.sleep(0.5) + + yield Progress(status="Verifying health checks", step=2, total=3) + time.sleep(0.2) + + return { + "status": "deployed", + "environment": environment, + "service": service, + "version": version, + } + + +@cli.command( + "status", + description="Check deployment status", + annotations={"readOnlyHint": True}, +) +def status( + environment: Annotated[str, MinLen(1)], + service: Annotated[str, MinLen(1)], +) -> dict: + """Check the current deployment status of a service. + + Args: + environment: Target environment to check. + service: Service name to check. + """ + # Simulated status + return { + "environment": environment, + "service": service, + "version": "latest", + "status": "healthy", + "uptime": "2h 15m", + "replicas": 3, + } + + +@cli.command( + "rollback", + description="Rollback to previous version", + annotations={"destructiveHint": True, "idempotentHint": True}, +) +def rollback( + environment: Annotated[str, MinLen(1)], + service: Annotated[str, MinLen(1)], + target_version: str = "previous", + ctx: Context = None, +) -> dict: + """Rollback a service to a previous version. + + Args: + environment: Target environment. + service: Service name to rollback. + target_version: Version to rollback to (default: previous). + """ + if ctx and ctx.is_interactive and not ctx.confirm( + f"Rollback {service} in {environment} to {target_version}?" + ): + return {"status": "cancelled"} + + yield Progress(status=f"Rolling back {service}", step=0, total=2) + time.sleep(0.3) + yield Progress(status="Verifying rollback", step=1, total=2) + time.sleep(0.2) + + return { + "status": "rolled_back", + "environment": environment, + "service": service, + "version": target_version, + } + + +@cli.command( + "environments", + description="List available environments", + annotations={"readOnlyHint": True}, +) +def environments() -> list[dict]: + """List all available deployment environments.""" + return [ + {"name": "dev", "status": "active", "region": "us-east-1"}, + {"name": "staging", "status": "active", "region": "us-east-1"}, + {"name": "production", "status": "active", "region": "us-east-1,eu-west-1"}, + ] + + +@cli.resource("deploy://environments", description="Available deployment environments") +def env_resource() -> list[dict]: + return environments() + + +@cli.prompt("deploy-checklist", description="Pre-deployment verification checklist") +def deploy_checklist(environment: str) -> list[dict]: + return [ + { + "role": "user", + "content": { + "type": "text", + "text": ( + f"Before deploying to {environment}, verify:\n" + f"1. All tests pass on the target branch\n" + f"2. Database migrations are ready\n" + f"3. Feature flags are configured for {environment}\n" + f"4. Monitoring dashboards are set up\n" + f"5. Rollback plan is documented" + ), + }, + } + ] + + +if __name__ == "__main__": + cli.run() diff --git a/examples/deploy/templates/confirm.kida b/examples/deploy/templates/confirm.kida new file mode 100644 index 0000000..04fbbdc --- /dev/null +++ b/examples/deploy/templates/confirm.kida @@ -0,0 +1,18 @@ +{% from "components/_defs.kida" import header, status_line, kv_pair, key_hints %} +{{ header("Deploy") }} +{{ hr() }} + + {{ kv_pair("Environment", state.environment | bold) }} + {{ kv_pair("Service", state.service | bold) }} + {{ kv_pair("Version", state.version | bold) }} + +{{ hr() }} + +{% if state.confirmed %} + {{ status_line("success", "Deploying...") }} +{% else %} + Press {{ "Enter" | green }} to confirm or {{ "Esc" | red }} to cancel. +{% endif %} + +{{ hr() }} +{{ key_hints([{"key": "enter", "action": "confirm"}, {"key": "esc", "action": "cancel"}]) }} diff --git a/src/milo/__init__.py b/src/milo/__init__.py index 134d167..2502e16 100644 --- a/src/milo/__init__.py +++ b/src/milo/__init__.py @@ -89,6 +89,14 @@ def __getattr__(name: str): # Plugins "HookRegistry": "plugins", "function_to_schema": "schema", + "MinLen": "schema", + "MaxLen": "schema", + "Gt": "schema", + "Lt": "schema", + "Ge": "schema", + "Le": "schema", + "Pattern": "schema", + "Description": "schema", "format_output": "output", "write_output": "output", "generate_llms_txt": "llms", @@ -150,6 +158,7 @@ def _Py_mod_gil() -> int: # noqa: N802 "ConfigSpec", "Context", "Delay", + "Description", "DevServer", "DoctorReport", "ErrorCode", @@ -163,18 +172,25 @@ def _Py_mod_gil() -> int: # noqa: N802 "Fork", "FormError", "FormState", + "Ge", "GlobalOption", "Group", "GroupDef", + "Gt", "HelpRenderer", "HookRegistry", "InputError", "InvokeResult", "Key", "LazyCommandDef", + "Le", + "Lt", "MCPCall", + "MaxLen", "MiddlewareStack", "MiloError", + "MinLen", + "Pattern", "Phase", "PhaseStatus", "Pipeline", diff --git a/src/milo/_command_defs.py b/src/milo/_command_defs.py index 734b4a9..cae1c31 100644 --- a/src/milo/_command_defs.py +++ b/src/milo/_command_defs.py @@ -6,7 +6,7 @@ import inspect import threading from collections.abc import Callable -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any @@ -57,6 +57,8 @@ class CommandDef: examples: tuple[dict[str, Any], ...] = () confirm: str = "" """If non-empty, prompt for confirmation before running.""" + annotations: dict[str, Any] = field(default_factory=dict) + """MCP tool annotations (readOnlyHint, destructiveHint, etc.).""" class LazyCommandDef: @@ -75,6 +77,7 @@ class LazyCommandDef: "_resolved", "_schema", "aliases", + "annotations", "confirm", "description", "examples", @@ -96,6 +99,7 @@ def __init__( hidden: bool = False, examples: tuple[dict[str, Any], ...] | list[dict[str, Any]] = (), confirm: str = "", + annotations: dict[str, Any] | None = None, ) -> None: self.name = name self.description = description @@ -105,6 +109,7 @@ def __init__( self.hidden = hidden self.examples = tuple(examples) self.confirm = confirm + self.annotations = annotations or {} self._schema = schema self._resolved: CommandDef | None = None self._lock = threading.Lock() @@ -153,6 +158,7 @@ def resolve(self) -> CommandDef: hidden=self.hidden, examples=self.examples, confirm=self.confirm, + annotations=self.annotations, ) return self._resolved @@ -178,6 +184,7 @@ def _make_command_def( hidden: bool = False, examples: tuple[dict[str, Any], ...] = (), confirm: str = "", + annotations: dict[str, Any] | None = None, ) -> CommandDef: """Build a CommandDef from a function and decorator kwargs.""" from milo.schema import function_to_schema @@ -196,6 +203,7 @@ def _make_command_def( hidden=hidden, examples=examples, confirm=confirm, + annotations=annotations or {}, ) diff --git a/src/milo/_jsonrpc.py b/src/milo/_jsonrpc.py index b41081c..6624599 100644 --- a/src/milo/_jsonrpc.py +++ b/src/milo/_jsonrpc.py @@ -25,6 +25,13 @@ def _write_error(req_id: Any, code: int, message: str) -> None: sys.stdout.flush() +def _write_notification(method: str, params: dict[str, Any]) -> None: + """Write a JSON-RPC notification (no id field, no response expected).""" + notification = {"jsonrpc": "2.0", "method": method, "params": params} + sys.stdout.write(json.dumps(notification) + "\n") + sys.stdout.flush() + + def _stderr(message: str) -> None: sys.stderr.write(message + "\n") sys.stderr.flush() diff --git a/src/milo/commands.py b/src/milo/commands.py index ceaa4be..4201868 100644 --- a/src/milo/commands.py +++ b/src/milo/commands.py @@ -185,6 +185,7 @@ def command( hidden: bool = False, examples: tuple[dict[str, Any], ...] | list[dict[str, Any]] = (), confirm: str = "", + annotations: dict[str, Any] | None = None, ) -> Callable: """Register a function as a CLI command. @@ -195,6 +196,8 @@ def command( Args: confirm: If set, prompt user with this message before executing. + annotations: MCP tool annotations (readOnlyHint, destructiveHint, + idempotentHint, openWorldHint). """ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: @@ -207,6 +210,7 @@ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: hidden=hidden, examples=tuple(examples), confirm=confirm, + annotations=annotations, ) self._commands[name] = cmd for alias in aliases: @@ -228,6 +232,7 @@ def lazy_command( hidden: bool = False, examples: tuple[dict[str, Any], ...] | list[dict[str, Any]] = (), confirm: str = "", + annotations: dict[str, Any] | None = None, ) -> LazyCommandDef: """Register a lazy-loaded command. @@ -244,6 +249,7 @@ def lazy_command( hidden=hidden, examples=examples, confirm=confirm, + annotations=annotations, ) self._commands[name] = cmd for alias in aliases: @@ -1003,6 +1009,39 @@ def call(self, command_name: str, **kwargs: Any) -> Any: return result + def call_raw(self, command_name: str, **kwargs: Any) -> Any: + """Call a command without consuming generators. + + Like :meth:`call`, but returns the raw result — if the handler + returns a generator, it is *not* consumed. The MCP server uses + this to stream ``Progress`` yields as notifications. + """ + found = self.get_command(command_name) + if not found: + suggestion = self.suggest_command(command_name) + msg = f"Unknown command: {command_name!r}" + if suggestion: + msg += f". Did you mean {suggestion!r}?" + raise ValueError(msg) + + cmd = found.resolve() if isinstance(found, LazyCommandDef) else found + + sig = inspect.signature(cmd.handler) + valid = { + k: v + for k, v in kwargs.items() + if k in sig.parameters and not _is_context_param(sig.parameters[k]) + } + + if self._middleware: + from milo.context import Context as ContextClass + from milo.middleware import MCPCall + + ctx = ContextClass() + call = MCPCall(method="tools/call", name=command_name, arguments=valid) + return self._middleware.execute(ctx, call, lambda c: cmd.handler(**c.arguments)) + return cmd.handler(**valid) + def suggest_command(self, name: str) -> str | None: """Suggest the closest command name for typo correction.""" all_names = [path for path, _ in self.walk_commands()] diff --git a/src/milo/llms.py b/src/milo/llms.py index 60516ac..27068c6 100644 --- a/src/milo/llms.py +++ b/src/milo/llms.py @@ -129,6 +129,21 @@ def _format_command(cmd: CommandDef | LazyCommandDef) -> str: parts.append(f": {cmd.description}" if cmd.description else "") + # Annotations (behavioral hints) + annotations = getattr(cmd, "annotations", {}) + if annotations: + hints = [] + if annotations.get("readOnlyHint"): + hints.append("read-only") + if annotations.get("destructiveHint"): + hints.append("destructive") + if annotations.get("idempotentHint"): + hints.append("idempotent") + if annotations.get("openWorldHint"): + hints.append("open-world") + if hints: + parts.append(f" [{', '.join(hints)}]") + # Parameter summary props = cmd.schema.get("properties", {}) required = set(cmd.schema.get("required", [])) diff --git a/src/milo/mcp.py b/src/milo/mcp.py index 2cfcdcf..0966ee4 100644 --- a/src/milo/mcp.py +++ b/src/milo/mcp.py @@ -4,12 +4,14 @@ import json import sys +import time from typing import TYPE_CHECKING, Any from milo import __version__ as _server_version from milo._jsonrpc import MCP_VERSION as _MCP_VERSION -from milo._jsonrpc import _stderr, _write_error, _write_result +from milo._jsonrpc import _stderr, _write_error, _write_notification, _write_result from milo._mcp_router import dispatch +from milo.observability import RequestLogger, log_request, new_correlation_id if TYPE_CHECKING: from milo.commands import CLI, CommandDef, LazyCommandDef @@ -28,6 +30,7 @@ class _CLIHandler: def __init__(self, cli: CLI, cached_tools: list[dict[str, Any]] | None = None) -> None: self._cli = cli self._cached_tools = cached_tools + self._logger = RequestLogger() def initialize(self, params: dict[str, Any]) -> dict[str, Any]: return { @@ -46,19 +49,37 @@ def list_tools(self, params: dict[str, Any]) -> dict[str, Any]: return {"tools": tools} def call_tool(self, params: dict[str, Any]) -> dict[str, Any]: - return _call_tool(self._cli, params) + new_correlation_id() + start = time.monotonic() + result = _call_tool(self._cli, params) + error = "" if not result.get("isError") else result["content"][0].get("text", "") + log_request( + self._logger, "tools/call", params.get("name", ""), start, error=error, + ) + return result def list_resources(self, params: dict[str, Any]) -> dict[str, Any]: - return {"resources": _list_resources(self._cli)} + return {"resources": _list_resources(self._cli) + _builtin_resources()} def read_resource(self, params: dict[str, Any]) -> dict[str, Any]: - return _read_resource(self._cli, params) + uri = params.get("uri", "") + if uri == "milo://stats": + return _stats_resource(self._logger) + new_correlation_id() + start = time.monotonic() + result = _read_resource(self._cli, params) + log_request(self._logger, "resources/read", uri, start) + return result def list_prompts(self, params: dict[str, Any]) -> dict[str, Any]: return {"prompts": _list_prompts(self._cli)} def get_prompt(self, params: dict[str, Any]) -> dict[str, Any]: - return _get_prompt(self._cli, params) + new_correlation_id() + start = time.monotonic() + result = _get_prompt(self._cli, params) + log_request(self._logger, "prompts/get", params.get("name", ""), start) + return result def run_mcp_server(cli: CLI) -> None: @@ -119,6 +140,25 @@ def run_mcp_server(cli: CLI) -> None: _write_error(req_id, -32603, str(e)) +def _builtin_resources() -> list[dict[str, Any]]: + """Built-in MCP resources provided by the milo runtime.""" + return [ + { + "uri": "milo://stats", + "name": "Server Statistics", + "description": "Request latency, error counts, and throughput for this MCP server", + "mimeType": "application/json", + }, + ] + + +def _stats_resource(logger: RequestLogger) -> dict[str, Any]: + """Return server statistics as an MCP resource.""" + stats = logger.stats() + text = json.dumps(stats, indent=2) + return {"contents": [{"uri": "milo://stats", "text": text, "mimeType": "application/json"}]} + + def _list_tools(cli: CLI) -> list[dict[str, Any]]: """Generate MCP tools/list response from all commands including groups. @@ -145,6 +185,10 @@ def _list_tools(cli: CLI) -> list[dict[str, Any]]: if output_schema: tool["outputSchema"] = output_schema + # annotations: MCP behavioral hints (readOnlyHint, destructiveHint, etc.) + if cmd.annotations: + tool["annotations"] = cmd.annotations + tools.append(tool) return tools @@ -186,12 +230,43 @@ def _output_schema(cmd: CommandDef | LazyCommandDef) -> dict[str, Any] | None: def _call_tool(cli: CLI, params: dict[str, Any]) -> dict[str, Any]: - """Handle tools/call by dispatching to the command handler.""" + """Handle tools/call by dispatching to the command handler. + + Routes through the CLI's middleware stack when present, so middleware + can intercept MCP-originated calls just like CLI-originated ones. + + If the handler returns a generator yielding Progress objects, each + Progress is emitted as a ``notifications/progress`` JSON-RPC + notification before the final result is returned. + """ tool_name = params.get("name", "") arguments = params.get("arguments", {}) try: - result = cli.call(tool_name, **arguments) + result = cli.call_raw(tool_name, **arguments) + + # Stream progress notifications for generator results + from milo.streaming import Progress, is_generator_result + + if is_generator_result(result): + final_value = None + try: + while True: + value = next(result) + if isinstance(value, Progress): + _write_notification( + "notifications/progress", + { + "progressToken": tool_name, + "progress": value.step, + "total": value.total or None, + "message": value.status, + }, + ) + except StopIteration as e: + final_value = e.value + result = final_value + except Exception as e: return { "content": [{"type": "text", "text": f"Error: {e}"}], @@ -235,7 +310,15 @@ def _read_resource(cli: CLI, params: dict[str, Any]) -> dict[str, Any]: return {"contents": []} try: - result = res.handler() + if cli._middleware: + from milo.context import Context as ContextClass + from milo.middleware import MCPCall + + ctx = ContextClass() + call = MCPCall(method="resources/read", name=uri, arguments={}) + result = cli._middleware.execute(ctx, call, lambda _c: res.handler()) + else: + result = res.handler() except Exception as e: return {"contents": [{"uri": uri, "text": f"Error: {e}", "mimeType": "text/plain"}]} @@ -268,7 +351,15 @@ def _get_prompt(cli: CLI, params: dict[str, Any]) -> dict[str, Any]: return {"messages": []} try: - result = p.handler(**arguments) + if cli._middleware: + from milo.context import Context as ContextClass + from milo.middleware import MCPCall + + ctx = ContextClass() + call = MCPCall(method="prompts/get", name=name, arguments=arguments) + result = cli._middleware.execute(ctx, call, lambda c: p.handler(**c.arguments)) + else: + result = p.handler(**arguments) except Exception as e: return {"messages": [{"role": "user", "content": {"type": "text", "text": f"Error: {e}"}}]} diff --git a/src/milo/schema.py b/src/milo/schema.py index b41a5f0..f79022a 100644 --- a/src/milo/schema.py +++ b/src/milo/schema.py @@ -11,6 +11,79 @@ from collections.abc import Callable from typing import Any, Literal, Union, get_args, get_origin +# --------------------------------------------------------------------------- +# Annotated constraint markers +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass(frozen=True, slots=True) +class MinLen: + """Minimum length constraint for strings/arrays.""" + + value: int + + +@dataclasses.dataclass(frozen=True, slots=True) +class MaxLen: + """Maximum length constraint for strings/arrays.""" + + value: int + + +@dataclasses.dataclass(frozen=True, slots=True) +class Gt: + """Exclusive minimum constraint for numbers.""" + + value: int | float + + +@dataclasses.dataclass(frozen=True, slots=True) +class Lt: + """Exclusive maximum constraint for numbers.""" + + value: int | float + + +@dataclasses.dataclass(frozen=True, slots=True) +class Ge: + """Inclusive minimum constraint for numbers.""" + + value: int | float + + +@dataclasses.dataclass(frozen=True, slots=True) +class Le: + """Inclusive maximum constraint for numbers.""" + + value: int | float + + +@dataclasses.dataclass(frozen=True, slots=True) +class Pattern: + """Regex pattern constraint for strings.""" + + value: str + + +@dataclasses.dataclass(frozen=True, slots=True) +class Description: + """Override or supplement the parameter description.""" + + value: str + + +_CONSTRAINT_MAP: dict[type, str] = { + MinLen: "minLength", + MaxLen: "maxLength", + Gt: "exclusiveMinimum", + Lt: "exclusiveMaximum", + Ge: "minimum", + Le: "maximum", + Pattern: "pattern", + Description: "description", +} + + _TYPE_MAP: dict[type, str] = { str: "string", int: "integer", @@ -33,8 +106,9 @@ def function_to_schema(func: Callable[..., Any]) -> dict[str, Any]: """ sig = inspect.signature(func) # Resolve string annotations (from __future__ import annotations) + # include_extras=True preserves Annotated metadata for constraint extraction try: - hints = typing.get_type_hints(func) + hints = typing.get_type_hints(func, include_extras=True) except Exception: hints = {} @@ -57,9 +131,22 @@ def function_to_schema(func: Callable[..., Any]) -> dict[str, Any]: if _is_context_type(annotation, name): continue - is_optional = _is_optional(annotation) + # Unwrap Annotated to check optional underneath + bare = annotation + annotated_meta: tuple = () + if get_origin(bare) is typing.Annotated: + annotated_args = get_args(bare) + bare = annotated_args[0] + annotated_meta = annotated_args[1:] + + is_optional = _is_optional(bare) if is_optional: - annotation = _unwrap_optional(annotation) + unwrapped = _unwrap_optional(bare) + if annotated_meta: + # Re-wrap: Annotated[unwrapped_type, *meta] + annotation = typing.Annotated[(unwrapped, *annotated_meta)] + else: + annotation = unwrapped prop = _type_to_schema(annotation) @@ -81,6 +168,18 @@ def function_to_schema(func: Callable[..., Any]) -> dict[str, Any]: def _type_to_schema(annotation: Any, _seen: set[int] | None = None) -> dict[str, Any]: """Convert Python type annotation to JSON Schema fragment.""" + # Annotated[T, constraints...] — unwrap and apply constraints + origin = get_origin(annotation) + if origin is typing.Annotated: + args = get_args(annotation) + base_type = args[0] + schema = _type_to_schema(base_type, _seen) + for meta in args[1:]: + key = _CONSTRAINT_MAP.get(type(meta)) + if key: + schema[key] = meta.value + return schema + # Primitive types if annotation in _TYPE_MAP: return {"type": _TYPE_MAP[annotation]} diff --git a/tests/test_ai_native.py b/tests/test_ai_native.py index 7d9807b..ca4af83 100644 --- a/tests/test_ai_native.py +++ b/tests/test_ai_native.py @@ -973,6 +973,226 @@ def deploy(env: str = "local") -> str: assert "Deploy to prod" in md +# --------------------------------------------------------------------------- +# Phase 1: Annotations, middleware in MCP, observability +# --------------------------------------------------------------------------- + + +class TestMCPAnnotations: + def test_annotations_in_tools_list(self): + cli = CLI(name="test") + + @cli.command( + "delete", + description="Delete a resource", + annotations={"destructiveHint": True, "idempotentHint": True}, + ) + def delete(name: str) -> str: + return f"Deleted {name}" + + tools = _list_tools(cli) + tool = next(t for t in tools if t["name"] == "delete") + assert tool["annotations"]["destructiveHint"] is True + assert tool["annotations"]["idempotentHint"] is True + + def test_no_annotations_when_empty(self): + cli = CLI(name="test") + + @cli.command("list", description="List items") + def list_cmd() -> str: + return "items" + + tools = _list_tools(cli) + tool = next(t for t in tools if t["name"] == "list") + assert "annotations" not in tool + + def test_annotations_in_llms_txt(self): + cli = CLI(name="app") + + @cli.command( + "rm", + description="Remove files", + annotations={"destructiveHint": True, "readOnlyHint": False}, + ) + def rm(path: str) -> str: + return f"Removed {path}" + + txt = generate_llms_txt(cli) + assert "[destructive]" in txt + + def test_read_only_annotation_in_llms_txt(self): + cli = CLI(name="app") + + @cli.command( + "status", + description="Show status", + annotations={"readOnlyHint": True}, + ) + def status() -> str: + return "ok" + + txt = generate_llms_txt(cli) + assert "[read-only]" in txt + + +class TestMCPMiddleware: + def test_middleware_fires_on_tool_call(self): + cli = CLI(name="test") + calls = [] + + @cli.middleware + def track(ctx, call, next_fn): + calls.append(call.method) + return next_fn(call) + + @cli.command("greet", description="Say hello") + def greet(name: str) -> str: + return f"Hello, {name}!" + + result = _call_tool(cli, {"name": "greet", "arguments": {"name": "Agent"}}) + assert result["content"][0]["text"] == "Hello, Agent!" + assert "tools/call" in calls + + def test_middleware_can_transform_args(self): + cli = CLI(name="test") + + @cli.middleware + def inject_default(ctx, call, next_fn): + from milo.middleware import MCPCall + + if call.name == "greet" and "name" not in call.arguments: + call = MCPCall( + method=call.method, + name=call.name, + arguments={**call.arguments, "name": "Default"}, + ) + return next_fn(call) + + @cli.command("greet", description="Say hello") + def greet(name: str) -> str: + return f"Hello, {name}!" + + result = _call_tool(cli, {"name": "greet", "arguments": {}}) + assert "Default" in result["content"][0]["text"] + + +class TestMCPObservability: + def test_stats_resource_exists(self): + cli = CLI(name="test") + + @cli.command("ping", description="Ping") + def ping() -> str: + return "pong" + + handler = _CLIHandler(cli) + resources = handler.list_resources({})["resources"] + uris = [r["uri"] for r in resources] + assert "milo://stats" in uris + + def test_stats_populated_after_calls(self): + cli = CLI(name="test") + + @cli.command("ping", description="Ping") + def ping() -> str: + return "pong" + + handler = _CLIHandler(cli) + handler.call_tool({"name": "ping", "arguments": {}}) + handler.call_tool({"name": "ping", "arguments": {}}) + + stats_result = handler.read_resource({"uri": "milo://stats"}) + stats = json.loads(stats_result["contents"][0]["text"]) + assert stats["total"] == 2 + assert stats["errors"] == 0 + assert stats["avg_latency_ms"] >= 0 + + def test_stats_tracks_errors(self): + cli = CLI(name="test") + + @cli.command("fail", description="Fail") + def fail() -> str: + raise RuntimeError("boom") + + handler = _CLIHandler(cli) + handler.call_tool({"name": "fail", "arguments": {}}) + + stats_result = handler.read_resource({"uri": "milo://stats"}) + stats = json.loads(stats_result["contents"][0]["text"]) + assert stats["total"] == 1 + assert stats["errors"] == 1 + + +class TestMCPStreaming: + def test_streaming_progress_notifications(self): + from milo.streaming import Progress + + cli = CLI(name="test") + + @cli.command("deploy", description="Deploy") + def deploy(env: str = "dev"): + yield Progress(status="Starting", step=0, total=2) + yield Progress(status="Deploying", step=1, total=2) + return f"Deployed to {env}" + + # Capture stdout to see progress notifications + captured = io.StringIO() + with patch("sys.stdout", captured): + result = _call_tool(cli, {"name": "deploy", "arguments": {"env": "prod"}}) + + # Final result should be correct + assert result["content"][0]["text"] == "Deployed to prod" + assert "isError" not in result + + # Progress notifications should have been written + lines = captured.getvalue().strip().split("\n") + notifications = [json.loads(line) for line in lines] + assert len(notifications) == 2 + assert notifications[0]["method"] == "notifications/progress" + assert notifications[0]["params"]["message"] == "Starting" + assert notifications[1]["params"]["message"] == "Deploying" + assert notifications[1]["params"]["progress"] == 1 + + def test_streaming_with_no_progress(self): + cli = CLI(name="test") + + @cli.command("simple", description="Simple") + def simple() -> str: + return "done" + + # Non-generator commands should work normally (no notifications) + captured = io.StringIO() + with patch("sys.stdout", captured): + result = _call_tool(cli, {"name": "simple", "arguments": {}}) + + assert result["content"][0]["text"] == "done" + assert captured.getvalue() == "" + + def test_call_raw_returns_generator(self): + from milo.streaming import Progress, is_generator_result + + cli = CLI(name="test") + + @cli.command("work", description="Work") + def work(): + yield Progress(status="Working", step=1, total=1) + return "finished" + + result = cli.call_raw("work") + assert is_generator_result(result) + + # Consume it manually + values = [] + try: + while True: + values.append(next(result)) + except StopIteration as e: + final = e.value + + assert len(values) == 1 + assert values[0].status == "Working" + assert final == "finished" + + class TestGenerateHelpAllBacktickFix: def test_global_option_short_flag_formatting(self): cli = CLI(name="myapp") diff --git a/tests/test_mcp_resources.py b/tests/test_mcp_resources.py index dc34c4c..786e424 100644 --- a/tests/test_mcp_resources.py +++ b/tests/test_mcp_resources.py @@ -44,9 +44,11 @@ class TestMCPResourcesList: def test_list_resources(self, cli: CLI) -> None: client = MCPClient(cli) resources = client.list_resources() - assert len(resources) == 2 + # 2 user resources + 1 built-in (milo://stats) + assert len(resources) == 3 names = [r["name"] for r in resources] assert "get_config" in names + assert "Server Statistics" in names def test_resource_fields(self, cli: CLI) -> None: client = MCPClient(cli) diff --git a/tests/test_schema_v2.py b/tests/test_schema_v2.py index 8a5c5d5..5e5a2d6 100644 --- a/tests/test_schema_v2.py +++ b/tests/test_schema_v2.py @@ -4,10 +4,12 @@ import enum from dataclasses import dataclass, field -from typing import Literal, TypedDict +from typing import Annotated, Literal, TypedDict import pytest +from milo.schema import Description, Ge, Gt, Le, Lt, MaxLen, MinLen, Pattern + from milo.commands import CLI from milo.schema import _parse_param_docs, _type_to_schema, function_to_schema @@ -314,3 +316,91 @@ def test_parse_param_docs_sphinx(self): """) assert result["name"] == "The user's name." assert result["count"] == "How many times." + + +# --------------------------------------------------------------------------- +# Annotated constraint tests +# --------------------------------------------------------------------------- + + +class TestAnnotatedConstraints: + def test_min_max_length(self): + + def func(name: Annotated[str, MinLen(1), MaxLen(100)]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["name"] + assert prop["type"] == "string" + assert prop["minLength"] == 1 + assert prop["maxLength"] == 100 + + def test_gt_lt(self): + + def func(age: Annotated[int, Gt(0), Lt(200)]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["age"] + assert prop["type"] == "integer" + assert prop["exclusiveMinimum"] == 0 + assert prop["exclusiveMaximum"] == 200 + + def test_ge_le(self): + + def func(score: Annotated[float, Ge(0.0), Le(100.0)]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["score"] + assert prop["type"] == "number" + assert prop["minimum"] == 0.0 + assert prop["maximum"] == 100.0 + + def test_pattern(self): + + def func(email: Annotated[str, Pattern(r"^[^@]+@[^@]+$")]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["email"] + assert prop["type"] == "string" + assert prop["pattern"] == r"^[^@]+@[^@]+$" + + def test_description_override(self): + + def func(name: Annotated[str, Description("The user's full name")]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["name"] + assert prop["description"] == "The user's full name" + + def test_unknown_annotations_ignored(self): + def func(x: Annotated[str, "some random metadata", 42]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["x"] + assert prop == {"type": "string"} + + def test_annotated_optional(self): + + def func(name: Annotated[str | None, MinLen(1)] = None): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["name"] + assert prop["type"] == "string" + assert prop["minLength"] == 1 + assert "required" not in schema + + def test_annotated_with_list(self): + + def func(tags: Annotated[list[str], MinLen(1)]): + pass + + schema = function_to_schema(func) + prop = schema["properties"]["tags"] + assert prop["type"] == "array" + assert prop["minLength"] == 1 From d6fd796f1fbc3ae962dc8b57d06ef0c1756c1948 Mon Sep 17 00:00:00 2001 From: Lawrence Lane Date: Fri, 3 Apr 2026 16:05:30 -0400 Subject: [PATCH 2/4] Fix import sorting in test_schema_v2.py Co-Authored-By: Claude Opus 4.6 --- tests/test_schema_v2.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/test_schema_v2.py b/tests/test_schema_v2.py index 5e5a2d6..1458973 100644 --- a/tests/test_schema_v2.py +++ b/tests/test_schema_v2.py @@ -8,10 +8,20 @@ import pytest -from milo.schema import Description, Ge, Gt, Le, Lt, MaxLen, MinLen, Pattern - from milo.commands import CLI -from milo.schema import _parse_param_docs, _type_to_schema, function_to_schema +from milo.schema import ( + Description, + Ge, + Gt, + Le, + Lt, + MaxLen, + MinLen, + Pattern, + _parse_param_docs, + _type_to_schema, + function_to_schema, +) # --- Test Enum --- From b74aee7b0511821fc6d91e4cf123db0068843150 Mon Sep 17 00:00:00 2001 From: Lawrence Lane Date: Fri, 3 Apr 2026 16:11:00 -0400 Subject: [PATCH 3/4] Fix ruff formatting in mcp.py Co-Authored-By: Claude Opus 4.6 --- src/milo/mcp.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/milo/mcp.py b/src/milo/mcp.py index 0966ee4..2cc3f99 100644 --- a/src/milo/mcp.py +++ b/src/milo/mcp.py @@ -54,7 +54,11 @@ def call_tool(self, params: dict[str, Any]) -> dict[str, Any]: result = _call_tool(self._cli, params) error = "" if not result.get("isError") else result["content"][0].get("text", "") log_request( - self._logger, "tools/call", params.get("name", ""), start, error=error, + self._logger, + "tools/call", + params.get("name", ""), + start, + error=error, ) return result From 4c0f353ba73950abe4fde484abbb38ab5603f6fa Mon Sep 17 00:00:00 2001 From: Lawrence Lane Date: Fri, 3 Apr 2026 16:24:12 -0400 Subject: [PATCH 4/4] Address PR review: fix schema array constraints, instrument all MCP endpoints - MinLen/MaxLen now emit minItems/maxItems for array types instead of minLength/maxLength (correct per JSON Schema spec) - Add observability logging to list_tools, list_resources, list_prompts - Capture errors in read_resource and get_prompt logging - Update test to expect minItems for list types Co-Authored-By: Claude Opus 4.6 --- src/milo/mcp.py | 31 +++++++++++++++++++++++++++---- src/milo/schema.py | 10 ++++++++-- tests/test_schema_v2.py | 2 +- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/milo/mcp.py b/src/milo/mcp.py index 2cc3f99..0eb4b27 100644 --- a/src/milo/mcp.py +++ b/src/milo/mcp.py @@ -45,7 +45,10 @@ def initialize(self, params: dict[str, Any]) -> dict[str, Any]: } def list_tools(self, params: dict[str, Any]) -> dict[str, Any]: + new_correlation_id() + start = time.monotonic() tools = self._cached_tools if self._cached_tools is not None else _list_tools(self._cli) + log_request(self._logger, "tools/list", "", start) return {"tools": tools} def call_tool(self, params: dict[str, Any]) -> dict[str, Any]: @@ -63,7 +66,11 @@ def call_tool(self, params: dict[str, Any]) -> dict[str, Any]: return result def list_resources(self, params: dict[str, Any]) -> dict[str, Any]: - return {"resources": _list_resources(self._cli) + _builtin_resources()} + new_correlation_id() + start = time.monotonic() + resources = _list_resources(self._cli) + _builtin_resources() + log_request(self._logger, "resources/list", "", start) + return {"resources": resources} def read_resource(self, params: dict[str, Any]) -> dict[str, Any]: uri = params.get("uri", "") @@ -71,18 +78,34 @@ def read_resource(self, params: dict[str, Any]) -> dict[str, Any]: return _stats_resource(self._logger) new_correlation_id() start = time.monotonic() - result = _read_resource(self._cli, params) + try: + result = _read_resource(self._cli, params) + except Exception as e: + log_request(self._logger, "resources/read", uri, start, error=str(e)) + raise log_request(self._logger, "resources/read", uri, start) return result def list_prompts(self, params: dict[str, Any]) -> dict[str, Any]: - return {"prompts": _list_prompts(self._cli)} + new_correlation_id() + start = time.monotonic() + prompts = _list_prompts(self._cli) + log_request(self._logger, "prompts/list", "", start) + return {"prompts": prompts} def get_prompt(self, params: dict[str, Any]) -> dict[str, Any]: new_correlation_id() start = time.monotonic() result = _get_prompt(self._cli, params) - log_request(self._logger, "prompts/get", params.get("name", ""), start) + # Detect errors returned as message payloads + error = "" + for message in result.get("messages", []): + content = message.get("content", {}) + text = content.get("text", "") if isinstance(content, dict) else "" + if text.startswith("Error:"): + error = text + break + log_request(self._logger, "prompts/get", params.get("name", ""), start, error=error) return result diff --git a/src/milo/schema.py b/src/milo/schema.py index f79022a..03acdbd 100644 --- a/src/milo/schema.py +++ b/src/milo/schema.py @@ -18,14 +18,14 @@ @dataclasses.dataclass(frozen=True, slots=True) class MinLen: - """Minimum length constraint for strings/arrays.""" + """Minimum length for strings (minLength) or items for arrays (minItems).""" value: int @dataclasses.dataclass(frozen=True, slots=True) class MaxLen: - """Maximum length constraint for strings/arrays.""" + """Maximum length for strings (maxLength) or items for arrays (maxItems).""" value: int @@ -174,9 +174,15 @@ def _type_to_schema(annotation: Any, _seen: set[int] | None = None) -> dict[str, args = get_args(annotation) base_type = args[0] schema = _type_to_schema(base_type, _seen) + is_array = schema.get("type") == "array" for meta in args[1:]: key = _CONSTRAINT_MAP.get(type(meta)) if key: + # MinLen/MaxLen map to minItems/maxItems for arrays + if is_array and key == "minLength": + key = "minItems" + elif is_array and key == "maxLength": + key = "maxItems" schema[key] = meta.value return schema diff --git a/tests/test_schema_v2.py b/tests/test_schema_v2.py index 1458973..8cd2208 100644 --- a/tests/test_schema_v2.py +++ b/tests/test_schema_v2.py @@ -413,4 +413,4 @@ def func(tags: Annotated[list[str], MinLen(1)]): schema = function_to_schema(func) prop = schema["properties"]["tags"] assert prop["type"] == "array" - assert prop["minLength"] == 1 + assert prop["minItems"] == 1