diff --git a/.gitignore b/.gitignore index f86428a..d6fb6bb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ *.tar.gz *.new tests/tmp_downloads/ +__pycache__/ # Added by cargo diff --git a/Dockerfile b/Dockerfile index 24784f0..baae880 100644 --- a/Dockerfile +++ b/Dockerfile @@ -84,9 +84,11 @@ RUN pip install --upgrade pip && \ git+https://github.com/marin-m/vmlinux-to-elf \ jefferson \ gnupg \ + openai \ poetry \ psycopg2-binary \ pycryptodome \ + pydantic \ pylzma \ pyyaml \ setuptools \ @@ -164,13 +166,31 @@ RUN --mount=type=secret,id=github_token \ "https://raw.githubusercontent.com/qkaiser/arpy/23faf88a88488c41fc4348ea2b70996803f84f40/arpy.py" \ /usr/local/lib/python3.10/dist-packages/arpy.py -# Copy wrapper script into container so we can copy out - note we don't put it on guest path +# Copy host wrappers into container so we can copy them out via the install +# scripts — not put on guest PATH (the in-container entry points are +# fakeroot_fw2tar and fwstitch, which live in /usr/local/bin). COPY ./fw2tar /usr/local/src/fw2tar_wrapper -# And add install helpers which generate shell commands to install it on host -COPY ./src/resources/banner.sh ./src/resources/fw2tar_install ./src/resources/fw2tar_install.local /usr/local/bin/ +COPY ./fwstitch /usr/local/src/fwstitch_wrapper +# Install helpers (emit shell scripts the user pipes to sh / sudo sh on host) +COPY ./src/resources/banner.sh \ + ./src/resources/fw2tar_install ./src/resources/fw2tar_install.local \ + ./src/resources/fwstitch_install ./src/resources/fwstitch_install.local \ + /usr/local/bin/ +RUN chmod +x /usr/local/bin/fwstitch_install /usr/local/bin/fwstitch_install.local # Warn on interactive shell sessions and provide instructions for install RUN echo '[ ! -z "$TERM" ] && [ -z "$NOBANNER" ] && /usr/local/bin/banner.sh' >> /etc/bash.bashrc COPY src/fakeroot_fw2tar /usr/local/bin/fakeroot_fw2tar +# Multi-filesystem stitcher (utils/stitch). 
The package goes to /opt/fw2tar so +# `python3 -m stitch ...` works; `fwstitch` is a thin shim on PATH. The +# Python entry point auto-re-execs under fakeroot for the `shard`/`all` +# subcommands so firmware uid/gid metadata is preserved. +COPY ./utils/stitch /opt/fw2tar/stitch +RUN printf '%s\n' \ + '#!/bin/bash' \ + 'exec env PYTHONPATH="/opt/fw2tar${PYTHONPATH:+:$PYTHONPATH}" python3 -m stitch "$@"' \ + > /usr/local/bin/fwstitch && \ + chmod +x /usr/local/bin/fwstitch + CMD ["/usr/local/bin/banner.sh"] diff --git a/fwstitch b/fwstitch new file mode 100755 index 0000000..8780ec4 --- /dev/null +++ b/fwstitch @@ -0,0 +1,237 @@ +#!/bin/bash +# Host wrapper for the fw2tar shard/plan/apply stitcher inside the rehosting/fw2tar container. +# +# - Auto-mounts file and directory arguments under /host_ +# - Passes through LLM_BASE_URL / LLM_API_KEY / LLM_MODEL / LLM_INSECURE so the +# container's `plan` step can reach your local model server +# - Uses --network host for plan/all commands so http://localhost:8000/v1 etc. +# on the host is reachable from inside the container +# - Mirrors ./fw2tar style. Subcommands: shard | plan | apply | all +# +# Usage examples: +# ./fwstitch shard ./firmware.bin -o ./shards +# ./fwstitch plan ./shards +# ./fwstitch apply ./shards ./shards/stitch_plan.yaml --out ./fw.stitched.rootfs.tar.gz +# LLM_BASE_URL=http://localhost:8000/v1 LLM_MODEL=gpt-oss-120b \ +# ./fwstitch all ./firmware.bin --shard-dir ./shards --out ./fw.stitched.rootfs.tar.gz +set -eu + +image="rehosting/fw2tar" +verbose=false +network_host=false +extra_docker_args=() +env_file="" + +# Load a KEY=VALUE-style .env file. Process env wins over file contents — we +# only set keys that aren't already in the environment. 
+load_env_file() { + local f="$1" + [[ -f "$f" ]] || return 0 + while IFS= read -r line || [[ -n "$line" ]]; do + # Strip leading whitespace + line="${line#"${line%%[![:space:]]*}"}" + [[ -z "$line" ]] && continue + [[ "${line:0:1}" == "#" ]] && continue + # Allow optional `export ` prefix + line="${line#export }" + if [[ "$line" =~ ^([A-Za-z_][A-Za-z0-9_]*)=(.*)$ ]]; then + local k="${BASH_REMATCH[1]}" + local v="${BASH_REMATCH[2]}" + # Strip surrounding single or double quotes (but only if matched) + if [[ ${#v} -ge 2 && "${v:0:1}" == "${v: -1}" && ( "${v:0:1}" == '"' || "${v:0:1}" == "'" ) ]]; then + v="${v:1:${#v}-2}" + fi + if [[ -z "${!k:-}" ]]; then + export "$k=$v" + fi + fi + done < "$f" +} + +# Subcommands that need network access for the LLM server. +needs_network() { + case "${1:-}" in + plan|all) return 0 ;; + *) return 1 ;; + esac +} + +print_help() { + cat <<EOF +Usage: $0 [WRAPPER FLAGS] <SUBCOMMAND> [SUBCOMMAND FLAGS] ... +Wrapper script for running fw2tar's stitcher in a Docker container. + +Wrapper flags (must precede the subcommand): + --image NAME image to run (default: $image) + --network MODE docker network mode (default: 'host' for plan/all, bridge otherwise) + --env-file PATH load env vars from a KEY=VALUE file (process env wins) + --verbose print mappings + docker command + --wrapper-help this message + +.env auto-discovery: ~/.config/fwstitch/.env then ./.env are loaded if present. +The format is plain KEY=VALUE per line (#-comments and an optional 'export ' +prefix are allowed). Process env vars always win over file contents. 
+ +Subcommands (forwarded to fwstitch inside the container): + shard FIRMWARE -o SHARD_DIR extract per-fragment .tar.gz + manifest + plan SHARD_DIR drive an LLM to produce stitch_plan.yaml + apply SHARD_DIR PLAN_YAML --out X build the unified stitched .tar.gz + all FIRMWARE --shard-dir D --out X end-to-end + +Env vars forwarded into the container for plan/all: + LLM_BASE_URL, LLM_API_KEY (or LLM_KEY), LLM_MODEL, LLM_INSECURE + +Pass through any subcommand-level flags as usual; this wrapper does no +parsing of them, it only auto-mounts file and directory arguments. +EOF +} + +# Parse wrapper-level flags. Stop at the first non-flag arg (the subcommand). +while [[ $# -gt 0 ]]; do + case "$1" in + --wrapper-help) print_help; exit 0 ;; + --image) image="$2"; shift 2 ;; + --network) extra_docker_args+=("--network" "$2"); network_host=true; shift 2 ;; + --env-file) env_file="$2"; shift 2 ;; + --verbose) verbose=true; shift ;; + --) shift; break ;; + *) break ;; + esac +done + +# Auto-load .env. Each load_env_file only sets keys that aren't already in the +# environment, so order = precedence: process env > --env-file > ./.env > +# ~/.config/fwstitch/.env. We feed them most-specific-first so the more +# specific source wins when it disagrees with a more generic one. +if [[ -n "$env_file" ]]; then + load_env_file "$env_file" +fi +load_env_file "./.env" +load_env_file "${HOME}/.config/fwstitch/.env" + +if [[ $# -eq 0 ]]; then + print_help + exit 1 +fi + +subcmd="$1" +shift +cmd=("$subcmd" "$@") + +# Auto-mount any arg that is an existing file or directory, plus the value +# immediately after --out, --plan-out, --shard-dir, --from-extracted (which +# may not exist yet but should be writable on the host). 
+maps=() + +map_path() { + local arg="$1" + local abspath + if [[ -e "$arg" ]]; then + abspath=$(realpath "$arg") + elif [[ "$arg" = /* ]]; then + abspath="$arg" + else + abspath="$PWD/$arg" + fi + local host_dir + local guest_dir + if [[ -d "$abspath" ]]; then + host_dir="$abspath" + guest_dir="/host_$(basename "$abspath")" + # Replace with full guest path (the dir itself, since we mount it) + new_value="$guest_dir" + elif [[ -f "$abspath" ]]; then + host_dir=$(dirname "$abspath") + guest_dir="/host_$(basename "$host_dir")" + new_value="$guest_dir/$(basename "$abspath")" + else + # Path doesn't exist; assume it's an output. Create parent dir on host, + # then mount that. + local parent + parent=$(dirname "$abspath") + mkdir -p "$parent" + host_dir="$parent" + guest_dir="/host_$(basename "$parent")" + new_value="$guest_dir/$(basename "$abspath")" + fi + maps+=("$host_dir:$guest_dir") + REWRITTEN="$new_value" +} + +# Rewrite cmd[] in place. We mount: +# * any cmd[i] that is an existing path +# * the value after --out / --plan-out / --shard-dir / --from-extracted / -o +out_flags=(--out --plan-out --shard-dir --from-extracted -o) +for ((i=1; i<${#cmd[@]}; i++)); do + arg="${cmd[$i]}" + prev="${cmd[$((i-1))]}" + is_out_value=false + for f in "${out_flags[@]}"; do + if [[ "$prev" == "$f" ]]; then is_out_value=true; break; fi + done + if $is_out_value || [[ -e "$arg" ]]; then + if [[ "$arg" == -* ]]; then continue; fi + map_path "$arg" + cmd[$i]="$REWRITTEN" + fi +done + +# Deduplicate mappings; sort -u orders them lexicographically, not longest-first. +if [[ ${#maps[@]} -gt 0 ]]; then + IFS=$'\n' maps=($(printf '%s\n' "${maps[@]}" | sort -u)) + unset IFS +fi + +# Build docker command +docker_cmd=(docker run --rm) +docker_cmd+=(-u "$(id -u):$(id -g)") + +# Network: --network host for plan/all unless the user overrode it +if ! 
$network_host && needs_network "$subcmd"; then + docker_cmd+=(--network host) +fi +docker_cmd+=("${extra_docker_args[@]}") + +# Mounts +for m in "${maps[@]}"; do + docker_cmd+=(-v "$m") +done + +# LLM env vars — accept both LLM_API_KEY and the shorter LLM_KEY. +for v in LLM_BASE_URL LLM_API_KEY LLM_KEY LLM_MODEL LLM_INSECURE; do + if [[ -n "${!v-}" ]]; then + docker_cmd+=(-e "$v=${!v}") + fi +done + +# SHA-1 of this wrapper script, passed as FWSTITCH_HASH for traceability (matches ./fw2tar) +hash=$(sha1sum "$0" | awk '{print $1}') +docker_cmd+=(-e "FWSTITCH_HASH=$hash") + +docker_cmd+=("$image" fwstitch "${cmd[@]}") + +if $verbose; then + echo "Mappings:" + for m in "${maps[@]}"; do echo " $m"; done + echo "Docker command:" + redacted=() + redact_next=false + for tok in "${docker_cmd[@]}"; do + if $redact_next; then + # Redact the VALUE of an env-var assignment that names a secret. + case "$tok" in + LLM_API_KEY=*|LLM_KEY=*|*TOKEN=*|*SECRET=*|*PASSWORD=*) + redacted+=("${tok%%=*}=") ;; + *) redacted+=("$tok") ;; + esac + redact_next=false + else + redacted+=("$tok") + [[ "$tok" == "-e" ]] && redact_next=true + fi + done + echo " ${redacted[*]}" + echo +fi + +exec "${docker_cmd[@]}" diff --git a/src/resources/banner.sh b/src/resources/banner.sh index 7be77c2..2b61477 100755 --- a/src/resources/banner.sh +++ b/src/resources/banner.sh @@ -41,3 +41,8 @@ echo -e " $ docker run rehosting/fw2tar fw2tar_install.local | sh\n" echo -e "${BOLD}${RED}${STARS}Step 2: Run ${GREEN}fw2tar${RESET}${STARS}" echo -e " $ fw2tar --help\n" +echo -e "${BOLD}${RED}${STARS}Optional: Install ${GREEN}fwstitch${RESET}${STARS} (LLM-driven multi-filesystem stitcher)" +echo -e " $ docker run rehosting/fw2tar fwstitch_install | sudo sh" +echo -e " $ docker run rehosting/fw2tar fwstitch_install.local | sh" +echo -e " $ fwstitch --wrapper-help\n" + diff --git a/src/resources/fwstitch_install b/src/resources/fwstitch_install new file mode 100644 index 0000000..cdbfb1f --- /dev/null +++ b/src/resources/fwstitch_install @@ -0,0 +1,13 @@ 
+#!/bin/bash +# Emits a shell script that installs the fwstitch host wrapper system-wide. +# Run as: docker run rehosting/fw2tar fwstitch_install | sudo sh +# Use a unique heredoc delimiter — the wrapper itself contains an 'EOF' inside +# its print_help() heredoc, which would otherwise terminate the outer heredoc +# early. +echo "#!/bin/bash" +echo "cat << '__FWSTITCH_WRAPPER_EOF__' | sudo tee /usr/local/bin/fwstitch >/dev/null" +printf "%s" "$(cat /usr/local/src/fwstitch_wrapper)" +echo +echo "__FWSTITCH_WRAPPER_EOF__" +echo "sudo chmod +x /usr/local/bin/fwstitch" +echo "echo \"fwstitch installed successfully to /usr/local/bin/fwstitch\"" diff --git a/src/resources/fwstitch_install.local b/src/resources/fwstitch_install.local new file mode 100644 index 0000000..099d42f --- /dev/null +++ b/src/resources/fwstitch_install.local @@ -0,0 +1,23 @@ +#!/bin/bash +# Emits a shell script that installs the fwstitch host wrapper to ~/.local/bin. +# Run as: docker run rehosting/fw2tar fwstitch_install.local | sh +echo "#!/bin/bash" +echo "mkdir -p \$HOME/.local/bin" +# Unique delimiter — the wrapper has an 'EOF' inside its own print_help heredoc. +echo "cat << '__FWSTITCH_WRAPPER_EOF__' > \$HOME/.local/bin/fwstitch" +printf "%s" "$(cat /usr/local/src/fwstitch_wrapper)" +echo +echo "__FWSTITCH_WRAPPER_EOF__" +echo "chmod +x \$HOME/.local/bin/fwstitch" +echo "case \":\$PATH:\" in" +echo " *:\"\$HOME/.local/bin\":*) ;;" +echo " *) echo 'export PATH=\"\$HOME/.local/bin:\$PATH\"' >> \$HOME/.bashrc ;;" +echo "esac" + +echo 'BOLD=$(tput bold 2>/dev/null || true)' +echo 'RESET=$(tput sgr0 2>/dev/null || true)' + +echo "echo \"\${BOLD}Success!\${RESET} fwstitch installed to ~/.local/bin/fwstitch. 
If ~/.local/bin wasn't on PATH yet, source ~/.bashrc (or open a new shell) to pick it up.\"" +echo "echo" +echo "echo \"Try:\"" +echo "echo \" \${BOLD}fwstitch --wrapper-help\${RESET}\"" diff --git a/utils/stitch/README.md b/utils/stitch/README.md new file mode 100644 index 0000000..fd497ee --- /dev/null +++ b/utils/stitch/README.md @@ -0,0 +1,440 @@ +# fw2tar stitch + +LLM-driven filesystem stitching for firmware images that have more than one +filesystem in them. + +`fw2tar` itself assumes a firmware blob contains one big rootfs and picks the +"best" candidate. That's wrong for plenty of real devices — e.g. the Rigol +MSO5000 (rootfs.img + an app UBIFS at `/rigol`), the D-Link DNS320 (cpio +ramdisk + a gzip-compressed config blob mounted at `/etc/NAS_CFG` + a squashfs +modules partition at `/usr/local/modules`), and most appliance-style firmware. +This tool extracts every filesystem fragment as a separate tarball ("shard"), +then drives a local LLM through the evidence (init scripts, fstab, dangling +symlinks, strings in binaries, fs-type from unblob's naming) to produce a +plan describing where each shard should be mounted. The plan can be reviewed +or auto-applied to produce a single unified `.rootfs.tar.gz`. + +Local models in mind throughout: anything that speaks the OpenAI-compatible +HTTP API works (vllm, ollama, llama.cpp server, gpt-oss-120b, etc.). No +`langchain`. Two harness modes — native tool-calling and a JSON-emission +fallback — with auto-detection, so it stays robust across server quirks. + + +## Install + +The stitcher and its Python deps (`openai`, `pydantic`, `pyyaml`) are baked +into the `rehosting/fw2tar` Docker image. 
The simplest invocation path is the +`./fwstitch` host wrapper, which mirrors `./fw2tar`: + +```bash +./fwstitch shard ./firmware.bin -o ./shards +./fwstitch plan ./shards # auto-uses --network host +./fwstitch apply ./shards ./shards/stitch_plan.yaml --out ./fw.stitched.rootfs.tar.gz +./fwstitch all ./firmware.bin --shard-dir ./shards --out ./fw.stitched.rootfs.tar.gz +``` + +The wrapper auto-mounts any file/dir argument under `/host_` inside +the container, passes through the `LLM_*` env vars, and adds `--network host` +for `plan`/`all` so a local LLM server on the host is reachable. + +You can also run it directly without the wrapper: + +- **Inside the container**: `fwstitch ...` (on PATH already). +- **On the host**: `pip install -r fw2tar/utils/stitch/requirements.txt` and + invoke `python -m stitch ...` (set `PYTHONPATH` to the parent of the + `stitch/` directory, or `cd fw2tar/utils`). + +Runtime deps (all already in the container, install on host as needed): + +- Python 3.10+ +- `openai` (any base URL — used for local servers too), `pydantic`, `pyyaml` +- For `shard`: `unblob` (preferred) or `binwalk` on PATH +- For perm-preserving re-extraction: `cpio` on PATH (plus `gunzip` / + `bunzip2` / `unxz` / `lz4` if you have compressed-cpio blobs like + `ramdisk_el`) +- `fakeroot` on PATH (the `shard` and `all` subcommands auto-re-exec under + fakeroot so firmware uid/gid metadata survives — see "Ownership" below) + + +## Ownership: shard runs under fakeroot + +Firmware images contain files owned by `root`, setuid binaries, and other +ownership metadata that must be preserved. unblob/binwalk extraction would +normally do `chown()` calls that need privilege; cpio does the same. + +The `shard` and `all` subcommands automatically re-exec themselves under +`fakeroot --` if they're not already inside one (and you're not root). 
+fakeroot intercepts those calls, records the intended uid/gid in shadow +metadata, and when the resulting tree is tarred up the headers reflect what +the firmware actually wanted. The `plan` and `apply` subcommands are +read-only / tar-header-only and don't need fakeroot. + +To opt out (e.g. when debugging with everything owned by your real uid), pass +`--no-fakeroot`. If `fakeroot` isn't on PATH, a loud warning is printed and +extraction proceeds with your real uid — useful for trials, harmful for any +firmware you actually plan to boot. + + +## Three steps: shard, plan, apply + +``` +firmware blob ──[shard]──> shard_dir/ ──[plan]──> stitch_plan.yaml ──[apply]──> stitched.rootfs.tar.gz + (N tarballs (LLM-produced (one unified rootfs) + + shards.json) stitching plan) +``` + +You can also collapse to one command with `all`. + + +### shard — extract every filesystem fragment + +```bash +python -m utils.stitch shard firmware.bin -o ./shards +``` + +Runs unblob into a temp directory, walks the extraction tree, and emits one +`.tar.gz` per detected filesystem fragment plus a `shards.json` manifest with +provenance. Robust to "piles of shards" — there's no `--primary-limit` +or "is this root-like?" filter; every leaf of the extraction tree that looks +like a filesystem comes through as its own tarball. + +Selection logic, in priority order: + +1. **Terminal `*_extract` directories** (unblob's naming). When unblob finishes + extracting a chunk it produces `foo.<type>_extract/` next to `foo.<type>`. The + suffix encodes the fs type (`ubifs_extract`, `squashfs_v4_le_extract`, + `jffs2_extract`, `cpio_extract`, `gzip_extract`, etc.) — that flows into + the manifest as `fs_type_guess` and the LLM sees it as a strong hint. + Wrapper directories like `squashfs-root/` are descended automatically. +2. 
**Score-based fallback.** For trees that don't use unblob's naming (binwalk + output, pre-extracted directories), each directory gets a score from + filesystem-like signals (top-level `bin`/`etc`/`sbin`/..., presence of + `/bin/sh`, `/etc/passwd`, `/sbin/init`, etc.). The highest-scoring path in + each ancestor chain wins. + +#### Native re-extract for cpio (and similar) + +unblob and binwalk both delegate cpio extraction to 7z, which **does not** +preserve setuid bits, restrictive permissions, or sometimes even symlinks. +The shard step automatically detects cpio shards (by `fs_type_guess`), locates +the original blob next to the `*_extract` directory, and re-extracts with +native `cpio -idmu --no-absolute-filenames`. Compressed-cpio wrappers +(`gunzip|cpio`, `bunzip2|cpio`, `unxz|cpio`, `lz4 -d|cpio`) are auto-detected +by magic bytes. + +If `cpio` isn't on PATH or the re-extract fails, the 7z output is used +unchanged and the manifest records `reextracted_with: null` for that shard +(visible to the LLM). The mapping lives in `shard.py`: + +```python +REEXTRACTOR_FOR_TYPE = {"cpio": "cpio"} +REEXTRACTORS = {"cpio": reextract_cpio} +``` + +Add new entries here for any other format where the upstream extractor is +lossy. + +#### Flags + +``` +python -m utils.stitch shard FIRMWARE -o OUT + [--extractor unblob|binwalk] # default unblob + [--from-extracted DIR] # skip extraction, walk a pre-extracted tree + [--min-score 3] # score-pass floor; only matters when *_extract isn't used + [--no-reextract] # keep 7z's broken-perms output (debug only) + [-v] # log every candidate + each reextract +``` + + +### plan — LLM produces a stitch plan + +```bash +export LLM_BASE_URL=http://localhost:8000/v1 +export LLM_API_KEY=dummy +export LLM_MODEL=gpt-oss-120b +python -m utils.stitch plan ./shards +# writes ./shards/stitch_plan.yaml +``` + +The harness: + +1. Loads every shard tarball into a `FragmentCache`. +2. 
Pre-digests each shard via `fs_summary` (rootfs file presence, top-dir + counts, fs_type_guess from the manifest, unblob root path, score, + reextracted_with) and injects that into the initial prompt so the LLM + doesn't burn turns asking for the obvious. +3. Loops, exposing six read-only tools the LLM can call to gather more + evidence: + - `list_paths(fragment, pattern)` — glob inside the shard + - `read_file(fragment, path, max_bytes)` — for `/etc/fstab`, init scripts + - `grep_in_fragment(fragment, pattern, path_glob)` — find `mount` commands + - `strings_of(fragment, path)` — paths hardcoded in init binaries + - `find_dangling_symlinks(fragment)` — absolute symlinks whose target is + missing in this shard (strongest cross-fragment signal) + - `fs_summary(fragment)` — the precomputed digest +4. The LLM terminates by calling `submit_plan` with a `StitchPlan`. The + harness validates the plan against the pydantic schema; failures are + reported back to the model and it retries up to a bounded number of times. +5. On the last turn `tool_choice` is forced to `submit_plan` so the loop + always exits with a plan (which may be low-confidence). + +#### Native tool-calling vs JSON fallback + +Most modern servers (recent ollama, vllm, llama.cpp) support OpenAI-style +tool calling. The harness uses it by default. If the server rejects the +`tools` parameter, or returns empty `tool_calls` twice in a row, the harness +transparently switches to a JSON-emission fallback where the model emits +exactly one of: + +``` +{"tool": "<tool_name>", "args": { ... }} +{"final": { ...StitchPlan... }} +``` + +per turn. Pass `--no-native-tools` to force this mode from the start. 
+ +#### Flags + +``` +python -m utils.stitch plan SHARD_DIR + [--plan-out plan.yaml] # default: <SHARD_DIR>/stitch_plan.yaml + [--model NAME] # else $LLM_MODEL + [--base-url URL] # else $LLM_BASE_URL + [--api-key KEY] # else $LLM_API_KEY, defaults to 'dummy' + [--max-turns 10] + [--no-native-tools] + [-k] [--insecure] # skip TLS cert verification (self-signed local server) + [-v] # log each turn + every tool call/result +``` + +If you're hitting a self-hosted model over HTTPS with a self-signed cert +(common on internal vllm / llama.cpp deployments), pass `-k` (or +`--insecure`), or set `LLM_INSECURE=1`. Same idea as `curl -k`. + + +### apply — build the stitched rootfs + +```bash +python -m utils.stitch apply ./shards ./shards/stitch_plan.yaml \ + --out ./shards/firmware.stitched.rootfs.tar.gz +``` + +Streams each shard's tar members into a single output `.tar.gz`, rewriting +paths to sit under the chosen mount point and preserving mode / uid / gid / +mtime / symlinks (no re-tar-from-disk; permissions never round-trip through +the filesystem). The fw2tar metadata trailer (`stitched_from: [...]`, plan +hash, confidence) is appended so `fw2tar/utils/show_metadata.py` can still +read the output. + +Mount semantics: + +- Base shard is processed first (its members land at `/`). +- Overlays are applied in plan order. **Absolute symlink targets in overlays + are left unchanged** — they're meant to resolve in the unified rootfs + view, which is what the original firmware author intended. +- Path collisions: default policy is `overlay` wins (matches union-mount + intuition). `--strict` errors on collision instead. Sample collisions are + always printed to stderr. +- `confidence: low` plans are refused unless `--force`. 
+ +#### Flags + +``` +python -m utils.stitch apply SHARD_DIR PLAN_YAML + [--out PATH] + [--on-conflict {base,overlay,error}] # default overlay + [--strict] # alias for --on-conflict error + [--force] # apply even if confidence=low + [-v] +``` + + +### all — shard → plan → apply, end-to-end + +```bash +python -m utils.stitch all firmware.bin --shard-dir ./shards --out ./firmware.stitched.rootfs.tar.gz +``` + +Same args as the three commands combined. Pass `--no-apply` to stop after +the plan (useful in CI where a human reviews before commit). + + +## Environment variables + +All consumed by the `plan` step; CLI flags override. + +| Variable | Meaning | +| --------------- | ------------------------------------------------------------- | +| `LLM_BASE_URL` | LLM server endpoint, e.g. `http://localhost:8000/v1`. Omit for OpenAI proper. | +| `LLM_API_KEY` | API key; defaults to `"dummy"` since most local servers ignore it | +| `LLM_MODEL` | Model name, e.g. `gpt-4o-mini`, `gpt-oss-120b`, `gemma3:27b`, `qwen2.5:32b` | +| `LLM_INSECURE` | `1` to skip TLS verification (same as `-k` / `--insecure`) | + +### `.env` files + +Both the host wrapper and the in-container Python entry auto-load env files +when present. Precedence, most-specific wins: + +1. process env vars (anything already `export`ed in your shell) +2. `--env-file PATH` (explicit, on either the wrapper or the Python CLI) +3. `./.env` (project-local — the directory you invoke from) +4. `~/.config/fwstitch/.env` (machine-wide defaults) + +Format is plain `KEY=VALUE` per line. `#` comments allowed; an optional +`export ` prefix is accepted; matched surrounding single or double quotes are +stripped. + +```ini +# ~/.config/fwstitch/.env +LLM_API_KEY=sk-... 
+LLM_MODEL=gpt-4o-mini +# LLM_BASE_URL omitted -> openai SDK uses api.openai.com by default +``` + +Then any plan/all invocation just works: + +```bash +./fwstitch plan ./shards +``` + + +## File formats + +### `shards.json` (manifest produced by `shard`) + +```yaml +firmware: dns320_fw.bin +firmware_stem: dns320_fw +extractor: unblob # or binwalk, or preextracted +shards: + - name: dns320_fw.shard.00.firmware.bin_extract__ramdisk_el_extract.tar.gz + score: 46 # higher = more rootfs-like + root_path: firmware.bin_extract/ramdisk_el_extract + fs_type_guess: cpio + matched_root_dirs: [bin, etc, lib, sbin, usr] + matched_rootfs_files: [etc/passwd, sbin/init, bin/sh] + file_count: 1247 + reextracted_with: cpio # null if not re-extracted + source_blob: firmware.bin_extract/ramdisk_el # the original blob, if known + - name: dns320_fw.shard.01.firmware.bin_extract__default_gzip_extract__NAS_CFG.tar.gz + score: 5 + root_path: firmware.bin_extract/default_gzip_extract/NAS_CFG + fs_type_guess: gzip + ... +``` + +### `stitch_plan.yaml` (produced by `plan`, consumed by `apply`) + +```yaml +fragments: + - source: dns320_fw.shard.00.firmware.bin_extract__ramdisk_el_extract.tar.gz + mount_point: / + role: base + fs_type: cpio + notes: contains /bin/sh, /sbin/init; /etc/init.d/rcS mounts the others + - source: dns320_fw.shard.01.firmware.bin_extract__default_gzip_extract__NAS_CFG.tar.gz + mount_point: /etc/NAS_CFG + role: overlay + fs_type: gzip + - source: dns320_fw.shard.02.firmware.bin_extract__modules.squashfs_v4_le_extract__modules.tar.gz + mount_point: /usr/local/modules + role: overlay + fs_type: squashfs +reasoning: | + Base picked from the cpio ramdisk — it's the only fragment with /sbin/init + and /etc/passwd. /etc/init.d/rcS references /etc/NAS_CFG and + /usr/local/modules; those are missing in the base but provided by the + other two shards. 
+confidence: high +open_questions: [] +``` + +Validation rules (enforced by pydantic): + +- exactly one fragment with `role: base` +- the base must have `mount_point: /` +- `mount_point`s are unique and absolute +- `mount_point`s are normalized (e.g. `/foo//` -> `/foo`) + + +## Architectural notes + +- The whole stitch package is stdlib + `openai` + `pydantic` + `pyyaml`. No + langchain. The "agent" is ~250 lines of `harness.py`. +- Tool outputs are capped (`max_bytes`, `max_hits`, member-name caps) so the + 10-turn loop fits in a 16k context on small local models. +- `tar`-time path rewriting: each shard's members are streamed straight from + the input `.tar.gz` into the output with their `name` prefixed by the + mount point. We never extract to disk, so perms/uid/gid/mtime/symlinks + pass through losslessly. +- The metadata trailer is appended as a second gzip member; gzip's + multi-member format means `fw2tar/utils/show_metadata.py` reads stitched + outputs unchanged. + + +## Limits / gotchas + +- The plan currently models exactly one base. Multi-base layouts (e.g. + dual-firmware images, A/B partitions where both are usable rootfs) would need a + schema change. +- `--from-extracted` works for re-extraction only if the original blobs are + still next to the `*_extract` directories. If you've deleted them, run the + shard step against the firmware blob instead. +- The `fs_type_guess` is best-effort; it comes from unblob's naming. + Pre-extracted trees may have `fs_type_guess: null` and the LLM falls back + to other evidence. +- Low-confidence plans (`confidence: low`) are refused by `apply` unless you pass + `--force`. Usually that's the harness telling you it didn't find enough + signal — check `open_questions`, the verbose log, or hand-edit the YAML. +- Empty shard directories produce a hard error. One-shard directories produce + a warning (stitching would be a no-op). +- The harness sends a 1-token completion at startup to verify the endpoint is + reachable. 
If your server has long cold-start times, increase + `request_timeout` in `HarnessConfig` (not currently exposed via CLI). + + +## Hacking + +Module layout: + +``` +fw2tar/utils/stitch/ + __main__.py # python -m utils.stitch entry point + cli.py # argparse, subcommand dispatch + shard.py # extractor invocation, candidate selection, re-extract + harness.py # tool-use loop (native + JSON-fallback modes) + tools.py # the six LLM-callable tools + FragmentCache + prompts.py # SYSTEM_PROMPT and friends (terse on purpose) + plan.py # StitchPlan schema, yaml IO, apply_plan() + requirements.txt + README.md # this file +``` + +Adding a new tool the LLM can call: write a pydantic args model + a function +in `tools.py`, append to the `TOOLS` list. The schema is auto-projected into +the OpenAI tools array and into the JSON-fallback prompt. + +Adding a perm-preserving re-extractor: write a function with signature +`(blob: Path, out_dir: Path, verbose: bool) -> bool` in `shard.py`, register +it in `REEXTRACTORS`, and map its fs type in `REEXTRACTOR_FOR_TYPE`. The +shard step handles the rest. + +Adding a new fs-type guess from extraction-dir naming: append to +`EXTRACT_SUFFIX_TYPES` in `shard.py` (longest suffix first). + + +## Testing without a real LLM + +The schema, apply path, shard selection, and cpio re-extract are all +exercisable without an LLM or pydantic. The repository already includes +ad-hoc smoke tests as shell one-liners in the commit history; promote them +to `fw2tar/utils/stitch/tests/` if you want to wire them into CI. 
+ +Quick manual e2e (requires unblob + cpio + a local LLM): + +```bash +python -m utils.stitch all path/to/MSO5000.bin --shard-dir /tmp/mso5k -v +# inspect /tmp/mso5k/stitch_plan.yaml +# the resulting /tmp/mso5k/mso5k.stitched.rootfs.tar.gz should mount the +# UBIFS app partition at /rigol and the rootfs at / +``` diff --git a/utils/stitch/__init__.py b/utils/stitch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/stitch/__main__.py b/utils/stitch/__main__.py new file mode 100644 index 0000000..9ae637f --- /dev/null +++ b/utils/stitch/__main__.py @@ -0,0 +1,4 @@ +from .cli import main + +if __name__ == "__main__": + main() diff --git a/utils/stitch/cli.py b/utils/stitch/cli.py new file mode 100644 index 0000000..7bb59f1 --- /dev/null +++ b/utils/stitch/cli.py @@ -0,0 +1,370 @@ +"""CLI for the fw2tar stitcher. + +Subcommands: + shard - run an extractor on a firmware blob and emit per-shard .tar.gz + manifest + plan - drive an LLM to produce a stitch_plan.yaml from a shard directory + apply - apply a stitch_plan.yaml (LLM-produced or human-edited) to build the unified tar + all - shard -> plan -> apply, end-to-end +""" +from __future__ import annotations + +import argparse +import os +import shutil +import sys +from pathlib import Path + +from .harness import HarnessConfig, run +from .plan import apply_plan, dump_plan, load_plan + + +def _load_env_file(path: Path) -> int: + """Load KEY=VALUE lines from a .env file. Process env wins (we only set + keys that aren't already in os.environ). Returns the number of keys set. 
def _reexec_under_fakeroot(cmd_name: str) -> None:
    """Re-exec the current invocation under ``fakeroot`` when it is needed.

    Does nothing (returns normally) when any of these hold:
      * ``cmd_name`` is not one of ``_FAKEROOT_CMDS``;
      * ``--no-fakeroot`` appears anywhere in ``sys.argv``;
      * the process is already inside fakeroot or running as root;
      * ``fakeroot`` is not on PATH (a warning is printed to stderr).

    Otherwise the process image is replaced via ``os.execvp`` and this
    function never returns. The environment is inherited by the new process.

    Args:
        cmd_name: the subcommand name peeked from the command line.
    """
    if cmd_name not in _FAKEROOT_CMDS:
        return
    if "--no-fakeroot" in sys.argv:
        return
    if _under_fakeroot_or_root():
        return
    if not shutil.which("fakeroot"):
        print(
            "WARNING: 'fakeroot' is not on PATH. Extraction will run with your "
            "real uid/gid, so firmware file ownership (root, setuid binaries, "
            "etc.) will be LOST in the shard tarballs. Install fakeroot, or "
            "pass --no-fakeroot to suppress this warning.",
            file=sys.stderr,
        )
        return
    # Re-launch the package under the same dotted name it was imported as.
    # Inside the container that is "stitch"; from a source checkout it is
    # "utils.stitch". Hard-coding "stitch" made the latter fail with
    # "No module named stitch" after the exec.
    module = __package__ or "stitch"
    new_argv = ["fakeroot", "--", sys.executable, "-m", module, *sys.argv[1:]]
    # exec replaces the process without flushing Python-level buffers.
    sys.stdout.flush()
    sys.stderr.flush()
    os.execvp("fakeroot", new_argv)
def _add_llm_args(p: argparse.ArgumentParser) -> None:
    """Register the LLM connection and harness-tuning options on *p*.

    Adds ``--model``, ``--base-url`` and ``--api-key`` (all defaulting to
    None), ``--max-turns`` (int, default 10), and the boolean switches
    ``--no-native-tools`` and ``-k/--insecure``.
    """
    # The three plain string options differ only in flag name and help text.
    string_options = (
        ("--model", "LLM model name (else $LLM_MODEL)"),
        ("--base-url", "OpenAI-compatible endpoint URL (else $LLM_BASE_URL)"),
        ("--api-key", "API key (else $LLM_API_KEY, defaults to 'dummy')"),
    )
    for flag, description in string_options:
        p.add_argument(flag, default=None, help=description)

    p.add_argument("--max-turns", type=int, default=10)
    p.add_argument(
        "--no-native-tools",
        action="store_true",
        help="Skip native tool-calling, use JSON fallback mode",
    )

    insecure_help = (
        "Skip TLS cert verification (for self-signed self-hosted models). "
        "Also honored via env: LLM_INSECURE=1."
    )
    p.add_argument("-k", "--insecure", action="store_true", help=insecure_help)
def cmd_apply(args) -> int:
    """Handle ``apply``: build the stitched tarball from a plan file.

    Loads the plan from ``args.plan``. A plan whose confidence is "low" is
    refused (status 2) unless ``args.force`` is set. ``args.strict`` forces
    the conflict policy to "error"; otherwise ``args.on_conflict`` is used.
    The output path is ``args.out`` or, when that is unset, the default
    derived from ``args.shard_dir``.

    Returns:
        0 after the summary is printed, 2 when a low-confidence plan is refused.
    """
    plan = load_plan(args.plan)

    refused = plan.confidence == "low" and not args.force
    if refused:
        print("[apply] plan confidence is 'low' — refusing. Re-run with --force.",
              file=sys.stderr)
        return 2

    if args.strict:
        policy = "error"
    else:
        policy = args.on_conflict

    destination = args.out
    if not destination:
        destination = _default_out(args.shard_dir)

    _print_apply_summary(
        apply_plan(plan, args.shard_dir, destination,
                   on_conflict=policy, verbose=args.verbose)
    )
    return 0
Useful for batch jobs.""" + from .shard import shard + summary = shard( + firmware=args.firmware, + out_dir=args.shard_dir, + extractor=args.extractor, + min_score=args.min_score, + reextract=not args.no_reextract, + verbose=args.verbose, + ) + print(f"[all] {summary['count']} shards extracted") + if summary["count"] == 0: + return 2 + + base_url, api_key, model = _resolve_llm_env(args) + cfg = HarnessConfig( + base_url=base_url, api_key=api_key, model=model, + max_turns=args.max_turns, force_fallback=args.no_native_tools, + insecure=_resolve_insecure(args), verbose=args.verbose, + ) + result = run(args.shard_dir, cfg) + plan_out = args.shard_dir / "stitch_plan.yaml" + dump_plan(result.plan, plan_out) + _print_plan_summary(result.plan) + + if not args.no_apply: + if result.plan.confidence == "low" and not args.force: + print("[all] confidence=low — not applying. Re-run with --force or " + "use --no-apply.", file=sys.stderr) + return 2 + on_conflict = "error" if args.strict else args.on_conflict + out_path = args.out or _default_out(args.shard_dir) + stats = apply_plan(result.plan, args.shard_dir, out_path, + on_conflict=on_conflict, verbose=args.verbose) + _print_apply_summary(stats) + return 0 + + +# --------------- top-level parser --------------- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="fw2tar.utils.stitch", + description="LLM-driven multi-shard filesystem stitching for fw2tar firmware analysis.", + ) + parser.add_argument("-v", "--verbose", action="store_true") + parser.add_argument("--env-file", default=None, + help="Load KEY=VALUE pairs from this file. Auto-discovers " + "~/.config/fwstitch/.env and ./.env if present (process env wins).") + # Common subcommand flag so `fwstitch shard ... -v` works in addition to + # `fwstitch -v shard ...` — easier on the user. 
+ common = argparse.ArgumentParser(add_help=False) + common.add_argument("-v", "--verbose", action="store_true") + sub = parser.add_subparsers(dest="cmd", required=True, parser_class=lambda **kw: argparse.ArgumentParser(parents=[common], **kw)) + + # shard + sp = sub.add_parser("shard", help="extract a firmware blob into per-shard tarballs + manifest") + sp.add_argument("firmware", type=Path, nargs="?", help="firmware blob (omit if --from-extracted)") + sp.add_argument("-o", "--out", type=Path, required=True, help="shard output directory") + sp.add_argument("--extractor", choices=["unblob", "binwalk"], default="unblob") + sp.add_argument("--from-extracted", type=Path, default=None, + help="skip extraction; walk this pre-extracted tree directly") + sp.add_argument("--min-score", type=int, default=3, + help="extra-candidate floor for the score-based pass (default 3). " + "Selection primarily uses unblob's *_extract naming; this only " + "matters for trees that don't follow that convention.") + sp.add_argument("--no-reextract", action="store_true", + help="Skip native re-extraction (cpio etc.) — keeps unblob/binwalk's " + "7z output even though 7z corrupts permissions on cpio.") + sp.add_argument("--no-fakeroot", action="store_true", + help="Don't re-exec under fakeroot. Without fakeroot, firmware uid/gid " + "ownership (e.g. 
files owned by root) is lost in the shard tarballs.") + sp.set_defaults(func=cmd_shard) + + # plan + sp = sub.add_parser("plan", help="drive an LLM to produce stitch_plan.yaml") + sp.add_argument("shard_dir", type=Path) + sp.add_argument("--plan-out", type=Path, default=None, + help="output YAML (default: /stitch_plan.yaml)") + _add_llm_args(sp) + sp.set_defaults(func=cmd_plan) + + # apply + sp = sub.add_parser("apply", help="build the stitched .tar.gz from a stitch_plan.yaml") + sp.add_argument("shard_dir", type=Path) + sp.add_argument("plan", type=Path, help="stitch_plan.yaml") + sp.add_argument("--out", type=Path, default=None, + help="output .tar.gz (default: /.stitched.rootfs.tar.gz)") + _add_apply_args(sp) + sp.set_defaults(func=cmd_apply) + + # all + sp = sub.add_parser("all", help="shard -> plan -> apply end-to-end") + sp.add_argument("firmware", type=Path) + sp.add_argument("--shard-dir", type=Path, required=True) + sp.add_argument("--out", type=Path, default=None) + sp.add_argument("--extractor", choices=["unblob", "binwalk"], default="unblob") + sp.add_argument("--min-score", type=int, default=3) + sp.add_argument("--no-reextract", action="store_true") + sp.add_argument("--no-fakeroot", action="store_true") + sp.add_argument("--no-apply", action="store_true", help="stop after plan, don't build stitched tar") + _add_llm_args(sp) + _add_apply_args(sp) + sp.set_defaults(func=cmd_all) + + return parser + + +def main(argv: list[str] | None = None) -> int: + # Auto-load env files first (before reading any env vars). Done early so + # the fakeroot re-exec inherits the resulting environment too. + _autoload_env_files() + explicit = _peek_arg(sys.argv if argv is None else argv, "--env-file") + if explicit is not None: + _load_env_file(Path(explicit)) + + # Peek at the subcommand before parsing so we can re-exec under fakeroot + # without losing flags or burning argparse on a doomed parse. 
argv=None + # means "use sys.argv", which is the normal case where re-exec applies. + if argv is None: + cmd_name = next((a for a in sys.argv[1:] if not a.startswith("-")), "") + _reexec_under_fakeroot(cmd_name) + + parser = build_parser() + args = parser.parse_args(argv) + + if args.cmd == "shard": + if args.firmware is None and args.from_extracted is None: + parser.error("shard: provide either FIRMWARE or --from-extracted") + + return args.func(args) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/stitch/harness.py b/utils/stitch/harness.py new file mode 100644 index 0000000..17eda71 --- /dev/null +++ b/utils/stitch/harness.py @@ -0,0 +1,455 @@ +"""Tool-use loop driving a local LLM through firmware-fragment analysis. + +Two modes: + * Native: OpenAI-style tool calling (most servers — vllm, recent ollama, etc.) + * Fallback: model emits one JSON object per turn — for servers that don't + support tools. + +Auto-detect: try native first, fall back if the server 400s on `tools=` or +returns two empty `tool_calls` in a row. +""" +from __future__ import annotations + +import json +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +from pydantic import ValidationError + +from .plan import StitchPlan +from .prompts import ( + FALLBACK_SYSTEM_PROMPT, + INITIAL_USER_PROMPT, + NUDGE_FORCE_SUBMIT, + NUDGE_NO_TOOL, + NUDGE_VALIDATION, + SYSTEM_PROMPT, +) +from .tools import ( + TOOLS, + TOOLS_BY_NAME, + FragmentCache, + FragmentOnlyArgs, + tool_fs_summary, + to_openai_schemas, +) + + +def _import_openai(): + try: + from openai import OpenAI # type: ignore + return OpenAI + except ImportError as e: + raise SystemExit( + "openai package not installed. 
" + "Install with: pip install -r fw2tar/utils/stitch/requirements.txt" + ) from e + + +@dataclass +class HarnessConfig: + base_url: str | None + api_key: str + model: str + max_turns: int = 10 + request_timeout: float = 120.0 + force_fallback: bool = False + verbose: bool = False + insecure: bool = False # skip TLS cert verification (self-signed local models) + + +def _make_client(OpenAI, cfg: "HarnessConfig"): + """Construct the OpenAI client, honoring cfg.insecure for self-signed servers.""" + if cfg.insecure: + try: + import httpx # openai SDK already depends on httpx + except ImportError as e: + raise SystemExit( + "--insecure requires httpx (a transitive dep of openai). Reinstall openai." + ) from e + http_client = httpx.Client(verify=False, timeout=cfg.request_timeout) + return OpenAI(base_url=cfg.base_url, api_key=cfg.api_key, http_client=http_client) + return OpenAI(base_url=cfg.base_url, api_key=cfg.api_key, timeout=cfg.request_timeout) + + +@dataclass +class TurnLog: + role: str + content: str + tool_name: str | None = None + + +@dataclass +class RunResult: + plan: StitchPlan + used_fallback: bool + turns: int + transcript: list[TurnLog] = field(default_factory=list) + + +def _fragment_summary_block(cache: FragmentCache) -> str: + """Build the precomputed fs_summary block injected into the initial user + message. Saves the LLM from spending its first N turns calling fs_summary. 
def _run_native(cache: FragmentCache, cfg: HarnessConfig, OpenAI) -> RunResult:
    """Drive the model through OpenAI-style native tool calling.

    Each turn sends the running conversation plus the tool schemas, executes
    every tool call in the reply, and appends each result as a ``tool``
    message. The loop ends when a ``submit_plan`` call validates as a
    StitchPlan. On the last allowed turn a force-submit nudge is added and
    ``tool_choice`` is pinned to ``submit_plan``.

    Args:
        cache: fragment cache handed to every tool function.
        cfg: harness settings (model, max_turns, verbose, ...).
        OpenAI: the client class, passed through to ``_make_client``.

    Returns:
        RunResult with the validated plan, ``used_fallback=False``, the
        number of turns used and the transcript.

    Raises:
        _SwitchToFallback: the server looks like it does not support tools.
            Either the request error mentions "tool" together with
            "not support" or "400", or two consecutive replies carried no
            tool calls.
        SystemExit: ``cfg.max_turns`` turns elapsed without a valid plan.
    """
    client = _make_client(OpenAI, cfg)
    reachability_check(client, cfg.model)

    plan_schema = StitchPlan.model_json_schema()
    tool_schemas = to_openai_schemas(plan_schema)

    initial_user = INITIAL_USER_PROMPT.format(
        fragment_summaries=_fragment_summary_block(cache)
    )
    messages: list[dict] = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": initial_user},
    ]
    transcript: list[TurnLog] = [
        TurnLog(role="system", content=SYSTEM_PROMPT),
        TurnLog(role="user", content=initial_user),
    ]

    def reply(tc, name: str, content: str) -> None:
        # Every tool call gets exactly one "tool" message keyed by its id.
        messages.append({
            "role": "tool", "tool_call_id": tc.id, "name": name,
            "content": content,
        })

    consecutive_empty = 0
    same_tool_validation_fail: dict[str, int] = {}

    for turn in range(cfg.max_turns):
        force_submit = (turn == cfg.max_turns - 1)
        if force_submit:
            messages.append({"role": "user", "content": NUDGE_FORCE_SUBMIT})
            transcript.append(TurnLog(role="user", content=NUDGE_FORCE_SUBMIT))

        kwargs: dict[str, Any] = {
            "model": cfg.model,
            "messages": messages,
            "tools": tool_schemas,
            "temperature": 0.0,
        }
        if force_submit:
            kwargs["tool_choice"] = {"type": "function", "function": {"name": "submit_plan"}}
        else:
            kwargs["tool_choice"] = "auto"

        if cfg.verbose:
            print(f"[harness] turn {turn+1}/{cfg.max_turns} (native)", file=sys.stderr)

        try:
            resp = client.chat.completions.create(**kwargs)
        except Exception as e:
            err_msg = str(e)
            # Crude heuristic: server rejected tools — escalate to fallback.
            if "tool" in err_msg.lower() and ("not support" in err_msg.lower() or "400" in err_msg):
                raise _SwitchToFallback(err_msg) from e
            raise

        msg = resp.choices[0].message
        tool_calls = getattr(msg, "tool_calls", None) or []
        text = msg.content or ""

        # When the reply has no text, log the requested calls instead.
        transcript.append(TurnLog(
            role="assistant",
            content=text or json.dumps([
                {"name": tc.function.name, "args": tc.function.arguments} for tc in tool_calls
            ]),
        ))

        assistant_msg: dict[str, Any] = {"role": "assistant", "content": text}
        if tool_calls:
            assistant_msg["tool_calls"] = [
                {
                    "id": tc.id,
                    "type": "function",
                    "function": {"name": tc.function.name, "arguments": tc.function.arguments},
                }
                for tc in tool_calls
            ]
        messages.append(assistant_msg)

        if not tool_calls:
            consecutive_empty += 1
            if consecutive_empty >= 2 and not force_submit:
                # Server probably doesn't support tools; escalate.
                raise _SwitchToFallback("two consecutive empty tool_calls")
            messages.append({"role": "user", "content": NUDGE_NO_TOOL})
            transcript.append(TurnLog(role="user", content=NUDGE_NO_TOOL))
            continue
        consecutive_empty = 0

        # Handle each tool call sequentially.
        terminated_plan: StitchPlan | None = None
        for tc in tool_calls:
            name = tc.function.name
            try:
                raw_args = json.loads(tc.function.arguments or "{}")
            except json.JSONDecodeError as e:
                reply(tc, name, json.dumps({"error": f"bad json arguments: {e}"}))
                continue

            if name == "submit_plan":
                try:
                    terminated_plan = StitchPlan.model_validate(raw_args)
                except ValidationError as e:
                    # include_context=False: for a ValueError raised inside a
                    # model validator, pydantic stores the exception object
                    # under "ctx", which json.dumps cannot serialize. Without
                    # this the harness crashed instead of sending the error
                    # back to the model.
                    err = {
                        "error": "plan failed validation",
                        "details": e.errors(include_context=False)[:5],
                    }
                    reply(tc, name, json.dumps(err))
                else:
                    reply(tc, name, json.dumps({"ok": True}))
                continue

            tool = TOOLS_BY_NAME.get(name)
            if tool is None:
                reply(tc, name, json.dumps({"error": f"unknown tool: {name}"}))
                continue

            try:
                args_obj = tool.args_model.model_validate(raw_args)
            except ValidationError as e:
                same_tool_validation_fail[name] = same_tool_validation_fail.get(name, 0) + 1
                if same_tool_validation_fail[name] > 2:
                    reply(tc, name, json.dumps(
                        {"error": "too many validation failures for this tool, stop using it"}
                    ))
                    continue
                schema = tool.args_model.model_json_schema()
                reply(tc, name,
                      NUDGE_VALIDATION.format(error=str(e), schema=json.dumps(schema)[:1500]))
                continue

            try:
                result = tool.fn(cache, args_obj)
            except Exception as e:
                # Report tool failures to the model rather than aborting the run.
                result = {"error": f"tool raised: {e}"}
            reply(tc, name, json.dumps(result)[:8000])
            transcript.append(TurnLog(role="tool", content=json.dumps(result)[:1000], tool_name=name))

        if terminated_plan is not None:
            return RunResult(plan=terminated_plan, used_fallback=False,
                             turns=turn + 1, transcript=transcript)

    raise SystemExit("loop terminated without a valid plan (max_turns reached)")
fragment_summaries=_fragment_summary_block(cache) + ), + }, + ] + transcript: list[TurnLog] = [TurnLog(role="system", content=""), + TurnLog(role="user", content=messages[1]["content"])] + + consecutive_unparsed = 0 + consecutive_validation_fail = 0 + + for turn in range(cfg.max_turns): + force_submit = (turn == cfg.max_turns - 1) + if force_submit: + messages.append({"role": "user", "content": NUDGE_FORCE_SUBMIT}) + transcript.append(TurnLog(role="user", content=NUDGE_FORCE_SUBMIT)) + + if cfg.verbose: + print(f"[harness] turn {turn+1}/{cfg.max_turns} (fallback)", file=sys.stderr) + + resp = client.chat.completions.create( + model=cfg.model, messages=messages, temperature=0.0, + ) + text = resp.choices[0].message.content or "" + transcript.append(TurnLog(role="assistant", content=text)) + messages.append({"role": "assistant", "content": text}) + + obj = _extract_first_json_object(text) + if obj is None: + consecutive_unparsed += 1 + if consecutive_unparsed >= 2: + raise SystemExit("fallback: model produced no parseable JSON for 2 turns") + messages.append({"role": "user", "content": "Your last message did not contain a parseable JSON object. Respond with exactly one JSON object."}) + continue + consecutive_unparsed = 0 + + if "final" in obj: + try: + plan = StitchPlan.model_validate(obj["final"]) + return RunResult(plan=plan, used_fallback=True, + turns=turn + 1, transcript=transcript) + except ValidationError as e: + consecutive_validation_fail += 1 + if consecutive_validation_fail >= 2: + raise SystemExit(f"fallback: plan failed validation twice: {e}") + messages.append({"role": "user", "content": f"Your plan failed validation: {e}. 
def run(frag_dir: Path, cfg: HarnessConfig) -> RunResult:
    """Produce a StitchPlan for the fragments in *frag_dir*.

    Uses the fallback loop directly when ``cfg.force_fallback`` is set.
    Otherwise tries the native tool-calling loop and switches to the
    fallback loop if it raises ``_SwitchToFallback``. The fragment cache is
    closed on every exit path.

    Args:
        frag_dir: directory handed to ``FragmentCache``.
        cfg: harness settings.

    Returns:
        The RunResult from whichever loop completed.

    Raises:
        SystemExit: the cache reports no fragments (also raised by the
            loops themselves when no valid plan is produced).
    """
    OpenAI = _import_openai()
    cache = FragmentCache(frag_dir)
    # All checks live inside the try so the early exit below also closes
    # the cache.
    try:
        names = cache.names()
        if not names:
            raise SystemExit(
                f"no fragment .tar.gz files found in {frag_dir}. "
                "Run `fwstitch shard FIRMWARE -o SHARD_DIR` first to produce shards."
            )
        if len(names) == 1:
            # --min-score defaults to 3, so the hint must name a smaller value.
            print(
                f"WARNING: only one fragment in {frag_dir}: {names[0]}. "
                "Stitching may be a no-op. Try `fwstitch shard --min-score 1` (lower threshold) "
                "or `fwstitch shard --extractor binwalk` to capture more fragments.",
                file=sys.stderr,
            )

        if cfg.force_fallback:
            return _run_fallback(cache, cfg, OpenAI)
        try:
            return _run_native(cache, cfg, OpenAI)
        except _SwitchToFallback as e:
            if cfg.verbose:
                print(f"[harness] switching to fallback: {e}", file=sys.stderr)
            return _run_fallback(cache, cfg, OpenAI)
    finally:
        cache.close()
def _rewrite_path(mount_point: str, name: str) -> str:
    """Map a fragment member name to its path in the stitched archive.

    The member name is first normalised relative to the fragment root:
    leading "./" and "/" are removed and ".." components cannot climb above
    the root. It is then placed under *mount_point*. Leading dots that are
    part of a file name (".profile") are preserved.

    Args:
        mount_point: absolute mount point of the fragment ("/" for the base).
        name: member name as stored in the fragment tar.

    Returns:
        The archive-relative path with no leading "/". Returns "" for the
        root entry of a fragment mounted at "/".
    """
    # Normalising against "/" collapses "./", "//" and any leading "..".
    # The old str.lstrip("./") stripped *characters*, so it also ate the
    # leading dot of hidden files.
    rel = posixpath.normpath("/" + name).lstrip("/")
    if mount_point == "/":
        return rel
    joined = mount_point.rstrip("/") + "/" + rel
    return posixpath.normpath(joined).lstrip("/")


def _rewrite_linkname(mount_point: str, linkname: str) -> str:
    """Return the symlink target to store in the stitched archive (unchanged)."""
    # Absolute symlink targets are left alone: at runtime they resolve against
    # the unified rootfs view, which is exactly what the original firmware
    # author intended when the partition was mounted at its mount point.
    # Relative targets are unchanged since they resolve relative to the
    # link's location.
    return linkname


def _clone_member(ti: tarfile.TarInfo, new_name: str, mount_point: str) -> tarfile.TarInfo:
    """Copy *ti*'s header fields into a fresh TarInfo named *new_name*."""
    new_ti = tarfile.TarInfo(name=new_name)
    # Only regular members get a data payload written, so only they may
    # declare a size.
    new_ti.size = ti.size if ti.isreg() else 0
    new_ti.mode = ti.mode
    new_ti.uid = ti.uid
    new_ti.gid = ti.gid
    new_ti.uname = ti.uname
    new_ti.gname = ti.gname
    new_ti.mtime = ti.mtime
    new_ti.type = ti.type
    if not ti.linkname:
        new_ti.linkname = ""
    elif ti.islnk():
        # A hard link's target is another member of the same archive, so it
        # has to move under the mount point together with that member.
        new_ti.linkname = _rewrite_path(mount_point, ti.linkname)
    else:
        new_ti.linkname = _rewrite_linkname(mount_point, ti.linkname)
    new_ti.devmajor = ti.devmajor
    new_ti.devminor = ti.devminor
    return new_ti


def apply_plan(
    plan: "StitchPlan",
    frag_dir: Path,
    out_path: Path,
    on_conflict: Literal["base", "overlay", "error"] = "overlay",
    verbose: bool = False,
) -> dict:
    """Produce a single stitched .tar.gz from the plan.

    Fragments are processed base first, then overlays. Each member is
    renamed to sit under its fragment's mount point and written to a
    temporary file next to *out_path*, which is moved into place only after
    the metadata trailer has been appended. The temporary file is removed
    if anything fails.

    Args:
        plan: the stitch plan (fragments, confidence).
        frag_dir: directory containing the fragment tarballs named by
            ``fragment.source``.
        out_path: destination .tar.gz; parent directories are created.
        on_conflict: which side wins when two fragments place a member at the
            same path. "base" keeps the first occurrence (base is processed
            first), "overlay" keeps the last (matches union-mount intuition),
            "error" raises.
        verbose: print one line per fragment to stderr.

    Returns:
        Stats dict with keys ``members_written``, ``conflicts``,
        ``conflict_samples`` (at most 10), ``plan_hash`` and ``out_path``.

    Raises:
        FileNotFoundError: a fragment tarball is missing from *frag_dir*.
        RuntimeError: a path collision occurred with ``on_conflict="error"``.
    """
    ordered = sorted(plan.fragments, key=lambda f: 0 if f.role == "base" else 1)

    seen: dict[str, str] = {}
    conflicts: list[tuple[str, str, str]] = []
    members_written = 0
    digest = plan_hash(plan)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = out_path.with_suffix(out_path.suffix + ".tmp")

    try:
        with tarfile.open(tmp_path, "w:gz") as out_tar:
            for frag in ordered:
                src = frag_dir / frag.source
                if not src.exists():
                    raise FileNotFoundError(f"fragment not found: {src}")
                if verbose:
                    print(f"[apply] {frag.source} ({frag.role}) -> {frag.mount_point}", file=sys.stderr)
                with tarfile.open(src, "r:*") as in_tar:
                    for ti in in_tar:
                        new_name = _rewrite_path(frag.mount_point, ti.name)
                        if not new_name:
                            # Root entry of a fragment mounted at "/".
                            continue
                        if new_name in seen:
                            conflicts.append((new_name, seen[new_name], frag.source))
                            if on_conflict == "error":
                                raise RuntimeError(
                                    f"path collision at {new_name}: {seen[new_name]} vs {frag.source}"
                                )
                            if on_conflict == "base":
                                continue
                        new_ti = _clone_member(ti, new_name, frag.mount_point)
                        if ti.isreg():
                            out_tar.addfile(new_ti, fileobj=in_tar.extractfile(ti))
                        else:
                            out_tar.addfile(new_ti)
                        seen[new_name] = frag.source
                        members_written += 1

        # fw2tar metadata trailer — see show_metadata.py and src/archive.rs. The
        # trailer lives in the *decompressed* gzip view, after the tar EOF blocks.
        # gzip supports multi-member concatenation, so we append a second gzip
        # member that decompresses to: 16 nulls + json + "made with fw2tar".
        metadata = {
            "file": str(out_path.name),
            "fw2tar_command": "stitch (fw2tar.utils.stitch)",
            "input_hash": digest,
            "stitched_from": [f.source for f in plan.fragments],
            "stitch_plan_confidence": plan.confidence,
        }
        # Note the "\n" separator: show_metadata.py does string.split("\n") to split
        # the json blob from the magic. archive.rs omits it (latent inconsistency
        # between fw2tar and its own utility); we match show_metadata.py here so the
        # existing tool keeps working on stitched outputs.
        with open(tmp_path, "ab") as f, gzip.GzipFile(fileobj=f, mode="wb") as g:
            g.write(b"\x00" * 0x10)
            g.write(json.dumps(metadata).encode())
            g.write(b"\nmade with fw2tar")

        tmp_path.replace(out_path)
    except BaseException:
        # Don't leave a partial archive behind next to the real output.
        tmp_path.unlink(missing_ok=True)
        raise

    return {
        "members_written": members_written,
        "conflicts": len(conflicts),
        "conflict_samples": conflicts[:10],
        "plan_hash": digest,
        "out_path": str(out_path),
    }
+ * /etc/fstab entries (mount points and device names) + * mount commands in /etc/init.d/rcS, /etc/inittab, /etc/rc.local + * dangling absolute symlinks (link target missing inside this fragment ==> + that path lives in another fragment) + * hardcoded paths in /sbin/init or /bin/busybox via strings_of + +Constraints: + * Call tools one at a time. Be terse. + * Do not ask the user questions; act on the evidence. + * When you have enough evidence (or after a few rounds), call submit_plan + with your StitchPlan. The harness validates it against a schema. + * One base at "/", every other fragment is an overlay at a non-"/" absolute + path. Mount points must be unique. + +If evidence is ambiguous, pick the most likely layout, set confidence="low", +and list your uncertainties in open_questions. +""" + +INITIAL_USER_PROMPT = """\ +Below is a one-shot summary of each fragment (precomputed fs_summary output). +Use the tools to investigate further as needed, then call submit_plan. + +Fragments in this run: + +{fragment_summaries} +""" + +NUDGE_NO_TOOL = ( + "You did not call a tool. You must call exactly one tool per turn. " + "Either gather more evidence with a tool, or finalize with submit_plan." +) + +NUDGE_VALIDATION = ( + "Your last tool call failed schema validation: {error}\n" + "Schema: {schema}\n" + "Try again with corrected arguments." +) + +NUDGE_FORCE_SUBMIT = ( + "This is the final turn. Call submit_plan now with your best guess." +) + +# Fallback (no-native-tools) mode: model emits JSON per turn. +FALLBACK_SYSTEM_PROMPT = SYSTEM_PROMPT + """ + +Your server does not support native tool calling. Instead, on each turn, +respond with a SINGLE JSON object and nothing else, in one of these forms: + + Tool call: {"tool": "", "args": { ... }} + Final plan: {"final": { ...StitchPlan... 
}} + +Available tools and their argument schemas: + +{tool_descriptions} + +Final plan schema (StitchPlan): + +{plan_schema} +""" diff --git a/utils/stitch/requirements.txt b/utils/stitch/requirements.txt new file mode 100644 index 0000000..7af9ac3 --- /dev/null +++ b/utils/stitch/requirements.txt @@ -0,0 +1,3 @@ +openai>=1.40 +pydantic>=2.6 +pyyaml>=6 diff --git a/utils/stitch/shard.py b/utils/stitch/shard.py new file mode 100644 index 0000000..5a5c262 --- /dev/null +++ b/utils/stitch/shard.py @@ -0,0 +1,682 @@ +"""Pile-of-shards extraction. + +The `shard` step replaces fw2tar's single-rootfs assumption. It runs an +extractor (unblob preferred, binwalk fallback), walks the resulting tree, +identifies every directory that looks like a Linux filesystem fragment, and +emits each as its own .tar.gz alongside a `shards.json` manifest with +provenance. + +Why this beats wiring up `_secondary_limit` in fw2tar's Rust: + * fw2tar's "is this root-like?" heuristic (find_linux_filesystems.rs) + discards UBIFS app partitions, squashfs module blobs, config-only + partitions — exactly the shards we need to stitch. + * unblob's `*_extract` directory naming encodes the on-disk fs type + (squashfs/ubifs/jffs2/cpio/gzip/...). That metadata flows through to the + stitcher as a hint for the LLM. + * No Rust rebuild; iterates fast on host. + +Where this runs: anywhere `unblob` (or `binwalk`) is on PATH. Inside the +fw2tar Docker container is the most reliable place — both extractors are +already installed there. +""" +from __future__ import annotations + +import json +import os +import re +import shutil +import subprocess +import sys +import tarfile +import tempfile +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Optional + + +# Top-level dir names that, when present, scream "Linux rootfs." 
ROOT_DIRS = frozenset([
    "bin", "sbin", "etc", "usr", "lib", "lib32", "lib64",
    "var", "opt", "root", "dev", "proc", "sys", "tmp",
    "home", "mnt", "media", "run", "boot", "srv",
])

# Specific paths that strongly indicate a rootfs.
ROOTFS_FILES = (
    "etc/passwd", "etc/inittab", "etc/fstab", "etc/init.d/rcS",
    "sbin/init", "bin/sh", "bin/busybox",
)

# Filesystem types whose default extractor in unblob/binwalk (7z) is known to
# corrupt permissions, ownership, or setuid bits. For these, we locate the
# original blob next to the *_extract dir and re-extract with the native tool.
# Maps fs_type_guess -> reextractor name (key into REEXTRACTORS below).
REEXTRACTOR_FOR_TYPE: dict[str, str] = {
    "cpio": "cpio",
}


# unblob extraction-directory suffixes -> fs type hints.
# Order matters: longer, more-specific suffixes come first.
EXTRACT_SUFFIX_TYPES: list[tuple[str, str]] = [
    ("squashfs_v4_le_extract", "squashfs"),
    ("squashfs_v4_be_extract", "squashfs"),
    ("squashfs_v3_le_extract", "squashfs"),
    ("squashfs_v3_be_extract", "squashfs"),
    ("squashfs_extract", "squashfs"),
    ("ubifs_extract", "ubifs"),
    ("ubi_extract", "ubi"),
    ("jffs2_extract", "jffs2"),
    ("cramfs_extract", "cramfs"),
    ("yaffs2_extract", "yaffs2"),
    ("yaffs_extract", "yaffs"),
    ("ramdisk_el_extract", "cpio"),
    ("ramdisk_eb_extract", "cpio"),
    ("cpio_extract", "cpio"),
    ("tar_extract", "tar"),
    ("gzip_extract", "gzip"),
    ("ext_extract", "ext"),
    ("fat_extract", "fat"),
    ("iso9660_extract", "iso9660"),
    ("romfs_extract", "romfs"),
]


@dataclass
class ShardInfo:
    """Manifest record for one emitted shard tarball (see write_manifest)."""

    name: str  # tarball basename
    score: int  # filesystem-likeness score
    root_path: str  # path relative to the extraction root
    fs_type_guess: Optional[str]
    matched_root_dirs: list[str] = field(default_factory=list)
    matched_rootfs_files: list[str] = field(default_factory=list)
    file_count: int = 0
    reextracted_with: Optional[str] = None  # native tool used to re-extract, if any
    source_blob: Optional[str] = None  # path of the original blob (relative)


def _guess_fs_type(path: Path, extraction_root: Path) -> Optional[str]:
    """Best-effort fs type guess from unblob's directory naming. Walk up the
    ancestors until we hit a known `*_extract` suffix.

    Raises ValueError (from Path.relative_to) if `path` is not located under
    `extraction_root`.
    """
    rel = path.relative_to(extraction_root)
    # Innermost component first, so the closest enclosing extract dir wins.
    for part in reversed(rel.parts):
        for suffix, ty in EXTRACT_SUFFIX_TYPES:
            if part.endswith(suffix):
                return ty
    return None


def _count_files(path: Path, cap: int = 5000) -> int:
    """Cheap file count, capped to avoid pathological cost.

    The walk stops as soon as the running total reaches `cap`, so the result
    may exceed `cap` by up to one directory's worth of files.
    """
    n = 0
    for _, _, filenames in os.walk(path):
        n += len(filenames)
        if n >= cap:
            return n
    return n


def score_directory(path: Path, top_children: list[str], extraction_root: Path | None = None) -> tuple[int, dict]:
    """Score how filesystem-like a directory is. Score is informational — used
    for ranking and to help the LLM tell base from overlay. Selection is
    primarily driven by unblob's `*_extract` naming (see find_shards).

    Scoring: +5 per top-level child named like a standard root directory,
    +3 per well-known rootfs file present, +2 per "interesting" sub-path
    present, and +5 for every component of the path (relative to
    `extraction_root`) that carries a known `*_extract` suffix.

    Returns (score, evidence) where evidence has the keys
    "matched_root_dirs", "matched_files" and "file_count".
    """
    top_set = set(top_children)
    matched_root_dirs = sorted(top_set & ROOT_DIRS)
    score = 5 * len(matched_root_dirs)

    # lexists() instead of exists(): firmware trees routinely contain absolute
    # symlinks such as sbin/init -> /bin/busybox. exists() would resolve those
    # against the *host* filesystem, so the link would either look missing or
    # be satisfied by an unrelated host file. The presence of the entry itself
    # is the evidence we want.
    matched_files: list[str] = []
    for f in ROOTFS_FILES:
        if os.path.lexists(path / f):
            score += 3
            matched_files.append(f)

    interesting_subpaths = ("etc/init.d", "usr/local", "usr/bin", "lib/modules", "etc/config")
    for sp in interesting_subpaths:
        if os.path.lexists(path / sp):
            score += 2

    if extraction_root is not None:
        try:
            rel = path.relative_to(extraction_root)
        except ValueError:
            # Not under the extraction root: no naming bonus applies.
            rel = None
        if rel is not None:
            for part in rel.parts:
                # any() stops at the first matching suffix, so each path
                # component contributes the bonus at most once.
                if any(part.endswith(suffix) for suffix, _ty in EXTRACT_SUFFIX_TYPES):
                    score += 5

    # Skip the (potentially expensive) walk for directories with no signal.
    file_count = _count_files(path) if score > 0 or top_children else 0

    return score, {
        "matched_root_dirs": matched_root_dirs,
        "matched_files": matched_files,
        "file_count": file_count,
    }


def _is_descendant(child: Path, parent: Path) -> bool:
    """True if `child` is strictly below `parent` (lexical comparison)."""
    try:
        child.relative_to(parent)
    except ValueError:
        return False
    return child != parent


def _is_real_dir(p: Path) -> bool:
    """True for an actual directory, False for a symlink pointing at one."""
    return p.is_dir() and not p.is_symlink()


def _has_extract_descendant(d: Path) -> bool:
    """True if any directory inside `d` is itself a `*_extract` dir."""
    for _dirpath, dirnames, _ in os.walk(d):
        if any(sub.endswith("_extract") for sub in dirnames):
            return True
    return False


# Of the fs types in EXTRACT_SUFFIX_TYPES, these are real on-disk filesystems
# or whole-tree archives — when unblob produces an extraction directory named
# after one of them, that directory IS the filesystem boundary. The remainder
# (e.g. "gzip") are transparent compression wrappers that just unwrap to a
# single blob inside, which we still want to recurse into.
_TERMINAL_FS_TYPES = frozenset({
    "squashfs", "ubifs", "ubi", "jffs2", "cramfs", "yaffs2", "yaffs",
    "cpio", "tar", "ext", "fat", "iso9660", "romfs",
})


def _has_known_fs_type_suffix(name: str) -> bool:
    """True if the dir name carries a known on-disk-filesystem suffix
    (squashfs, ubifs, jffs2, cpio, ramdisk_el, ...) and that type is a
    full filesystem rather than a transparent compression wrapper. unblob
    applies these suffixes only when it identifies the chunk as a filesystem
    image — when present, the directory IS the filesystem regardless of
    whether unblob further recursed into individual files inside it.
    """
    return any(
        name.endswith(suffix) and ty in _TERMINAL_FS_TYPES
        for suffix, ty in EXTRACT_SUFFIX_TYPES
    )


def _find_fs_root(extract_dir: Path) -> Path:
    """Descend through single-subdirectory wrappers (e.g. `squashfs-root`) to
    reach the actual filesystem root. Stops if the wrapper itself looks like a
    sub-extract or if there's branching.

    Symlinked children are never descended into: the link target is chosen by
    the firmware image and may point outside the extraction tree.
    """
    current = extract_dir
    for _ in range(8):  # hard cap on wrapper depth
        try:
            children = list(current.iterdir())
        except OSError:
            return current
        if len(children) != 1:
            return current
        only = children[0]
        if not _is_real_dir(only) or only.name.endswith("_extract"):
            return current
        current = only
    return current


def find_shards(extracted: Path, min_score: int = 3, max_depth: int = 14) -> list[tuple[Path, int, dict]]:
    """Pick filesystem-fragment leaves from an extraction tree.

    Selection rules (union):
      * Every terminal `*_extract` directory (no further `*_extract` beneath
        it) that has at least one subdirectory is a candidate. Selection is
        independent of filesystem-likeness scoring, so overlay-shape shards
        (e.g. a config-only partition) are not dropped.
      * Additionally, any directory with a strong fs-likeness score >=
        min_score is a candidate, to handle binwalk output or pre-extracted
        trees that don't use unblob's naming.

    Then return only the leaves of the candidate forest — most-specific wins.
    For each terminal extract, descend through single-child wrappers (e.g.
    `squashfs-root`) to find the real filesystem root.

    Returns a list of (path, score, evidence) sorted by descending score, then
    by path.
    """
    # Pass 1: *_extract directories that look like a complete filesystem.
    # A directory qualifies if EITHER:
    #   (a) its name carries a known on-disk-fs suffix (ubifs_extract,
    #       squashfs_v4_le_extract, jffs2_extract, cpio_extract,
    #       ramdisk_el_extract, ...) — in that case it IS the filesystem even
    #       if unblob also recursed into a sub-blob inside it, OR
    #   (b) it's a terminal *_extract (no further *_extract anywhere below) —
    #       used for generic blob chains where unblob couldn't name the fs type.
    extract_candidates: set[Path] = set()
    for dirpath, dirnames, _filenames in os.walk(extracted):
        d = Path(dirpath)
        if d == extracted:
            continue
        if len(d.relative_to(extracted).parts) > max_depth:
            dirnames[:] = []
            continue
        if d.name.endswith("_extract"):
            qualifies = (
                _has_known_fs_type_suffix(d.name)
                or not _has_extract_descendant(d)
            )
            # Only real subdirectories count; a symlink to a directory says
            # nothing about the contents of this extraction.
            if qualifies and any(_is_real_dir(c) for c in d.iterdir()):
                extract_candidates.add(_find_fs_root(d))
                # Don't recurse into this candidate — its insides aren't
                # separate shards.
                dirnames[:] = []

    # Pass 2: score-based fallback for trees that don't use unblob's naming
    # (binwalk output, pre-extracted directories, etc.). Gate: skip anything
    # at or below an already-identified extract shard.
    score_candidates: set[Path] = set()
    for dirpath, dirnames, _filenames in os.walk(extracted):
        d = Path(dirpath)
        if d == extracted:
            continue
        if len(d.relative_to(extracted).parts) > max_depth:
            dirnames[:] = []
            continue
        if any(d == ec or _is_descendant(d, ec) for ec in extract_candidates):
            # Everything below is gated out as well, so don't walk it.
            dirnames[:] = []
            continue
        score, _ev = score_directory(d, dirnames, extraction_root=extracted)
        if score >= min_score:
            score_candidates.add(d)

    # Score every candidate, then keep the highest-scoring path in each
    # ancestor chain. Extract candidates get an unbeatable boost so they always
    # win against score-based candidates inside the same chain (though the
    # gate above already prevents most overlaps).
    EXTRACT_BOOST = 10_000
    scored: list[tuple[Path, int, dict, bool]] = []
    for p in extract_candidates | score_candidates:
        children = [c.name for c in p.iterdir() if c.is_dir()]
        s, ev = score_directory(p, children, extraction_root=extracted)
        is_extract = p in extract_candidates
        rank = s + (EXTRACT_BOOST if is_extract else 0)
        scored.append((p, rank, ev, is_extract))

    # Highest rank wins; tie-break by deeper path so descendants edge out parents.
    scored.sort(key=lambda c: (-c[1], -len(c[0].parts)))
    kept: list[tuple[Path, int, dict, bool]] = []
    for p, rank, ev, is_extract in scored:
        if any(_is_descendant(p, k[0]) or _is_descendant(k[0], p) for k in kept):
            continue
        kept.append((p, rank, ev, is_extract))

    # Strip the boost again so callers see the plain fs-likeness score.
    results: list[tuple[Path, int, dict]] = [
        (p, rank - (EXTRACT_BOOST if is_extract else 0), ev)
        for p, rank, ev, is_extract in kept
    ]
    results.sort(key=lambda c: (-c[1], str(c[0])))
    return results


def _slugify(rel: Path) -> str:
    """Turn a relative path into a short, filename-safe token (max 80 chars)."""
    s = "__".join(rel.parts)
    s = re.sub(r"[^a-zA-Z0-9._-]+", "_", s).strip("_")
    return s[:80] or "shard"


def tar_shards(
    shards: list[tuple[Path, int, dict]],
    extracted: Path,
    out_dir: Path,
    firmware_stem: str,
    scratch_root: Optional[Path] = None,
    reextract: bool = True,
    verbose: bool = False,
) -> list[ShardInfo]:
    """Tar each shard. When `reextract` is True and a shard's fs type has a
    native perm-preserving extractor available, the shard is re-extracted from
    its original blob before tarring (avoids 7z's permission corruption for
    cpio and similar). The re-extraction happens under `scratch_root`; if
    omitted, no re-extraction is attempted.

    Tarballs are written to `out_dir` (created if missing) and named
    `<firmware_stem>.shard.<NN>.<slug>.tar.gz`, where NN is the shard's index
    in `shards`. Returns one ShardInfo per shard, in input order.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    infos: list[ShardInfo] = []
    for i, (path, score, ev) in enumerate(shards):
        rel = path.relative_to(extracted)
        slug = _slugify(rel)
        fs_type = _guess_fs_type(path, extracted)

        tar_source = path
        reextractor_used: Optional[str] = None
        blob_used: Optional[Path] = None
        if reextract and scratch_root is not None:
            tar_source, reextractor_used, blob_used = reextract_shard(
                path, fs_type, extracted, scratch_root, verbose=verbose,
            )

        tar_name = f"{firmware_stem}.shard.{i:02d}.{slug}.tar.gz"
        tar_path = out_dir / tar_name
        with tarfile.open(tar_path, "w:gz") as t:
            # arcname="." stores members relative to the shard root.
            t.add(tar_source, arcname=".", recursive=True)
        infos.append(ShardInfo(
            name=tar_name, score=score, root_path=str(rel),
            fs_type_guess=fs_type,
            matched_root_dirs=ev.get("matched_root_dirs", []),
            matched_rootfs_files=ev.get("matched_files", []),
            file_count=ev.get("file_count", 0),
            reextracted_with=reextractor_used,
            source_blob=(str(blob_used.relative_to(extracted)) if blob_used else None),
        ))
    return infos


def write_manifest(infos: list[ShardInfo], out_dir: Path, firmware: Optional[Path], extractor: str) -> Path:
    """Write `shards.json` into `out_dir` and return its path.

    `firmware` may be None (pre-extracted tree); the firmware name and stem
    are then recorded as null.
    """
    manifest_path = out_dir / "shards.json"
    payload = {
        "firmware": firmware.name if firmware is not None else None,
        "firmware_stem": firmware.stem if firmware is not None else None,
        "extractor": extractor,
        "shards": [asdict(i) for i in infos],
    }
    with open(manifest_path, "w") as f:
        json.dump(payload, f, indent=2)
    return manifest_path


def load_manifest(shard_dir: Path) -> Optional[dict]:
    """Load `shards.json` from `shard_dir`, or return None if it is absent."""
    p = shard_dir / "shards.json"
    if not p.exists():
        return None
    with open(p, "r") as f:
        return json.load(f)


# --------------- Re-extraction (perm-preserving) ---------------

_GZIP_MAGIC = b"\x1f\x8b\x08"
_BZIP2_MAGIC = b"BZh"
_XZ_MAGIC = b"\xfd7zXZ\x00"
_LZ4_MAGIC = b"\x04\x22\x4d\x18"

# cpio magic numbers (any of these means "this is a raw cpio archive").
_CPIO_MAGICS = (b"070701", b"070702", b"070707", b"\xc7\x71", b"\x71\xc7")


def _read_magic(path: Path, n: int = 8) -> bytes:
    """Return the first `n` bytes of `path`, or b"" if it cannot be read."""
    try:
        with open(path, "rb") as f:
            return f.read(n)
    except OSError:
        return b""


def _is_cpio(magic: bytes) -> bool:
    """True if `magic` starts with one of the known cpio magic numbers."""
    return magic.startswith(_CPIO_MAGICS)


def _decompress_pipeline(magic: bytes) -> Optional[list[str]]:
    """Return the decompression command for a wrapper format, or None if the
    blob is already raw. The command reads from stdin and writes to stdout.

    lz4 is only offered when the `lz4` binary is on PATH; availability of the
    other tools is checked by the caller.
    """
    if magic.startswith(_GZIP_MAGIC):
        return ["gunzip", "-c"]
    if magic.startswith(_BZIP2_MAGIC):
        return ["bunzip2", "-c"]
    if magic.startswith(_XZ_MAGIC):
        return ["unxz", "-c"]
    if magic.startswith(_LZ4_MAGIC) and _which("lz4"):
        return ["lz4", "-d", "-c"]
    return None


def reextract_cpio(blob_path: Path, out_dir: Path, verbose: bool = False) -> bool:
    """Re-extract a cpio (or gzipped/bzip2/xz-wrapped cpio) blob into out_dir
    using native cpio so permissions, setuid bits, and symlinks are preserved.

    Returns True on success, False if anything went wrong (caller falls back
    to the original 7z-extracted directory). `out_dir` is created even when
    the blob later turns out not to be a cpio archive.
    """
    if not _which("cpio"):
        return False
    out_dir.mkdir(parents=True, exist_ok=True)
    magic = _read_magic(blob_path, 8)

    decomp = _decompress_pipeline(magic)
    if decomp is None and not _is_cpio(magic):
        # Not a recognizable cpio (raw or compressed) — bail.
        return False
    if decomp is not None and not _which(decomp[0]):
        return False

    cpio_cmd = ["cpio", "-idmu", "--no-absolute-filenames", "--quiet"]
    err_buf = b""
    try:
        with open(blob_path, "rb") as src:
            if decomp is None:
                r = subprocess.run(cpio_cmd, stdin=src, cwd=out_dir,
                                   stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                                   check=False)
                err_buf = r.stderr
                ok = (r.returncode == 0)
            else:
                # The context manager guarantees the decompressor is reaped
                # even if spawning or driving cpio fails.
                with subprocess.Popen(decomp, stdin=src, stdout=subprocess.PIPE,
                                      stderr=subprocess.DEVNULL) as p1:
                    try:
                        p2 = subprocess.Popen(cpio_cmd, stdin=p1.stdout, cwd=out_dir,
                                              stdout=subprocess.DEVNULL,
                                              stderr=subprocess.PIPE)
                    except OSError:
                        # Nobody will ever read the pipe; stop the producer.
                        p1.kill()
                        raise
                    # Drop our copy of the read end so the decompressor gets
                    # SIGPIPE if cpio exits early.
                    if p1.stdout is not None:
                        p1.stdout.close()
                    _, err_buf = p2.communicate()
                    p1.wait()
                ok = (p1.returncode == 0 and p2.returncode == 0)
    except OSError as e:
        if verbose:
            print(f"[reextract] cpio failed for {blob_path}: {e}", file=sys.stderr)
        return False

    if not ok:
        # A non-zero exit is treated as failure, even if some files were
        # written before the error.
        if verbose:
            print(f"[reextract] cpio non-zero exit for {blob_path}: {err_buf[:200]!r}",
                  file=sys.stderr)
        return False
    # A clean exit is not enough: the output is only useful if at least one
    # entry was actually extracted.
    return next(out_dir.rglob("*"), None) is not None


# Local import: only the registry annotation below needs it.
from typing import Callable  # noqa: E402

# Registry: maps reextractor key -> function (blob, out_dir, verbose) -> bool.
# Add entries here as new perm-preserving native extractors are needed.
REEXTRACTORS: dict[str, Callable[..., bool]] = {
    "cpio": reextract_cpio,
}


def _find_extract_ancestor(path: Path, extraction_root: Path) -> Optional[Path]:
    """Walk up from `path` (inclusive) to find the first ancestor named
    `*_extract`. Returns None if there isn't one within extraction_root.
    """
    cur = path
    # `cur != cur.parent` stops at the filesystem root when `path` is not
    # actually below `extraction_root`.
    while cur != extraction_root and cur != cur.parent:
        if cur.name.endswith("_extract"):
            return cur
        cur = cur.parent
    return None


def _find_original_blob(extract_dir: Path) -> Optional[Path]:
    """Given a `<name>_extract` directory, return the path of the original
    blob `<name>` if it exists as a regular-file sibling, else None.

    Only the exact sibling name is tried; no extension variants are probed.
    """
    parent = extract_dir.parent
    stem = extract_dir.name[: -len("_extract")]
    candidate = parent / stem
    if candidate.is_file():
        return candidate
    # Some extractors emit `<name>_extract` where the original was decompressed
    # into `<name>.uncompressed` first. We don't follow that chain here.
    return None


def reextract_shard(
    shard_path: Path,
    fs_type: Optional[str],
    extraction_root: Path,
    scratch_root: Path,
    verbose: bool = False,
) -> tuple[Path, Optional[str], Optional[Path]]:
    """If this shard's type has a known native re-extractor and we can locate
    the original blob, re-extract into a new directory under scratch_root and
    return that path. Otherwise return the original path.

    Returns (effective_path, reextractor_name_or_None, source_blob_or_None).
    """
    if fs_type is None or fs_type not in REEXTRACTOR_FOR_TYPE:
        return shard_path, None, None
    extractor_name = REEXTRACTOR_FOR_TYPE[fs_type]
    fn = REEXTRACTORS.get(extractor_name)
    if fn is None:
        return shard_path, None, None
    extract_dir = _find_extract_ancestor(shard_path, extraction_root)
    if extract_dir is None:
        return shard_path, None, None
    blob = _find_original_blob(extract_dir)
    if blob is None:
        return shard_path, None, None

    # Place the re-extraction under scratch_root so it gets cleaned up.
    safe_slug = re.sub(r"[^a-zA-Z0-9._-]+", "_", str(shard_path.relative_to(extraction_root)))[:120]
    out = scratch_root / "reextract" / f"{extractor_name}_{safe_slug}"
    if verbose:
        print(f"[reextract] {extractor_name}: {blob.name} -> {out}", file=sys.stderr)
    ok = fn(blob, out, verbose=verbose)
    if not ok:
        if verbose:
            print(f"[reextract] {extractor_name} failed; keeping 7z extraction at {shard_path}",
                  file=sys.stderr)
        return shard_path, None, None
    return out, extractor_name, blob


# --------------- Extractor invocation ---------------

class ExtractorMissing(RuntimeError):
    """Raised when the requested extractor binary is not on PATH."""


def _which(cmd: str) -> Optional[str]:
    """Thin wrapper around shutil.which."""
    return shutil.which(cmd)


def run_unblob(firmware: Path, scratch: Path, verbose: bool = False) -> Path:
    """Run unblob into scratch/unblob, return that directory.

    Raises ExtractorMissing if unblob is not on PATH, and
    subprocess.CalledProcessError if unblob exits non-zero.
    """
    if not _which("unblob"):
        raise ExtractorMissing(
            "unblob not found on PATH. Install it locally or run this inside "
            "the fw2tar Docker container where it's already available."
        )
    out = scratch / "unblob"
    out.mkdir(parents=True, exist_ok=True)
    # unblob's default --log location ends up at the filesystem root, which is
    # not writable for non-root users in the container. Pin it inside the
    # scratch dir instead.
    log_path = scratch / "unblob.log"
    cmd = ["unblob", "--extract-dir", str(out), "--log", str(log_path), str(firmware)]
    if verbose:
        print(f"[shard] running: {' '.join(cmd)}", file=sys.stderr)
    # unblob can be chatty even on success; suppress unless verbose.
    subprocess.run(
        cmd, check=True,
        stdout=None if verbose else subprocess.DEVNULL,
        stderr=None if verbose else subprocess.DEVNULL,
    )
    return out


def run_binwalk(firmware: Path, scratch: Path, verbose: bool = False) -> Path:
    """Run binwalk recursive extraction into scratch/binwalk.

    Raises ExtractorMissing if binwalk is not on PATH, and
    subprocess.CalledProcessError if binwalk exits non-zero.
    """
    if not _which("binwalk"):
        raise ExtractorMissing(
            "binwalk not found on PATH. Install it locally or run this inside "
            "the fw2tar Docker container where it's already available."
        )
    out = scratch / "binwalk"
    out.mkdir(parents=True, exist_ok=True)
    cmd = ["binwalk", "--extract", "--matryoshka", "--directory", str(out), str(firmware)]
    if verbose:
        print(f"[shard] running: {' '.join(cmd)}", file=sys.stderr)
    subprocess.run(
        cmd, check=True,
        stdout=None if verbose else subprocess.DEVNULL,
        stderr=None if verbose else subprocess.DEVNULL,
    )
    return out


# --------------- Top-level ---------------

def shard(
    firmware: Optional[Path],
    out_dir: Path,
    extractor: str = "unblob",
    extracted_dir: Optional[Path] = None,
    min_score: int = 3,
    reextract: bool = True,
    verbose: bool = False,
) -> dict:
    """Extract a firmware blob into per-shard .tar.gz files + a manifest.

    If `extracted_dir` is supplied, `firmware` may be None and the tree is
    walked directly. Re-extraction (e.g. native cpio for perm preservation)
    still works as long as the original blobs are present next to the
    *_extract dirs.

    Raises FileNotFoundError if the input (firmware or extracted_dir) is
    missing, ValueError for an unknown extractor, ExtractorMissing if the
    extractor binary is not on PATH, and subprocess.CalledProcessError if the
    extractor exits non-zero. The temporary scratch directory is removed on
    every exit path.

    Returns a dict summary suitable for printing.
    """
    # Validate everything up front so no scratch directory is created for a
    # request that cannot possibly succeed.
    if extracted_dir is not None:
        if not extracted_dir.is_dir():
            raise FileNotFoundError(f"extracted_dir not found: {extracted_dir}")
    else:
        if firmware is None or not firmware.is_file():
            raise FileNotFoundError(f"firmware not found: {firmware}")
        if extractor not in ("unblob", "binwalk"):
            raise ValueError(f"unknown extractor: {extractor!r}")

    # Even with a pre-extracted tree we need a scratch dir for re-extraction.
    scratch_root = Path(tempfile.mkdtemp(prefix="fw2shard_"))
    try:
        # The extractor runs inside the try block so a failed or missing
        # extractor does not leak the scratch directory.
        if extracted_dir is not None:
            extraction_root = extracted_dir
            used_extractor = "preextracted"
        else:
            if extractor == "unblob":
                extraction_root = run_unblob(firmware, scratch_root, verbose=verbose)
            else:
                extraction_root = run_binwalk(firmware, scratch_root, verbose=verbose)
            used_extractor = extractor

        # When --from-extracted was used we may not have a firmware path; pick
        # a stable stem from the extracted dir name so the per-shard tarball
        # names are deterministic.
        firmware_stem = firmware.stem if firmware is not None else extraction_root.resolve().name

        candidates = find_shards(extraction_root, min_score=min_score)
        if verbose:
            print(f"[shard] {len(candidates)} candidate fragment(s) selected", file=sys.stderr)
            for p, s, ev in candidates:
                print(f"  score={s:3d} files={ev.get('file_count')} "
                      f"{p.relative_to(extraction_root)}", file=sys.stderr)
        infos = tar_shards(
            candidates, extraction_root, out_dir, firmware_stem,
            scratch_root=scratch_root, reextract=reextract, verbose=verbose,
        )
        manifest_path = write_manifest(infos, out_dir, firmware, used_extractor)
        reextract_count = sum(1 for i in infos if i.reextracted_with)
        if verbose and reextract_count:
            print(f"[shard] re-extracted {reextract_count} shard(s) with native tools "
                  f"(perm-preserving)", file=sys.stderr)
        return {
            "shard_dir": str(out_dir),
            "manifest": str(manifest_path),
            "extractor": used_extractor,
            "reextracted_count": reextract_count,
            "count": len(infos),
            "shards": [asdict(i) for i in infos],
        }
    finally:
        shutil.rmtree(scratch_root, ignore_errors=True)


# Patch boundary: the header of the next file in this diff starts here.
# diff --git a/utils/stitch/tools.py b/utils/stitch/tools.py new
file mode 100644 index 0000000..2871018 --- /dev/null +++ b/utils/stitch/tools.py @@ -0,0 +1,463 @@ +"""Tools the LLM can call to inspect filesystem fragments. + +Each tool: a pydantic args model + a function that takes the FragmentCache and +returns JSON-serializable output. The TOOLS registry is projected into OpenAI +tool schemas at startup. All tools enforce caps so context stays bounded on +small local models. +""" +from __future__ import annotations + +import fnmatch +import json +import re +import tarfile +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Callable + +from pydantic import BaseModel, Field + + +# fw2tar's per-extractor output naming: ...tar.gz +_FW2TAR_NAME_RE = re.compile(r"^(?P.+?)\.(?Pbinwalk|binwalkv3|binwalk3|unblob)\.(?P\d+)\.tar\.gz$") + +# The shard step's output naming: .shard...tar.gz +_SHARD_NAME_RE = re.compile(r"^(?P.+?)\.shard\.(?P\d+)\.(?P.+)\.tar\.gz$") + + +@dataclass +class FragmentInfo: + name: str + extractor: str | None + index: int | None + path: Path + size: int + # Populated from shards.json when the fragment dir was produced by the + # shard step. Strongest signals for the LLM live here. 
+ fs_type_guess: str | None = None + root_path: str | None = None + matched_root_dirs: list[str] = field(default_factory=list) + matched_rootfs_files: list[str] = field(default_factory=list) + shard_score: int | None = None + file_count: int | None = None + reextracted_with: str | None = None + + +def _load_manifest(frag_dir: Path) -> dict[str, dict]: + """Return shards.json keyed by shard name, or {} if no manifest.""" + p = frag_dir / "shards.json" + if not p.exists(): + return {} + try: + with open(p, "r") as f: + data = json.load(f) + except (OSError, json.JSONDecodeError): + return {} + return {s["name"]: s for s in data.get("shards", [])} + + +class FragmentCache: + """Owns open TarFile handles, keyed by fragment basename.""" + + def __init__(self, frag_dir: Path): + self.frag_dir = frag_dir + self._infos: dict[str, FragmentInfo] = {} + self._tars: dict[str, tarfile.TarFile] = {} + self._names: dict[str, list[str]] = {} + manifest = _load_manifest(frag_dir) + for p in sorted(frag_dir.iterdir()): + if not p.is_file() or not p.name.endswith(".tar.gz"): + continue + if ".rootfs." in p.name and ".stitched." not in p.name: + # Skip fw2tar's final selected output; we want the raw per-extractor or shard pieces. 
+ continue + extractor = None + idx = None + m = _SHARD_NAME_RE.match(p.name) + if m: + extractor = "shard" + idx = int(m.group("idx")) + else: + m2 = _FW2TAR_NAME_RE.match(p.name) + if m2: + extractor = m2.group("extractor") + idx = int(m2.group("idx")) + info = FragmentInfo( + name=p.name, extractor=extractor, index=idx, + path=p, size=p.stat().st_size, + ) + meta = manifest.get(p.name) + if meta: + info.fs_type_guess = meta.get("fs_type_guess") + info.root_path = meta.get("root_path") + info.matched_root_dirs = list(meta.get("matched_root_dirs") or []) + info.matched_rootfs_files = list(meta.get("matched_rootfs_files") or []) + info.shard_score = meta.get("score") + info.file_count = meta.get("file_count") + info.reextracted_with = meta.get("reextracted_with") + self._infos[p.name] = info + + def names(self) -> list[str]: + return list(self._infos.keys()) + + def info(self, name: str) -> FragmentInfo: + if name not in self._infos: + raise KeyError(f"unknown fragment: {name!r} (known: {list(self._infos)})") + return self._infos[name] + + def tar(self, name: str) -> tarfile.TarFile: + if name not in self._tars: + self._tars[name] = tarfile.open(self.info(name).path, "r:*") + return self._tars[name] + + def member_names(self, name: str) -> list[str]: + if name not in self._names: + self._names[name] = self.tar(name).getnames() + return self._names[name] + + def close(self): + for t in self._tars.values(): + try: + t.close() + except Exception: + pass + + +# ---------- Args models ---------- + +class ListPathsArgs(BaseModel): + fragment: str + pattern: str = Field(description="glob pattern, e.g. 
'etc/*' or '**/init*'") + max: int = Field(default=50, ge=1, le=500) + + +class ReadFileArgs(BaseModel): + fragment: str + path: str + max_bytes: int = Field(default=4096, ge=1, le=32768) + + +class GrepArgs(BaseModel): + fragment: str + pattern: str = Field(description="regex (python re) matched per line") + path_glob: str = Field(default="etc/**") + max_hits: int = Field(default=20, ge=1, le=200) + + +class StringsArgs(BaseModel): + fragment: str + path: str + min_len: int = Field(default=6, ge=3, le=64) + max_hits: int = Field(default=80, ge=1, le=500) + + +class FragmentArgs(BaseModel): + fragment: str + max: int = Field(default=30, ge=1, le=200) + + +class FragmentOnlyArgs(BaseModel): + fragment: str + + +# ---------- Helpers ---------- + +def _normalize(name: str) -> str: + n = name.lstrip("./") + return n + + +def _resolve_member(tf: tarfile.TarFile, path: str) -> tarfile.TarInfo | None: + """Find a member by relaxed path lookup. Tarballs may store names as './foo'.""" + candidates = [path, "./" + path.lstrip("/"), path.lstrip("/")] + for c in candidates: + try: + return tf.getmember(c) + except KeyError: + continue + norm = _normalize(path) + for m in tf.getmembers(): + if _normalize(m.name) == norm: + return m + return None + + +def _glob_paths(names: list[str], pattern: str, limit: int) -> list[str]: + norm_names = [_normalize(n) for n in names] + matched: list[str] = [] + if "**" in pattern: + # fnmatch doesn't handle ** — convert to a regex. 
+ regex_pat = re.escape(pattern).replace(r"\*\*", ".*").replace(r"\*", "[^/]*").replace(r"\?", ".") + rx = re.compile("^" + regex_pat + "$") + for n in norm_names: + if rx.match(n): + matched.append(n) + if len(matched) >= limit: + break + else: + for n in norm_names: + if fnmatch.fnmatchcase(n, pattern): + matched.append(n) + if len(matched) >= limit: + break + return matched + + +def _read_member_bytes(tf: tarfile.TarFile, ti: tarfile.TarInfo, max_bytes: int) -> bytes: + f = tf.extractfile(ti) + if f is None: + return b"" + data = f.read(max_bytes + 1) + return data + + +def _safe_decode(data: bytes) -> str: + return data.decode("utf-8", errors="replace") + + +# ---------- Tool implementations ---------- + +def tool_list_paths(cache: FragmentCache, args: ListPathsArgs) -> dict: + names = cache.member_names(args.fragment) + hits = _glob_paths(names, args.pattern, args.max) + return {"fragment": args.fragment, "pattern": args.pattern, "count": len(hits), "paths": hits} + + +def tool_read_file(cache: FragmentCache, args: ReadFileArgs) -> dict: + tf = cache.tar(args.fragment) + ti = _resolve_member(tf, args.path) + if ti is None: + return {"fragment": args.fragment, "path": args.path, "error": "not found"} + if ti.issym() or ti.islnk(): + return { + "fragment": args.fragment, "path": args.path, + "symlink_to": ti.linkname, "size": 0, "truncated": False, "content": "", + } + if not ti.isreg(): + return {"fragment": args.fragment, "path": args.path, "error": f"not a regular file (type={ti.type!r})"} + data = _read_member_bytes(tf, ti, args.max_bytes) + truncated = len(data) > args.max_bytes + data = data[: args.max_bytes] + return { + "fragment": args.fragment, "path": args.path, + "size": ti.size, "mode": oct(ti.mode), "truncated": truncated, + "content": _safe_decode(data), + } + + +def tool_grep(cache: FragmentCache, args: GrepArgs) -> dict: + tf = cache.tar(args.fragment) + names = cache.member_names(args.fragment) + try: + rx = re.compile(args.pattern) + except 
re.error as e: + return {"error": f"bad regex: {e}"} + candidate_paths = _glob_paths(names, args.path_glob, limit=500) + hits: list[dict] = [] + for p in candidate_paths: + if len(hits) >= args.max_hits: + break + ti = _resolve_member(tf, p) + if ti is None or not ti.isreg(): + continue + if ti.size > 256 * 1024: + continue + data = _read_member_bytes(tf, ti, 256 * 1024) + try: + text = data.decode("utf-8", errors="strict") + except UnicodeDecodeError: + continue + for i, line in enumerate(text.splitlines(), 1): + if rx.search(line): + hits.append({"path": p, "line_no": i, "line": line[:240]}) + if len(hits) >= args.max_hits: + break + return { + "fragment": args.fragment, "pattern": args.pattern, + "path_glob": args.path_glob, "count": len(hits), "hits": hits, + } + + +_STRINGS_RE = re.compile(rb"[\x20-\x7e]{%d,}") + + +def tool_strings(cache: FragmentCache, args: StringsArgs) -> dict: + tf = cache.tar(args.fragment) + ti = _resolve_member(tf, args.path) + if ti is None: + return {"error": "not found", "fragment": args.fragment, "path": args.path} + if not ti.isreg(): + return {"error": "not a regular file", "fragment": args.fragment, "path": args.path} + rx = re.compile(rb"[\x20-\x7e]{%d,}" % args.min_len) + data = _read_member_bytes(tf, ti, 2 * 1024 * 1024) + hits = [] + for m in rx.finditer(data): + s = m.group(0).decode("ascii", errors="replace") + # Bias toward strings that look like paths or mount-related tokens. 
+ if "/" in s or any(tok in s for tok in ("mount", "/dev/", "/etc/", "/var/", "/usr/", "/opt/", "fstab", ".sh")): + hits.append(s) + if len(hits) >= args.max_hits: + break + return { + "fragment": args.fragment, "path": args.path, + "count": len(hits), "strings": hits, + } + + +def tool_find_dangling_symlinks(cache: FragmentCache, args: FragmentArgs) -> dict: + tf = cache.tar(args.fragment) + names_set = set(_normalize(n) for n in cache.member_names(args.fragment)) + hits = [] + for ti in tf.getmembers(): + if not (ti.issym() or ti.islnk()): + continue + target = ti.linkname + if not target.startswith("/"): + continue + rel = target.lstrip("/") + if rel not in names_set: + hits.append({"link": _normalize(ti.name), "target": target}) + if len(hits) >= args.max: + break + return {"fragment": args.fragment, "count": len(hits), "dangling": hits} + + +_KEY_CHECKS = { + "has_etc_passwd": "etc/passwd", + "has_sbin_init": "sbin/init", + "has_bin_sh": "bin/sh", + "has_lib_ld": "lib", # checked specially below + "has_etc_fstab": "etc/fstab", + "has_etc_inittab": "etc/inittab", + "has_etc_init_d_rcS": "etc/init.d/rcS", +} + + +def tool_fs_summary(cache: FragmentCache, args: FragmentOnlyArgs) -> dict: + tf = cache.tar(args.fragment) + names = cache.member_names(args.fragment) + norm = [_normalize(n) for n in names] + norm_set = set(norm) + result: dict[str, Any] = {"fragment": args.fragment} + for k, p in _KEY_CHECKS.items(): + if k == "has_lib_ld": + result[k] = any(n.startswith("lib/ld-") or n.startswith("lib/ld.") or n == "lib/ld-linux.so" for n in norm) + else: + result[k] = p in norm_set or any(n == p for n in norm) + # top-level directory counts + counts: dict[str, int] = {} + for n in norm: + head = n.split("/", 1)[0] if "/" in n else n + counts[head] = counts.get(head, 0) + 1 + top_dirs = sorted(counts.items(), key=lambda x: -x[1])[:12] + info = cache.info(args.fragment) + result["top_dirs"] = [{"name": d, "count": c} for d, c in top_dirs] + result["extractor"] = 
info.extractor + result["index"] = info.index + result["compressed_size"] = info.size + # Manifest-derived provenance (present when this came from the shard step). + if info.fs_type_guess is not None: + result["fs_type_guess"] = info.fs_type_guess + if info.root_path is not None: + result["root_path"] = info.root_path + if info.shard_score is not None: + result["shard_score"] = info.shard_score + if info.file_count is not None: + result["manifest_file_count"] = info.file_count + if info.reextracted_with is not None: + result["reextracted_with"] = info.reextracted_with + return result + + +# ---------- Registry ---------- + +@dataclass +class Tool: + name: str + description: str + args_model: type[BaseModel] + fn: Callable[[FragmentCache, BaseModel], dict] + + +TOOLS: list[Tool] = [ + Tool( + name="list_paths", + description=( + "List member paths in a fragment matching a glob pattern. " + "Use to check whether key files exist (e.g. pattern='etc/fstab', 'sbin/init', '**/rcS')." + ), + args_model=ListPathsArgs, + fn=tool_list_paths, + ), + Tool( + name="read_file", + description=( + "Read a small text file from a fragment (up to max_bytes). " + "Use for /etc/fstab, /etc/init.d/rcS, /etc/inittab, /etc/rc.local, /etc/profile." + ), + args_model=ReadFileArgs, + fn=tool_read_file, + ), + Tool( + name="grep_in_fragment", + description=( + "Run a regex across small text files in a fragment under path_glob. " + "Use to find 'mount' commands or hardcoded mount paths in init scripts." + ), + args_model=GrepArgs, + fn=tool_grep, + ), + Tool( + name="strings_of", + description=( + "Extract printable strings biased toward paths from a binary inside a fragment. " + "Use on /sbin/init, /bin/busybox, or service binaries when init scripts are unhelpful." + ), + args_model=StringsArgs, + fn=tool_strings, + ), + Tool( + name="find_dangling_symlinks", + description=( + "List absolute symlinks in a fragment whose target does not exist inside the same fragment. 
" + "Strongest cross-fragment-dependency signal." + ), + args_model=FragmentArgs, + fn=tool_find_dangling_symlinks, + ), + Tool( + name="fs_summary", + description=( + "Summarize a fragment: presence of key root-fs files plus top-level directory counts." + ), + args_model=FragmentOnlyArgs, + fn=tool_fs_summary, + ), +] + +TOOLS_BY_NAME: dict[str, Tool] = {t.name: t for t in TOOLS} + + +def to_openai_schemas(plan_schema: dict) -> list[dict]: + """Return the OpenAI 'tools' array including submit_plan.""" + out: list[dict] = [] + for t in TOOLS: + out.append({ + "type": "function", + "function": { + "name": t.name, + "description": t.description, + "parameters": t.args_model.model_json_schema(), + }, + }) + out.append({ + "type": "function", + "function": { + "name": "submit_plan", + "description": ( + "Submit your final StitchPlan. The harness will validate it and the loop ends on success." + ), + "parameters": plan_schema, + }, + }) + return out