From 4eb92ead595070afe97b8de78db5d9455a42204c Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Tue, 19 May 2026 10:09:21 -0500 Subject: [PATCH 1/9] Add lifespan OpenSearch bootstrap via service JWT Introduce a one-shot OpenSearch security bootstrap during FastAPI lifespan that derives the admin username from a platform-issued service JWT. Adds PLATFORM_SERVICE_JWT and OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP settings, validates the presence of the token, decodes the admin username (new admin_username_from_service_jwt helper), waits for OpenSearch, and runs setup_opensearch_security before other startup tasks. Also update startup_tasks to skip OpenSearch setup when the lifespan bootstrap flag is enabled, and add logging and error handling for missing/invalid tokens. --- src/app/lifespan.py | 28 ++++++++++++++++++++++++++++ src/auth/ibm_auth.py | 16 ++++++++++++++++ src/config/settings.py | 17 +++++++++++++++++ src/services/startup_orchestrator.py | 9 ++++++++- 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/app/lifespan.py b/src/app/lifespan.py index 48b03e0f6..3ac05d676 100644 --- a/src/app/lifespan.py +++ b/src/app/lifespan.py @@ -14,6 +14,8 @@ from config.settings import ( JWT_CLAIMS_CACHE_MAX_SIZE, JWT_CLAIMS_CACHE_TTL_SECONDS, + OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP, + PLATFORM_SERVICE_JWT, RBAC_CACHE_BACKEND, RBAC_PERMISSION_CACHE_TTL_SECONDS, UVICORN_WORKER_COUNT, @@ -199,6 +201,32 @@ async def run_startup(app: FastAPI): await mcp_lifespan_ctx.__aenter__() logger.info("FastMCP lifespan started") + # One-shot OpenSearch security bootstrap driven by the platform's + # service JWT. Runs synchronously (before startup_tasks) so the + # admin role mapping is in place before any other startup work + # talks to OpenSearch. The corresponding call inside startup_tasks + # is suppressed when this flag is on. + if OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP: + if not PLATFORM_SERVICE_JWT: + raise RuntimeError( + "OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP is enabled but " + "PLATFORM_SERVICE_JWT is not set" + ) + from auth.ibm_auth import admin_username_from_service_jwt + from utils.opensearch_init import wait_for_opensearch + from utils.opensearch_utils import setup_opensearch_security + + admin_username = admin_username_from_service_jwt(PLATFORM_SERVICE_JWT) + if not admin_username: + raise RuntimeError( + "PLATFORM_SERVICE_JWT has no 'username' or 'sub' claim; " + "cannot bootstrap OpenSearch security" + ) + await wait_for_opensearch() + logger.info("Bootstrapping OpenSearch security", admin_username=admin_username) + await setup_opensearch_security(clients.opensearch, admin_username=admin_username) + logger.info("OpenSearch security bootstrap completed", admin_username=admin_username) + # Start index initialization in background to avoid blocking OIDC endpoints t1 = asyncio.create_task(startup_tasks(services)) app.state.background_tasks.add(t1) diff --git a/src/auth/ibm_auth.py b/src/auth/ibm_auth.py index f6af688eb..dbcbd73e4 100644 --- a/src/auth/ibm_auth.py +++ b/src/auth/ibm_auth.py @@ -52,6 +52,22 @@ def decode_ibm_jwt(token: str) -> dict | None: return None +def admin_username_from_service_jwt(token: str) -> str | None: + """Return the admin username carried by a platform-issued service JWT. + + Decodes *token* unsigned (the platform issues it; we only parse claims) + and returns `username` if present, falling back to `sub`. Matches the + claim precedence used by the auth dependency in dependencies.py. + Returns None if the token cannot be decoded or has neither claim. + """ + try: + claims = jwt.decode(token, options={"verify_signature": False}) + except jwt.InvalidTokenError as exc: + logger.warning("Service JWT decode failed", error=str(exc)) + return None + return claims.get("username") or claims.get("sub") + + async def fetch_ibm_public_key(url: str): """Fetch IBM's JWT public key PEM from *url* and cache it.""" global _cached_public_key diff --git a/src/config/settings.py b/src/config/settings.py index f02a8a490..885146e52 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -67,6 +67,11 @@ IBM_AUTH_ENABLED = os.getenv("IBM_AUTH_ENABLED", "false").lower() in ("true", "1", "yes") PLATFORM_USERNAME = os.getenv("PLATFORM_USERNAME") PLATFORM_PASSWORD = os.getenv("PLATFORM_PASSWORD") +# Platform-issued service JWT. When present and +# OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP is on, lifespan decodes this +# token to derive the admin username used to bootstrap the OpenSearch +# security context (roles + all_access mapping). +PLATFORM_SERVICE_JWT = os.getenv("PLATFORM_SERVICE_JWT") IBM_JWT_PUBLIC_KEY_URL = os.getenv("IBM_JWT_PUBLIC_KEY_URL", "") IBM_SESSION_COOKIE_NAME = os.getenv("IBM_SESSION_COOKIE_NAME", "ibm-openrag-session") IBM_CREDENTIALS_HEADER = os.getenv("IBM_CREDENTIALS_HEADER", "X-IBM-LH-Credentials") @@ -143,6 +148,18 @@ def _resolve_skip_os_security_default() -> str: "OPENRAG_SKIP_OS_SECURITY_SETUP", _resolve_skip_os_security_default() ).lower() in ("true", "1", "yes") +# Run setup_opensearch_security once during FastAPI lifespan startup, +# using the admin username derived from PLATFORM_SERVICE_JWT. Intended +# for platform-managed deployments (saas / on_prem) where the platform +# issues a service token that identifies the admin user that must be +# pinned into the all_access role mapping. Default off. +# +# When this flag is true the corresponding call inside startup_tasks() +# is suppressed — bootstrap is the single source of truth on startup. +OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP = os.getenv( + "OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP", "false" +).lower() in ("true", "1", "yes") + # Enable FastAPI's `debug` mode (verbose tracebacks in HTTP error responses # on the FastAPI app instance). Named explicitly so it isn't confused with # logging-level "debug" or other unrelated debug flags. diff --git a/src/services/startup_orchestrator.py b/src/services/startup_orchestrator.py index f6d7646fd..2dca209b2 100644 --- a/src/services/startup_orchestrator.py +++ b/src/services/startup_orchestrator.py @@ -9,6 +9,7 @@ from config.settings import ( DISABLE_INGEST_WITH_LANGFLOW, FETCH_OPENRAG_DOCS_AT_STARTUP, + OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP, OPENRAG_SKIP_OS_SECURITY_SETUP, clients, get_openrag_config, @@ -69,12 +70,18 @@ async def startup_tasks(services): # Setup OpenSearch security (roles and mappings) after connection is established. # Skip entirely when the platform manages the security context externally # (SaaS / CPD): the call would otherwise either fail with 403/401 or - # overwrite a curated config. + # overwrite a curated config. Also skip when the lifespan-level + # bootstrap (driven by PLATFORM_SERVICE_JWT) has already handled it. if OPENRAG_SKIP_OS_SECURITY_SETUP: logger.info( "Skipping OpenSearch security setup at startup " "(OPENRAG_SKIP_OS_SECURITY_SETUP=true)" ) + elif OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP: + logger.info( + "Skipping OpenSearch security setup in startup_tasks " + "(handled by lifespan bootstrap)" + ) else: try: from utils.opensearch_utils import setup_opensearch_security From 4cc8f9fce8e400e0bef56ade2785f5121c8e1934 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Tue, 19 May 2026 16:10:57 -0500 Subject: [PATCH 2/9] Fallback to K8s SA token for PLATFORM_SERVICE_JWT Use the Kubernetes pod service account token as a fallback for PLATFORM_SERVICE_JWT when no explicit env var is provided. Adds K8S_SA_TOKEN_PATH (with a default of /var/run/secrets/kubernetes.io/serviceaccount/token) and a helper _read_k8s_sa_token() that safely reads the token file (handling missing/permission errors). PLATFORM_SERVICE_JWT is now populated from the env var or the token file, allowing in-cluster authentication without injecting a JWT. --- src/config/settings.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/config/settings.py b/src/config/settings.py index 885146e52..4d2483fe0 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -71,7 +71,21 @@ # OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP is on, lifespan decodes this # token to derive the admin username used to bootstrap the OpenSearch # security context (roles + all_access mapping). -PLATFORM_SERVICE_JWT = os.getenv("PLATFORM_SERVICE_JWT") +# Falls back to the pod's Kubernetes service account token when running +# inside a cluster and no explicit JWT is injected. +_DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" +K8S_SA_TOKEN_PATH = os.getenv("K8S_SA_TOKEN_PATH", _DEFAULT_K8S_SA_TOKEN_PATH) + + +def _read_k8s_sa_token() -> str | None: + try: + with open(K8S_SA_TOKEN_PATH) as f: + return f.read().strip() or None + except (FileNotFoundError, PermissionError): + return None + + +PLATFORM_SERVICE_JWT = os.getenv("PLATFORM_SERVICE_JWT") or _read_k8s_sa_token() IBM_JWT_PUBLIC_KEY_URL = os.getenv("IBM_JWT_PUBLIC_KEY_URL", "") IBM_SESSION_COOKIE_NAME = os.getenv("IBM_SESSION_COOKIE_NAME", "ibm-openrag-session") IBM_CREDENTIALS_HEADER = os.getenv("IBM_CREDENTIALS_HEADER", "X-IBM-LH-Credentials") From d2a9335741b4268ce4fa42c6aaf1cf20dc4efd2e Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Wed, 27 May 2026 10:17:22 -0500 Subject: [PATCH 3/9] Fetch service token and refactor OpenSearch client Add config/utils.py to centralize reading the K8s service account token and to fetch an OpenSearch service token from an internal auth server (get_opensearch_service_token). Introduce AUTH_SERVER_URL, OPENRAG_TENANT_ID and K8S_SA_TOKEN handling in settings.py and prefer the fetched OPENRAG_SERVICE_TOKEN as PLATFORM_SERVICE_JWT when available. Refactor AppClients to expose create_opensearch_client_from_jwt (and keep create_user_opensearch_client as an alias) and update lifespan.py to create an OpenSearch client from the JWT, pass it into wait/setup calls, and ensure the client is closed in a finally block. This enables cluster-local token exchange and ensures the bootstrap client is properly cleaned up. --- src/app/lifespan.py | 12 ++++++---- src/config/settings.py | 23 +++++++++++------- src/config/utils.py | 54 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 13 deletions(-) create mode 100644 src/config/utils.py diff --git a/src/app/lifespan.py b/src/app/lifespan.py index 3ac05d676..5ad12664d 100644 --- a/src/app/lifespan.py +++ b/src/app/lifespan.py @@ -222,10 +222,14 @@ async def run_startup(app: FastAPI): "PLATFORM_SERVICE_JWT has no 'username' or 'sub' claim; " "cannot bootstrap OpenSearch security" ) - await wait_for_opensearch() - logger.info("Bootstrapping OpenSearch security", admin_username=admin_username) - await setup_opensearch_security(clients.opensearch, admin_username=admin_username) - logger.info("OpenSearch security bootstrap completed", admin_username=admin_username) + opensearch_client = clients.create_opensearch_client_from_jwt(PLATFORM_SERVICE_JWT) + try: + await wait_for_opensearch(opensearch_client) + logger.info("Bootstrapping OpenSearch security", admin_username=admin_username) + await setup_opensearch_security(opensearch_client, admin_username=admin_username) + logger.info("OpenSearch security bootstrap completed", admin_username=admin_username) + finally: + await opensearch_client.close() # Start index initialization in background to avoid blocking OIDC endpoints t1 = asyncio.create_task(startup_tasks(services)) diff --git a/src/config/settings.py b/src/config/settings.py index 4d2483fe0..b7f3c16f8 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -12,6 +12,7 @@ from config.embedding_constants import OPENAI_DEFAULT_EMBEDDING_MODEL from config.paths import get_flows_path +from config.utils import _read_k8s_sa_token, get_opensearch_service_token from utils.container_utils import determine_docling_host, get_container_host from utils.embedding_fields import build_knn_vector_field from utils.env_utils import get_env_float, get_env_int @@ -75,17 +76,17 @@ # inside a cluster and no explicit JWT is injected. _DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" K8S_SA_TOKEN_PATH = os.getenv("K8S_SA_TOKEN_PATH", _DEFAULT_K8S_SA_TOKEN_PATH) +AUTH_SERVER_URL = os.getenv("AUTH_SERVER_URL") -def _read_k8s_sa_token() -> str | None: - try: - with open(K8S_SA_TOKEN_PATH) as f: - return f.read().strip() or None - except (FileNotFoundError, PermissionError): - return None +K8S_SA_TOKEN = _read_k8s_sa_token(K8S_SA_TOKEN_PATH) +OPENRAG_TENANT_ID = os.getenv("OPENRAG_TENANT_ID", "openrag") +OPENRAG_SERVICE_TOKEN = get_opensearch_service_token( + AUTH_SERVER_URL, OPENRAG_TENANT_ID, K8S_SA_TOKEN_PATH +) -PLATFORM_SERVICE_JWT = os.getenv("PLATFORM_SERVICE_JWT") or _read_k8s_sa_token() +PLATFORM_SERVICE_JWT = OPENRAG_SERVICE_TOKEN or os.getenv("PLATFORM_SERVICE_JWT") IBM_JWT_PUBLIC_KEY_URL = os.getenv("IBM_JWT_PUBLIC_KEY_URL", "") IBM_SESSION_COOKIE_NAME = os.getenv("IBM_SESSION_COOKIE_NAME", "ibm-openrag-session") IBM_CREDENTIALS_HEADER = os.getenv("IBM_CREDENTIALS_HEADER", "X-IBM-LH-Credentials") @@ -1095,8 +1096,8 @@ async def _update_langflow_global_variable(self, name: str, value: str): error=str(e), ) - def create_user_opensearch_client(self, jwt_token: str): - """Create OpenSearch client with user's auth token. + def create_opensearch_client_from_jwt(self, jwt_token: str): + """Create an OpenSearch client authenticated with a JWT bearer token. If jwt_token already contains an auth scheme (e.g. "Basic ..." or "Bearer ..."), it is used verbatim. Otherwise it is wrapped as a Bearer token. @@ -1123,6 +1124,10 @@ def create_user_opensearch_client(self, jwt_token: str): retry_on_timeout=True, ) + def create_user_opensearch_client(self, jwt_token: str): + """Create OpenSearch client with user's auth token.""" + return self.create_opensearch_client_from_jwt(jwt_token) + # Component template paths — derived from the centralized flows directory def _component_path(env_var: str, filename: str) -> str: diff --git a/src/config/utils.py b/src/config/utils.py new file mode 100644 index 000000000..f93fe44ae --- /dev/null +++ b/src/config/utils.py @@ -0,0 +1,54 @@ + +import httpx + + +_DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" + + +# Read the K8S service account token +def _read_k8s_sa_token(k8s_sa_token_path: str) -> str | None: + try: + with open(k8s_sa_token_path) as f: + return f.read().strip() or None + except (FileNotFoundError, PermissionError): + return None + + +def get_opensearch_service_token( + auth_server_url: str | None, + tenant_id: str, + k8s_sa_token_path: str = _DEFAULT_K8S_SA_TOKEN_PATH, +) -> str | None: + """ + Fetch an OpenSearch service token from the internal auth server using the current K8S service account token. + + Args: + tenant_id (str): The tenant ID for which the token is requested. + + Returns: + str | None: The raw OpenSearch token if successful, else None. + """ + if not auth_server_url: + return None + + token_endpoint = f"{auth_server_url.rstrip('/')}/internal/token/opensearch" + try: + # Read the K8S service account token + k8s_token = _read_k8s_sa_token(k8s_sa_token_path) + if not k8s_token: + return None + + headers = { + "Authorization": f"Bearer {k8s_token}", + "Content-Type": "application/json", + } + json_body = {"tenant_id": tenant_id} + + # Verify is False for cluster-local/internal endpoints; see original curl -k + with httpx.Client(verify=False, timeout=10) as client: + resp = client.post(token_endpoint, headers=headers, json=json_body) + resp.raise_for_status() + data = resp.json() + return data.get("token") + except Exception: + return None From cb0e848fce80dc02594272a00a9f806fcb46fa46 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Wed, 27 May 2026 10:26:37 -0500 Subject: [PATCH 4/9] Validate username/sub claim is a string Ensure admin_username_from_service_jwt only returns string values from the JWT. Previously the function returned whatever was in the "username" or "sub" claim; now it checks the claim type, logs a warning (including claim_type) if the value is present but not a string, and returns None in that case to avoid downstream errors. --- src/auth/ibm_auth.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/auth/ibm_auth.py b/src/auth/ibm_auth.py index dbcbd73e4..ca5d58c63 100644 --- a/src/auth/ibm_auth.py +++ b/src/auth/ibm_auth.py @@ -65,7 +65,15 @@ def admin_username_from_service_jwt(token: str) -> str | None: except jwt.InvalidTokenError as exc: logger.warning("Service JWT decode failed", error=str(exc)) return None - return claims.get("username") or claims.get("sub") + value = claims.get("username") or claims.get("sub") + if not isinstance(value, str): + if value is not None: + logger.warning( + "Service JWT username/sub claim is not a string", + claim_type=type(value).__name__, + ) + return None + return value async def fetch_ibm_public_key(url: str): From c84d0861ed16d14ffc6270cbb58850e97c32d123 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 15:22:04 +0000 Subject: [PATCH 5/9] style: ruff autofix (auto) --- src/config/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/config/utils.py b/src/config/utils.py index f93fe44ae..ed86a2cc5 100644 --- a/src/config/utils.py +++ b/src/config/utils.py @@ -1,7 +1,5 @@ - import httpx - _DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" From 36ee8fa232924b313afd4ccafb86d69a70f5ca41 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Wed, 27 May 2026 15:50:51 -0500 Subject: [PATCH 6/9] Add JWT issuer verification and caching Implement fetching and caching of JWT verification public keys from issuer URLs and add JWT verification helpers. New utilities include bearer stripping, issuer allowlist checks, PEM/JWK(S) payload parsing, a TTLCache-backed issuer key cache, and get_public_key_from_issuer/verify_jwt_from_issuer functions. get_opensearch_service_token now accepts verify_token (default true) and will verify the returned service JWT against the auth server's issuer/pinned key. Add comprehensive unit tests for issuer verification (PEM, JWKS, raw PEM, prefix allowlist, and token verification behavior) and a small import cleanup in an existing test. --- src/config/utils.py | 178 +++++++++++- .../config/test_jwt_issuer_verification.py | 270 ++++++++++++++++++ tests/unit/test_jwt_claims_cache.py | 4 +- 3 files changed, 446 insertions(+), 6 deletions(-) create mode 100644 tests/unit/config/test_jwt_issuer_verification.py diff --git a/src/config/utils.py b/src/config/utils.py index ed86a2cc5..fca182034 100644 --- a/src/config/utils.py +++ b/src/config/utils.py @@ -1,6 +1,17 @@ +from typing import Any +from urllib.parse import urlparse + import httpx +import jwt +from cachetools import TTLCache +from cryptography.hazmat.primitives.serialization import load_pem_public_key + +from utils.logging_config import get_logger + +logger = get_logger(__name__) _DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" +_ISSUER_PUBLIC_KEY_CACHE: TTLCache[str, Any] = TTLCache(maxsize=128, ttl=300) # Read the K8S service account token @@ -12,14 +23,153 @@ def _read_k8s_sa_token(k8s_sa_token_path: str) -> str | None: return None +def _strip_bearer_prefix(token: str) -> str: + scheme, _, value = token.partition(" ") + return value if scheme.lower() == "bearer" and value else token + + +def _issuer_allowed( + issuer: str, + allowed_issuers: set[str] | None, + allowed_issuer_prefixes: tuple[str, ...], +) -> bool: + if allowed_issuers and issuer in allowed_issuers: + return True + for prefix in allowed_issuer_prefixes: + base = prefix.rstrip("/") + if issuer == base or issuer.startswith(base + "/"): + return True + return False + + +def _load_public_key_from_payload(payload: Any, key_id: str | None = None): + if isinstance(payload, str): + return load_pem_public_key(payload.encode("utf-8")) + + if not isinstance(payload, dict): + raise ValueError("Public key response must be PEM text or JSON") + + public_key_pem = payload.get("public_key") or payload.get("pem") or payload.get("key") + if public_key_pem: + if isinstance(public_key_pem, bytes): + return load_pem_public_key(public_key_pem) + return load_pem_public_key(str(public_key_pem).encode("utf-8")) + + jwks = payload.get("keys") + if isinstance(jwks, list) and jwks: + jwk = next( + (candidate for candidate in jwks if key_id and candidate.get("kid") == key_id), + jwks[0], + ) + return jwt.PyJWK.from_dict(jwk).key + + if payload.get("kty"): + return jwt.PyJWK.from_dict(payload).key + + raise ValueError("Public key response does not contain a supported key format") + + +def get_public_key_from_issuer( + issuer: str, + key_id: str | None = None, + *, + verify_tls: bool = True, + timeout: float = 10.0, +): + """Fetch and cache a JWT verification public key from a trusted issuer URL.""" + parsed = urlparse(issuer) + if parsed.scheme not in ("http", "https") or not parsed.netloc: + raise ValueError("Issuer must be an absolute HTTP(S) URL") + + cache_key = f"{issuer}#{key_id or ''}" + cached = _ISSUER_PUBLIC_KEY_CACHE.get(cache_key) + if cached is not None: + return cached + + with httpx.Client(verify=verify_tls, timeout=timeout) as client: + response = client.get(issuer) + response.raise_for_status() + + content_type = response.headers.get("content-type", "") + if "json" in content_type: + key_payload = response.json() + else: + try: + key_payload = response.json() + except ValueError: + key_payload = response.text + + public_key = _load_public_key_from_payload(key_payload, key_id) + _ISSUER_PUBLIC_KEY_CACHE[cache_key] = public_key + return public_key + + +def verify_jwt_from_issuer( + token: str, + *, + allowed_issuers: set[str] | None = None, + allowed_issuer_prefixes: tuple[str, ...] = (), + algorithms: tuple[str, ...] = ("ES256",), + audience: str | list[str] | None = None, + verify_tls: bool = True, + timeout: float = 10.0, +) -> dict[str, Any] | None: + """Verify a JWT by fetching the issuer public key after allowlist checks.""" + raw_token = _strip_bearer_prefix(token) + try: + header = jwt.get_unverified_header(raw_token) + algorithm = header.get("alg") + if algorithm not in algorithms: + return None + + unverified_claims = jwt.decode( + raw_token, + options={"verify_signature": False, "verify_exp": False}, + ) + issuer = unverified_claims.get("iss") + if not isinstance(issuer, str) or not _issuer_allowed( + issuer, + allowed_issuers, + allowed_issuer_prefixes, + ): + return None + + public_key = get_public_key_from_issuer( + issuer, + header.get("kid"), + verify_tls=verify_tls, + timeout=timeout, + ) + + options: dict[str, Any] = {"require": ["iss", "sub", "exp", "iat"]} + decode_kwargs: dict[str, Any] = { + "algorithms": list(algorithms), + "issuer": issuer, + "options": options, + } + if audience is None: + options["verify_aud"] = False + else: + decode_kwargs["audience"] = audience + + return jwt.decode(raw_token, public_key, **decode_kwargs) + except (ValueError, httpx.HTTPError, jwt.InvalidTokenError): + return None + + def get_opensearch_service_token( auth_server_url: str | None, tenant_id: str, k8s_sa_token_path: str = _DEFAULT_K8S_SA_TOKEN_PATH, + *, + verify_token: bool = True, ) -> str | None: """ Fetch an OpenSearch service token from the internal auth server using the current K8S service account token. + When ``verify_token`` is True (default), the returned JWT is verified + against the auth server's public key (issuer pinned to ``auth_server_url``). + Args: tenant_id (str): The tenant ID for which the token is requested. @@ -31,7 +181,6 @@ def get_opensearch_service_token( token_endpoint = f"{auth_server_url.rstrip('/')}/internal/token/opensearch" try: - # Read the K8S service account token k8s_token = _read_k8s_sa_token(k8s_sa_token_path) if not k8s_token: return None @@ -47,6 +196,29 @@ def get_opensearch_service_token( resp = client.post(token_endpoint, headers=headers, json=json_body) resp.raise_for_status() data = resp.json() - return data.get("token") - except Exception: + token = data.get("token") + except Exception as exc: + logger.warning( + "Failed to fetch OpenSearch service token", + error=str(exc), + auth_server_url=auth_server_url, + ) return None + + if not token: + return None + + if verify_token: + claims = verify_jwt_from_issuer( + token, + allowed_issuer_prefixes=(auth_server_url.rstrip("/"),), + verify_tls=False, + ) + if claims is None: + logger.warning( + "OpenSearch service token failed JWT verification; rejecting", + auth_server_url=auth_server_url, + ) + return None + + return token diff --git a/tests/unit/config/test_jwt_issuer_verification.py b/tests/unit/config/test_jwt_issuer_verification.py new file mode 100644 index 000000000..0de224321 --- /dev/null +++ b/tests/unit/config/test_jwt_issuer_verification.py @@ -0,0 +1,270 @@ +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import jwt +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ec + +ROOT = Path(__file__).resolve().parent.parent.parent.parent +SRC = ROOT / "src" +if str(SRC) not in sys.path: + sys.path.insert(0, str(SRC)) + +from config import utils # noqa: E402 + + +@pytest.fixture(autouse=True) +def clear_public_key_cache(): + utils._ISSUER_PUBLIC_KEY_CACHE.clear() + yield + utils._ISSUER_PUBLIC_KEY_CACHE.clear() + + +def _make_es256_token(issuer: str) -> tuple[str, str]: + private_key = ec.generate_private_key(ec.SECP256R1()) + public_pem = private_key.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode() + now = int(time.time()) + token = jwt.encode( + { + "iss": issuer, + "sub": "system:serviceaccount:tenant:wxd-openrag-be", + "exp": now + 900, + "iat": now, + "roles": ["access_all"], + }, + private_key, + algorithm="ES256", + headers={"typ": "JWT"}, + ) + return token, public_pem + + +def test_verify_jwt_from_issuer_fetches_public_key_and_validates_es256_token(): + issuer = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/workload" + token, public_pem = _make_es256_token(issuer) + + response = MagicMock() + response.headers = {"content-type": "application/json"} + response.json.return_value = {"public_key": public_pem} + + client = MagicMock() + client.__enter__.return_value = client + client.get.return_value = response + + with patch("config.utils.httpx.Client", return_value=client): + claims = utils.verify_jwt_from_issuer( + f"Bearer {token}", + allowed_issuer_prefixes=( + "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/", + ), + verify_tls=False, + ) + + assert claims is not None + assert claims["iss"] == issuer + assert claims["roles"] == ["access_all"] + client.get.assert_called_once_with(issuer) + + +def test_verify_jwt_from_issuer_rejects_issuer_that_only_matches_as_string_prefix(): + issuer = ( + "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" + ".evil.example/keys/workload" + ) + token, _ = _make_es256_token(issuer) + + with patch("config.utils.httpx.Client") as client_factory: + claims = utils.verify_jwt_from_issuer( + token, + allowed_issuer_prefixes=( + "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082", + ), + ) + + assert claims is None + client_factory.assert_not_called() + + +def test_verify_jwt_from_issuer_rejects_unallowed_issuer_without_fetching_key(): + issuer = "https://evil.example.test/keys/workload" + token, _ = _make_es256_token(issuer) + + with patch("config.utils.httpx.Client") as client_factory: + claims = utils.verify_jwt_from_issuer( + token, + allowed_issuer_prefixes=("https://authserver-oidc-svc.openrag-control.svc/",), + ) + + assert claims is None + client_factory.assert_not_called() + + +def test_verify_jwt_from_issuer_accepts_standard_jwks_response(): + issuer = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/workload" + private_key = ec.generate_private_key(ec.SECP256R1()) + public_numbers = private_key.public_key().public_numbers() + + def _b64(value: int) -> str: + import base64 + + length = (value.bit_length() + 7) // 8 + return base64.urlsafe_b64encode(value.to_bytes(length, "big")).rstrip(b"=").decode() + + jwks = { + "keys": [ + { + "alg": "ES256", + "crv": "P-256", + "kty": "EC", + "use": "sig", + "x": _b64(public_numbers.x), + "y": _b64(public_numbers.y), + } + ] + } + + now = int(time.time()) + token = jwt.encode( + { + "iss": issuer, + "sub": "system:serviceaccount:tenant:wxd-openrag-be", + "exp": now + 900, + "iat": now, + }, + private_key, + algorithm="ES256", + ) + + response = MagicMock() + response.headers = {"content-type": "application/json"} + response.json.return_value = jwks + + client = MagicMock() + client.__enter__.return_value = client + client.get.return_value = response + + with patch("config.utils.httpx.Client", return_value=client): + claims = utils.verify_jwt_from_issuer( + token, + allowed_issuer_prefixes=( + "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/", + ), + verify_tls=False, + ) + + assert claims is not None + assert claims["iss"] == issuer + + +def test_verify_jwt_from_issuer_accepts_raw_pem_response(): + issuer = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/raw" + token, public_pem = _make_es256_token(issuer) + + response = MagicMock() + response.headers = {"content-type": "application/x-pem-file"} + response.json.side_effect = ValueError("not json") + response.text = public_pem + + client = MagicMock() + client.__enter__.return_value = client + client.get.return_value = response + + with patch("config.utils.httpx.Client", return_value=client): + claims = utils.verify_jwt_from_issuer( + token, + allowed_issuer_prefixes=( + "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/", + ), + verify_tls=False, + ) + + assert claims is not None + assert claims["iss"] == issuer + + +def _patch_post_returning_token(token: str): + post_response = MagicMock() + post_response.raise_for_status.return_value = None + post_response.json.return_value = {"token": token} + + post_client = MagicMock() + post_client.__enter__.return_value = post_client + post_client.post.return_value = post_response + return post_client + + +def test_get_opensearch_service_token_verifies_returned_jwt(tmp_path): + auth_server_url = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" + issuer = f"{auth_server_url}/keys/workload" + token, public_pem = _make_es256_token(issuer) + + sa_token_file = tmp_path / "sa-token" + sa_token_file.write_text("k8s-sa-token") + + post_client = _patch_post_returning_token(token) + + get_response = MagicMock() + get_response.headers = {"content-type": "application/x-pem-file"} + get_response.json.side_effect = ValueError("not json") + get_response.text = public_pem + get_response.raise_for_status.return_value = None + + get_client = MagicMock() + get_client.__enter__.return_value = get_client + get_client.get.return_value = get_response + + with patch("config.utils.httpx.Client", side_effect=[post_client, get_client]): + result = utils.get_opensearch_service_token( + auth_server_url, + tenant_id="openrag", + k8s_sa_token_path=str(sa_token_file), + ) + + assert result == token + + +def test_get_opensearch_service_token_rejects_token_from_unexpected_issuer(tmp_path): + auth_server_url = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" + issuer = "https://attacker.example/keys/workload" + token, _ = _make_es256_token(issuer) + + sa_token_file = tmp_path / "sa-token" + sa_token_file.write_text("k8s-sa-token") + + post_client = _patch_post_returning_token(token) + + with patch("config.utils.httpx.Client", side_effect=[post_client]): + result = utils.get_opensearch_service_token( + auth_server_url, + tenant_id="openrag", + k8s_sa_token_path=str(sa_token_file), + ) + + assert result is None + + +def test_get_opensearch_service_token_skips_verification_when_disabled(tmp_path): + auth_server_url = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" + issuer = "https://attacker.example/keys/workload" + token, _ = _make_es256_token(issuer) + + sa_token_file = tmp_path / "sa-token" + sa_token_file.write_text("k8s-sa-token") + + post_client = _patch_post_returning_token(token) + + with patch("config.utils.httpx.Client", side_effect=[post_client]): + result = utils.get_opensearch_service_token( + auth_server_url, + tenant_id="openrag", + k8s_sa_token_path=str(sa_token_file), + verify_token=False, + ) + + assert result == token diff --git a/tests/unit/test_jwt_claims_cache.py b/tests/unit/test_jwt_claims_cache.py index fe70b2929..2dce2cf6f 100644 --- a/tests/unit/test_jwt_claims_cache.py +++ b/tests/unit/test_jwt_claims_cache.py @@ -5,6 +5,7 @@ fixture so tests are fully isolated. """ +import os import sys import time from pathlib import Path @@ -17,9 +18,6 @@ if str(SRC) not in sys.path: sys.path.insert(0, str(SRC)) -# Patch heavy config imports before importing session_manager -import os - os.environ.setdefault("OPENRAG_JWT_CACHE_TTL", "60") os.environ.setdefault("OPENRAG_JWT_CACHE_MAXSIZE", "1024") From 021f00642a964d6a3250740c22bb687ba0a848aa Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 20:51:21 +0000 Subject: [PATCH 7/9] style: ruff autofix (auto) --- tests/unit/config/test_jwt_issuer_verification.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/unit/config/test_jwt_issuer_verification.py b/tests/unit/config/test_jwt_issuer_verification.py index 0de224321..f37652c64 100644 --- a/tests/unit/config/test_jwt_issuer_verification.py +++ b/tests/unit/config/test_jwt_issuer_verification.py @@ -25,10 +25,14 @@ def clear_public_key_cache(): def _make_es256_token(issuer: str) -> tuple[str, str]: private_key = ec.generate_private_key(ec.SECP256R1()) - public_pem = private_key.public_key().public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ).decode() + public_pem = ( + private_key.public_key() + .public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + .decode() + ) now = int(time.time()) token = jwt.encode( { From 522fd0a7296cfc6cf9f48ac9da9a227617758c52 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Thu, 28 May 2026 14:07:15 -0500 Subject: [PATCH 8/9] Use OPENRAG_SERVICE_TOKEN env at startup Switch startup token handling to read OPENRAG_SERVICE_TOKEN from the environment instead of deriving/fetching a PLATFORM_SERVICE_JWT via Kubernetes service account logic. settings: remove k8s SA token reading and the get_opensearch_service_token plumbing; add get_openrag_service_token() that returns OPENRAG_SERVICE_TOKEN. lifespan: call get_openrag_service_token() and update error messages and uses accordingly. utils: remove K8s token helper and the internal get_opensearch_service_token flow, simplify issuer-key discovery/verification (no issuer allowlist), and improve docstrings. startup_orchestrator and tests: update comments and unit tests to reflect removal of issuer-allowlist and service-token fetching behavior (remove related tests). This simplifies startup configuration by relying on an explicit env var for the platform service token and reduces cluster-local token-fetching complexity. --- src/app/lifespan.py | 13 +- src/config/settings.py | 28 ++-- src/config/utils.py | 113 ++-------------- src/services/startup_orchestrator.py | 2 +- .../config/test_jwt_issuer_verification.py | 124 ------------------ 5 files changed, 29 insertions(+), 251 deletions(-) diff --git a/src/app/lifespan.py b/src/app/lifespan.py index 5ad12664d..7be8e1fc8 100644 --- a/src/app/lifespan.py +++ b/src/app/lifespan.py @@ -15,12 +15,12 @@ JWT_CLAIMS_CACHE_MAX_SIZE, JWT_CLAIMS_CACHE_TTL_SECONDS, OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP, - PLATFORM_SERVICE_JWT, RBAC_CACHE_BACKEND, RBAC_PERMISSION_CACHE_TTL_SECONDS, UVICORN_WORKER_COUNT, clients, get_openrag_config, + get_openrag_service_token, ) from services.startup_orchestrator import startup_tasks from utils.logging_config import get_logger @@ -207,22 +207,23 @@ async def run_startup(app: FastAPI): # talks to OpenSearch. The corresponding call inside startup_tasks # is suppressed when this flag is on. if OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP: - if not PLATFORM_SERVICE_JWT: + service_token = get_openrag_service_token() + if not service_token: raise RuntimeError( "OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP is enabled but " - "PLATFORM_SERVICE_JWT is not set" + "OPENRAG_SERVICE_TOKEN is not set" ) from auth.ibm_auth import admin_username_from_service_jwt from utils.opensearch_init import wait_for_opensearch from utils.opensearch_utils import setup_opensearch_security - admin_username = admin_username_from_service_jwt(PLATFORM_SERVICE_JWT) + admin_username = admin_username_from_service_jwt(service_token) if not admin_username: raise RuntimeError( - "PLATFORM_SERVICE_JWT has no 'username' or 'sub' claim; " + "OPENRAG_SERVICE_TOKEN has no 'username' or 'sub' claim; " "cannot bootstrap OpenSearch security" ) - opensearch_client = clients.create_opensearch_client_from_jwt(PLATFORM_SERVICE_JWT) + opensearch_client = clients.create_opensearch_client_from_jwt(service_token) try: await wait_for_opensearch(opensearch_client) logger.info("Bootstrapping OpenSearch security", admin_username=admin_username) diff --git a/src/config/settings.py b/src/config/settings.py index b7f3c16f8..0b8177779 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -12,7 +12,6 @@ from config.embedding_constants import OPENAI_DEFAULT_EMBEDDING_MODEL from config.paths import get_flows_path -from config.utils import _read_k8s_sa_token, get_opensearch_service_token from utils.container_utils import determine_docling_host, get_container_host from utils.embedding_fields import build_knn_vector_field from utils.env_utils import get_env_float, get_env_int @@ -68,25 +67,7 @@ IBM_AUTH_ENABLED = os.getenv("IBM_AUTH_ENABLED", "false").lower() in ("true", "1", "yes") PLATFORM_USERNAME = os.getenv("PLATFORM_USERNAME") PLATFORM_PASSWORD = os.getenv("PLATFORM_PASSWORD") -# Platform-issued service JWT. When present and -# OPENRAG_BOOTSTRAP_OS_SECURITY_ON_STARTUP is on, lifespan decodes this -# token to derive the admin username used to bootstrap the OpenSearch -# security context (roles + all_access mapping). -# Falls back to the pod's Kubernetes service account token when running -# inside a cluster and no explicit JWT is injected. -_DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" -K8S_SA_TOKEN_PATH = os.getenv("K8S_SA_TOKEN_PATH", _DEFAULT_K8S_SA_TOKEN_PATH) -AUTH_SERVER_URL = os.getenv("AUTH_SERVER_URL") - - -K8S_SA_TOKEN = _read_k8s_sa_token(K8S_SA_TOKEN_PATH) OPENRAG_TENANT_ID = os.getenv("OPENRAG_TENANT_ID", "openrag") -OPENRAG_SERVICE_TOKEN = get_opensearch_service_token( - AUTH_SERVER_URL, OPENRAG_TENANT_ID, K8S_SA_TOKEN_PATH -) - - -PLATFORM_SERVICE_JWT = OPENRAG_SERVICE_TOKEN or os.getenv("PLATFORM_SERVICE_JWT") IBM_JWT_PUBLIC_KEY_URL = os.getenv("IBM_JWT_PUBLIC_KEY_URL", "") IBM_SESSION_COOKIE_NAME = os.getenv("IBM_SESSION_COOKIE_NAME", "ibm-openrag-session") IBM_CREDENTIALS_HEADER = os.getenv("IBM_CREDENTIALS_HEADER", "X-IBM-LH-Credentials") @@ -127,6 +108,13 @@ def get_role_claim_viewer() -> str | None: return os.getenv("OPENRAG_ROLE_CLAIM_VIEWER") +def get_openrag_service_token() -> str | None: + """Platform-issued service JWT used at startup to bootstrap the OpenSearch + security context (admin role mapping). Read per-call — like the JWT-claim + accessors above — so runtime/test overrides take effect without a restart.""" + return os.getenv("OPENRAG_SERVICE_TOKEN") + + DOCLING_OCR_ENGINE = os.getenv("DOCLING_OCR_ENGINE") SEGMENT_WRITE_KEY = os.getenv("SEGMENT_WRITE_KEY", "") ENVIRONMENT = os.getenv("ENVIRONMENT", "") @@ -164,7 +152,7 @@ def _resolve_skip_os_security_default() -> str: ).lower() in ("true", "1", "yes") # Run setup_opensearch_security once during FastAPI lifespan startup, -# using the admin username derived from PLATFORM_SERVICE_JWT. Intended +# using the admin username derived from OPENRAG_SERVICE_TOKEN. Intended # for platform-managed deployments (saas / on_prem) where the platform # issues a service token that identifies the admin user that must be # pinned into the all_access role mapping. Default off. diff --git a/src/config/utils.py b/src/config/utils.py index fca182034..d283cd0b0 100644 --- a/src/config/utils.py +++ b/src/config/utils.py @@ -10,38 +10,14 @@ logger = get_logger(__name__) -_DEFAULT_K8S_SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token" _ISSUER_PUBLIC_KEY_CACHE: TTLCache[str, Any] = TTLCache(maxsize=128, ttl=300) -# Read the K8S service account token -def _read_k8s_sa_token(k8s_sa_token_path: str) -> str | None: - try: - with open(k8s_sa_token_path) as f: - return f.read().strip() or None - except (FileNotFoundError, PermissionError): - return None - - def _strip_bearer_prefix(token: str) -> str: scheme, _, value = token.partition(" ") return value if scheme.lower() == "bearer" and value else token -def _issuer_allowed( - issuer: str, - allowed_issuers: set[str] | None, - allowed_issuer_prefixes: tuple[str, ...], -) -> bool: - if allowed_issuers and issuer in allowed_issuers: - return True - for prefix in allowed_issuer_prefixes: - base = prefix.rstrip("/") - if issuer == base or issuer.startswith(base + "/"): - return True - return False - - def _load_public_key_from_payload(payload: Any, key_id: str | None = None): if isinstance(payload, str): return load_pem_public_key(payload.encode("utf-8")) @@ -76,7 +52,8 @@ def get_public_key_from_issuer( verify_tls: bool = True, timeout: float = 10.0, ): - """Fetch and cache a JWT verification public key from a trusted issuer URL.""" + """Fetch and cache a JWT verification public key (PEM / JWKS / JWK) from a + JWT issuer URL. The issuer URL is expected to serve its own key material.""" parsed = urlparse(issuer) if parsed.scheme not in ("http", "https") or not parsed.netloc: raise ValueError("Issuer must be an absolute HTTP(S) URL") @@ -107,14 +84,21 @@ def get_public_key_from_issuer( def verify_jwt_from_issuer( token: str, *, - allowed_issuers: set[str] | None = None, - allowed_issuer_prefixes: tuple[str, ...] = (), algorithms: tuple[str, ...] = ("ES256",), audience: str | list[str] | None = None, verify_tls: bool = True, timeout: float = 10.0, ) -> dict[str, Any] | None: - """Verify a JWT by fetching the issuer public key after allowlist checks.""" + """Verify a JWT by discovering the signing key from the token's own ``iss`` + claim (JWKS/PEM served at the issuer URL), then checking the signature and + the standard ``iss``/``sub``/``exp``/``iat`` claims. + + There is NO issuer allowlist: the ``iss`` URL is trusted to publish its own + verification keys. This suits a deployment where an upstream gateway has + already authenticated the caller and forwards the JWT — the gateway controls + which ``iss`` reaches this code. If the header can be set by untrusted + clients, pin the issuer instead. + """ raw_token = _strip_bearer_prefix(token) try: header = jwt.get_unverified_header(raw_token) @@ -127,11 +111,7 @@ def verify_jwt_from_issuer( options={"verify_signature": False, "verify_exp": False}, ) issuer = unverified_claims.get("iss") - if not isinstance(issuer, str) or not _issuer_allowed( - issuer, - allowed_issuers, - allowed_issuer_prefixes, - ): + if not isinstance(issuer, str) or not issuer: return None public_key = get_public_key_from_issuer( @@ -155,70 +135,3 @@ def verify_jwt_from_issuer( return jwt.decode(raw_token, public_key, **decode_kwargs) except (ValueError, httpx.HTTPError, jwt.InvalidTokenError): return None - - -def get_opensearch_service_token( - auth_server_url: str | None, - tenant_id: str, - k8s_sa_token_path: str = _DEFAULT_K8S_SA_TOKEN_PATH, - *, - verify_token: bool = True, -) -> str | None: - """ - Fetch an OpenSearch service token from the internal auth server using the current K8S service account token. - - When ``verify_token`` is True (default), the returned JWT is verified - against the auth server's public key (issuer pinned to ``auth_server_url``). - - Args: - tenant_id (str): The tenant ID for which the token is requested. - - Returns: - str | None: The raw OpenSearch token if successful, else None. - """ - if not auth_server_url: - return None - - token_endpoint = f"{auth_server_url.rstrip('/')}/internal/token/opensearch" - try: - k8s_token = _read_k8s_sa_token(k8s_sa_token_path) - if not k8s_token: - return None - - headers = { - "Authorization": f"Bearer {k8s_token}", - "Content-Type": "application/json", - } - json_body = {"tenant_id": tenant_id} - - # Verify is False for cluster-local/internal endpoints; see original curl -k - with httpx.Client(verify=False, timeout=10) as client: - resp = client.post(token_endpoint, headers=headers, json=json_body) - resp.raise_for_status() - data = resp.json() - token = data.get("token") - except Exception as exc: - logger.warning( - "Failed to fetch OpenSearch service token", - error=str(exc), - auth_server_url=auth_server_url, - ) - return None - - if not token: - return None - - if verify_token: - claims = verify_jwt_from_issuer( - token, - allowed_issuer_prefixes=(auth_server_url.rstrip("/"),), - verify_tls=False, - ) - if claims is None: - logger.warning( - "OpenSearch service token failed JWT verification; rejecting", - auth_server_url=auth_server_url, - ) - return None - - return token diff --git a/src/services/startup_orchestrator.py b/src/services/startup_orchestrator.py index 2dca209b2..6a3a94ad8 100644 --- a/src/services/startup_orchestrator.py +++ b/src/services/startup_orchestrator.py @@ -71,7 +71,7 @@ async def startup_tasks(services): # Skip entirely when the platform manages the security context externally # (SaaS / CPD): the call would otherwise either fail with 403/401 or # overwrite a curated config. Also skip when the lifespan-level - # bootstrap (driven by PLATFORM_SERVICE_JWT) has already handled it. + # bootstrap (driven by OPENRAG_SERVICE_TOKEN) has already handled it. if OPENRAG_SKIP_OS_SECURITY_SETUP: logger.info( "Skipping OpenSearch security setup at startup " diff --git a/tests/unit/config/test_jwt_issuer_verification.py b/tests/unit/config/test_jwt_issuer_verification.py index f37652c64..7c35e5b7f 100644 --- a/tests/unit/config/test_jwt_issuer_verification.py +++ b/tests/unit/config/test_jwt_issuer_verification.py @@ -64,9 +64,6 @@ def test_verify_jwt_from_issuer_fetches_public_key_and_validates_es256_token(): with patch("config.utils.httpx.Client", return_value=client): claims = utils.verify_jwt_from_issuer( f"Bearer {token}", - allowed_issuer_prefixes=( - "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/", - ), verify_tls=False, ) @@ -76,39 +73,6 @@ def test_verify_jwt_from_issuer_fetches_public_key_and_validates_es256_token(): client.get.assert_called_once_with(issuer) -def test_verify_jwt_from_issuer_rejects_issuer_that_only_matches_as_string_prefix(): - issuer = ( - "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" - ".evil.example/keys/workload" - ) - token, _ = _make_es256_token(issuer) - - with patch("config.utils.httpx.Client") as client_factory: - claims = utils.verify_jwt_from_issuer( - token, - allowed_issuer_prefixes=( - "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082", - ), - ) - - assert claims is None - client_factory.assert_not_called() - - -def test_verify_jwt_from_issuer_rejects_unallowed_issuer_without_fetching_key(): - issuer = "https://evil.example.test/keys/workload" - token, _ = _make_es256_token(issuer) - - with patch("config.utils.httpx.Client") as client_factory: - claims = utils.verify_jwt_from_issuer( - token, - allowed_issuer_prefixes=("https://authserver-oidc-svc.openrag-control.svc/",), - ) - - assert claims is None - client_factory.assert_not_called() - - def test_verify_jwt_from_issuer_accepts_standard_jwks_response(): issuer = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/workload" private_key = ec.generate_private_key(ec.SECP256R1()) @@ -156,9 +120,6 @@ def _b64(value: int) -> str: with patch("config.utils.httpx.Client", return_value=client): claims = utils.verify_jwt_from_issuer( token, - allowed_issuer_prefixes=( - "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/", - ), verify_tls=False, ) @@ -182,93 +143,8 @@ def test_verify_jwt_from_issuer_accepts_raw_pem_response(): with patch("config.utils.httpx.Client", return_value=client): claims = utils.verify_jwt_from_issuer( token, - allowed_issuer_prefixes=( - "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082/keys/", - ), verify_tls=False, ) assert claims is not None assert claims["iss"] == issuer - - -def _patch_post_returning_token(token: str): - post_response = MagicMock() - post_response.raise_for_status.return_value = None - post_response.json.return_value = {"token": token} - - post_client = MagicMock() - post_client.__enter__.return_value = post_client - post_client.post.return_value = post_response - return post_client - - -def test_get_opensearch_service_token_verifies_returned_jwt(tmp_path): - auth_server_url = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" - issuer = f"{auth_server_url}/keys/workload" - token, public_pem = _make_es256_token(issuer) - - sa_token_file = tmp_path / "sa-token" - sa_token_file.write_text("k8s-sa-token") - - post_client = _patch_post_returning_token(token) - - get_response = MagicMock() - get_response.headers = {"content-type": "application/x-pem-file"} - get_response.json.side_effect = ValueError("not json") - get_response.text = public_pem - get_response.raise_for_status.return_value = None - - get_client = MagicMock() - get_client.__enter__.return_value = get_client - get_client.get.return_value = get_response - - with patch("config.utils.httpx.Client", side_effect=[post_client, get_client]): - result = utils.get_opensearch_service_token( - auth_server_url, - tenant_id="openrag", - k8s_sa_token_path=str(sa_token_file), - ) - - assert result == token - - -def test_get_opensearch_service_token_rejects_token_from_unexpected_issuer(tmp_path): - auth_server_url = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" - issuer = "https://attacker.example/keys/workload" - token, _ = _make_es256_token(issuer) - - sa_token_file = tmp_path / "sa-token" - sa_token_file.write_text("k8s-sa-token") - - post_client = _patch_post_returning_token(token) - - with patch("config.utils.httpx.Client", side_effect=[post_client]): - result = utils.get_opensearch_service_token( - auth_server_url, - tenant_id="openrag", - k8s_sa_token_path=str(sa_token_file), - ) - - assert result is None - - -def test_get_opensearch_service_token_skips_verification_when_disabled(tmp_path): - auth_server_url = "https://authserver-oidc-svc.openrag-control.svc.cluster.local:8082" - issuer = "https://attacker.example/keys/workload" - token, _ = _make_es256_token(issuer) - - sa_token_file = tmp_path / "sa-token" - sa_token_file.write_text("k8s-sa-token") - - post_client = _patch_post_returning_token(token) - - with patch("config.utils.httpx.Client", side_effect=[post_client]): - result = utils.get_opensearch_service_token( - auth_server_url, - tenant_id="openrag", - k8s_sa_token_path=str(sa_token_file), - verify_token=False, - ) - - assert result == token From 85d1ce5549adf87a345606ec06f1fc7955bd0330 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 28 May 2026 19:28:11 +0000 Subject: [PATCH 9/9] style: ruff autofix (auto) --- src/config/settings.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/config/settings.py b/src/config/settings.py index 659626435..d4227310b 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -113,6 +113,8 @@ def get_openrag_service_token() -> str | None: security context (admin role mapping). Read per-call — like the JWT-claim accessors above — so runtime/test overrides take effect without a restart.""" return os.getenv("OPENRAG_SERVICE_TOKEN") + + def get_jwt_auth_header() -> str: """HTTP header that may carry a gateway-forwarded JWT for /v1 (API-key) callers. Read per-call so tests can override via monkeypatch.setenv."""