Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cloud_pipelines_backend/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,24 @@ def handle_item_already_exists_error(
content={"message": str(exc)},
)

@app.exception_handler(errors.ApiValidationError)
def handle_api_validation_error(
request: fastapi.Request, exc: errors.ApiValidationError
):
return fastapi.responses.JSONResponse(
status_code=422,
content={"detail": str(exc)},
)

@app.exception_handler(NotImplementedError)
def handle_not_implemented_error(
request: fastapi.Request, exc: NotImplementedError
):
return fastapi.responses.JSONResponse(
status_code=501,
content={"detail": str(exc)},
)

get_user_details_dependency = fastapi.Depends(user_details_getter)

def get_user_name(
Expand Down
11 changes: 11 additions & 0 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from . import backend_types_sql as bts
from . import component_structures as structures
from . import errors
from . import filter_query_models

if typing.TYPE_CHECKING:
from cloud_pipelines.orchestration.storage_providers import (
Expand Down Expand Up @@ -167,10 +168,20 @@ def list(
session: orm.Session,
page_token: str | None = None,
filter: str | None = None,
filter_query: str | None = None,
current_user: str | None = None,
include_pipeline_names: bool = False,
include_execution_stats: bool = False,
) -> ListPipelineJobsResponse:
if filter and filter_query:
raise errors.ApiValidationError(
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
)

if filter_query:
filter_query_models.FilterQuery.model_validate_json(filter_query)
raise NotImplementedError("filter_query is not yet implemented.")

filter_value, offset = _resolve_filter_value(
filter=filter,
page_token=page_token,
Expand Down
6 changes: 6 additions & 0 deletions cloud_pipelines_backend/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ class ItemAlreadyExistsError(Exception):

class PermissionError(Exception):
pass


class ApiValidationError(Exception):
"""Base for all filter/annotation validation errors -> 422."""

pass
130 changes: 130 additions & 0 deletions cloud_pipelines_backend/filter_query_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from __future__ import annotations

from typing import Annotated

import pydantic

NonEmptyStr = Annotated[str, pydantic.StringConstraints(min_length=1)]


class _BaseModel(pydantic.BaseModel):
model_config = {"extra": "forbid"}


# --- Leaf argument models ---


class KeyExists(_BaseModel):
key: NonEmptyStr


class ValueContains(_BaseModel):
key: NonEmptyStr
value_substring: NonEmptyStr


class ValueIn(_BaseModel):
key: NonEmptyStr
values: list[NonEmptyStr] = pydantic.Field(min_length=1)


class ValueEquals(_BaseModel):
key: NonEmptyStr
value: str


class TimeRange(_BaseModel):
"""At least one of start_time or end_time is required.

Valid combinations: start+end (range), start-only (after), end-only (before).
AwareDatetime requires timezone info (e.g. "2024-01-01T00:00:00Z").
Naive datetimes like "2024-01-01T00:00:00" are rejected, preventing
ambiguous timestamps that could silently resolve to the wrong timezone.
"""

key: NonEmptyStr
start_time: pydantic.AwareDatetime | None = None
end_time: pydantic.AwareDatetime | None = None

@pydantic.model_validator(mode="after")
def _at_least_one_time_bound(self) -> TimeRange:
if self.start_time is None and self.end_time is None:
raise ValueError(
"TimeRange requires at least one of 'start_time' or 'end_time'."
)
return self


# --- Predicate wrapper models (one field each) ---


class KeyExistsPredicate(_BaseModel):
key_exists: KeyExists


class ValueContainsPredicate(_BaseModel):
value_contains: ValueContains


class ValueInPredicate(_BaseModel):
value_in: ValueIn


class ValueEqualsPredicate(_BaseModel):
value_equals: ValueEquals


class TimeRangePredicate(_BaseModel):
time_range: TimeRange


LeafPredicate = (
KeyExistsPredicate
| ValueContainsPredicate
| ValueInPredicate
| ValueEqualsPredicate
| TimeRangePredicate
)


class NotPredicate(_BaseModel):
not_: LeafPredicate = pydantic.Field(alias="not")


class AndPredicate(_BaseModel):
and_: list["Predicate"] = pydantic.Field(alias="and", min_length=1)


class OrPredicate(_BaseModel):
or_: list["Predicate"] = pydantic.Field(alias="or", min_length=1)


Predicate = (
KeyExistsPredicate
| ValueContainsPredicate
| ValueInPredicate
| ValueEqualsPredicate
| TimeRangePredicate
| NotPredicate
| AndPredicate
| OrPredicate
)

# Resolve forward reference to "Predicate" in recursive and/or models
AndPredicate.model_rebuild()
OrPredicate.model_rebuild()


class FilterQuery(_BaseModel):
"""Root: must be exactly one of {"and": [...]} or {"or": [...]}."""

and_: list[Predicate] | None = pydantic.Field(None, alias="and", min_length=1)
or_: list[Predicate] | None = pydantic.Field(None, alias="or", min_length=1)

@pydantic.model_validator(mode="after")
def _exactly_one_root_operator(self) -> FilterQuery:
has_and = self.and_ is not None
has_or = self.or_ is not None
if has_and == has_or:
raise ValueError("FilterQuery root must have exactly one of 'and' or 'or'.")
return self
34 changes: 33 additions & 1 deletion tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import pytest
from sqlalchemy import orm

from cloud_pipelines_backend import api_server_sql
from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend import component_structures as structures
from cloud_pipelines_backend import api_server_sql
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend import errors


class TestExecutionStatusSummary:
Expand Down Expand Up @@ -537,3 +538,34 @@ def test_text_search_raises(self):
filter_value="some_text_without_colon",
current_user=None,
)


class TestFilterQueryApiWiring:
def test_filter_query_returns_not_implemented(self, session_factory, service):
valid_json = '{"and": [{"key_exists": {"key": "team"}}]}'
with session_factory() as session:
with pytest.raises(NotImplementedError, match="not yet implemented"):
service.list(
session=session,
filter_query=valid_json,
)

def test_filter_query_validates_before_501(self, session_factory, service):
from pydantic import ValidationError

invalid_json = '{"bad_key": "not_valid"}'
with session_factory() as session:
with pytest.raises(ValidationError):
service.list(
session=session,
filter_query=invalid_json,
)

def test_mutual_exclusivity_rejected(self, session_factory, service):
with session_factory() as session:
with pytest.raises(errors.ApiValidationError, match="Cannot use both"):
service.list(
session=session,
filter="created_by:alice",
filter_query='{"and": [{"key_exists": {"key": "team"}}]}',
)
Loading