Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions simdrive/src/simdrive/diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,18 @@ def list_apps_device(udid: str) -> list[dict]:
return out


def _read_app_info_plist(path: str) -> bytes:
"""Read the raw bytes of an app's Info.plist from the app bundle on disk.

`path` is the app bundle directory (e.g. /path/MyApp.app). We look for
Info.plist directly inside that directory. Returns the raw plist bytes so
callers can load them with plistlib. Raises OSError when the file cannot
be read.
"""
info_path = Path(path) / "Info.plist"
return info_path.read_bytes()


def list_apps(udid: str) -> list[dict]:
"""Parse `xcrun simctl listapps <udid>` (returns plist) into a flat list.

Expand Down Expand Up @@ -281,12 +293,29 @@ def list_apps(udid: str) -> list[dict]:
for bundle_id, info in data.items():
if not isinstance(info, dict):
continue
version = info.get("CFBundleShortVersionString") or ""
build = info.get("CFBundleVersion") or ""
app_path = info.get("Path") or ""
# F#3: simctl listapps often omits CFBundleShortVersionString. Fall back to
# reading Info.plist from the app bundle on disk.
if not version and app_path:
try:
plist_bytes = _read_app_info_plist(app_path)
on_disk = plistlib.loads(plist_bytes)
version = on_disk.get("CFBundleShortVersionString") or ""
if not build:
build = on_disk.get("CFBundleVersion") or ""
except Exception:
pass
# Final fallback: use the build number as the version string.
if not version:
version = build
out.append({
"bundle_id": bundle_id,
"name": info.get("CFBundleDisplayName") or info.get("CFBundleName") or "",
"version": info.get("CFBundleShortVersionString") or "",
"build": info.get("CFBundleVersion") or "",
"path": info.get("Path") or "",
"version": version,
"build": build,
"path": app_path,
})
out.sort(key=lambda a: a["name"].lower())
return out
Expand Down
36 changes: 25 additions & 11 deletions simdrive/src/simdrive/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,32 @@ def snapshot(udid: str, bundle_id: str) -> dict:
"captured_at": captured_at,
}

# macOS `ps` has no thcount column — get cpu/rss here, threads separately.
res = _run(["ps", "-p", str(pid), "-o", "pcpu=", "-o", "rss="])
cpu_pct = 0.0
# F#9: Sample CPU over a ~200 ms window (3 samples ~100 ms apart) and
# average the results. A single instant sample often returns 0.0 for an
# app that is active but currently idle — the window captures bursts that
# a snapshot would miss.
_SAMPLE_WINDOW_MS = 200
_SAMPLE_COUNT = 3
_SAMPLE_SLEEP_S = (_SAMPLE_WINDOW_MS / 1000.0) / max(_SAMPLE_COUNT - 1, 1)

cpu_samples: list[float] = []
rss_mb = 0.0
threads = 0
if res.returncode == 0 and res.stdout.strip():
parts = res.stdout.split()
if len(parts) >= 2:
try:
cpu_pct = float(parts[0])
rss_mb = round(float(parts[1]) / 1024.0, 2)
except ValueError:
pass

for i in range(_SAMPLE_COUNT):
res = _run(["ps", "-p", str(pid), "-o", "pcpu=", "-o", "rss="])
if res.returncode == 0 and res.stdout.strip():
parts = res.stdout.split()
if len(parts) >= 2:
try:
cpu_samples.append(float(parts[0]))
rss_mb = round(float(parts[1]) / 1024.0, 2)
except ValueError:
pass
if i < _SAMPLE_COUNT - 1:
time.sleep(_SAMPLE_SLEEP_S)

cpu_pct = round(sum(cpu_samples) / len(cpu_samples), 2) if cpu_samples else 0.0

# `ps -M -p <pid>` lists each thread on its own line; first line is the
# process header, remaining lines are threads.
Expand All @@ -84,6 +97,7 @@ def snapshot(udid: str, bundle_id: str) -> dict:
"memory_rss_mb": rss_mb,
"threads": threads,
"captured_at": captured_at,
"sample_window_ms": _SAMPLE_WINDOW_MS,
}


Expand Down
31 changes: 26 additions & 5 deletions simdrive/src/simdrive/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,11 +800,13 @@ def stop(session: Session) -> Path:
@dataclass
class LintResult:
path: Path
status: str # "ok" | "fail"
status: str # "ok" | "fail" | "empty"
reason: str = ""
text_mark_count: int = 0
app_bundle_id: Optional[str] = None
sim_device: Optional[str] = None
# F#16: category distinguishes result types — "ok" | "empty" | "fail" | "missing_state_contract"
category: str = "ok"

def to_dict(self) -> dict:
return {
Expand All @@ -814,6 +816,7 @@ def to_dict(self) -> dict:
"text_mark_count": self.text_mark_count,
"app_bundle_id": self.app_bundle_id,
"sim_device": self.sim_device,
"category": self.category,
}


Expand All @@ -835,33 +838,51 @@ def _lint_one(yaml_path: Path) -> LintResult:
try:
payload = yaml.safe_load(yaml_path.read_text())
except yaml.YAMLError as exc:
return LintResult(path=yaml_path, status="fail", reason=f"yaml parse error: {exc}")
return LintResult(path=yaml_path, status="fail", reason=f"yaml parse error: {exc}",
category="fail")
except OSError as exc:
return LintResult(path=yaml_path, status="fail", reason=f"read error: {exc}")
return LintResult(path=yaml_path, status="fail", reason=f"read error: {exc}",
category="fail")

if not isinstance(payload, dict):
return LintResult(path=yaml_path, status="fail",
reason="recording.yaml did not parse to a mapping")
reason="recording.yaml did not parse to a mapping",
category="fail")

requires_raw = payload.get("requires")
steps = payload.get("steps") or []

# F#16: 0-step recordings with no requires block are placeholders — categorize
# as 'empty' (not 'fail'). Recordings with steps still follow normal lint rules.
# 0-step recordings that DO have a requires block fall through to normal lint.
if len(steps) == 0 and requires_raw is None:
return LintResult(
path=yaml_path,
status="empty",
reason="recording has no steps (placeholder)",
category="empty",
)
if requires_raw is None:
return LintResult(
path=yaml_path,
status="fail",
reason=f"no requires block — run `simdrive migrate-recording {yaml_path.parent.name}` to capture one",
category="missing_state_contract",
)

block = RequiresBlock.from_dict(requires_raw)
if block is None:
return LintResult(path=yaml_path, status="fail",
reason="malformed requires block (not a mapping)")
reason="malformed requires block (not a mapping)",
category="missing_state_contract")

return LintResult(
path=yaml_path,
status="ok",
text_mark_count=len(block.initial_state.text_subset_required),
app_bundle_id=block.app.bundle_id,
sim_device=block.sim.device,
category="ok",
)


Expand Down
17 changes: 14 additions & 3 deletions simdrive/src/simdrive/robustness.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,15 @@ def set_appearance(udid: str, appearance: str) -> dict:
}


def list_replays(replays_root: Path) -> list[dict]:
"""Surface all recordings under `replays_root/<name>/recording.yaml` with metadata."""
def list_replays(replays_root: Path, min_steps: int = 1) -> list[dict]:
"""Surface recordings under `replays_root/<name>/recording.yaml` with metadata.

Args:
replays_root: Root directory containing recording subdirectories.
min_steps: Minimum number of steps a recording must have to be included.
Default is 1, which filters out 0-step placeholder recordings.
Pass 0 to include all recordings.
"""
if not replays_root.exists():
return []
out: list[dict] = []
Expand All @@ -106,14 +113,18 @@ def list_replays(replays_root: Path) -> list[dict]:
continue
if not isinstance(data, dict):
continue
step_count = len(data.get("steps") or [])
# F#13: filter out 0-step placeholder entries by default.
if step_count < min_steps:
continue
try:
stat = recording_yaml.stat()
except OSError:
continue
out.append({
"name": data.get("name", recording_yaml.parent.name),
"path": str(recording_yaml),
"steps": len(data.get("steps") or []),
"steps": step_count,
"created_at": data.get("created_at"),
"modified_at": stat.st_mtime,
"simdrive_version": data.get("simdrive_version", ""),
Expand Down
84 changes: 84 additions & 0 deletions simdrive/src/simdrive/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,79 @@ def tool_observe(arguments: dict) -> dict:
return obs.to_dict()


def _compute_ssim(pre_path: Optional[str], post_path: Optional[str]) -> float:
"""Compute SSIM similarity between two screenshot files.

Returns a float in [0.0, 1.0] where 1.0 means identical. Falls back to 1.0
(no change detected) when images cannot be loaded, so callers get a safe
default rather than a spurious "screen changed" signal.

Uses only stdlib — reads raw PNG data and computes a lightweight pixel-level
comparison. For full SSIM accuracy, callers may monkeypatch this function in
tests (which the F#8 tests do).
"""
try:
import struct
import zlib

def _load_pixels(path: str) -> tuple[int, int, list[int]]:
"""Load a PNG and return (width, height, flat RGBA pixel list)."""
data = Path(path).read_bytes()
if data[:8] != b"\x89PNG\r\n\x1a\n":
return 0, 0, []
chunks: dict[bytes, bytes] = {}
i = 8
while i < len(data):
length = struct.unpack(">I", data[i:i+4])[0]
ctype = data[i+4:i+8]
cdata = data[i+8:i+8+length]
chunks.setdefault(ctype, cdata)
i += 12 + length
ihdr = chunks.get(b"IHDR", b"")
if len(ihdr) < 13:
return 0, 0, []
w, h = struct.unpack(">II", ihdr[:8])
# Only handle 8-bit RGB/RGBA; others return empty.
bit_depth, color_type = ihdr[8], ihdr[9]
if bit_depth != 8 or color_type not in (2, 6):
return 0, 0, []
raw = zlib.decompress(b"".join(
v for k, v in chunks.items() if k == b"IDAT"
) or chunks.get(b"IDAT", b""))
channels = 3 if color_type == 2 else 4
pixels: list[int] = []
stride = w * channels
idx = 0
for _row in range(h):
filter_byte = raw[idx]; idx += 1
row = list(raw[idx:idx+stride]); idx += stride
if filter_byte == 1: # Sub
for c in range(channels, len(row)):
row[c] = (row[c] + row[c - channels]) & 0xFF
pixels.extend(row[:stride:channels]) # just R channel for speed
return w, h, pixels

w1, h1, p1 = _load_pixels(pre_path or "")
w2, h2, p2 = _load_pixels(post_path or "")

if not p1 or not p2 or w1 != w2 or h1 != h2 or len(p1) != len(p2):
return 1.0 # can't compare → assume no change

n = len(p1)
mean1 = sum(p1) / n
mean2 = sum(p2) / n
num = sum((a - mean1) * (b - mean2) for a, b in zip(p1, p2)) / n
var1 = sum((a - mean1) ** 2 for a in p1) / n
var2 = sum((b - mean2) ** 2 for b in p2) / n
c1, c2 = (0.01 * 255) ** 2, (0.03 * 255) ** 2
ssim = (2 * mean1 * mean2 + c1) * (2 * num + c2) / (
(mean1 ** 2 + mean2 ** 2 + c1) * (var1 + var2 + c2)
)
return float(max(0.0, min(1.0, ssim)))
except Exception:
return 1.0 # safe fallback


def _ensure_screenshot_dims(s) -> tuple[int, int]:
if s.last_screenshot_w == 0 or s.last_screenshot_h == 0:
# Auto-observe so the agent can call act tools without first calling observe.
Expand Down Expand Up @@ -1055,6 +1128,10 @@ def tool_tap(arguments: dict) -> dict:
time.sleep(settle_ms / 1000.0)
return resp

# F#8: capture the pre-tap screenshot path for verify_change before the tap occurs.
verify_change = bool(arguments.get("verify_change", False))
verify_pre_path = s.last_screenshot_path if verify_change else None

sx, sy = act.tap(x, y, sw, sh, udid=s.device.udid)
s.last_action_at = _now()
args = {"x": x, "y": y, "screenshot_w": sw, "screenshot_h": sh}
Expand Down Expand Up @@ -1094,6 +1171,13 @@ def tool_tap(arguments: dict) -> dict:
settle_ms = int(arguments.get("settle_ms", 0))
if settle_ms > 0:
time.sleep(settle_ms / 1000.0)
# F#8: verify_change — compare pre/post screenshots via SSIM.
if verify_change:
post_path = s.last_screenshot_path
ssim_val = _compute_ssim(verify_pre_path, post_path)
ssim_delta = round(1.0 - ssim_val, 4)
response["screen_changed"] = ssim_delta > 0.05
response["ssim_delta"] = float(ssim_delta)
return response


Expand Down
Loading
Loading