From fe934ce6586b333803a371560a5aa1cc99880ecc Mon Sep 17 00:00:00 2001 From: Cliff Bell Date: Fri, 13 Feb 2026 23:42:42 -0600 Subject: [PATCH 1/3] Add CLAUDE.md with project context for Claude Code Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..e2642eb --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,92 @@ +# CLAUDE.md + +## Project Overview + +ZTXP (Zero Trust eXchange Protocol) is a vendor-neutral, cryptographically verifiable protocol for exchanging signed Trust Assertion Messages (TAMs) between Zero Trust Architecture components. The repo contains the protocol specification, a Python reference implementation, and an AWS lab with Terraform infrastructure. + +## Repository Structure + +``` +spec/ # Protocol specification (draft-ztxp-02.md) +reference/ # Python reference implementation (ztxpv0.2.py) +ztxb-aws-lab/ + infra/ # Terraform IaC (modular) + modules/ # vpc, kms, cognito, dynamodb, s3_policy_bundles, + # pdp_fargate, api_notes, ztxp_broker + app/ + lambdas/ # Python 3.12 Lambda handlers (notes_api, pep_authorizer, ztxp_broker) + pdp/ # OPA container (Dockerfile, Rego policies, build scripts) +docs/ # Flow diagrams (WIP) +``` + +## Tech Stack + +- **Spec**: Markdown (RFC-style) +- **Reference impl**: Python 3.x, Flask >=3.0.0, cryptography >=42.0.0, pyyaml >=6.0.0 +- **Crypto**: Ed25519/Ed448 signing, AWS KMS (ECC_NIST_P256) +- **Infrastructure**: Terraform >=1.5.0, AWS (Lambda, API Gateway v2, ECS Fargate, DynamoDB, Cognito, KMS) +- **Policy engine**: Open Policy Agent v0.64.1, Rego +- **Containers**: Docker, AWS ECR +- **Lambda runtime**: Python 3.12 + +## Architecture + +The ZTXP decision flow: + +``` +Client → Cognito Auth → API Gateway → PEP Lambda (signs TAM via KMS) + → ZTXP Broker Lambda → OPA PDP (verifies signature, evaluates Rego policy) + → Decision {allow|deny} → Notes API Lambda → DynamoDB +``` + +Core components: +- **PEP** (Policy Enforcement Point): Collects identity/device context, signs TAM +- **Broker**: Forwards signed TAMs to PDP +- **PDP** (Policy Decision Point): Verifies TAM signatures, evaluates authorization policy via OPA + +## Key Commands + +### Reference Implementation +```bash +# Generate Ed25519 keypair (stored in ~/.ztxp/) +python reference/ztxpv0.2.py keygen + +# Sign a TAM +python reference/ztxpv0.2.py sign input.yaml output.json + +# Verify a signed TAM +python reference/ztxpv0.2.py validate signed.json + +# Run broker server +python reference/ztxpv0.2.py broker --host 0.0.0.0 --port 8080 +``` + +### AWS Lab Infrastructure +```bash +cd ztxb-aws-lab/infra +terraform init +terraform plan +terraform apply +``` + +### PDP Container +```bash +cd ztxb-aws-lab/app/pdp/scripts +./build.sh # Build Docker image and push to ECR +``` + +## Code Conventions + +- Spec changes go in `spec/`, code in `reference/` +- Infrastructure is modular Terraform — each module has `main.tf`, `variables.tf`, `outputs.tf`, `iam.tf` +- Lambda handlers follow stub pattern: log event, return response (real logic TBD) +- TAM canonicalization: UTF-8, sorted JSON keys, no whitespace +- License: Apache 2.0 + +## Current State + +- Specification: Draft-02 +- Reference implementation: Functional minimal toolkit (261 lines) +- AWS lab: Infrastructure skeleton with stub Lambda handlers +- Tests: No test suite yet +- CI/CD: Not yet implemented From 1457d229224f3c1a9d015b589c0e375691894d01 Mon Sep 17 00:00:00 2001 From: Cliff Bell Date: Fri, 13 Feb 2026 23:49:10 -0600 Subject: [PATCH 2/3] Move CLAUDE.md to .gitignore as local-only file Co-Authored-By: Claude Opus 4.6 --- .gitignore | 3 ++ CLAUDE.md | 92 ------------------------------------------------------ 2 files changed, 3 insertions(+), 92 deletions(-) delete mode 100644 CLAUDE.md diff --git a/.gitignore b/.gitignore index 7756436..211a3f1 100644 --- a/.gitignore +++ b/.gitignore @@ -216,3 +216,6 @@ ztxb-aws-lab/infra/terraform.tfstate.backup ztxb-aws-lab/infra/modules/api_notes/*.zip ztxb-aws-lab/infra/modules/ztxp_broker/*.zip + +# Claude Code +CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index e2642eb..0000000 --- a/CLAUDE.md +++ /dev/null @@ -1,92 +0,0 @@ -# CLAUDE.md - -## Project Overview - -ZTXP (Zero Trust eXchange Protocol) is a vendor-neutral, cryptographically verifiable protocol for exchanging signed Trust Assertion Messages (TAMs) between Zero Trust Architecture components. The repo contains the protocol specification, a Python reference implementation, and an AWS lab with Terraform infrastructure. - -## Repository Structure - -``` -spec/ # Protocol specification (draft-ztxp-02.md) -reference/ # Python reference implementation (ztxpv0.2.py) -ztxb-aws-lab/ - infra/ # Terraform IaC (modular) - modules/ # vpc, kms, cognito, dynamodb, s3_policy_bundles, - # pdp_fargate, api_notes, ztxp_broker - app/ - lambdas/ # Python 3.12 Lambda handlers (notes_api, pep_authorizer, ztxp_broker) - pdp/ # OPA container (Dockerfile, Rego policies, build scripts) -docs/ # Flow diagrams (WIP) -``` - -## Tech Stack - -- **Spec**: Markdown (RFC-style) -- **Reference impl**: Python 3.x, Flask >=3.0.0, cryptography >=42.0.0, pyyaml >=6.0.0 -- **Crypto**: Ed25519/Ed448 signing, AWS KMS (ECC_NIST_P256) -- **Infrastructure**: Terraform >=1.5.0, AWS (Lambda, API Gateway v2, ECS Fargate, DynamoDB, Cognito, KMS) -- **Policy engine**: Open Policy Agent v0.64.1, Rego -- **Containers**: Docker, AWS ECR -- **Lambda runtime**: Python 3.12 - -## Architecture - -The ZTXP decision flow: - -``` -Client → Cognito Auth → API Gateway → PEP Lambda (signs TAM via KMS) - → ZTXP Broker Lambda → OPA PDP (verifies signature, evaluates Rego policy) - → Decision {allow|deny} → Notes API Lambda → DynamoDB -``` - -Core components: -- **PEP** (Policy Enforcement Point): Collects identity/device context, signs TAM -- **Broker**: Forwards signed TAMs to PDP -- **PDP** (Policy Decision Point): Verifies TAM signatures, evaluates authorization policy via OPA - -## Key Commands - -### Reference Implementation -```bash -# Generate Ed25519 keypair (stored in ~/.ztxp/) -python reference/ztxpv0.2.py keygen - -# Sign a TAM -python reference/ztxpv0.2.py sign input.yaml output.json - -# Verify a signed TAM -python reference/ztxpv0.2.py validate signed.json - -# Run broker server -python reference/ztxpv0.2.py broker --host 0.0.0.0 --port 8080 -``` - -### AWS Lab Infrastructure -```bash -cd ztxb-aws-lab/infra -terraform init -terraform plan -terraform apply -``` - -### PDP Container -```bash -cd ztxb-aws-lab/app/pdp/scripts -./build.sh # Build Docker image and push to ECR -``` - -## Code Conventions - -- Spec changes go in `spec/`, code in `reference/` -- Infrastructure is modular Terraform — each module has `main.tf`, `variables.tf`, `outputs.tf`, `iam.tf` -- Lambda handlers follow stub pattern: log event, return response (real logic TBD) -- TAM canonicalization: UTF-8, sorted JSON keys, no whitespace -- License: Apache 2.0 - -## Current State - -- Specification: Draft-02 -- Reference implementation: Functional minimal toolkit (261 lines) -- AWS lab: Infrastructure skeleton with stub Lambda handlers -- Tests: No test suite yet -- CI/CD: Not yet implemented From 308574d740f2d05cdfbb73d83518a2d8bc03758e Mon Sep 17 00:00:00 2001 From: Cliff Bell Date: Sat, 14 Feb 2026 20:39:12 -0600 Subject: [PATCH 3/3] Implement full ZTXP security flow, tests, and infrastructure fixes - PEP Lambda: build TAM from request context, sign with KMS (ECDSA_SHA_256), call Broker; extract JWT sub claim for principal identity - Broker Lambda: verify KMS signature, replay protection via timestamp TTL, forward to OPA PDP; deny on any verification failure - Notes Lambda: full DynamoDB CRUD (list/get/create/update/delete), scoped to authenticated principal from authorizer context - OPA policy: updated to rego.v1 syntax, added admin rule, device compliance check, risk score threshold (< 70 for writes) - OPA tests: 13 Rego unit tests covering all allow/deny paths - Python tests: 35 unit tests (all passing) for all three handlers - Terraform: API Gateway $default stages (both APIs), PDP ALB made internal, dedicated security groups for ALB and ECS task, IAM execution role for ECS with ECR pull permissions, CloudWatch log config for OPA container, KMS_KEY_ARN wired to broker, WAF with rate limiting + AWS managed common rule set - CI: GitHub Actions workflow (Python tests, OPA tests, Terraform validate) - Dockerfile: load full policy directory instead of single file Co-Authored-By: Claude Sonnet 4.5 --- .github/workflows/ci.yml | 57 +++++ ztxb-aws-lab/app/lambdas/notes_api/handler.py | 136 ++++++++++-- .../app/lambdas/pep_authorizer/handler.py | 192 +++++++++++++++-- .../app/lambdas/ztxp_broker/handler.py | 195 ++++++++++++++++-- ztxb-aws-lab/app/pdp/Dockerfile | 4 +- ztxb-aws-lab/app/pdp/policy/authz.rego | 65 ++++-- ztxb-aws-lab/app/pdp/policy/authz_test.rego | 124 +++++++++++ ztxb-aws-lab/app/pdp/policy/data.json | 1 + ztxb-aws-lab/infra/modules/api_notes/api.tf | 10 + ztxb-aws-lab/infra/modules/api_notes/waf.tf | 70 +++++++ .../infra/modules/pdp_fargate/security.tf | 126 +++++++++++ .../infra/modules/pdp_fargate/service.tf | 17 +- .../infra/modules/ztxp_broker/main.tf | 9 +- ztxb-aws-lab/tests/test_broker.py | 121 +++++++++++ ztxb-aws-lab/tests/test_notes_api.py | 117 +++++++++++ ztxb-aws-lab/tests/test_pep_authorizer.py | 132 ++++++++++++ 16 files changed, 1313 insertions(+), 63 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 ztxb-aws-lab/app/pdp/policy/authz_test.rego create mode 100644 ztxb-aws-lab/tests/test_broker.py create mode 100644 ztxb-aws-lab/tests/test_notes_api.py create mode 100644 ztxb-aws-lab/tests/test_pep_authorizer.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..806ffe8 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,57 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + python-tests: + name: Python Tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install pytest boto3 + + - name: Run Lambda unit tests + run: pytest ztxb-aws-lab/tests/ -v + + opa-tests: + name: OPA Policy Tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install OPA + run: | + curl -L -o opa https://openpolicyagent.org/downloads/v0.64.1/opa_linux_amd64_static + chmod +x opa + sudo mv opa /usr/local/bin/ + + - name: Run Rego tests + run: opa test ztxb-aws-lab/app/pdp/policy/ -v + + terraform-validate: + name: Terraform Validate + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: hashicorp/setup-terraform@v3 + with: + terraform_version: "1.5.0" + + - name: Terraform Init + working-directory: ztxb-aws-lab/infra + run: terraform init -backend=false + + - name: Terraform Validate + working-directory: ztxb-aws-lab/infra + run: terraform validate diff --git a/ztxb-aws-lab/app/lambdas/notes_api/handler.py b/ztxb-aws-lab/app/lambdas/notes_api/handler.py index e363f8e..63d0953 100644 --- a/ztxb-aws-lab/app/lambdas/notes_api/handler.py +++ b/ztxb-aws-lab/app/lambdas/notes_api/handler.py @@ -1,32 +1,134 @@ # app/lambdas/notes_api/handler.py +""" +Notes API — a simple CRUD service backed by DynamoDB. + +Only reachable after the PEP authorizer grants access. The +authorizer injects context (principalId, ztxp_decision) which +this handler uses to scope queries to the authenticated user. +""" import json import os import logging +import uuid +from datetime import datetime, timezone + +import boto3 +from boto3.dynamodb.conditions import Key logger = logging.getLogger() logger.setLevel(logging.INFO) TABLE_NAME = os.environ.get("TABLE_NAME", "unknown") +ddb = boto3.resource("dynamodb") +table = ddb.Table(TABLE_NAME) -def lambda_handler(event, context): - """ - Minimal stub Notes API. - Later we can wire this up to DynamoDB for real CRUD. - """ - logger.info("Received event: %s", json.dumps(event)) - - method = event.get("requestContext", {}).get("http", {}).get("method", "GET") - path = event.get("rawPath", "/notes") - - body = { - "message": "Notes API stub", - "method": method, - "path": path, - "table": TABLE_NAME, - } +def _response(status, body): return { - "statusCode": 200, + "statusCode": status, "headers": {"Content-Type": "application/json"}, "body": json.dumps(body), } + + +def _user_id(event): + """Extract the authenticated user from the authorizer context.""" + auth_ctx = event.get("requestContext", {}).get("authorizer", {}).get("lambda", {}) + return auth_ctx.get("principalId", "anonymous") + + +def _note_id_from_path(event): + """Extract note_id from path parameters (/notes/{note_id}).""" + params = event.get("pathParameters") or {} + proxy = params.get("proxy", "") + return proxy.strip("/") if proxy else None + + +# --------------------------------------------------------------------------- +# CRUD operations +# --------------------------------------------------------------------------- + +def list_notes(user_id): + resp = table.query(KeyConditionExpression=Key("user_id").eq(user_id)) + return _response(200, {"notes": resp.get("Items", [])}) + + +def get_note(user_id, note_id): + resp = table.get_item(Key={"user_id": user_id, "note_id": note_id}) + item = resp.get("Item") + if not item: + return _response(404, {"error": "not_found"}) + return _response(200, item) + + +def create_note(user_id, body): + note_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + item = { + "user_id": user_id, + "note_id": note_id, + "title": body.get("title", ""), + "content": body.get("content", ""), + "created_at": now, + "updated_at": now, + } + table.put_item(Item=item) + return _response(201, item) + + +def update_note(user_id, note_id, body): + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + resp = table.update_item( + Key={"user_id": user_id, "note_id": note_id}, + UpdateExpression="SET title = :t, content = :c, updated_at = :u", + ExpressionAttributeValues={ + ":t": body.get("title", ""), + ":c": body.get("content", ""), + ":u": now, + }, + ConditionExpression="attribute_exists(user_id)", + ReturnValues="ALL_NEW", + ) + return _response(200, resp.get("Attributes", {})) + + +def delete_note(user_id, note_id): + table.delete_item( + Key={"user_id": user_id, "note_id": note_id}, + ConditionExpression="attribute_exists(user_id)", + ) + return _response(200, {"deleted": note_id}) + + +# --------------------------------------------------------------------------- +# Lambda entry point +# --------------------------------------------------------------------------- + +def lambda_handler(event, context): + logger.info("Notes API invoked") + + method = event.get("requestContext", {}).get("http", {}).get("method", "GET") + user_id = _user_id(event) + note_id = _note_id_from_path(event) + + try: + body = json.loads(event.get("body") or "{}") if event.get("body") else {} + except json.JSONDecodeError: + return _response(400, {"error": "invalid_json"}) + + try: + if method == "GET" and not note_id: + return list_notes(user_id) + elif method == "GET" and note_id: + return get_note(user_id, note_id) + elif method == "POST": + return create_note(user_id, body) + elif method == "PUT" and note_id: + return update_note(user_id, note_id, body) + elif method == "DELETE" and note_id: + return delete_note(user_id, note_id) + else: + return _response(405, {"error": "method_not_allowed"}) + except Exception as exc: + logger.error("Notes API error: %s", exc, exc_info=True) + return _response(500, {"error": "internal_error"}) diff --git a/ztxb-aws-lab/app/lambdas/pep_authorizer/handler.py b/ztxb-aws-lab/app/lambdas/pep_authorizer/handler.py index dca1e82..84fee45 100644 --- a/ztxb-aws-lab/app/lambdas/pep_authorizer/handler.py +++ b/ztxb-aws-lab/app/lambdas/pep_authorizer/handler.py @@ -1,27 +1,191 @@ # app/lambdas/pep_authorizer/handler.py +""" +PEP (Policy Enforcement Point) — HTTP API v2 REQUEST authorizer. + +Collects identity and device context from the incoming request, +constructs a ZTXP Trust Assertion Message (TAM), signs it with +AWS KMS (ECDSA_SHA_256 / P-256), and forwards it to the ZTXP +Broker for a policy decision. + +If the Broker says "allow", the request proceeds to the Notes API. +Otherwise the request is denied at the gateway. +""" +import base64 +import hashlib import json import logging +import os +import uuid +from datetime import datetime, timezone + +import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) -def lambda_handler(event, context): - """ - Minimal PEP stub for HTTP API v2 REQUEST authorizer. - For now: allow every request. - Later: build ZTXP TAM, sign with KMS, call broker. +KMS_KEY_ARN = os.environ.get("KMS_KEY_ARN", "") +BROKER_URL = os.environ.get("BROKER_URL", "") + +kms_client = boto3.client("kms") + +# --------------------------------------------------------------------------- +# TAM helpers +# --------------------------------------------------------------------------- + +def canonical_json(data): + """Deterministic JSON serialisation for signing (sorted keys, no whitespace).""" + return json.dumps(data, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def build_tam(event): + """Extract identity / device / resource context from the API Gateway event + and assemble a TAM according to the ZTXP v0.2 spec.""" + + request_context = event.get("requestContext", {}) + http_info = request_context.get("http", {}) + headers = event.get("headers", {}) + + # --- Identity (from Cognito JWT or Authorization header) --- + auth_header = headers.get("authorization", "") + principal_id = "anonymous" + groups = [] + if auth_header: + principal_id = _extract_sub_from_jwt(auth_header) or auth_header[:40] + + # --- Device context (forwarded by client headers) --- + device_id = headers.get("x-device-id", "unknown") + device_compliant = headers.get("x-device-compliant", "true").lower() == "true" + device_trust = headers.get("x-device-trust", "low-risk") + + # --- Resource --- + method = http_info.get("method", "GET") + path = http_info.get("path", "/") + action = "notes:Write" if method in ("POST", "PUT", "DELETE") else "notes:Read" + + tam = { + "version": "0.2", + "message_id": str(uuid.uuid4()), + "issued_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "issuer": "ztxp://pep.ztxp-aws-lab", + "subject": { + "id": f"user:{principal_id}", + "role": "authenticated", + "groups": groups, + }, + "device": { + "id": f"device:{device_id}", + "posture": { + "compliant": device_compliant, + }, + }, + "context": { + "risk_score": 0, + "device_trust": device_trust, + "source_ip": http_info.get("sourceIp", "0.0.0.0"), + "session_id": request_context.get("requestId", ""), + }, + "resource": { + "id": f"app://notes{path}", + "action": action, + }, + } + return tam + + +def sign_tam(tam): + """Sign the TAM with KMS (ECDSA_SHA_256 on P-256 key). + + KMS Sign with ECDSA_SHA_256 and MessageType=DIGEST expects us + to SHA-256 the canonical payload ourselves. """ - logger.info("Authorizer event: %s", json.dumps(event)) + payload = canonical_json(tam) + digest = hashlib.sha256(payload).digest() + + response = kms_client.sign( + KeyId=KMS_KEY_ARN, + Message=digest, + MessageType="DIGEST", + SigningAlgorithm="ECDSA_SHA_256", + ) + sig_bytes = response["Signature"] + tam["signature"] = { + "alg": "ECDSA_SHA_256", + "key_id": KMS_KEY_ARN, + "sig": base64.b64encode(sig_bytes).decode(), + } + return tam + + +# --------------------------------------------------------------------------- +# Broker call +# --------------------------------------------------------------------------- + +def call_broker(signed_tam): + """POST the signed TAM to the ZTXP Broker and return its decision.""" + from urllib.request import Request, urlopen + from urllib.error import URLError + + url = f"{BROKER_URL}/ztxp/evaluate" + body = json.dumps({"tam": signed_tam}).encode("utf-8") + req = Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST") + + try: + with urlopen(req, timeout=4) as resp: + return json.loads(resp.read().decode()) + except (URLError, Exception) as exc: + logger.error("Broker call failed: %s", exc) + return {"decision": "deny", "reason": "broker_unreachable"} + + +# --------------------------------------------------------------------------- +# JWT helper (lightweight — no external deps) +# --------------------------------------------------------------------------- + +def _extract_sub_from_jwt(auth_header): + """Best-effort extraction of 'sub' claim from a Bearer JWT. + We do NOT verify the JWT here — Cognito + API Gateway handle that. + We only need the subject identifier for the TAM.""" + try: + token = auth_header.replace("Bearer ", "").strip() + payload_segment = token.split(".")[1] + padding = 4 - len(payload_segment) % 4 + payload_segment += "=" * padding + claims = json.loads(base64.b64decode(payload_segment)) + return claims.get("sub") or claims.get("email") or claims.get("cognito:username") + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Lambda entry point +# --------------------------------------------------------------------------- + +def lambda_handler(event, context): + logger.info("PEP authorizer invoked") + + # 1. Build the TAM from request context + tam = build_tam(event) + + # 2. Sign with KMS + try: + signed_tam = sign_tam(tam) + except Exception as exc: + logger.error("KMS signing failed: %s", exc) + return {"isAuthorized": False, "context": {"reason": "signing_failed"}} + + # 3. Forward to the Broker for a policy decision + decision = call_broker(signed_tam) + logger.info("Broker decision: %s", json.dumps(decision)) - # You can extract identity info here for future use - identity = event.get("identity", {}) or {} - principal_id = identity.get("user", "anonymous") + allowed = decision.get("decision") == "allow" - # HTTP API v2 request authorizer response shape + # 4. Return authorizer response to API Gateway return { - "isAuthorized": True, + "isAuthorized": allowed, "context": { - "principalId": principal_id, - "ztxp_decision": "allow_stub" - } + "principalId": tam["subject"]["id"], + "ztxp_decision": decision.get("decision", "deny"), + "ztxp_reason": decision.get("reason", ""), + "ztxp_message_id": tam["message_id"], + }, } diff --git a/ztxb-aws-lab/app/lambdas/ztxp_broker/handler.py b/ztxb-aws-lab/app/lambdas/ztxp_broker/handler.py index 404c4a3..8e64b1a 100644 --- a/ztxb-aws-lab/app/lambdas/ztxp_broker/handler.py +++ b/ztxb-aws-lab/app/lambdas/ztxp_broker/handler.py @@ -1,31 +1,202 @@ # app/lambdas/ztxp_broker/handler.py +""" +ZTXP Broker — receives signed TAMs from the PEP, verifies the +signature via KMS, then forwards the TAM payload to the PDP +(Open Policy Agent) for a policy decision. + +Security flow: + 1. Parse TAM from request body + 2. Verify ECDSA_SHA_256 signature against KMS public key + 3. Validate timestamp freshness (reject replay > 600 s) + 4. POST TAM fields to OPA at PDP_URL for policy evaluation + 5. Return the allow/deny decision +""" +import base64 +import hashlib import json import logging import os +from datetime import datetime, timezone, timedelta + +import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) -PDP_URL = os.environ.get("PDP_URL", "http://pdp-placeholder") +PDP_URL = os.environ.get("PDP_URL", "") +KMS_KEY_ARN = os.environ.get("KMS_KEY_ARN", "") +TAM_TTL_SECONDS = int(os.environ.get("TAM_TTL_SECONDS", "600")) -def lambda_handler(event, context): +kms_client = boto3.client("kms") + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def canonical_json(data): + return json.dumps(data, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + + +def _error(status, message): + return { + "statusCode": status, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({"decision": "deny", "reason": message}), + } + + +# --------------------------------------------------------------------------- +# Signature verification +# --------------------------------------------------------------------------- + +def verify_signature(tam): + """Verify the TAM signature using KMS Verify (ECDSA_SHA_256). + + Returns True if valid, raises ValueError otherwise. """ - Minimal ZTXP broker stub. - For now: log the TAM and return an 'allow' decision. - Later: forward to PDP_URL and return real PDP decision. + sig_block = tam.get("signature") + if not sig_block: + raise ValueError("missing_signature") + + sig_bytes = base64.b64decode(sig_block["sig"]) + + # Reconstruct the canonical payload (everything except "signature") + tam_copy = {k: v for k, v in tam.items() if k != "signature"} + payload = canonical_json(tam_copy) + digest = hashlib.sha256(payload).digest() + + key_id = sig_block.get("key_id", KMS_KEY_ARN) + + response = kms_client.verify( + KeyId=key_id, + Message=digest, + MessageType="DIGEST", + Signature=sig_bytes, + SigningAlgorithm="ECDSA_SHA_256", + ) + + if not response.get("SignatureValid"): + raise ValueError("invalid_signature") + + return True + + +def verify_timestamp(tam): + """Reject TAMs whose issued_at is older than TAM_TTL_SECONDS.""" + issued_at_str = tam.get("issued_at", "") + if not issued_at_str: + raise ValueError("missing_timestamp") + + try: + issued_at = datetime.strptime(issued_at_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc) + except ValueError: + issued_at = datetime.fromisoformat(issued_at_str.replace("Z", "+00:00")) + + age = datetime.now(timezone.utc) - issued_at + if age > timedelta(seconds=TAM_TTL_SECONDS): + raise ValueError(f"tam_expired (age={int(age.total_seconds())}s, ttl={TAM_TTL_SECONDS}s)") + if age < timedelta(seconds=-60): + raise ValueError("tam_from_future") + + return True + + +# --------------------------------------------------------------------------- +# PDP call (OPA) +# --------------------------------------------------------------------------- + +def call_pdp(tam): + """Forward the TAM to OPA for policy evaluation. + + OPA expects: + POST /v1/data/authz/allow + { "input": { ... } } + + We map TAM fields to the OPA input schema that authz.rego expects. """ - logger.info("Broker event: %s", json.dumps(event)) + from urllib.request import Request, urlopen + from urllib.error import URLError + + opa_input = { + "action": tam.get("resource", {}).get("action", ""), + "principal": { + "id": tam.get("subject", {}).get("id", ""), + "role": tam.get("subject", {}).get("role", ""), + "groups": tam.get("subject", {}).get("groups", []), + }, + "resource": tam.get("resource", {}), + "context": { + "device_trust": tam.get("context", {}).get("device_trust", "unknown"), + "risk_score": tam.get("context", {}).get("risk_score", 100), + "compliant": tam.get("device", {}).get("posture", {}).get("compliant", False), + }, + } + + url = f"http://{PDP_URL}/v1/data/authz/allow" + body = json.dumps({"input": opa_input}).encode("utf-8") + req = Request(url, data=body, headers={"Content-Type": "application/json"}, method="POST") + + try: + with urlopen(req, timeout=3) as resp: + result = json.loads(resp.read().decode()) + return result.get("result", False) + except (URLError, Exception) as exc: + logger.error("PDP call failed: %s", exc) + return False + + +# --------------------------------------------------------------------------- +# Lambda entry point +# --------------------------------------------------------------------------- + +def lambda_handler(event, context): + logger.info("Broker invoked") + + # Parse the TAM from the request body + try: + body = event.get("body", "") + if isinstance(body, str): + body = json.loads(body) + tam = body.get("tam") if isinstance(body, dict) else None + if not tam: + return _error(400, "missing_tam") + except (json.JSONDecodeError, AttributeError): + return _error(400, "invalid_json") + + # 1. Verify signature + try: + verify_signature(tam) + except ValueError as exc: + logger.warning("Signature verification failed: %s", exc) + return _error(403, f"signature_rejected: {exc}") + except Exception as exc: + logger.error("KMS verify error: %s", exc) + return _error(500, "verification_error") + + # 2. Verify timestamp freshness (replay protection) + try: + verify_timestamp(tam) + except ValueError as exc: + logger.warning("Timestamp check failed: %s", exc) + return _error(403, f"timestamp_rejected: {exc}") + + # 3. Forward to PDP for policy decision + allowed = call_pdp(tam) + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + decision = "allow" if allowed else "deny" + reason = "policy_allow" if allowed else "policy_deny" - # In the real flow, event["body"] would be the signed TAM JSON. - # For now just echo it and pretend PDP said allow. + logger.info("Decision for message_id=%s: %s", tam.get("message_id"), decision) return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps({ - "decision": "allow", - "reason": "broker_stub", - "evaluated_at": "stub", - "expires_in": 300, + "decision": decision, + "reason": reason, + "evaluated_at": now, + "expires_in": 300 if allowed else 0, + "message_id": tam.get("message_id", ""), }), } diff --git a/ztxb-aws-lab/app/pdp/Dockerfile b/ztxb-aws-lab/app/pdp/Dockerfile index b11048f..f2712f8 100644 --- a/ztxb-aws-lab/app/pdp/Dockerfile +++ b/ztxb-aws-lab/app/pdp/Dockerfile @@ -7,5 +7,5 @@ COPY policy /policy EXPOSE 8181 -# Run OPA in server mode and load our policy package -CMD ["run", "--server", "/policy/authz.rego", "--addr", ":8181"] +# Run OPA in server mode, load the entire policy directory +CMD ["run", "--server", "--addr", ":8181", "/policy"] diff --git a/ztxb-aws-lab/app/pdp/policy/authz.rego b/ztxb-aws-lab/app/pdp/policy/authz.rego index 6f2c9ca..03725f6 100644 --- a/ztxb-aws-lab/app/pdp/policy/authz.rego +++ b/ztxb-aws-lab/app/pdp/policy/authz.rego @@ -1,27 +1,62 @@ # app/pdp/policy/authz.rego +# +# ZTXP authorization policy for the Notes API. +# +# Input schema (mapped from TAM by the Broker): +# input.action - "notes:Read" | "notes:Write" +# input.principal.id - "user:" +# input.principal.role - e.g. "authenticated" +# input.principal.groups - ["writer", "admin", ...] +# input.context.device_trust - "low-risk" | "medium-risk" | "high-risk" +# input.context.risk_score - integer 0-100 +# input.context.compliant - boolean +# input.resource.id - "app://notes/..." +# input.resource.action - same as input.action package authz -# Default deny -default allow = false +import rego.v1 -# Anyone can read notes -allow { - input.action == "notes:Read" +# Default deny — every request is blocked unless an allow rule fires +default allow := false + +# ----------------------------------------------------------------------- +# Read access: anyone with a non-high-risk device can read +# ----------------------------------------------------------------------- +allow if { + input.action == "notes:Read" + not high_risk_device +} + +# ----------------------------------------------------------------------- +# Write access: must be in the "writer" group, device compliant and +# not flagged high-risk, and risk score below threshold +# ----------------------------------------------------------------------- +allow if { + input.action == "notes:Write" + user_in_group("writer") + input.context.compliant == true + not high_risk_device + input.context.risk_score < 70 } -# Writers can write, as long as device is not marked high-risk -allow { - input.action == "notes:Write" - user_in_group("writer") - not high_risk_device +# ----------------------------------------------------------------------- +# Admin override: admins can do anything from compliant devices +# ----------------------------------------------------------------------- +allow if { + user_in_group("admin") + input.context.compliant == true + not high_risk_device } -user_in_group(g) { - some idx - g == input.principal.groups[idx] +# ----------------------------------------------------------------------- +# Helper rules +# ----------------------------------------------------------------------- +user_in_group(g) if { + some group in input.principal.groups + group == g } -high_risk_device { - input.context.device_trust == "high-risk" +high_risk_device if { + input.context.device_trust == "high-risk" } diff --git a/ztxb-aws-lab/app/pdp/policy/authz_test.rego b/ztxb-aws-lab/app/pdp/policy/authz_test.rego new file mode 100644 index 0000000..2f55c43 --- /dev/null +++ b/ztxb-aws-lab/app/pdp/policy/authz_test.rego @@ -0,0 +1,124 @@ +# app/pdp/policy/authz_test.rego +# +# Run: opa test ./policy -v + +package authz_test + +import rego.v1 +import data.authz + +# ----------------------------------------------------------------------- +# Read access +# ----------------------------------------------------------------------- + +test_read_allowed_low_risk if { + authz.allow with input as { + "action": "notes:Read", + "principal": {"id": "user:alice", "groups": []}, + "context": {"device_trust": "low-risk", "risk_score": 10, "compliant": true}, + } +} + +test_read_denied_high_risk_device if { + not authz.allow with input as { + "action": "notes:Read", + "principal": {"id": "user:alice", "groups": []}, + "context": {"device_trust": "high-risk", "risk_score": 10, "compliant": true}, + } +} + +# ----------------------------------------------------------------------- +# Write access +# ----------------------------------------------------------------------- + +test_write_allowed_writer_compliant if { + authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:bob", "groups": ["writer"]}, + "context": {"device_trust": "low-risk", "risk_score": 30, "compliant": true}, + } +} + +test_write_denied_no_writer_group if { + not authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:charlie", "groups": ["reader"]}, + "context": {"device_trust": "low-risk", "risk_score": 10, "compliant": true}, + } +} + +test_write_denied_high_risk if { + not authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:bob", "groups": ["writer"]}, + "context": {"device_trust": "high-risk", "risk_score": 30, "compliant": true}, + } +} + +test_write_denied_non_compliant if { + not authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:bob", "groups": ["writer"]}, + "context": {"device_trust": "low-risk", "risk_score": 30, "compliant": false}, + } +} + +test_write_denied_high_risk_score if { + not authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:bob", "groups": ["writer"]}, + "context": {"device_trust": "low-risk", "risk_score": 85, "compliant": true}, + } +} + +# ----------------------------------------------------------------------- +# Admin access +# ----------------------------------------------------------------------- + +test_admin_can_write if { + authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:admin", "groups": ["admin"]}, + "context": {"device_trust": "low-risk", "risk_score": 10, "compliant": true}, + } +} + +test_admin_can_read if { + authz.allow with input as { + "action": "notes:Read", + "principal": {"id": "user:admin", "groups": ["admin"]}, + "context": {"device_trust": "low-risk", "risk_score": 10, "compliant": true}, + } +} + +test_admin_denied_high_risk_device if { + not authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:admin", "groups": ["admin"]}, + "context": {"device_trust": "high-risk", "risk_score": 10, "compliant": true}, + } +} + +test_admin_denied_non_compliant if { + not authz.allow with input as { + "action": "notes:Write", + "principal": {"id": "user:admin", "groups": ["admin"]}, + "context": {"device_trust": "low-risk", "risk_score": 10, "compliant": false}, + } +} + +# ----------------------------------------------------------------------- +# Default deny +# ----------------------------------------------------------------------- + +test_unknown_action_denied if { + not authz.allow with input as { + "action": "notes:Delete", + "principal": {"id": "user:alice", "groups": []}, + "context": {"device_trust": "low-risk", "risk_score": 10, "compliant": true}, + } +} + +test_empty_input_denied if { + not authz.allow with input as {} +} diff --git a/ztxb-aws-lab/app/pdp/policy/data.json b/ztxb-aws-lab/app/pdp/policy/data.json index e69de29..9e26dfe 100644 --- a/ztxb-aws-lab/app/pdp/policy/data.json +++ b/ztxb-aws-lab/app/pdp/policy/data.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/ztxb-aws-lab/infra/modules/api_notes/api.tf b/ztxb-aws-lab/infra/modules/api_notes/api.tf index b1ec35e..4118268 100644 --- a/ztxb-aws-lab/infra/modules/api_notes/api.tf +++ b/ztxb-aws-lab/infra/modules/api_notes/api.tf @@ -39,6 +39,16 @@ resource "aws_apigatewayv2_route" "notes_route" { authorization_type = "CUSTOM" } +############################################### +# STAGE +############################################### + +resource "aws_apigatewayv2_stage" "notes_default" { + api_id = aws_apigatewayv2_api.notes_http.id + name = "$default" + auto_deploy = true +} + ############################################### # PERMISSIONS ############################################### diff --git a/ztxb-aws-lab/infra/modules/api_notes/waf.tf b/ztxb-aws-lab/infra/modules/api_notes/waf.tf index e69de29..f556b1f 100644 --- a/ztxb-aws-lab/infra/modules/api_notes/waf.tf +++ b/ztxb-aws-lab/infra/modules/api_notes/waf.tf @@ -0,0 +1,70 @@ +############################################### +# WAF v2 — rate limiting + AWS managed rules +############################################### + +resource "aws_wafv2_web_acl" "notes" { + name = "${var.project}-notes-waf" + scope = "REGIONAL" + description = "WAF for Notes API" + + default_action { + allow {} + } + + # Rate limit: 500 requests per 5 minutes per IP + rule { + name = "rate-limit" + priority = 1 + + action { + block {} + } + + statement { + rate_based_statement { + limit = 500 + aggregate_key_type = "IP" + } + } + + visibility_config { + sampled_requests_enabled = true + cloudwatch_metrics_enabled = true + metric_name = "${var.project}-rate-limit" + } + } + + # AWS Managed — common attack patterns (SQLi, XSS, etc.) + rule { + name = "aws-common-rules" + priority = 2 + + override_action { + none {} + } + + statement { + managed_rule_group_statement { + vendor_name = "AWS" + name = "AWSManagedRulesCommonRuleSet" + } + } + + visibility_config { + sampled_requests_enabled = true + cloudwatch_metrics_enabled = true + metric_name = "${var.project}-common-rules" + } + } + + visibility_config { + sampled_requests_enabled = true + cloudwatch_metrics_enabled = true + metric_name = "${var.project}-notes-waf" + } +} + +resource "aws_wafv2_web_acl_association" "notes" { + resource_arn = aws_apigatewayv2_stage.notes_default.arn + web_acl_arn = aws_wafv2_web_acl.notes.arn +} diff --git a/ztxb-aws-lab/infra/modules/pdp_fargate/security.tf b/ztxb-aws-lab/infra/modules/pdp_fargate/security.tf index e69de29..50811bc 100644 --- a/ztxb-aws-lab/infra/modules/pdp_fargate/security.tf +++ b/ztxb-aws-lab/infra/modules/pdp_fargate/security.tf @@ -0,0 +1,126 @@ +############################################### +# IAM: EXECUTION & TASK ROLES +############################################### + +resource "aws_iam_role" "pdp_exec" { + name = "${var.project}-pdp-exec-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Principal = { Service = "ecs-tasks.amazonaws.com" } + Action = "sts:AssumeRole" + } + ] + }) +} + +resource "aws_iam_role_policy_attachment" "pdp_exec_ecr" { + role = aws_iam_role.pdp_exec.name + policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" +} + +resource "aws_iam_role" "pdp_task" { + name = "${var.project}-pdp-task-role" + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Principal = { Service = "ecs-tasks.amazonaws.com" } + Action = "sts:AssumeRole" + } + ] + }) +} + +############################################### +# TASK DEFINITION +############################################### + +resource "aws_ecs_task_definition" "pdp" { + family = "${var.project}-pdp" + network_mode = "awsvpc" + requires_compatibilities = ["FARGATE"] + cpu = "256" + memory = "512" + + execution_role_arn = aws_iam_role.pdp_exec.arn + task_role_arn = aws_iam_role.pdp_task.arn + + container_definitions = jsonencode([ + { + name = "opa" + image = var.image + essential = true + portMappings = [ + { + containerPort = 8181 + hostPort = 8181 + } + ] + logConfiguration = { + logDriver = "awslogs" + options = { + "awslogs-group" = "/ecs/${var.project}-pdp" + "awslogs-region" = var.region + "awslogs-stream-prefix" = "opa" + "awslogs-create-group" = "true" + } + } + } + ]) +} + +############################################### +# SECURITY GROUPS +############################################### + +resource "aws_security_group" "pdp_alb" { + name = "${var.project}-pdp-alb-sg" + description = "Allow HTTP to PDP ALB from VPC" + vpc_id = var.vpc_id + + ingress { + description = "HTTP from VPC" + from_port = 80 + to_port = 80 + protocol = "tcp" + cidr_blocks = ["10.42.0.0/16"] + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + tags = { Name = "${var.project}-pdp-alb-sg" } +} + +resource "aws_security_group" "pdp_task" { + name = "${var.project}-pdp-task-sg" + description = "Allow OPA traffic from ALB only" + vpc_id = var.vpc_id + + ingress { + description = "OPA from ALB" + from_port = 8181 + to_port = 8181 + protocol = "tcp" + security_groups = [aws_security_group.pdp_alb.id] + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + tags = { Name = "${var.project}-pdp-task-sg" } +} diff --git a/ztxb-aws-lab/infra/modules/pdp_fargate/service.tf b/ztxb-aws-lab/infra/modules/pdp_fargate/service.tf index 5b95816..25f35ca 100644 --- a/ztxb-aws-lab/infra/modules/pdp_fargate/service.tf +++ b/ztxb-aws-lab/infra/modules/pdp_fargate/service.tf @@ -1,7 +1,12 @@ +############################################### +# ALB (internal — only reachable from VPC) +############################################### + resource "aws_lb" "pdp" { name = "${var.project}-pdp-alb" load_balancer_type = "application" - security_groups = [var.default_sg_id] + internal = true + security_groups = [aws_security_group.pdp_alb.id] subnets = var.public_subnet_ids } @@ -35,6 +40,10 @@ resource "aws_lb_listener" "pdp_http" { } } +############################################### +# ECS SERVICE +############################################### + resource "aws_ecs_service" "pdp" { name = "${var.project}-pdp-service" cluster = aws_ecs_cluster.this.id @@ -45,7 +54,7 @@ resource "aws_ecs_service" "pdp" { network_configuration { assign_public_ip = true subnets = var.public_subnet_ids - security_groups = [var.default_sg_id] + security_groups = [aws_security_group.pdp_task.id] } load_balancer { @@ -55,6 +64,10 @@ resource "aws_ecs_service" "pdp" { } } +############################################### +# OUTPUT +############################################### + output "pdp_url" { value = aws_lb.pdp.dns_name } diff --git a/ztxb-aws-lab/infra/modules/ztxp_broker/main.tf b/ztxb-aws-lab/infra/modules/ztxp_broker/main.tf index 37ccca8..d38cf9a 100644 --- a/ztxb-aws-lab/infra/modules/ztxp_broker/main.tf +++ b/ztxb-aws-lab/infra/modules/ztxp_broker/main.tf @@ -19,7 +19,8 @@ resource "aws_lambda_function" "broker" { environment { variables = { - PDP_URL = var.pdp_url + PDP_URL = var.pdp_url + KMS_KEY_ARN = var.kms_key_arn } } } @@ -46,6 +47,12 @@ resource "aws_apigatewayv2_route" "broker_evaluate" { target = "integrations/${aws_apigatewayv2_integration.broker_lambda.id}" } +resource "aws_apigatewayv2_stage" "broker_default" { + api_id = aws_apigatewayv2_api.broker_http.id + name = "$default" + auto_deploy = true +} + resource "aws_lambda_permission" "broker_invoke" { statement_id = "AllowAPIGatewayInvokeBroker" action = "lambda:InvokeFunction" diff --git a/ztxb-aws-lab/tests/test_broker.py b/ztxb-aws-lab/tests/test_broker.py new file mode 100644 index 0000000..9ac8685 --- /dev/null +++ b/ztxb-aws-lab/tests/test_broker.py @@ -0,0 +1,121 @@ +# tests/test_broker.py +"""Unit tests for the ZTXP Broker Lambda handler.""" +import importlib +import importlib.util +import json +import os +from unittest.mock import patch +from datetime import datetime, timezone, timedelta + +import pytest + +_broker_dir = os.path.join(os.path.dirname(__file__), "..", "app", "lambdas", "ztxp_broker") + +with patch.dict(os.environ, {"PDP_URL": "pdp.internal", "KMS_KEY_ARN": "arn:aws:kms:us-east-1:123456789012:key/test-key"}): + with patch("boto3.client"): + spec = importlib.util.spec_from_file_location("broker_handler", os.path.join(_broker_dir, "handler.py")) + broker = importlib.util.module_from_spec(spec) + spec.loader.exec_module(broker) + + +def _now_iso(): + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _make_tam(issued_at=None, signature=True): + tam = { + "version": "0.2", + "message_id": "test-msg-001", + "issued_at": issued_at or _now_iso(), + "issuer": "ztxp://pep.test", + "subject": {"id": "user:alice", "role": "authenticated", "groups": ["writer"]}, + "device": {"id": "device:abc", "posture": {"compliant": True}}, + "context": {"risk_score": 20, "device_trust": "low-risk"}, + "resource": {"id": "app://notes", "action": "notes:Read"}, + } + if signature: + tam["signature"] = {"alg": "ECDSA_SHA_256", "key_id": "arn:aws:kms:test", "sig": "dGVzdA=="} + return tam + + +def _apigw_event(body): + return {"body": json.dumps(body)} + + +class TestVerifyTimestamp: + def test_fresh_timestamp(self): + tam = _make_tam(signature=False) + assert broker.verify_timestamp(tam) is True + + def test_expired_timestamp(self): + old = (datetime.now(timezone.utc) - timedelta(seconds=700)).strftime("%Y-%m-%dT%H:%M:%SZ") + tam = _make_tam(issued_at=old, signature=False) + with pytest.raises(ValueError, match="tam_expired"): + broker.verify_timestamp(tam) + + def test_future_timestamp(self): + future = (datetime.now(timezone.utc) + timedelta(minutes=5)).strftime("%Y-%m-%dT%H:%M:%SZ") + tam = _make_tam(issued_at=future, signature=False) + with pytest.raises(ValueError, match="tam_from_future"): + broker.verify_timestamp(tam) + + def test_missing_timestamp(self): + tam = _make_tam(signature=False) + tam.pop("issued_at") + with pytest.raises(ValueError, match="missing_timestamp"): + broker.verify_timestamp(tam) + + +class TestCanonicalJson: + def test_sorted_keys(self): + result = broker.canonical_json({"z": 1, "a": 2}) + assert result == b'{"a":2,"z":1}' + + +class TestLambdaHandler: + @patch.object(broker, "call_pdp", return_value=True) + @patch.object(broker, "verify_signature") + def test_allow_flow(self, mock_verify, mock_pdp): + event = _apigw_event({"tam": _make_tam()}) + result = broker.lambda_handler(event, None) + body = json.loads(result["body"]) + + assert result["statusCode"] == 200 + assert body["decision"] == "allow" + mock_verify.assert_called_once() + mock_pdp.assert_called_once() + + @patch.object(broker, "call_pdp", return_value=False) + @patch.object(broker, "verify_signature") + def test_deny_flow(self, mock_verify, mock_pdp): + event = _apigw_event({"tam": _make_tam()}) + result = broker.lambda_handler(event, None) + body = json.loads(result["body"]) + + assert body["decision"] == "deny" + assert body["reason"] == "policy_deny" + + def test_missing_tam(self): + event = _apigw_event({"not_tam": {}}) + result = broker.lambda_handler(event, None) + assert result["statusCode"] == 400 + + def test_invalid_json(self): + event = {"body": "not json"} + result = broker.lambda_handler(event, None) + assert result["statusCode"] == 400 + + @patch.object(broker, "verify_signature", side_effect=ValueError("invalid_signature")) + def test_bad_signature(self, mock_verify): + event = _apigw_event({"tam": _make_tam()}) + result = broker.lambda_handler(event, None) + assert result["statusCode"] == 403 + assert "signature_rejected" in json.loads(result["body"])["reason"] + + @patch.object(broker, "verify_signature") + def test_expired_tam(self, mock_verify): + old = (datetime.now(timezone.utc) - timedelta(seconds=700)).strftime("%Y-%m-%dT%H:%M:%SZ") + event = _apigw_event({"tam": _make_tam(issued_at=old)}) + result = broker.lambda_handler(event, None) + assert result["statusCode"] == 403 + assert "timestamp_rejected" in json.loads(result["body"])["reason"] diff --git a/ztxb-aws-lab/tests/test_notes_api.py b/ztxb-aws-lab/tests/test_notes_api.py new file mode 100644 index 0000000..521914e --- /dev/null +++ b/ztxb-aws-lab/tests/test_notes_api.py @@ -0,0 +1,117 @@ +# tests/test_notes_api.py +"""Unit tests for the Notes API Lambda handler.""" +import importlib +import importlib.util +import json +import os +from unittest.mock import patch, MagicMock + +import pytest + +_notes_dir = os.path.join(os.path.dirname(__file__), "..", "app", "lambdas", "notes_api") + +mock_table = MagicMock() +mock_ddb_resource = MagicMock() +mock_ddb_resource.Table.return_value = mock_table + +with patch.dict(os.environ, {"TABLE_NAME": "test-notes"}): + with patch("boto3.resource", return_value=mock_ddb_resource): + spec = importlib.util.spec_from_file_location("notes_handler", os.path.join(_notes_dir, "handler.py")) + notes = importlib.util.module_from_spec(spec) + spec.loader.exec_module(notes) + notes.table = mock_table + + +def _make_event(method="GET", proxy="", body=None, principal_id="user:alice"): + event = { + "requestContext": { + "http": {"method": method}, + "authorizer": {"lambda": {"principalId": principal_id}}, + }, + "pathParameters": {"proxy": proxy} if proxy else None, + } + if body: + event["body"] = json.dumps(body) + return event + + +class TestUserIdExtraction: + def test_extracts_principal(self): + event = _make_event(principal_id="user:bob") + assert notes._user_id(event) == "user:bob" + + def test_defaults_to_anonymous(self): + event = {"requestContext": {}} + assert notes._user_id(event) == "anonymous" + + +class TestNoteIdFromPath: + def test_extracts_note_id(self): + event = _make_event(proxy="abc-123") + assert notes._note_id_from_path(event) == "abc-123" + + def test_none_when_empty(self): + event = _make_event() + assert notes._note_id_from_path(event) is None + + +class TestLambdaHandler: + def test_list_notes(self): + mock_table.query.return_value = {"Items": [{"note_id": "1", "title": "Test"}]} + result = notes.lambda_handler(_make_event(method="GET"), None) + body = json.loads(result["body"]) + + assert result["statusCode"] == 200 + assert len(body["notes"]) == 1 + + def test_get_note(self): + mock_table.get_item.return_value = {"Item": {"note_id": "abc", "title": "Hello"}} + result = notes.lambda_handler(_make_event(method="GET", proxy="abc"), None) + + assert result["statusCode"] == 200 + assert json.loads(result["body"])["title"] == "Hello" + + def test_get_note_not_found(self): + mock_table.get_item.return_value = {} + result = notes.lambda_handler(_make_event(method="GET", proxy="missing"), None) + + assert result["statusCode"] == 404 + + def test_create_note(self): + mock_table.put_item.return_value = {} + result = notes.lambda_handler( + _make_event(method="POST", body={"title": "New", "content": "Body"}), None + ) + body = json.loads(result["body"]) + + assert result["statusCode"] == 201 + assert body["title"] == "New" + assert "note_id" in body + assert "created_at" in body + + def test_update_note(self): + mock_table.update_item.return_value = {"Attributes": {"title": "Updated"}} + result = notes.lambda_handler( + _make_event(method="PUT", proxy="abc", body={"title": "Updated", "content": "New body"}), + None, + ) + + assert result["statusCode"] == 200 + assert json.loads(result["body"])["title"] == "Updated" + + def test_delete_note(self): + mock_table.delete_item.return_value = {} + result = notes.lambda_handler(_make_event(method="DELETE", proxy="abc"), None) + + assert result["statusCode"] == 200 + assert json.loads(result["body"])["deleted"] == "abc" + + def test_method_not_allowed(self): + result = notes.lambda_handler(_make_event(method="PATCH"), None) + assert result["statusCode"] == 405 + + def test_invalid_json_body(self): + event = _make_event(method="POST") + event["body"] = "not-json" + result = notes.lambda_handler(event, None) + assert result["statusCode"] == 400 diff --git a/ztxb-aws-lab/tests/test_pep_authorizer.py b/ztxb-aws-lab/tests/test_pep_authorizer.py new file mode 100644 index 0000000..d5ac4a9 --- /dev/null +++ b/ztxb-aws-lab/tests/test_pep_authorizer.py @@ -0,0 +1,132 @@ +# tests/test_pep_authorizer.py +"""Unit tests for the PEP Authorizer Lambda handler.""" +import base64 +import importlib +import json +import sys +import os +from unittest.mock import patch, MagicMock +from datetime import datetime, timezone + +import pytest + +# Use importlib to avoid module name collisions between handler.py files +_pep_dir = os.path.join(os.path.dirname(__file__), "..", "app", "lambdas", "pep_authorizer") + +with patch.dict(os.environ, {"KMS_KEY_ARN": "arn:aws:kms:us-east-1:123456789012:key/test-key", "BROKER_URL": "https://broker.example.com"}): + with patch("boto3.client"): + spec = importlib.util.spec_from_file_location("pep_handler", os.path.join(_pep_dir, "handler.py")) + pep = importlib.util.module_from_spec(spec) + spec.loader.exec_module(pep) + + +def _make_event(method="GET", path="/notes", auth_header="", extra_headers=None): + headers = {"authorization": auth_header} + if extra_headers: + headers.update(extra_headers) + return { + "requestContext": { + "http": {"method": method, "path": path, "sourceIp": "10.0.0.1"}, + "requestId": "test-req-123", + }, + "headers": headers, + } + + +class TestBuildTam: + def test_basic_tam_structure(self): + event = _make_event() + tam = pep.build_tam(event) + + assert tam["version"] == "0.2" + assert "message_id" in tam + assert "issued_at" in tam + assert tam["issuer"] == "ztxp://pep.ztxp-aws-lab" + assert tam["subject"]["id"] == "user:anonymous" + assert tam["resource"]["action"] == "notes:Read" + + def test_write_action_for_post(self): + event = _make_event(method="POST") + tam = pep.build_tam(event) + assert tam["resource"]["action"] == "notes:Write" + + def test_write_action_for_put(self): + event = _make_event(method="PUT") + tam = pep.build_tam(event) + assert tam["resource"]["action"] == "notes:Write" + + def test_write_action_for_delete(self): + event = _make_event(method="DELETE") + tam = pep.build_tam(event) + assert tam["resource"]["action"] == "notes:Write" + + def test_read_action_for_get(self): + event = _make_event(method="GET") + tam = pep.build_tam(event) + assert tam["resource"]["action"] == "notes:Read" + + def test_device_context_headers(self): + event = _make_event(extra_headers={ + "x-device-id": "laptop-42", + "x-device-compliant": "false", + "x-device-trust": "high-risk", + }) + tam = pep.build_tam(event) + assert tam["device"]["id"] == "device:laptop-42" + assert tam["device"]["posture"]["compliant"] is False + assert tam["context"]["device_trust"] == "high-risk" + + def test_jwt_subject_extraction(self): + claims = {"sub": "user-abc-123", "email": "test@example.com"} + payload = base64.b64encode(json.dumps(claims).encode()).decode().rstrip("=") + fake_jwt = f"eyJhbGciOiJSUzI1NiJ9.{payload}.fake-sig" + + event = _make_event(auth_header=f"Bearer {fake_jwt}") + tam = pep.build_tam(event) + assert tam["subject"]["id"] == "user:user-abc-123" + + +class TestCanonicalJson: + def test_deterministic(self): + data = {"b": 2, "a": 1} + result = pep.canonical_json(data) + assert result == b'{"a":1,"b":2}' + + def test_no_whitespace(self): + data = {"key": "value"} + result = pep.canonical_json(data) + assert b" " not in result + + +class TestLambdaHandler: + @patch.object(pep, "call_broker") + @patch.object(pep, "sign_tam") + def test_allow_decision(self, mock_sign, mock_broker): + mock_sign.side_effect = lambda tam: {**tam, "signature": {"alg": "test", "sig": "abc", "key_id": "k"}} + mock_broker.return_value = {"decision": "allow", "reason": "policy_allow"} + + event = _make_event() + result = pep.lambda_handler(event, None) + + assert result["isAuthorized"] is True + assert result["context"]["ztxp_decision"] == "allow" + + @patch.object(pep, "call_broker") + @patch.object(pep, "sign_tam") + def test_deny_decision(self, mock_sign, mock_broker): + mock_sign.side_effect = lambda tam: {**tam, "signature": {"alg": "test", "sig": "abc", "key_id": "k"}} + mock_broker.return_value = {"decision": "deny", "reason": "policy_deny"} + + event = _make_event() + result = pep.lambda_handler(event, None) + + assert result["isAuthorized"] is False + assert result["context"]["ztxp_decision"] == "deny" + + @patch.object(pep, "sign_tam", side_effect=Exception("KMS error")) + def test_signing_failure_denies(self, mock_sign): + event = _make_event() + result = pep.lambda_handler(event, None) + + assert result["isAuthorized"] is False + assert result["context"]["reason"] == "signing_failed"