diff --git a/.internal-skills/supply-chain/ACTION_LOG.md b/.internal-skills/supply-chain/ACTION_LOG.md new file mode 100644 index 0000000..328e9bf --- /dev/null +++ b/.internal-skills/supply-chain/ACTION_LOG.md @@ -0,0 +1,81 @@ +# `.internal-skills/supply-chain/ACTION_LOG.md` — append-only internal action log + +> Internal · NON-NORMATIVE. Append-only. Records actions, validations, and +> limitations for supply-chain runner/gate work. No external action is recorded +> here because none was taken (no release, tag, DOI, publish, deploy, or +> communication). The private repo `Davincc77/klickd-ai` was not touched. + +--- + +## 2026-06-02 — candidate generator + promotion gate (v0.1) + +- **Branch:** `feat/supply-chain-runner-gate`, stacked on + `integration/supply-chain-cumulative` (PR #121). +- **Base for PR:** `integration/supply-chain-cumulative` (NOT `main`). + +### Added +- `scripts/generate_supply_chain_candidate.py` — internal candidate generator + (runner). Config-only `build_request` JSON → candidate skill in the internal + v4.2 target shape under `.internal-skills/supply-chain/candidates/` or + `--out`. Deterministic: `candidate_id` / `candidate_hash` / `run_id` derived + only from canonical build_request bytes (+ resolved source manifest hash). + No `generated_at` in the hashed core. Sources come only from the + build_request / referenced source_manifest; missing domain info → + `requires_human_premium_pass`, never hallucinated. +- `scripts/run_supply_chain_promotion_gate.py` — combined promotion gate. + Orchestrates threat model (always), source/license (when `--source-manifest`), + logical diff (when `--before`), candidate shape checks, and forbidden-claim / + public-private boundary tripwires. Classifies ACCEPT / ACCEPT_WITH_REVIEW / + BLOCK. Exit 0 acceptable, 1 BLOCK, 2 usage. `deterministic_gate_id` excludes + the clock (`eval_date`). Reports — does not run — premium pass. `not_run` + checks recorded with a reason, never as `pass`. +- `tests/test_supply_chain_candidate.py` (20 tests), + `tests/test_supply_chain_promotion_gate.py` (19 tests). +- `tests/fixtures/supply_chain_candidate/` — `build_request_clean.json`, + `build_request_missing_domain.json`, `source_manifest_ok.json`. +- Example artefacts: `candidates/xklickd-research-reader.json`, + `promotion-gate/xklickd-research-reader.gate.json` + `.gate.md`. +- Updated `README.md` integration index: moved Candidate generation + Promotion + gate from "planned" to tool-backed, with literal scope notes. + +### Commands run (local, offline, stdlib-only) +- `python scripts/generate_supply_chain_candidate.py --build-request --out ` +- `python scripts/run_supply_chain_promotion_gate.py --candidate [--source-manifest ] [--before ] --out --md --eval-date 2026-06-02` +- `python -m pytest tests/test_supply_chain_*.py` → 102 passed. +- `python -m pytest tests/` → 283 passed, 1 unrelated DeprecationWarning + (jsonschema.__version__), 0 failures. +- `python scripts/verify_xklickd_skill_packs.py verify` → rc 0. +- `python scripts/validate_v4_schemas.py` → rc 0. +- `python scripts/validate_v4_1_candidate_mapping.py` → rc 0. +- Forbidden-claims / codename grep over committed `candidates/` and + `promotion-gate/` artefacts → CLEAN (no banned substring). Internal track name + `xklickd_internal_skill_v4_2` appears only inside the candidate's + `internal_target` block, as designed. + +### Validations / behaviour confirmed +- Deterministic repeatability: identical build_request → identical + candidate_id/hash; identical candidate → identical gate_id, stable across + differing `--eval-date`. +- Missing domain info → `requires_human_premium_pass=true` with named gaps; + no competencies/sources hallucinated. +- Clean candidate → gate ACCEPT (exit 0). +- Missing-domain candidate → gate ACCEPT_WITH_REVIEW (exit 0), + premium_pass_required=true. +- Forbidden claim, internal codename, private→public leak, public v4.2 + over-claim, missing v4.2 layer, completeness claim → gate BLOCK (exit 1). + +### Limitations (no mirage) +- Emitting the v4.2 target shape is NOT a claim of supply-chain completeness; a + generated candidate is NOT a loaded executable skill (fails the loaded-skill + gate: requires artifact_loaded AND sha256_matches_manifest). +- The gate's boundary tripwire is a coarse guard, not a full PII/secrets + scanner (still a planned stage). Runtime enforcement remains planned. +- No legal/compliance, security-certification, or benchmark-superiority claim. +- Premium pass is reported as required where applicable but is NOT executed. + +### Explicitly NOT done +- No release, tag, DOI, npm/PyPI publish, GitHub Release, or deploy. +- No merge to `main`. +- No external communication. +- No change to `Davincc77/klickd-ai`. +- No public artefact promoted to v4.2 (public stays v4.1 candidates). diff --git a/.internal-skills/supply-chain/README.md b/.internal-skills/supply-chain/README.md index e122265..966d72f 100644 --- a/.internal-skills/supply-chain/README.md +++ b/.internal-skills/supply-chain/README.md @@ -21,17 +21,25 @@ This README is the integration index that brings the supply-chain components tog | **Logical diff** | `scripts/generate_supply_chain_diff.py` | `diff/` (report output) | `tests/test_supply_chain_diff.py` (+ `tests/fixtures/supply_chain_diff/`) | | **Source freshness + license** | `scripts/check_supply_chain_sources.py` | `source-check/example_source_manifest.json` | `tests/test_supply_chain_sources.py` (+ `tests/fixtures/supply_chain_sources/`) | | **Threat model** | `scripts/generate_supply_chain_threat_model.py` | (report output) · doc: `docs/supply-chain/THREAT_MODEL_GENERATOR.md` | `tests/test_supply_chain_threat_model.py` (+ `tests/fixtures/threat-model/`) | +| **Candidate generation** | `scripts/generate_supply_chain_candidate.py` | `candidates/` (example: `candidates/xklickd-research-reader.json`) | `tests/test_supply_chain_candidate.py` (+ `tests/fixtures/supply_chain_candidate/`) | +| **Promotion gate** | `scripts/run_supply_chain_promotion_gate.py` | `promotion-gate/` (example: `promotion-gate/xklickd-research-reader.gate.json` + `.md`) | `tests/test_supply_chain_promotion_gate.py` | -Each of these is `tool`: a runnable script with a passing test module and deterministic output. "Tool-backed" means the bytes and behaviour exist and are tested — it does **not** imply the end-to-end build runner exists. +Each of these is `tool`: a runnable script with a passing test module and deterministic output. "Tool-backed" means the bytes and behaviour exist and are tested — it does **not** imply the supply chain is complete, that any candidate is a loaded skill, or that a public release exists. + +### Candidate generator scope (read literally) + +`generate_supply_chain_candidate.py` emits the **internal v4.2 target shape** from a config-only `build_request`. Emitting the shape is **not** a claim that every lifecycle stage is implemented or verified — and a generated candidate is **not** a loaded executable skill (it fails the loaded-skill gate below). When domain information is missing, the runner marks `requires_human_premium_pass` rather than inventing competencies, risk, or sources. Sources come **only** from the `build_request` / referenced `source_manifest`. + +### Promotion gate scope (read literally) + +`run_supply_chain_promotion_gate.py` orchestrates the tool-backed checks (threat model always; source/license when a manifest is given; logical diff when a `--before` is given) plus candidate shape checks and forbidden-claim / public-private boundary tripwires. It classifies **ACCEPT / ACCEPT_WITH_REVIEW / BLOCK** and **reports** whether a human premium pass is required — it does **not** run that pass, and makes no compliance/security/benchmark claim. A check that could not run is recorded `not_run` with a reason, never as `pass`. ## Planned stages (specified, not built) | Stage | What it will do | Why it is not claimed yet | |---|---|---| -| **Candidate generation** | Produce a candidate `carrier_pack` from a config-only build request (the build *runner*). | No runner is shipped; the *process* is specified, the executor is not. | -| **Promotion gate** | Pass/fail enforcement that blocks a candidate from being promoted unless all checks pass. | No gate enforces promotion today; checks run, but nothing blocks on them. | -| **Full PII / secrets scanner** | Scan candidate inputs/outputs for PII and secrets beyond the engineering source/license checks. | Current `source-check` is an engineering license/freshness check, **not** a compliance attestation or a PII scanner. | -| **Runtime enforcement** | Enforce guardrails in-loop at execution time, not just at build/audit time. | Build-time checks exist; runtime enforcement does not. | +| **Full PII / secrets scanner** | Scan candidate inputs/outputs for PII and secrets beyond the engineering source/license checks. | Current `source-check` is an engineering license/freshness check, **not** a compliance attestation or a PII scanner. The gate's boundary tripwire is a coarse guard, not a full scanner. | +| **Runtime enforcement** | Enforce guardrails in-loop at execution time, not just at build/audit time. | Build-time checks + the promotion gate exist; runtime enforcement does not. | --- diff --git a/.internal-skills/supply-chain/candidates/xklickd-research-reader.json b/.internal-skills/supply-chain/candidates/xklickd-research-reader.json new file mode 100644 index 0000000..b71e930 --- /dev/null +++ b/.internal-skills/supply-chain/candidates/xklickd-research-reader.json @@ -0,0 +1,315 @@ +{ + "audit": { + "audit_trail_stage": "audit_trail_index", + "build_request_hash": "sha256:1f439bb471fc5bc90b95aa685b340becf75a1a578069f9193aad1f62e2315174", + "emits_audit_event_per_output": true + }, + "build_request_hash": "sha256:1f439bb471fc5bc90b95aa685b340becf75a1a578069f9193aad1f62e2315174", + "candidate_hash": "sha256:e6369e8b9b1f91b65d17f68db2fe4df7efa5c9fcbd545db0b8c3bb72e9835ad3", + "candidate_id": "sha256:2dc00bf2ebb3251025b8de1b8bbfcd426f5edc5bb7f245d20d305857ab376bff", + "competency_architecture": { + "competency_core": { + "base_transversal_core": { + "transversal_refs": [ + "ESCO:S1.transversal_thinking", + "ESCO:S2.transversal_collaboration", + "ESCO:S3.transversal_communication", + "LifeComp:Personal.self_regulation", + "LifeComp:Social.cooperation", + "LifeComp:Learning.learning_to_learn", + "DigComp:transversal.responsible_use" + ] + }, + "foundation_competencies": [ + "ESCO:S1.transversal_thinking", + "ESCO:S2.transversal_collaboration", + "ESCO:S3.transversal_communication", + "LifeComp:Personal.self_regulation", + "LifeComp:Social.cooperation", + "LifeComp:Learning.learning_to_learn", + "DigComp:transversal.responsible_use" + ], + "note": "Foundation/transversal anchors are framework-referenced structural names, not fabricated domain knowledge.", + "transversal_competencies": [ + "WEF:critical_thinking", + "WEF:problem_solving", + "WEF:creativity", + "WEF:adaptability", + "WEF:ethical_reasoning", + "ESCO:information_literacy", + "ESCO:digital_literacy", + "LifeComp:growth_mindset", + "LifeComp:empathy", + "DigComp:information_evaluation", + "DigComp:data_protection_awareness", + "DigComp:safety" + ] + }, + "domain": "research", + "domain_output_requirements": { + "format": "cited_summary" + }, + "domain_risk_profile": { + "default_risk": "low", + "sensitive_actions": [] + }, + "harmonized_layers": [ + "competency_core", + "primary_domain_competencies", + "secondary_domain_competencies", + "domain_risk_profile", + "domain_output_requirements" + ], + "primary_domain_competencies": [ + "ESCO:research_methods", + "ESCO:academic_writing" + ], + "secondary_domain_competencies": [ + "ESCO:citation_management" + ] + }, + "context_graph": { + "edge_types": [ + "scopes", + "requires", + "creates", + "vetoes", + "audits" + ], + "node_types": [ + "competency", + "memory", + "evidence", + "policy", + "action", + "audit" + ], + "traversed_by_runtime": true + }, + "domain": "research", + "evidence": { + "requires_citations": true, + "source_count": 1, + "sources": [ + { + "category": "academic", + "freshness": "current", + "id": "skos", + "license": "CC-BY-4.0", + "name": "skos-framework", + "published_at": "2025-01-01", + "title": "SKOS Reference", + "url": "https://example.org/skos", + "usage": "reference" + } + ] + }, + "governance": { + "final_decision_owner": "human_operator", + "human_veto_required": true, + "no_auto_external_action": true, + "non_lowerable_rules": [ + "no_stub_as_loaded_skill", + "no_unsourced_claim" + ] + }, + "governance_system": { + "_declared_sensitive_actions": [], + "action_gates": [], + "approval_lifecycle": [ + "requested", + "granted", + "denied", + "expired" + ], + "authority_hierarchy": [ + "human_operator", + "human_reviewer", + "agent" + ], + "consent_rules": [], + "escalation_rules": [], + "final_decision_owner": "human_operator", + "governance_audit": { + "emits_to": "audit" + }, + "human_veto": { + "lowerable": false, + "note": "raise-only; not lowerable by an agent", + "required": true + }, + "human_veto_required": true, + "no_auto_external_action": true, + "non_lowerable_rules": [ + "no_stub_as_loaded_skill", + "no_unsourced_claim" + ], + "policy_conflict_resolution": "strictest_rule_wins", + "revocation_rules": [], + "risk_levels": [ + "low", + "medium", + "high", + "critical" + ] + }, + "interactions": { + "canonical_flow": [ + "user_task", + "intent_detection", + "competency_activation", + "memory_retrieval", + "context_graph_traversal", + "evidence_resolution", + "policy_evaluation", + "output_contract_check", + "human_veto_if_required", + "response_or_action", + "audit_event", + "memory_update_candidate" + ], + "flows": [ + "task_to_competency_flow", + "competency_to_memory_flow", + "memory_to_context_graph_flow", + "context_graph_to_policy_flow", + "policy_to_output_contract_flow", + "output_to_audit_flow", + "lifecycle_to_runtime_flow" + ], + "human_veto_if_required_lowerable": false, + "memory_update_is_candidate_only": true + }, + "internal_target": { + "note": "Internal v4.2 target shape. NOT a public v4.2 release; public x.klickd artefacts remain v4.1 candidates.", + "public_version": "v4.1", + "track": "xklickd_internal_skill_v4_2" + }, + "kind": "xklickd_internal_candidate_skill", + "memory": { + "promotion_rules": [], + "reads_private_context": false, + "writes_long_term": false + }, + "memory_governance": { + "every_action_influencing_write_is_governed": true, + "every_governance_decision_consulting_memory_is_audited": true, + "role": "bridge_between_memory_and_governance" + }, + "memory_system": { + "promotion_rules": [], + "reads_private_context": false, + "retention": "session_default", + "retrieval": "scoped_by_active_competency", + "write_candidates": "subject_to_memory_governance", + "writes_long_term": false + }, + "metadata": { + "domain": "research", + "publisher": "internal", + "size_tier": "lite", + "skill_id": "xklickd-research-reader", + "status": "candidate", + "title": "Research Reader" + }, + "non_normative": true, + "output_contract": { + "allowed_outputs": [ + "text_response" + ], + "emits_public_output": true, + "forbidden_outputs": [ + "private_to_public", + "unsourced_public_claim" + ], + "graph_bindings": { + "creates_action_node": false, + "may_trigger_veto_edge": true, + "requires_evidence_node": true, + "requires_policy_node": true, + "writes_audit_edge": true + }, + "required_audit_event": true, + "required_citations": true, + "required_handoff_summary": true, + "required_uncertainty_markers": true, + "requires_citations": true + }, + "premium_pass_status": { + "gaps": [], + "note": "Missing domain information is surfaced as a gap, never hallucinated. A clean candidate has an empty gaps list.", + "requires_human_premium_pass": false + }, + "risk_profile": { + "default_risk": "low", + "sensitive_actions": [] + }, + "run_id": "sha256:2dc00bf2ebb3251025b8de1b8bbfcd426f5edc5bb7f245d20d305857ab376bff", + "runtime": { + "lifecycle_gates_availability": true, + "loadable_only_if_promoted": true, + "tools": { + "allowed": [ + "read_file" + ], + "forbidden": [ + "publish", + "send_email" + ] + } + }, + "schema_version": "xklickd.candidate.v0.1", + "security": { + "declared_classification": "internal", + "no_real_pii_in_candidate": true, + "no_secrets_in_candidate": true, + "private_public_boundary_guarded": true + }, + "skill_id": "xklickd-research-reader", + "skill_lifecycle": { + "completeness_claimed": false, + "note": "Target lifecycle layout only; not an assertion that every stage is implemented or verified. release_record is an INTERNAL record, never a public tag/DOI/package/release.", + "renamed_from": "supply_chain", + "stages": [ + "build_request", + "source_manifest", + "generated_candidate", + "validation_pipeline", + "audit_trail_index", + "determinism_record", + "logical_diff_report", + "source_license_report", + "threat_model_report", + "benchmark_report", + "premium_pass_report", + "promotion_gate", + "rollback_protocol", + "deprecation_protocol", + "release_record" + ] + }, + "source_manifest": null, + "source_manifest_hash": null, + "sources": [ + { + "category": "academic", + "freshness": "current", + "id": "skos", + "license": "CC-BY-4.0", + "name": "skos-framework", + "published_at": "2025-01-01", + "title": "SKOS Reference", + "url": "https://example.org/skos", + "usage": "reference" + } + ], + "tools": { + "allowed": [ + "read_file" + ], + "forbidden": [ + "publish", + "send_email" + ] + } +} diff --git a/.internal-skills/supply-chain/promotion-gate/xklickd-research-reader.gate.json b/.internal-skills/supply-chain/promotion-gate/xklickd-research-reader.gate.json new file mode 100644 index 0000000..1fbeb9d --- /dev/null +++ b/.internal-skills/supply-chain/promotion-gate/xklickd-research-reader.gate.json @@ -0,0 +1,117 @@ +{ + "blocking_findings": [], + "candidate_hash": "sha256:e6369e8b9b1f91b65d17f68db2fe4df7efa5c9fcbd545db0b8c3bb72e9835ad3", + "candidate_id": "sha256:2dc00bf2ebb3251025b8de1b8bbfcd426f5edc5bb7f245d20d305857ab376bff", + "candidate_path": ".internal-skills/supply-chain/candidates/xklickd-research-reader.json", + "checks": [ + { + "blocking_findings": [], + "check": "candidate_shape", + "detail": { + "required_layers": [ + "metadata", + "competency_architecture", + "memory_system", + "governance_system", + "memory_governance", + "runtime", + "context_graph", + "interactions", + "evidence", + "security", + "audit", + "skill_lifecycle", + "output_contract" + ] + }, + "review_findings": [], + "verdict": "pass" + }, + { + "blocking_findings": [], + "check": "boundary_tripwires", + "detail": {}, + "review_findings": [], + "verdict": "pass" + }, + { + "blocking_findings": [], + "check": "threat_model", + "detail": { + "deterministic_threat_model_id": "tmid:4465464eef334cd591f8703405c67744", + "summary": { + "blocked": 0, + "by_category": {}, + "by_severity": { + "critical": 0, + "high": 0, + "low": 0, + "medium": 0 + }, + "total": 0 + } + }, + "review_findings": [], + "verdict": "pass" + }, + { + "blocking_findings": [], + "check": "source_license", + "detail": { + "deterministic_report_id": "sha256:10dde9d9942f1d13938bf046be3ac6470a10aad763979315334a7ef1f0a13126", + "summary": { + "allowed": 1, + "blocked": 0, + "review": 0, + "total_sources": 1 + } + }, + "review_findings": [], + "verdict": "pass" + }, + { + "blocking_findings": [], + "check": "logical_diff", + "detail": { + "reason": "no --before candidate provided" + }, + "review_findings": [], + "verdict": "not_run" + }, + { + "blocking_findings": [], + "check": "premium_pass_required", + "detail": { + "gaps": [], + "requires_human_premium_pass": false + }, + "review_findings": [], + "verdict": "pass" + } + ], + "claim_boundaries": { + "establishes_legal_compliance": false, + "is_full_automation": false, + "is_security_certification": false, + "note": "The gate orchestrates offline, stdlib-only checks and reports whether a human premium pass is required; it does not run that pass and makes no compliance claim.", + "proves_loaded_executable_skill": false, + "runs_premium_pass": false + }, + "classification": "ACCEPT", + "deterministic_gate_id": "sha256:a45b0325986dc2880411c570254765360037856dce1df783740ac8ec2f1ff9d4", + "kind": "xklickd_supply_chain_promotion_gate_report", + "non_deterministic_zone": { + "evaluated_at": "2026-06-02", + "note": "evaluated_at is excluded from deterministic_gate_id." + }, + "non_normative": true, + "premium_pass_required": false, + "review_findings": [], + "schema_version": "xklickd.promotion_gate.v0.1", + "summary": { + "blocking": 0, + "checks_not_run": 1, + "checks_run": 5, + "review": 0 + } +} diff --git a/.internal-skills/supply-chain/promotion-gate/xklickd-research-reader.gate.md b/.internal-skills/supply-chain/promotion-gate/xklickd-research-reader.gate.md new file mode 100644 index 0000000..9c81315 --- /dev/null +++ b/.internal-skills/supply-chain/promotion-gate/xklickd-research-reader.gate.md @@ -0,0 +1,19 @@ +# Supply-chain promotion gate — ACCEPT + +- **Candidate:** `sha256:2dc00bf2ebb3251025b8de1b8bbfcd426f5edc5bb7f245d20d305857ab376bff` +- **Gate id:** `sha256:a45b0325986dc2880411c570254765360037856dce1df783740ac8ec2f1ff9d4` +- **Premium pass required:** False +- **Blocking:** 0 · **Review:** 0 · **Checks run:** 5 · **Not run:** 1 + +## Checks + +| Check | Verdict | Blocking | Review | +|---|---|---|---| +| candidate_shape | pass | 0 | 0 | +| boundary_tripwires | pass | 0 | 0 | +| threat_model | pass | 0 | 0 | +| source_license | pass | 0 | 0 | +| logical_diff | not_run | 0 | 0 | +| premium_pass_required | pass | 0 | 0 | + +> NON-NORMATIVE. No release, no compliance claim. The gate reports whether a human premium pass is required; it does not run one. diff --git a/scripts/generate_supply_chain_candidate.py b/scripts/generate_supply_chain_candidate.py new file mode 100644 index 0000000..2441599 --- /dev/null +++ b/scripts/generate_supply_chain_candidate.py @@ -0,0 +1,676 @@ +#!/usr/bin/env python3 +"""x.klickd supply-chain — internal candidate skill generator (runner v0.1). + +This is the candidate-generation stage of the documented supply-chain pipeline: +the build *runner* that turns a deterministic, config-only `build_request` into +a candidate skill in the INTERNAL v4.2 target shape described in +docs/internal/INTERNAL_SKILL_V4_2_MAPPING.md. + +NON-NORMATIVE. Internal only. This runner: + - produces NO public release, tag, DOI, package, or deploy; + - does NOT promote any public artefact to v4.2 (public stays v4.1 candidate); + - does NOT run the premium pass — it only marks where one is required; + - does NOT invent sources: every source comes from the build_request or a + referenced source_manifest. Missing domain information is surfaced as a + `requires_human_premium_pass` flag, never hallucinated. + +Anti-mirage contract: + - The runner emits the v4.2 *target shape* (metadata, competency_architecture, + memory_system, governance_system, memory_governance, runtime, context_graph, + interactions, evidence, security, audit, skill_lifecycle, output_contract). + Emitting the shape is NOT a claim that every lifecycle stage is implemented + or verified — that is the promotion gate's job, and it stays honest about + what it has and has not run. + - A candidate is only "complete enough to promote without human premium pass" + when no `requires_human_premium_pass` marker is set. The generator never + fabricates competencies, domain risk, or sources to clear that bar. + +Determinism: + - candidate_id / candidate_hash / run_id are derived ONLY from the canonical + build_request bytes (+ resolved source manifest bytes when referenced). + Identical inputs -> identical ids across runs / hosts / clocks. + - Any clock value (generated_at) is quarantined under non_deterministic_zone + and excluded from every hash. + +CLI: + python scripts/generate_supply_chain_candidate.py --build-request REQ.json + python scripts/generate_supply_chain_candidate.py --build-request REQ.json --out cand.json + +Exit codes: + 0 candidate generated (may carry requires_human_premium_pass markers) + 1 the build_request is structurally invalid (cannot generate honestly) + 2 usage / I-O error +""" +from __future__ import annotations + +import argparse +import hashlib +import json +import sys +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +DEFAULT_OUT_DIR = ( + REPO_ROOT / ".internal-skills" / "supply-chain" / "candidates" +) + +CANDIDATE_SCHEMA_VERSION = "xklickd.candidate.v0.1" +INTERNAL_TARGET_TRACK = "xklickd_internal_skill_v4_2" + +# The 7 foundation (transversal) competency anchors and 12 transversal-flow +# competencies. These are STRUCTURAL anchors (framework-referenced names), not +# fabricated domain knowledge — they are the shared "base transversal core" +# every candidate carries per docs/chimera/V4_1_COMPETENCY_IDENTIFICATION_PROTOCOL.md. +FOUNDATION_COMPETENCIES = ( + "ESCO:S1.transversal_thinking", + "ESCO:S2.transversal_collaboration", + "ESCO:S3.transversal_communication", + "LifeComp:Personal.self_regulation", + "LifeComp:Social.cooperation", + "LifeComp:Learning.learning_to_learn", + "DigComp:transversal.responsible_use", +) +TRANSVERSAL_COMPETENCIES = ( + "WEF:critical_thinking", + "WEF:problem_solving", + "WEF:creativity", + "WEF:adaptability", + "WEF:ethical_reasoning", + "ESCO:information_literacy", + "ESCO:digital_literacy", + "LifeComp:growth_mindset", + "LifeComp:empathy", + "DigComp:information_evaluation", + "DigComp:data_protection_awareness", + "DigComp:safety", +) + +# Harmonised v4.2 competency_architecture sub-layer names (mapping §3). +COMPETENCY_ARCH_LAYERS = ( + "competency_core", + "primary_domain_competencies", + "secondary_domain_competencies", + "domain_risk_profile", + "domain_output_requirements", +) + +# Canonical interactions flow (mapping §5.1). Stored verbatim so the candidate +# records the intended layer-communication contract, not an invented one. +CANONICAL_FLOW = ( + "user_task", + "intent_detection", + "competency_activation", + "memory_retrieval", + "context_graph_traversal", + "evidence_resolution", + "policy_evaluation", + "output_contract_check", + "human_veto_if_required", + "response_or_action", + "audit_event", + "memory_update_candidate", +) + +INTERACTION_FLOWS = ( + "task_to_competency_flow", + "competency_to_memory_flow", + "memory_to_context_graph_flow", + "context_graph_to_policy_flow", + "policy_to_output_contract_flow", + "output_to_audit_flow", + "lifecycle_to_runtime_flow", +) + +# skill_lifecycle stages (mapping §6). NOT named "supply_chain". +SKILL_LIFECYCLE_STAGES = ( + "build_request", + "source_manifest", + "generated_candidate", + "validation_pipeline", + "audit_trail_index", + "determinism_record", + "logical_diff_report", + "source_license_report", + "threat_model_report", + "benchmark_report", + "premium_pass_report", + "promotion_gate", + "rollback_protocol", + "deprecation_protocol", + "release_record", +) + +# Banned substrings on the generated output: internal-codename leakage and +# unbounded public claims. Mirrors the audit-stage tripwire. NOTE: the internal +# target-track name itself is allowed only under the explicit `internal_target` +# metadata key (it must not leak into any public-facing field), so we do not ban +# it globally here — the promotion gate's boundary tripwire enforces placement. +BANNED_SUBSTRINGS = ( + "chimera", + "universal standard", + "automatic gdpr", + "automatic eu ai act", + "benchmark superiority", + "proven benchmark", +) + + +class BuildRequestError(RuntimeError): + """Raised when the build_request cannot be honestly turned into a candidate.""" + + +# --- hashing / canonical helpers -------------------------------------------- +def _canonical_bytes(obj: Any) -> bytes: + return json.dumps( + obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False + ).encode("utf-8") + + +def _sha256_obj(obj: Any) -> str: + return "sha256:" + hashlib.sha256(_canonical_bytes(obj)).hexdigest() + + +def _sha256_text(text: str) -> str: + return "sha256:" + hashlib.sha256(text.encode("utf-8")).hexdigest() + + +def _rel(path: Path) -> str: + try: + return str(path.relative_to(REPO_ROOT)) + except ValueError: + return path.name + + +# --- input loading ----------------------------------------------------------- +def load_build_request(path: Path) -> tuple[dict[str, Any], str]: + if not path.exists(): + raise BuildRequestError(f"build_request not found: {path}") + text = path.read_text(encoding="utf-8") + try: + data = json.loads(text) + except json.JSONDecodeError as exc: + raise BuildRequestError(f"build_request is not valid JSON: {exc}") from exc + if not isinstance(data, dict): + raise BuildRequestError("build_request root must be a JSON object") + return data, text + + +def _resolve_sources( + request: dict[str, Any], request_path: Path +) -> tuple[list[dict[str, Any]], str | None, str | None]: + """Resolve declared sources from the request and/or a source_manifest. + + Sources come ONLY from the build_request (inline `sources`) or a referenced + `source_manifest` file. The runner never adds a source of its own. Returns + (sources, source_manifest_relpath, source_manifest_hash). + """ + sources: list[dict[str, Any]] = [] + inline = request.get("sources") + if inline is not None: + if not isinstance(inline, list): + raise BuildRequestError("build_request.sources must be a list") + for i, s in enumerate(inline): + if not isinstance(s, dict): + raise BuildRequestError(f"sources[{i}] must be an object") + sources.extend(inline) + + manifest_rel: str | None = None + manifest_hash: str | None = None + ref = request.get("source_manifest") + if ref: + manifest_path = (request_path.resolve().parent / str(ref)) + if not manifest_path.exists(): + raise BuildRequestError( + f"referenced source_manifest not found: {ref}" + ) + mtext = manifest_path.read_text(encoding="utf-8") + try: + mdata = json.loads(mtext) + except json.JSONDecodeError as exc: + raise BuildRequestError( + f"source_manifest is not valid JSON: {exc}" + ) from exc + msources = mdata.get("sources") + if not isinstance(msources, list): + raise BuildRequestError("source_manifest.sources must be a list") + for i, s in enumerate(msources): + if not isinstance(s, dict): + raise BuildRequestError( + f"source_manifest.sources[{i}] must be an object" + ) + sources.extend(msources) + manifest_rel = _rel(manifest_path) + manifest_hash = _sha256_text(mtext) + + return sources, manifest_rel, manifest_hash + + +# --- candidate assembly ------------------------------------------------------ +def _gap(reason: str) -> dict[str, str]: + return {"requires_human_premium_pass": True, "reason": reason} + + +def _competency_architecture( + request: dict[str, Any], gaps: list[str] +) -> dict[str, Any]: + domain = request.get("domain") + primary = request.get("primary_domain_competencies") + secondary = request.get("secondary_domain_competencies") or [] + + # Domain competencies MUST come from the request. We never invent them. + if not primary: + gaps.append("competency_architecture.primary_domain_competencies") + primary_block: Any = _gap( + "no primary domain competencies declared in build_request; " + "domain expertise must be supplied by a human premium pass, " + "not generated" + ) + else: + if not isinstance(primary, list): + raise BuildRequestError( + "primary_domain_competencies must be a list when provided" + ) + primary_block = list(primary) + + if not isinstance(secondary, list): + raise BuildRequestError( + "secondary_domain_competencies must be a list when provided" + ) + + domain_risk = request.get("domain_risk_profile") + if not domain_risk: + gaps.append("competency_architecture.domain_risk_profile") + risk_block: Any = _gap( + "no domain risk profile declared; domain risk must be assessed by " + "a human premium pass" + ) + else: + risk_block = domain_risk + + domain_out = request.get("domain_output_requirements") + if not domain_out: + gaps.append("competency_architecture.domain_output_requirements") + out_block: Any = _gap( + "no domain output requirements declared; must be specified by a " + "human premium pass" + ) + else: + out_block = domain_out + + return { + "competency_core": { + "foundation_competencies": list(FOUNDATION_COMPETENCIES), + "transversal_competencies": list(TRANSVERSAL_COMPETENCIES), + "base_transversal_core": { + "transversal_refs": list(FOUNDATION_COMPETENCIES), + }, + "note": ( + "Foundation/transversal anchors are framework-referenced " + "structural names, not fabricated domain knowledge." + ), + }, + "primary_domain_competencies": primary_block, + "secondary_domain_competencies": list(secondary), + "domain_risk_profile": risk_block, + "domain_output_requirements": out_block, + "harmonized_layers": list(COMPETENCY_ARCH_LAYERS), + "domain": domain, + } + + +def _governance_system(request: dict[str, Any]) -> dict[str, Any]: + """Governance defaults to the strictest safe posture. + + Where the request under-specifies, we DEFAULT TO SAFE (veto required, no + auto external action, human final owner) rather than guessing a permissive + setting. A request may tighten but the runner never loosens below the floor. + """ + g = request.get("governance") or {} + sensitive = list((request.get("risk_profile") or {}).get("sensitive_actions") or []) + return { + "authority_hierarchy": g.get("authority_hierarchy") + or ["human_operator", "human_reviewer", "agent"], + "human_veto": { + "required": True, + "lowerable": False, + "note": "raise-only; not lowerable by an agent", + }, + "consent_rules": g.get("consent_rules") or [], + "risk_levels": g.get("risk_levels") + or ["low", "medium", "high", "critical"], + "action_gates": g.get("action_gates") or [], + "non_lowerable_rules": sorted(set( + list(g.get("non_lowerable_rules") or []) + + ["no_unsourced_claim", "no_stub_as_loaded_skill"] + )), + "escalation_rules": g.get("escalation_rules") or [], + "approval_lifecycle": ["requested", "granted", "denied", "expired"], + "revocation_rules": g.get("revocation_rules") or [], + "policy_conflict_resolution": g.get("policy_conflict_resolution") + or "strictest_rule_wins", + "governance_audit": {"emits_to": "audit"}, + # Flat mirror consumed by the threat-model tool (classify()). + "human_veto_required": True, + "no_auto_external_action": True, + "final_decision_owner": "human_operator", + "_declared_sensitive_actions": sensitive, + } + + +def _memory_system(request: dict[str, Any]) -> dict[str, Any]: + m = request.get("memory") or {} + writes_long_term = bool(m.get("writes_long_term")) + return { + "retrieval": m.get("retrieval") or "scoped_by_active_competency", + "write_candidates": "subject_to_memory_governance", + "retention": m.get("retention") or "session_default", + # Flat mirror for the threat-model tool. + "writes_long_term": writes_long_term, + "reads_private_context": bool(m.get("reads_private_context")), + "promotion_rules": m.get("promotion_rules") + or (["human_review_required"] if writes_long_term else []), + } + + +def _memory_governance() -> dict[str, Any]: + return { + "role": "bridge_between_memory_and_governance", + "every_action_influencing_write_is_governed": True, + "every_governance_decision_consulting_memory_is_audited": True, + } + + +def _runtime(request: dict[str, Any]) -> dict[str, Any]: + return { + "loadable_only_if_promoted": True, + "lifecycle_gates_availability": True, + "tools": { + "allowed": list((request.get("tools") or {}).get("allowed") or []), + "forbidden": sorted(set( + list((request.get("tools") or {}).get("forbidden") or []) + + ["publish", "send_email"] + )), + }, + } + + +def _context_graph() -> dict[str, Any]: + return { + "node_types": [ + "competency", "memory", "evidence", "policy", "action", "audit", + ], + "edge_types": ["scopes", "requires", "creates", "vetoes", "audits"], + "traversed_by_runtime": True, + } + + +def _interactions() -> dict[str, Any]: + return { + "flows": list(INTERACTION_FLOWS), + "canonical_flow": list(CANONICAL_FLOW), + "human_veto_if_required_lowerable": False, + "memory_update_is_candidate_only": True, + } + + +def _evidence(sources: list[dict[str, Any]], gaps: list[str]) -> dict[str, Any]: + if not sources: + gaps.append("evidence.sources") + return { + "sources": [], + "requires_citations": True, + "status": _gap( + "no sources declared in build_request or source_manifest; " + "evidence must be supplied, not invented" + ), + } + return { + "sources": list(sources), + "requires_citations": True, + "source_count": len(sources), + } + + +def _security(request: dict[str, Any]) -> dict[str, Any]: + return { + "no_secrets_in_candidate": True, + "no_real_pii_in_candidate": True, + "private_public_boundary_guarded": True, + "declared_classification": request.get("classification") or "internal", + } + + +def _audit(build_request_hash: str) -> dict[str, Any]: + return { + "build_request_hash": build_request_hash, + "emits_audit_event_per_output": True, + "audit_trail_stage": "audit_trail_index", + } + + +def _skill_lifecycle() -> dict[str, Any]: + # Records the TARGET lifecycle layout. Stage presence here is structural; + # it is NOT an assertion that each stage is implemented/verified. + return { + "stages": list(SKILL_LIFECYCLE_STAGES), + "renamed_from": "supply_chain", + "completeness_claimed": False, + "note": ( + "Target lifecycle layout only; not an assertion that every stage " + "is implemented or verified. release_record is an INTERNAL record, " + "never a public tag/DOI/package/release." + ), + } + + +def _output_contract(request: dict[str, Any]) -> dict[str, Any]: + oc = request.get("output_contract") or {} + return { + "allowed_outputs": list(oc.get("allowed_outputs") or ["text_response"]), + "forbidden_outputs": sorted(set( + list(oc.get("forbidden_outputs") or []) + + ["unsourced_public_claim", "private_to_public"] + )), + "required_citations": True, + "required_uncertainty_markers": True, + "required_handoff_summary": True, + "required_audit_event": True, + "graph_bindings": { + "creates_action_node": bool(oc.get("creates_action_node")), + "requires_policy_node": True, + "requires_evidence_node": True, + "may_trigger_veto_edge": True, + "writes_audit_edge": True, + }, + # Flat mirror for the threat-model tool. + "requires_citations": True, + "emits_public_output": bool(oc.get("emits_public_output")), + } + + +def build_candidate( + request: dict[str, Any], + request_text: str, + request_path: Path, +) -> dict[str, Any]: + """Build the candidate skill in the v4.2 internal target shape. + + Deterministic given (request bytes + resolved source manifest bytes). + """ + skill_id = request.get("skill_id") + if not skill_id or not isinstance(skill_id, str): + raise BuildRequestError("build_request.skill_id (string) is required") + if not request.get("domain"): + raise BuildRequestError("build_request.domain is required") + + sources, manifest_rel, manifest_hash = _resolve_sources(request, request_path) + + build_request_hash = _sha256_text(request_text) + # Determinism anchor: id derived ONLY from canonical request + manifest hash. + id_material = { + "skill_id": skill_id, + "build_request_hash": build_request_hash, + "source_manifest_hash": manifest_hash, + } + candidate_id = _sha256_obj(id_material) + run_id = candidate_id # one run == one deterministic candidate id + + gaps: list[str] = [] + + candidate: dict[str, Any] = { + "schema_version": CANDIDATE_SCHEMA_VERSION, + "kind": "xklickd_internal_candidate_skill", + "non_normative": True, + "internal_target": { + "track": INTERNAL_TARGET_TRACK, + "public_version": "v4.1", + "note": ( + "Internal v4.2 target shape. NOT a public v4.2 release; public " + "x.klickd artefacts remain v4.1 candidates." + ), + }, + "skill_id": skill_id, + "domain": request.get("domain"), + "candidate_id": candidate_id, + "run_id": run_id, + "build_request_hash": build_request_hash, + "source_manifest": manifest_rel, + "source_manifest_hash": manifest_hash, + # --- v4.2 target top-level layers (mapping §1) --- + "metadata": { + "skill_id": skill_id, + "domain": request.get("domain"), + "title": request.get("title") or skill_id, + "size_tier": request.get("size_tier") or "lite", + "publisher": request.get("publisher") or "internal", + "status": "candidate", + }, + "competency_architecture": _competency_architecture(request, gaps), + "memory_system": _memory_system(request), + "governance_system": _governance_system(request), + "memory_governance": _memory_governance(), + "runtime": _runtime(request), + "context_graph": _context_graph(), + "interactions": _interactions(), + "evidence": _evidence(sources, gaps), + "security": _security(request), + "audit": _audit(build_request_hash), + "skill_lifecycle": _skill_lifecycle(), + "output_contract": _output_contract(request), + # --- threat-model-compatible flat mirrors (classify() reads these) --- + "risk_profile": request.get("risk_profile") + or {"default_risk": "low", "sensitive_actions": []}, + "governance": None, # filled below from governance_system mirror + "memory": None, # filled below from memory_system mirror + "tools": None, # filled below from runtime mirror + "sources": list(sources), + } + + # Wire the flat mirrors the threat-model tool consumes, derived (not + # duplicated by hand) from the structured layers above. + gov = candidate["governance_system"] + candidate["governance"] = { + "human_veto_required": gov["human_veto_required"], + "no_auto_external_action": gov["no_auto_external_action"], + "final_decision_owner": gov["final_decision_owner"], + "non_lowerable_rules": list(gov["non_lowerable_rules"]), + } + mem = candidate["memory_system"] + candidate["memory"] = { + "writes_long_term": mem["writes_long_term"], + "reads_private_context": mem["reads_private_context"], + "promotion_rules": list(mem["promotion_rules"]), + } + candidate["tools"] = { + "allowed": list(candidate["runtime"]["tools"]["allowed"]), + "forbidden": list(candidate["runtime"]["tools"]["forbidden"]), + } + + # Anti-mirage summary: surface (not hide) any premium-pass requirements. + candidate["premium_pass_status"] = { + "requires_human_premium_pass": bool(gaps), + "gaps": sorted(gaps), + "note": ( + "Missing domain information is surfaced as a gap, never " + "hallucinated. A clean candidate has an empty gaps list." + ), + } + + # candidate_hash over the deterministic core (no clock zone yet present). + candidate["candidate_hash"] = _sha256_obj(candidate) + return candidate + + +def render(candidate: dict[str, Any]) -> str: + return json.dumps(candidate, indent=2, sort_keys=True, ensure_ascii=False) + "\n" + + +def _scan_banned(text: str) -> list[str]: + low = text.lower() + return [s for s in BANNED_SUBSTRINGS if s in low] + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Internal x.klickd supply-chain candidate generator " + "(v4.2 target shape, non-normative).", + ) + parser.add_argument( + "--build-request", required=True, + help="path to a deterministic build_request JSON", + ) + parser.add_argument( + "--out", default=None, + help="output path for the candidate JSON (default: " + ".internal-skills/supply-chain/candidates/.json)", + ) + parser.add_argument( + "--quiet", action="store_true", + help="do not print the candidate to stdout", + ) + args = parser.parse_args(argv if argv is not None else sys.argv[1:]) + + req_path = Path(args.build_request) + try: + request, request_text = load_build_request(req_path) + candidate = build_candidate(request, request_text, req_path) + except BuildRequestError as exc: + print(f"FAIL (build_request): {exc}", file=sys.stderr) + return 1 + except OSError as exc: + print(f"FAIL (io): {exc}", file=sys.stderr) + return 2 + + serialized = render(candidate) + + banned = _scan_banned(serialized) + if banned: + print(f"FAIL: banned substring(s) in candidate: {banned}", file=sys.stderr) + return 1 + + if args.out: + out_path = Path(args.out) + else: + out_path = DEFAULT_OUT_DIR / f"{candidate['skill_id']}.json" + try: + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(serialized, encoding="utf-8") + except OSError as exc: + print(f"FAIL (io): {exc}", file=sys.stderr) + return 2 + + if not args.quiet: + sys.stdout.write(serialized) + print( + f"OK: candidate {candidate['skill_id']} (id {candidate['candidate_id']}, " + f"premium_pass_required=" + f"{candidate['premium_pass_status']['requires_human_premium_pass']}) " + f"-> {_rel(out_path)}", + file=sys.stderr, + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/run_supply_chain_promotion_gate.py b/scripts/run_supply_chain_promotion_gate.py new file mode 100644 index 0000000..044488d --- /dev/null +++ b/scripts/run_supply_chain_promotion_gate.py @@ -0,0 +1,565 @@ +#!/usr/bin/env python3 +"""x.klickd supply-chain — combined promotion gate (v0.1). + +The promotion gate is the pass/fail orchestrator for the documented +supply-chain pipeline. It runs the existing tool-backed checks on a candidate +skill, applies candidate schema/shape checks and public/private boundary +tripwires, and classifies the candidate as: + + ACCEPT -- no blocking findings, no premium-pass requirement + ACCEPT_WITH_REVIEW -- no blocking findings, but human review/premium pass + is required (gaps, review-bucket sources, mediums) + BLOCK -- at least one blocking finding + +NON-NORMATIVE. Internal only. The gate: + - does NOT run the premium pass; it only REPORTS whether one is required; + - makes NO compliance, legal, security-certification, or benchmark claim; + - produces NO release, tag, DOI, package, or deploy; + - asserts a check result ONLY when it actually ran the check. A check that + could not run is recorded as "not_run" with a reason, never as "pass". + +Orchestrated checks (each is run only if its tool is importable on this branch): + - threat model (scripts/generate_supply_chain_threat_model.py) + - source/license (scripts/check_supply_chain_sources.py) when a source + manifest is provided + - logical diff (scripts/generate_supply_chain_diff.py) when --before is + provided + - candidate schema/shape checks (built in here) + - forbidden-claims / public-private boundary tripwires (built in here) + +Determinism: + - The gate report's deterministic_gate_id is derived only from the candidate + hash plus the sorted, normalized check verdicts. Clock values live under + non_deterministic_zone and are excluded from the id. + +CLI: + python scripts/run_supply_chain_promotion_gate.py --candidate CAND.json + python scripts/run_supply_chain_promotion_gate.py --candidate CAND.json \ + --source-manifest SRC.json --before PREV.json --out gate.json --md gate.md + +Exit codes: + 0 ACCEPT or ACCEPT_WITH_REVIEW (acceptable) + 1 BLOCK (a blocking finding was raised) + 2 usage / I-O error +""" +from __future__ import annotations + +import argparse +import datetime as _dt +import hashlib +import importlib.util +import json +import sys +from pathlib import Path +from types import ModuleType +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPTS_DIR = REPO_ROOT / "scripts" +DEFAULT_OUT_DIR = ( + REPO_ROOT / ".internal-skills" / "supply-chain" / "promotion-gate" +) + +GATE_SCHEMA_VERSION = "xklickd.promotion_gate.v0.1" + +# v4.2 internal target top-level layers a candidate MUST carry (mapping §1). +REQUIRED_CANDIDATE_LAYERS = ( + "metadata", + "competency_architecture", + "memory_system", + "governance_system", + "memory_governance", + "runtime", + "context_graph", + "interactions", + "evidence", + "security", + "audit", + "skill_lifecycle", + "output_contract", +) + +# Forbidden public claims (over-claim tripwire). Lower-cased substring match. +FORBIDDEN_CLAIM_SUBSTRINGS = ( + "universal standard", + "automatic gdpr", + "automatic eu ai act", + "automatically gdpr compliant", + "guaranteed compliance", + "benchmark superiority", + "proven benchmark", + "industry standard for all", +) + +# Internal codename that must never appear anywhere in a candidate. +FORBIDDEN_CODENAME_SUBSTRINGS = ( + "chimera", +) + +# The internal target-track name is allowed ONLY under the candidate's +# `internal_target` block; anywhere else is a leak of an internal name into a +# field that could surface publicly. +INTERNAL_TRACK_NAME = "xklickd_internal_skill_v4_2" + + +def _load_module(name: str, filename: str) -> ModuleType | None: + """Import a sibling script as a module, or return None if unavailable.""" + path = SCRIPTS_DIR / filename + if not path.exists(): + return None + spec = importlib.util.spec_from_file_location(name, path) + if not spec or not spec.loader: + return None + mod = importlib.util.module_from_spec(spec) + try: + spec.loader.exec_module(mod) + except Exception: # noqa: BLE001 - a broken tool must not crash the gate + return None + return mod + + +def _check( + name: str, + verdict: str, + blocking: list[str], + review: list[str], + detail: dict[str, Any] | None = None, +) -> dict[str, Any]: + """A single normalized check result. + + verdict in: pass | review | block | not_run + """ + return { + "check": name, + "verdict": verdict, + "blocking_findings": sorted(blocking), + "review_findings": sorted(review), + "detail": detail or {}, + } + + +def _not_run(name: str, reason: str) -> dict[str, Any]: + """A check that did not run. Its reason is a note, NOT a review finding. + + Recording the reason as a review finding would conflate "we skipped this" + with "a reviewer must look at this" and inflate the review count. A not_run + check contributes neither blocking nor review findings to the verdict. + """ + return { + "check": name, + "verdict": "not_run", + "blocking_findings": [], + "review_findings": [], + "detail": {"reason": reason}, + } + + +# --- individual checks ------------------------------------------------------- +def check_candidate_shape(candidate: dict[str, Any]) -> dict[str, Any]: + blocking: list[str] = [] + review: list[str] = [] + + if not isinstance(candidate, dict): + return _check("candidate_shape", "block", + ["candidate is not a JSON object"], []) + + for layer in REQUIRED_CANDIDATE_LAYERS: + if layer not in candidate: + blocking.append(f"missing required v4.2 layer: {layer}") + + if not candidate.get("skill_id"): + blocking.append("missing skill_id") + if not candidate.get("candidate_hash"): + review.append("missing candidate_hash (cannot anchor determinism)") + + # skill_lifecycle must not be literally named supply_chain and must not + # claim completeness. + sl = candidate.get("skill_lifecycle") or {} + if isinstance(sl, dict): + if sl.get("completeness_claimed") is True: + blocking.append( + "skill_lifecycle.completeness_claimed is true (supply-chain " + "completeness must not be claimed)" + ) + if str(sl.get("name", "")).lower() == "supply_chain": + blocking.append("skill_lifecycle must not be named 'supply_chain'") + + # output_contract.graph_bindings must be present (mapping §7). + oc = candidate.get("output_contract") or {} + if isinstance(oc, dict) and "graph_bindings" not in oc: + blocking.append("output_contract.graph_bindings missing") + + # Governance floor: human veto must be required and not lowerable. + gov = candidate.get("governance_system") or {} + hv = gov.get("human_veto") if isinstance(gov, dict) else None + if isinstance(hv, dict): + if not hv.get("required"): + blocking.append("governance_system.human_veto.required is not true") + if hv.get("lowerable") is True: + blocking.append("governance_system.human_veto is lowerable") + + verdict = "block" if blocking else ("review" if review else "pass") + return _check("candidate_shape", verdict, blocking, review, + {"required_layers": list(REQUIRED_CANDIDATE_LAYERS)}) + + +def check_boundary_tripwires(candidate_text: str, + candidate: dict[str, Any]) -> dict[str, Any]: + """Forbidden public claims + internal-name / private->public leakage.""" + blocking: list[str] = [] + review: list[str] = [] + low = candidate_text.lower() + + for sub in FORBIDDEN_CLAIM_SUBSTRINGS: + if sub in low: + blocking.append(f"forbidden public claim: {sub!r}") + + for sub in FORBIDDEN_CODENAME_SUBSTRINGS: + if sub in low: + blocking.append(f"internal codename leak: {sub!r}") + + # Internal track name is allowed only inside the internal_target block. + if INTERNAL_TRACK_NAME.lower() in low: + it = candidate.get("internal_target") + it_text = json.dumps(it, sort_keys=True).lower() if it else "" + # Count occurrences outside the internal_target block. + total = low.count(INTERNAL_TRACK_NAME.lower()) + inside = it_text.count(INTERNAL_TRACK_NAME.lower()) + if total > inside: + blocking.append( + "internal track name leaks outside internal_target block" + ) + + # Explicit private->public leak flags (mirrors threat-model contract). + if candidate.get("private_public_leak"): + blocking.append("candidate declares a private->public leak") + boundaries = candidate.get("boundaries") or {} + if isinstance(boundaries, dict) and boundaries.get("private_to_public_leak"): + blocking.append("boundaries.private_to_public_leak is set") + + # Public v4.2 over-claim: a candidate must not claim public v4.2. + it = candidate.get("internal_target") or {} + if isinstance(it, dict) and str(it.get("public_version", "")).startswith("v4.2"): + blocking.append("candidate claims public_version v4.2 (public stays v4.1)") + + verdict = "block" if blocking else ("review" if review else "pass") + return _check("boundary_tripwires", verdict, blocking, review) + + +def check_threat_model(candidate: dict[str, Any], candidate_path: str, + mod: ModuleType | None) -> dict[str, Any]: + if mod is None: + return _not_run("threat_model", + "threat-model tool not importable on this branch") + try: + report = mod.build_report(candidate, candidate_path) + except Exception as exc: # noqa: BLE001 + return _not_run("threat_model", f"threat-model tool errored: {exc}") + blocked = list(report.get("blocked_findings") or []) + mediums = report.get("summary", {}).get("by_severity", {}).get("medium", 0) + review = [f"{mediums} medium finding(s) to review"] if mediums else [] + verdict = "block" if blocked else ("review" if review else "pass") + return _check( + "threat_model", verdict, + [f"threat:{t}" for t in blocked], review, + {"deterministic_threat_model_id": + report.get("deterministic_threat_model_id"), + "summary": report.get("summary")}, + ) + + +def check_source_license(source_manifest: Path | None, + eval_date: _dt.date, + mod: ModuleType | None) -> dict[str, Any]: + if source_manifest is None: + return _not_run("source_license", "no --source-manifest provided") + if mod is None: + return _not_run("source_license", + "source-check tool not importable on this branch") + try: + manifest, text = mod.load_manifest(source_manifest) + report = mod.build_report(manifest, text, source_manifest, eval_date, 3) + except Exception as exc: # noqa: BLE001 + return _not_run("source_license", f"source-check could not run: {exc}") + summary = report.get("summary", {}) + blocked_n = summary.get("blocked", 0) + review_n = summary.get("review", 0) + blocking = [ + f"source:{f['id']}" for f in report.get("blocked_findings") or [] + ] + review = [f"{review_n} source(s) need review"] if review_n else [] + verdict = "block" if blocked_n else ("review" if review_n else "pass") + return _check("source_license", verdict, blocking, review, + {"deterministic_report_id": + report.get("deterministic_report_id"), + "summary": summary}) + + +def check_logical_diff(before: Path | None, candidate_path: Path, + mod: ModuleType | None) -> dict[str, Any]: + if before is None: + return _not_run("logical_diff", "no --before candidate provided") + if mod is None: + return _not_run("logical_diff", + "diff tool not importable on this branch") + try: + report = mod.build_report(before, candidate_path) + except Exception as exc: # noqa: BLE001 + return _not_run("logical_diff", f"diff tool could not run: {exc}") + blocked = report.get("blocked_findings") or [] + high = report.get("high_risk_findings") or [] + blocking = [f"diff:{f.get('path')}:{f.get('kind')}" for f in blocked] + review = [f"{len(high)} high-risk diff finding(s)"] if high else [] + verdict = "block" if blocked else ("review" if high else "pass") + return _check("logical_diff", verdict, blocking, review, + {"deterministic_diff_id": report.get("deterministic_diff_id"), + "summary": report.get("summary")}) + + +def check_premium_pass(candidate: dict[str, Any]) -> dict[str, Any]: + """Report (never run) whether a human premium pass is required.""" + status = candidate.get("premium_pass_status") or {} + gaps = list(status.get("gaps") or []) + requires = bool(status.get("requires_human_premium_pass")) or bool(gaps) + review = ( + [f"premium pass required for gap: {g}" for g in gaps] + if requires else [] + ) + # A premium-pass requirement is NOT blocking; it routes to ACCEPT_WITH_REVIEW. + verdict = "review" if requires else "pass" + return _check("premium_pass_required", verdict, [], review, + {"requires_human_premium_pass": requires, "gaps": sorted(gaps)}) + + +# --- orchestration ----------------------------------------------------------- +def _deterministic_gate_id(candidate_hash: str | None, + checks: list[dict[str, Any]]) -> str: + normalized = [ + { + "check": c["check"], + "verdict": c["verdict"], + "blocking_findings": c["blocking_findings"], + "review_findings": c["review_findings"], + } + for c in sorted(checks, key=lambda c: c["check"]) + ] + payload = json.dumps( + {"candidate_hash": candidate_hash, "checks": normalized}, + sort_keys=True, separators=(",", ":"), + ) + return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest() + + +def run_gate( + candidate: dict[str, Any], + candidate_text: str, + candidate_path: Path, + source_manifest: Path | None, + before: Path | None, + eval_date: _dt.date, +) -> dict[str, Any]: + threat_mod = _load_module( + "sc_threat_model", "generate_supply_chain_threat_model.py" + ) + source_mod = _load_module( + "sc_source_check", "check_supply_chain_sources.py" + ) + diff_mod = _load_module("sc_diff", "generate_supply_chain_diff.py") + + checks: list[dict[str, Any]] = [ + check_candidate_shape(candidate), + check_boundary_tripwires(candidate_text, candidate), + check_threat_model(candidate, str(candidate_path), threat_mod), + check_source_license(source_manifest, eval_date, source_mod), + check_logical_diff(before, candidate_path, diff_mod), + check_premium_pass(candidate), + ] + + any_block = any(c["verdict"] == "block" for c in checks) + any_review = any(c["verdict"] == "review" for c in checks) + + if any_block: + classification = "BLOCK" + elif any_review: + classification = "ACCEPT_WITH_REVIEW" + else: + classification = "ACCEPT" + + all_blocking = sorted( + f for c in checks for f in c["blocking_findings"] + ) + all_review = sorted( + f for c in checks for f in c["review_findings"] + ) + + return { + "schema_version": GATE_SCHEMA_VERSION, + "kind": "xklickd_supply_chain_promotion_gate_report", + "non_normative": True, + "candidate_path": str(candidate_path), + "candidate_id": candidate.get("candidate_id"), + "candidate_hash": candidate.get("candidate_hash"), + "classification": classification, + "deterministic_gate_id": _deterministic_gate_id( + candidate.get("candidate_hash"), checks + ), + "summary": { + "checks_run": sum(1 for c in checks if c["verdict"] != "not_run"), + "checks_not_run": sum(1 for c in checks if c["verdict"] == "not_run"), + "blocking": len(all_blocking), + "review": len(all_review), + }, + "checks": checks, + "blocking_findings": all_blocking, + "review_findings": all_review, + "premium_pass_required": any( + c["check"] == "premium_pass_required" + and c["detail"].get("requires_human_premium_pass") + for c in checks + ), + "claim_boundaries": { + "is_security_certification": False, + "establishes_legal_compliance": False, + "is_full_automation": False, + "proves_loaded_executable_skill": False, + "runs_premium_pass": False, + "note": ("The gate orchestrates offline, stdlib-only checks and " + "reports whether a human premium pass is required; it " + "does not run that pass and makes no compliance claim."), + }, + "non_deterministic_zone": { + "evaluated_at": eval_date.isoformat(), + "note": ("evaluated_at is excluded from deterministic_gate_id."), + }, + } + + +def render_json(report: dict[str, Any]) -> str: + return json.dumps(report, indent=2, sort_keys=True, ensure_ascii=False) + "\n" + + +def render_md(report: dict[str, Any]) -> str: + lines = [ + f"# Supply-chain promotion gate — {report['classification']}", + "", + f"- **Candidate:** `{report.get('candidate_id')}`", + f"- **Gate id:** `{report['deterministic_gate_id']}`", + f"- **Premium pass required:** {report['premium_pass_required']}", + f"- **Blocking:** {report['summary']['blocking']} · " + f"**Review:** {report['summary']['review']} · " + f"**Checks run:** {report['summary']['checks_run']} · " + f"**Not run:** {report['summary']['checks_not_run']}", + "", + "## Checks", + "", + "| Check | Verdict | Blocking | Review |", + "|---|---|---|---|", + ] + for c in report["checks"]: + lines.append( + f"| {c['check']} | {c['verdict']} | " + f"{len(c['blocking_findings'])} | {len(c['review_findings'])} |" + ) + if report["blocking_findings"]: + lines += ["", "## Blocking findings", ""] + lines += [f"- {f}" for f in report["blocking_findings"]] + if report["review_findings"]: + lines += ["", "## Review findings", ""] + lines += [f"- {f}" for f in report["review_findings"]] + lines += [ + "", + "> NON-NORMATIVE. No release, no compliance claim. The gate reports " + "whether a human premium pass is required; it does not run one.", + "", + ] + return "\n".join(lines) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Combined x.klickd supply-chain promotion gate " + "(non-normative).", + ) + parser.add_argument("--candidate", required=True, + help="path to candidate skill JSON") + parser.add_argument("--source-manifest", default=None, + help="optional source manifest for the source/license check") + parser.add_argument("--before", default=None, + help="optional prior candidate JSON for a logical diff") + parser.add_argument("--out", default=None, + help="path to write the gate report JSON " + "(default: .internal-skills/supply-chain/" + "promotion-gate/.gate.json)") + parser.add_argument("--md", default=None, + help="optional path to write a Markdown summary") + parser.add_argument("--eval-date", default=None, + help="ISO date for source freshness math " + "(default: today UTC)") + parser.add_argument("--quiet", action="store_true", + help="do not print the report JSON to stdout") + args = parser.parse_args(argv if argv is not None else sys.argv[1:]) + + cand_path = Path(args.candidate) + if not cand_path.exists(): + print(f"error: candidate not found: {cand_path}", file=sys.stderr) + return 2 + candidate_text = cand_path.read_text(encoding="utf-8") + try: + candidate = json.loads(candidate_text) + except json.JSONDecodeError as exc: + print(f"error: candidate JSON parse failed: {exc}", file=sys.stderr) + return 2 + if not isinstance(candidate, dict): + print("error: candidate root must be a JSON object", file=sys.stderr) + return 2 + + source_manifest = Path(args.source_manifest) if args.source_manifest else None + if source_manifest is not None and not source_manifest.exists(): + print(f"error: source manifest not found: {source_manifest}", + file=sys.stderr) + return 2 + before = Path(args.before) if args.before else None + if before is not None and not before.exists(): + print(f"error: --before not found: {before}", file=sys.stderr) + return 2 + + if args.eval_date: + try: + eval_date = _dt.date.fromisoformat(args.eval_date[:10]) + except ValueError: + print(f"error: invalid --eval-date: {args.eval_date}", file=sys.stderr) + return 2 + else: + eval_date = _dt.datetime.now(_dt.timezone.utc).date() + + report = run_gate( + candidate, candidate_text, cand_path, source_manifest, before, eval_date + ) + + serialized = render_json(report) + if args.out: + out_path = Path(args.out) + else: + out_path = DEFAULT_OUT_DIR / f"{cand_path.stem}.gate.json" + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(serialized, encoding="utf-8") + if args.md: + md_path = Path(args.md) + md_path.parent.mkdir(parents=True, exist_ok=True) + md_path.write_text(render_md(report), encoding="utf-8") + if not args.quiet: + sys.stdout.write(serialized) + + print( + f"GATE: {report['classification']} " + f"(blocking={report['summary']['blocking']}, " + f"review={report['summary']['review']}, " + f"premium_pass_required={report['premium_pass_required']}) " + f"id={report['deterministic_gate_id']}", + file=sys.stderr, + ) + return 1 if report["classification"] == "BLOCK" else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/fixtures/supply_chain_candidate/build_request_clean.json b/tests/fixtures/supply_chain_candidate/build_request_clean.json new file mode 100644 index 0000000..5b8a497 --- /dev/null +++ b/tests/fixtures/supply_chain_candidate/build_request_clean.json @@ -0,0 +1,50 @@ +{ + "skill_id": "xklickd-research-reader", + "domain": "research", + "title": "Research Reader", + "size_tier": "lite", + "publisher": "internal", + "primary_domain_competencies": [ + "ESCO:research_methods", + "ESCO:academic_writing" + ], + "secondary_domain_competencies": [ + "ESCO:citation_management" + ], + "domain_risk_profile": { + "default_risk": "low", + "sensitive_actions": [] + }, + "domain_output_requirements": { + "format": "cited_summary" + }, + "risk_profile": { + "default_risk": "low", + "sensitive_actions": [] + }, + "tools": { + "allowed": ["read_file"], + "forbidden": [] + }, + "memory": { + "writes_long_term": false, + "reads_private_context": false + }, + "output_contract": { + "emits_public_output": true, + "allowed_outputs": ["text_response"] + }, + "sources": [ + { + "id": "skos", + "name": "skos-framework", + "title": "SKOS Reference", + "license": "CC-BY-4.0", + "freshness": "current", + "url": "https://example.org/skos", + "published_at": "2025-01-01", + "usage": "reference", + "category": "academic" + } + ] +} diff --git a/tests/fixtures/supply_chain_candidate/build_request_missing_domain.json b/tests/fixtures/supply_chain_candidate/build_request_missing_domain.json new file mode 100644 index 0000000..f424329 --- /dev/null +++ b/tests/fixtures/supply_chain_candidate/build_request_missing_domain.json @@ -0,0 +1,10 @@ +{ + "skill_id": "xklickd-medicine-advisor", + "domain": "medicine", + "title": "Medicine Advisor", + "size_tier": "lite", + "risk_profile": { + "default_risk": "low", + "sensitive_actions": [] + } +} diff --git a/tests/fixtures/supply_chain_candidate/source_manifest_ok.json b/tests/fixtures/supply_chain_candidate/source_manifest_ok.json new file mode 100644 index 0000000..667b9ed --- /dev/null +++ b/tests/fixtures/supply_chain_candidate/source_manifest_ok.json @@ -0,0 +1,14 @@ +{ + "schema_version": "xklickd.source_manifest.v0.1", + "sources": [ + { + "id": "skos", + "title": "SKOS Reference", + "license": "CC-BY-4.0", + "usage": "reference", + "url": "https://example.org/skos", + "published_at": "2025-01-01", + "category": "academic" + } + ] +} diff --git a/tests/test_supply_chain_candidate.py b/tests/test_supply_chain_candidate.py new file mode 100644 index 0000000..5825666 --- /dev/null +++ b/tests/test_supply_chain_candidate.py @@ -0,0 +1,229 @@ +"""Tests for scripts/generate_supply_chain_candidate.py. + +Internal candidate generator (v4.2 target shape). NON-NORMATIVE. These tests +assert the anti-mirage contract: deterministic ids, the full v4.2 layer set, +foundation/transversal competency anchors, and that missing domain information +is surfaced as `requires_human_premium_pass` rather than hallucinated. +""" +from __future__ import annotations + +import importlib.util +import json +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPT = REPO_ROOT / "scripts" / "generate_supply_chain_candidate.py" +FIX = REPO_ROOT / "tests" / "fixtures" / "supply_chain_candidate" + +V4_2_LAYERS = ( + "metadata", + "competency_architecture", + "memory_system", + "governance_system", + "memory_governance", + "runtime", + "context_graph", + "interactions", + "evidence", + "security", + "audit", + "skill_lifecycle", + "output_contract", +) + + +def _load(): + spec = importlib.util.spec_from_file_location( + "generate_supply_chain_candidate", SCRIPT + ) + assert spec and spec.loader, f"could not load {SCRIPT}" + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def _candidate(mod, name: str): + path = FIX / name + request, text = mod.load_build_request(path) + return mod.build_candidate(request, text, path) + + +# --- structural -------------------------------------------------------------- +def test_script_exists(): + assert SCRIPT.exists() + + +def test_candidate_has_all_v4_2_layers(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + for layer in V4_2_LAYERS: + assert layer in cand, f"missing v4.2 layer: {layer}" + + +def test_foundation_and_transversal_counts(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + core = cand["competency_architecture"]["competency_core"] + assert len(core["foundation_competencies"]) == 7 + assert len(core["transversal_competencies"]) == 12 + + +def test_harmonized_domain_names_present(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + arch = cand["competency_architecture"] + for name in ( + "competency_core", + "primary_domain_competencies", + "secondary_domain_competencies", + "domain_risk_profile", + "domain_output_requirements", + ): + assert name in arch, f"missing harmonized arch layer: {name}" + + +def test_skill_lifecycle_not_named_supply_chain(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + sl = cand["skill_lifecycle"] + assert sl.get("renamed_from") == "supply_chain" + assert sl.get("completeness_claimed") is False + # build_request stage exists but the lifecycle is not literally "supply_chain". + assert "build_request" in sl["stages"] + assert "promotion_gate" in sl["stages"] + + +def test_output_contract_graph_bindings(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + gb = cand["output_contract"]["graph_bindings"] + for field in ( + "creates_action_node", + "requires_policy_node", + "requires_evidence_node", + "may_trigger_veto_edge", + "writes_audit_edge", + ): + assert field in gb, f"missing graph_binding: {field}" + + +def test_interactions_canonical_flow(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + flow = cand["interactions"]["canonical_flow"] + assert flow[0] == "user_task" + assert "human_veto_if_required" in flow + assert flow[-1] == "memory_update_candidate" + + +# --- anti-mirage: no hallucination ------------------------------------------ +def test_missing_domain_info_triggers_premium_pass(): + mod = _load() + cand = _candidate(mod, "build_request_missing_domain.json") + status = cand["premium_pass_status"] + assert status["requires_human_premium_pass"] is True + # The specific missing-domain gaps must be named, not silently filled. + assert "competency_architecture.primary_domain_competencies" in status["gaps"] + assert "evidence.sources" in status["gaps"] + + +def test_missing_domain_does_not_hallucinate_competencies(): + mod = _load() + cand = _candidate(mod, "build_request_missing_domain.json") + primary = cand["competency_architecture"]["primary_domain_competencies"] + # Must be the gap marker, not an invented competency list. + assert isinstance(primary, dict) + assert primary.get("requires_human_premium_pass") is True + # No sources were declared, so none may appear. + assert cand["sources"] == [] + assert cand["evidence"]["sources"] == [] + + +def test_clean_candidate_has_no_premium_pass_requirement(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + assert cand["premium_pass_status"]["requires_human_premium_pass"] is False + assert cand["premium_pass_status"]["gaps"] == [] + + +def test_sources_only_from_request(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + ids = {s.get("id") for s in cand["sources"]} + assert ids == {"skos"} # exactly what the request declared, nothing added + + +# --- governance floor -------------------------------------------------------- +def test_governance_floor_is_safe_by_default(): + mod = _load() + cand = _candidate(mod, "build_request_missing_domain.json") + gov = cand["governance_system"] + assert gov["human_veto"]["required"] is True + assert gov["human_veto"]["lowerable"] is False + assert gov["human_veto_required"] is True + assert gov["no_auto_external_action"] is True + assert gov["final_decision_owner"].startswith("human") + + +def test_threat_model_flat_mirrors_present(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + # The flat fields consumed by the threat-model tool must be present. + for key in ("governance", "memory", "tools", "risk_profile", + "output_contract", "sources", "skill_id"): + assert key in cand, f"missing threat-model flat field: {key}" + + +# --- determinism ------------------------------------------------------------- +def test_candidate_id_deterministic_across_runs(): + mod = _load() + a = _candidate(mod, "build_request_clean.json") + b = _candidate(mod, "build_request_clean.json") + assert a["candidate_id"] == b["candidate_id"] + assert a["candidate_hash"] == b["candidate_hash"] + assert a["run_id"] == b["run_id"] + + +def test_candidate_id_changes_with_input(): + mod = _load() + a = _candidate(mod, "build_request_clean.json") + b = _candidate(mod, "build_request_missing_domain.json") + assert a["candidate_id"] != b["candidate_id"] + + +def test_no_clock_field_in_deterministic_core(): + mod = _load() + cand = _candidate(mod, "build_request_clean.json") + # The candidate carries no top-level generated_at; any clock value would + # have to live in a quarantined zone, never in the hashed core. + assert "generated_at" not in cand + + +# --- CLI --------------------------------------------------------------------- +def test_cli_writes_candidate_and_exits_zero(tmp_path): + mod = _load() + out = tmp_path / "cand.json" + rc = mod.main([ + "--build-request", str(FIX / "build_request_clean.json"), + "--out", str(out), "--quiet", + ]) + assert rc == 0 + data = json.loads(out.read_text()) + assert data["skill_id"] == "xklickd-research-reader" + + +def test_cli_invalid_request_exits_one(tmp_path): + mod = _load() + bad = tmp_path / "bad.json" + bad.write_text(json.dumps({"domain": "x"})) # no skill_id + rc = mod.main(["--build-request", str(bad), "--quiet", + "--out", str(tmp_path / "o.json")]) + assert rc == 1 + + +def test_cli_missing_request_exits_two(tmp_path): + mod = _load() + rc = mod.main(["--build-request", str(tmp_path / "nope.json"), "--quiet", + "--out", str(tmp_path / "o.json")]) + # missing file is a build_request error -> exit 1 (cannot generate) + assert rc == 1 diff --git a/tests/test_supply_chain_promotion_gate.py b/tests/test_supply_chain_promotion_gate.py new file mode 100644 index 0000000..2122f14 --- /dev/null +++ b/tests/test_supply_chain_promotion_gate.py @@ -0,0 +1,267 @@ +"""Tests for scripts/run_supply_chain_promotion_gate.py. + +Combined promotion gate. NON-NORMATIVE. The gate orchestrates the existing +tool-backed checks, classifies ACCEPT / ACCEPT_WITH_REVIEW / BLOCK, and reports +(never runs) whether a human premium pass is required. A fixed --eval-date is +used so source freshness classification is reproducible. +""" +from __future__ import annotations + +import datetime as _dt +import importlib.util +import json +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +GATE_SCRIPT = REPO_ROOT / "scripts" / "run_supply_chain_promotion_gate.py" +GEN_SCRIPT = REPO_ROOT / "scripts" / "generate_supply_chain_candidate.py" +FIX = REPO_ROOT / "tests" / "fixtures" / "supply_chain_candidate" +EVAL_DATE = _dt.date(2026, 6, 2) + + +def _load(path: Path, name: str): + spec = importlib.util.spec_from_file_location(name, path) + assert spec and spec.loader, f"could not load {path}" + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def _gate_mod(): + return _load(GATE_SCRIPT, "run_supply_chain_promotion_gate") + + +def _gen_mod(): + return _load(GEN_SCRIPT, "generate_supply_chain_candidate") + + +def _make_candidate(tmp_path: Path, request_name: str) -> Path: + """Generate a real candidate from a fixture build_request.""" + gen = _gen_mod() + out = tmp_path / f"{request_name}.cand.json" + rc = gen.main([ + "--build-request", str(FIX / request_name), + "--out", str(out), "--quiet", + ]) + assert rc == 0 + return out + + +def _run(gate, candidate_path: Path, **kw): + candidate_text = candidate_path.read_text() + candidate = json.loads(candidate_text) + return gate.run_gate( + candidate, candidate_text, candidate_path, + kw.get("source_manifest"), kw.get("before"), EVAL_DATE, + ) + + +# --- structural -------------------------------------------------------------- +def test_script_exists(): + assert GATE_SCRIPT.exists() + + +def test_clean_candidate_accepts(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + rep = _run(gate, cand) + assert rep["classification"] == "ACCEPT" + assert rep["summary"]["blocking"] == 0 + assert rep["premium_pass_required"] is False + + +def test_report_has_required_fields(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + rep = _run(gate, cand) + for field in ( + "schema_version", "classification", "deterministic_gate_id", + "summary", "checks", "blocking_findings", "review_findings", + "premium_pass_required", "claim_boundaries", "non_deterministic_zone", + ): + assert field in rep, f"missing gate field: {field}" + + +# --- premium pass / accept-with-review -------------------------------------- +def test_missing_domain_candidate_accepts_with_review(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_missing_domain.json") + rep = _run(gate, cand) + assert rep["classification"] == "ACCEPT_WITH_REVIEW" + assert rep["premium_pass_required"] is True + assert rep["summary"]["blocking"] == 0 + + +def test_gate_does_not_run_premium_pass(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_missing_domain.json") + rep = _run(gate, cand) + assert rep["claim_boundaries"]["runs_premium_pass"] is False + + +# --- block paths ------------------------------------------------------------- +def test_forbidden_claim_blocks(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + data["metadata"]["title"] = "the universal standard for everything" + leak = tmp_path / "claim.json" + leak.write_text(json.dumps(data)) + rep = _run(gate, leak) + assert rep["classification"] == "BLOCK" + assert any("forbidden public claim" in f for f in rep["blocking_findings"]) + + +def test_internal_codename_leak_blocks(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + data["metadata"]["note"] = "internal chimera reference" + leak = tmp_path / "codename.json" + leak.write_text(json.dumps(data)) + rep = _run(gate, leak) + assert rep["classification"] == "BLOCK" + assert any("codename" in f for f in rep["blocking_findings"]) + + +def test_private_public_leak_blocks(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + data["private_public_leak"] = True + leak = tmp_path / "leak.json" + leak.write_text(json.dumps(data)) + rep = _run(gate, leak) + assert rep["classification"] == "BLOCK" + assert any("private->public" in f for f in rep["blocking_findings"]) + + +def test_public_v4_2_overclaim_blocks(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + data["internal_target"]["public_version"] = "v4.2" + leak = tmp_path / "v42.json" + leak.write_text(json.dumps(data)) + rep = _run(gate, leak) + assert rep["classification"] == "BLOCK" + assert any("public_version v4.2" in f for f in rep["blocking_findings"]) + + +def test_missing_v4_2_layer_blocks(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + del data["governance_system"] + bad = tmp_path / "nolayer.json" + bad.write_text(json.dumps(data)) + rep = _run(gate, bad) + assert rep["classification"] == "BLOCK" + assert any("governance_system" in f for f in rep["blocking_findings"]) + + +def test_completeness_claim_blocks(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + data["skill_lifecycle"]["completeness_claimed"] = True + bad = tmp_path / "complete.json" + bad.write_text(json.dumps(data)) + rep = _run(gate, bad) + assert rep["classification"] == "BLOCK" + assert any("completeness" in f for f in rep["blocking_findings"]) + + +# --- orchestrated checks run honestly --------------------------------------- +def test_not_run_checks_recorded_without_inflating_review(tmp_path): + """A skipped check is recorded as not_run with a reason, not as a review + finding — otherwise a clean candidate would never be a clean ACCEPT.""" + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + rep = _run(gate, cand) # no source manifest, no before -> 2 not_run checks + not_run = [c for c in rep["checks"] if c["verdict"] == "not_run"] + assert len(not_run) >= 2 + for c in not_run: + assert c["review_findings"] == [] + assert "reason" in c["detail"] + assert rep["classification"] == "ACCEPT" + + +def test_source_manifest_check_runs_when_provided(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + rep = _run(gate, cand, + source_manifest=FIX / "source_manifest_ok.json") + sc = next(c for c in rep["checks"] if c["check"] == "source_license") + assert sc["verdict"] in ("pass", "review") # it ran + assert rep["classification"] in ("ACCEPT", "ACCEPT_WITH_REVIEW") + + +def test_threat_model_check_runs(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + rep = _run(gate, cand) + tm = next(c for c in rep["checks"] if c["check"] == "threat_model") + assert tm["verdict"] != "not_run" + assert tm["detail"].get("deterministic_threat_model_id") + + +# --- determinism ------------------------------------------------------------- +def test_gate_id_stable_across_runs(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + a = _run(gate, cand) + b = _run(gate, cand) + assert a["deterministic_gate_id"] == b["deterministic_gate_id"] + + +def test_gate_id_excludes_eval_date(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + text = cand.read_text() + data = json.loads(text) + a = gate.run_gate(data, text, cand, None, None, _dt.date(2026, 6, 2)) + b = gate.run_gate(data, text, cand, None, None, _dt.date(2030, 1, 1)) + assert a["deterministic_gate_id"] == b["deterministic_gate_id"] + + +# --- CLI exit codes ---------------------------------------------------------- +def test_cli_accept_exits_zero(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + out = tmp_path / "gate.json" + rc = gate.main(["--candidate", str(cand), "--out", str(out), + "--quiet", "--eval-date", "2026-06-02"]) + assert rc == 0 + assert json.loads(out.read_text())["classification"] == "ACCEPT" + + +def test_cli_block_exits_one(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + data = json.loads(cand.read_text()) + data["private_public_leak"] = True + bad = tmp_path / "leak.json" + bad.write_text(json.dumps(data)) + out = tmp_path / "gate.json" + rc = gate.main(["--candidate", str(bad), "--out", str(out), + "--quiet", "--eval-date", "2026-06-02"]) + assert rc == 1 + + +def test_cli_missing_candidate_exits_two(tmp_path): + gate = _gate_mod() + rc = gate.main(["--candidate", str(tmp_path / "nope.json"), "--quiet"]) + assert rc == 2 + + +def test_cli_writes_md_summary(tmp_path): + gate = _gate_mod() + cand = _make_candidate(tmp_path, "build_request_clean.json") + out = tmp_path / "gate.json" + md = tmp_path / "gate.md" + rc = gate.main(["--candidate", str(cand), "--out", str(out), + "--md", str(md), "--quiet", "--eval-date", "2026-06-02"]) + assert rc == 0 + assert md.exists() + assert "promotion gate" in md.read_text().lower()