diff --git a/.github/workflows/test_schema.yaml b/.github/workflows/test_schema.yaml index 71fc66eb3..2fe280b15 100644 --- a/.github/workflows/test_schema.yaml +++ b/.github/workflows/test_schema.yaml @@ -11,7 +11,7 @@ jobs: steps: - uses: actions/checkout@v4 - - run: pip install pydantic pydantic-partial pyyaml yamlcore + - run: pip install pydantic pyyaml yamlcore - run: python3 src/penguin/penguin_config/gen_docs.py docs > schema_doc.md # Ensure generated schema_doc.md matches the one in the repo at docs/schema_doc.md - run: | diff --git a/Dockerfile b/Dockerfile index 14ed4dc7f..75059c24d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ ARG BASE_IMAGE="${REGISTRY}/ubuntu:22.04" ARG VPN_VERSION="1.0.25" ARG BUSYBOX_VERSION="0.0.15" ARG LINUX_VERSION="3.5.14-beta" -ARG IGLOO_DRIVER_VERSION="0.0.21" +ARG IGLOO_DRIVER_VERSION="0.0.25" ARG LIBNVRAM_VERSION="0.0.23" ARG CONSOLE_VERSION="1.0.7" ARG GUESTHOPPER_VERSION="1.0.20" @@ -243,7 +243,6 @@ RUN --mount=type=cache,target=/root/.cache/pip \ lz4 \ openai \ pydantic \ - pydantic-partial \ pyelftools \ pyyaml \ pyvis \ diff --git a/docs/schema_doc.md b/docs/schema_doc.md index d089039f7..f54303164 100644 --- a/docs/schema_doc.md +++ b/docs/schema_doc.md @@ -317,8 +317,7 @@ true ||| |-|-| -|__Type__|string| -|__Patch merge behavior__|Concatenate strings separated by `' '`| +|__Type__|string or null| |__Default__|`null`| A list of additional QEMU command-line arguments to use when booting the guest @@ -853,7 +852,7 @@ NVRAM values to add to the guest ||| |-|-| |__Type__|list of string| -|__Default__|`[]`| +|__Default__|`null`| Names for guest network interfaces @@ -889,7 +888,7 @@ Value of the U-Boot environment variable ||| |-|-| |__Type__|list of integer| -|__Default__|`[]`| +|__Default__|`null`| Signals numbers to block within the guest. Supported values are 6 (SIGABRT), 9 (SIGKILL), 15 (SIGTERM), and 17 (SIGCHLD). 
@@ -935,8 +934,7 @@ nvram_init ||| |-|-| -|__Type__|string| -|__Patch merge behavior__|Concatenate strings separated by `'\n'`| +|__Type__|string or null| |__Default__|`null`| Custom source code for library functions to intercept and model @@ -1243,7 +1241,7 @@ thumb ||| |-|-| -|__Type__|string or null| +|__Type__|string| |__Default__|`null`| @@ -1266,10 +1264,6 @@ Whether to enable this plugin (default depends on plugin) ## `network` Network Configuration -||| -|-|-| -|__Default__|`null`| - Configuration for networks to attach to guest ### `network.external` Set up NAT for outgoing connections diff --git a/pyplugins/analysis/proctree.py b/pyplugins/analysis/proctree.py new file mode 100644 index 000000000..8624689d5 --- /dev/null +++ b/pyplugins/analysis/proctree.py @@ -0,0 +1,411 @@ +import os +import json +from collections import defaultdict +from penguin import plugins, Plugin, getColoredLogger + + +class Proctree(Plugin): + """ + Tracks process execution events and builds a process tree. + Each process is uniquely identified by (pid, start_time, exec_num). + Supports configurable output formats and live updating. 
+ """ + + def __init__(self): + self.outdir = self.get_arg("outdir") + self.logger = getColoredLogger("plugins.proctree") + self.procs = {} # (pid, start_time, exec_num) -> procinfo dict + self.children = defaultdict(list) # (ppid, pstart, pexec) -> list of (pid, start_time, exec_num) + self.exec_counters = defaultdict(int) # (pid, start_time) -> next exec_num + + # New options + # Honor the configured output types; fall back to every supported + # format (text, csv, json) when none are configured. + self.output_types = self.get_arg("output_types") or ["text", "csv", "json"] + self.live_update = self.get_arg("live_update") or False + + plugins.subscribe(plugins.Execs, "exec_event", self.on_exec_event) + + # @plugins.syscalls.syscall("on_sys_exit_enter") + def on_exit_event(self, pt_regs, proto, syscall, error_code): + proc = yield from plugins.OSI.get_proc() + pid = proc.pid + start_time = proc.create_time + candidates = [k for k in self.procs if k[0] == pid and k[1] == start_time] + if candidates: + latest = max(candidates, key=lambda k: k[2]) + self.procs[latest]["exit_reason"] = f"exit({error_code})" + # self._write_exit_livelog(latest, error_code) + else: + self.logger.warning(f"Exit event for unknown process pid={pid} start_time={start_time}") + + # @plugins.syscalls.syscall("on_sys_exit_group_enter") + def on_exit_group_event(self, pt_regs, proto, syscall, error_code): + proc = yield from plugins.OSI.get_proc() + tgid = proc.tgid + pid = proc.pid + start_time = proc.create_time + # First, set exit_reason for the latest exec of the current process + candidates = [k for k in self.procs if k[0] == pid and k[1] == start_time] + if candidates: + latest = max(candidates, key=lambda k: k[2]) + self.procs[latest]["exit_reason"] = f"exit_group({error_code})" + else: + # Unknown process: there is no record to update, so only report it + self.logger.warning(f"Exit group event for unknown process pid={pid} start_time={start_time}") + # Then, for all other processes with matching tgid and start_time, set exit_reason if not already set and pid != current pid + updated = False + for k, info in 
self.procs.items(): + if info["tgid"] == tgid and info["start_time"] == start_time and info["pid"] != pid and info.get("exit_reason") is None: + info["exit_reason"] = f"exit_group({error_code}) (group member)" + updated = True + if not updated: + self.logger.debug(f"No additional group members found for exit_group tgid={tgid} start_time={start_time}") + + def _get_next_exec_num(self, pid, start_time): + key = (pid, start_time) + num = self.exec_counters[key] + self.exec_counters[key] += 1 + return num + + def _add_process(self, pid, tgid, ppid, start_time, parent_start_time, parent_exec_num, procname, argv, exec_num=None): + self.logger.info(f"Adding process PID={pid} PPID={ppid} START={start_time} NAME={procname}") + if exec_num is None: + exec_num = self._get_next_exec_num(pid, start_time) + reexec_time = None + exit_reason = None + if exec_num > 0: + reexec_time = yield from plugins.OSI.read_time() + exit_reason = "re-exec" + proc_id = (pid, start_time, exec_num) + self.logger.info(f"Registering process PID={pid} PPID={ppid} START={start_time} EXEC={exec_num} NAME={procname}") + if proc_id in self.procs: + self.logger.warning(f"Process PID={pid} START={start_time} EXEC={exec_num} already registered") + self.procs[proc_id] = { + "pid": pid, + "tgid": tgid, + "ppid": ppid, + "parent_start_time": parent_start_time, + "parent_exec_num": parent_exec_num, + "start_time": start_time, + "exec_num": exec_num, + "reexec_time": reexec_time, + "procname": procname, + "argv": argv, + "exit_reason": exit_reason, + } + # Incrementally update children mapping + parent_id = (ppid, parent_start_time, parent_exec_num) + self.children[parent_id].append(proc_id) + self.children[parent_id].sort(key=lambda cid: ( + self.procs[cid]["start_time"], + self.procs[cid]["pid"], + self.procs[cid]["exec_num"] + )) + return proc_id + + def on_exec_event(self, event): + """Record an exec'd process (and its parent, if not yet tracked) in the process tree.""" + parent = event.get("parent") + # Find parent's exec_num if possible + parent_candidates = [ + k for k in 
self.procs + if k[0] == parent.pid and k[1] == parent.start_time + ] + parent_exec_num = max([k[2] for k in parent_candidates], default=0) + exec_num = self._get_next_exec_num(event["proc"].pid, event["proc"].start_time) + yield from self._add_process( + event["proc"].pid, + event["proc"].tgid, + event["proc"].ppid, + event["proc"].start_time, + parent.start_time, + parent_exec_num, + event["procname"], + event["argv"], + exec_num=exec_num + ) + parent_id = (parent.pid, parent.start_time, parent_exec_num) + if parent_id not in self.procs: + args = yield from plugins.OSI.get_args(parent.pid) + parent_parent = yield from plugins.OSI.get_proc(parent.ppid) + parent_parent_start_time = parent_parent.start_time if parent_parent else None + # Find parent's parent exec_num if possible + parent_parent_exec_num = 0 + if parent_parent: + parent_parent_candidates = [ + k for k in self.procs + if k[0] == parent.ppid and k[1] == parent_parent_start_time + ] + parent_parent_exec_num = max([k[2] for k in parent_parent_candidates], default=0) + yield from self._add_process( + parent.pid, + parent.tgid, + parent.ppid, + parent.start_time, + parent_parent_start_time, + parent_parent_exec_num, + getattr(parent, "name", None), + args, + exec_num=parent_exec_num + ) + # Live update logic + if self.live_update: + # self._write_outputs() + self._write_livelog_tree(event["proc"].pid, event["proc"].start_time, exec_num) + + def _single_pstree_text(self, proc_id, prefix="", is_last=True): + """Render a tree-style line for a single process and its parent chain.""" + info = self.procs.get(proc_id) + if not info: + return [] + name = info["procname"] or str(info["pid"]) + pid = info["pid"] + exec_num = info["exec_num"] + if exec_num > 0: + start_time = info["reexec_time"] + else: + start_time = info["start_time"] + start_time_sec = start_time / 1e9 if start_time is not None else "?" 
+ argv = " ".join(info["argv"]) if info.get("argv") else "" + exec_nums = [k[2] for k in self.procs if k[0] == pid and k[1] == info["start_time"]] + if len(exec_nums) == 1: + pid_str = f"{pid}" + else: + pid_str = f"{pid}/{exec_num}" + # Add exit reason if present + exit_reason = info.get("exit_reason") + exit_str = f" [{exit_reason}]" if exit_reason else "" + line = prefix + if prefix: + line += "└─" if is_last else "├─" + line += f"{name} ({pid_str}) [t={start_time_sec}] [{argv}]{exit_str}" + # Find parent + parent_id = (info["ppid"], info["parent_start_time"], info["parent_exec_num"]) + if parent_id in self.procs: + # Recursively build parent chain + parent_lines = self._single_pstree_text(parent_id, prefix + (" " if is_last else "│ "), True) + return parent_lines + [line] + else: + return [line] + + def _write_livelog_tree(self, pid, start_time, exec_num): + """Append tree-style line for the new process and its parent chain to livelog.txt.""" + proc_id = (pid, start_time, exec_num) + lines = self._single_pstree_text(proc_id) + if not os.path.exists(self.outdir): + os.makedirs(self.outdir) + with open(f"{self.outdir}/livelog.txt", "a") as f: + for line in lines: + f.write(line + "\n") + + def _write_livelog(self, event, parent, exec_num, parent_exec_num): + """Append new process and its ancestor info up to the root to livelog.txt in live mode.""" + if not os.path.exists(self.outdir): + os.makedirs(self.outdir) + proc = event["proc"] + procname = event["procname"] + argv = " ".join(event["argv"]) if event.get("argv") else "" + # Build ancestor chain + ancestor_lines = [] + current_parent = parent + current_exec_num = parent_exec_num + while current_parent is not None: + parentname = getattr(current_parent, "name", None) + parent_pid = current_parent.pid + parent_start = current_parent.start_time + ancestor_lines.append( + f"parent={parentname} (pid={parent_pid}, exec={current_exec_num}, start={parent_start})" + ) + # Find next ancestor in procs if available + 
parent_id = (parent_pid, parent_start, current_exec_num) + parent_info = self.procs.get(parent_id) + if parent_info: + next_pid = parent_info["ppid"] + next_start = parent_info["parent_start_time"] + next_exec = parent_info["parent_exec_num"] + # Avoid infinite loop if parent points to itself or missing + if (next_pid, next_start, next_exec) == parent_id or next_pid is None: + break + # Try to get next ancestor from procs + next_parent_info = None + for k, v in self.procs.items(): + if v["pid"] == next_pid and v["start_time"] == next_start and v["exec_num"] == next_exec: + next_parent_info = v + break + if next_parent_info: + # Create a dummy object with .pid, .start_time, .name + class DummyParent: + pass + dummy = DummyParent() + dummy.pid = next_parent_info["pid"] + dummy.start_time = next_parent_info["start_time"] + dummy.name = next_parent_info.get("procname") + current_parent = dummy + current_exec_num = next_parent_info["exec_num"] + continue + break + # Compose log line + line = ( + f"NEW: {procname} (pid={proc.pid}, exec={exec_num}, start={proc.start_time}) " + f"argv=[{argv}] " + + " <- ".join(ancestor_lines) + ) + with open(f"{self.outdir}/livelog.txt", "a") as f: + f.write(line + "\n") + + def _find_roots(self): + roots = [] + for proc_id, info in self.procs.items(): + ppid = info["ppid"] + parent_start_time = info["parent_start_time"] + parent_exec_num = info["parent_exec_num"] + parent_id = (ppid, parent_start_time, parent_exec_num) + if parent_id not in self.procs: + roots.append(proc_id) + roots.sort(key=lambda cid: (self.procs[cid]["start_time"], self.procs[cid]["pid"], self.procs[cid]["exec_num"])) + return roots + + def _group_children(self, children): + # Group children by procname, count, and collect their ids + name_map = defaultdict(list) + for cid in children: + name = self.procs[cid]["procname"] or "?" 
+ name_map[name].append(cid) + return name_map + + def _pstree_text(self, proc_id, prefix="", is_last=True): + info = self.procs[proc_id] + name = info["procname"] or str(info["pid"]) + pid = info["pid"] + exec_num = info["exec_num"] + if exec_num > 0: + start_time = info["reexec_time"] + else: + start_time = info["start_time"] + # Convert start_time from time64 (nanoseconds) to seconds + start_time_sec = start_time / 1e9 if start_time is not None else "?" + argv = " ".join(info["argv"]) if info.get("argv") else "" + exec_nums = [k[2] for k in self.procs if k[0] == pid and k[1] == start_time] + if len(exec_nums) == 1: + pid_str = f"{pid}" + else: + pid_str = f"{pid}/{exec_num}" + line = prefix + if prefix: + line += "└─" if is_last else "├─" + # Show timeline in seconds + line += f"{name} ({pid_str}) [t={start_time_sec}] [{argv}]" + children = self.children.get(proc_id, []) + children = sorted(children, key=lambda cid: ( + self.procs[cid]["start_time"], + self.procs[cid]["pid"], + self.procs[cid]["exec_num"] + )) + name_map = self._group_children(children) + result = [line] + child_items = list(name_map.items()) + for idx, (cname, cids) in enumerate(child_items): + is_last_group = idx == len(child_items) - 1 + for j, cid in enumerate(cids): + sub_prefix = prefix + (" " if is_last else "│ ") + result += self._pstree_text(cid, sub_prefix, is_last_group and j == len(cids) - 1) + return result + + def _pstree_json(self, proc_id): + info = self.procs[proc_id] + name = info["procname"] or str(info["pid"]) + pid = info["pid"] + exec_num = info["exec_num"] + argv = info["argv"] if info.get("argv") else [] + start_time = info["start_time"] + ppid = info["ppid"] + parent_start_time = info["parent_start_time"] + parent_exec_num = info["parent_exec_num"] + node = { + "name": name, + "pid": pid, + "exec_num": exec_num, + "argv": argv, + "start_time": start_time, + "ppid": ppid, + "parent_start_time": parent_start_time, + "parent_exec_num": parent_exec_num, + "reexec_time": 
info["reexec_time"], + "children": [] + } + children = self.children.get(proc_id, []) + # Ensure children are sorted by (start_time, pid, exec_num) + children = sorted( + children, + key=lambda cid: ( + self.procs[cid]["start_time"], + self.procs[cid]["pid"], + self.procs[cid]["exec_num"] + ) + ) + for cid in children: + node["children"].append(self._pstree_json(cid)) + return node + + def _pstree_csv(self, proc_id, parent_chain=None, rows=None): + if rows is None: + rows = [] + if parent_chain is None: + parent_chain = [] + info = self.procs[proc_id] + name = info["procname"] or str(info["pid"]) + pid = info["pid"] + exec_num = info["exec_num"] + argv = " ".join(info["argv"]) if info.get("argv") else "" + exec_nums = [k[2] for k in self.procs if k[0] == pid and k[1] == info["start_time"]] + if len(exec_nums) == 1: + pid_str = f"{pid}" + else: + pid_str = f"{pid}/{exec_num}" + row = parent_chain + [name, pid_str, argv] + rows.append(row) + children = self.children.get(proc_id, []) + children = sorted(children, key=lambda cid: ( + self.procs[cid]["start_time"], + self.procs[cid]["pid"], + self.procs[cid]["exec_num"] + )) + for cid in children: + self._pstree_csv(cid, row, rows) + return rows + + def _write_outputs(self): + """Write all requested output types.""" + self.logger.info("Writing process tree outputs...") + if not os.path.exists(self.outdir): + os.makedirs(self.outdir) + roots = self._find_roots() + if "text" in self.output_types: + lines = [] + for i, root in enumerate(roots): + lines += self._pstree_text(root, "", i == len(roots) - 1) + with open(f"{self.outdir}/proctree.txt", "w") as f: + for line in lines: + f.write(line + "\n") + if "csv" in self.output_types: + rows = [] + for root in roots: + rows += self._pstree_csv(root) + maxlen = max(len(r) for r in rows) if rows else 0 + with open(f"{self.outdir}/proctree.csv", "w") as f: + for row in rows: + f.write(",".join(row + [""] * (maxlen - len(row))) + "\n") + if "json" in self.output_types: + forest = 
[self._pstree_json(root) for root in roots] + with open(f"{self.outdir}/proctree.json", "w") as f: + json.dump(forest, f, indent=2) + + def dump_tree(self, fmt=None): + # If fmt is specified, only write that format; else write all requested types + if fmt: + self.output_types = [fmt] + self._write_outputs() + + def uninit(self): + self._write_outputs() diff --git a/pyplugins/apis/osi.py b/pyplugins/apis/osi.py index 7c2c9a056..ae2d75ffa 100644 --- a/pyplugins/apis/osi.py +++ b/pyplugins/apis/osi.py @@ -567,3 +567,18 @@ def get_mapping_by_addr(self, addr: int) -> Generator[Any, None, Optional[Mappin return mapping else: self.logger.debug(f"No mapping found for addr={addr:#x}") + + def read_time(self) -> Generator[Any, None, int]: + """ + Read the current time from the guest OS. + + This will return nanoseconds since boot (u64). + + Returns + ------- + int + Current guest time in nanoseconds since boot. + """ + self.logger.debug("read_time called") + t = yield PortalCmd(hop.HYPER_OP_READ_TIME, 0, 0) + return t \ No newline at end of file diff --git a/src/penguin/common.py b/src/penguin/common.py index bbf17d133..ca3f898a3 100644 --- a/src/penguin/common.py +++ b/src/penguin/common.py @@ -1,10 +1,11 @@ import hashlib import logging import re +from pathlib import Path import coloredlogs import yaml from os.path import join, isfile -from yamlcore import CoreDumper +from yamlcore import CoreLoader, CoreDumper # Hex integers @@ -51,87 +52,44 @@ def hash_yaml(section_to_hash): return hash_digest -def patch_config(logger, base_config, patch): +def patch_config(base_config, patch): + # Merge configs. 
+ def _recursive_update(base, new): + for k, v in new.items(): + if isinstance(v, dict): + base[k] = _recursive_update(base.get(k, {}), v) + elif isinstance(v, list): + # Append + base[k] = base.get(k, []) + v + else: + base[k] = v + return base + + if issubclass(type(patch), Path): + with open(patch, "r") as f: + patch = yaml.load(f, Loader=CoreLoader) if not patch: # Empty patch, possibly an empty file or one with all comments return base_config - - # Merge configs. - def _recursive_update(base, new, config_option): - if base is None: - return new - if new is None: - return base - - # assert type(base) is type(new) - - if hasattr(base, "merge"): - return base.merge(new) - - if hasattr(base, "model_fields_set"): - result = dict() - for base_key in base.model_fields_set: - result[base_key] = getattr(base, base_key) - if base.model_extra is not None: - for base_key, base_value in base.model_extra.items(): - result[base_key] = base_value - for new_key in new.model_fields_set: - new_value = getattr(new, new_key) - if new_key in result: - result[new_key] = _recursive_update( - result[new_key], - new_value, - f"{config_option}.{new_key}" if config_option else new_key, - ) - else: - result[new_key] = new_value - if new.model_extra is not None: - for new_key, new_value in new.model_extra.items(): - if new_key in result: - result[new_key] = _recursive_update( - result[new_key], - new_value, - f"{config_option}.{new_key}" if config_option else new_key, - ) - else: - result[new_key] = new_value - return type(base)(**result) - - if isinstance(base, list): - return base + new - - if isinstance(base, dict): - result = dict() - for key, base_value in base.items(): - if key in new: - new_value = new[key] - result[key] = _recursive_update( - base_value, - new_value, - f"{config_option}.{key}" if config_option else key, - ) - else: - result[key] = base_value - for new_key, new_value in new.items(): - if new_key not in base: - result[new_key] = new_value - return result - - if base 
== new: - return base - - base_str = yaml.dump(base).strip().removesuffix("...").strip() - new_str = yaml.dump(new).strip().removesuffix("...").strip() - change_str = ( - f"\n```\n{base_str}\n```↓\n```\n{new_str}\n```" - if "\n" in base_str + new_str - else f"`{base_str}` → `{new_str}`" - ) - logger.warning(f"patch conflict: {config_option}: {change_str}") - - return new - - return _recursive_update(base_config, patch, None) + for key, value in patch.items(): + # Check if the key already exists in the base_config + if key in base_config: + # If the value is a dictionary, update subfields + if isinstance(value, dict): + # Recursive update to handle nested dictionaries + base_config[key] = _recursive_update(base_config.get(key, {}), value) + elif isinstance(value, list): + # Merge lists + seen = set() + combined = base_config[key] + value + base_config[key] = [x for x in combined if not (x in seen or seen.add(x))] + else: + # Replace the base value with the incoming value + base_config[key] = value + else: + # New key, add all data directly + base_config[key] = value + return base_config class PathHighlightingFormatter(coloredlogs.ColoredFormatter): diff --git a/src/penguin/genetic.py b/src/penguin/genetic.py index d84cd2378..b077380d5 100755 --- a/src/penguin/genetic.py +++ b/src/penguin/genetic.py @@ -509,7 +509,7 @@ def get_patched_config(self, config: ConfigChromosome): for p in patched_config.get("patches", []): # kinda funny how the names wound up... 
p_yaml = load_unpatched_config(os.path.join(self.proj_dir, p)) - patched_config = patch_config(self.logger, patched_config, p_yaml) + patched_config = patch_config(patched_config, p_yaml) return patched_config def run_config(self, config: ConfigChromosome, run_index: int) -> Tuple[List[Failure], float]: diff --git a/src/penguin/penguin_config/__init__.py b/src/penguin/penguin_config/__init__.py index 25de17ab4..a3ac9fe79 100644 --- a/src/penguin/penguin_config/__init__.py +++ b/src/penguin/penguin_config/__init__.py @@ -192,31 +192,26 @@ def load_config(proj_dir, path, validate=True, resolved_kernel=None): """Load penguin config from path""" with open(path, "r") as f: config = yaml.load(f, Loader=CoreLoader) - config = structure.Patch(**config) # look for files called patch_*.yaml in the same directory as the config file - if config.core.auto_patching: + if config["core"].get("auto_patching", False) is True: patch_files = list(Path(proj_dir).glob("patch_*.yaml")) patches_dir = Path(proj_dir, "patches") if patches_dir.exists(): patch_files += list(patches_dir.glob("*.yaml")) if patch_files: - if config.patches.root is None: - config.patches.root = [] + if config.get("patches", None) is None: + config["patches"] = [] for patch_file in patch_files: - config.patches.root.append(str(patch_file)) - if config.patches.root is not None: - patch_list = config.patches.root + config["patches"].append(str(patch_file)) + if config.get("patches", None) is not None: + patch_list = config["patches"] for patch in patch_list: # patches are loaded relative to the main config file patch_relocated = Path(proj_dir, patch) if patch_relocated.exists(): # TODO: If we're missing a patch we should warn, but this happens 3-4x # and that's too verbose. 
- with open(patch_relocated, "r") as f: - patch = yaml.load(f, Loader=CoreLoader) - patch = structure.Patch(**patch) - config = patch_config(logger, config, patch) - config = config.model_dump() + config = patch_config(config, patch_relocated) if config["core"].get("guest_cmd", False) is True: config["static_files"]["/igloo/utils/guesthopper"] = dict( type="host_file", diff --git a/src/penguin/penguin_config/gen_docs.py b/src/penguin/penguin_config/gen_docs.py index 7dff7ed01..3ea42887a 100644 --- a/src/penguin/penguin_config/gen_docs.py +++ b/src/penguin/penguin_config/gen_docs.py @@ -77,8 +77,6 @@ def gen_docs_field(path, docs_field, include_type=True): out += "|-|-|\n" if include_type: out += f"|__Type__|{gen_docs_type_name(docs_field.type_)}|\n" - if docs_field.merge_behavior is not None: - out += f"|__Patch merge behavior__|{docs_field.merge_behavior}|\n" if include_docs: out += f"|__Default__|`{gen_docs_yaml_dump(docs_field.default)}`|\n" out += "\n" @@ -98,7 +96,6 @@ class DocsField: """Information about a field of the config, for generating docs""" type_: type - merge_behavior: Optional[str] title: Optional[str] description: Optional[str] default: Union[PydanticUndefinedType, Any] @@ -117,11 +114,7 @@ def from_type(type_: type) -> "DocsField": if hasattr(type_, "model_config"): # Inherits BaseModel or RootModel - try: - merge_behavior = type_.merge_behavior() - except AttributeError: - merge_behavior = None - title = type_.model_config.get("title") + title = type_.model_config["title"] description = type_.__doc__ try: default = type_.model_config["default"] @@ -133,17 +126,16 @@ def from_type(type_: type) -> "DocsField": examples = [] else: # Doesn't inherit BaseModel or RootModel, so make all values empty - merge_behavior = title = description = None + title = description = None default = PydanticUndefined examples = [] - return DocsField(type_, merge_behavior, title, description, default, examples) + return DocsField(type_, title, description, default, 
examples) def from_field(field) -> "DocsField": """Create a `DocsField` from a Pydantic `Field`""" return DocsField( field.annotation, - None, field.title, field.description, field.default, @@ -156,15 +148,10 @@ def merge(self, other: "DocsField") -> "DocsField": """ return DocsField( self.type_, - self.merge_behavior or other.merge_behavior, self.title or other.title, self.description or other.description, other.default if self.default is PydanticUndefined else self.default, - ( - self.examples - if self.examples == other.examples - else self.examples + other.examples - ), + self.examples + other.examples, ) @@ -246,7 +233,7 @@ def gen_docs(path=[], docs_field=DocsField.from_type(structure.Main)): # The type is `Optional[T]`. Try again with just `T`. out += gen_docs( path=path, - docs_field=DocsField.from_type(first_model_arg).merge(docs_field), + docs_field=DocsField.from_type(first_model_arg), ) else: # The type does not inherit from `BaseModel` and it doesn't have an argument that does. 
diff --git a/src/penguin/penguin_config/structure.py b/src/penguin/penguin_config/structure.py index e73c20785..c12ba4f2b 100644 --- a/src/penguin/penguin_config/structure.py +++ b/src/penguin/penguin_config/structure.py @@ -1,7 +1,6 @@ -from typing import Annotated, Dict, List, Literal, Optional, Union, ClassVar +from typing import Annotated, Dict, List, Literal, Optional, Union from pydantic import BaseModel, Field, RootModel from pydantic.config import ConfigDict -from pydantic_partial import PartialModelMixin, create_partial_model ''' We cannot import anything from penguin here as its used to generate the schema @@ -12,26 +11,6 @@ ENV_MAGIC_VAL = "DYNVALDYNVALDYNVAL" -class StrSep(RootModel): - root: str - separator: ClassVar = None - - @classmethod - def merge_behavior(cls): - return f"Concatenate strings separated by `{repr(cls.separator)}`" - - def merge(self, other): - return self.root + self.separator + other.root - - -class StrLines(StrSep): - separator = "\n" - - -class StrSepSpace(StrSep): - separator = " " - - def _newtype(class_name, type_, title, description=None, default=None, examples=None): return type( class_name, @@ -51,7 +30,7 @@ def _newtype(class_name, type_, title, description=None, default=None, examples= def _variant(discrim_val, title, description, discrim_key, discrim_title, fields): return type( discrim_val, - (PartialModelMixin, BaseModel), + (BaseModel,), dict( model_config=ConfigDict(title=title, extra="forbid"), __doc__=description, @@ -94,7 +73,7 @@ def _union(class_name, title, description, discrim_key, discrim_title, variants) ) -class Core(PartialModelMixin, BaseModel): +class Core(BaseModel): """Core configuration options for this rehosting""" model_config = ConfigDict(title="Core configuration options", extra="forbid") @@ -248,7 +227,7 @@ class Core(PartialModelMixin, BaseModel): ), ] extra_qemu_args: Annotated[ - Optional[StrSepSpace], + Optional[str], Field( None, title="Extra QEMU arguments", @@ -321,14 +300,14 @@ class 
Core(PartialModelMixin, BaseModel): ], ) NetDevs = Field( - default=[], + default=None, title="Network devices", description="Names for guest network interfaces", examples=[["eth0", "eth1"], ["ens33", "wlp3s0"]], ) BlockedSignalsField = Field( - default=[], + default=None, title="List of blocked signals", description="Signals numbers to block within the guest. Supported values are 6 (SIGABRT), 9 (SIGKILL), 15 (SIGTERM), and 17 (SIGCHLD).", example=[[9], [9, 15]], @@ -530,7 +509,7 @@ class Core(PartialModelMixin, BaseModel): ) -class Pseudofile(PartialModelMixin, BaseModel): +class Pseudofile(BaseModel): """How to emulate a device file""" model_config = ConfigDict(title="File emulation spec", extra="forbid") @@ -621,7 +600,7 @@ class Pseudofile(PartialModelMixin, BaseModel): ) -class LibInject(PartialModelMixin, BaseModel): +class LibInject(BaseModel): """Library functions to be intercepted""" model_config = ConfigDict(title="Injected library configuration", extra="forbid") @@ -636,7 +615,7 @@ class LibInject(PartialModelMixin, BaseModel): ] extra: Annotated[ - Optional[StrLines], + Optional[str], Field( None, title="Extra injected library code", @@ -817,11 +796,11 @@ class StaticFiles(RootModel): ) -class Plugin(PartialModelMixin, BaseModel): +class Plugin(BaseModel): model_config = ConfigDict(title="Plugin", extra="allow") description: Annotated[Optional[str], Field(None, title="Plugin description")] - depends_on: Annotated[Optional[str], Field(None, title="Plugin dependency")] + depends_on: Annotated[str, Field(None, title="Plugin dependency")] enabled: Annotated[ bool, Field( @@ -833,7 +812,7 @@ class Plugin(PartialModelMixin, BaseModel): version: Annotated[Optional[str], Field(None, title="Plugin version")] -class ExternalNetwork(PartialModelMixin, BaseModel): +class ExternalNetwork(BaseModel): """Configuration for NAT for external connections""" model_config = ConfigDict(title="Set up NAT for outgoing connections", extra="forbid") @@ -857,7 +836,7 @@ class 
ExternalNetwork(PartialModelMixin, BaseModel): ) -class Network(PartialModelMixin, BaseModel): +class Network(BaseModel): """Configuration for networks to attach to guest""" model_config = ConfigDict(title="Network Configuration", extra="forbid") @@ -865,7 +844,7 @@ class Network(PartialModelMixin, BaseModel): external: ExternalNetwork = Field(default_factory=ExternalNetwork) -class Main(PartialModelMixin, BaseModel): +class Main(BaseModel): """Configuration file for config-file-based rehosting with IGLOO""" model_config = ConfigDict(title="Penguin Configuration", extra="forbid") @@ -882,6 +861,3 @@ class Main(PartialModelMixin, BaseModel): static_files: StaticFiles plugins: Annotated[dict[str, Plugin], Field(title="Plugins")] network: Optional[Network] = None - - -Patch = create_partial_model(Main, recursive=True) diff --git a/tests/unit_tests/test_target/base_config.yaml b/tests/unit_tests/test_target/base_config.yaml index 40d2e68c9..e5f905ec7 100644 --- a/tests/unit_tests/test_target/base_config.yaml +++ b/tests/unit_tests/test_target/base_config.yaml @@ -67,6 +67,7 @@ plugins: indiv_debug: {} syscalls_logger: {} nvram2: {} + proctree: {} static_files: /run_tests.sh: