Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,19 @@ def create(
session.refresh(pipeline_run)
return PipelineRunResponse.from_db(pipeline_run)

def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse:
def get(
self,
session: orm.Session,
id: bts.IdType,
include_execution_stats: bool = False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To each their own on this:

I prefer to use an expand class for these use cases, which is more extensible in the future.

@dataclasses.dataclass(frozen=True)
class PipelineRunResponseExpandOptions:
    """A type-safe structure to define which fields to expand."""
    execution_stats: bool = False

then using it as:

if expand.execution_stats:
    response = self._populate_execution_stats(session=session, response=response)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could totally call this a "premature class" for just one expansion, but I find the consistency and future-readiness preferable to later refactors or inconsistency.

) -> PipelineRunResponse:
pipeline_run = session.get(bts.PipelineRun, id)
if not pipeline_run:
raise ItemNotFoundError(f"Pipeline run {id} not found.")
return PipelineRunResponse.from_db(pipeline_run)
response = PipelineRunResponse.from_db(pipeline_run)
if include_execution_stats:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should expand the PipelineRun rather than the response? It provides more value to the rest of the codebase that might want to expand this information in other places later on.

This suggestion implies moving the logic to the service layer (e.g. a PipelineRunsService rather than what is currently PipelineRunsApiService_Sql) and adding the field to the PipelineRun model rather than onto the response model.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: This relates to the fact that there might be opportunities to better organize our backend code in general. For example, orchestrator_sql.py has a lot going on.

response = self._populate_execution_stats(session=session, response=response)
return response

def terminate(
self,
Expand Down Expand Up @@ -258,12 +266,7 @@ def create_pipeline_run_response(
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=pipeline_run.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
response = self._populate_execution_stats(session=session, response=response)
return response

return ListPipelineJobsResponse(
Expand All @@ -274,6 +277,19 @@ def create_pipeline_run_response(
next_page_token=next_page_token,
)

def _populate_execution_stats(
Copy link
Collaborator

@morgan-wowk morgan-wowk Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to put general-purpose business logic on a separate class such as a PipelineRunsService rather than having functions like this in a module, and on a class, that is specifically tied to the http transport layer.

self,
session: orm.Session,
response: PipelineRunResponse,
) -> PipelineRunResponse:
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=response.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
return response

def _get_execution_stats_and_summary(
self,
session: orm.Session,
Expand Down
67 changes: 67 additions & 0 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
from sqlalchemy import orm

from cloud_pipelines_backend import backend_types_sql as bts
Expand All @@ -6,6 +7,7 @@
ExecutionStatusSummary,
PipelineRunsApiService_Sql,
)
from cloud_pipelines_backend.errors import ItemNotFoundError


def _initialize_db_and_get_session_factory():
Expand Down Expand Up @@ -162,3 +164,68 @@ def test_list_with_execution_stats(self):
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False


class TestPipelineRunServiceGet:
    """Tests for PipelineRunsApiService_Sql.get."""

    def test_get_not_found(self):
        make_session = _initialize_db_and_get_session_factory()
        api = PipelineRunsApiService_Sql()
        # Looking up an id that was never created must raise, not return None.
        with make_session() as session, pytest.raises(ItemNotFoundError):
            api.get(session=session, id="nonexistent-id")

    def test_get_returns_pipeline_run(self):
        make_session = _initialize_db_and_get_session_factory()
        api = PipelineRunsApiService_Sql()
        with make_session() as session:
            root_node = _create_execution_node(session)
            root_node_id = root_node.id
            pipeline_run = _create_pipeline_run(session, root_node, created_by="user1")
            pipeline_run_id = pipeline_run.id
            session.commit()

        # Read back in a fresh session to verify the persisted row round-trips.
        with make_session() as session:
            fetched = api.get(session=session, id=pipeline_run_id)
            assert fetched.id == pipeline_run_id
            assert fetched.root_execution_id == root_node_id
            assert fetched.created_by == "user1"
            # Stats are opt-in; without include_execution_stats they stay unset.
            assert fetched.execution_status_stats is None
            assert fetched.execution_summary is None

    def test_get_with_execution_stats(self):
        make_session = _initialize_db_and_get_session_factory()
        api = PipelineRunsApiService_Sql()
        with make_session() as session:
            root_node = _create_execution_node(session)
            root_node_id = root_node.id
            succeeded_child = _create_execution_node(
                session,
                parent=root_node,
                status=bts.ContainerExecutionStatus.SUCCEEDED,
            )
            running_child = _create_execution_node(
                session,
                parent=root_node,
                status=bts.ContainerExecutionStatus.RUNNING,
            )
            _link_ancestor(session, succeeded_child, root_node)
            _link_ancestor(session, running_child, root_node)
            pipeline_run = _create_pipeline_run(session, root_node)
            pipeline_run_id = pipeline_run.id
            session.commit()

        with make_session() as session:
            fetched = api.get(
                session=session, id=pipeline_run_id, include_execution_stats=True
            )
            assert fetched.id == pipeline_run_id
            assert fetched.root_execution_id == root_node_id
            # One child per status was created above, so each bucket counts 1.
            status_stats = fetched.execution_status_stats
            assert status_stats is not None
            assert status_stats["SUCCEEDED"] == 1
            assert status_stats["RUNNING"] == 1
            run_summary = fetched.execution_summary
            assert run_summary is not None
            assert run_summary.total_executions == 2
            assert run_summary.ended_executions == 1
            assert run_summary.has_ended is False