Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions services/py-genai-helper/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
from functools import wraps

import jwt
import requests
from flask import request

_jwks_cache: dict | None = None

KEYCLOAK_ISSUER_URL = os.environ.get(
"KEYCLOAK_ISSUER_URL",
"http://keycloak:8080/auth/realms/devops",
Expand All @@ -18,23 +15,12 @@
f"{KEYCLOAK_ISSUER_URL}/protocol/openid-connect/certs",
)


def _fetch_jwks() -> dict:
response = requests.get(_JWKS_URL, timeout=5)
response.raise_for_status()
return response.json()
# PyJWKClient handles caching internally (cache_jwk_set=True, lifespan=300s).
_jwks_client = jwt.PyJWKClient(_JWKS_URL, cache_jwk_set=True, lifespan=300)


def _get_signing_key(token: str) -> jwt.PyJWK:
global _jwks_cache
if _jwks_cache is None:
_jwks_cache = _fetch_jwks()
try:
return jwt.PyJWKClient(_JWKS_URL, jwks_data=_jwks_cache).get_signing_key_from_jwt(token)
except jwt.exceptions.PyJWKClientError:
# Key not found in cache — Keycloak may have rotated keys; refresh once.
_jwks_cache = _fetch_jwks()
return jwt.PyJWKClient(_JWKS_URL, jwks_data=_jwks_cache).get_signing_key_from_jwt(token)
return _jwks_client.get_signing_key_from_jwt(token)


def require_auth(f):
Expand Down
Loading