diff --git a/core/middlewares/middlewares.py b/core/middlewares/middlewares.py index d5b21beb..43bdb605 100644 --- a/core/middlewares/middlewares.py +++ b/core/middlewares/middlewares.py @@ -2,7 +2,9 @@ import time import requests +from django.conf import settings from django.http import HttpResponseNotFound, HttpResponse +from django.http.response import JsonResponse from django.utils.deprecation import MiddlewareMixin from request_logging.middleware import LoggingMiddleware from rest_framework.views import APIView @@ -52,7 +54,6 @@ class ResponseHeadersMiddleware(BaseMiddleware): def __call__(self, request): start_time = time.time() response = self.get_response(request) - from django.conf import settings response[VERSION_HEADER] = settings.VERSION try: response[REQUEST_USER_HEADER] = str(getattr(request, 'user', None)) @@ -89,6 +90,109 @@ def __call__(self, request): return response +class RequireAuthenticationMiddleware(BaseMiddleware): + """Block anonymous API access unless the request matches an approved bypass.""" + + exempt_path_prefixes = ( + '/healthcheck/', + '/users/api-token/', + '/users/login/', + '/users/logout/', + '/users/signup/', + '/users/password/reset/', + '/users/oidc/', + '/oidc/', + '/fhir/', + '/swagger', + '/redoc/', + '/admin/', + ) + exempt_exact_paths = { + '/', + '/version', + '/changelog', + '/feedback', + '/toggles', + '/locales', + '/events', + } + forbidden_response = { + 'detail': 'Authentication required. Anonymous API access is disabled.', + 'upgrade_url': 'https://app.openconceptlab.org/pricing', + } + + def __call__(self, request): + """Allow exempt and approved anonymous traffic, otherwise return 403.""" + if self.is_request_allowed(request): + return self.get_response(request) + + return JsonResponse(self.forbidden_response, status=403) + + def is_request_allowed(self, request): + """Return whether the current request can bypass authentication enforcement.""" + if request.method == 'OPTIONS' or request.META.get('HTTP_USER_AGENT', '').startswith('ELB-HealthChecker'): + return True + + user = getattr(request, 'user', None) + return getattr(user, 'is_authenticated', False) or self.is_exempt_path(request.path) or \ + self.has_approved_client_header(request) or \ + self.has_approved_api_key(request) or self.has_approved_ip(request) + + @classmethod + def is_exempt_path(cls, path): + """Return whether a request path must remain accessible to anonymous users.""" + normalized_path = path.rstrip('/') or '/' + if normalized_path in cls.exempt_exact_paths: + return True + + if any(path.startswith(prefix) for prefix in cls.exempt_path_prefixes): + return True + + return ( + path.startswith('/users/') + and ( + '/verify/' in path + or path.endswith('/sso-migrate/') + or path.endswith('/following/') + ) + ) + + @staticmethod + def has_approved_client_header(request): + """Match the configured anonymous allowlist against the X-OCL-CLIENT header.""" + client_name = request.META.get('HTTP_X_OCL_CLIENT', '').strip() + return bool(client_name and client_name in settings.APPROVED_ANONYMOUS_CLIENTS) + + @staticmethod + def has_approved_api_key(request): + """Match allowlisted anonymous API keys from common request locations.""" + approved_keys = settings.APPROVED_ANONYMOUS_API_KEYS + if not approved_keys: + return False + + authorization = request.META.get('HTTP_AUTHORIZATION', '').strip() + x_api_key = request.META.get('HTTP_X_API_KEY', '').strip() + tokens = [authorization, x_api_key] + bearer_token = authorization.split(None, 1)[1].strip() if ' ' in authorization else '' + if bearer_token: + tokens.append(bearer_token) + + return any(token and token in approved_keys for token in tokens) + + @staticmethod + def has_approved_ip(request): + """Match source IPs using the socket address only.""" + approved_ips = settings.APPROVED_ANONYMOUS_IPS + if not approved_ips: + return False + + remote_addr = request.META.get('REMOTE_ADDR', '').strip() + if not remote_addr: + return False + + return remote_addr in approved_ips + + class FhirMiddleware(BaseMiddleware): """ It is used to expose FHIR endpoints under FHIR subdomain only and convert content from xml to json. @@ -98,7 +202,6 @@ class FhirMiddleware(BaseMiddleware): def __call__(self, request): absolute_uri = request.build_absolute_uri() - from django.conf import settings if settings.FHIR_SUBDOMAIN: uri = absolute_uri.split('/') domain = uri[2] if len(uri) > 2 else '' diff --git a/core/middlewares/tests.py b/core/middlewares/tests.py new file mode 100644 index 00000000..3f3ef360 --- /dev/null +++ b/core/middlewares/tests.py @@ -0,0 +1,167 @@ +import json + +from django.conf import settings +from django.contrib.auth.models import AnonymousUser +from django.http import HttpResponse +from django.test import RequestFactory, SimpleTestCase, override_settings + +from core.middlewares.middlewares import RequireAuthenticationMiddleware + + +@override_settings( + REQUIRE_AUTHENTICATION=True, + APPROVED_ANONYMOUS_CLIENTS=['test-client'], + APPROVED_ANONYMOUS_API_KEYS=['test-api-key'], + APPROVED_ANONYMOUS_IPS=['10.0.0.1'], +) +class RequireAuthenticationMiddlewareTest(SimpleTestCase): + """Verify anonymous authentication enforcement and approved bypasses.""" + + def setUp(self): + """Create a request factory and middleware instance for each test.""" + self.factory = RequestFactory() + self.middleware = RequireAuthenticationMiddleware(lambda request: HttpResponse('ok')) + + def make_request(self, path='/orgs/', method='get', user=None, **meta): + """Build a request object with a controllable authenticated user state.""" + request_method = getattr(self.factory, method.lower()) + request = request_method(path, **meta) + request.user = user or AnonymousUser() + return request + + def test_allows_authenticated_request(self): + """Authenticated requests should bypass the anonymous access gate.""" + user = type('AuthenticatedUser', (), {'is_authenticated': True})() + + response = self.middleware(self.make_request(user=user)) + + self.assertEqual(response.status_code, 200) + + def test_blocks_anonymous_request_for_protected_path(self): + """Anonymous traffic to protected API paths should receive a 403 response.""" + response = self.middleware(self.make_request('/orgs/OCL/')) + + self.assertEqual(response.status_code, 403) + self.assertEqual( + json.loads(response.content), + { + 'detail': 'Authentication required. Anonymous API access is disabled.', + 'upgrade_url': 'https://app.openconceptlab.org/pricing', + } + ) + + def test_allows_anonymous_request_for_approved_client_header(self): + """Approved X-OCL-CLIENT values should retain anonymous access.""" + response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_OCL_CLIENT='test-client')) + + self.assertEqual(response.status_code, 200) + + def test_blocks_anonymous_request_for_unapproved_client_header(self): + """Unknown X-OCL-CLIENT values should still be rejected.""" + response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_OCL_CLIENT='unknown-client')) + + self.assertEqual(response.status_code, 403) + + def test_blocks_anonymous_request_for_whitespace_client_header(self): + """Whitespace-only client header values should be rejected after normalization.""" + response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_OCL_CLIENT=' ')) + + self.assertEqual(response.status_code, 403) + + def test_allows_anonymous_request_for_approved_api_key_header(self): + """Allowlisted anonymous API keys should bypass the gate.""" + response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_API_KEY='test-api-key')) + + self.assertEqual(response.status_code, 200) + + def test_allows_anonymous_request_for_approved_authorization_token(self): + """Allowlisted bearer or token credentials should bypass the gate.""" + response = self.middleware(self.make_request('/orgs/OCL/', HTTP_AUTHORIZATION='Token test-api-key')) + + self.assertEqual(response.status_code, 200) + + def test_blocks_anonymous_request_for_query_string_api_key(self): + """Query string API keys should not bypass the authentication gate.""" + response = self.middleware(self.make_request('/orgs/OCL/?api_key=test-api-key')) + + self.assertEqual(response.status_code, 403) + + def test_allows_anonymous_request_for_approved_ip(self): + """Allowlisted source IPs should keep anonymous access.""" + response = self.middleware(self.make_request('/orgs/OCL/', REMOTE_ADDR='10.0.0.1')) + + self.assertEqual(response.status_code, 200) + + def test_blocks_anonymous_request_for_forwarded_ip_only(self): + """Forwarded IP headers alone should not bypass the authentication gate.""" + response = self.middleware( + self.make_request('/orgs/OCL/', HTTP_X_FORWARDED_FOR='10.0.0.1', REMOTE_ADDR='203.0.113.5') + ) + + self.assertEqual(response.status_code, 403) + + def test_allows_options_request(self): + """CORS preflight requests should not be blocked.""" + response = self.middleware(self.make_request('/orgs/OCL/', method='options')) + + self.assertEqual(response.status_code, 200) + + def test_allows_elb_health_checker_request(self): + """Infrastructure health checks should bypass the gate.""" + response = self.middleware( + self.make_request('/orgs/OCL/', HTTP_USER_AGENT='ELB-HealthChecker/2.0') + ) + + self.assertEqual(response.status_code, 200) + + def test_allows_exempt_exact_paths(self): + """Public root-level utility endpoints should remain anonymous.""" + for path in ['/', '/version/', '/changelog/', '/feedback/', '/toggles/', '/locales/', '/events/']: + with self.subTest(path=path): + response = self.middleware(self.make_request(path)) + self.assertEqual(response.status_code, 200) + + def test_allows_exempt_path_prefixes(self): + """Auth, docs, admin, and FHIR prefixes should remain anonymous.""" + paths = [ + '/healthcheck/', + '/users/api-token/', + '/users/login/', + '/users/logout/', + '/users/signup/', + '/users/password/reset/', + '/users/oidc/code-exchange/', + '/oidc/authenticate/', + '/fhir/', + '/swagger/', + '/swagger.json', + '/redoc/', + '/admin/login/', + ] + + for path in paths: + with self.subTest(path=path): + response = self.middleware(self.make_request(path)) + self.assertEqual(response.status_code, 200) + + def test_allows_exempt_dynamic_user_paths(self): + """Public user verification and following endpoints should remain anonymous.""" + paths = [ + '/users/alice/verify/token-123/', + '/users/alice/sso-migrate/', + '/users/alice/following/', + ] + + for path in paths: + with self.subTest(path=path): + response = self.middleware(self.make_request(path)) + self.assertEqual(response.status_code, 200) + + +@override_settings(REQUIRE_AUTHENTICATION=False) +class RequireAuthenticationSettingsTest(SimpleTestCase): + """Verify auth-enforcement middleware configuration toggles cleanly.""" + + def test_authentication_middleware_not_inserted_when_disabled(self): + """RequireAuthenticationMiddleware should be absent when the feature is disabled.""" + self.assertNotIn('core.middlewares.middlewares.RequireAuthenticationMiddleware', settings.MIDDLEWARE) diff --git a/core/settings.py b/core/settings.py index 254d1783..7424412d 100644 --- a/core/settings.py +++ b/core/settings.py @@ -49,6 +49,17 @@ else: ENABLE_THROTTLING = os.environ.get('ENABLE_THROTTLING', False) in ['true', 'True', 'TRUE', True] + +def get_set_from_env(name): + """Return a trimmed set for comma-separated environment variables.""" + return {value.strip() for value in os.environ.get(name, '').split(',') if value.strip()} + + +REQUIRE_AUTHENTICATION = os.environ.get('REQUIRE_AUTHENTICATION', 'false').lower() in ['true', '1'] +APPROVED_ANONYMOUS_CLIENTS = get_set_from_env('APPROVED_ANONYMOUS_CLIENTS') +APPROVED_ANONYMOUS_API_KEYS = get_set_from_env('APPROVED_ANONYMOUS_API_KEYS') +APPROVED_ANONYMOUS_IPS = get_set_from_env('APPROVED_ANONYMOUS_IPS') + ALLOWED_HOSTS = ['*'] CORS_ALLOW_HEADERS = default_headers + ( @@ -199,6 +210,11 @@ } MIDDLEWARE = [*MIDDLEWARE, 'core.middlewares.middlewares.ThrottleHeadersMiddleware'] +if REQUIRE_AUTHENTICATION: + auth_middleware = 'django.contrib.auth.middleware.AuthenticationMiddleware' + auth_index = MIDDLEWARE.index(auth_middleware) + MIDDLEWARE.insert(auth_index + 1, 'core.middlewares.middlewares.RequireAuthenticationMiddleware') + ROOT_URLCONF = 'core.urls'