diff --git a/docs/DESIGN.md b/docs/DESIGN.md index a3cf279..4fa19a6 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -170,6 +170,11 @@ sequenceDiagram 2. Find workload by matching verified token claims against workload database: - GitHub: match `repo_owner`, `repo_name`, `repo_owner_id` - Jenkins: match `issuer` +6. Workload-Specific Claim Verification + - GitHub: `event_name` must be in the allowlist `{push, workflow_dispatch}`. + Excludes triggers that can be indirectly driven by non-maintainers + (`pull_request_target`, `workflow_run`, `issue_comment`, …). + - Jenkins: no additional checks. ## 4. API Design diff --git a/pia/main.py b/pia/main.py index ce7340c..a9e9ade 100644 --- a/pia/main.py +++ b/pia/main.py @@ -18,6 +18,7 @@ find_dt_project, find_workload_by_claims, is_issuer_known, + verify_workload_claims, ) # Configure logging @@ -134,6 +135,12 @@ async def authenticate( ) _401("No matching workload found for token claims") + # Workload-type-specific claim verification (e.g. GitHub event_name allowlist) + reason = verify_workload_claims(workload, verified_claims) + if reason: + logger.warning(f"Token claims rejected: {reason}") + _401("Token claims rejected") + logger.info( f"Authenticated workload (project={workload.ef_project_id}, " f"type={workload.type}, id={workload.id})" diff --git a/pia/models.py b/pia/models.py index cefc7e1..4efa0ae 100644 --- a/pia/models.py +++ b/pia/models.py @@ -16,6 +16,14 @@ JENKINS_ISSUER_PREFIX = "https://ci.eclipse.org" """Prefix used for early validation of Jenkins issuer URLs.""" +GITHUB_ALLOWED_EVENT_NAMES = frozenset({"push", "workflow_dispatch"}) +"""Allowed values for the GitHub OIDC token's `event_name` claim. + +Restricts the OIDC mint to events that require write access to the repo +(i.e. only maintainers can cause one). Excludes triggers like +`pull_request_target`, `workflow_run`, and `issue_comment` that can be +indirectly driven by non-maintainers. Relax on demand.""" + class Base(DeclarativeBase): """Declarative base class for al ORM models.""" @@ -163,6 +171,21 @@ def find_workload_by_claims( return session.execute(stmt).scalar_one_or_none() +def verify_workload_claims(workload: Workload, claims: dict[str, Any]) -> str | None: + """Workload-type-specific claim verification beyond the workload-match step. + + Returns a human-readable reason on rejection, or None on success. + """ + if isinstance(workload, GitHubWorkload): + event_name = claims.get("event_name") + if event_name not in GITHUB_ALLOWED_EVENT_NAMES: + return ( + f"GitHub event_name {event_name!r} not in allowlist " + f"{sorted(GITHUB_ALLOWED_EVENT_NAMES)}" + ) + return None + + def find_dt_project( session: Session, ef_project_id: str, name: str ) -> DependencyTrackProject | None: diff --git a/tests/test_main.py b/tests/test_main.py index aa9d0ca..33d6062 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -136,11 +136,43 @@ def test_success(self, mock_decode, mock_verify, seed_db): "iss": GITHUB_ISSUER, "repository": "eclipse-test/repo", "repository_owner_id": "42", + "event_name": "push", } result = self._call(BEARER_TOKEN, seed_db) assert isinstance(result, Workload) assert result.ef_project_id == "eclipse-test" + @patch("pia.main.oidc.verify_token") + @patch("pia.main.jwt.decode") + def test_github_disallowed_event_name(self, mock_decode, mock_verify, seed_db): + """GitHub token with disallowed event_name is rejected.""" + mock_decode.return_value = {"iss": GITHUB_ISSUER} + mock_verify.return_value = { + "iss": GITHUB_ISSUER, + "repository": "eclipse-test/repo", + "repository_owner_id": "42", + "event_name": "pull_request_target", + } + with pytest.raises(HTTPException) as exc: + self._call(BEARER_TOKEN, seed_db) + assert exc.value.status_code == 401 + assert "Token claims rejected" in exc.value.detail + + @patch("pia.main.oidc.verify_token") + @patch("pia.main.jwt.decode") + def test_github_missing_event_name(self, mock_decode, mock_verify, seed_db): + """GitHub token without an event_name claim is rejected.""" + mock_decode.return_value = {"iss": GITHUB_ISSUER} + mock_verify.return_value = { + "iss": GITHUB_ISSUER, + "repository": "eclipse-test/repo", + "repository_owner_id": "42", + } + with pytest.raises(HTTPException) as exc: + self._call(BEARER_TOKEN, seed_db) + assert exc.value.status_code == 401 + assert "Token claims rejected" in exc.value.detail + class TestUploadSBOMEndpoint: """Tests for /v1/upload/sbom endpoint, with authentication bypassed.""" diff --git a/tests/test_models.py b/tests/test_models.py index 1f94f59..6374227 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -11,6 +11,7 @@ find_dt_project, find_workload_by_claims, is_issuer_known, + verify_workload_claims, ) GITHUB_ISSUER = "https://token.actions.githubusercontent.com" @@ -89,6 +90,48 @@ def test_jenkins_no_match(self, seed_db): assert workload is None +class TestVerifyWorkloadClaims: + @pytest.fixture + def github_workload(self): + return GitHubWorkload( + ef_project_id="eclipse-test", + repo_owner="eclipse-test", + repo_name="repo", + repo_owner_id="42", + ) + + @pytest.fixture + def jenkins_workload(self): + return JenkinsWorkload( + ef_project_id="eclipse-other", + issuer="https://ci.eclipse.org/eclipse-other/oidc", + ) + + @pytest.mark.parametrize("event_name", ["push", "workflow_dispatch"]) + def test_github_allowed_event(self, github_workload, event_name): + assert ( + verify_workload_claims(github_workload, {"event_name": event_name}) is None + ) + + @pytest.mark.parametrize( + "event_name", + ["pull_request_target", "workflow_run", "issue_comment", "schedule", "release"], + ) + def test_github_disallowed_event(self, github_workload, event_name): + reason = verify_workload_claims(github_workload, {"event_name": event_name}) + assert reason is not None + assert event_name in reason + + def test_github_missing_event(self, github_workload): + reason = verify_workload_claims(github_workload, {}) + assert reason is not None + assert "None" in reason + + def test_jenkins_not_checked(self, jenkins_workload): + # No event_name check for Jenkins; any claims dict passes. + assert verify_workload_claims(jenkins_workload, {}) is None + + class TestFindDtProject: def test_match(self, seed_db): dt_project = find_dt_project(seed_db, "eclipse-test", "test-product")