Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 21 additions & 29 deletions cloud_pipelines_backend/filter_query_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,12 @@ def build_list_filters(
page_token=page_token_value,
)

where_clauses, resolved_filter = _build_filter_where_clauses(
filter_value=filter_value,
current_user=current_user,
)
if filter_value:
filter_query_value = _convert_legacy_filter_to_filter_query(
filter_value=filter_value,
)

where_clauses: list[sql.ColumnElement] = []
if filter_query_value:
parsed = filter_query_models.FilterQuery.model_validate_json(filter_query_value)
where_clauses.append(
Expand All @@ -177,7 +178,6 @@ def build_list_filters(
next_page_token = _encode_page_token(
page_token_dict={
_PAGE_TOKEN_OFFSET_KEY: offset + page_size,
_PAGE_TOKEN_FILTER_KEY: resolved_filter,
_PAGE_TOKEN_FILTER_QUERY_KEY: filter_query_value,
}
)
Expand Down Expand Up @@ -217,39 +217,31 @@ def _parse_filter(filter: str) -> dict[str, str]:
return parsed_filter


def _build_filter_where_clauses(
def _convert_legacy_filter_to_filter_query(
*,
filter_value: str | None,
current_user: str | None,
) -> tuple[list[sql.ColumnElement], str | None]:
"""Parse a filter string into SQLAlchemy WHERE clauses.
filter_value: str,
) -> str:
"""Convert a legacy ``filter`` string to an equivalent ``filter_query`` JSON string.

Returns (where_clauses, next_page_filter_value). The second value is the
filter string with shorthand values resolved (e.g. "created_by:me" becomes
"created_by:alice@example.com") so it can be embedded in the next page token.
Only ``created_by`` is supported. ``"me"`` is NOT resolved here — the
downstream ``_maybe_resolve_system_values`` handles that.
"""
where_clauses: list[sql.ColumnElement] = []
parsed_filter = _parse_filter(filter_value) if filter_value else {}
for key, value in parsed_filter.items():
parsed = _parse_filter(filter_value)
predicates: list[dict] = []
for key, value in parsed.items():
if key == "_text":
raise NotImplementedError("Text search is not implemented yet.")
elif key == "created_by":
if value == "me":
if current_user is None:
current_user = ""
value = current_user
# TODO: Maybe make this a bit more robust.
# We need to change the filter since it goes into the next_page_token.
filter_value = filter_value.replace(
"created_by:me", f"created_by:{current_user}"
if not value:
raise errors.ApiValidationError(
"Legacy filter 'created_by' requires a non-empty value."
)
if value:
where_clauses.append(bts.PipelineRun.created_by == value)
else:
where_clauses.append(bts.PipelineRun.created_by == None)
predicates.append(
{"value_equals": {"key": SystemKey.CREATED_BY, "value": value}}
)
else:
raise NotImplementedError(f"Unsupported filter {filter_value}.")
return where_clauses, filter_value
return json.dumps({"and": predicates})


# ---------------------------------------------------------------------------
Expand Down
26 changes: 6 additions & 20 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,13 @@ def test_list_filter_created_by(self, session_factory, service):
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].created_by == "user1"

def test_list_filter_created_by_empty(self, session_factory, service):
_create_run(
session_factory,
service,
root_task=_make_task_spec(),
created_by=None,
)
_create_run(
session_factory,
service,
root_task=_make_task_spec(),
created_by="user1",
)

def test_list_filter_created_by_empty_raises(self, session_factory, service):
with session_factory() as session:
result = service.list(
session=session,
filter="created_by:",
)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].created_by is None
with pytest.raises(errors.ApiValidationError, match="non-empty value"):
service.list(
session=session,
filter="created_by:",
)

def test_list_pagination(self, session_factory, service):
for i in range(12):
Expand Down
98 changes: 53 additions & 45 deletions tests/test_filter_query_sql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json

import pytest
import sqlalchemy as sql
from sqlalchemy.dialects import sqlite as sqlite_dialect
Expand Down Expand Up @@ -212,59 +214,46 @@ def test_decode_empty_string(self):
assert token == {}


class TestBuildFilterWhereClauses:
def test_no_filter(self):
clauses, next_filter = filter_query_sql._build_filter_where_clauses(
filter_value=None,
current_user=None,
)
assert clauses == []
assert next_filter is None

class TestConvertLegacyFilterToFilterQuery:
def test_created_by_literal(self):
clauses, next_filter = filter_query_sql._build_filter_where_clauses(
result = filter_query_sql._convert_legacy_filter_to_filter_query(
filter_value="created_by:alice",
current_user=None,
)
assert len(clauses) == 1
assert next_filter == "created_by:alice"

def test_created_by_me_resolves(self):
clauses, next_filter = filter_query_sql._build_filter_where_clauses(
filter_value="created_by:me",
current_user="alice@example.com",
)
assert len(clauses) == 1
assert next_filter == "created_by:alice@example.com"

def test_created_by_me_no_current_user(self):
clauses, next_filter = filter_query_sql._build_filter_where_clauses(
parsed = json.loads(result)
assert parsed == {
"and": [
{
"value_equals": {
"key": "system/pipeline_run.created_by",
"value": "alice",
}
}
]
}

def test_created_by_me_not_resolved(self):
result = filter_query_sql._convert_legacy_filter_to_filter_query(
filter_value="created_by:me",
current_user=None,
)
assert len(clauses) == 1
assert next_filter == "created_by:"
parsed = json.loads(result)
assert parsed["and"][0]["value_equals"]["value"] == "me"

def test_created_by_empty_value(self):
clauses, next_filter = filter_query_sql._build_filter_where_clauses(
filter_value="created_by:",
current_user=None,
)
assert len(clauses) == 1
assert next_filter == "created_by:"
def test_created_by_empty_raises(self):
with pytest.raises(errors.ApiValidationError, match="non-empty value"):
filter_query_sql._convert_legacy_filter_to_filter_query(
filter_value="created_by:",
)

def test_unsupported_key_raises(self):
with pytest.raises(NotImplementedError, match="Unsupported filter"):
filter_query_sql._build_filter_where_clauses(
filter_query_sql._convert_legacy_filter_to_filter_query(
filter_value="unknown_key:value",
current_user=None,
)

def test_text_search_raises(self):
with pytest.raises(NotImplementedError, match="Text search"):
filter_query_sql._build_filter_where_clauses(
filter_query_sql._convert_legacy_filter_to_filter_query(
filter_value="some_text_without_colon",
current_user=None,
)


Expand Down Expand Up @@ -294,7 +283,7 @@ def test_mutual_exclusivity_raises(self):
page_size=10,
)

def test_legacy_filter_produces_clauses(self):
def test_legacy_filter_produces_annotation_clause(self):
clauses, offset, next_token = filter_query_sql.build_list_filters(
filter_value="created_by:alice",
filter_query_value=None,
Expand All @@ -303,13 +292,21 @@ def test_legacy_filter_produces_clauses(self):
page_size=10,
)
assert len(clauses) == 1
assert _compile(clauses[0]) == (
"EXISTS (SELECT pipeline_run_annotation.pipeline_run_id"
" FROM pipeline_run_annotation, pipeline_run"
" WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id"
" AND pipeline_run_annotation.\"key\" = 'system/pipeline_run.created_by'"
" AND pipeline_run_annotation.value = 'alice')"
)
assert offset == 0
decoded = filter_query_sql._decode_page_token(page_token=next_token)
assert decoded["filter"] == "created_by:alice"
assert "filter" not in decoded
assert "filter_query" in decoded

def test_filter_query_produces_clauses(self):
fq = '{"and": [{"key_exists": {"key": "team"}}]}'
clauses, offset, next_token = filter_query_sql.build_list_filters(
clauses, _offset, next_token = filter_query_sql.build_list_filters(
filter_value=None,
filter_query_value=fq,
page_token_value=None,
Expand All @@ -326,7 +323,7 @@ def test_filter_query_produces_clauses(self):
decoded = filter_query_sql._decode_page_token(page_token=next_token)
assert decoded["filter_query"] == fq

def test_page_token_restores_offset_and_filters(self):
def test_page_token_with_legacy_filter_converts(self):
encoded_token = filter_query_sql._encode_page_token(
page_token_dict={
"offset": 20,
Expand All @@ -342,9 +339,17 @@ def test_page_token_restores_offset_and_filters(self):
)
assert offset == 20
assert len(clauses) == 1
assert _compile(clauses[0]) == (
"EXISTS (SELECT pipeline_run_annotation.pipeline_run_id"
" FROM pipeline_run_annotation, pipeline_run"
" WHERE pipeline_run_annotation.pipeline_run_id = pipeline_run.id"
" AND pipeline_run_annotation.\"key\" = 'system/pipeline_run.created_by'"
" AND pipeline_run_annotation.value = 'alice')"
)
decoded = filter_query_sql._decode_page_token(page_token=next_token)
assert decoded["offset"] == 30
assert decoded["filter"] == "created_by:alice"
assert "filter" not in decoded
assert "filter_query" in decoded

def test_page_token_restores_filter_query(self):
fq = '{"and": [{"key_exists": {"key": "env"}}]}'
Expand Down Expand Up @@ -379,7 +384,7 @@ def test_page_size_reflected_in_next_token(self):
assert decoded["offset"] == 25

def test_created_by_me_resolved_in_next_token(self):
clauses, offset, next_token = filter_query_sql.build_list_filters(
clauses, _offset, next_token = filter_query_sql.build_list_filters(
filter_value="created_by:me",
filter_query_value=None,
page_token_value=None,
Expand All @@ -388,7 +393,10 @@ def test_created_by_me_resolved_in_next_token(self):
)
assert len(clauses) == 1
decoded = filter_query_sql._decode_page_token(page_token=next_token)
assert decoded["filter"] == "created_by:bob@example.com"
assert "filter" not in decoded
assert "filter_query" in decoded
parsed_fq = json.loads(decoded["filter_query"])
assert parsed_fq["and"][0]["value_equals"]["value"] == "me"


class TestPipelineRunAnnotationSystemKeyValidation:
Expand Down