Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/penguin/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,20 @@ def getint(d):
run_from_config(project_dir, config, output, timeout=timeout, verbose=ctx.obj['VERBOSE'])


@cli.command()
@click.option("--transport", type=str, default="stdio", help="MCP transport (default: stdio).")
@click.pass_context
def mcp(ctx, transport):
"""
Start the MCP server for AI-led rehosting (runs in-container, speaks MCP over stdio).

Exposes Penguin's loop to an LLM agent as tools: run, config mutations (accumulated in
patch_90_mcp.yaml), and structured diagnostics over results/N/. See penguin.mcp.
"""
from .mcp.server import serve
serve(transport=transport)


@cli.command()
@click.argument("project_dir", type=str)
@click.option("--config", type=str, default=None, help="Path to a config file. Defaults to <project_dir>/config.yaml.")
Expand Down
43 changes: 43 additions & 0 deletions src/penguin/mcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Penguin MCP server (Phase 1)

A [Model Context Protocol](https://modelcontextprotocol.io) adapter that lets an LLM agent
drive Penguin's rehosting loop through discrete tools instead of hand-editing YAML and
grepping raw result files. It runs **inside the Penguin container** and speaks MCP over
stdio.

## Run it

```sh
penguin mcp # stdio transport; launch under your MCP client / agent
```

The wrapper launches this in-container, so the server has direct access to the project
tree, the run entry point, and `plugins.db`.

## Tools

**Lifecycle**
- `run(project_dir, timeout?)` — execute one emulation (config + auto-merged patches) and
return the new `results/N` dir plus a health summary.

**Config mutations** (each deep-merges into a single reviewable `patch_90_mcp.yaml`,
auto-merged by Penguin's `auto_patching`; revert with `reset_patch`)
- `set_env`, `set_nvram`, `set_uboot_env`, `add_netdev`, `block_signal`,
`add_pseudofile`, `add_static_file`, `show_patch`, `reset_patch`

**Structured diagnostics** (parsed JSON, not file dumps)
- `health`, `missing_env`, `pseudofile_failures`, `netbinds`, `console(pattern)`,
`db_query(sql)`, `missing_files(procname)`

## Design notes

- `diagnostics.py` and `mutations.py` are dependency-free (pyyaml + stdlib) and unit-tested
(`tests/unit_tests/test_mcp.py`) — they need neither a container nor the `mcp` package.
- `server.py` is the only module that imports `mcp` (FastMCP); it's loaded by `penguin mcp`.
- Mutations never touch `config.yaml`; they live in one patch file so changes are auditable
and reversible — matching the "prefer patches" discipline.

## Not yet (Phase 2)

Live control via the `remotectrl` Unix socket (add uprobes/syscall hooks, toggle plugins on
a *running* guest, no reboot) and guest interaction (`guest_cmd`, VPN-bridge reachability).
22 changes: 22 additions & 0 deletions src/penguin/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Penguin MCP server (Phase 1) — a Model Context Protocol adapter for AI-led rehosting.

This package exposes Penguin's rehosting loop to an LLM agent as discrete tools instead
of "hand-edit YAML, reboot the VM, grep multi-KB result files". It runs **inside the
Penguin container** (where it has direct access to the project tree, the run entry point,
and the SQLite event DB) and speaks MCP over stdio (`penguin mcp`).

Layout:
- ``diagnostics`` — dependency-free readers that parse ``results/N/`` artifacts into JSON.
- ``mutations`` — dependency-free writers that express config changes as a reviewable
``patch_90_mcp.yaml`` (auto-merged by Penguin's ``auto_patching``).
- ``server`` — the FastMCP server wiring those + ``run`` into MCP tools (imports the
``mcp`` package; only loaded by the ``penguin mcp`` subcommand).

Phase 1 = lifecycle (run) + config-mutation patch-writers + structured diagnostics.
Phase 2 (not yet) = live control via the ``remotectrl`` socket + guest interaction.
"""

from . import diagnostics, mutations

__all__ = ["diagnostics", "mutations"]
167 changes: 167 additions & 0 deletions src/penguin/mcp/diagnostics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
"""
Structured diagnostic readers over a Penguin ``results/N/`` directory.

These functions parse the artifacts Penguin actually writes (verified against the
``loggers``/``analysis`` pyplugins) into plain Python/JSON structures, so an agent gets
the *answer* instead of a multi-KB file dump. They are deliberately dependency-free
(pyyaml + stdlib only) and defensive: a missing/!written file yields ``{"error": ...}``
rather than raising, because not every plugin runs every time.

Verified artifact names (do NOT use the stale ``*.txt`` names from old docs):
console.log, health_final.yaml, env_missing.yaml, pseudofiles_failures.yaml,
pseudofiles_modeled.yaml, netbinds.csv, netbinds_summary.csv, nvram.csv,
uboot.log, plugins.db (SQLite).
"""

from __future__ import annotations

import csv
import os
import re
import sqlite3
from typing import Any, Optional

import yaml


def latest_results(proj_dir: str) -> Optional[str]:
"""Return the newest ``results/N`` dir for a project (resolving ``results/latest``)."""
results_base = os.path.join(proj_dir, "results")
latest = os.path.join(results_base, "latest")
if os.path.islink(latest) or os.path.isdir(latest):
return os.path.realpath(latest)
if not os.path.isdir(results_base):
return None
nums = []
for d in os.listdir(results_base):
if d.isdigit() and os.path.isdir(os.path.join(results_base, d)):
nums.append(int(d))
if not nums:
return None
return os.path.join(results_base, str(max(nums)))


def _resolve(results_dir: Optional[str], proj_dir: Optional[str]) -> Optional[str]:
if results_dir:
return results_dir
if proj_dir:
return latest_results(proj_dir)
return None


def _load_yaml(path: str) -> Any:
with open(path) as f:
return yaml.safe_load(f)


def _need(results_dir: Optional[str], proj_dir: Optional[str], name: str):
rd = _resolve(results_dir, proj_dir)
if not rd:
return None, {"error": "no results dir found; pass results_dir or run first"}
path = os.path.join(rd, name)
if not os.path.exists(path):
return None, {"error": f"{name} not present in {rd} (plugin may not have run)"}
return path, None


def read_health(results_dir: str = None, proj_dir: str = None) -> dict:
"""The end-of-run health summary (score components, panic flag, counts)."""
path, err = _need(results_dir, proj_dir, "health_final.yaml")
if err:
return err
return {"health": _load_yaml(path)}


def read_missing_env(results_dir: str = None, proj_dir: str = None) -> dict:
"""Env vars / ``/proc/cmdline`` keys the firmware read but the config didn't provide."""
path, err = _need(results_dir, proj_dir, "env_missing.yaml")
if err:
return err
return {"missing_env": _load_yaml(path)}


def read_pseudofile_failures(results_dir: str = None, proj_dir: str = None) -> dict:
"""Missing/unmodeled /dev /proc /sys files the firmware touched, with op counts."""
path, err = _need(results_dir, proj_dir, "pseudofiles_failures.yaml")
if err:
return err
return {"pseudofile_failures": _load_yaml(path)}


def read_netbinds(results_dir: str = None, proj_dir: str = None) -> dict:
"""Listening sockets the guest opened (the success signal). Rows from netbinds.csv."""
path, err = _need(results_dir, proj_dir, "netbinds.csv")
if err:
return err
rows = []
with open(path, newline="") as f:
for row in csv.reader(f):
if row:
rows.append(row)
return {"netbinds": rows, "count": len(rows)}


def grep_console(
results_dir: str = None, proj_dir: str = None, pattern: str = None, max_lines: int = 100
) -> dict:
"""Return console.log lines matching a regex (or the tail if no pattern)."""
path, err = _need(results_dir, proj_dir, "console.log")
if err:
return err
with open(path, errors="replace") as f:
lines = f.read().splitlines()
if pattern:
try:
rx = re.compile(pattern)
except re.error as e:
return {"error": f"bad regex: {e}"}
hits = [ln for ln in lines if rx.search(ln)]
else:
hits = lines
truncated = len(hits) > max_lines
return {"lines": hits[-max_lines:], "truncated": truncated, "total_matched": len(hits)}


def query_db(
sql: str, results_dir: str = None, proj_dir: str = None, limit: int = 100
) -> dict:
"""Run a read-only SELECT against ``plugins.db`` (syscalls_logger/exec_logger events).

The DB has a parent ``event`` table joined to ``syscall``/``read``/``write``/``exec`` on
``id`` (procname lives on ``event`` — you must JOIN). Only SELECT is allowed.
"""
path, err = _need(results_dir, proj_dir, "plugins.db")
if err:
return err
if not sql.lstrip().lower().startswith("select"):
return {"error": "only SELECT queries are allowed"}
if ";" in sql.rstrip().rstrip(";"):
return {"error": "multiple statements are not allowed"}
con = sqlite3.connect(f"file:{path}?mode=ro", uri=True)
try:
con.row_factory = sqlite3.Row
cur = con.execute(sql)
out = [dict(r) for r in cur.fetchmany(limit)]
return {"rows": out, "count": len(out), "truncated": len(out) == limit}
except sqlite3.Error as e:
return {"error": f"sqlite: {e}"}
finally:
con.close()


def missing_files(
results_dir: str = None, proj_dir: str = None, procname: str = None, limit: int = 30
) -> dict:
"""Canned query: files a process tried to open/stat that returned ENOENT (what to add)."""
where = (
"s.name IN ('sys_open','sys_openat','sys_stat64','sys_access','sys_faccessat') "
"AND s.retno_repr LIKE '%ENOENT%'"
)
if procname:
where += f" AND e.procname = '{procname}'"
sql = (
"SELECT e.procname, s.arg0_repr AS path, COUNT(*) AS n "
"FROM syscall s JOIN event e ON e.id = s.id "
f"WHERE {where} GROUP BY e.procname, s.arg0_repr ORDER BY n DESC"
)
return query_db(sql, results_dir=results_dir, proj_dir=proj_dir, limit=limit)
139 changes: 139 additions & 0 deletions src/penguin/mcp/mutations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
Config-mutation writers — express an agent's config changes as a reviewable patch.

Rather than editing ``config.yaml`` in place, every mutation deep-merges into a single
``patch_90_mcp.yaml`` in the project directory. Penguin auto-discovers ``patch_*.yaml``
(when ``core.auto_patching`` is on, the default) and merges it into the validated config,
so the agent's changes are: (a) applied without touching the base config, (b) all in one
auditable file, and (c) trivially reverted (``reset_patch``). The ``90`` prefix orders it
after lower-numbered hand-authored patches.

Dependency-free (pyyaml + stdlib). Each function returns the resulting patch dict so the
caller/agent can see the new state.
"""

from __future__ import annotations

import os
from typing import Any, Optional

import yaml

PATCH_NAME = "patch_90_mcp.yaml"

_HEADER = (
"# Managed by the Penguin MCP server (penguin.mcp). Each tool call deep-merges here.\n"
"# Safe to edit or delete by hand; `reset_patch` removes it.\n"
)


def _patch_path(proj_dir: str) -> str:
return os.path.join(proj_dir, PATCH_NAME)


def _load(proj_dir: str) -> dict:
path = _patch_path(proj_dir)
if os.path.exists(path):
with open(path) as f:
return yaml.safe_load(f) or {}
return {}


def _deep_merge(dst: dict, src: dict) -> dict:
"""Recursively merge src into dst (dicts merge; lists union-append; scalars overwrite)."""
for k, v in src.items():
if isinstance(v, dict) and isinstance(dst.get(k), dict):
_deep_merge(dst[k], v)
elif isinstance(v, list) and isinstance(dst.get(k), list):
for item in v:
if item not in dst[k]:
dst[k].append(item)
else:
dst[k] = v
return dst


def _apply(proj_dir: str, fragment: dict) -> dict:
if not os.path.isdir(proj_dir):
raise ValueError(f"project dir does not exist: {proj_dir}")
patch = _deep_merge(_load(proj_dir), fragment)
with open(_patch_path(proj_dir), "w") as f:
f.write(_HEADER)
yaml.safe_dump(patch, f, sort_keys=False, default_flow_style=False)
return patch


# --- individual mutations -------------------------------------------------------------

def set_env(proj_dir: str, key: str, value: Any) -> dict:
"""Set an environment variable / boot arg (e.g. ``igloo_init``, a model string)."""
return _apply(proj_dir, {"env": {key: value}})


def set_nvram(proj_dir: str, key: str, value: Any) -> dict:
"""Seed an initial NVRAM key/value."""
return _apply(proj_dir, {"nvram": {key: value}})


def set_uboot_env(proj_dir: str, key: str, value: str) -> dict:
"""Seed a U-Boot env var (served via fw_getenv by the ``uboot`` plugin)."""
return _apply(proj_dir, {"uboot_env": {key: value}})


def add_netdev(proj_dir: str, name: str) -> dict:
"""Declare a network interface the firmware expects (e.g. ``egiga0``, ``vlan1``)."""
return _apply(proj_dir, {"netdevs": [name]})


def block_signal(proj_dir: str, signum: int) -> dict:
"""Block a signal guest-wide (supported: 6/9/15/17)."""
return _apply(proj_dir, {"blocked_signals": [int(signum)]})


def add_pseudofile(
proj_dir: str,
path: str,
read: Optional[dict] = None,
write: Optional[dict] = None,
ioctl: Optional[dict] = None,
) -> dict:
"""Create/model a pseudofile. With no models, just makes the path exist (``{}``).

Example models: read={"model": "const_buf", "val": "hello"},
write={"model": "discard"}, ioctl={"*": {"model": "return_const", "val": 0}}.
"""
spec: dict = {}
if read is not None:
spec["read"] = read
if write is not None:
spec["write"] = write
if ioctl is not None:
spec["ioctl"] = ioctl
return _apply(proj_dir, {"pseudofiles": {path: spec}})


def add_static_file(proj_dir: str, path: str, spec: dict) -> dict:
"""Add a pre-boot filesystem edit. ``spec`` is the action dict, e.g.
{"type": "symlink", "target": "/igloo/utils/exit0.sh"} or
{"type": "inline_file", "contents": "...", "mode": 0o755}.
"""
return _apply(proj_dir, {"static_files": {path: spec}})


# --- inspection / lifecycle -----------------------------------------------------------

def show_patch(proj_dir: str) -> dict:
"""Return the current MCP-managed patch (the accumulated agent changes)."""
path = _patch_path(proj_dir)
if not os.path.exists(path):
return {"patch": {}, "exists": False}
return {"patch": _load(proj_dir), "exists": True, "path": path}


def reset_patch(proj_dir: str) -> dict:
"""Delete the MCP-managed patch (revert all agent changes)."""
path = _patch_path(proj_dir)
existed = os.path.exists(path)
if existed:
os.remove(path)
return {"removed": existed}
Loading
Loading