Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 10 additions & 110 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import base64
import dataclasses
import datetime
import json
import logging
import typing
from typing import Any, Final, Optional
Expand All @@ -12,7 +10,7 @@
from . import backend_types_sql as bts
from . import component_structures as structures
from . import errors
from . import filter_query_models
from . import filter_query_sql

if typing.TYPE_CHECKING:
from cloud_pipelines.orchestration.storage_providers import (
Expand Down Expand Up @@ -69,8 +67,6 @@ class ListPipelineJobsResponse:

class PipelineRunsApiService_Sql:
_PIPELINE_NAME_EXTRA_DATA_KEY = "pipeline_name"
_PAGE_TOKEN_OFFSET_KEY: Final[str] = "offset"
_PAGE_TOKEN_FILTER_KEY: Final[str] = "filter"
_DEFAULT_PAGE_SIZE: Final[int] = 10

def create(
Expand Down Expand Up @@ -173,22 +169,12 @@ def list(
include_pipeline_names: bool = False,
include_execution_stats: bool = False,
) -> ListPipelineJobsResponse:
if filter and filter_query:
raise errors.ApiValidationError(
"Cannot use both 'filter' and 'filter_query'. Use one or the other."
)

if filter_query:
filter_query_models.FilterQuery.model_validate_json(filter_query)
raise NotImplementedError("filter_query is not yet implemented.")

filter_value, offset = _resolve_filter_value(
filter=filter,
page_token=page_token,
)
where_clauses, next_page_filter_value = _build_filter_where_clauses(
filter_value=filter_value,
where_clauses, offset, next_token = filter_query_sql.build_list_filters(
filter_value=filter,
filter_query_value=filter_query,
page_token_value=page_token,
current_user=current_user,
page_size=self._DEFAULT_PAGE_SIZE,
)

pipeline_runs = list(
Expand All @@ -200,14 +186,10 @@ def list(
.limit(self._DEFAULT_PAGE_SIZE)
).all()
)
next_page_offset = offset + self._DEFAULT_PAGE_SIZE
next_page_token_dict = {
self._PAGE_TOKEN_OFFSET_KEY: next_page_offset,
self._PAGE_TOKEN_FILTER_KEY: next_page_filter_value,
}
next_page_token = _encode_page_token(next_page_token_dict)
if len(pipeline_runs) < self._DEFAULT_PAGE_SIZE:
next_page_token = None

next_page_token = (
next_token if len(pipeline_runs) >= self._DEFAULT_PAGE_SIZE else None
)

return ListPipelineJobsResponse(
pipeline_runs=[
Expand Down Expand Up @@ -348,88 +330,6 @@ def delete_annotation(
session.commit()


def _resolve_filter_value(
    *,
    filter: str | None,
    page_token: str | None,
) -> tuple[str | None, int]:
    """Decode *page_token* and return the effective ``(filter_value, offset)``.

    A non-empty token takes precedence over the raw ``filter`` argument:
    the token carries the already-resolved filter forward so that every
    subsequent page keeps using the same filter as the first one.
    """
    token_payload = _decode_page_token(page_token)
    resolved_offset = token_payload.get(
        PipelineRunsApiService_Sql._PAGE_TOKEN_OFFSET_KEY, 0
    )
    resolved_filter = (
        token_payload.get(PipelineRunsApiService_Sql._PAGE_TOKEN_FILTER_KEY)
        if page_token
        else filter
    )
    return resolved_filter, resolved_offset


def _build_filter_where_clauses(
    *,
    filter_value: str | None,
    current_user: str | None,
) -> tuple[list[sql.ColumnElement], str | None]:
    """Parse a filter string into SQLAlchemy WHERE clauses.

    Returns (where_clauses, next_page_filter_value). The second value is the
    filter string with shorthand values resolved (e.g. "created_by:me" becomes
    "created_by:alice@example.com") so it can be embedded in the next page token.
    """
    where_clauses: list[sql.ColumnElement] = []
    # An empty/None filter yields no clauses (and filter_value passes through).
    parsed_filter = _parse_filter(filter_value) if filter_value else {}
    for key, value in parsed_filter.items():
        if key == "_text":
            raise NotImplementedError("Text search is not implemented yet.")
        elif key == "created_by":
            if value == "me":
                # An anonymous caller resolves "me" to the empty string, which
                # falls through to the IS NULL branch below.
                if current_user is None:
                    current_user = ""
                value = current_user
                # TODO: Maybe make this a bit more robust.
                # We need to change the filter since it goes into the next_page_token.
                # filter_value is guaranteed non-None here: parsed_filter is only
                # non-empty when filter_value was truthy.
                filter_value = filter_value.replace(
                    "created_by:me", f"created_by:{current_user}"
                )
            if value:
                where_clauses.append(bts.PipelineRun.created_by == value)
            else:
                # Intentional "== None": SQLAlchemy overloads it into "IS NULL".
                where_clauses.append(bts.PipelineRun.created_by == None)
        else:
            raise NotImplementedError(f"Unsupported filter {filter_value}.")
    return where_clauses, filter_value


def _decode_page_token(page_token: str) -> dict[str, Any]:
return json.loads(base64.b64decode(page_token)) if page_token else {}


def _encode_page_token(page_token_dict: dict[str, Any]) -> str:
return (base64.b64encode(json.dumps(page_token_dict).encode("utf8"))).decode(
"utf-8"
)


def _parse_filter(filter: str) -> dict[str, str]:
# TODO: Improve
parts = filter.strip().split()
parsed_filter = {}
for part in parts:
key, sep, value = part.partition(":")
if sep:
parsed_filter[key] = value
else:
parsed_filter.setdefault("_text", "")
parsed_filter["_text"] += part
return parsed_filter


# ========== ExecutionNodeApiService_Sql


Expand Down
17 changes: 16 additions & 1 deletion cloud_pipelines_backend/backend_types_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import datetime
import enum
import typing
from typing import Any
from typing import Any, Final

import sqlalchemy as sql
from sqlalchemy import orm
Expand Down Expand Up @@ -294,6 +294,9 @@ class ExecutionToAncestorExecutionLink(_TableBase):
# So we need to jump through extra hoops to make the relationship many-to-many again.
class ExecutionNode(_TableBase):
__tablename__ = "execution_node"
_IX_EXECUTION_NODE_CACHE_KEY: Final[str] = (
"ix_execution_node_container_execution_cache_key"
)
id: orm.Mapped[IdType] = orm.mapped_column(
primary_key=True, init=False, insert_default=generate_unique_id
)
Expand Down Expand Up @@ -491,6 +494,9 @@ class ContainerExecution(_TableBase):

class PipelineRunAnnotation(_TableBase):
__tablename__ = "pipeline_run_annotation"
_IX_ANNOTATION_RUN_ID_KEY_VALUE: Final[str] = (
"ix_pipeline_run_annotation_pipeline_run_id_key_value"
)
pipeline_run_id: orm.Mapped[IdType] = orm.mapped_column(
sql.ForeignKey(PipelineRun.id),
primary_key=True,
Expand All @@ -500,6 +506,15 @@ class PipelineRunAnnotation(_TableBase):
key: orm.Mapped[str] = orm.mapped_column(default=None, primary_key=True)
value: orm.Mapped[str | None] = orm.mapped_column(default=None)

__table_args__ = (
sql.Index(
_IX_ANNOTATION_RUN_ID_KEY_VALUE,
"pipeline_run_id",
"key",
"value",
),
)


class Secret(_TableBase):
__tablename__ = "secret"
Expand Down
1 change: 0 additions & 1 deletion cloud_pipelines_backend/component_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import pydantic.alias_generators
from pydantic.dataclasses import dataclass as pydantic_dataclasses


# PrimitiveTypes = Union[str, int, float, bool]
PrimitiveTypes = str

Expand Down
8 changes: 7 additions & 1 deletion cloud_pipelines_backend/database_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,11 @@ def migrate_db(db_engine: sqlalchemy.Engine):
# Or we need to avoid calling the Index constructor.

for index in bts.ExecutionNode.__table__.indexes:
if index.name == "ix_execution_node_container_execution_cache_key":
if index.name == bts.ExecutionNode._IX_EXECUTION_NODE_CACHE_KEY:
index.create(db_engine, checkfirst=True)
break

for index in bts.PipelineRunAnnotation.__table__.indexes:
if index.name == bts.PipelineRunAnnotation._IX_ANNOTATION_RUN_ID_KEY_VALUE:
index.create(db_engine, checkfirst=True)
break
Loading