Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 59 additions & 18 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from . import backend_types_sql as bts
from . import component_structures as structures
from . import database_ops
from . import errors
from . import filter_query_sql

Expand All @@ -32,6 +33,27 @@ def _get_current_time() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)


def _get_pipeline_name_from_task_spec(
    *,
    task_spec_dict: dict[str, Any],
) -> str | None:
    """Return the pipeline name embedded in a task_spec dict, if any.

    Walks task_spec_dict -> TaskSpec -> component_ref -> spec -> name.
    Returns None when parsing fails, when the component spec is absent,
    or when the name is empty.
    """
    try:
        parsed_task_spec = structures.TaskSpec.from_json_dict(task_spec_dict)
    except Exception:
        # Malformed task_spec payloads are treated as "no name available".
        return None
    component_spec = parsed_task_spec.component_ref.spec
    if component_spec is None:
        return None
    # Normalize empty string to None so callers get a single "missing" value.
    return component_spec.name or None


# ==== PipelineJobService
@dataclasses.dataclass(kw_only=True)
class PipelineRunResponse:
Expand Down Expand Up @@ -113,19 +135,15 @@ def create(
},
)
session.add(pipeline_run)
# Mirror created_by into the annotations table so it's searchable
# via filter_query like any other annotation.
if created_by is not None:
# Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK.
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
session.flush()
session.add(
bts.PipelineRunAnnotation(
pipeline_run_id=pipeline_run.id,
key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY,
value=created_by,
)
)
# Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs.
# TODO: Use ORM relationship instead of explicit flush + manual FK assignment.
session.flush()
_mirror_system_annotations(
session=session,
pipeline_run_id=pipeline_run.id,
created_by=created_by,
pipeline_name=pipeline_name,
)
session.commit()

session.refresh(pipeline_run)
Expand Down Expand Up @@ -244,12 +262,9 @@ def _create_pipeline_run_response(
bts.ExecutionNode, pipeline_run.root_execution_id
)
if execution_node:
task_spec = structures.TaskSpec.from_json_dict(
execution_node.task_spec
pipeline_name = _get_pipeline_name_from_task_spec(
task_spec_dict=execution_node.task_spec
)
component_spec = task_spec.component_ref.spec
if component_spec:
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
execution_status_stats = self._calculate_execution_status_stats(
Expand Down Expand Up @@ -1153,6 +1168,32 @@ def list_secrets(
]


def _mirror_system_annotations(
    *,
    session: orm.Session,
    pipeline_run_id: bts.IdType,
    created_by: str | None,
    pipeline_name: str | None,
) -> None:
    """Mirror pipeline run fields as system annotations for filter_query search.

    Adds one PipelineRunAnnotation row per non-empty field; None or empty
    string values are skipped.
    NOTE(review): the truthiness check here skips empty-string values, while
    the database_ops backfill inserts empty-string names — confirm intended.
    """
    mirrored_fields = (
        (filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, created_by),
        (
            filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
            pipeline_name,
        ),
    )
    for annotation_key, annotation_value in mirrored_fields:
        if not annotation_value:
            continue
        session.add(
            bts.PipelineRunAnnotation(
                pipeline_run_id=pipeline_run_id,
                key=annotation_key,
                value=annotation_value,
            )
        )


def _recursively_create_all_executions_and_artifacts_root(
session: orm.Session,
root_task_spec: structures.TaskSpec,
Expand Down
232 changes: 232 additions & 0 deletions cloud_pipelines_backend/database_ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any

import sqlalchemy
from sqlalchemy import orm

Expand Down Expand Up @@ -87,6 +89,7 @@ def migrate_db(db_engine: sqlalchemy.Engine):
break

_backfill_pipeline_run_created_by_annotations(db_engine=db_engine)
_backfill_pipeline_run_name_annotations(db_engine=db_engine)


def _is_pipeline_run_annotation_key_already_backfilled(
Expand Down Expand Up @@ -142,3 +145,232 @@ def _backfill_pipeline_run_created_by_annotations(
)
session.execute(stmt)
session.commit()


def _backfill_pipeline_names_from_extra_data(
    *,
    session: orm.Session,
) -> None:
    """Phase 1: bulk SQL backfill from extra_data['pipeline_name'].

    Equivalent SQL:

        INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
        SELECT id, 'system/pipeline_run.name',
               json_extract(extra_data, '$.pipeline_name')
        FROM pipeline_run
        WHERE json_extract(extra_data, '$.pipeline_name') IS NOT NULL

    Row created:   extra_data = {"pipeline_name": "my-pipeline"} or {"pipeline_name": ""}
    Row skipped:   extra_data is NULL, {} (key absent), or {"pipeline_name": null}

    SQLAlchemy's JSON path extraction is NULL-safe: a NULL extra_data or a
    missing key yields SQL NULL (no Python error), which the IS NOT NULL
    filter removes.
    """
    extracted_name = bts.PipelineRun.extra_data["pipeline_name"].as_string()
    source_rows = sqlalchemy.select(
        bts.PipelineRun.id,
        sqlalchemy.literal(
            filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
        ),
        extracted_name,
    ).where(
        extracted_name.isnot(None),
    )
    insert_stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
        ["pipeline_run_id", "key", "value"],
        source_rows,
    )
    session.execute(insert_stmt)


def _backfill_pipeline_names_from_component_spec(
    *,
    session: orm.Session,
) -> None:
    """Phase 2: bulk SQL fallback for runs still missing a name annotation.

    Inserts a pipeline-name annotation for every pipeline run that:
      (a) has a root ExecutionNode row (INNER JOIN drops runs without one),
      (b) has no existing annotation under the pipeline-name key
          (LEFT JOIN + IS NULL anti-join), and
      (c) yields a non-NULL value at the JSON path
          task_spec -> 'componentRef' -> 'spec' ->> 'name'
          (empty string is a valid value and is inserted).

    Equivalent SQL:

        INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value)
        SELECT pr.id, 'system/pipeline_run.name',
               en.task_spec -> 'componentRef' -> 'spec' ->> 'name'
        FROM pipeline_run pr
        JOIN execution_node en ON en.id = pr.root_execution_id
        LEFT JOIN pipeline_run_annotation ann
          ON ann.pipeline_run_id = pr.id
         AND ann.key = 'system/pipeline_run.name'
        WHERE ann.pipeline_run_id IS NULL
          AND en.task_spec -> 'componentRef' -> 'spec' ->> 'name' IS NOT NULL

    The JSON path is portable across backends via SQLAlchemy:
      - SQLite:     JSON_EXTRACT(task_spec, '$.componentRef.spec.name')
      - MySQL:      JSON_UNQUOTE(JSON_EXTRACT(...))
      - PostgreSQL: task_spec -> 'componentRef' -> 'spec' ->> 'name'

    A null at any depth (task_spec NULL, componentRef missing, spec null,
    name missing) produces SQL NULL and the row is filtered out.
    """
    annotation_key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME
    extracted_name = bts.ExecutionNode.task_spec[
        ("componentRef", "spec", "name")
    ].as_string()
    prior_annotation = orm.aliased(bts.PipelineRunAnnotation)

    source_rows = (
        sqlalchemy.select(
            bts.PipelineRun.id,
            sqlalchemy.literal(str(annotation_key)),
            extracted_name,
        )
        # Attach each run's root execution (drops runs with no execution_node).
        .join(
            bts.ExecutionNode,
            bts.ExecutionNode.id == bts.PipelineRun.root_execution_id,
        )
        # Try to match an already-existing pipeline-name annotation.
        .outerjoin(
            prior_annotation,
            sqlalchemy.and_(
                prior_annotation.pipeline_run_id == bts.PipelineRun.id,
                prior_annotation.key == annotation_key,
            ),
        )
        .where(
            # Anti-join: keep only runs the LEFT JOIN did not match.
            prior_annotation.pipeline_run_id.is_(None),
            # Keep only runs whose JSON path yields a non-NULL name.
            extracted_name.isnot(None),
        )
    )
    insert_stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select(
        ["pipeline_run_id", "key", "value"],
        source_rows,
    )
    session.execute(insert_stmt)


def _backfill_pipeline_run_name_annotations(
    *,
    db_engine: sqlalchemy.Engine,
) -> None:
    """Backfill pipeline_run_annotation with pipeline names.

    The existence check and both insert phases share one session/transaction,
    avoiding TOCTOU races between concurrent startup processes; any failure
    rolls the whole transaction back.

    Does nothing if any pipeline-name annotation already exists (the
    write-path is populating them, so the backfill already ran or is no
    longer needed).

    Phase 1 (_backfill_pipeline_names_from_extra_data): bulk insert from
    extra_data['pipeline_name'].
    Phase 2 (_backfill_pipeline_names_from_component_spec): bulk fallback
    for runs Phase 1 missed, via the JSON path
    task_spec -> componentRef -> spec -> name.

    Both phases insert any non-NULL string (including "") and skip rows
    where the JSON path yields NULL at any depth.
    """
    with orm.Session(db_engine) as session:
        already_backfilled = _is_pipeline_run_annotation_key_already_backfilled(
            session=session,
            key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME,
        )
        if already_backfilled:
            return

        # Phase 1 rows sit in the transaction buffer, visible to Phase 2's
        # anti-join through the shared session.
        _backfill_pipeline_names_from_extra_data(session=session)
        _backfill_pipeline_names_from_component_spec(session=session)
        # Both phases become permanent atomically.
        session.commit()
Loading