Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 105 additions & 2 deletions core/middlewares/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import time

import requests
from django.conf import settings
from django.http import HttpResponseNotFound, HttpResponse
from django.http.response import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from request_logging.middleware import LoggingMiddleware
from rest_framework.views import APIView
Expand Down Expand Up @@ -52,7 +54,6 @@ class ResponseHeadersMiddleware(BaseMiddleware):
def __call__(self, request):
start_time = time.time()
response = self.get_response(request)
from django.conf import settings
response[VERSION_HEADER] = settings.VERSION
try:
response[REQUEST_USER_HEADER] = str(getattr(request, 'user', None))
Expand Down Expand Up @@ -89,6 +90,109 @@ def __call__(self, request):
return response


class RequireAuthenticationMiddleware(BaseMiddleware):
"""Block anonymous API access unless the request matches an approved bypass."""

exempt_path_prefixes = (
'/healthcheck/',
'/users/api-token/',
'/users/login/',
'/users/logout/',
'/users/signup/',
'/users/password/reset/',
'/users/oidc/',
'/oidc/',
'/fhir/',
'/swagger',
'/redoc/',
'/admin/',
)
exempt_exact_paths = {
'/',
'/version',
'/changelog',
'/feedback',
'/toggles',
'/locales',
'/events',
}
forbidden_response = {
'detail': 'Authentication required. Anonymous API access is disabled.',
'upgrade_url': 'https://app.openconceptlab.org/pricing',
}

def __call__(self, request):
"""Allow exempt and approved anonymous traffic, otherwise return 403."""
if self.is_request_allowed(request):
return self.get_response(request)

return JsonResponse(self.forbidden_response, status=403)

def is_request_allowed(self, request):
"""Return whether the current request can bypass authentication enforcement."""
if request.method == 'OPTIONS' or request.META.get('HTTP_USER_AGENT', '').startswith('ELB-HealthChecker'):
return True

user = getattr(request, 'user', None)
return getattr(user, 'is_authenticated', False) or self.is_exempt_path(request.path) or \
self.has_approved_client_header(request) or \
self.has_approved_api_key(request) or self.has_approved_ip(request)

@classmethod
def is_exempt_path(cls, path):
"""Return whether a request path must remain accessible to anonymous users."""
normalized_path = path.rstrip('/') or '/'
if normalized_path in cls.exempt_exact_paths:
return True

if any(path.startswith(prefix) for prefix in cls.exempt_path_prefixes):
return True

return (
path.startswith('/users/')
and (
'/verify/' in path
or path.endswith('/sso-migrate/')
or path.endswith('/following/')
)
)

@staticmethod
def has_approved_client_header(request):
"""Match the configured anonymous allowlist against the X-OCL-CLIENT header."""
client_name = request.META.get('HTTP_X_OCL_CLIENT', '').strip()
return bool(client_name and client_name in settings.APPROVED_ANONYMOUS_CLIENTS)

@staticmethod
def has_approved_api_key(request):
"""Match allowlisted anonymous API keys from common request locations."""
approved_keys = settings.APPROVED_ANONYMOUS_API_KEYS
if not approved_keys:
return False

authorization = request.META.get('HTTP_AUTHORIZATION', '').strip()
x_api_key = request.META.get('HTTP_X_API_KEY', '').strip()
tokens = [authorization, x_api_key]
bearer_token = authorization.split(None, 1)[1].strip() if ' ' in authorization else ''
if bearer_token:
tokens.append(bearer_token)

return any(token and token in approved_keys for token in tokens)

@staticmethod
def has_approved_ip(request):
"""Match source IPs using the socket address only."""
approved_ips = settings.APPROVED_ANONYMOUS_IPS
if not approved_ips:
return False

remote_addr = request.META.get('REMOTE_ADDR', '').strip()
if not remote_addr:
return False

return remote_addr in approved_ips


class FhirMiddleware(BaseMiddleware):
"""
It is used to expose FHIR endpoints under FHIR subdomain only and convert content from xml to json.
Expand All @@ -98,7 +202,6 @@ class FhirMiddleware(BaseMiddleware):
def __call__(self, request):
absolute_uri = request.build_absolute_uri()

from django.conf import settings
if settings.FHIR_SUBDOMAIN:
uri = absolute_uri.split('/')
domain = uri[2] if len(uri) > 2 else ''
Expand Down
167 changes: 167 additions & 0 deletions core/middlewares/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import json

from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.http import HttpResponse
from django.test import RequestFactory, SimpleTestCase, override_settings

from core.middlewares.middlewares import RequireAuthenticationMiddleware


@override_settings(
REQUIRE_AUTHENTICATION=True,
APPROVED_ANONYMOUS_CLIENTS=['test-client'],
APPROVED_ANONYMOUS_API_KEYS=['test-api-key'],
APPROVED_ANONYMOUS_IPS=['10.0.0.1'],
)
class RequireAuthenticationMiddlewareTest(SimpleTestCase):
"""Verify anonymous authentication enforcement and approved bypasses."""

def setUp(self):
"""Create a request factory and middleware instance for each test."""
self.factory = RequestFactory()
self.middleware = RequireAuthenticationMiddleware(lambda request: HttpResponse('ok'))

def make_request(self, path='/orgs/', method='get', user=None, **meta):
"""Build a request object with a controllable authenticated user state."""
request_method = getattr(self.factory, method.lower())
request = request_method(path, **meta)
request.user = user or AnonymousUser()
return request

def test_allows_authenticated_request(self):
"""Authenticated requests should bypass the anonymous access gate."""
user = type('AuthenticatedUser', (), {'is_authenticated': True})()

response = self.middleware(self.make_request(user=user))

self.assertEqual(response.status_code, 200)

def test_blocks_anonymous_request_for_protected_path(self):
"""Anonymous traffic to protected API paths should receive a 403 response."""
response = self.middleware(self.make_request('/orgs/OCL/'))

self.assertEqual(response.status_code, 403)
self.assertEqual(
json.loads(response.content),
{
'detail': 'Authentication required. Anonymous API access is disabled.',
'upgrade_url': 'https://app.openconceptlab.org/pricing',
}
)

def test_allows_anonymous_request_for_approved_client_header(self):
"""Approved X-OCL-CLIENT values should retain anonymous access."""
response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_OCL_CLIENT='test-client'))

self.assertEqual(response.status_code, 200)

def test_blocks_anonymous_request_for_unapproved_client_header(self):
"""Unknown X-OCL-CLIENT values should still be rejected."""
response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_OCL_CLIENT='unknown-client'))

self.assertEqual(response.status_code, 403)

def test_blocks_anonymous_request_for_whitespace_client_header(self):
"""Whitespace-only client header values should be rejected after normalization."""
response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_OCL_CLIENT=' '))

self.assertEqual(response.status_code, 403)

def test_allows_anonymous_request_for_approved_api_key_header(self):
"""Allowlisted anonymous API keys should bypass the gate."""
response = self.middleware(self.make_request('/orgs/OCL/', HTTP_X_API_KEY='test-api-key'))

self.assertEqual(response.status_code, 200)

def test_allows_anonymous_request_for_approved_authorization_token(self):
"""Allowlisted bearer or token credentials should bypass the gate."""
response = self.middleware(self.make_request('/orgs/OCL/', HTTP_AUTHORIZATION='Token test-api-key'))

self.assertEqual(response.status_code, 200)

def test_blocks_anonymous_request_for_query_string_api_key(self):
"""Query string API keys should not bypass the authentication gate."""
response = self.middleware(self.make_request('/orgs/OCL/?api_key=test-api-key'))

self.assertEqual(response.status_code, 403)

def test_allows_anonymous_request_for_approved_ip(self):
"""Allowlisted source IPs should keep anonymous access."""
response = self.middleware(self.make_request('/orgs/OCL/', REMOTE_ADDR='10.0.0.1'))

self.assertEqual(response.status_code, 200)

def test_blocks_anonymous_request_for_forwarded_ip_only(self):
"""Forwarded IP headers alone should not bypass the authentication gate."""
response = self.middleware(
self.make_request('/orgs/OCL/', HTTP_X_FORWARDED_FOR='10.0.0.1', REMOTE_ADDR='203.0.113.5')
)

self.assertEqual(response.status_code, 403)

def test_allows_options_request(self):
"""CORS preflight requests should not be blocked."""
response = self.middleware(self.make_request('/orgs/OCL/', method='options'))

self.assertEqual(response.status_code, 200)

def test_allows_elb_health_checker_request(self):
"""Infrastructure health checks should bypass the gate."""
response = self.middleware(
self.make_request('/orgs/OCL/', HTTP_USER_AGENT='ELB-HealthChecker/2.0')
)

self.assertEqual(response.status_code, 200)

def test_allows_exempt_exact_paths(self):
"""Public root-level utility endpoints should remain anonymous."""
for path in ['/', '/version/', '/changelog/', '/feedback/', '/toggles/', '/locales/', '/events/']:
with self.subTest(path=path):
response = self.middleware(self.make_request(path))
self.assertEqual(response.status_code, 200)

def test_allows_exempt_path_prefixes(self):
"""Auth, docs, admin, and FHIR prefixes should remain anonymous."""
paths = [
'/healthcheck/',
'/users/api-token/',
'/users/login/',
'/users/logout/',
'/users/signup/',
'/users/password/reset/',
'/users/oidc/code-exchange/',
'/oidc/authenticate/',
'/fhir/',
'/swagger/',
'/swagger.json',
'/redoc/',
'/admin/login/',
]

for path in paths:
with self.subTest(path=path):
response = self.middleware(self.make_request(path))
self.assertEqual(response.status_code, 200)

def test_allows_exempt_dynamic_user_paths(self):
"""Public user verification and following endpoints should remain anonymous."""
paths = [
'/users/alice/verify/token-123/',
'/users/alice/sso-migrate/',
'/users/alice/following/',
]

for path in paths:
with self.subTest(path=path):
response = self.middleware(self.make_request(path))
self.assertEqual(response.status_code, 200)


@override_settings(REQUIRE_AUTHENTICATION=False)
class RequireAuthenticationSettingsTest(SimpleTestCase):
"""Verify auth-enforcement middleware configuration toggles cleanly."""

def test_authentication_middleware_not_inserted_when_disabled(self):
"""RequireAuthenticationMiddleware should be absent when the feature is disabled."""
self.assertNotIn('core.middlewares.middlewares.RequireAuthenticationMiddleware', settings.MIDDLEWARE)
16 changes: 16 additions & 0 deletions core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@
else:
ENABLE_THROTTLING = os.environ.get('ENABLE_THROTTLING', False) in ['true', 'True', 'TRUE', True]


def get_set_from_env(name):
"""Return a trimmed set for comma-separated environment variables."""
return {value.strip() for value in os.environ.get(name, '').split(',') if value.strip()}


REQUIRE_AUTHENTICATION = os.environ.get('REQUIRE_AUTHENTICATION', 'false').lower() in ['true', '1']
APPROVED_ANONYMOUS_CLIENTS = get_set_from_env('APPROVED_ANONYMOUS_CLIENTS')
APPROVED_ANONYMOUS_API_KEYS = get_set_from_env('APPROVED_ANONYMOUS_API_KEYS')
APPROVED_ANONYMOUS_IPS = get_set_from_env('APPROVED_ANONYMOUS_IPS')

ALLOWED_HOSTS = ['*']

CORS_ALLOW_HEADERS = default_headers + (
Expand Down Expand Up @@ -199,6 +210,11 @@
}
MIDDLEWARE = [*MIDDLEWARE, 'core.middlewares.middlewares.ThrottleHeadersMiddleware']

if REQUIRE_AUTHENTICATION:
auth_middleware = 'django.contrib.auth.middleware.AuthenticationMiddleware'
auth_index = MIDDLEWARE.index(auth_middleware)
MIDDLEWARE.insert(auth_index + 1, 'core.middlewares.middlewares.RequireAuthenticationMiddleware')


ROOT_URLCONF = 'core.urls'

Expand Down
Loading