Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/ouroboros/plugin/firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
HOOK_COMPLETED_EVENT,
HOOK_FAILED_EVENT,
HOOK_INVOKED_EVENT,
TERMINAL_OBSERVABILITY_HOOK_KINDS,
HookFailurePolicy,
HookKind,
)
Expand Down Expand Up @@ -565,6 +566,49 @@ def _to_bytes(value: object) -> bytes:

def _run_lifecycle_hooks(hook_kind: HookKind) -> tuple[bool, str]:
for hook in _matching_hooks(manifest, hook_kind):
# Defense-in-depth: terminal observability hooks (``on_error`` /
# ``on_cancel``) MUST be fail-open at the contract level. The
# manifest JSON schema and loader already reject ``fail_closed``
# for these kinds, but a HookSpec constructed programmatically
# (bypassing the loader) could still smuggle ``fail_closed``
# into runtime. If we ever see that here, refuse to dispatch
# the subprocess, emit a bounded audit record, and continue —
# terminal observability stays fail-open at the contract level
# so the original ``plugin.failed`` cause reaches the caller
# unchanged.
if (
hook_kind in TERMINAL_OBSERVABILITY_HOOK_KINDS
and hook.failure_policy == HookFailurePolicy.FAIL_CLOSED.value
):
_emit(
_event_envelope(
event_type=HOOK_BLOCKED_EVENT,
manifest=manifest,
namespace=namespace,
command_name=command_name,
argv=argv,
trust_state=trust_state,
permissions_used=hook.permissions,
result={
"status": "blocked",
"message": (
"terminal observability hook "
f"{hook.name!r} declared fail_closed; "
"terminal hooks must be fail_open."
),
},
provenance={
"correlation_id": correlation_id,
"hook_name": hook.name,
"hook_kind": hook_kind.value,
"failure_policy": hook.failure_policy,
"reason": "terminal_observability_must_be_fail_open",
},
schema_version=HOOK_AUDIT_SCHEMA_VERSION,
)
)
continue

hook_provenance = {
"correlation_id": correlation_id,
"hook_name": hook.name,
Expand Down
151 changes: 151 additions & 0 deletions tests/unit/plugin/test_lifecycle_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from __future__ import annotations

import dataclasses
import json
from pathlib import Path
import subprocess
Expand All @@ -40,6 +41,7 @@
HookKind,
)
from ouroboros.plugin.manifest import (
HookSpec,
PluginManifestError,
load_manifest,
)
Expand Down Expand Up @@ -628,3 +630,152 @@ def test_blocked_hook_event_constant_export(self) -> None:
assert HOOK_COMPLETED_EVENT == "plugin.hook.completed"
assert HOOK_BLOCKED_EVENT == "plugin.hook.blocked"
assert HOOK_FAILED_EVENT == "plugin.hook.failed"


# ---------------------------------------------------------------------------
# Defense-in-depth: terminal observability fail_closed guard
# ---------------------------------------------------------------------------


def _forge_fail_closed_terminal_hook(
tmp_path: Path,
hook_name: str,
):
"""Register a legit fail_open hook then forge fail_closed at runtime.

The manifest JSON schema and loader already reject ``fail_closed`` for
terminal observability hooks. To exercise the runtime defense-in-depth
guard inside ``_run_lifecycle_hooks``, we register the legitimate
fail-open manifest and then replace the in-memory ``HookSpec`` with a
forged ``fail_closed`` variant, bypassing the validator path entirely.
"""

program = _register(tmp_path, _payload_with_hooks(hook_name))
original_hooks = program.manifest.hooks
assert len(original_hooks) == 1
forged_hook = dataclasses.replace(original_hooks[0], failure_policy="fail_closed")
forged_manifest = dataclasses.replace(program.manifest, hooks=(forged_hook,))
return dataclasses.replace(program, manifest=forged_manifest)


class TestTerminalFailClosedRuntimeGuard:
"""``_run_lifecycle_hooks`` must refuse to dispatch terminal hooks
declared ``fail_closed`` even if a HookSpec smuggles that policy past
the manifest layer — terminal observability is contract-level
fail-open.
"""

def test_on_error_fail_closed_is_blocked_and_subprocess_skipped(self, tmp_path: Path) -> None:
program = _forge_fail_closed_terminal_hook(tmp_path, "on_error")
trust = _grant_full_trust(tmp_path)

hook_calls: list[list[str]] = []

def runner(argv, *args, **kwargs): # noqa: ARG001
if argv[:2] == ["python", "-m"] and argv[2].startswith("hook_"):
hook_calls.append(list(argv))
return subprocess.CompletedProcess(args=argv, returncode=0, stdout=b"", stderr=b"")
return subprocess.CompletedProcess(args=argv, returncode=42, stdout=b"", stderr=b"")

events: list[dict] = []
result = invoke_plugin(
program,
command_name="review",
argv=["https://example.com/pr/1"],
trust_record=trust,
event_sink=events.append,
correlation_id="corr-on-error-forged-fail-closed",
subprocess_runner=runner,
)

# Original plugin.failed cause is preserved unchanged.
assert result.status == "failed"
assert result.exit_code == 42
assert "exited with code 42" in result.message

terminal_failed = [event for event in events if event["event_type"] == "plugin.failed"]
assert len(terminal_failed) == 1
assert terminal_failed[0]["result"]["message"] == result.message

# The forged hook subprocess is NEVER invoked.
assert hook_calls == []

# A plugin.hook.blocked audit event is emitted for the forged hook.
blocked = [event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT]
assert blocked, "fail_closed terminal hook must emit plugin.hook.blocked"
assert blocked[0]["provenance"]["hook_name"] == "on_error"
assert blocked[0]["provenance"]["hook_kind"] == "on_error"
assert blocked[0]["provenance"]["failure_policy"] == "fail_closed"
assert blocked[0]["provenance"]["reason"] == "terminal_observability_must_be_fail_open"
assert blocked[0]["result"]["status"] == "blocked"

# The block must not produce plugin.hook.invoked / .completed for
# the forged hook — the subprocess gate was never crossed.
names = [event["event_type"] for event in events]
assert HOOK_INVOKED_EVENT not in names
assert HOOK_COMPLETED_EVENT not in names
# And the original failure is not masked: no extra plugin.failed.
assert names.count("plugin.failed") == 1

def test_on_cancel_fail_closed_is_blocked_and_subprocess_skipped(self, tmp_path: Path) -> None:
program = _forge_fail_closed_terminal_hook(tmp_path, "on_cancel")
trust = _grant_full_trust(tmp_path)

hook_calls: list[list[str]] = []
command_calls: list[list[str]] = []

def runner(argv, *args, **kwargs): # noqa: ARG001
if argv[:2] == ["python", "-m"] and argv[2].startswith("hook_"):
hook_calls.append(list(argv))
return subprocess.CompletedProcess(args=argv, returncode=0, stdout=b"", stderr=b"")
command_calls.append(list(argv))
return subprocess.CompletedProcess(args=argv, returncode=0, stdout=b"", stderr=b"")

events: list[dict] = []
result = invoke_plugin(
program,
command_name="review",
argv=["https://example.com/pr/1"],
trust_record=trust,
event_sink=events.append,
correlation_id="corr-on-cancel-forged-fail-closed",
subprocess_runner=runner,
cancellation_requested=True,
)

# Original cancel cause preserved end-to-end.
assert result.status == "failed"
assert "cancelled" in result.message
terminal_failed = [event for event in events if event["event_type"] == "plugin.failed"]
assert len(terminal_failed) == 1
assert terminal_failed[0]["provenance"]["reason"] == "cancelled"
assert terminal_failed[0]["result"]["message"] == result.message

# Forged on_cancel hook subprocess is NEVER invoked.
assert hook_calls == []
# Cancellation also short-circuits the launched command.
assert command_calls == []

# plugin.hook.blocked is emitted with the forged-hook provenance.
blocked = [event for event in events if event["event_type"] == HOOK_BLOCKED_EVENT]
assert blocked, "fail_closed terminal hook must emit plugin.hook.blocked"
assert blocked[0]["provenance"]["hook_name"] == "on_cancel"
assert blocked[0]["provenance"]["hook_kind"] == "on_cancel"
assert blocked[0]["provenance"]["failure_policy"] == "fail_closed"
assert blocked[0]["provenance"]["reason"] == "terminal_observability_must_be_fail_open"
assert blocked[0]["result"]["status"] == "blocked"

names = [event["event_type"] for event in events]
assert HOOK_INVOKED_EVENT not in names
assert HOOK_COMPLETED_EVENT not in names

def test_forged_hookspec_keeps_hookspec_type(self, tmp_path: Path) -> None:
# Sanity: the forge path produces an actual HookSpec — this guard
# is testing runtime behaviour for valid HookSpec instances that
# bypassed the schema/loader, not for arbitrary duck-types.
program = _forge_fail_closed_terminal_hook(tmp_path, "on_error")
assert len(program.manifest.hooks) == 1
forged = program.manifest.hooks[0]
assert isinstance(forged, HookSpec)
assert forged.failure_policy == "fail_closed"
assert forged.name == "on_error"
Loading