diff --git a/.well-known/agents-shipgate.json b/.well-known/agents-shipgate.json index d4bbc66..49e2957 100644 --- a/.well-known/agents-shipgate.json +++ b/.well-known/agents-shipgate.json @@ -46,6 +46,7 @@ "2": "config_error", "3": "input_parse_error", "4": "other_error", + "6": "baseline_integrity_failure", "20": "strict_gate_failure" }, "agent_mode_env_var": "AGENTS_SHIPGATE_AGENT_MODE", diff --git a/STABILITY.md b/STABILITY.md index ec58276..eb5dcde 100644 --- a/STABILITY.md +++ b/STABILITY.md @@ -25,6 +25,7 @@ These commands and flags are stable across all `0.x.y` releases. They will only | `agents-shipgate bootstrap` | `--workspace`, `--confidence`, `--no-ci`, `--no-apply`, `--json` | | `agents-shipgate list-checks` | `--json`, `--no-plugins` | | `agents-shipgate baseline save` | `-c`, `--config`, `--out` | +| `agents-shipgate baseline verify` (v0.11+) | `--baseline`, `--audit-log`, `--strict`, `--json`, `--verbose` | | `agents-shipgate fixture list` | `--json` | | `agents-shipgate fixture run` | ``, `--ci-mode`, `--out` | | `agents-shipgate fixture copy` | ``, `--to` | @@ -39,6 +40,7 @@ These commands and flags are stable across all `0.x.y` releases. They will only | `2` | Manifest config error (missing/typo/invalid) | | `3` | Input parse error (malformed YAML/JSON, file too large, path traversal blocked) | | `4` | Other Agents Shipgate error | +| `6` | Baseline integrity failure (v0.11+) — `agents-shipgate baseline verify --strict` detected `SHIP-BASELINE-INTEGRITY-MISMATCH`. Only the standalone `baseline verify` command emits this code; `scan` continues to use `20` for gate failure regardless of integrity-mode. | | `20` | Strict-mode gate failure (≥ 1 unsuppressed finding hit `fail_on`, or ≥ 1 active unbaselined finding sets `blocks_release`) | ### Runtime contract JSON @@ -171,6 +173,85 @@ reject unknown top-level fields instead of silently ignoring release policy. 
Manifests that use `action_surface:` require a CLI whose `agents-shipgate contract --json` reports `report_schema_version >= 0.16`. +### Baseline Integrity (v0.5) + +Baseline schema bumps to `0.5`. The wire shape adds an optional +`findings[].provenance` block per entry recording when and by which scanner +the entry was added: + +```json +{ + "fingerprint": "fp_…", + "check_id": "SHIP-…", + "tool_name": "…", + "severity": "high", + "title": "…", + "provenance": { + "scanner_version": "0.11.0", + "run_id": "agents_shipgate_…", + "recorded_at": "2026-05-15T14:23:00Z", + "reason": null, + "expires": null + } +} +``` + +`provenance` is optional on the wire so older v0.2/v0.3/v0.4 baselines still +load. The integrity check flags legacy-no-provenance entries as +`SHIP-BASELINE-INTEGRITY-MISMATCH` until they are re-stamped by re-running +`agents-shipgate baseline save`. `provenance.reason` and `provenance.expires` +are reviewer-set: `reason` is free-form text and `expires` is an ISO-8601 date. + +Each `agents-shipgate baseline save` appends one JSON line to +`<baseline-dir>/baseline-audit.log`. The log row is **stable**: + +- `audit_schema_version: "0.1"` +- `timestamp` — ISO-8601 UTC +- `run_id` — scan's run_id (matches `BaselineProvenance.run_id` for any + fingerprints added in this save) +- `scanner_version` — Agents Shipgate version that wrote the row +- `baseline_path` — string path saved at the time of the row +- `hash_before` — `"sha256:…"` of the prior baseline file content, or `null` + when this was the first save +- `hash_after` — `"sha256:…"` of the new baseline file content +- `added_fingerprints[]`, `removed_fingerprints[]` — sorted deltas + +The audit log is append-only and intentionally co-located with the baseline so +a single `.agents-shipgate/` directory carries both. Commit both files +together; reviewers can `git log .agents-shipgate/baseline-audit.log` to see +when fingerprints joined the baseline. 
+ +`manifest.baseline.integrity_mode` controls behavior when `scan --baseline X` +detects an integrity issue. Stable values: + +- `off` — no integrity checks. Back-compat escape hatch for repos that have + not migrated to v0.5 baselines yet. +- `warn` (default in v0.11) — integrity findings emitted but + `blocks_release: false`; release decision is unaffected. +- `strict` — `SHIP-BASELINE-INTEGRITY-MISMATCH` carries + `blocks_release: true` and `agents-shipgate baseline verify --strict` exits `6` on + the same condition. + +New stable check IDs (v0.11+): + +- `SHIP-BASELINE-INTEGRITY-MISMATCH` (critical) — file hash mismatch, missing + audit log, audit log empty, entry references unknown `run_id`, or entry + loaded from a legacy schema without provenance. +- `SHIP-BASELINE-ENTRY-EXPIRED` (high) — `provenance.expires` < today. +- `SHIP-BASELINE-ENTRY-STALE` (low) — deprecated check ID in the entry, or + the entry matched no active finding (scan-aware; resolved-not-pruned). + +Integrity findings bypass `checks.ignore` (suppression) and +`checks.severity_overrides`. Silencing tamper detection would defeat the +trust property the audit log defends. Integrity findings otherwise flow through the regular report +pipeline (fingerprinting, baseline-status assignment, remediation +annotation). + +The audit log is **tamper-evident, not tamper-proof**: a well-resourced +adversary who atomically rewrites both the baseline JSON and the audit log +defeats `verify`. The goal is to make casual or accidental edits observably +wrong in code review. 
+ ### Tool-Surface Diff `agents-shipgate scan --diff-from ` accepts a prior `report.json` or a diff --git a/docs/checks.json b/docs/checks.json index ddddffa..ed7a9c9 100644 --- a/docs/checks.json +++ b/docs/checks.json @@ -539,6 +539,70 @@ "requires_human_review": true, "suggested_patch_kind": "manual" }, + { + "autofix_safe": false, + "category": "baseline", + "default_severity": "high", + "description": "Baseline entry's review window has expired.", + "docs_url": "https://github.com/ThreeMoonsLab/agents-shipgate/blob/main/docs/checks.md#ship-baseline-entry-expired", + "evidence_fields": [ + "fingerprint", + "check_id", + "tool_name", + "expires", + "days_overdue", + "reason" + ], + "fires_when": "A baseline entry's `provenance.expires` is before today's UTC date.", + "id": "SHIP-BASELINE-ENTRY-EXPIRED", + "rationale": "Reviewer-set `provenance.expires` is the renewable consent for accepting technical debt. Past that date the entry needs a fresh review, not a silent extension.", + "recommendation": "Re-review the accepted debt and either remove the entry, fix the underlying finding, or extend `provenance.expires` with a new reason.", + "requires_human_review": true, + "suggested_patch_kind": "manual" + }, + { + "autofix_safe": false, + "category": "baseline", + "default_severity": "low", + "description": "Baseline entry no longer corresponds to an active finding or check ID.", + "docs_url": "https://github.com/ThreeMoonsLab/agents-shipgate/blob/main/docs/checks.md#ship-baseline-entry-stale", + "evidence_fields": [ + "fingerprint", + "check_id", + "tool_name", + "kind", + "replacement_check_ids" + ], + "fires_when": "A baseline entry references a deprecated check ID (an alias in LEGACY_CHECK_ID_ALIASES) or did not match any active scan finding (resolved_count contribution).", + "id": "SHIP-BASELINE-ENTRY-STALE", + "rationale": "Stale baseline entries hide intent \u2014 reviewers cannot tell whether the accepted debt was resolved or whether the check was renamed. 
Pruning keeps the baseline aligned with reality.", + "recommendation": "Remove resolved entries via `agents-shipgate baseline save`, or update deprecated check IDs to their canonical replacements.", + "requires_human_review": true, + "suggested_patch_kind": "manual" + }, + { + "autofix_safe": false, + "category": "baseline", + "default_severity": "critical", + "description": "Baseline file integrity check failed.", + "docs_url": "https://github.com/ThreeMoonsLab/agents-shipgate/blob/main/docs/checks.md#ship-baseline-integrity-mismatch", + "evidence_fields": [ + "fingerprint", + "check_id", + "tool_name", + "kind", + "expected_hash", + "computed_hash", + "audit_log_path", + "latest_audit_run_id" + ], + "fires_when": "The baseline file's SHA-256 differs from the latest audit log entry's hash_after, the audit log is missing or empty, an entry's provenance.run_id is not in the audit log, or an entry pre-dates v0.5 and lacks provenance entirely.", + "id": "SHIP-BASELINE-INTEGRITY-MISMATCH", + "rationale": "The baseline JSON has been edited outside `agents-shipgate baseline save`, lacks an audit log row, or references a run_id not present in the audit log. A release gate that accepts silent baseline edits cannot claim to govern technical debt.", + "recommendation": "Re-run `agents-shipgate baseline save` to refresh the baseline and audit log, or `agents-shipgate baseline verify` for the full report. Investigate the diff before accepting.", + "requires_human_review": true, + "suggested_patch_kind": "manual" + }, { "autofix_safe": false, "category": "codex_plugin", diff --git a/docs/checks.md b/docs/checks.md index e7c8cb0..8f494e8 100644 --- a/docs/checks.md +++ b/docs/checks.md @@ -537,6 +537,37 @@ who is accountable for remediation. unused scopes or add tool metadata showing why the permission is needed. Broad unused write/admin scopes are `high`; other unused scopes are `medium`. +### SHIP-BASELINE-INTEGRITY-MISMATCH + +Baseline file integrity check failed. 
Emitted when the baseline JSON has been +edited outside `agents-shipgate baseline save` (hash mismatch against the +audit log), when the audit log is missing or empty for a non-empty baseline, +when an entry's `provenance.run_id` is not present in the audit log, or when +an entry pre-dates the v0.5 provenance contract. In +`baseline.integrity_mode: strict` the finding carries `blocks_release=true` +and `agents-shipgate baseline verify --strict` exits with code 6. +Re-run `agents-shipgate baseline save` to refresh the baseline alongside its +audit row; investigate the diff before accepting. + +### SHIP-BASELINE-ENTRY-EXPIRED + +A baseline entry's reviewer-set `provenance.expires` date is past today. +Renewable consent is a deliberate choice: accepted technical debt should +need re-review on a schedule, not a silent extension. Re-review the entry +and either remove it, fix the underlying finding, or extend +`provenance.expires` with a new `reason`. + +### SHIP-BASELINE-ENTRY-STALE + +A baseline entry no longer corresponds to an active finding or check ID. +Two sub-kinds, both `low` severity: + +- `deprecated_check_id` — entry references an alias in `LEGACY_CHECK_ID_ALIASES`. + Update the entry to the canonical replacement check IDs (re-running + `baseline save` does not rewrite check IDs). +- `resolved_not_pruned` — entry matched no active scan finding. Re-run + `agents-shipgate baseline save` to drop the entry from the baseline. + ## Risk Tags Risk tags are hints, not findings by themselves. Checks consume tags with confidence thresholds. 
diff --git a/docs/manifest-v0.1.json b/docs/manifest-v0.1.json index 1063be6..c12a447 100644 --- a/docs/manifest-v0.1.json +++ b/docs/manifest-v0.1.json @@ -604,6 +604,36 @@ "title": "ArtifactPathConfig", "type": "object" }, + "BaselineConfig": { + "additionalProperties": false, + "description": "Manifest knob governing v0.5 baseline integrity checks.\n\n``integrity_mode`` decides what happens when ``scan`` (with\n``--baseline``) detects an integrity issue:\n\n- ``off``: no integrity checks run (back-compat escape hatch for\n repos that have not migrated to v0.5 baselines yet).\n- ``warn`` (default in v0.17): integrity findings are emitted but\n ``blocks_release`` is false; release decision is unaffected.\n- ``strict``: ``SHIP-BASELINE-INTEGRITY-MISMATCH`` findings get\n ``blocks_release=true`` and ``agents-shipgate baseline verify``\n exits with code 6 on the same condition. Recommended target for\n v0.18.\n\n``audit_log`` overrides the default audit log path (relative to\nthe baseline file's directory). Usually left at its default.", + "properties": { + "audit_log": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, + "title": "Audit Log" + }, + "integrity_mode": { + "default": "warn", + "enum": [ + "off", + "warn", + "strict" + ], + "title": "Integrity Mode", + "type": "string" + } + }, + "title": "BaselineConfig", + "type": "object" + }, "ChecksConfig": { "additionalProperties": false, "properties": { @@ -1507,6 +1537,9 @@ ], "default": null }, + "baseline": { + "$ref": "#/$defs/BaselineConfig" + }, "checks": { "$ref": "#/$defs/ChecksConfig" }, diff --git a/llms-full.txt b/llms-full.txt index 4180ec6..bd6e016 100644 --- a/llms-full.txt +++ b/llms-full.txt @@ -1481,6 +1481,37 @@ who is accountable for remediation. unused scopes or add tool metadata showing why the permission is needed. Broad unused write/admin scopes are `high`; other unused scopes are `medium`. 
+### SHIP-BASELINE-INTEGRITY-MISMATCH + +Baseline file integrity check failed. Emitted when the baseline JSON has been +edited outside `agents-shipgate baseline save` (hash mismatch against the +audit log), when the audit log is missing or empty for a non-empty baseline, +when an entry's `provenance.run_id` is not present in the audit log, or when +an entry pre-dates the v0.5 provenance contract. In +`baseline.integrity_mode: strict` the finding carries `blocks_release=true` +and `agents-shipgate baseline verify --strict` exits with code 6. +Re-run `agents-shipgate baseline save` to refresh the baseline alongside its +audit row; investigate the diff before accepting. + +### SHIP-BASELINE-ENTRY-EXPIRED + +A baseline entry's reviewer-set `provenance.expires` date is past today. +Renewable consent is a deliberate choice: accepted technical debt should +need re-review on a schedule, not a silent extension. Re-review the entry +and either remove it, fix the underlying finding, or extend +`provenance.expires` with a new `reason`. + +### SHIP-BASELINE-ENTRY-STALE + +A baseline entry no longer corresponds to an active finding or check ID. +Two sub-kinds, both `low` severity: + +- `deprecated_check_id` — entry references an alias in `LEGACY_CHECK_ID_ALIASES`. + Update the entry to the canonical replacement check IDs (re-running + `baseline save` does not rewrite check IDs). +- `resolved_not_pruned` — entry matched no active scan finding. Re-run + `agents-shipgate baseline save` to drop the entry from the baseline. + ## Risk Tags Risk tags are hints, not findings by themselves. Checks consume tags with confidence thresholds. diff --git a/src/agents_shipgate/checks/baseline_integrity.py b/src/agents_shipgate/checks/baseline_integrity.py new file mode 100644 index 0000000..ad6858c --- /dev/null +++ b/src/agents_shipgate/checks/baseline_integrity.py @@ -0,0 +1,160 @@ +"""Convert baseline integrity issues into Finding objects. 
+ +This module is the bridge between :mod:`agents_shipgate.core.baseline` +(which produces typed :class:`BaselineIntegrityIssue` records) and the +report findings stream. It lives under ``checks/`` because the engine +treats the resulting findings just like any other check output — they +flow through ``assign_finding_ids`` and ``annotate_remediation`` before +hitting the report. + +Unlike checks in ``BUILTIN_CHECKS``, this module is not invoked by +``run_checks``. It is called directly from :mod:`agents_shipgate.cli.scan` +after ``apply_baseline`` because the integrity check needs the loaded +baseline and audit log paths, not the tool context. + +Design notes +------------ + +- Integrity findings are **not subject to manifest suppressions**. The + whole point of the integrity check is to detect baseline tampering; + letting users silence it via ``checks.ignore`` would reopen the trust + hole that M2 closes. +- Severity is fixed at ``BaselineIntegrityIssue.default_severity``. + Manifest-level severity overrides (``checks.severity_overrides``) are + intentionally bypassed here. A user who wants to tune individual + integrity-finding severities should reach for M1's ``floor_severity`` + + acknowledgement mechanism instead. (Out of scope for M2.) +- In ``integrity_mode="strict"``, the + :data:`SHIP-BASELINE-INTEGRITY-MISMATCH` finding carries + ``blocks_release=True`` so the release decision blocks the gate even + for findings whose severity might otherwise be tunable. Lower-severity + integrity findings stay advisory in strict mode. +""" + +from __future__ import annotations + +from typing import Literal + +from agents_shipgate.core.baseline import BaselineIntegrityIssue +from agents_shipgate.core.context import ScanContext +from agents_shipgate.core.models import Finding, SourceReference, parse_severity + +IntegrityMode = Literal["off", "warn", "strict"] + +# Map each issue kind to its check ID. 
SHIP-BASELINE-INTEGRITY-MISMATCH +# covers the file-level concerns; SHIP-BASELINE-ENTRY-EXPIRED handles +# review-window expiry; SHIP-BASELINE-ENTRY-STALE collects the +# scan-aware and deprecated-id cases. +_KIND_TO_CHECK_ID: dict[str, str] = { + "hash_mismatch": "SHIP-BASELINE-INTEGRITY-MISMATCH", + "missing_audit_log": "SHIP-BASELINE-INTEGRITY-MISMATCH", + "entry_no_audit": "SHIP-BASELINE-INTEGRITY-MISMATCH", + "legacy_no_provenance": "SHIP-BASELINE-INTEGRITY-MISMATCH", + "entry_expired": "SHIP-BASELINE-ENTRY-EXPIRED", + "deprecated_check_id": "SHIP-BASELINE-ENTRY-STALE", + "resolved_not_pruned": "SHIP-BASELINE-ENTRY-STALE", +} + +# Per-check recommendation surfaced in the Finding. Specific text lives +# in CheckMetadata.recommendation; we duplicate a short version here so +# every emitted Finding carries useful inline guidance for reviewers +# reading report.md without bouncing through the check catalog. +_KIND_TO_RECOMMENDATION: dict[str, str] = { + "hash_mismatch": ( + "Run `agents-shipgate baseline verify` for the full report and " + "re-run `agents-shipgate baseline save` once the baseline is " + "reviewed." + ), + "missing_audit_log": ( + "Re-run `agents-shipgate baseline save` to regenerate the " + "audit log alongside the baseline file." + ), + "entry_no_audit": ( + "Run `agents-shipgate baseline save` to record a new audit " + "entry for the current baseline state." + ), + "legacy_no_provenance": ( + "Re-run `agents-shipgate baseline save` to upgrade the legacy " + "baseline to v0.5 with provenance stamps." + ), + "entry_expired": ( + "Re-review the accepted debt and either fix the underlying " + "finding, remove the baseline entry, or extend " + "`provenance.expires` with a new reason." + ), + "deprecated_check_id": ( + "Update the baseline entry to use one of the listed " + "replacement check IDs; re-running `baseline save` does not " + "rewrite check IDs." 
+ ), + "resolved_not_pruned": ( + "Re-run `agents-shipgate baseline save` to drop the resolved " + "entry from the baseline." + ), +} + + +def build_findings( + issues: list[BaselineIntegrityIssue], + *, + context: ScanContext, + integrity_mode: IntegrityMode, +) -> list[Finding]: + """Convert integrity issues to Finding objects. + + Returns an empty list when ``integrity_mode == "off"`` so callers + can call this unconditionally without branching on the manifest + flag themselves. + + ``blocks_release`` is set to ``True`` on + ``SHIP-BASELINE-INTEGRITY-MISMATCH`` findings when + ``integrity_mode == "strict"``. All other integrity findings stay + at ``blocks_release=False`` regardless of mode — they're advisory + by design. Strict-mode gating focuses on the single signal + "baseline file has been edited", which is the trust property the + audit log defends. + """ + if integrity_mode == "off": + return [] + findings: list[Finding] = [] + for issue in issues: + check_id = _KIND_TO_CHECK_ID[issue.kind] + blocks_release = ( + integrity_mode == "strict" + and check_id == "SHIP-BASELINE-INTEGRITY-MISMATCH" + ) + findings.append( + Finding( + check_id=check_id, + title=issue.title, + severity=parse_severity(issue.default_severity), + category="baseline", + tool_id=None, + tool_name=issue.tool_name, + agent_id=context.agent.id, + evidence=dict(issue.evidence), + confidence="high", + provenance_kind="static_declaration", + source=SourceReference( + type="baseline", + ref=str(issue.evidence.get("baseline_path")) + if "baseline_path" in issue.evidence + else None, + ), + recommendation=_KIND_TO_RECOMMENDATION[issue.kind], + blocks_release=blocks_release, + ) + ) + return findings + + +def has_hash_mismatch(issues: list[BaselineIntegrityIssue]) -> bool: + """Return True iff any issue maps to SHIP-BASELINE-INTEGRITY-MISMATCH. + + The standalone ``agents-shipgate baseline verify`` command uses this + to decide whether to exit with code 6 (in strict mode) or 0 (warn). 
+ """ + return any( + _KIND_TO_CHECK_ID[issue.kind] == "SHIP-BASELINE-INTEGRITY-MISMATCH" + for issue in issues + ) diff --git a/src/agents_shipgate/checks/registry.py b/src/agents_shipgate/checks/registry.py index 4c9dca9..6f879aa 100644 --- a/src/agents_shipgate/checks/registry.py +++ b/src/agents_shipgate/checks/registry.py @@ -196,6 +196,14 @@ def _meta(**kwargs: object) -> CheckMetadata: _meta(id="SHIP-MANIFEST-STALE-RISK-OVERRIDE", category="manifest", default_severity="medium", description="A risk override references a missing tool.", rationale="Risk overrides should not outlive the tool they describe.", fires_when="risk_overrides.tools contains a tool that is not loaded.", evidence_fields=["tool"], recommendation="Remove stale risk overrides or update them to current tool names."), _meta(id="SHIP-MANIFEST-HIGH-RISK-OWNER-MISSING", category="manifest", default_severity="high", description="Production high-risk tool has no declared owner.", rationale="High-risk production tools need an accountable owning team for review and remediation.", fires_when="environment.target is production_like or production and a high-risk tool lacks owner metadata.", evidence_fields=["environment", "risk_tags"], recommendation="Declare an owner for each high-risk production tool."), _meta(id="SHIP-MANIFEST-UNUSED-SCOPE", category="manifest", default_severity="medium", description="Manifest declares permission scopes unused by loaded tools.", rationale="Unused permissions weaken least-privilege review and often indicate stale config.", fires_when="permissions.scopes includes a scope not required by any loaded tool.", evidence_fields=["scope", "tool_scopes"], recommendation="Remove unused scopes or add tool metadata showing why they are required."), + # v0.5 baseline-integrity checks (M2). Emitted only when a baseline + # is in use and `baseline.integrity_mode` is `warn` or `strict`. 
In + # strict mode, SHIP-BASELINE-INTEGRITY-MISMATCH carries + # `blocks_release=true` and `agents-shipgate baseline verify` exits + # with code 6. See docs/baseline-integrity.md. + _meta(id="SHIP-BASELINE-INTEGRITY-MISMATCH", category="baseline", default_severity="critical", description="Baseline file integrity check failed.", rationale="The baseline JSON has been edited outside `agents-shipgate baseline save`, lacks an audit log row, or references a run_id not present in the audit log. A release gate that accepts silent baseline edits cannot claim to govern technical debt.", fires_when="The baseline file's SHA-256 differs from the latest audit log entry's hash_after, the audit log is missing or empty, an entry's provenance.run_id is not in the audit log, or an entry pre-dates v0.5 and lacks provenance entirely.", evidence_fields=["fingerprint", "check_id", "tool_name", "kind", "expected_hash", "computed_hash", "audit_log_path", "latest_audit_run_id"], recommendation="Re-run `agents-shipgate baseline save` to refresh the baseline and audit log, or `agents-shipgate baseline verify` for the full report. Investigate the diff before accepting."), + _meta(id="SHIP-BASELINE-ENTRY-EXPIRED", category="baseline", default_severity="high", description="Baseline entry's review window has expired.", rationale="Reviewer-set `provenance.expires` is the renewable consent for accepting technical debt. 
Past that date the entry needs a fresh review, not a silent extension.", fires_when="A baseline entry's `provenance.expires` is before today's UTC date.", evidence_fields=["fingerprint", "check_id", "tool_name", "expires", "days_overdue", "reason"], recommendation="Re-review the accepted debt and either remove the entry, fix the underlying finding, or extend `provenance.expires` with a new reason."), + _meta(id="SHIP-BASELINE-ENTRY-STALE", category="baseline", default_severity="low", description="Baseline entry no longer corresponds to an active finding or check ID.", rationale="Stale baseline entries hide intent — reviewers cannot tell whether the accepted debt was resolved or whether the check was renamed. Pruning keeps the baseline aligned with reality.", fires_when="A baseline entry references a deprecated check ID (an alias in LEGACY_CHECK_ID_ALIASES) or did not match any active scan finding (resolved_count contribution).", evidence_fields=["fingerprint", "check_id", "tool_name", "kind", "replacement_check_ids"], recommendation="Remove resolved entries via `agents-shipgate baseline save`, or update deprecated check IDs to their canonical replacements."), ] diff --git a/src/agents_shipgate/cli/_register_baseline.py b/src/agents_shipgate/cli/_register_baseline.py index 8f8cd42..3313d02 100644 --- a/src/agents_shipgate/cli/_register_baseline.py +++ b/src/agents_shipgate/cli/_register_baseline.py @@ -1,14 +1,20 @@ from __future__ import annotations +import json from pathlib import Path import typer +from agents_shipgate.checks.baseline_integrity import has_hash_mismatch from agents_shipgate.cli.scan import run_scan -from agents_shipgate.core.baseline import write_baseline +from agents_shipgate.core.baseline import verify_baseline, write_baseline from agents_shipgate.core.errors import AgentsShipgateError, ConfigError, InputParseError from agents_shipgate.core.logging import configure_logging +# Exit code 6: baseline integrity failure. 
Reserved by M2; documented +# in `.well-known/agents-shipgate.json` and STABILITY.md. +BASELINE_INTEGRITY_EXIT_CODE = 6 + def register(app: typer.Typer) -> None: baseline_app = typer.Typer(help="Manage local finding baselines.") @@ -49,5 +55,114 @@ def baseline_save( raise typer.Exit(4) from exc typer.echo(f"Wrote {out}") typer.echo(f"Findings saved: {len(baseline.findings)}") + audit_log = out.parent / "baseline-audit.log" + typer.echo(f"Audit log: {audit_log}") + + @baseline_app.command("verify") + def baseline_verify( + baseline: Path = typer.Option( + Path(".agents-shipgate/baseline.json"), + "--baseline", + help="Baseline JSON path to verify.", + ), + audit_log: Path | None = typer.Option( + None, + "--audit-log", + help=( + "Audit log path. Defaults to " + "/baseline-audit.log, matching `baseline save`." + ), + ), + strict: bool = typer.Option( + False, + "--strict", + help=( + "Exit with code 6 if SHIP-BASELINE-INTEGRITY-MISMATCH is " + "detected. Without --strict the command still reports issues " + "but exits 0 unless an underlying input error occurs." + ), + ), + json_output: bool = typer.Option( + False, + "--json", + help="Emit issues as JSON on stdout instead of human text.", + ), + verbose: bool = typer.Option(False, "--verbose", help="Enable debug logs."), + ) -> None: + """Verify baseline file integrity against the audit log. + + Detects hand-edits (hash mismatch), entries without audit + provenance, legacy v0.2-0.4 entries lacking provenance, + expired review windows, and deprecated check IDs. The + scan-aware "resolved-but-not-pruned" check is not part of + `verify` (it requires a scan) — use `agents-shipgate scan + --baseline X` for the complete picture. + + Exit codes: + 0 - clean, or non-mismatch issues only (without --strict). + 3 - baseline file missing or unparseable. + 6 - integrity mismatch detected and --strict was set. 
+ """ + try: + configure_logging(verbose=verbose) + issues = verify_baseline(baseline, audit_log) + except InputParseError as exc: + typer.echo(f"Input parsing error: {exc}", err=True) + raise typer.Exit(3) from exc + except AgentsShipgateError as exc: + typer.echo(f"Agents Shipgate error: {exc}", err=True) + raise typer.Exit(4) from exc + if json_output: + payload = { + "baseline_path": str(baseline), + "audit_log_path": ( + str(audit_log) + if audit_log is not None + else str(baseline.parent / "baseline-audit.log") + ), + "issue_count": len(issues), + "issues": [ + { + "kind": issue.kind, + "default_severity": issue.default_severity, + "title": issue.title, + "fingerprint": issue.fingerprint, + "check_id": issue.check_id, + "tool_name": issue.tool_name, + "evidence": _coerce_evidence(issue.evidence), + } + for issue in issues + ], + } + typer.echo(json.dumps(payload, indent=2, default=str)) + else: + if not issues: + typer.echo(f"Baseline OK: {baseline}") + else: + typer.echo(f"Baseline {baseline}: {len(issues)} issue(s)") + for issue in issues: + typer.echo( + f" [{issue.default_severity}] {issue.kind}: " + f"{issue.title}" + ) + if strict and has_hash_mismatch(issues): + raise typer.Exit(BASELINE_INTEGRITY_EXIT_CODE) app.add_typer(baseline_app, name="baseline") + + +def _coerce_evidence(evidence: dict[str, object]) -> dict[str, object]: + """Make evidence JSON-serializable. + + Most values are already strings/ints/lists, but ``date`` objects + from ``BaselineProvenance.expires`` need explicit isoformat-ing. + Keeping this here rather than in core/ avoids leaking CLI concerns + into the data layer. 
+ """ + return {key: _coerce_value(value) for key, value in evidence.items()} + + +def _coerce_value(value: object) -> object: + if hasattr(value, "isoformat"): + return value.isoformat() # type: ignore[attr-defined] + return value diff --git a/src/agents_shipgate/cli/scan.py b/src/agents_shipgate/cli/scan.py index 414bdd4..5fb6e72 100644 --- a/src/agents_shipgate/cli/scan.py +++ b/src/agents_shipgate/cli/scan.py @@ -6,12 +6,19 @@ import os from pathlib import Path +from agents_shipgate.checks.baseline_integrity import build_findings as build_integrity_findings from agents_shipgate.checks.registry import run_checks from agents_shipgate.ci.github_summary import write_github_step_summary from agents_shipgate.config.loader import load_manifest from agents_shipgate.config.schema import AgentsShipgateManifest, ToolSourceConfig from agents_shipgate.core.artifacts import ArtifactBag -from agents_shipgate.core.baseline import apply_baseline, load_baseline +from agents_shipgate.core.baseline import ( + apply_baseline, + baseline_resolved_fingerprints, + load_baseline, + verify_baseline, +) +from agents_shipgate.core.baseline_audit import DEFAULT_AUDIT_LOG_PATH from agents_shipgate.core.context import ScanContext from agents_shipgate.core.errors import ConfigError, InputParseError from agents_shipgate.core.findings import ( @@ -270,6 +277,57 @@ def run_scan( baseline_file, display_path=baseline_display_path, ) + # v0.5 baseline-integrity (M2). Runs only when a baseline is in + # use and `baseline.integrity_mode` is `warn` or `strict`. In + # `off` mode (escape hatch for repos that have not migrated yet) + # the check is skipped entirely. Integrity findings flow through + # the standard report pipeline but bypass suppression / severity + # overrides — silencing tamper detection would defeat the trust + # property the audit log defends. 
+ integrity_mode = manifest.baseline.integrity_mode + if integrity_mode != "off" and baseline_path is not None: + audit_log_path = _resolve_audit_log_path( + manifest, baseline_path, base_dir + ) + try: + static_issues = verify_baseline( + baseline_path, audit_log_path + ) + except InputParseError as exc: + # Unparseable integrity input: report a scan warning and continue. + # NOTE: no finding is emitted here, so strict mode does not block. + logger.warning( + "baseline integrity verification failed", + extra={ + "agents_shipgate_baseline_path": str(baseline_path), + "agents_shipgate_error": str(exc), + }, + ) + static_issues = [] + warnings.append( + f"Baseline integrity check skipped: {exc}" + ) + stale_issues = baseline_resolved_fingerprints( + findings, baseline_file + ) + integrity_findings = build_integrity_findings( + static_issues + stale_issues, + context=context, + integrity_mode=integrity_mode, + ) + if integrity_findings: + # Assign IDs to the new findings. assign_finding_ids is + # idempotent — already-IDed findings get re-computed + # against the same evidence, producing identical + # fingerprints. annotate_remediation re-runs cleanly + # because its inputs are per-finding state, not + # cross-finding aggregates. + findings.extend(integrity_findings) + assign_finding_ids(findings) + annotate_remediation( + findings, + _check_metadata_lookup(plugins_enabled=plugins_enabled), + ) attach_action_surface_finding_summary(action_surface_diff, findings) logger.debug( "checks completed", @@ -902,6 +960,30 @@ def _relative_display_path(path: Path, base_dir: Path) -> str: return rel +def _resolve_audit_log_path( + manifest: AgentsShipgateManifest, + baseline_path: Path, + base_dir: Path, +) -> Path: + """Resolve the baseline audit log path. + + Resolution order: + 1. ``manifest.baseline.audit_log`` if set (relative paths resolved + against ``base_dir``, the manifest's directory). + 2. Otherwise ``<baseline-dir>/baseline-audit.log`` — + co-located with the baseline JSON. 
This matches the default that + ``write_baseline`` uses, so save/verify see the same file + without configuration. + """ + override = manifest.baseline.audit_log + if override: + candidate = Path(override) + if not candidate.is_absolute(): + candidate = base_dir / candidate + return candidate + return baseline_path.parent / DEFAULT_AUDIT_LOG_PATH.name + + def _check_metadata_lookup( *, plugins_enabled: bool | None ) -> dict: diff --git a/src/agents_shipgate/config/schema.py b/src/agents_shipgate/config/schema.py index 64c889b..ccc5e9d 100644 --- a/src/agents_shipgate/config/schema.py +++ b/src/agents_shipgate/config/schema.py @@ -736,6 +736,34 @@ def require_unique_action_declarations(self) -> ActionSurfaceConfig: return self +BaselineIntegrityMode = Literal["off", "warn", "strict"] + + +class BaselineConfig(BaseModel): + """Manifest knob governing v0.5 baseline integrity checks. + + ``integrity_mode`` decides what happens when ``scan`` (with + ``--baseline``) detects an integrity issue: + + - ``off``: no integrity checks run (back-compat escape hatch for + repos that have not migrated to v0.5 baselines yet). + - ``warn`` (default in v0.17): integrity findings are emitted but + ``blocks_release`` is false; release decision is unaffected. + - ``strict``: ``SHIP-BASELINE-INTEGRITY-MISMATCH`` findings get + ``blocks_release=true`` and ``agents-shipgate baseline verify`` + exits with code 6 on the same condition. Recommended target for + v0.18. + + ``audit_log`` overrides the default audit log path (relative paths + resolve against the manifest's directory). Usually left at its default.
+ """ + + model_config = STRICT_MODEL_CONFIG + + integrity_mode: BaselineIntegrityMode = "warn" + audit_log: str | None = None + + class CiConfig(BaseModel): model_config = STRICT_MODEL_CONFIG @@ -796,6 +824,7 @@ class AgentsShipgateManifest(BaseModel): checks: ChecksConfig = Field(default_factory=ChecksConfig) action_surface: ActionSurfaceConfig = Field(default_factory=ActionSurfaceConfig) ci: CiConfig = Field(default_factory=CiConfig) + baseline: BaselineConfig = Field(default_factory=BaselineConfig) output: OutputConfig = Field(default_factory=OutputConfig) @model_validator(mode="after") diff --git a/src/agents_shipgate/core/baseline.py b/src/agents_shipgate/core/baseline.py index d10a142..dfc6ec5 100644 --- a/src/agents_shipgate/core/baseline.py +++ b/src/agents_shipgate/core/baseline.py @@ -1,12 +1,27 @@ from __future__ import annotations -from datetime import UTC, datetime +from dataclasses import dataclass, field +from datetime import date from pathlib import Path from typing import Literal from pydantic import BaseModel, ConfigDict, Field, ValidationError -from agents_shipgate.core.check_ids import expands_to_check_id +from agents_shipgate import __version__ as _SCANNER_VERSION +from agents_shipgate.core.baseline_audit import ( + DEFAULT_AUDIT_LOG_PATH, + BaselineAuditEntry, + append_audit_entry, + compute_baseline_hash, + compute_baseline_hash_from_file, + latest_audit_entry, + read_audit_log, + utc_now_isoformat, +) +from agents_shipgate.core.check_ids import ( + LEGACY_CHECK_ID_ALIASES, + expands_to_check_id, +) from agents_shipgate.core.errors import InputParseError from agents_shipgate.core.models import ( ActionSurfaceFacts, @@ -17,7 +32,34 @@ ToolSurfaceFacts, ) -BASELINE_SCHEMA_VERSION = "0.4" +BASELINE_SCHEMA_VERSION = "0.5" +# v0.5 self-describing entry provenance. 
Older versions (0.2/0.3/0.4) +# load with `BaselineFinding.provenance = None`; the integrity check +# then flags them as `SHIP-BASELINE-INTEGRITY-MISMATCH` (kind="legacy_no_provenance") +# in warn/strict modes. Re-saving with `baseline save` upgrades the +# file to 0.5 and stamps provenance on every entry. + + +class BaselineProvenance(BaseModel): + """Self-describing record of when and why a baseline entry was added. + + Written by `agents-shipgate baseline save` for every new fingerprint + in the baseline. Existing fingerprints keep their original provenance + on re-save; only newly-added ones get a fresh `recorded_at` / `run_id`. + + `expires` is optional and reviewer-controlled: when set, the + integrity check emits `SHIP-BASELINE-ENTRY-EXPIRED` past that date. + `reason` is free-form; reviewers should set it when the entry was + deliberately accepted (not just snapshotted). + """ + + model_config = ConfigDict(extra="forbid") + + scanner_version: str + run_id: str + recorded_at: str + reason: str | None = None + expires: date | None = None class BaselineFinding(BaseModel): @@ -28,12 +70,16 @@ class BaselineFinding(BaseModel): tool_name: str | None = None severity: Severity title: str + # v0.5 additive: when None, the entry pre-dates the v0.5 provenance + # contract (loaded from 0.2/0.3/0.4) or was constructed by a test + # helper. Re-saving via `baseline save` populates it.
+ provenance: BaselineProvenance | None = None class BaselineFile(BaseModel): model_config = ConfigDict(extra="forbid") - schema_version: Literal["0.2", "0.3", "0.4"] = BASELINE_SCHEMA_VERSION + schema_version: Literal["0.2", "0.3", "0.4", "0.5"] = BASELINE_SCHEMA_VERSION project: dict[str, object] = Field(default_factory=dict) agent: dict[str, object] = Field(default_factory=dict) created_at: str @@ -44,39 +90,146 @@ class BaselineFile(BaseModel): notes: list[str] = Field(default_factory=list) -def baseline_from_report(report: ReadinessReport) -> BaselineFile: - return BaselineFile( - project=report.project, - agent=report.agent, - created_at=_utc_now(), - source_report_run_id=report.run_id, - tool_surface_facts=report.tool_surface_facts, - action_surface_facts=report.action_surface_facts, - findings=[ +def baseline_from_report( + report: ReadinessReport, + *, + scanner_version: str | None = None, + prior_baseline: BaselineFile | None = None, + now: str | None = None, +) -> BaselineFile: + """Build a v0.5 baseline file from a scan report. + + Provenance handling: + + - When ``prior_baseline`` is provided and an entry's fingerprint + already appears there with a populated ``provenance``, the prior + provenance is preserved verbatim — only newly-added fingerprints + get a fresh provenance stamp. This keeps re-saves idempotent for + the audit log and lets reviewer-set ``reason`` / ``expires`` survive + subsequent saves. + - When the prior entry has no provenance (loaded from a 0.2/0.3/0.4 + baseline), the entry is upgraded with a fresh provenance stamp. + This is the migration path: re-saving a legacy baseline stamps + provenance on every entry as of the next scan. + + ``scanner_version`` and ``now`` are injectable for deterministic + testing. In production they default to the package version and UTC + now. 
+ """ + scanner_version = scanner_version or _SCANNER_VERSION + recorded_at = now or utc_now_isoformat() + prior_by_fp: dict[str, BaselineFinding] = { + entry.fingerprint: entry + for entry in (prior_baseline.findings if prior_baseline else []) + } + findings: list[BaselineFinding] = [] + for finding in _active_findings(report.findings): + fingerprint = finding.fingerprint or finding.id + if not fingerprint: + continue + prior_entry = prior_by_fp.get(fingerprint) + if prior_entry is not None and prior_entry.provenance is not None: + provenance = prior_entry.provenance + else: + provenance = BaselineProvenance( + scanner_version=scanner_version, + run_id=report.run_id, + recorded_at=recorded_at, + ) + findings.append( BaselineFinding( - fingerprint=finding.fingerprint or finding.id or "", + fingerprint=fingerprint, check_id=finding.check_id, tool_name=finding.tool_name, severity=finding.severity, title=finding.title, + provenance=provenance, ) - for finding in _active_findings(report.findings) - if finding.fingerprint or finding.id - ], + ) + return BaselineFile( + project=report.project, + agent=report.agent, + created_at=recorded_at, + source_report_run_id=report.run_id, + tool_surface_facts=report.tool_surface_facts, + action_surface_facts=report.action_surface_facts, + findings=findings, ) -def write_baseline(report: ReadinessReport, path: Path) -> BaselineFile: - baseline = baseline_from_report(report) - baseline = _preserve_created_at_when_content_matches(baseline, path) +def write_baseline( + report: ReadinessReport, + path: Path, + *, + scanner_version: str | None = None, + audit_log_path: Path | None = None, +) -> BaselineFile: + """Write a v0.5 baseline + append a row to the audit log. + + The audit log defaults to ``/baseline-audit.log``. Pass + ``audit_log_path`` explicitly to override (tests use a tmp path). + The audit row records the SHA-256 of the file before and after the + write so ``verify_baseline`` can detect hand-edits. 
+ + When no prior baseline exists, ``hash_before`` is ``None``. Re-saves + that change nothing still produce an audit row with empty + ``added_fingerprints`` / ``removed_fingerprints`` — that is the + intended ledger semantics. + """ + scanner_version = scanner_version or _SCANNER_VERSION + prior_baseline = _try_load_baseline(path) + baseline = baseline_from_report( + report, + scanner_version=scanner_version, + prior_baseline=prior_baseline, + ) + baseline = _preserve_created_at_when_content_matches(baseline, prior_baseline) + hash_before: str | None = None + if path.exists(): + try: + hash_before = compute_baseline_hash_from_file(path) + except OSError: + hash_before = None path.parent.mkdir(parents=True, exist_ok=True) - path.write_text( - baseline.model_dump_json(indent=2, exclude_none=False) + "\n", - encoding="utf-8", + canonical = baseline.model_dump_json(indent=2, exclude_none=False) + "\n" + path.write_text(canonical, encoding="utf-8") + hash_after = compute_baseline_hash(canonical) + if audit_log_path is None: + audit_log_path = path.parent / "baseline-audit.log" + prior_fps: set[str] = { + entry.fingerprint + for entry in (prior_baseline.findings if prior_baseline else []) + } + new_fps: set[str] = {entry.fingerprint for entry in baseline.findings} + audit_entry = BaselineAuditEntry( + timestamp=utc_now_isoformat(), + run_id=report.run_id, + scanner_version=scanner_version, + baseline_path=str(path), + hash_before=hash_before, + hash_after=hash_after, + added_fingerprints=sorted(new_fps - prior_fps), + removed_fingerprints=sorted(prior_fps - new_fps), ) + append_audit_entry(audit_log_path, audit_entry) return baseline +def _try_load_baseline(path: Path) -> BaselineFile | None: + """Best-effort prior-baseline load; returns ``None`` if absent or invalid. + + A corrupt prior baseline must not block a save — that would brick + the repo. We treat it as "no prior provenance" and let the new save + upgrade the file. 
+ """ + if not path.exists(): + return None + try: + return load_baseline(path) + except InputParseError: + return None + + def load_baseline(path: Path) -> BaselineFile: if not path.exists(): raise InputParseError(f"Baseline file not found: {path}") @@ -147,23 +300,372 @@ def _legacy_baseline_match( return None -def _utc_now() -> str: - return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") - - def _preserve_created_at_when_content_matches( - baseline: BaselineFile, path: Path + baseline: BaselineFile, prior_baseline: BaselineFile | None ) -> BaselineFile: - if not path.exists(): - return baseline - try: - existing = load_baseline(path) - except InputParseError: + """Keep the existing ``created_at`` when content is otherwise identical. + + This makes re-saves byte-stable when nothing has changed: identical + findings, identical provenance, identical surface facts. The audit + log still records the save, but the on-disk file does not churn. + """ + if prior_baseline is None: return baseline - if _baseline_content_identity(existing) != _baseline_content_identity(baseline): + if _baseline_content_identity(prior_baseline) != _baseline_content_identity( + baseline + ): return baseline - return baseline.model_copy(update={"created_at": existing.created_at}) + return baseline.model_copy(update={"created_at": prior_baseline.created_at}) def _baseline_content_identity(baseline: BaselineFile) -> dict[str, object]: return baseline.model_dump(mode="json", exclude={"created_at"}) + + +# --- Integrity verification ---------------------------------------------- +# +# `verify_baseline()` is the static-only integrity check. It reads the +# baseline JSON and the append-only audit log and reports issues that +# can be detected without re-running a scan: hash mismatch, missing +# audit log, entries without audit provenance, legacy-version entries +# without provenance at all, expired entries, deprecated check IDs. 
+# +# `baseline_resolved_fingerprints()` is the scan-aware companion. Once +# `apply_baseline()` has matched current findings against the baseline, +# any baseline fingerprint that did not match is a candidate for +# pruning (the underlying check no longer fires). The integrity check +# module combines both into the SHIP-BASELINE-ENTRY-STALE finding so +# reviewers see resolved entries alongside the deprecated-id ones. +# +# Neither function emits Findings directly — they return typed +# `BaselineIntegrityIssue` records. The conversion to `Finding` lives +# in `checks/baseline_integrity.py` so the check engine remains the +# sole producer of report findings. + +BaselineIntegrityIssueKind = Literal[ + "hash_mismatch", + "missing_audit_log", + "entry_no_audit", + "legacy_no_provenance", + "entry_expired", + "deprecated_check_id", + "resolved_not_pruned", +] + + +@dataclass(frozen=True) +class BaselineIntegrityIssue: + """One concern discovered while verifying baseline integrity. + + The integrity check maps each kind to one of the three new check IDs: + + - ``hash_mismatch`` / ``missing_audit_log`` / ``entry_no_audit`` / + ``legacy_no_provenance`` → ``SHIP-BASELINE-INTEGRITY-MISMATCH`` + - ``entry_expired`` → ``SHIP-BASELINE-ENTRY-EXPIRED`` + - ``deprecated_check_id`` / ``resolved_not_pruned`` → + ``SHIP-BASELINE-ENTRY-STALE`` + + Severity at this layer is the *default* for the kind. The integrity + check module applies any manifest severity overrides and the + ``integrity_mode`` flag (warn/strict) before emitting findings. + """ + + kind: BaselineIntegrityIssueKind + default_severity: Severity + title: str + evidence: dict[str, object] = field(default_factory=dict) + fingerprint: str | None = None + check_id: str | None = None + tool_name: str | None = None + + +def verify_baseline( + baseline_path: Path, + audit_log_path: Path | None = None, + *, + today: date | None = None, +) -> list[BaselineIntegrityIssue]: + """Static-only integrity check for a baseline file. 
+ + Returns a list of :class:`BaselineIntegrityIssue` describing every + integrity concern detected without re-running a scan. Callers that + also want scan-aware stale detection should additionally consult + :func:`baseline_resolved_fingerprints`. + + The order of issues is intentional — file-level concerns first + (hash mismatch, missing audit log), then per-entry concerns ordered + by baseline-entry order. The CLI surfaces this ordering to humans. + + ``today`` is injectable for deterministic testing of expiry. + """ + if not baseline_path.exists(): + raise InputParseError(f"Baseline file not found: {baseline_path}") + baseline = load_baseline(baseline_path) + log_path = audit_log_path or baseline_path.parent / DEFAULT_AUDIT_LOG_PATH.name + issues: list[BaselineIntegrityIssue] = [] + issues.extend(_verify_audit_log_alignment(baseline_path, log_path, baseline)) + issues.extend(_verify_entry_provenance(baseline, log_path)) + issues.extend(_verify_entry_expiry(baseline, today or date.today())) + issues.extend(_verify_deprecated_check_ids(baseline)) + return issues + + +def baseline_resolved_fingerprints( + findings: list[Finding], + baseline: BaselineFile, +) -> list[BaselineIntegrityIssue]: + """Scan-aware companion to ``verify_baseline``. + + Identifies baseline entries that did not match any active finding + in the current scan — i.e. ``baseline.resolved_count`` candidates. + Each becomes a ``resolved_not_pruned`` issue at low severity so + reviewers know to clean the baseline up. + + Direct mirror of the resolved-count computation in + :func:`apply_baseline`; we recompute here rather than threading the + set out of that function to keep its signature backward-compatible + with the existing public contract. 
+ """ + baseline_fingerprints = { + entry.fingerprint for entry in baseline.findings if entry.fingerprint + } + active: set[str] = set() + legacy_matched: set[str] = set() + for finding in findings: + if finding.suppressed: + continue + fingerprint = finding.fingerprint or finding.id + if not fingerprint: + continue + active.add(fingerprint) + if fingerprint in baseline_fingerprints: + continue + match = _legacy_baseline_match(finding, baseline.findings) + if match is not None: + legacy_matched.add(match.fingerprint) + resolved = baseline_fingerprints - active - legacy_matched + if not resolved: + return [] + entries_by_fp = {entry.fingerprint: entry for entry in baseline.findings} + issues: list[BaselineIntegrityIssue] = [] + for fingerprint in sorted(resolved): + entry = entries_by_fp.get(fingerprint) + check_id = entry.check_id if entry else None + tool_name = entry.tool_name if entry else None + issues.append( + BaselineIntegrityIssue( + kind="resolved_not_pruned", + default_severity="low", + title=( + f"Baseline entry for {check_id or 'unknown check'} " + f"on {tool_name or 'unknown tool'} no longer fires; " + "consider pruning." + ), + evidence={ + "fingerprint": fingerprint, + "check_id": check_id, + "tool_name": tool_name, + "kind": "resolved_not_pruned", + }, + fingerprint=fingerprint, + check_id=check_id, + tool_name=tool_name, + ) + ) + return issues + + +def _verify_audit_log_alignment( + baseline_path: Path, + log_path: Path, + baseline: BaselineFile, +) -> list[BaselineIntegrityIssue]: + """Hash baseline vs. latest audit entry's hash_after; report mismatch. + + A baseline with zero findings and no audit log is a fresh / unsaved + state — not an integrity violation. Only flag when the baseline + contains entries (i.e., someone took the trouble to populate it) + but no audit row exists, or the most recent audit row's + ``hash_after`` disagrees with the file on disk. 
+ """ + if not log_path.exists(): + if not baseline.findings: + return [] + return [ + BaselineIntegrityIssue( + kind="missing_audit_log", + default_severity="critical", + title="Baseline has entries but no audit log", + evidence={ + "baseline_path": str(baseline_path), + "audit_log_path": str(log_path), + "entry_count": len(baseline.findings), + "kind": "missing_audit_log", + }, + ) + ] + latest = latest_audit_entry(log_path) + if latest is None: + if not baseline.findings: + return [] + return [ + BaselineIntegrityIssue( + kind="missing_audit_log", + default_severity="critical", + title="Audit log exists but is empty", + evidence={ + "baseline_path": str(baseline_path), + "audit_log_path": str(log_path), + "entry_count": len(baseline.findings), + "kind": "missing_audit_log", + }, + ) + ] + on_disk_hash = compute_baseline_hash_from_file(baseline_path) + if on_disk_hash != latest.hash_after: + return [ + BaselineIntegrityIssue( + kind="hash_mismatch", + default_severity="critical", + title="Baseline file hash does not match latest audit entry", + evidence={ + "baseline_path": str(baseline_path), + "audit_log_path": str(log_path), + "expected_hash": latest.hash_after, + "computed_hash": on_disk_hash, + "latest_audit_run_id": latest.run_id, + "latest_audit_timestamp": latest.timestamp, + "kind": "hash_mismatch", + }, + ) + ] + return [] + + +def _verify_entry_provenance( + baseline: BaselineFile, log_path: Path +) -> list[BaselineIntegrityIssue]: + """Each entry's provenance.run_id should appear in the audit log.""" + issues: list[BaselineIntegrityIssue] = [] + audit_run_ids: set[str] | None = None + for entry in baseline.findings: + if entry.provenance is None: + issues.append( + BaselineIntegrityIssue( + kind="legacy_no_provenance", + default_severity="critical", + title=( + f"Baseline entry for {entry.check_id} lacks v0.5 " + "provenance; re-run `agents-shipgate baseline save` " + "to migrate." 
+ ), + evidence={ + "fingerprint": entry.fingerprint, + "check_id": entry.check_id, + "tool_name": entry.tool_name, + "kind": "legacy_no_provenance", + }, + fingerprint=entry.fingerprint, + check_id=entry.check_id, + tool_name=entry.tool_name, + ) + ) + continue + if audit_run_ids is None: + audit_run_ids = {row.run_id for row in read_audit_log(log_path)} + if entry.provenance.run_id not in audit_run_ids: + issues.append( + BaselineIntegrityIssue( + kind="entry_no_audit", + default_severity="critical", + title=( + f"Baseline entry for {entry.check_id} references " + f"run_id {entry.provenance.run_id!r} that is not in " + "the audit log." + ), + evidence={ + "fingerprint": entry.fingerprint, + "check_id": entry.check_id, + "tool_name": entry.tool_name, + "run_id": entry.provenance.run_id, + "kind": "entry_no_audit", + }, + fingerprint=entry.fingerprint, + check_id=entry.check_id, + tool_name=entry.tool_name, + ) + ) + return issues + + +def _verify_entry_expiry( + baseline: BaselineFile, today: date +) -> list[BaselineIntegrityIssue]: + issues: list[BaselineIntegrityIssue] = [] + for entry in baseline.findings: + if entry.provenance is None or entry.provenance.expires is None: + continue + if entry.provenance.expires < today: + days_overdue = (today - entry.provenance.expires).days + issues.append( + BaselineIntegrityIssue( + kind="entry_expired", + default_severity="high", + title=( + f"Baseline entry for {entry.check_id} expired " + f"{days_overdue} day(s) ago " + f"({entry.provenance.expires.isoformat()})." 
+ ), + evidence={ + "fingerprint": entry.fingerprint, + "check_id": entry.check_id, + "tool_name": entry.tool_name, + "expires": entry.provenance.expires.isoformat(), + "days_overdue": days_overdue, + "reason": entry.provenance.reason, + "kind": "entry_expired", + }, + fingerprint=entry.fingerprint, + check_id=entry.check_id, + tool_name=entry.tool_name, + ) + ) + return issues + + +def _verify_deprecated_check_ids( + baseline: BaselineFile, +) -> list[BaselineIntegrityIssue]: + """Entries whose ``check_id`` is a deprecated alias. + + Aliases live in :data:`LEGACY_CHECK_ID_ALIASES`. They still match + findings at scan time (via :func:`expands_to_check_id`), but new + baselines should refer to the canonical IDs so reviewers see the + real check. + """ + issues: list[BaselineIntegrityIssue] = [] + for entry in baseline.findings: + if entry.check_id not in LEGACY_CHECK_ID_ALIASES: + continue + replacements = sorted(LEGACY_CHECK_ID_ALIASES[entry.check_id]) + issues.append( + BaselineIntegrityIssue( + kind="deprecated_check_id", + default_severity="low", + title=( + f"Baseline entry uses deprecated check_id " + f"{entry.check_id!r}; use one of {replacements!r}." + ), + evidence={ + "fingerprint": entry.fingerprint, + "check_id": entry.check_id, + "tool_name": entry.tool_name, + "replacement_check_ids": replacements, + "kind": "deprecated_check_id", + }, + fingerprint=entry.fingerprint, + check_id=entry.check_id, + tool_name=entry.tool_name, + ) + ) + return issues diff --git a/src/agents_shipgate/core/baseline_audit.py b/src/agents_shipgate/core/baseline_audit.py new file mode 100644 index 0000000..b4ced50 --- /dev/null +++ b/src/agents_shipgate/core/baseline_audit.py @@ -0,0 +1,157 @@ +"""Append-only audit log for baseline writes. + +Each call to ``agents-shipgate baseline save`` appends one JSON line to +``.agents-shipgate/baseline-audit.log`` describing the delta between the +prior baseline (if any) and the newly written one. 
Reviewers can: + +- ``git log`` the log to see when fingerprints joined the baseline. +- Use ``agents-shipgate baseline verify`` to confirm the on-disk baseline + matches the most recent audit entry's ``hash_after``. + +The audit log is **tamper-evident but not tamper-proof**. A well-resourced +adversary who edits both the baseline JSON and the audit log atomically +will defeat ``verify``; the goal is to make casual / accidental edits +observably wrong in code review. See ``docs/baseline-integrity.md``. + +Storage format: JSONL. Append-only is enforced by always opening with +``mode="a"``; the file is never rewritten in place. ``read_audit_log`` +returns entries in append order (oldest first). +""" + +from __future__ import annotations + +import hashlib +from datetime import UTC, datetime +from pathlib import Path + +from pydantic import BaseModel, ConfigDict, Field, ValidationError + +from agents_shipgate.core.errors import InputParseError + +AUDIT_LOG_SCHEMA_VERSION = "0.1" + +# Path conventions. The log lives next to the baseline JSON so a single +# ``.agents-shipgate/`` directory holds both. CLI callers can override +# both, but the default keeps them co-located. +DEFAULT_AUDIT_LOG_PATH = Path(".agents-shipgate") / "baseline-audit.log" + + +class BaselineAuditEntry(BaseModel): + """One row in ``baseline-audit.log`` describing a single save. + + ``hash_before`` is the SHA-256 of the prior baseline file's canonical + content (see :func:`compute_baseline_hash`) or ``None`` if this was + the first save. ``hash_after`` is the new file's hash. Verification + walks the log to find the most recent entry and compares its + ``hash_after`` against the current baseline hash. + + ``added_fingerprints`` / ``removed_fingerprints`` describe the delta + so reviewers can scan the log without diffing files. They are + redundant with the JSON contents but cheap to record. 
+ """ + + model_config = ConfigDict(extra="forbid") + + audit_schema_version: str = AUDIT_LOG_SCHEMA_VERSION + timestamp: str + run_id: str + scanner_version: str + baseline_path: str + hash_before: str | None + hash_after: str + added_fingerprints: list[str] = Field(default_factory=list) + removed_fingerprints: list[str] = Field(default_factory=list) + + +def compute_baseline_hash(canonical_json: str) -> str: + """SHA-256 of canonical baseline JSON. + + ``canonical_json`` should be the exact bytes that ``write_baseline`` + writes to disk. We hash the literal file content rather than a model + dump so that ``verify_baseline`` can re-read the file and confirm + byte-equivalence without re-serializing. + """ + return "sha256:" + hashlib.sha256(canonical_json.encode("utf-8")).hexdigest() + + +def compute_baseline_hash_from_file(path: Path) -> str: + """SHA-256 of a baseline file's decoded UTF-8 text. + + Used by ``verify_baseline`` to detect hand-edits. Reads the file + with ``read_text``, so trailing whitespace and re-ordering are + detected, but newline translation hides CRLF-vs-LF differences. + """ + return compute_baseline_hash(path.read_text(encoding="utf-8")) + + +def append_audit_entry(log_path: Path, entry: BaselineAuditEntry) -> None: + """Append one JSON line to the audit log. + + Creates the parent directory if needed. Each line is the entry's + ``model_dump_json()`` with a trailing newline. The function never + rewrites existing content. + """ + log_path.parent.mkdir(parents=True, exist_ok=True) + line = entry.model_dump_json() + "\n" + with log_path.open("a", encoding="utf-8") as fh: + fh.write(line) + + +def read_audit_log(log_path: Path) -> list[BaselineAuditEntry]: + """Read all audit log entries in append order. + + Returns ``[]`` if the file does not exist. Raises + :class:`InputParseError` on malformed lines so callers can surface + a clean error rather than silently dropping rows.
+ """ + if not log_path.exists(): + return [] + entries: list[BaselineAuditEntry] = [] + text = log_path.read_text(encoding="utf-8") + for line_number, raw in enumerate(text.splitlines(), start=1): + if not raw.strip(): + continue + try: + entries.append(BaselineAuditEntry.model_validate_json(raw)) + except ValidationError as exc: + raise InputParseError( + f"Invalid baseline audit log entry at {log_path}:{line_number}: {exc}" + ) from exc + return entries + + +def latest_audit_entry(log_path: Path) -> BaselineAuditEntry | None: + """Return the most recently appended audit entry, or ``None``. + + The most recent entry's ``hash_after`` is the value + ``verify_baseline`` compares against the current baseline file's hash. + """ + entries = read_audit_log(log_path) + return entries[-1] if entries else None + + +def audit_entry_for_run( + log_path: Path, run_id: str +) -> BaselineAuditEntry | None: + """Return the audit entry whose ``run_id`` matches, or ``None``. + + Used by ``verify_baseline`` to confirm that every fingerprint's + ``provenance.run_id`` has a corresponding audit entry. Missing + correspondence produces ``SHIP-BASELINE-INTEGRITY-MISMATCH``. + """ + for entry in read_audit_log(log_path): + if entry.run_id == run_id: + return entry + return None + + +def utc_now_isoformat() -> str: + """ISO-8601 UTC timestamp with ``Z`` suffix; second precision. + + Used for both ``BaselineProvenance.recorded_at`` and + ``BaselineAuditEntry.timestamp`` so the two share one format; they + come from separate calls and may differ by a second within one save. + """ + return datetime.now(UTC).replace(microsecond=0).isoformat().replace( + "+00:00", "Z" + ) diff --git a/tests/test_baseline_integrity.py b/tests/test_baseline_integrity.py new file mode 100644 index 0000000..7813965 --- /dev/null +++ b/tests/test_baseline_integrity.py @@ -0,0 +1,751 @@ +"""Tests for the v0.5 baseline integrity surface (M2). + +Layered: +- unit tests against ``core.baseline_audit`` (audit log primitives).
+- unit tests against ``core.baseline.verify_baseline`` and + ``baseline_resolved_fingerprints`` using synthetic ``BaselineFile`` and + ``ReadinessReport`` instances. +- unit tests against ``checks.baseline_integrity.build_findings``. +- integration tests that drive a real scan with ``run_scan`` and a + tampered baseline, asserting integrity findings appear in the report. +- CLI tests for ``agents-shipgate baseline verify``. +""" + +from __future__ import annotations + +import json +from datetime import date, timedelta +from pathlib import Path + +from typer.testing import CliRunner + +from agents_shipgate.checks.baseline_integrity import ( + build_findings as build_integrity_findings, +) +from agents_shipgate.checks.baseline_integrity import ( + has_hash_mismatch, +) +from agents_shipgate.cli.main import app +from agents_shipgate.cli.scan import run_scan +from agents_shipgate.core.baseline import ( + BaselineFile, + BaselineFinding, + BaselineIntegrityIssue, + BaselineProvenance, + baseline_from_report, + baseline_resolved_fingerprints, + verify_baseline, + write_baseline, +) +from agents_shipgate.core.baseline_audit import ( + BaselineAuditEntry, + append_audit_entry, + compute_baseline_hash, + compute_baseline_hash_from_file, + latest_audit_entry, + read_audit_log, + utc_now_isoformat, +) +from agents_shipgate.core.context import ScanContext +from agents_shipgate.core.models import ( + Agent, + Finding, + ReadinessReport, + ReportSummary, + ToolSurfaceSummary, +) + +SAMPLE = Path("samples/support_refund_agent/shipgate.yaml") + + +# --- audit log primitives ------------------------------------------------- + + +def test_compute_baseline_hash_is_deterministic(): + canonical = '{"schema_version":"0.5"}\n' + assert compute_baseline_hash(canonical) == compute_baseline_hash(canonical) + assert compute_baseline_hash(canonical).startswith("sha256:") + assert compute_baseline_hash(canonical) != compute_baseline_hash( + canonical + " " + ) + + +def 
def _stub_report(*finding_pairs: tuple[str, str]) -> ReadinessReport:
    """Return a minimal ``ReadinessReport`` holding one finding per pair.

    Each positional argument is a ``(check_id, tool_name)`` tuple. The
    finding at position ``i`` receives the fixed fingerprint and id
    ``fp_test_{i:04d}``, so tests can refer to fingerprints without
    re-running the full scan pipeline.
    """

    def _stub_finding(position: int, check_id: str, tool_name: str) -> Finding:
        # Fingerprint and id are intentionally the same deterministic value.
        stub_id = f"fp_test_{position:04d}"
        return Finding(
            check_id=check_id,
            title=f"{check_id} on {tool_name}",
            severity="high",
            category="test",
            tool_name=tool_name,
            evidence={"i": position},
            confidence="high",
            provenance_kind="static_declaration",
            recommendation="stub",
            fingerprint=stub_id,
            id=stub_id,
        )

    return ReadinessReport(
        run_id="run_stub",
        project={"name": "stub"},
        agent={"name": "test", "id": "agent:test"},
        environment={"target": "staging"},
        summary=ReportSummary(status="advisory_pass"),
        tool_surface=ToolSurfaceSummary(total_tools=0, high_risk_tools=0),
        findings=[
            _stub_finding(position, check_id, tool_name)
            for position, (check_id, tool_name) in enumerate(finding_pairs)
        ],
    )
def test_baseline_from_report_upgrades_legacy_entries_without_provenance():
    """A prior v0.4 entry with ``provenance=None`` gets a fresh stamp on re-save."""
    save_time = "2026-01-01T00:00:00Z"
    # Same fingerprint as the first finding produced by _stub_report, but
    # carrying no provenance block (the pre-0.5 shape).
    legacy_entry = BaselineFinding(
        fingerprint="fp_test_0000",
        check_id="SHIP-X",
        tool_name="tool_a",
        severity="high",
        title="legacy",
        provenance=None,
    )
    legacy_baseline = BaselineFile(
        schema_version="0.4",
        created_at="2025-12-01T00:00:00Z",
        source_report_run_id="run_legacy",
        findings=[legacy_entry],
    )

    upgraded = baseline_from_report(
        _stub_report(("SHIP-X", "tool_a")),
        scanner_version="1.0.0",
        prior_baseline=legacy_baseline,
        now=save_time,
    )

    stamp = upgraded.findings[0].provenance
    assert stamp is not None
    assert stamp.recorded_at == save_time
def _bootstrap_baseline(tmp_path: Path) -> tuple[Path, Path]:
    """Save a two-finding stub baseline under ``tmp_path`` for verify tests.

    Returns ``(baseline_path, audit_log_path)``, where the second element is
    ``tmp_path / "baseline-audit.log"``. No ``audit_log_path`` override is
    passed to ``write_baseline``, so that sibling path is presumably where
    it writes the audit log by default.
    """
    stub_report = _stub_report(("SHIP-X", "tool_a"), ("SHIP-Y", "tool_b"))
    saved_baseline = tmp_path / "baseline.json"
    write_baseline(stub_report, saved_baseline)
    sibling_audit_log = tmp_path / "baseline-audit.log"
    return saved_baseline, sibling_audit_log
def test_verify_baseline_missing_audit_log_empty_baseline_ok(tmp_path):
    """An empty baseline with no audit log is not an integrity violation.

    Fresh / unsaved state — nothing to verify against yet.
    """
    baseline_path = tmp_path / "baseline.json"
    # Build the empty baseline exactly once. The file is written by hand
    # rather than through write_baseline so that no audit log row is created.
    empty_baseline = BaselineFile(
        created_at="2026-01-01T00:00:00Z",
        source_report_run_id="run_empty",
    )
    baseline_path.write_text(
        empty_baseline.model_dump_json(indent=2) + "\n",
        encoding="utf-8",
    )
    # The audit log path is never created in this test.
    issues = verify_baseline(baseline_path, tmp_path / "audit.log")
    assert issues == []
def test_verify_baseline_entry_expired(tmp_path):
    """An entry whose ``provenance.expires`` is in the past yields ``entry_expired``.

    The reference date is sampled once and used both to derive the expiry
    and as the ``today`` argument to ``verify_baseline``, so the fixture and
    the check cannot disagree if the clock rolls over midnight mid-test.
    """
    baseline_path = tmp_path / "baseline.json"
    today = date.today()
    expired_on = today - timedelta(days=5)
    baseline = BaselineFile(
        created_at="2026-01-01T00:00:00Z",
        source_report_run_id="run_x",
        findings=[
            BaselineFinding(
                fingerprint="fp_exp",
                check_id="SHIP-X",
                tool_name="t",
                severity="high",
                title="x",
                provenance=BaselineProvenance(
                    scanner_version="1.0.0",
                    run_id="run_x",
                    recorded_at="2026-01-01T00:00:00Z",
                    expires=expired_on,
                ),
            )
        ],
    )
    baseline_path.write_text(
        baseline.model_dump_json(indent=2) + "\n", encoding="utf-8"
    )
    # The audit row matches both the entry's run_id and the file hash, so
    # expiry is the only condition this fixture deliberately violates.
    audit_path = tmp_path / "baseline-audit.log"
    append_audit_entry(
        audit_path,
        BaselineAuditEntry(
            timestamp=utc_now_isoformat(),
            run_id="run_x",
            scanner_version="1.0.0",
            baseline_path=str(baseline_path),
            hash_before=None,
            hash_after=compute_baseline_hash_from_file(baseline_path),
        ),
    )
    issues = verify_baseline(baseline_path, audit_path, today=today)
    kinds = {issue.kind for issue in issues}
    assert "entry_expired" in kinds
def test_resolved_fingerprints_flags_entries_with_no_current_match():
    """A baselined fingerprint absent from the current scan is `resolved_not_pruned`."""
    stamp = BaselineProvenance(
        scanner_version="1.0.0",
        run_id="run_x",
        recorded_at="2026-01-01T00:00:00Z",
    )
    stale_entry = BaselineFinding(
        fingerprint="fp_resolved",
        check_id="SHIP-X",
        tool_name="tool_gone",
        severity="high",
        title="gone",
        provenance=stamp,
    )
    baseline = BaselineFile(
        created_at="2026-01-01T00:00:00Z",
        source_report_run_id="run_x",
        findings=[stale_entry],
    )
    # The current scan reports nothing, so the single entry has no match.
    nothing_found: list[Finding] = []

    issues = baseline_resolved_fingerprints(nothing_found, baseline)

    assert len(issues) == 1
    only_issue = issues[0]
    assert only_issue.kind == "resolved_not_pruned"
    assert only_issue.fingerprint == "fp_resolved"
def test_build_findings_strict_blocks_only_mismatch():
    """In strict mode only the integrity-mismatch finding sets ``blocks_release``."""
    issues = [
        _issue("hash_mismatch"),
        _issue("entry_expired", "high"),
        _issue("deprecated_check_id", "low"),
    ]
    findings = build_integrity_findings(
        issues, context=_stub_context(), integrity_mode="strict"
    )

    finding_for = {}
    for finding in findings:
        finding_for[finding.check_id] = finding

    # check_id -> expected blocks_release flag under strict mode
    expected_blocking = {
        "SHIP-BASELINE-INTEGRITY-MISMATCH": True,
        "SHIP-BASELINE-ENTRY-EXPIRED": False,
        "SHIP-BASELINE-ENTRY-STALE": False,
    }
    for check_id, should_block in expected_blocking.items():
        assert finding_for[check_id].blocks_release is should_block
def test_scan_integrity_off_emits_no_integrity_findings(tmp_path, monkeypatch):
    """With ``integrity_mode = "off"`` a tampered baseline adds no ``SHIP-BASELINE-*`` findings."""
    import agents_shipgate.cli.scan as scan_mod
    from agents_shipgate.config import loader as loader_mod

    baseline_path, _ = _save_baseline_via_scan(tmp_path)
    # Tamper with the saved file so that verification would have something
    # to report if it ran.
    pristine = baseline_path.read_text(encoding="utf-8")
    baseline_path.write_text(pristine.replace("\n", " \n", 1), encoding="utf-8")

    # Force integrity_mode=off without editing the bundled sample's
    # shipgate.yaml: wrap the real loader and override the value on the
    # model it returns.
    real_load_manifest = loader_mod.load_manifest

    def load_with_integrity_off(path):
        manifest = real_load_manifest(path)
        manifest.baseline.integrity_mode = "off"
        return manifest

    # cli.scan imports `load_manifest` at module level, so the name must be
    # replaced there as well as on the loader module itself.
    for module in (loader_mod, scan_mod):
        monkeypatch.setattr(module, "load_manifest", load_with_integrity_off)

    report, _ = run_scan(
        config_path=SAMPLE,
        output_dir=tmp_path / "out2",
        formats=["json"],
        ci_mode="advisory",
        baseline_path=baseline_path,
    )

    leaked = [
        finding
        for finding in report.findings
        if finding.check_id.startswith("SHIP-BASELINE-")
    ]
    assert leaked == []
"baseline", + "verify", + "--baseline", + str(baseline_path), + "--audit-log", + str(audit_path), + ], + ) + assert result.exit_code == 0 + assert "Baseline OK" in result.stdout + + +def test_cli_baseline_verify_strict_exits_6_on_mismatch(tmp_path): + baseline_path, audit_path = _bootstrap_baseline(tmp_path) + # Tamper + data = baseline_path.read_text(encoding="utf-8") + baseline_path.write_text(data.replace("\n", " \n", 1), encoding="utf-8") + runner = CliRunner() + result = runner.invoke( + app, + [ + "baseline", + "verify", + "--baseline", + str(baseline_path), + "--audit-log", + str(audit_path), + "--strict", + ], + ) + assert result.exit_code == 6 + + +def test_cli_baseline_verify_json(tmp_path): + baseline_path, audit_path = _bootstrap_baseline(tmp_path) + data = baseline_path.read_text(encoding="utf-8") + baseline_path.write_text(data.replace("\n", " \n", 1), encoding="utf-8") + runner = CliRunner() + result = runner.invoke( + app, + [ + "baseline", + "verify", + "--baseline", + str(baseline_path), + "--audit-log", + str(audit_path), + "--json", + ], + ) + assert result.exit_code == 0 # no --strict, so still 0 + payload = json.loads(result.stdout) + assert payload["issue_count"] >= 1 + assert any( + issue["kind"] == "hash_mismatch" for issue in payload["issues"] + ) + + +def test_cli_baseline_verify_missing_baseline_exits_3(tmp_path): + runner = CliRunner() + result = runner.invoke( + app, + [ + "baseline", + "verify", + "--baseline", + str(tmp_path / "nope.json"), + ], + ) + assert result.exit_code == 3 diff --git a/tests/test_scan.py b/tests/test_scan.py index 22df219..e17777a 100644 --- a/tests/test_scan.py +++ b/tests/test_scan.py @@ -320,7 +320,7 @@ def test_baseline_save_and_scan_matches_existing_findings(tmp_path): baseline_path=baseline_path, ) - assert baseline.schema_version == "0.4" + assert baseline.schema_version == "0.5" assert baseline.tool_surface_facts is not None assert baseline.action_surface_facts is not None assert first_report.run_id == 
second_report.run_id