Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/routers/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def get_or_create_webhook_endpoint(
webhook: schemas.WebhookEndpointCreate = Body(
..., description="Webhook endpoint parameters"
),
jwt_params: JWTParams = Depends(require_auth()),
jwt_params: JWTParams = Depends(require_auth(workspace_name="workspace_id")),
db: AsyncSession = db,
) -> schemas.WebhookEndpoint:
"""
Expand All @@ -55,7 +55,7 @@ async def get_or_create_webhook_endpoint(
@router.get("", response_model=Page[schemas.WebhookEndpoint])
async def list_webhook_endpoints(
workspace_id: str = Path(..., description="Workspace ID"),
jwt_params: JWTParams = Depends(require_auth()),
jwt_params: JWTParams = Depends(require_auth(workspace_name="workspace_id")),
db: AsyncSession = db,
) -> Page[schemas.WebhookEndpoint]:
"""
Expand All @@ -72,7 +72,7 @@ async def list_webhook_endpoints(
async def delete_webhook_endpoint(
workspace_id: str = Path(..., description="Workspace ID"),
endpoint_id: str = Path(..., description="Webhook endpoint ID"),
jwt_params: JWTParams = Depends(require_auth()),
jwt_params: JWTParams = Depends(require_auth(workspace_name="workspace_id")),
db: AsyncSession = db,
) -> None:
"""
Expand All @@ -88,7 +88,7 @@ async def delete_webhook_endpoint(
@router.get("/test")
async def test_emit(
workspace_id: str = Path(..., description="Workspace ID"),
jwt_params: JWTParams = Depends(require_auth()),
jwt_params: JWTParams = Depends(require_auth(workspace_name="workspace_id")),
) -> None:
"""
Test publishing a webhook event.
Expand Down
42 changes: 30 additions & 12 deletions src/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ def verify_jwt(token: str) -> JWTParams:
params.p = decoded["p"]
if "s" in decoded:
params.s = decoded["s"]
# Token-shape invariant: a peer- or session-scoped token MUST also
# carry its parent workspace. Without `w`, the route-level check
# cannot rule out cross-workspace use (a `{p: "alice"}` token would
# match `alice` in any workspace).
if (params.s is not None or params.p is not None) and params.w is None:
raise AuthenticationException(
"Invalid JWT scope: peer/session token missing workspace"
)
return params
except jwt.PyJWTError:
raise AuthenticationException("Invalid JWT") from None
Expand Down Expand Up @@ -171,30 +179,40 @@ async def auth(

jwt_params = verify_jwt(credentials.credentials)

# based on api operation, verify api key based on that key's permissions
# Authorize by the token's narrowest scope, not by the route's. A
# narrower-than-workspace token must NOT fall back to workspace access:
# `{w: ws, p: alice}` may only act on `alice`, never on a sibling peer.
if jwt_params.ad:
return jwt_params
if admin:
raise AuthenticationException("Resource requires admin privileges")

# For session level access
if session_name and jwt_params.s == session_name:
if not any([session_name, peer_name, workspace_name]):
# Self-authorizing routes decode the token here and compare the claims
# against body/path data inside the handler. This is needed for routes
# whose resource identifier is not available to require_auth().
return jwt_params

if jwt_params.s is not None:
if not session_name or jwt_params.s != session_name:
raise AuthenticationException("JWT not permissioned for this resource")
if workspace_name and jwt_params.w != workspace_name:
raise AuthenticationException("JWT not permissioned for this resource")
return jwt_params

# For peer level access
if peer_name and jwt_params.p == peer_name:
if jwt_params.p is not None:
if not peer_name or jwt_params.p != peer_name:
raise AuthenticationException("JWT not permissioned for this resource")
if workspace_name and jwt_params.w != workspace_name:
raise AuthenticationException("JWT not permissioned for this resource")
return jwt_params

# For workspace level access - can access all peers/sessions under this workspace
if workspace_name and jwt_params.w == workspace_name:
if jwt_params.w is not None:
# Workspace tokens reach any route inside their workspace. Routes
# without a declared workspace (e.g. POST /v3/workspaces) self-authorize
# by reading jwt_params.w themselves.
if workspace_name and jwt_params.w != workspace_name:
raise AuthenticationException("JWT not permissioned for this resource")
return jwt_params

if any([session_name, peer_name, workspace_name]):
raise AuthenticationException("JWT not permissioned for this resource")

# Route did not specify any parameters, so it should parse parameters itself
return jwt_params
raise AuthenticationException("JWT not permissioned for this resource")
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def emit(self, record: logging.LogRecord):
# LLM transport tests mock providers directly and don't need database/runtime setup.
"tests/utils/test_length_finish_reason.py",
"tests/utils/test_clients.py",
# Pure JWT scope tests — operate on src.security directly, no DB needed.
"tests/test_security.py",
)

_LIVE_LLM_MARKER = "live_llm"
Expand Down
8 changes: 4 additions & 4 deletions tests/routes/test_scoped_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def test_get_peer_by_name_with_auth(
# Test with peer-scoped JWT
if auth_client.auth_type == "empty":
auth_client.headers["Authorization"] = (
f"Bearer {create_jwt(JWTParams(p=test_peer.name))}"
f"Bearer {create_jwt(JWTParams(w=test_workspace.name, p=test_peer.name))}"
)

# Get specific peer using get_or_create endpoint
Expand Down Expand Up @@ -218,7 +218,7 @@ def test_create_session_with_auth(
# Test with peer-scoped JWT
if auth_client.auth_type == "empty":
auth_client.headers["Authorization"] = (
f"Bearer {create_jwt(JWTParams(p=test_peer.name))}"
f"Bearer {create_jwt(JWTParams(w=test_workspace.name, p=test_peer.name))}"
)

session_name2 = str(generate_nanoid())
Expand Down Expand Up @@ -262,7 +262,7 @@ def test_get_session_by_name_with_auth(
if auth_client.auth_type == "empty":
# Test with session-scoped JWT
auth_client.headers["Authorization"] = (
f"Bearer {create_jwt(JWTParams(s=session_name))}"
f"Bearer {create_jwt(JWTParams(w=test_workspace.name, s=session_name))}"
)

response = auth_client.post(
Expand All @@ -282,7 +282,7 @@ def test_get_session_by_name_with_auth(

# Test with peer-scoped JWT
auth_client.headers["Authorization"] = (
f"Bearer {create_jwt(JWTParams(p=test_peer.name))}"
f"Bearer {create_jwt(JWTParams(w=test_workspace.name, p=test_peer.name))}"
)

assert auth_client.post(
Expand Down
173 changes: 173 additions & 0 deletions tests/test_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Auth scope tests — DEV-1736 regression coverage.

Prior to this fix `auth()` walked the route's declared scope first and fell
through to a workspace check, so a `{w, p}` token authorized any peer in `w`.
The contract now is: authorize by the token's narrowest claim, never widen.
"""

import jwt as pyjwt
import pytest
from fastapi.security import HTTPAuthorizationCredentials

from src.config import settings
from src.exceptions import AuthenticationException
from src.security import JWTParams, auth, create_jwt, verify_jwt


@pytest.fixture(autouse=True)
def _enable_auth(monkeypatch: pytest.MonkeyPatch): # pyright: ignore[reportUnusedFunction]
monkeypatch.setattr(settings.AUTH, "USE_AUTH", True)
monkeypatch.setattr(settings.AUTH, "JWT_SECRET", "test-secret")


def _bearer(token: str) -> HTTPAuthorizationCredentials:
return HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)


class TestVerifyJWTShape:
def test_peer_token_without_workspace_rejected(self):
token = pyjwt.encode({"p": "alice"}, b"test-secret", algorithm="HS256")
with pytest.raises(AuthenticationException):
verify_jwt(token)

def test_session_token_without_workspace_rejected(self):
token = pyjwt.encode({"s": "sess-1"}, b"test-secret", algorithm="HS256")
with pytest.raises(AuthenticationException):
verify_jwt(token)

def test_workspace_only_token_ok(self):
token = create_jwt(JWTParams(w="ws-a"))
params = verify_jwt(token)
assert params.w == "ws-a"

def test_workspace_peer_token_ok(self):
token = create_jwt(JWTParams(w="ws-a", p="alice"))
params = verify_jwt(token)
assert params.w == "ws-a"
assert params.p == "alice"


class TestAuthPeerScope:
"""`{w: ws-a, p: alice}` may only act on alice in ws-a."""

@pytest.mark.asyncio
async def test_matches_own_peer(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a", p="alice")))
params = await auth(credentials=creds, workspace_name="ws-a", peer_name="alice")
assert params.p == "alice"

@pytest.mark.asyncio
async def test_denies_sibling_peer_same_workspace(self):
"""The original bug: peer-scoped token fell through to workspace auth."""
creds = _bearer(create_jwt(JWTParams(w="ws-a", p="alice")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-a", peer_name="bob")

@pytest.mark.asyncio
async def test_denies_workspace_route_with_no_peer(self):
"""Peer-scoped token cannot use workspace-listing routes."""
creds = _bearer(create_jwt(JWTParams(w="ws-a", p="alice")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-a")

@pytest.mark.asyncio
async def test_self_authorizing_route_receives_claims(self):
"""Body-scoped routes use require_auth() and compare claims in-handler."""
creds = _bearer(create_jwt(JWTParams(w="ws-a", p="alice")))
params = await auth(credentials=creds)
assert params.w == "ws-a"
assert params.p == "alice"

@pytest.mark.asyncio
async def test_denies_cross_workspace(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a", p="alice")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-b", peer_name="alice")


class TestAuthSessionScope:
@pytest.mark.asyncio
async def test_matches_own_session(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a", s="sess-1")))
params = await auth(
credentials=creds, workspace_name="ws-a", session_name="sess-1"
)
assert params.s == "sess-1"

@pytest.mark.asyncio
async def test_denies_sibling_session_same_workspace(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a", s="sess-1")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-a", session_name="sess-2")

@pytest.mark.asyncio
async def test_denies_workspace_route_with_no_session(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a", s="sess-1")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-a")

@pytest.mark.asyncio
async def test_self_authorizing_route_receives_claims(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a", s="sess-1")))
params = await auth(credentials=creds)
assert params.w == "ws-a"
assert params.s == "sess-1"


class TestAuthWorkspaceScope:
@pytest.mark.asyncio
async def test_matches_workspace(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a")))
params = await auth(credentials=creds, workspace_name="ws-a")
assert params.w == "ws-a"

@pytest.mark.asyncio
async def test_workspace_token_reaches_peer_route(self):
"""Workspace tokens still authorize narrower routes inside the workspace."""
creds = _bearer(create_jwt(JWTParams(w="ws-a")))
params = await auth(credentials=creds, workspace_name="ws-a", peer_name="alice")
assert params.w == "ws-a"

@pytest.mark.asyncio
async def test_denies_cross_workspace(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-b")

@pytest.mark.asyncio
async def test_passes_self_authorizing_route(self):
"""Routes with no declared scope (e.g. POST /v3/workspaces) self-authorize
on the token's `w`. The auth dependency must let workspace tokens through."""
creds = _bearer(create_jwt(JWTParams(w="ws-a")))
params = await auth(credentials=creds)
assert params.w == "ws-a"


class TestAuthAdminAndUnscoped:
@pytest.mark.asyncio
async def test_admin_passes_any_route(self):
creds = _bearer(create_jwt(JWTParams(ad=True)))
params = await auth(credentials=creds, workspace_name="ws-a", peer_name="alice")
assert params.ad is True

@pytest.mark.asyncio
async def test_non_admin_token_denied_on_admin_route(self):
creds = _bearer(create_jwt(JWTParams(w="ws-a")))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, admin=True)

@pytest.mark.asyncio
async def test_unscoped_token_on_self_authorizing_route(self):
"""A token with no scope claims and a route with no declared scope is the
escape hatch for routes that introspect jwt_params themselves."""
creds = _bearer(create_jwt(JWTParams()))
params = await auth(credentials=creds)
assert params.w is None
assert params.p is None
assert params.s is None

@pytest.mark.asyncio
async def test_unscoped_token_denied_on_scoped_route(self):
creds = _bearer(create_jwt(JWTParams()))
with pytest.raises(AuthenticationException):
await auth(credentials=creds, workspace_name="ws-a")
Loading