Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion TDD_IMPLEMENTATION_PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
| ADB raw tap/swipe/keyevent | `adb_vision/server.py` | ✅ **DONE** | Via `adb_tap`, `adb_swipe`, `adb_keyevent` tools |
| ALAS state machine | `alas_wrapped/module/ui/page.py` | Reference only | 43 pages, 98 transitions — extract knowledge, not code |
| MEmu config | `docs/dev/memu_playbook.md` | Documented | Admin-at-startup solved via memuc.exe |
| ALAS MCP server | `agent_orchestrator/alas_mcp_server.py` | ⚠️ **DEPRECATED** | Do not extend — use `adb_vision/server.py` |
| ALAS MCP server | `agent_orchestrator/alas_mcp_server.py` | ⚠️ **DEPRECATED** | Do not extend — generic internal tool bridge is disabled by default |

### What Must Be Built (Greenfield)

Expand Down
209 changes: 129 additions & 80 deletions agent_orchestrator/alas_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,24 @@
import asyncio
import base64
import io
import json
import logging
import os
import sys
import inspect
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mcp_audit is imported before the sys.path injection that makes alas_wrapped/module/* importable (see the later sys.path.insert(0, alas_wrapped) block). As a result, mcp_audit will consistently fall back to its local append_jsonl implementation even in real server runs, which can silently diverge from module.base.jsonl.append_jsonl over time. Consider moving the sys.path setup above the mcp_audit import (or making mcp_audit resolve append_jsonl lazily) so production uses the canonical helper when available.

Suggested change
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any
# Ensure alas_wrapped modules are importable before importing mcp_audit,
# so mcp_audit can use the canonical helpers from alas_wrapped/module/*.
_REPO_ROOT = Path(__file__).resolve().parents[1]
_ALAS_WRAPPED = _REPO_ROOT / "alas_wrapped"
if _ALAS_WRAPPED.is_dir():
sys.path.insert(0, str(_ALAS_WRAPPED))

Copilot uses AI. Check for mistakes.
from mcp_audit import record_command, record_event

# ---------------------------------------------------------------------------
# Always-on action log — every MCP tool call appended as a JSONL line.
# Screenshots are also saved as PNG files alongside the log.
# ---------------------------------------------------------------------------
_ACTION_LOG_PATH = Path(__file__).parent / "mcp_actions.jsonl"
_SCREENSHOT_DIR = Path(__file__).parent / "mcp_screenshots"
_action_seq = 0 # monotonic call counter within this server process


def _action_log(tool: str, args: dict, result_summary: str, error: str = "", duration_ms: int = 0):
"""Append one JSONL record to the action log (never raises)."""
global _action_seq
_action_seq += 1
record = {
"seq": _action_seq,
"ts": datetime.now(timezone.utc).isoformat(),
"tool": tool,
"args": args,
"result": result_summary,
"error": error,
"duration_ms": duration_ms,
}
try:
_ACTION_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(_ACTION_LOG_PATH, "a", encoding="utf-8") as fh:
fh.write(json.dumps(record, ensure_ascii=True) + "\n")
except Exception:
pass # never disrupt a tool call due to logging failure
_SCREENSHOT_DIR = Path(__file__).parent / "mcp_screenshots"


def _save_screenshot_png(data_b64: str, seq: int) -> str:
def _save_screenshot_png(data_b64: str) -> str:
"""Save a base64 PNG to mcp_screenshots/<seq>_<ts>.png; return the path."""
try:
_SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%dT%H%M%S")
fname = _SCREENSHOT_DIR / f"{seq:05d}_{ts}.png"
stamp = time.time_ns()
fname = _SCREENSHOT_DIR / f"{stamp}_{os.getpid()}.png"
Comment on lines +17 to +22
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _save_screenshot_png docstring says screenshots are saved as mcp_screenshots/<seq>_<ts>.png, but the implementation now uses {time.time_ns()}_{pid}.png (no seq, no formatted timestamp). Update the docstring to match the actual filename scheme so operators can locate files reliably.

Copilot uses AI. Check for mistakes.
fname.write_bytes(base64.b64decode(data_b64))
return str(fname)
except Exception:
Expand Down Expand Up @@ -84,6 +56,22 @@ def _find_adb() -> str:
# ADB serial — set from config in main(); used by async ADB CLI tools.
# ---------------------------------------------------------------------------
ADB_SERIAL: str = "127.0.0.1:21513"
ALLOW_UNSAFE_STATE_MACHINE_CALLS: bool = (
os.environ.get("ALAS_ALLOW_UNSAFE_STATE_MACHINE_CALLS", "").strip().lower()
in {"1", "true", "yes"}
)
REMOTE_ALLOWED_ALAS_TOOLS: set[str] = {
"main.collect_mail",
"dorm.collect_rewards",
"dorm.feed_ships",
"dorm.buy_furniture",
"dorm.get_ship_count",
"commission.run",
"research.run",
"shop.run",
"guild.collect_lobby_rewards",
"workflow.daily_base_sweep",
}


async def _adb_run(*args: str, timeout: float) -> bytes:
Expand All @@ -93,8 +81,10 @@ async def _adb_run(*args: str, timeout: float) -> bytes:
Raises TimeoutError if the command exceeds *timeout* seconds.
Raises RuntimeError if adb exits non-zero.
"""
argv = [ADB_EXECUTABLE, "-s", ADB_SERIAL, *args]
started = time.perf_counter()
proc = await asyncio.create_subprocess_exec(
ADB_EXECUTABLE, "-s", ADB_SERIAL, *args,
*argv,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
Expand All @@ -103,12 +93,36 @@ async def _adb_run(*args: str, timeout: float) -> bytes:
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
record_command(
command_name="adb.exec",
argv=argv,
duration_ms=(time.perf_counter() - started) * 1000,
status="error",
error=f"Timeout after {timeout}s",
)
raise TimeoutError(f"adb timed out after {timeout}s: adb -s {ADB_SERIAL} {' '.join(args)}")
if proc.returncode != 0:
stderr_text = stderr.decode(errors="replace").strip()
record_command(
command_name="adb.exec",
argv=argv,
duration_ms=(time.perf_counter() - started) * 1000,
status="error",
error=f"exit {proc.returncode}",
stderr=stderr_text,
)
raise RuntimeError(
f"adb {' '.join(args)} failed (exit {proc.returncode}): "
f"{stderr.decode(errors='replace').strip()}"
f"{stderr_text}"
)
record_command(
command_name="adb.exec",
argv=argv,
duration_ms=(time.perf_counter() - started) * 1000,
status="success",
stdout=stdout,
stderr=stderr,
)
return stdout

try:
Expand Down Expand Up @@ -234,15 +248,27 @@ def _take_screenshot() -> str:
timeout=25.0,
)
except asyncio.TimeoutError:
_action_log("adb_screenshot", {"serial": ADB_SERIAL}, "TIMEOUT",
"screencap timed out after 25s", 25000)
record_event(
tool_name="adb_screenshot.capture",
arguments={"serial": ADB_SERIAL},
status="error",
result_summary="capture timeout",
duration_ms=25000,
error="screencap timed out after 25s",
event_type="capture",
)
raise RuntimeError("adb_screenshot timed out after 25s")

ms = int((time.monotonic() - t0) * 1000)
png_bytes = base64.b64decode(data)
saved = _save_screenshot_png(data, _action_seq + 1)
_action_log("adb_screenshot", {"serial": ADB_SERIAL},
f"png_bytes={len(png_bytes)} saved={saved}", "", ms)
saved = _save_screenshot_png(data)
record_event(
tool_name="adb_screenshot.capture",
arguments={"serial": ADB_SERIAL},
status="success",
result_summary=f"png_bytes={len(png_bytes)} saved={saved}",
duration_ms=(time.monotonic() - t0) * 1000,
event_type="capture",
)
return {
"content": [
{"type": "image", "mimeType": "image/png", "data": data}
Expand All @@ -260,10 +286,7 @@ async def adb_tap(x: int, y: int) -> str:
x: X coordinate (integer)
y: Y coordinate (integer)
"""
t0 = time.monotonic()
await _adb_run("shell", "input", "tap", str(x), str(y), timeout=5.0)
ms = int((time.monotonic() - t0) * 1000)
_action_log("adb_tap", {"x": x, "y": y, "serial": ADB_SERIAL}, f"tapped {x},{y}", "", ms)
return f"tapped {x},{y}"

@mcp.tool()
Expand All @@ -276,16 +299,13 @@ async def adb_launch_game() -> str:

Timeout: 10 seconds.
"""
t0 = time.monotonic()
await _adb_run(
"shell", "am", "start",
"-a", "android.intent.action.MAIN",
"-c", "android.intent.category.LAUNCHER",
"-n", "com.YoStarEN.AzurLane/com.manjuu.azurlane.PrePermissionActivity",
timeout=10.0,
)
ms = int((time.monotonic() - t0) * 1000)
_action_log("adb_launch_game", {"serial": ADB_SERIAL}, "launch intent sent", "", ms)
return "Azur Lane launch intent sent"


Expand All @@ -303,12 +323,10 @@ async def adb_get_focus() -> Dict[str, Any]:
"activity": "com.manjuu.azurlane.MainActivity" # or null
}
"""
t0 = time.monotonic()
stdout = await _adb_run(
"shell", "dumpsys", "window", "windows",
timeout=8.0,
)
ms = int((time.monotonic() - t0) * 1000)
raw_text = stdout.decode(errors="replace")
focus_line = ""
for line in raw_text.splitlines():
Expand All @@ -326,7 +344,6 @@ async def adb_get_focus() -> Dict[str, Any]:
activity = m.group(2)

result = {"raw": focus_line, "package": package, "activity": activity}
_action_log("adb_get_focus", {"serial": ADB_SERIAL}, f"{package}/{activity}", "", ms)
return result


Expand All @@ -344,16 +361,11 @@ async def adb_swipe(x1: int, y1: int, x2: int, y2: int, duration_ms: int = 300)
y2: Ending Y coordinate
duration_ms: Duration in milliseconds (default: 300)
"""
t0 = time.monotonic()
await _adb_run(
"shell", "input", "swipe",
str(x1), str(y1), str(x2), str(y2), str(duration_ms),
timeout=5.0 + duration_ms / 1000.0,
)
ms = int((time.monotonic() - t0) * 1000)
_action_log("adb_swipe", {"x1": x1, "y1": y1, "x2": x2, "y2": y2,
"duration_ms": duration_ms, "serial": ADB_SERIAL},
f"swiped {x1},{y1}->{x2},{y2}", "", ms)
return f"swiped {x1},{y1}->{x2},{y2}"

@mcp.tool()
Expand All @@ -365,10 +377,7 @@ def alas_get_current_state() -> str:
"""
if ctx is None:
raise RuntimeError("ALAS context not initialized")
t0 = time.monotonic()
page = ctx._state_machine.get_current_state()
ms = int((time.monotonic() - t0) * 1000)
_action_log("alas_get_current_state", {}, str(page), "", ms)
return str(page)

@mcp.tool()
Expand All @@ -383,17 +392,19 @@ def alas_goto(page: str) -> str:
"""
if ctx is None:
raise RuntimeError("ALAS context not initialized")
t0 = time.monotonic()
err = ""
from module.ui.page import Page
destination = Page.all_pages.get(page)
if destination is None:
err = f"unknown page: {page}"
_action_log("alas_goto", {"page": page}, "FAILED", err, 0)
raise ValueError(err)
raise ValueError(f"unknown page: {page}")
ctx._state_machine.transition(destination)
ms = int((time.monotonic() - t0) * 1000)
_action_log("alas_goto", {"page": page}, f"navigated to {page}", "", ms)
record_event(
tool_name="state_machine.transition",
arguments={"page": page},
status="success",
result_summary=f"navigated to {page}",
duration_ms=0,
event_type="delegate",
)
return f"navigated to {page}"

@mcp.tool()
Expand All @@ -412,8 +423,8 @@ def alas_list_tools() -> List[Dict[str, Any]]:
"parameters": t.parameters
}
for t in ctx._state_machine.get_all_tools()
if ALLOW_UNSAFE_STATE_MACHINE_CALLS or t.name in REMOTE_ALLOWED_ALAS_TOOLS
]
_action_log("alas_list_tools", {}, f"{len(tools)} tools", "", 0)
return tools

@mcp.tool()
Expand All @@ -427,17 +438,43 @@ def alas_call_tool(name: str, arguments: Optional[Dict[str, Any]] = None) -> Any
if ctx is None:
raise RuntimeError("ALAS context not initialized")
t0 = time.monotonic()
err = ""
args = arguments or {}
if not ALLOW_UNSAFE_STATE_MACHINE_CALLS and name not in REMOTE_ALLOWED_ALAS_TOOLS:
err = (
f"state machine tool '{name}' is not exposed through MCP. "
"Add it to REMOTE_ALLOWED_ALAS_TOOLS for an intentional surface expansion."
)
record_event(
tool_name="state_machine.call_tool",
arguments={"name": name, "args": args},
status="blocked",
result_summary="blocked by MCP allowlist",
duration_ms=(time.monotonic() - t0) * 1000,
error=err,
event_type="delegate",
)
raise PermissionError(err)
try:
result = ctx._state_machine.call_tool(name, **args)
except Exception as e:
err = str(e)
_action_log("alas_call_tool", {"name": name, "args": args}, "FAILED", err, int((time.monotonic()-t0)*1000))
except Exception as exc:
record_event(
tool_name="state_machine.call_tool",
arguments={"name": name, "args": args},
status="error",
result_summary="call failed",
duration_ms=(time.monotonic() - t0) * 1000,
error=f"{type(exc).__name__}: {exc}",
event_type="delegate",
)
raise
ms = int((time.monotonic() - t0) * 1000)
result_str = str(result)[:200] if result is not None else "None"
_action_log("alas_call_tool", {"name": name, "args": args}, result_str, "", ms)
record_event(
tool_name="state_machine.call_tool",
arguments={"name": name, "args": args},
status="success",
result_summary=str(result)[:200] if result is not None else "None",
duration_ms=(time.monotonic() - t0) * 1000,
event_type="delegate",
)
return result


Expand All @@ -462,7 +499,6 @@ def alas_login_ensure_main(
from alas_wrapped.tools.login import ensure_main_with_config_device

t0 = time.monotonic()
err = ""
try:
result = ensure_main_with_config_device(
ctx.script.config,
Expand All @@ -472,12 +508,25 @@ def alas_login_ensure_main(
dismiss_popups=dismiss_popups,
get_ship=get_ship,
)
except Exception as e:
err = str(e)
_action_log("alas_login_ensure_main", {"max_wait_s": max_wait_s}, "FAILED", err, int((time.monotonic()-t0)*1000))
except Exception as exc:
record_event(
tool_name="login.ensure_main",
arguments={"max_wait_s": max_wait_s},
status="error",
result_summary="ensure_main failed",
duration_ms=(time.monotonic() - t0) * 1000,
error=f"{type(exc).__name__}: {exc}",
event_type="delegate",
)
raise
ms = int((time.monotonic() - t0) * 1000)
_action_log("alas_login_ensure_main", {"max_wait_s": max_wait_s}, str(result.get("observed_state","?") if isinstance(result, dict) else result)[:200], "", ms)
record_event(
tool_name="login.ensure_main",
arguments={"max_wait_s": max_wait_s},
status="success",
result_summary=str(result.get("observed_state", "?") if isinstance(result, dict) else result)[:200],
duration_ms=(time.monotonic() - t0) * 1000,
event_type="delegate",
)
return result

def main():
Expand All @@ -504,4 +553,4 @@ def main():
mcp.run(transport="stdio")

if __name__ == "__main__":
main()
main()
Loading