From b08e4ecc9175b79dc87b0214dff35d1b5c5d9759 Mon Sep 17 00:00:00 2001 From: AI Agent Date: Thu, 5 Mar 2026 15:48:17 -0600 Subject: [PATCH] feat(adb_vision): restore live piloting and expand audit logging --- TDD_IMPLEMENTATION_PLAN.md | 9 +- adb_vision/README.md | 5 +- adb_vision/desktop_capture.ps1 | 189 ++++++++++++++++ adb_vision/diagnose.py | 395 +++++++++++++++++++++++++++++++++ adb_vision/pilot.py | 135 +++++++++++ adb_vision/pyproject.toml | 3 + adb_vision/screenshot.py | 324 ++++++++++++++++++++++++--- adb_vision/server.py | 56 ++++- adb_vision/test_server.py | 25 ++- adb_vision/test_tool_audit.py | 70 ++++++ adb_vision/tool_audit.py | 359 ++++++++++++++++++++++++++++++ docs/ARCHITECTURE.md | 18 +- docs/ROADMAP.md | 5 +- docs/dev/logging.md | 14 ++ 14 files changed, 1547 insertions(+), 60 deletions(-) create mode 100644 adb_vision/desktop_capture.ps1 create mode 100644 adb_vision/diagnose.py create mode 100644 adb_vision/pilot.py create mode 100644 adb_vision/test_tool_audit.py create mode 100644 adb_vision/tool_audit.py diff --git a/TDD_IMPLEMENTATION_PLAN.md b/TDD_IMPLEMENTATION_PLAN.md index fd55714474..14ef0b6990 100644 --- a/TDD_IMPLEMENTATION_PLAN.md +++ b/TDD_IMPLEMENTATION_PLAN.md @@ -12,8 +12,8 @@ | Component | Location | Status | TDD Value | |-----------|----------|--------|-----------| -| Standalone MCP server | `adb_vision/server.py` | ✅ **DONE** | 6 tools, DroidCast primary backend, action logging | -| ADB screenshot backends | `adb_vision/screenshot.py` | ✅ **DONE** | DroidCast/u2/scrcpy — 17 unit tests passing | +| Standalone MCP server | `adb_vision/server.py` | ✅ **DONE** | 6 tools, structured audit logging, fixed tool surface | +| ADB screenshot backends | `adb_vision/screenshot.py` | ✅ **DONE** | DroidCast/scrcpy/u2/screenrecord — 22 unit tests passing | | ADB raw tap/swipe/keyevent | `adb_vision/server.py` | ✅ **DONE** | Via `adb_tap`, `adb_swipe`, `adb_keyevent` tools | | ALAS state machine | `alas_wrapped/module/ui/page.py` | Reference only | 43 pages, 98 transitions — extract knowledge, not code | | MEmu config | `docs/dev/memu_playbook.md` | Documented | Admin-at-startup solved via memuc.exe | @@ -56,8 +56,9 @@ def test_screenshot_returns_valid_image(): # Assert: img.size == (1280, 720), b64 is valid base64 PNG ``` -**Status:** Implemented and tested. See `adb_vision/screenshot.py` (DroidCast/u2/scrcpy backends) and -`adb_vision/test_server.py` (17 unit tests passing). Live test in `adb_vision/test_live.py`. +**Status:** Implemented and tested. See `adb_vision/screenshot.py` (DroidCast/scrcpy/u2/screenrecord backends) and +`adb_vision/test_server.py` + `adb_vision/test_tool_audit.py` (22 unit tests passing). Live verification via `diagnose.py` and `pilot.py`. +- Operational note: `adb_vision/diagnose.py` is the first-line health gate when screenshots are black; it must treat live ADB as authoritative even when `memuc.exe` requires elevation. ### P0-T3: Raw Input Tests ```python diff --git a/adb_vision/README.md b/adb_vision/README.md index 7761d0da1d..1c5c294126 100644 --- a/adb_vision/README.md +++ b/adb_vision/README.md @@ -37,7 +37,7 @@ adb_vision/ | Tool | Description | |------|-------------| -| `adb_screenshot(method)` | Screenshot via pluggable backend (auto/droidcast/scrcpy/u2/screencap) | +| `adb_screenshot(method)` | Screenshot via pluggable backend (auto/droidcast/scrcpy/u2/screenrecord/screencap) | | `adb_tap(x, y)` | Tap coordinate | | `adb_swipe(x1, y1, x2, y2, duration_ms)` | Swipe gesture | | `adb_keyevent(keycode)` | Send key event (4=BACK, 3=HOME) | @@ -48,11 +48,12 @@ adb_vision/ The screenshot problem: **`adb shell screencap` returns blank images on MEmu/VirtualBox** because the GPU never populates the Linux framebuffer. -Three alternative backends are being implemented (see GitHub issues #40-#42): +Alternative backends for MEmu/VirtualBox: 1. **DroidCast** (#40) — APK that streams screen over HTTP via SurfaceControl API 2. **scrcpy** (#41) — H.264 stream decoded to single frame 3. **uiautomator2 ATX** (#42) — ATX agent HTTP API screenshot endpoint +4. **screenrecord** — short MP4 capture + ffmpeg first-frame extraction; slower, but works when the live HTTP paths fail The `method="auto"` default tries each backend in order until one returns a valid (>5KB) image. diff --git a/adb_vision/desktop_capture.ps1 b/adb_vision/desktop_capture.ps1 new file mode 100644 index 0000000000..48bf1896d8 --- /dev/null +++ b/adb_vision/desktop_capture.ps1 @@ -0,0 +1,189 @@ +<# +.SYNOPSIS + Capture a screenshot of the MEmu emulator window using .NET drawing. +.PARAMETER OutputPath + Path to save the PNG screenshot. +.PARAMETER AutoDelete + If set, deletes the screenshot after this many seconds. Default: 300 (5 min). +#> +param( + [Parameter(Mandatory=$true)] + [string]$OutputPath, + + [int]$AutoDelete = 300 +) + +Add-Type -AssemblyName System.Windows.Forms +Add-Type -AssemblyName System.Drawing +Add-Type -AssemblyName Microsoft.VisualBasic + +# Find the actual MEmu VM window first. +# Exact title lookup for "MEmu" can resolve to a tiny hidden helper window, +# which produces useless 50x15 captures. Prefer the real process window. +Add-Type @" +using System; +using System.Runtime.InteropServices; +public class WinAPI { + [DllImport("user32.dll")] + public static extern IntPtr GetForegroundWindow(); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + public static extern bool SetForegroundWindow(IntPtr hWnd); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + public static extern bool GetWindowRect(IntPtr hWnd, out RECT lpRect); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + public static extern bool IsIconic(IntPtr hWnd); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + public static extern bool ShowWindow(IntPtr hWnd, int nCmdShow); + + [DllImport("user32.dll")] + [return: MarshalAs(UnmanagedType.Bool)] + public static extern bool PrintWindow(IntPtr hWnd, IntPtr hdcBlt, int nFlags); +} + +public struct RECT { + public int Left; + public int Top; + public int Right; + public int Bottom; +} +"@ + +function Get-WindowRectObject { + param([IntPtr]$Handle) + $rect = New-Object RECT + if (-not [WinAPI]::GetWindowRect($Handle, [ref]$rect)) { + return $null + } + [pscustomobject]@{ + Rect = $rect + Width = $rect.Right - $rect.Left + Height = $rect.Bottom - $rect.Top + Area = ($rect.Right - $rect.Left) * ($rect.Bottom - $rect.Top) + } +} + +function Get-CandidateWindow { + param([System.Diagnostics.Process[]]$Processes) + + $best = $null + foreach ($proc in $Processes) { + if ($proc.MainWindowHandle -eq 0) { continue } + $title = ($proc.MainWindowTitle | Out-String).Trim() + if (-not $title) { continue } + + $rectInfo = Get-WindowRectObject -Handle ([IntPtr]$proc.MainWindowHandle) + if ($null -eq $rectInfo) { continue } + if ($rectInfo.Width -le 200 -or $rectInfo.Height -le 200) { continue } + + $candidate = [pscustomobject]@{ + ProcessId = $proc.Id + Handle = [IntPtr]$proc.MainWindowHandle + Title = $title + Rect = $rectInfo.Rect + Width = $rectInfo.Width + Height = $rectInfo.Height + Area = $rectInfo.Area + } + + if ($null -eq $best -or $candidate.Area -gt $best.Area) { + $best = $candidate + } + } + return $best +} + +$memuWindow = Get-CandidateWindow -Processes ( + Get-Process -Name "MEmu" -ErrorAction SilentlyContinue +) + +if ($null -eq $memuWindow) { + $fallbackProcesses = Get-Process | Where-Object { + $_.MainWindowHandle -ne 0 -and ( + $_.ProcessName -like "*MEmu*" -or + $_.MainWindowTitle -like "*(MEmu*" -or + $_.MainWindowTitle -like "*MEmu*" + ) + } + $memuWindow = Get-CandidateWindow -Processes $fallbackProcesses +} + +if ($null -eq $memuWindow) { + Write-Error "MEmu window not found" + exit 1 +} + +$memuHwnd = $memuWindow.Handle + +# Save the currently active window so we can restore it +$previousWindow = [WinAPI]::GetForegroundWindow() + +# If MEmu is minimized, restore it +if ([WinAPI]::IsIconic($memuHwnd)) { + [WinAPI]::ShowWindow($memuHwnd, 9) # SW_RESTORE + Start-Sleep -Milliseconds 500 +} + +# Bring MEmu to front +[WinAPI]::ShowWindow($memuHwnd, 5) | Out-Null # SW_SHOW +try { + [Microsoft.VisualBasic.Interaction]::AppActivate($memuWindow.ProcessId) | Out-Null +} catch { + [WinAPI]::SetForegroundWindow($memuHwnd) | Out-Null +} +Start-Sleep -Milliseconds 300 + +# Get window rect +$rect = $memuWindow.Rect +$width = $memuWindow.Width +$height = $memuWindow.Height + +if ($width -le 0 -or $height -le 0) { + Write-Error "Invalid window dimensions: ${width}x${height}" + # Restore previous window + [WinAPI]::SetForegroundWindow($previousWindow) | Out-Null + exit 1 +} + +# Capture the screen region +$bitmap = New-Object System.Drawing.Bitmap($width, $height) +$graphics = [System.Drawing.Graphics]::FromImage($bitmap) +$hdc = $graphics.GetHdc() +$printed = $false +try { + $printed = [WinAPI]::PrintWindow($memuHwnd, $hdc, 2) +} finally { + $graphics.ReleaseHdc($hdc) +} +if (-not $printed) { + $graphics.CopyFromScreen($rect.Left, $rect.Top, 0, 0, + (New-Object System.Drawing.Size($width, $height))) +} +$graphics.Dispose() + +# Save +$dir = Split-Path -Parent $OutputPath +if ($dir -and !(Test-Path $dir)) { New-Item -ItemType Directory -Path $dir -Force | Out-Null } +$bitmap.Save($OutputPath, [System.Drawing.Imaging.ImageFormat]::Png) +$bitmap.Dispose() + +Write-Output "Screenshot saved: $OutputPath" + +# Restore previous window +[WinAPI]::SetForegroundWindow($previousWindow) | Out-Null + +# Schedule auto-delete +if ($AutoDelete -gt 0) { + Start-Job -ScriptBlock { + param($path, $delay) + Start-Sleep -Seconds $delay + if (Test-Path $path) { Remove-Item $path -Force } + } -ArgumentList $OutputPath, $AutoDelete | Out-Null +} diff --git a/adb_vision/diagnose.py b/adb_vision/diagnose.py new file mode 100644 index 0000000000..0c1590faad --- /dev/null +++ b/adb_vision/diagnose.py @@ -0,0 +1,395 @@ +"""Diagnostic CLI for screenshot/backend debugging. + +Usage: + uv run python diagnose.py + uv run python diagnose.py --json + +This script treats ADB connectivity as the source of truth for a live emulator +session. A Python-side VM probe is recorded for context only and never used to +short-circuit the diagnosis when ADB is healthy. +""" + +from __future__ import annotations + +import argparse +import asyncio +import base64 +import io +import json +import subprocess +import time +from datetime import datetime +from pathlib import Path +from typing import Any + +from PIL import Image, ImageStat +from emulator import memuc_cli +from screenshot import take_screenshot +from tool_audit import audit_cli_call, configure, record_command + +ADB = r"D:\Program Files\Microvirt\MEmu\adb.exe" +SERIAL = "127.0.0.1:21513" +GAME_PACKAGE = "com.YoStarEN.AzurLane" +SCREEN_DIR = Path(__file__).parent / "pilot_screens" +BLACK_STDDEV_THRESHOLD = 5.0 +SCREENSHOT_METHODS = ("droidcast", "scrcpy", "u2", "screenrecord", "screencap") + + +def _timestamp() -> str: + return datetime.now().strftime("%Y%m%dT%H%M%S") + + +async def _adb_run(*args: str, timeout: float = 10.0) -> bytes: + started = time.perf_counter() + argv = [ADB, "-s", SERIAL, *args] + proc = await asyncio.create_subprocess_exec( + *argv, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"Timeout after {timeout}s", + ) + raise TimeoutError(f"adb timed out after {timeout}s: {' '.join(args)}") + if proc.returncode != 0: + stderr_text = stderr.decode(errors="replace").strip() + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"exit {proc.returncode}", + stderr=stderr_text, + ) + raise RuntimeError( + f"adb {' '.join(args)} failed (exit {proc.returncode}): " + f"{stderr_text}" + ) + stdout_text = stdout.decode(errors="replace").strip() + stderr_text = stderr.decode(errors="replace").strip() + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="success", + stdout=stdout_text, + stderr=stderr_text, + ) + return stdout + + +def _adb_host_run(*args: str, timeout: float = 10.0) -> subprocess.CompletedProcess[str]: + started = time.perf_counter() + argv = [ADB, *args] + try: + result = subprocess.run(argv, capture_output=True, text=True, timeout=timeout) + except Exception as exc: + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"{type(exc).__name__}: {exc}", + ) + raise + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="success" if result.returncode == 0 else "error", + error=None if result.returncode == 0 else f"exit {result.returncode}", + stdout=result.stdout, + stderr=result.stderr, + ) + return result + + +def _probe_vm_running_python() -> dict[str, Any]: + """Best-effort VM probe for context only. + + A live ADB device remains the source of truth for an active session. + """ + result: dict[str, Any] = { + "available": False, + "running": None, + "error": None, + "source": "pymemuc", + } + try: + from pymemuc import PyMemuc + + pmc = PyMemuc() + result["available"] = True + try: + result["running"] = bool(pmc.vm_is_running(0)) + except TypeError: + result["running"] = bool(pmc.vm_is_running()) + except Exception as exc: + result["error"] = f"{type(exc).__name__}: {exc}" + if memuc_cli.adb_connected(): + result["available"] = True + result["running"] = True + result["source"] = "adb_fallback" + return result + + +def check_adb_connected() -> tuple[bool, str]: + try: + _adb_host_run("connect", SERIAL, timeout=5) + result = _adb_host_run("-s", SERIAL, "get-state", timeout=5) + state = result.stdout.strip() or result.stderr.strip() + return state == "device", state + except Exception as exc: + return False, f"{type(exc).__name__}: {exc}" + + +def check_foreground_app() -> str | None: + try: + result = _adb_host_run("-s", SERIAL, "shell", "dumpsys", "window", "windows", timeout=8) + for line in result.stdout.splitlines(): + if "mCurrentFocus" not in line: + continue + import re + + match = re.search(r"(\S+)/\S+\}", line) + if match: + return match.group(1) + return None + except Exception: + return None + + +def is_screenshot_black(raw_png: bytes) -> bool: + if len(raw_png) < 100: + return True + img = Image.open(io.BytesIO(raw_png)).convert("L") + stddev = ImageStat.Stat(img).stddev[0] + return stddev < BLACK_STDDEV_THRESHOLD + + +def _save_png(raw_png: bytes, prefix: str) -> str: + SCREEN_DIR.mkdir(exist_ok=True) + out_path = SCREEN_DIR / f"{prefix}_{_timestamp()}.png" + out_path.write_bytes(raw_png) + return str(out_path) + + +async def _capture_backend(method: str) -> dict[str, Any]: + started = time.monotonic() + try: + png_b64 = await take_screenshot( + adb_run=_adb_run, + serial=SERIAL, + adb_exe=ADB, + method=method, + ) + raw_png = base64.b64decode(png_b64) + is_black = is_screenshot_black(raw_png) + path = _save_png(raw_png, f"diag_{method}") + return { + "success": True, + "method": method, + "bytes": len(raw_png), + "duration_ms": int((time.monotonic() - started) * 1000), + "is_black": is_black, + "path": path, + "error": None, + } + except Exception as exc: + return { + "success": False, + "method": method, + "bytes": 0, + "duration_ms": int((time.monotonic() - started) * 1000), + "is_black": None, + "path": "", + "error": f"{type(exc).__name__}: {exc}", + } + + +def probe_screenshot_backends() -> dict[str, Any]: + attempts: dict[str, Any] = {} + first_real: dict[str, Any] | None = None + first_success: dict[str, Any] | None = None + + for method in SCREENSHOT_METHODS: + result = asyncio.run(_capture_backend(method)) + attempts[method] = result + if result["success"] and first_success is None: + first_success = result + if result["success"] and not result["is_black"] and first_real is None: + first_real = result + + return { + "attempts": attempts, + "first_success": first_success, + "first_real": first_real, + } + + +def take_desktop_screenshot() -> str: + SCREEN_DIR.mkdir(exist_ok=True) + out_path = (SCREEN_DIR / f"desktop_{_timestamp()}.png").resolve() + ps_script = Path(__file__).parent / "desktop_capture.ps1" + if not ps_script.exists(): + return "" + + argv = [ + "powershell", + "-ExecutionPolicy", + "Bypass", + "-File", + str(ps_script), + "-OutputPath", + str(out_path), + "-AutoDelete", + "0", + ] + started = time.perf_counter() + try: + completed = subprocess.run(argv, capture_output=True, text=True, timeout=20) + produced_file = out_path.exists() and out_path.stat().st_size > 100 + record_command( + command_name="powershell.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="success" if completed.returncode == 0 and produced_file else "error", + error=( + None + if completed.returncode == 0 and produced_file + else ( + f"exit {completed.returncode}" + if completed.returncode != 0 + else "desktop capture did not produce a file" + ) + ), + stdout=completed.stdout, + stderr=completed.stderr, + ) + if produced_file: + return str(out_path) + return "" + except Exception as exc: + record_command( + command_name="powershell.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"{type(exc).__name__}: {exc}", + ) + return "" + + +def diagnose() -> dict[str, Any]: + result: dict[str, Any] = { + "vm_probe": _probe_vm_running_python(), + "adb_connected": False, + "adb_state": "", + "foreground_app": None, + "focus_matches_game": False, + "screenshot_backend": None, + "screenshot_path": "", + "screenshot_is_black": None, + "backend_attempts": {}, + "desktop_fallback_used": False, + "desktop_screenshot_path": "", + "diagnosis": "UNKNOWN", + } + + adb_connected, adb_state = check_adb_connected() + result["adb_connected"] = adb_connected + result["adb_state"] = adb_state + + if not adb_connected: + result["diagnosis"] = "ADB_DISCONNECTED" + desktop_path = take_desktop_screenshot() + if desktop_path: + result["desktop_fallback_used"] = True + result["desktop_screenshot_path"] = desktop_path + return result + + result["foreground_app"] = check_foreground_app() + result["focus_matches_game"] = result["foreground_app"] == GAME_PACKAGE + + screenshot_probe = probe_screenshot_backends() + result["backend_attempts"] = screenshot_probe["attempts"] + + chosen = screenshot_probe["first_real"] or screenshot_probe["first_success"] + if chosen is not None: + result["screenshot_backend"] = chosen["method"] + result["screenshot_path"] = chosen["path"] + result["screenshot_is_black"] = chosen["is_black"] + + if screenshot_probe["first_real"] is not None: + result["diagnosis"] = "OK" if result["focus_matches_game"] else "APP_NOT_IN_FOREGROUND" + return result + + desktop_path = take_desktop_screenshot() + if desktop_path: + result["desktop_fallback_used"] = True + result["desktop_screenshot_path"] = desktop_path + + if screenshot_probe["first_success"] is not None: + result["diagnosis"] = ( + "BLACK_SCREEN_DESKTOP_OK" if desktop_path else "BLACK_SCREEN_NO_FALLBACK" + ) + else: + result["diagnosis"] = ( + "SCREENSHOT_BACKENDS_FAILED_DESKTOP_OK" + if desktop_path + else "SCREENSHOT_BACKENDS_FAILED" + ) + + return result + + +def _run_cli(json: bool = False) -> dict[str, Any]: + _ = json + return diagnose() + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument("--json", action="store_true", help="Print JSON only") + parser.add_argument("--debug-audit", action="store_true", help="Mirror audit JSON to stderr") + args = parser.parse_args() + + configure(debug=args.debug_audit) + diag = audit_cli_call("diagnose", {"json": args.json}, _run_cli) + if args.json: + print(json.dumps(diag, indent=2)) + return + + print(f"VM Probe: {json.dumps(diag['vm_probe'])}") + print(f"ADB Connected: {diag['adb_connected']}") + print(f"ADB State: {diag['adb_state']}") + print(f"Foreground App: {diag['foreground_app']}") + print(f"Focus Matches: {diag['focus_matches_game']}") + print(f"Screenshot Mode: {diag['screenshot_backend']}") + print(f"Screenshot: {diag['screenshot_path']}") + print(f"Screenshot Black: {diag['screenshot_is_black']}") + if diag["desktop_fallback_used"]: + print(f"Desktop Fallback: {diag['desktop_screenshot_path']}") + print("Backend Attempts:") + for method, attempt in diag["backend_attempts"].items(): + if attempt["success"]: + print( + f" - {method}: ok bytes={attempt['bytes']} " + f"black={attempt['is_black']} path={attempt['path']}" + ) + else: + print(f" - {method}: failed {attempt['error']}") + print(f"Diagnosis: {diag['diagnosis']}") + + +if __name__ == "__main__": + main() diff --git a/adb_vision/pilot.py b/adb_vision/pilot.py new file mode 100644 index 0000000000..ca547ed88b --- /dev/null +++ b/adb_vision/pilot.py @@ -0,0 +1,135 @@ +"""Minimal CLI for piloting the emulator through adb_vision tools. + +Usage: + uv run python pilot.py screenshot + uv run python pilot.py tap 640 360 + uv run python pilot.py swipe 640 200 640 600 + uv run python pilot.py keyevent 4 + uv run python pilot.py focus + uv run python pilot.py launch + uv run python pilot.py diagnose +""" + +from __future__ import annotations + +import argparse +import asyncio +import base64 +from datetime import datetime +from pathlib import Path + +import server +from diagnose import diagnose +from tool_audit import audit_cli_call, configure + +SCREEN_DIR = Path(__file__).parent / "pilot_screens" + + +def _save_image(raw: bytes, stem: str) -> str: + SCREEN_DIR.mkdir(exist_ok=True) + path = SCREEN_DIR / stem + path.write_bytes(raw) + return str(path) + + +def screenshot() -> str: + try: + result = asyncio.run(server.adb_screenshot(method="auto")) + except Exception as exc: + diag = diagnose() + raise RuntimeError( + f"pilot screenshot failed ({diag['diagnosis']}); check diagnose.py output" + ) from exc + raw = base64.b64decode(result["content"][0]["data"]) + filename = Path(f"screen_{datetime.now().strftime('%Y%m%dT%H%M%S')}.png") + path = _save_image(raw, filename.name) + print(f"Screenshot saved: {path} ({len(raw)} bytes)") + return path + + +def tap(x: int, y: int) -> str: + result = asyncio.run(server.adb_tap(x, y)) + print(result) + return result + + +def swipe(x1: int, y1: int, x2: int, y2: int, duration_ms: int = 300) -> str: + result = asyncio.run(server.adb_swipe(x1, y1, x2, y2, duration_ms=duration_ms)) + print(result) + return result + + +def keyevent(code: int) -> str: + result = asyncio.run(server.adb_keyevent(code)) + print(result) + return result + + +def focus() -> dict: + result = asyncio.run(server.adb_get_focus()) + print(result) + return result + + +def launch() -> str: + result = asyncio.run(server.adb_launch_game()) + print(result) + return result + + +def run_diagnose() -> dict: + result = diagnose() + print(result) + return result + + +def _main() -> int: + parser = argparse.ArgumentParser(description="Pilot the emulator through adb_vision tools") + parser.add_argument("command", help="screenshot|tap|swipe|keyevent|focus|launch|diagnose") + parser.add_argument("args", nargs="*") + parser.add_argument("--debug", action="store_true", help="enable audit debug logging") + parsed = parser.parse_args() + + configure(debug=parsed.debug) + command = parsed.command.lower() + args = parsed.args + + if command == "screenshot": + audit_cli_call("pilot.screenshot", {}, screenshot) + return 0 + if command == "tap" and len(args) >= 2: + audit_cli_call("pilot.tap", {"x": int(args[0]), "y": int(args[1])}, tap) + return 0 + if command == "swipe" and len(args) >= 4: + duration_ms = int(args[4]) if len(args) > 4 else 300 + audit_cli_call( + "pilot.swipe", + { + "x1": int(args[0]), + "y1": int(args[1]), + "x2": int(args[2]), + "y2": int(args[3]), + "duration_ms": duration_ms, + }, + swipe, + ) + return 0 + if command == "keyevent" and len(args) >= 1: + audit_cli_call("pilot.keyevent", {"code": int(args[0])}, keyevent) + return 0 + if command == "focus": + audit_cli_call("pilot.focus", {}, focus) + return 0 + if command == "launch": + audit_cli_call("pilot.launch", {}, launch) + return 0 + if command == "diagnose": + audit_cli_call("pilot.diagnose", {}, run_diagnose) + return 0 + + parser.print_help() + return 1 + + +if __name__ == "__main__": + raise SystemExit(_main()) diff --git a/adb_vision/pyproject.toml b/adb_vision/pyproject.toml index 0c1bcb385e..238172b28a 100644 --- a/adb_vision/pyproject.toml +++ b/adb_vision/pyproject.toml @@ -6,8 +6,11 @@ requires-python = ">=3.11" dependencies = [ "fastmcp>=3.0.0b1", "pillow>=10.0.0", + "av>=16.1.0", "aiofiles>=25.1.0", "google-genai>=1.0.0", + "uiautomator2>=3.5.0", + "pymemuc>=0.6.0", ] [tool.hatch.build.targets.wheel] diff --git a/adb_vision/screenshot.py b/adb_vision/screenshot.py index d08c44b16f..f7fa6289d3 100644 --- a/adb_vision/screenshot.py +++ b/adb_vision/screenshot.py @@ -3,15 +3,21 @@ import asyncio import base64 +import io import json import logging import os import shutil import tempfile +import time import urllib.error +import urllib.parse import urllib.request +from pathlib import Path from typing import Awaitable, Callable +from tool_audit import record_command, record_event + log = logging.getLogger(__name__) AdbRunFn = Callable[..., Awaitable[bytes]] @@ -28,26 +34,96 @@ ) _DROIDCAST_APK_REMOTE = "/data/local/tmp/DroidCast_raw.apk" _HTTP_TIMEOUT = 2.5 +_SCREENRECORD_SECONDS = 2 def _http_get_bytes(url: str, timeout: float = _HTTP_TIMEOUT) -> bytes: request = urllib.request.Request(url) + started = time.perf_counter() try: with urllib.request.urlopen(request, timeout=timeout) as resp: - if getattr(resp, "status", 200) >= 400: - raise RuntimeError(f"HTTP {resp.status} for {url}") + status_code = getattr(resp, "status", 200) + if status_code >= 400: + raise RuntimeError(f"HTTP {status_code} for {url}") data = resp.read() if not data: raise RuntimeError(f"HTTP response had no body: {url}") + record_event( + tool_name="http.get", + arguments={"url": url, "timeout": timeout}, + status="success", + result_summary=f"status={status_code} bytes={len(data)}", + duration_ms=(time.perf_counter() - started) * 1000, + mode="http", + event_type="http", + ) return data - except urllib.error.URLError as exc: - raise RuntimeError(f"HTTP request failed for {url}: {exc}") from exc + except Exception as exc: + record_event( + tool_name="http.get", + arguments={"url": url, "timeout": timeout}, + status="error", + result_summary="request failed", + duration_ms=(time.perf_counter() - started) * 1000, + error=f"{type(exc).__name__}: {exc}", + mode="http", + event_type="http", + ) + if isinstance(exc, urllib.error.URLError): + raise RuntimeError(f"HTTP request failed for {url}: {exc}") from exc + raise async def _http_bytes(url: str, timeout: float = _HTTP_TIMEOUT) -> bytes: return await asyncio.to_thread(_http_get_bytes, url, timeout) +def _http_post_form_json(url: str, form: dict[str, str], timeout: float = _HTTP_TIMEOUT) -> dict: + started = time.perf_counter() + data = urllib.parse.urlencode(form).encode("utf-8") + request = urllib.request.Request(url, data=data, method="POST") + try: + with urllib.request.urlopen(request, timeout=timeout) as resp: + status_code = getattr(resp, "status", 200) + if status_code >= 400: + raise RuntimeError(f"HTTP {status_code} for {url}") + body = resp.read() + if not body: + raise RuntimeError(f"HTTP response had no body: {url}") + payload = json.loads(body.decode("utf-8", errors="replace")) + record_event( + tool_name="http.post", + arguments={"url": url, "timeout": timeout, "form_keys": sorted(form.keys())}, + status="success", + result_summary=( + f"status={status_code} " + f"keys={sorted(payload.keys()) if isinstance(payload, dict) else type(payload).__name__}" + ), + duration_ms=(time.perf_counter() - started) * 1000, + mode="http", + event_type="http", + ) + return payload + except Exception as exc: + record_event( + tool_name="http.post", + arguments={"url": url, "timeout": timeout, "form_keys": sorted(form.keys())}, + status="error", + result_summary="request failed", + duration_ms=(time.perf_counter() - started) * 1000, + error=f"{type(exc).__name__}: {exc}", + mode="http", + event_type="http", + ) + if isinstance(exc, urllib.error.URLError): + raise RuntimeError(f"HTTP request failed for {url}: {exc}") from exc + raise + + +async def _http_form_json(url: str, form: dict[str, str], timeout: float = _HTTP_TIMEOUT) -> dict: + return await asyncio.to_thread(_http_post_form_json, url, form, timeout) + + def _to_png_bytes(raw: bytes) -> bytes: if raw.startswith(b"\x89PNG"): return raw @@ -70,6 +146,91 @@ def _is_png(data: bytes) -> bool: return data.startswith(b"\x89PNG") +def _find_scrcpy() -> str | None: + env_path = os.environ.get("SCRCPY_EXECUTABLE") + if env_path and os.path.isfile(env_path): + return env_path + + found = shutil.which("scrcpy") + if found: + return found + + repo_root = Path(__file__).resolve().parent.parent + tools_root = repo_root / "tools" / "scrcpy" + if tools_root.is_dir(): + candidates = sorted(tools_root.rglob("scrcpy.exe"), reverse=True) + if candidates: + return str(candidates[0]) + return None + + +def _first_video_frame_to_png(video_path: str) -> bytes: + import av + + container = av.open(video_path) + try: + for frame in container.decode(video=0): + image = frame.to_image() + buffer = io.BytesIO() + image.save(buffer, format="PNG") + return buffer.getvalue() + finally: + container.close() + raise RuntimeError(f"scrcpy recording had no video frames: {video_path}") + + +async def _run_logged_process( + *, + argv: list[str], + timeout: float, + command_name: str, +) -> tuple[bytes, bytes]: + started = time.perf_counter() + proc = await asyncio.create_subprocess_exec( + *argv, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + record_command( + command_name=command_name, + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"Timeout after {timeout}s", + ) + raise RuntimeError(f"{command_name} timed out after {timeout}s") + stdout_text = stdout.decode(errors="replace").strip() + stderr_text = stderr.decode(errors="replace").strip() + if proc.returncode != 0: + record_command( + command_name=command_name, + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"exit {proc.returncode}", + stdout=stdout_text, + stderr=stderr_text, + ) + raise RuntimeError( + f"{command_name} exited with code {proc.returncode}: " + f"{stderr_text[:300]}" + ) + record_command( + command_name=command_name, + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="success", + stdout=stdout_text, + stderr=stderr_text, + ) + return stdout, stderr + + async def _ensure_tcp_forward(adb_run: AdbRunFn, local_port: int, remote_port: int) -> None: local = f"tcp:{local_port}" remote = f"tcp:{remote_port}" @@ -91,6 +252,29 @@ async def _start_droidcast_server(adb_run: AdbRunFn, _serial: str, _adb_exe: str else: log.warning("DroidCast APK not found locally: %s", _DROIDCAST_APK_LOCAL) + # ATX agent is already present in this environment and provides the same + # background shell path ALAS uses for DroidCast startup. This is far more + # reliable on MEmu than a raw "adb shell nohup ..." launch. + await _ensure_tcp_forward(adb_run, _U2_PORT, _U2_PORT) + await _start_uiautomator_agent(adb_run, _serial, _adb_exe) + try: + payload = await _http_form_json( + f"http://127.0.0.1:{_U2_PORT}/shell/background", + { + "command": ( + f"CLASSPATH={_DROIDCAST_APK_REMOTE} " + "app_process / ink.mol.droidcast_raw.Main > /dev/null" + ), + "timeout": "10", + }, + timeout=5.0, + ) + if not payload.get("success", False): + raise RuntimeError(f"DroidCast background launch failed: {payload}") + return + except Exception as exc: + log.warning("ATX shell/background DroidCast launch failed: %s", exc) + command = ( f"CLASSPATH={_DROIDCAST_APK_REMOTE} " "app_process / ink.mol.droidcast_raw.Main >/dev/null 2>&1 &" @@ -130,18 +314,50 @@ async def take_screenshot( serial: str, adb_exe: str, method: str = "auto", -) -> str: + include_metadata: bool = False, +) -> str | tuple[str, dict[str, str | int]]: backends = _resolve_backends(method) last_error: Exception | None = None for name, capture_fn in backends: + started = time.perf_counter() try: log.debug("trying screenshot backend: %s", name) b64 = await capture_fn(adb_run=adb_run, serial=serial, adb_exe=adb_exe) - if b64 and len(base64.b64decode(b64)) > 5000: - return b64 + if b64: + png_bytes = len(base64.b64decode(b64)) + if png_bytes > 5000: + record_event( + tool_name="screenshot.backend", + arguments={"requested_method": method, "backend": name}, + status="success", + result_summary=f"png_bytes={png_bytes}", + duration_ms=(time.perf_counter() - started) * 1000, + event_type="backend", + ) + if include_metadata: + return b64, {"backend": name, "png_bytes": png_bytes} + return b64 last_error = RuntimeError(f"{name}: image too small, likely blank") + record_event( + tool_name="screenshot.backend", + arguments={"requested_method": method, "backend": name}, + status="error", + error=str(last_error), + result_summary="image too small", + duration_ms=(time.perf_counter() - started) * 1000, + event_type="backend", + ) except Exception as exc: last_error = exc + record_event( + tool_name="screenshot.backend", + arguments={"requested_method": method, "backend": name}, + status="error", + error=f"{type(exc).__name__}: {exc}", + result_summary="backend failed", + duration_ms=(time.perf_counter() - started) * 1000, + event_type="backend", + ) log.warning("screenshot backend %s failed: %s", name, exc) raise RuntimeError(f"All screenshot backends failed. Last error: {last_error}") @@ -151,6 +367,7 @@ def _resolve_backends(method: str): ("droidcast", _capture_droidcast), ("scrcpy", _capture_scrcpy), ("u2", _capture_u2), + ("screenrecord", _capture_screenrecord), ("screencap", _capture_screencap), ] if method == "auto": @@ -166,6 +383,58 @@ async def _capture_screencap(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> return base64.b64encode(png_data).decode("ascii") +async def _capture_screenrecord(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> str: + ffmpeg_exe = shutil.which("ffmpeg") + if not ffmpeg_exe: + raise RuntimeError("ffmpeg not found in PATH; required for screenrecord fallback") + + remote_path = f"/data/local/tmp/adb_vision_capture_{os.getpid()}.mp4" + with tempfile.TemporaryDirectory(prefix="adb_vision_") as tmpdir: + video_path = Path(tmpdir) / "capture.mp4" + frame_path = Path(tmpdir) / "frame.png" + png_data = b"" + + try: + await adb_run("shell", "rm", "-f", remote_path, timeout=5.0) + except Exception: + pass + + try: + await adb_run( + "shell", + "screenrecord", + f"--time-limit={_SCREENRECORD_SECONDS}", + remote_path, + timeout=10.0 + _SCREENRECORD_SECONDS, + ) + await adb_run("pull", remote_path, str(video_path), timeout=20.0) + await _run_logged_process( + argv=[ + ffmpeg_exe, + "-y", + "-i", + str(video_path), + "-frames:v", + "1", + "-update", + "1", + str(frame_path), + ], + timeout=20.0, + command_name="ffmpeg.exec", + ) + png_data = frame_path.read_bytes() + finally: + try: + await adb_run("shell", "rm", "-f", remote_path, timeout=5.0) + except Exception: + pass + + if not _is_png(png_data): + raise RuntimeError("screenrecord frame extraction did not produce a PNG") + return base64.b64encode(png_data).decode("ascii") + + async def _capture_droidcast(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> str: await _ensure_tcp_forward(adb_run, _DROIDCAST_PORT, _DROIDCAST_PORT) last_error: Exception | None = None @@ -188,33 +457,33 @@ async def _capture_droidcast(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> async def _capture_scrcpy(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> str: - scrcpy_exe = shutil.which("scrcpy") + scrcpy_exe = _find_scrcpy() if not scrcpy_exe: - raise RuntimeError("scrcpy not found in PATH; install scrcpy v2.7+") + raise RuntimeError("scrcpy not found; set SCRCPY_EXECUTABLE or place a portable build under tools/scrcpy/") - with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp: + with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: tmp_path = tmp.name png_data = b"" try: - proc = await asyncio.create_subprocess_exec( - scrcpy_exe, - "--serial", - serial, - "screenshot", - tmp_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + await _run_logged_process( + argv=[ + scrcpy_exe, + "--serial", + serial, + "--no-window", + "--no-audio-playback", + "--record", + tmp_path, + "--record-format", + "mp4", + "--time-limit", + "1", + ], + timeout=15.0, + command_name="scrcpy.exec", ) - try: - _, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0) - except asyncio.TimeoutError: - proc.kill() - raise RuntimeError("scrcpy screenshot timed out after 15 seconds") - if proc.returncode != 0: - raise RuntimeError(f"scrcpy exited with code {proc.returncode}: {stderr.decode()[:300]}") - with open(tmp_path, "rb") as f: - png_data = f.read() + png_data = await asyncio.to_thread(_first_video_frame_to_png, tmp_path) finally: try: os.unlink(tmp_path) @@ -243,4 +512,3 @@ async def _capture_u2(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> str: last_error = exc log.warning("u2 endpoint failed: %s", exc) raise RuntimeError(f"Failed to capture via uiautomator2: {last_error}") - diff --git a/adb_vision/server.py b/adb_vision/server.py index 8ee29c5b11..ff199ba3f4 100644 --- a/adb_vision/server.py +++ b/adb_vision/server.py @@ -1,7 +1,7 @@ """Clean ADB + VLM MCP server — no ALAS dependency. Exposes: - adb_screenshot — pluggable backend (DroidCast / scrcpy / u2 / screencap) + adb_screenshot — pluggable backend (DroidCast / scrcpy / u2 / screenrecord / screencap) adb_tap — input tap via ADB CLI adb_swipe — input swipe via ADB CLI adb_launch_game — am start Azur Lane @@ -26,6 +26,7 @@ from typing import Any, Dict, Optional from screenshot import take_screenshot +from tool_audit import AuditMiddleware, configure as configure_audit, record_command # --------------------------------------------------------------------------- # FastMCP @@ -109,11 +110,10 @@ def _find_adb() -> str: async def _adb_run(*args: str, timeout: float = 10.0) -> bytes: """Run ``adb -s `` as a non-blocking subprocess.""" + started = time.perf_counter() + argv = [ADB_EXECUTABLE, "-s", ADB_SERIAL, *args] proc = await asyncio.create_subprocess_exec( - ADB_EXECUTABLE, - "-s", - ADB_SERIAL, - *args, + *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) @@ -124,14 +124,40 @@ async def _adb_run(*args: str, timeout: float = 10.0) -> bytes: except asyncio.TimeoutError: proc.kill() await proc.wait() + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"Timeout after {timeout}s", + ) raise TimeoutError( f"adb timed out after {timeout}s: adb -s {ADB_SERIAL} {' '.join(args)}" ) if proc.returncode != 0: + stderr_text = stderr.decode(errors="replace").strip() + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="error", + error=f"exit {proc.returncode}", + stderr=stderr_text, + ) raise RuntimeError( f"adb {' '.join(args)} failed (exit {proc.returncode}): " - f"{stderr.decode(errors='replace').strip()}" + f"{stderr_text}" ) + stdout_text = stdout.decode(errors="replace").strip() + stderr_text = stderr.decode(errors="replace").strip() + record_command( + command_name="adb.exec", + argv=argv, + duration_ms=(time.perf_counter() - started) * 1000, + status="success", + stdout=stdout_text, + stderr=stderr_text, + ) return stdout @@ -144,25 +170,27 @@ async def adb_screenshot(method: str = "auto") -> Dict[str, Any]: """Take a screenshot from the emulator. Args: - method: Screenshot backend — "droidcast", "scrcpy", "u2", "screencap", + method: Screenshot backend — "droidcast", "scrcpy", "u2", "screenrecord", "screencap", or "auto" (tries each until one works). Returns image content with mimeType image/png and base64 data. """ t0 = time.monotonic() - png_b64 = await take_screenshot( + payload = await take_screenshot( adb_run=_adb_run, serial=ADB_SERIAL, adb_exe=ADB_EXECUTABLE, method=method, + include_metadata=True, ) + png_b64, meta = payload ms = int((time.monotonic() - t0) * 1000) png_bytes = base64.b64decode(png_b64) saved = _save_screenshot_png(png_b64, _action_seq + 1) _action_log( "adb_screenshot", {"serial": ADB_SERIAL, "method": method}, - f"png_bytes={len(png_bytes)} saved={saved}", + f"png_bytes={len(png_bytes)} backend={meta.get('backend')} saved={saved}", "", ms, ) @@ -298,11 +326,19 @@ def main(): parser.add_argument( "--screenshot-method", default="auto", - choices=["auto", "droidcast", "scrcpy", "u2", "screencap"], + choices=["auto", "droidcast", "scrcpy", "u2", "screenrecord", "screencap"], help="Default screenshot method", ) + parser.add_argument( + "--debug-audit", + action="store_true", + help="Mirror structured audit events to stderr.", + ) args = parser.parse_args() ADB_SERIAL = args.serial + configure_audit(debug=args.debug_audit) + if AuditMiddleware is not None: + mcp.add_middleware(AuditMiddleware()) mcp.run(transport="stdio") diff --git a/adb_vision/test_server.py b/adb_vision/test_server.py index f18037ef17..a6955d3f52 100644 --- a/adb_vision/test_server.py +++ b/adb_vision/test_server.py @@ -59,7 +59,7 @@ async def mock_run(*args, timeout=10.0): @pytest.mark.asyncio async def test_take_screenshot_auto_falls_through(): - """auto mode follows explicit order: droidcast -> scrcpy -> u2 -> screencap.""" + """auto mode follows explicit order until the first successful backend.""" from screenshot import take_screenshot calls = [] @@ -127,7 +127,7 @@ async def fake_scrcpy(*, adb_run, serial, adb_exe): @pytest.mark.asyncio async def test_take_screenshot_scrcpy_fallback_chain(): - """With auto, scrcpy failure falls through to u2 then screencap.""" + """With auto, scrcpy failure falls through to u2, screenrecord, then screencap.""" from screenshot import take_screenshot calls = [] @@ -147,6 +147,10 @@ async def fake_u2(*, adb_run, serial, adb_exe): calls.append("u2") raise RuntimeError("u2 unavailable") + async def fake_screenrecord(*, adb_run, serial, adb_exe): + calls.append("screenrecord") + raise RuntimeError("screenrecord unavailable") + async def fake_screencap(*, adb_run, serial, adb_exe): calls.append("screencap") return FAKE_PNG_B64 @@ -154,6 +158,8 @@ async def fake_screencap(*, adb_run, serial, adb_exe): with mock.patch("screenshot._capture_droidcast", side_effect=fake_droidcast), mock.patch( "screenshot._capture_scrcpy", side_effect=fake_scrcpy ), mock.patch("screenshot._capture_u2", side_effect=fake_u2), mock.patch( + "screenshot._capture_screenrecord", side_effect=fake_screenrecord + ), mock.patch( "screenshot._capture_screencap", side_effect=fake_screencap ): result = await take_screenshot( @@ -161,7 +167,7 @@ async def fake_screencap(*, adb_run, serial, adb_exe): ) decoded = base64.b64decode(result) assert len(decoded) > 5000 - assert calls == ["droidcast", "scrcpy", "u2", "screencap"] + assert calls == ["droidcast", "scrcpy", "u2", "screenrecord", "screencap"] @pytest.mark.asyncio @@ -292,8 +298,8 @@ async def test_scrcpy_backend_not_in_path(): """scrcpy backend raises RuntimeError when scrcpy is not in PATH.""" import screenshot as sc - with mock.patch("screenshot.shutil.which", return_value=None): - with pytest.raises(RuntimeError, match="scrcpy not found in PATH"): + with mock.patch("screenshot._find_scrcpy", return_value=None): + with pytest.raises(RuntimeError, match="scrcpy not found"): await sc._capture_scrcpy(adb_run=_mock_adb_run, serial="test", adb_exe="adb") @@ -302,15 +308,14 @@ async def test_scrcpy_backend_success(): """scrcpy backend returns base64 PNG when scrcpy succeeds.""" import screenshot as sc - with mock.patch("screenshot.shutil.which", return_value="C:/tools/scrcpy.exe"): + with mock.patch("screenshot._find_scrcpy", return_value="C:/tools/scrcpy.exe"): with mock.patch("screenshot.tempfile.NamedTemporaryFile") as mock_tmp: tmp_file = mock.MagicMock() tmp_file.__enter__ = mock.Mock(return_value=tmp_file) tmp_file.__exit__ = mock.Mock(return_value=False) - tmp_file.name = "C:/tmp/fake_screen.png" + tmp_file.name = "C:/tmp/fake_screen.mp4" mock_tmp.return_value = tmp_file - # Mock subprocess to exit 0 and write fake PNG to tmp file mock_proc = mock.AsyncMock() mock_proc.communicate = mock.AsyncMock(return_value=(b"", b"")) mock_proc.returncode = 0 @@ -319,7 +324,7 @@ async def test_scrcpy_backend_success(): "screenshot.asyncio.create_subprocess_exec", return_value=mock_proc, ): - with mock.patch("builtins.open", mock.mock_open(read_data=FAKE_PNG_BYTES)): + with mock.patch("screenshot._first_video_frame_to_png", return_value=FAKE_PNG_BYTES): with mock.patch("screenshot.os.unlink"): result = await sc._capture_scrcpy( adb_run=_mock_adb_run, serial="test", adb_exe="adb" @@ -404,7 +409,7 @@ async def test_adb_screenshot_tool(): import server async def mock_take_screenshot(**kwargs): - return FAKE_PNG_B64 + return FAKE_PNG_B64, {"backend": "mock", "png_bytes": len(FAKE_PNG_BYTES)} with mock.patch("server.take_screenshot", side_effect=mock_take_screenshot): result = await server.adb_screenshot() diff --git a/adb_vision/test_tool_audit.py b/adb_vision/test_tool_audit.py new file mode 100644 index 0000000000..c3c6295a3d --- /dev/null +++ b/adb_vision/test_tool_audit.py @@ -0,0 +1,70 @@ +import json +from pathlib import Path + +import tool_audit + + +def _read_records(path: Path) -> list[dict]: + return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()] + + +def test_record_command_writes_subprocess_event(tmp_path): + audit_path = tmp_path / "audit.jsonl" + tool_audit.configure(audit_path=str(audit_path)) + + tool_audit.record_command( + command_name="adb.exec", + argv=["adb", "-s", "serial", "get-state"], + duration_ms=12.5, + status="success", + stdout="device", + ) + + records = _read_records(audit_path) + assert len(records) == 1 + assert records[0]["tool"] == "adb.exec" + assert records[0]["mode"] == "subprocess" + assert records[0]["event_type"] == "subprocess" + assert records[0]["parent_event_id"] is None + + +def test_audit_cli_call_sets_parent_for_child_events(tmp_path): + audit_path = tmp_path / "audit.jsonl" + tool_audit.configure(audit_path=str(audit_path)) + + def _run(): + tool_audit.record_event( + tool_name="http.get", + arguments={"url": "http://127.0.0.1:7912/screenshot"}, + status="success", + result_summary="bytes=1024", + duration_ms=3.0, + event_type="http", + ) + return "ok" + + result = tool_audit.audit_cli_call("pilot.screenshot", {}, _run) + assert result == "ok" + + records = _read_records(audit_path) + assert len(records) == 2 + child, parent = records[0], records[1] + assert child["parent_event_id"] == parent["event_id"] + assert child["event_type"] == "http" + assert parent["event_type"] == "tool_call" + + +def test_record_command_summarizes_binary_streams(tmp_path): + audit_path = tmp_path / "audit.jsonl" + tool_audit.configure(audit_path=str(audit_path)) + + tool_audit.record_command( + command_name="adb.exec", + argv=["adb", "exec-out", "screencap", "-p"], + duration_ms=10.0, + status="success", + stdout=b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR", + ) + + records = _read_records(audit_path) + assert " None: + global _DEBUG, _AUDIT_PATH + _DEBUG = debug + if audit_path: + _AUDIT_PATH = Path(audit_path) + + +def _append_jsonl(path: Path, payload: Dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + if path.exists() and path.stat().st_size >= _ROTATE_BYTES: + stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + path.replace(path.with_name(f"{path.stem}.{stamp}{path.suffix or '.jsonl'}")) + with path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(payload, ensure_ascii=True) + "\n") + + +def _truncate(value: str, limit: int = 200) -> str: + value = value or "" + if len(value) <= limit: + return value + return f"{value[:limit]}...<{len(value)} chars>" + + +def _looks_binary(data: bytes) -> bool: + if not data: + return False + if b"\x00" in data: + return True + sample = data[:256] + printable = sum(chr(byte) in string.printable for byte in sample) + return printable / len(sample) < 0.85 + + +def _summarize_blob(blob: str | bytes, limit: int = 200) -> str: + if isinstance(blob, bytes): + if _looks_binary(blob): + return f"" + blob = blob.decode("utf-8", errors="replace") + return _truncate(blob.strip(), limit) + + +def _sanitize_arguments(arguments: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not arguments: + return {} + clean: Dict[str, Any] = {} + for key, value in arguments.items(): + if key == "_caller": + continue + if isinstance(value, str): + clean[key] = _truncate(value, 500) + elif isinstance(value, (list, tuple)) and len(value) > 50: + clean[key] = f"<{len(value)} items>" + else: + clean[key] = value + return clean + + +def detect_caller(arguments: Optional[Dict[str, Any]] = None) -> str: + if arguments and "_caller" in arguments: + return str(arguments["_caller"]) + env_caller = os.environ.get("MCP_CALLER") + if env_caller: + return env_caller + term = os.environ.get("TERM_PROGRAM", "") + if "claude" in term.lower(): + return "claude-code" + return "unknown" + + +def summarize_result(result: Any) -> str: + if hasattr(result, "content") and isinstance(result.content, list): + parts = [] + for block in result.content: + block_type = getattr(block, "type", None) + if block_type == "text": + parts.append(f"text:{_truncate(getattr(block, 'text', ''), 120)}") + elif block_type == "image": + mime = getattr(block, "mimeType", "image/unknown") + data = getattr(block, "data", "") + size_kb = (len(data) * 3 // 4) / 1024 if data else 0 + parts.append(f"image:{mime}~{size_kb:.0f}KB") + return "; ".join(parts) if parts else "" + + if isinstance(result, dict): + if "content" in result and isinstance(result["content"], list): + parts = [] + for item in result["content"]: + if not isinstance(item, dict): + continue + if item.get("type") == "image": + mime = item.get("mimeType", "image/unknown") + data = item.get("data", "") + size_kb = (len(data) * 3 // 4) / 1024 if data else 0 + parts.append(f"image:{mime}~{size_kb:.0f}KB") + elif item.get("type") == "text": + parts.append(f"text:{_truncate(item.get('text', ''), 120)}") + return "; ".join(parts) if parts else _truncate(str(result)) + if "success" in result: + status = "ok" if result.get("success") else "fail" + state = result.get("observed_state") or result.get("expected_state") + return f"{status}, state={state}" + return _truncate(str(result)) + + if isinstance(result, list): + return f"list[{len(result)}]" + if isinstance(result, str): + return _truncate(result) + return _truncate(str(result)) + + +def _next_event_id() -> str: + return f"evt-{os.getpid()}-{next(_EVENT_COUNTER)}" + + +def _current_context_snapshot() -> dict[str, str] | None: + return _ACTIVE_CONTEXT.get() + + +def current_context() -> Dict[str, str | None]: + context = _current_context_snapshot() or {} + return { + "tool": context.get("tool_name"), + "caller": context.get("caller"), + "event_id": context.get("event_id"), + } + + +def _current_parent_event_id() -> str | None: + context = _current_context_snapshot() or {} + return context.get("event_id") + + +def _current_caller(arguments: Optional[Dict[str, Any]] = None) -> str: + context = _current_context_snapshot() or {} + return context.get("caller") or detect_caller(arguments) + + +def _build_record( + *, + tool_name: str, + arguments: Optional[Dict[str, Any]], + caller: str, + duration_ms: float, + status: str, + error: Optional[str], + result_summary: str, + mode: str, + event_type: str, + event_id: str, + parent_event_id: str | None, +) -> Dict[str, Any]: + return { + "ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z"), + "tool": tool_name, + "arguments": _sanitize_arguments(arguments), + "caller": caller, + "duration_ms": round(duration_ms, 2), + "status": status, + "error": error, + "result_summary": result_summary, + "pid": os.getpid(), + "mode": mode, + "event_type": event_type, + "event_id": event_id, + "parent_event_id": parent_event_id, + } + + +def _write_record(record: Dict[str, Any]) -> None: + try: + _append_jsonl(_AUDIT_PATH, record) + if _DEBUG: + print(json.dumps(record, ensure_ascii=True), file=sys.stderr) + except Exception: + pass + + +def record_event( + *, + tool_name: str, + arguments: Optional[Dict[str, Any]], + status: str, + result_summary: str, + duration_ms: float, + error: Optional[str] = None, + mode: str = "child", + event_type: str = "child", + caller: Optional[str] = None, +) -> None: + record = _build_record( + tool_name=tool_name, + arguments=arguments, + caller=caller or _current_caller(arguments), + duration_ms=duration_ms, + status=status, + error=error, + result_summary=result_summary, + mode=mode, + event_type=event_type, + event_id=_next_event_id(), + parent_event_id=_current_parent_event_id(), + ) + _write_record(record) + + +def record_command( + *, + command_name: str, + argv: list[str], + duration_ms: float, + status: str, + error: str | None = None, + stdout: str | bytes = "", + stderr: str | bytes = "", +) -> None: + summary_parts = [f"argv={' '.join(argv)}"] + if stdout: + summary_parts.append(f"stdout={_summarize_blob(stdout, 120)}") + if stderr: + summary_parts.append(f"stderr={_summarize_blob(stderr, 120)}") + record_event( + tool_name=command_name, + arguments={"argv": argv}, + duration_ms=duration_ms, + status=status, + error=error, + result_summary=" | ".join(summary_parts), + mode="subprocess", + event_type="subprocess", + ) + + +def audit_cli_call(tool_name: str, arguments: Dict[str, Any], func: Callable[..., Any]) -> Any: + caller = detect_caller(arguments) + clean_args = {k: v for k, v in arguments.items() if k != "_caller"} + event_id = _next_event_id() + token = _ACTIVE_CONTEXT.set( + {"event_id": event_id, "tool_name": tool_name, "caller": caller} + ) + start = time.perf_counter() + status = "success" + error_msg: str | None = None + result_summary = "" + try: + result = func(**clean_args) + result_summary = summarize_result(result) + return result + except Exception as exc: + status = "error" + error_msg = f"{type(exc).__name__}: {exc}" + if _DEBUG: + error_msg += "\n" + traceback.format_exc(limit=6) + raise + finally: + _ACTIVE_CONTEXT.reset(token) + _write_record( + _build_record( + tool_name=tool_name, + arguments=arguments, + caller=caller, + duration_ms=(time.perf_counter() - start) * 1000, + status=status, + error=error_msg, + result_summary=result_summary, + mode="cli", + event_type="tool_call", + event_id=event_id, + parent_event_id=None, + ) + ) + + +AuditMiddleware = None + +try: + import mcp.types as _mt + from fastmcp.server.middleware import CallNext, Middleware, MiddlewareContext + from fastmcp.tools.tool import ToolResult as _ToolResult + + class _AuditMiddleware(Middleware): + async def on_call_tool( + self, + context: MiddlewareContext[_mt.CallToolRequestParams], + call_next: CallNext[_mt.CallToolRequestParams, _ToolResult], + ) -> _ToolResult: + tool_name = context.message.name + arguments = dict(context.message.arguments or {}) + caller = detect_caller(arguments) + event_id = _next_event_id() + token = _ACTIVE_CONTEXT.set( + {"event_id": event_id, "tool_name": tool_name, "caller": caller} + ) + + if "_caller" in arguments: + clean_args = {k: v for k, v in arguments.items() if k != "_caller"} + context = context.copy( + message=_mt.CallToolRequestParams(name=tool_name, arguments=clean_args) + ) + + start = time.perf_counter() + status = "success" + error_msg: str | None = None + result_summary = "" + try: + result = await call_next(context) + result_summary = summarize_result(result) + return result + except Exception as exc: + status = "error" + error_msg = f"{type(exc).__name__}: {exc}" + if _DEBUG: + error_msg += "\n" + traceback.format_exc(limit=6) + raise + finally: + _ACTIVE_CONTEXT.reset(token) + _write_record( + _build_record( + tool_name=tool_name, + arguments=arguments, + caller=caller, + duration_ms=(time.perf_counter() - start) * 1000, + status=status, + error=error_msg, + result_summary=result_summary, + mode="mcp", + event_type="tool_call", + event_id=event_id, + parent_event_id=None, + ) + ) + + AuditMiddleware = _AuditMiddleware +except ImportError: + pass diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index ca6c0692aa..0a77b5d5f3 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -23,23 +23,27 @@ ▼ ┌─────────────────────────────────────────────────────┐ │ adb_vision MCP Server (FastMCP) │ -│ Backend: DroidCast (primary), scrcpy, u2 │ +│ Backend: DroidCast / scrcpy / u2 / screenrecord │ ├─────────────────────────────────────────────────────┤ │ Tools: │ │ • adb_screenshot (PNG base64) │ │ • adb_tap, adb_swipe, adb_keyevent │ │ • adb_launch_game, adb_get_focus │ -│ • memuc_start, memuc_stop (emulator control) │ │ │ -│ Logging: mcp_actions.jsonl, mcp_screenshots/ │ +│ Logging: mcp_audit.jsonl, mcp_actions.jsonl, │ +│ mcp_screenshots/ │ +│ Surface: fixed tool allowlist only; no generic │ +│ "call arbitrary tool/script" bridge │ └────────────────┬────────────────────────────────────┘ │ ADB CLI (subprocess) + memuc ▼ ┌─────────────────────────────────────────────────────┐ │ MEmu Android Emulator │ │ Serial: 127.0.0.1:21513 │ -│ Render: OpenGL (DroidCast via MediaProjection) │ -│ Control: memuc.exe CLI (Admin via Multi-Manager) │ +│ Render: OpenGL (DroidCast/u2 may fail; │ +│ screenrecord fallback remains available) │ +│ Control: ADB for live piloting; memuc only in │ +│ recovery ladder / admin fallback │ └─────────────────────────────────────────────────────┘ ``` @@ -56,3 +60,7 @@ - Code that imports ALAS internals → `alas_wrapped/tools/` - MCP server, CLI wrappers, blueprints → `adb_vision/` - Documentation → `docs/` + +## Service Surface Rules +- Canonical MCP surface is `adb_vision/server.py`. +- Every MCP tool call is audit-logged as structured JSONL before returning to the caller. diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index a81f265e7e..ecf08eb7ae 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -6,7 +6,7 @@ - ✅ adb_vision MCP server skeleton (screenshot, tap, swipe core) - ✅ State machine ported (43-page graph as data) - ✅ Task catalog available (via ALAS reference) -- ✅ DroidCast backend integration (adb_vision/screenshot.py) +- ✅ Screenshot fallback ladder in `adb_vision/screenshot.py` (DroidCast / scrcpy / u2 / screenrecord / screencap) - ✅ P0: MEmu control layer (`emulator/memuc_cli.py`) with elevation detection - ✅ P0: Gemini Flash VLM integration (`vision/gemini_flash.py`) - ✅ P0: Persistent state cache (`state/cache.py`, thread-safe JSONL) @@ -15,6 +15,9 @@ - ✅ P1 baseline: scheduler loader/decider/master-loop skeleton + tests (`test_scheduler_*`, `test_master_loop.py`) - ✅ P1 complete: live loop run (3 cycles) against MEmu + per-cycle state persistence + deterministic-first/fallback flow - ✅ MEmu task bridge script for non-elevated triggering (`scripts/memu_task_bridge.ps1`) +- ✅ Structured MCP audit logging for `adb_vision/server.py` (`adb_vision/mcp_audit.jsonl`) +- ✅ MEmu diagnostics corrected to trust live ADB connectivity when `memuc.exe` elevation blocks status checks +- ✅ Live piloting path proven on March 5, 2026: screenshot -> close announcements modal -> lobby via `adb_vision/pilot.py` - ⏳ Piloting mode (vision + manual recovery) not started - ⏳ Blueprint library not started diff --git a/docs/dev/logging.md b/docs/dev/logging.md index 162a7d035b..fda3a20ddf 100644 --- a/docs/dev/logging.md +++ b/docs/dev/logging.md @@ -14,6 +14,7 @@ Minimal logging during successful deterministic execution: - Tool invocations with parameters - State transitions (expected → actual) - Timing information +- MCP audit record per tool call in JSONL (`adb_vision/mcp_audit.jsonl`) ### Failure Capture Rich context capture when tools fail or state is unexpected: @@ -29,6 +30,19 @@ When LLM recovery is triggered: - Whether recovery succeeded or failed - If logged for human review, what was logged +## Current Implementation + +- Canonical MCP surface: `adb_vision/server.py` +- Structured audit log: `adb_vision/mcp_audit.jsonl` +- Per-action compatibility log: `adb_vision/mcp_actions.jsonl` +- Screenshot artifacts: `adb_vision/mcp_screenshots/` +- Local piloting/diagnostic CLI calls (`pilot.py`, `diagnose.py`) also emit audit records so manual recovery sessions share the same timeline. + +## Service Surface Guardrails + +- Keep the MCP surface fixed and explicit; do not add generic shell/script execution tools. +- If a tool invocation is blocked by policy, log the blocked attempt with the same audit schema instead of failing silently. + ## In-Flight Enhancements ### 1) Scheduler JSONL Status Stream (Planned)