From 2872e8b71b20463e8e5370bdfb449f7671ffe97d Mon Sep 17 00:00:00 2001 From: Junghwan <70629228+shaun0927@users.noreply.github.com> Date: Wed, 20 May 2026 12:01:58 +0900 Subject: [PATCH 1/4] fix(plugin): block fail_closed terminal-observability hooks at dispatch Refs #1142, #939. Closes the #1137 code review HIGH finding on _run_lifecycle_hooks defense-in-depth gap. --- src/ouroboros/plugin/firewall.py | 44 +++++ .../plugin/test_lifecycle_observability.py | 177 ++++++++++++++++++ 2 files changed, 221 insertions(+) diff --git a/src/ouroboros/plugin/firewall.py b/src/ouroboros/plugin/firewall.py index bc7c31035..f35d5c43d 100644 --- a/src/ouroboros/plugin/firewall.py +++ b/src/ouroboros/plugin/firewall.py @@ -51,6 +51,7 @@ HOOK_COMPLETED_EVENT, HOOK_FAILED_EVENT, HOOK_INVOKED_EVENT, + TERMINAL_OBSERVABILITY_HOOK_KINDS, HookFailurePolicy, HookKind, ) @@ -565,6 +566,49 @@ def _to_bytes(value: object) -> bytes: def _run_lifecycle_hooks(hook_kind: HookKind) -> tuple[bool, str]: for hook in _matching_hooks(manifest, hook_kind): + # Defense-in-depth: terminal observability hooks (``on_error`` / + # ``on_cancel``) MUST be fail-open at the contract level. The + # manifest JSON schema and loader already reject ``fail_closed`` + # for these kinds, but a HookSpec constructed programmatically + # (bypassing the loader) could still smuggle ``fail_closed`` + # into runtime. If we ever see that here, refuse to dispatch + # the subprocess, emit a bounded audit record, and continue — + # terminal observability stays fail-open at the contract level + # so the original ``plugin.failed`` cause reaches the caller + # unchanged. + if ( + hook_kind in TERMINAL_OBSERVABILITY_HOOK_KINDS + and hook.failure_policy == HookFailurePolicy.FAIL_CLOSED.value + ): + _emit( + _event_envelope( + event_type=HOOK_BLOCKED_EVENT, + manifest=manifest, + namespace=namespace, + command_name=command_name, + argv=argv, + trust_state=trust_state, + permissions_used=hook.permissions, + result={ + "status": "blocked", + "message": ( + "terminal observability hook " + f"{hook.name!r} declared fail_closed; " + "terminal hooks must be fail_open." + ), + }, + provenance={ + "correlation_id": correlation_id, + "hook_name": hook.name, + "hook_kind": hook_kind.value, + "failure_policy": hook.failure_policy, + "reason": "terminal_observability_must_be_fail_open", + }, + schema_version=HOOK_AUDIT_SCHEMA_VERSION, + ) + ) + continue + hook_provenance = { "correlation_id": correlation_id, "hook_name": hook.name, diff --git a/tests/unit/plugin/test_lifecycle_observability.py b/tests/unit/plugin/test_lifecycle_observability.py index d798693e6..de2825760 100644 --- a/tests/unit/plugin/test_lifecycle_observability.py +++ b/tests/unit/plugin/test_lifecycle_observability.py @@ -22,6 +22,7 @@ from __future__ import annotations +import dataclasses import json from pathlib import Path import subprocess @@ -40,6 +41,7 @@ HookKind, ) from ouroboros.plugin.manifest import ( + HookSpec, PluginManifestError, load_manifest, ) @@ -628,3 +630,178 @@ def test_blocked_hook_event_constant_export(self) -> None: assert HOOK_COMPLETED_EVENT == "plugin.hook.completed" assert HOOK_BLOCKED_EVENT == "plugin.hook.blocked" assert HOOK_FAILED_EVENT == "plugin.hook.failed" + + +# --------------------------------------------------------------------------- +# Defense-in-depth: terminal observability fail_closed guard +# --------------------------------------------------------------------------- + + +def _forge_fail_closed_terminal_hook( + tmp_path: Path, + hook_name: str, +): + """Register a legit fail_open hook then forge fail_closed at runtime. + + The manifest JSON schema and loader already reject ``fail_closed`` for + terminal observability hooks. To exercise the runtime defense-in-depth + guard inside ``_run_lifecycle_hooks``, we register the legitimate + fail-open manifest and then replace the in-memory ``HookSpec`` with a + forged ``fail_closed`` variant, bypassing the validator path entirely. + """ + + program = _register(tmp_path, _payload_with_hooks(hook_name)) + original_hooks = program.manifest.hooks + assert len(original_hooks) == 1 + forged_hook = dataclasses.replace(original_hooks[0], failure_policy="fail_closed") + forged_manifest = dataclasses.replace(program.manifest, hooks=(forged_hook,)) + return dataclasses.replace(program, manifest=forged_manifest) + + +class TestTerminalFailClosedRuntimeGuard: + """``_run_lifecycle_hooks`` must refuse to dispatch terminal hooks + declared ``fail_closed`` even if a HookSpec smuggles that policy past + the manifest layer — terminal observability is contract-level + fail-open. + """ + + def test_on_error_fail_closed_is_blocked_and_subprocess_skipped( + self, tmp_path: Path + ) -> None: + program = _forge_fail_closed_terminal_hook(tmp_path, "on_error") + trust = _grant_full_trust(tmp_path) + + hook_calls: list[list[str]] = [] + + def runner(argv, *args, **kwargs): # noqa: ARG001 + if argv[:2] == ["python", "-m"] and argv[2].startswith("hook_"): + hook_calls.append(list(argv)) + return subprocess.CompletedProcess( + args=argv, returncode=0, stdout=b"", stderr=b"" + ) + return subprocess.CompletedProcess( + args=argv, returncode=42, stdout=b"", stderr=b"" + ) + + events: list[dict] = [] + result = invoke_plugin( + program, + command_name="review", + argv=["https://example.com/pr/1"], + trust_record=trust, + event_sink=events.append, + correlation_id="corr-on-error-forged-fail-closed", + subprocess_runner=runner, + ) + + # Original plugin.failed cause is preserved unchanged. + assert result.status == "failed" + assert result.exit_code == 42 + assert "exited with code 42" in result.message + + terminal_failed = [ + event for event in events if event["event_type"] == "plugin.failed" + ] + assert len(terminal_failed) == 1 + assert terminal_failed[0]["result"]["message"] == result.message + + # The forged hook subprocess is NEVER invoked. + assert hook_calls == [] + + # A plugin.hook.blocked audit event is emitted for the forged hook. + blocked = [ + event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT + ] + assert blocked, "fail_closed terminal hook must emit plugin.hook.blocked" + assert blocked[0]["provenance"]["hook_name"] == "on_error" + assert blocked[0]["provenance"]["hook_kind"] == "on_error" + assert blocked[0]["provenance"]["failure_policy"] == "fail_closed" + assert ( + blocked[0]["provenance"]["reason"] + == "terminal_observability_must_be_fail_open" + ) + assert blocked[0]["result"]["status"] == "blocked" + + # The block must not produce plugin.hook.invoked / .completed for + # the forged hook — the subprocess gate was never crossed. + names = [event["event_type"] for event in events] + assert HOOK_INVOKED_EVENT not in names + assert HOOK_COMPLETED_EVENT not in names + # And the original failure is not masked: no extra plugin.failed. + assert names.count("plugin.failed") == 1 + + def test_on_cancel_fail_closed_is_blocked_and_subprocess_skipped( + self, tmp_path: Path + ) -> None: + program = _forge_fail_closed_terminal_hook(tmp_path, "on_cancel") + trust = _grant_full_trust(tmp_path) + + hook_calls: list[list[str]] = [] + command_calls: list[list[str]] = [] + + def runner(argv, *args, **kwargs): # noqa: ARG001 + if argv[:2] == ["python", "-m"] and argv[2].startswith("hook_"): + hook_calls.append(list(argv)) + return subprocess.CompletedProcess( + args=argv, returncode=0, stdout=b"", stderr=b"" + ) + command_calls.append(list(argv)) + return subprocess.CompletedProcess( + args=argv, returncode=0, stdout=b"", stderr=b"" + ) + + events: list[dict] = [] + result = invoke_plugin( + program, + command_name="review", + argv=["https://example.com/pr/1"], + trust_record=trust, + event_sink=events.append, + correlation_id="corr-on-cancel-forged-fail-closed", + subprocess_runner=runner, + cancellation_requested=True, + ) + + # Original cancel cause preserved end-to-end. + assert result.status == "failed" + assert "cancelled" in result.message + terminal_failed = [ + event for event in events if event["event_type"] == "plugin.failed" + ] + assert len(terminal_failed) == 1 + assert terminal_failed[0]["provenance"]["reason"] == "cancelled" + assert terminal_failed[0]["result"]["message"] == result.message + + # Forged on_cancel hook subprocess is NEVER invoked. + assert hook_calls == [] + # Cancellation also short-circuits the launched command. + assert command_calls == [] + + # plugin.hook.blocked is emitted with the forged-hook provenance. + blocked = [ + event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT + ] + assert blocked, "fail_closed terminal hook must emit plugin.hook.blocked" + assert blocked[0]["provenance"]["hook_name"] == "on_cancel" + assert blocked[0]["provenance"]["hook_kind"] == "on_cancel" + assert blocked[0]["provenance"]["failure_policy"] == "fail_closed" + assert ( + blocked[0]["provenance"]["reason"] + == "terminal_observability_must_be_fail_open" + ) + assert blocked[0]["result"]["status"] == "blocked" + + names = [event["event_type"] for event in events] + assert HOOK_INVOKED_EVENT not in names + assert HOOK_COMPLETED_EVENT not in names + + def test_forged_hookspec_keeps_hookspec_type(self, tmp_path: Path) -> None: + # Sanity: the forge path produces an actual HookSpec — this guard + # is testing runtime behaviour for valid HookSpec instances that + # bypassed the schema/loader, not for arbitrary duck-types. + program = _forge_fail_closed_terminal_hook(tmp_path, "on_error") + assert len(program.manifest.hooks) == 1 + forged = program.manifest.hooks[0] + assert isinstance(forged, HookSpec) + assert forged.failure_policy == "fail_closed" + assert forged.name == "on_error" From 3737c176e241b3a4f417a3e927d371f0ba0e8b1d Mon Sep 17 00:00:00 2001 From: Junghwan <70629228+shaun0927@users.noreply.github.com> Date: Wed, 20 May 2026 12:13:27 +0900 Subject: [PATCH 2/4] chore(ruff): fix lint violations Refs #1142. --- .../plugin/test_lifecycle_observability.py | 50 +++++-------------- 1 file changed, 12 insertions(+), 38 deletions(-) diff --git a/tests/unit/plugin/test_lifecycle_observability.py b/tests/unit/plugin/test_lifecycle_observability.py index de2825760..8777ef89e 100644 --- a/tests/unit/plugin/test_lifecycle_observability.py +++ b/tests/unit/plugin/test_lifecycle_observability.py @@ -665,9 +665,7 @@ class TestTerminalFailClosedRuntimeGuard: fail-open. """ - def test_on_error_fail_closed_is_blocked_and_subprocess_skipped( - self, tmp_path: Path - ) -> None: + def test_on_error_fail_closed_is_blocked_and_subprocess_skipped(self, tmp_path: Path) -> None: program = _forge_fail_closed_terminal_hook(tmp_path, "on_error") trust = _grant_full_trust(tmp_path) @@ -676,12 +674,8 @@ def test_on_error_fail_closed_is_blocked_and_subprocess_skipped( def runner(argv, *args, **kwargs): # noqa: ARG001 if argv[:2] == ["python", "-m"] and argv[2].startswith("hook_"): hook_calls.append(list(argv)) - return subprocess.CompletedProcess( - args=argv, returncode=0, stdout=b"", stderr=b"" - ) - return subprocess.CompletedProcess( - args=argv, returncode=42, stdout=b"", stderr=b"" - ) + return subprocess.CompletedProcess(args=argv, returncode=0, stdout=b"", stderr=b"") + return subprocess.CompletedProcess(args=argv, returncode=42, stdout=b"", stderr=b"") events: list[dict] = [] result = invoke_plugin( @@ -699,9 +693,7 @@ def runner(argv, *args, **kwargs): # noqa: ARG001 assert result.exit_code == 42 assert "exited with code 42" in result.message - terminal_failed = [ - event for event in events if event["event_type"] == "plugin.failed" - ] + terminal_failed = [event for event in events if event["event_type"] == "plugin.failed"] assert len(terminal_failed) == 1 assert terminal_failed[0]["result"]["message"] == result.message @@ -709,17 +701,12 @@ def runner(argv, *args, **kwargs): # noqa: ARG001 assert hook_calls == [] # A plugin.hook.blocked audit event is emitted for the forged hook. - blocked = [ - event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT - ] + blocked = [event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT] assert blocked, "fail_closed terminal hook must emit plugin.hook.blocked" assert blocked[0]["provenance"]["hook_name"] == "on_error" assert blocked[0]["provenance"]["hook_kind"] == "on_error" assert blocked[0]["provenance"]["failure_policy"] == "fail_closed" - assert ( - blocked[0]["provenance"]["reason"] - == "terminal_observability_must_be_fail_open" - ) + assert blocked[0]["provenance"]["reason"] == "terminal_observability_must_be_fail_open" assert blocked[0]["result"]["status"] == "blocked" # The block must not produce plugin.hook.invoked / .completed for @@ -730,9 +717,7 @@ def runner(argv, *args, **kwargs): # noqa: ARG001 # And the original failure is not masked: no extra plugin.failed. assert names.count("plugin.failed") == 1 - def test_on_cancel_fail_closed_is_blocked_and_subprocess_skipped( - self, tmp_path: Path - ) -> None: + def test_on_cancel_fail_closed_is_blocked_and_subprocess_skipped(self, tmp_path: Path) -> None: program = _forge_fail_closed_terminal_hook(tmp_path, "on_cancel") trust = _grant_full_trust(tmp_path) @@ -742,13 +727,9 @@ def test_on_cancel_fail_closed_is_blocked_and_subprocess_skipped( def runner(argv, *args, **kwargs): # noqa: ARG001 if argv[:2] == ["python", "-m"] and argv[2].startswith("hook_"): hook_calls.append(list(argv)) - return subprocess.CompletedProcess( - args=argv, returncode=0, stdout=b"", stderr=b"" - ) + return subprocess.CompletedProcess(args=argv, returncode=0, stdout=b"", stderr=b"") command_calls.append(list(argv)) - return subprocess.CompletedProcess( - args=argv, returncode=0, stdout=b"", stderr=b"" - ) + return subprocess.CompletedProcess(args=argv, returncode=0, stdout=b"", stderr=b"") events: list[dict] = [] result = invoke_plugin( @@ -765,9 +746,7 @@ def runner(argv, *args, **kwargs): # noqa: ARG001 # Original cancel cause preserved end-to-end. assert result.status == "failed" assert "cancelled" in result.message - terminal_failed = [ - event for event in events if event["event_type"] == "plugin.failed" - ] + terminal_failed = [event for event in events if event["event_type"] == "plugin.failed"] assert len(terminal_failed) == 1 assert terminal_failed[0]["provenance"]["reason"] == "cancelled" assert terminal_failed[0]["result"]["message"] == result.message @@ -778,17 +757,12 @@ def runner(argv, *args, **kwargs): # noqa: ARG001 assert command_calls == [] # plugin.hook.blocked is emitted with the forged-hook provenance. - blocked = [ - event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT - ] + blocked = [event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT] assert blocked, "fail_closed terminal hook must emit plugin.hook.blocked" assert blocked[0]["provenance"]["hook_name"] == "on_cancel" assert blocked[0]["provenance"]["hook_kind"] == "on_cancel" assert blocked[0]["provenance"]["failure_policy"] == "fail_closed" - assert ( - blocked[0]["provenance"]["reason"] - == "terminal_observability_must_be_fail_open" - ) + assert blocked[0]["provenance"]["reason"] == "terminal_observability_must_be_fail_open" assert blocked[0]["result"]["status"] == "blocked" names = [event["event_type"] for event in events] From 9b0d45c3e2febbe9fceee1ab6c9dca2f5014fc00 Mon Sep 17 00:00:00 2001 From: shaun0927 Date: Wed, 20 May 2026 03:22:22 +0000 Subject: [PATCH 3/4] chore: retrigger terminal hook review gates From 21040123a863bed142d25e667a3d0e7e4e5363f4 Mon Sep 17 00:00:00 2001 From: shaun0927 Date: Wed, 20 May 2026 03:33:10 +0000 Subject: [PATCH 4/4] chore: retry bot review after sandbox hiccup