From ab7fb4fc18790f645aacdd6620f50e00a91c0dcd Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Wed, 25 Feb 2026 23:04:58 -0800 Subject: [PATCH] feat: Search pipeline name in pipeline run API --- cloud_pipelines_backend/api_server_sql.py | 77 ++- cloud_pipelines_backend/database_ops.py | 232 +++++++ cloud_pipelines_backend/filter_query_sql.py | 17 +- tests/test_api_server_sql.py | 166 ++++- tests/test_database_ops.py | 708 +++++++++++++++++--- tests/test_filter_query_sql.py | 35 + 6 files changed, 1119 insertions(+), 116 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index e53e2a3..0d3f818 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -9,6 +9,7 @@ from . import backend_types_sql as bts from . import component_structures as structures +from . import database_ops from . import errors from . import filter_query_sql @@ -32,6 +33,27 @@ def _get_current_time() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc) +def _get_pipeline_name_from_task_spec( + *, + task_spec_dict: dict[str, Any], +) -> str | None: + """Extract pipeline name from a task_spec dict via component_ref.spec.name. + + Traversal path: + task_spec_dict -> TaskSpec -> component_ref -> spec -> name + + Returns None if any step in the chain is missing or parsing fails. + """ + try: + task_spec = structures.TaskSpec.from_json_dict(task_spec_dict) + except Exception: + return None + spec = task_spec.component_ref.spec + if spec is None: + return None + return spec.name or None + + # ==== PipelineJobService @dataclasses.dataclass(kw_only=True) class PipelineRunResponse: @@ -113,19 +135,15 @@ def create( }, ) session.add(pipeline_run) - # Mirror created_by into the annotations table so it's searchable - # via filter_query like any other annotation. - if created_by is not None: - # Flush to populate pipeline_run.id (server-generated) before inserting the annotation FK. - # TODO: Use ORM relationship instead of explicit flush + manual FK assignment. - session.flush() - session.add( - bts.PipelineRunAnnotation( - pipeline_run_id=pipeline_run.id, - key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, - value=created_by, - ) - ) + # Flush to populate pipeline_run.id (server-generated) before inserting annotation FKs. + # TODO: Use ORM relationship instead of explicit flush + manual FK assignment. + session.flush() + _mirror_system_annotations( + session=session, + pipeline_run_id=pipeline_run.id, + created_by=created_by, + pipeline_name=pipeline_name, + ) session.commit() session.refresh(pipeline_run) @@ -244,12 +262,9 @@ def _create_pipeline_run_response( bts.ExecutionNode, pipeline_run.root_execution_id ) if execution_node: - task_spec = structures.TaskSpec.from_json_dict( - execution_node.task_spec + pipeline_name = _get_pipeline_name_from_task_spec( + task_spec_dict=execution_node.task_spec ) - component_spec = task_spec.component_ref.spec - if component_spec: - pipeline_name = component_spec.name response.pipeline_name = pipeline_name if include_execution_stats: execution_status_stats = self._calculate_execution_status_stats( @@ -1153,6 +1168,32 @@ def list_secrets( ] +def _mirror_system_annotations( + *, + session: orm.Session, + pipeline_run_id: bts.IdType, + created_by: str | None, + pipeline_name: str | None, +) -> None: + """Mirror pipeline run fields as system annotations for filter_query search.""" + if created_by: + session.add( + bts.PipelineRunAnnotation( + pipeline_run_id=pipeline_run_id, + key=filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY, + value=created_by, + ) + ) + if pipeline_name: + session.add( + bts.PipelineRunAnnotation( + pipeline_run_id=pipeline_run_id, + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + value=pipeline_name, + ) + ) + + def _recursively_create_all_executions_and_artifacts_root( session: orm.Session, root_task_spec: structures.TaskSpec, diff --git a/cloud_pipelines_backend/database_ops.py b/cloud_pipelines_backend/database_ops.py index 92a36ea..2cf6a38 100644 --- a/cloud_pipelines_backend/database_ops.py +++ b/cloud_pipelines_backend/database_ops.py @@ -1,3 +1,5 @@ +from typing import Any + import sqlalchemy from sqlalchemy import orm @@ -87,6 +89,7 @@ def migrate_db(db_engine: sqlalchemy.Engine): break _backfill_pipeline_run_created_by_annotations(db_engine=db_engine) + _backfill_pipeline_run_name_annotations(db_engine=db_engine) def _is_pipeline_run_annotation_key_already_backfilled( @@ -142,3 +145,232 @@ def _backfill_pipeline_run_created_by_annotations( ) session.execute(stmt) session.commit() + + +def _backfill_pipeline_names_from_extra_data( + *, + session: orm.Session, +) -> None: + """Phase 1: bulk SQL backfill from extra_data['pipeline_name']. + + INSERT INTO pipeline_run_annotation + SELECT id, key, json_extract(extra_data, '$.pipeline_name') + FROM pipeline_run + WHERE json_extract(...) IS NOT NULL + + Valid (creates annotation row): + extra_data = {"pipeline_name": "my-pipeline"} -> value = "my-pipeline" + extra_data = {"pipeline_name": ""} -> value = "" + + Skipped (no annotation row): + extra_data = NULL -> JSON_EXTRACT = NULL + extra_data = {} -> key absent, NULL + extra_data = {"pipeline_name": null} -> JSON_EXTRACT = NULL + + SQLAlchemy's JSON path extraction is NULL-safe: returns SQL NULL + when extra_data is NULL or the key is absent (no Python error). + """ + pipeline_name_expr = bts.PipelineRun.extra_data["pipeline_name"].as_string() + stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( + ["pipeline_run_id", "key", "value"], + sqlalchemy.select( + bts.PipelineRun.id, + sqlalchemy.literal( + filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + ), + pipeline_name_expr, + ).where( + pipeline_name_expr.isnot(None), + ), + ) + session.execute(stmt) + + +def _backfill_pipeline_names_from_component_spec( + *, + session: orm.Session, +) -> None: + """Phase 2: Bulk SQL fallback for runs still missing a name annotation. + + Extracts the pipeline name from each run's ExecutionNode via the + JSON path: + + task_spec -> 'componentRef' -> 'spec' ->> 'name' + + Starting tables: + + pipeline_run execution_node + +----+------------------+ +--------+-------------------------------------------+ + | id | root_execution_id| | id | task_spec (JSON) | + +----+------------------+ +--------+-------------------------------------------+ + | 1 | exec_1 | | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | + | 2 | exec_2 | | exec_2 | {"componentRef":{"spec":null}} | + | 3 | exec_3 | | exec_3 | {"componentRef":{"spec":{"name":""}}} | + | 4 | exec_4 | | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | + | 5 | exec_99 | +--------+-------------------------------------------+ + +----+------------------+ (no exec_99 row) + + pipeline_run_annotation (pre-existing) + +--------+---------------------------+-------+ + | run_id | key | value | + +--------+---------------------------+-------+ + | 1 | system/pipeline_run.name | A | + | 3 | user/custom_tag | hello | + +--------+---------------------------+-------+ + + Step 1 -- JOIN execution_node (INNER JOIN): + Attaches task_spec to each run. Drops runs with no execution_node. + + FROM pipeline_run pr + JOIN execution_node en ON en.id = pr.root_execution_id + + +----+--------+-------------------------------------------+ + | id | en.id | en.task_spec | + +----+--------+-------------------------------------------+ + | 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | + | 2 | exec_2 | {"componentRef":{"spec":null}} | + | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | + | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | + +----+--------+-------------------------------------------+ + (run 5 dropped -- exec_99 doesn't exist) + + Step 2a -- LEFT JOIN annotation: + Attempts to match each run to an existing name annotation. + + LEFT JOIN pipeline_run_annotation ann + ON ann.pipeline_run_id = pr.id + AND ann.key = 'system/pipeline_run.name' + + +----+--------+------------------------------------------+------------------+----------+ + | id | en.id | en.task_spec | ann.run_id | ann.key | + +----+--------+------------------------------------------+------------------+----------+ + | 1 | exec_1 | {"componentRef":{"spec":{"name":"A"}}} | 1 | sys/name | + | 2 | exec_2 | {"componentRef":{"spec":null}} | NULL | NULL | + | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | NULL | NULL | + | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | NULL | NULL | + +----+--------+------------------------------------------+------------------+----------+ + (run 1 matched -- has 'system/pipeline_run.name' annotation) + (run 3 NULL -- has 'user/custom_tag' but ON requires key = 'system/pipeline_run.name') + + Step 2b -- WHERE ann.pipeline_run_id IS NULL (anti-join filter): + Keeps only runs where the LEFT JOIN found no match. + + WHERE ann.pipeline_run_id IS NULL + + +----+--------+-------------------------------------------+ + | id | en.id | en.task_spec | + +----+--------+-------------------------------------------+ + | 2 | exec_2 | {"componentRef":{"spec":null}} | + | 3 | exec_3 | {"componentRef":{"spec":{"name":""}}} | + | 4 | exec_4 | {"componentRef":{"spec":{"name":"B"}}} | + +----+--------+-------------------------------------------+ + (run 1 dropped -- ann.run_id was 1, not NULL) + + Step 3 -- JSON extraction + NULL filter: + Extracts name from JSON path, keeps only non-null (empty string is allowed). + + WHERE task_spec->'componentRef'->'spec'->>'name' IS NOT NULL + + +----+-------------------------------------------+-----------+ + | id | en.task_spec | name_expr | + +----+-------------------------------------------+-----------+ + | 2 | {"componentRef":{"spec":null}} | NULL | <- dropped + | 3 | {"componentRef":{"spec":{"name":""}}} | "" | <- kept (empty string OK) + | 4 | {"componentRef":{"spec":{"name":"B"}}} | "B" | <- kept + +----+-------------------------------------------+-----------+ + + Step 4 -- INSERT INTO pipeline_run_annotation: + Inserts one row per surviving run. + + INSERT INTO pipeline_run_annotation (pipeline_run_id, key, value) + +--------+---------------------------+-------+ + | run_id | key | value | + +--------+---------------------------+-------+ + | 3 | system/pipeline_run.name | | + | 4 | system/pipeline_run.name | B | + +--------+---------------------------+-------+ + + The JSON path is portable across databases via SQLAlchemy: + - SQLite: JSON_EXTRACT(task_spec, '$.componentRef.spec.name') + - MySQL: JSON_UNQUOTE(JSON_EXTRACT(...)) + - PostgreSQL: task_spec -> 'componentRef' -> 'spec' ->> 'name' + + Any null at any depth (task_spec NULL, componentRef missing, + spec null, name missing) produces SQL NULL, filtered out by + IS NOT NULL. Empty string is allowed and will be inserted. + """ + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + name_expr = bts.ExecutionNode.task_spec[ + ("componentRef", "spec", "name") + ].as_string() + existing_ann = orm.aliased(bts.PipelineRunAnnotation) + + # Step 4: INSERT INTO pipeline_run_annotation + stmt = sqlalchemy.insert(bts.PipelineRunAnnotation).from_select( + ["pipeline_run_id", "key", "value"], + sqlalchemy.select( + bts.PipelineRun.id, + sqlalchemy.literal(str(key)), + name_expr, + ) + # Step 1: INNER JOIN execution_node + .join( + bts.ExecutionNode, + bts.ExecutionNode.id == bts.PipelineRun.root_execution_id, + ) + # Step 2a: LEFT JOIN existing annotation + .outerjoin( + existing_ann, + sqlalchemy.and_( + existing_ann.pipeline_run_id == bts.PipelineRun.id, + existing_ann.key == key, + ), + ).where( + # Step 2b: Anti-join — keep only runs with no existing annotation + existing_ann.pipeline_run_id.is_(None), + # Step 3: JSON extraction — keep only non-NULL names + name_expr.isnot(None), + ), + ) + session.execute(stmt) + + +def _backfill_pipeline_run_name_annotations( + *, + db_engine: sqlalchemy.Engine, +) -> None: + """Backfill pipeline_run_annotation with pipeline names. + + The check and both inserts run in a single session/transaction to + avoid TOCTOU races between concurrent startup processes. If anything + fails, the entire transaction rolls back automatically. + + Skips entirely if any name annotation already exists (i.e. the + write-path is populating them, so the backfill has already run or is + no longer needed). + + Phase 1 -- _backfill_pipeline_names_from_extra_data: + Bulk SQL insert from extra_data['pipeline_name']. + + Phase 2 -- _backfill_pipeline_names_from_component_spec: + Bulk SQL fallback for runs Phase 1 missed (extra_data is NULL or + missing the key). Extracts name via JSON path + task_spec -> componentRef -> spec -> name. + + Annotation creation rules (same for both phases): + Creates row: any non-NULL string, including empty string "" + Skips row: NULL at any depth in the JSON path + """ + with orm.Session(db_engine) as session: + if _is_pipeline_run_annotation_key_already_backfilled( + session=session, + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + ): + return + + # execute() - rows in DB buffer for Phase 2 + _backfill_pipeline_names_from_extra_data(session=session) + # Phase 2 sees Phase 1's rows via the shared transaction buffer. + _backfill_pipeline_names_from_component_spec(session=session) + # Both phases become permanent atomically. + session.commit() diff --git a/cloud_pipelines_backend/filter_query_sql.py b/cloud_pipelines_backend/filter_query_sql.py index dfcce1c..5d65684 100644 --- a/cloud_pipelines_backend/filter_query_sql.py +++ b/cloud_pipelines_backend/filter_query_sql.py @@ -10,10 +10,12 @@ from . import filter_query_models SYSTEM_KEY_PREFIX: Final[str] = "system/" +_PIPELINE_RUN_KEY_PREFIX: Final[str] = f"{SYSTEM_KEY_PREFIX}pipeline_run." class PipelineRunAnnotationSystemKey(enum.StrEnum): - CREATED_BY = f"{SYSTEM_KEY_PREFIX}pipeline_run.created_by" + CREATED_BY = f"{_PIPELINE_RUN_KEY_PREFIX}created_by" + PIPELINE_NAME = f"{_PIPELINE_RUN_KEY_PREFIX}name" SYSTEM_KEY_SUPPORTED_PREDICATES: dict[PipelineRunAnnotationSystemKey, set[type]] = { @@ -22,6 +24,12 @@ class PipelineRunAnnotationSystemKey(enum.StrEnum): filter_query_models.ValueEqualsPredicate, filter_query_models.ValueInPredicate, }, + PipelineRunAnnotationSystemKey.PIPELINE_NAME: { + filter_query_models.KeyExistsPredicate, + filter_query_models.ValueEqualsPredicate, + filter_query_models.ValueContainsPredicate, + filter_query_models.ValueInPredicate, + }, } # --------------------------------------------------------------------------- @@ -237,7 +245,12 @@ def _convert_legacy_filter_to_filter_query( "Legacy filter 'created_by' requires a non-empty value." ) predicates.append( - {"value_equals": {"key": SystemKey.CREATED_BY, "value": value}} + { + "value_equals": { + "key": PipelineRunAnnotationSystemKey.CREATED_BY, + "value": value, + } + } ) else: raise NotImplementedError(f"Unsupported filter {filter_value}.") diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py index 97fc87e..dca0e6f 100644 --- a/tests/test_api_server_sql.py +++ b/tests/test_api_server_sql.py @@ -346,24 +346,86 @@ def test_create_without_created_by(self, session_factory, service): result = _create_run(session_factory, service, root_task=_make_task_spec()) assert result.created_by is None - def test_create_writes_created_by_annotation(self, session_factory, service): + def test_create_mirrors_name_and_created_by(self, session_factory, service): run = _create_run( session_factory, service, - root_task=_make_task_spec(), - created_by="alice@example.com", + root_task=_make_task_spec("my-pipeline"), + created_by="alice", ) with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) + assert ( + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME] + == "my-pipeline" + ) assert ( annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] - == "alice@example.com" + == "alice" ) - def test_create_without_created_by_no_annotation(self, session_factory, service): - run = _create_run(session_factory, service, root_task=_make_task_spec()) + def test_create_mirrors_name_only(self, session_factory, service): + run = _create_run( + session_factory, + service, + root_task=_make_task_spec("solo-pipeline"), + ) with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) + assert ( + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME] + == "solo-pipeline" + ) + assert ( + filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + not in annotations + ) + + def test_create_mirrors_created_by_only(self, session_factory, service): + task_spec = _make_task_spec("placeholder") + task_spec.component_ref.spec.name = None + run = _create_run( + session_factory, service, root_task=task_spec, created_by="alice" + ) + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert ( + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] + == "alice" + ) + assert ( + filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + not in annotations + ) + + def test_create_skips_mirror_when_empty_values(self, session_factory, service): + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(""), + created_by="", + ) + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert ( + filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + not in annotations + ) + assert ( + filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + not in annotations + ) + + def test_create_skips_mirror_when_both_absent(self, session_factory, service): + task_spec = _make_task_spec("placeholder") + task_spec.component_ref.spec.name = None + run = _create_run(session_factory, service, root_task=task_spec) + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert ( + filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + not in annotations + ) assert ( filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY not in annotations @@ -371,6 +433,35 @@ def test_create_without_created_by_no_annotation(self, session_factory, service) class TestPipelineRunAnnotationCrud: + def test_system_annotations_coexist_with_user_annotations( + self, session_factory, service + ): + run = _create_run( + session_factory, + service, + root_task=_make_task_spec("my-pipeline"), + created_by="alice", + ) + with session_factory() as session: + service.set_annotation( + session=session, + id=run.id, + key="team", + value="ml-ops", + user_name="alice", + ) + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations["team"] == "ml-ops" + assert ( + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME] + == "my-pipeline" + ) + assert ( + annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] + == "alice" + ) + def test_set_annotation(self, session_factory, service): run = _create_run( session_factory, @@ -443,11 +534,13 @@ def test_delete_annotation(self, session_factory, service): annotations = service.list_annotations(session=session, id=run.id) assert "team" not in annotations - def test_list_annotations_empty(self, session_factory, service): + def test_list_annotations_only_system(self, session_factory, service): run = _create_run(session_factory, service, root_task=_make_task_spec()) with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) - assert annotations == {} + assert annotations == { + filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME: "test-pipeline" + } def test_set_annotation_rejects_system_key(self, session_factory, service): run = _create_run( @@ -970,3 +1063,60 @@ def test_filter_query_created_by_unsupported_predicate( session=session, filter_query=fq, ) + + +class TestGetPipelineNameFromTaskSpec: + """Unit tests for _get_pipeline_name_from_task_spec.""" + + def test_returns_name(self): + """Happy path: task_spec_dict -> TaskSpec -> component_ref -> spec -> name""" + task = _make_task_spec(pipeline_name="my-pipe") + result = api_server_sql._get_pipeline_name_from_task_spec( + task_spec_dict=task.to_json_dict() + ) + assert result == "my-pipe" + + def test_returns_none_when_spec_is_none(self): + """task_spec_dict -> TaskSpec -> component_ref -> [spec=None]""" + result = api_server_sql._get_pipeline_name_from_task_spec( + task_spec_dict={"component_ref": {}}, + ) + assert result is None + + def test_returns_none_when_name_is_none(self): + """task_spec_dict -> ... -> spec -> [name=None]""" + result = api_server_sql._get_pipeline_name_from_task_spec( + task_spec_dict={ + "component_ref": { + "spec": { + "implementation": { + "container": {"image": "img"}, + } + } + } + }, + ) + assert result is None + + def test_returns_none_when_name_is_empty(self): + """task_spec_dict -> ... -> spec -> [name=""]""" + result = api_server_sql._get_pipeline_name_from_task_spec( + task_spec_dict={ + "component_ref": { + "spec": { + "name": "", + "implementation": { + "container": {"image": "img"}, + }, + } + } + }, + ) + assert result is None + + def test_returns_none_on_malformed_dict(self): + """[task_spec_dict=malformed] -> from_json_dict() raises""" + result = api_server_sql._get_pipeline_name_from_task_spec( + task_spec_dict={"bad": "data"} + ) + assert result is None diff --git a/tests/test_database_ops.py b/tests/test_database_ops.py index 6183519..d150540 100644 --- a/tests/test_database_ops.py +++ b/tests/test_database_ops.py @@ -1,8 +1,27 @@ -from typing import Any +"""Tests for database_ops: backfill and annotation helpers. + +Pipeline Name Resolution Path +============================== + +Phase 1 (bulk SQL -- extra_data path): + pipeline_run.extra_data -> ["pipeline_name"] -> value + | | | + +-- None +-- key missing +-- "" + v v v + SQL NULL (safe) SQL NULL (safe) filtered by != "" + +Phase 2 (bulk SQL -- component_spec JSON path): + execution_node.task_spec -> 'componentRef' -> 'spec' ->> 'name' + | | | | + +-- NULL +-- key missing +-- null +-- null + v v v v + SQL NULL (safe) SQL NULL (safe) SQL NULL SQL NULL +""" import pytest import sqlalchemy from sqlalchemy import orm +from typing import Any from cloud_pipelines_backend import api_server_sql from cloud_pipelines_backend import backend_types_sql as bts @@ -27,6 +46,42 @@ def _make_task_spec( ) +def _set_execution_node_task_spec( + *, + session_factory: orm.sessionmaker, + run_id: str, + task_spec: structures.TaskSpec, +) -> None: + """Replace the execution_node's task_spec JSON with the given TaskSpec. + + Use to test Phase 2 fallback paths where spec is None or name is None, + since the service's create() requires a valid spec to run. + """ + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run_id) + exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) + exec_node.task_spec = task_spec.to_json_dict() + session.commit() + + +def _set_execution_node_task_spec_raw( + *, + session_factory: orm.sessionmaker, + run_id: str, + task_spec_dict: dict[str, Any] | None, +) -> None: + """Set task_spec to an arbitrary dict (or None) bypassing Pydantic. + + Use to test JSON paths that Pydantic's TaskSpec cannot represent + (e.g. empty dict, missing componentRef, null task_spec). + """ + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run_id) + exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) + exec_node.task_spec = task_spec_dict + session.commit() + + @pytest.fixture() def session_factory() -> orm.sessionmaker: db_engine = database_ops.create_db_engine(database_uri="sqlite://") @@ -44,6 +99,23 @@ def _create_run( return service.create(session, **kwargs) +def _delete_annotation( + *, + session_factory: orm.sessionmaker, + run_id: str, + key: filter_query_sql.PipelineRunAnnotationSystemKey, +) -> None: + """Remove a write-path annotation so backfill can be tested in isolation.""" + with session_factory() as session: + session.execute( + sqlalchemy.delete(bts.PipelineRunAnnotation).where( + bts.PipelineRunAnnotation.pipeline_run_id == run_id, + bts.PipelineRunAnnotation.key == key, + ) + ) + session.commit() + + def _get_index_names( *, engine: sqlalchemy.Engine, @@ -206,186 +278,639 @@ def test_matches_exact_key( is False ) - def test_true_after_backfill( - self, - session_factory: orm.sessionmaker, - ) -> None: - """Create a run, delete its write-path annotation, then backfill.""" + +class TestPipelineNameBackfill: + """Integration tests for pipeline name backfill (Phase 1 + Phase 2).""" + + # --- Orchestration tests (full backfill flow) --- + + def test_backfill_populates_name(self, session_factory): + """P1 happy path: extra_data -> ["pipeline_name"] -> [value] OK""" service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="my-pipeline"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + engine = session_factory.kw["bind"] - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) - run = _create_run( + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "my-pipeline" + + def test_backfill_populates_name_via_both_phases(self, session_factory): + """Orchestrator resolves run_a via P1 (extra_data) and run_b via P2 (component_spec).""" + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run_a = _create_run( session_factory, service, - root_task=_make_task_spec(), - created_by="bob", + root_task=_make_task_spec(pipeline_name="from-extra-data"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_a.id, key=key) + + run_b = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="from-component-spec"), ) + _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) with session_factory() as session: - session.query(bts.PipelineRunAnnotation).filter_by( - pipeline_run_id=run.id, - key=key, - ).delete() + db_run_b = session.get(bts.PipelineRun, run_b.id) + db_run_b.extra_data = None session.commit() + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_a.id) + assert key not in service.list_annotations(session=session, id=run_b.id) + + engine = session_factory.kw["bind"] + database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) + with session_factory() as session: assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key=key, + service.list_annotations(session=session, id=run_a.id)[key] + == "from-extra-data" + ) + assert ( + service.list_annotations(session=session, id=run_b.id)[key] + == "from-component-spec" + ) + + def test_backfill_phase2_skips_phase1_insert_within_transaction( + self, session_factory + ): + """Phase 2's anti-join sees Phase 1's insert within the same transaction. + + A run that both phases could match (has extra_data AND component_spec + name). Phase 1 inserts the annotation; Phase 2 must skip it via the + anti-join, not insert a duplicate. Proves execute() writes are visible + within the shared transaction buffer. + """ + service = api_server_sql.PipelineRunsApiService_Sql() + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="shared-name"), + ) + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + engine = session_factory.kw["bind"] + database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "shared-name" + + # Verify exactly one annotation row — no duplicate from Phase 2. + with session_factory() as session: + count = ( + session.query(bts.PipelineRunAnnotation) + .filter( + bts.PipelineRunAnnotation.pipeline_run_id == run.id, + bts.PipelineRunAnnotation.key == key, ) - is False + .count() ) - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + assert count == 1 + + def test_backfill_skips_when_key_already_exists(self, session_factory): + """Once any NAME annotation exists, subsequent backfill calls are no-ops.""" + service = api_server_sql.PipelineRunsApiService_Sql() + engine = session_factory.kw["bind"] + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + + run_a = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="pipeline-a"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_a.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_a.id) + + database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) + with session_factory() as session: assert ( - database_ops._is_pipeline_run_annotation_key_already_backfilled( - session=session, - key=key, - ) - is True + service.list_annotations(session=session, id=run_a.id)[key] + == "pipeline-a" ) + run_b = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="pipeline-b"), + ) + _delete_annotation(session_factory=session_factory, run_id=run_b.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_b.id) + + # Backfill is a no-op because run_a's annotation already exists + database_ops._backfill_pipeline_run_name_annotations(db_engine=engine) -class TestCreatedByBackfill: - def test_backfill_populates_annotation_value( + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_b.id) + + # --- Phase 1 tests (_backfill_pipeline_names_from_extra_data) --- + + def test_backfill_phase1_skips_none_extra_data(self, session_factory): + """P1 null point: [extra_data=None] FAIL""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_extra_data(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_backfill_phase1_skips_missing_key(self, session_factory): + """P1 null point: extra_data -> [key missing] FAIL""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {} + session.commit() + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_extra_data(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_backfill_phase1_inserts_empty_name(self, session_factory): + """P1 valid: extra_data -> ["pipeline_name"] -> [""] + Passes: empty string is a valid name, annotation inserted with value="".""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = {"pipeline_name": ""} + session.commit() + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_extra_data(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "" + + # --- Phase 2 tests (_backfill_pipeline_names_from_component_spec) --- + # Ordered by JSON traversal depth (0 -> 4). + # Path: ExecutionNode row -> task_spec -> componentRef -> spec -> name + + def test_p2_depth0_execution_node_missing( self, session_factory: orm.sessionmaker, ) -> None: - """The INSERT path produces the correct annotation value.""" + """Path: [ExecutionNode row missing] -> task_spec -> componentRef -> spec -> name + Fails at: INNER JOIN finds no execution_node, row excluded.""" service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] run = _create_run( session_factory, service, - root_task=_make_task_spec(), - created_by="alice", + root_task=_make_task_spec(pipeline_name="some-name"), ) - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - # Remove write-path annotation so the backfill INSERT actually runs with session_factory() as session: - session.query(bts.PipelineRunAnnotation).filter_by( - pipeline_run_id=run.id, - key=key, - ).delete() + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + exec_node = session.get(bts.ExecutionNode, db_run.root_execution_id) + session.delete(exec_node) session.commit() with session_factory() as session: assert key not in service.list_annotations(session=session, id=run.id) - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) - assert annotations[key] == "alice" + assert key not in annotations - def test_backfill_skips_empty_created_by( + def test_p2_depth1_task_spec_null( self, session_factory: orm.sessionmaker, ) -> None: - """Runs with created_by='' are not backfilled (isnot(None) passes but empty string has no value).""" + """Path: task_spec=[NULL] -> componentRef -> spec -> name + Fails at: task_spec column is NULL, JSON extraction returns NULL.""" service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - # Create a run then set created_by to empty string directly in DB - run = _create_run(session_factory, service, root_task=_make_task_spec()) with session_factory() as session: db_run = session.get(bts.PipelineRun, run.id) - db_run.created_by = "" + db_run.extra_data = None session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict=None, + ) with session_factory() as session: - assert key not in service.list_annotations(session=session, id=run.id) - - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) assert key not in annotations - def test_backfill_idempotent( + def test_p2_depth1_task_spec_empty( self, session_factory: orm.sessionmaker, ) -> None: + """Path: task_spec={} -> [componentRef missing] -> spec -> name + Fails at: task_spec is empty dict, 'componentRef' key absent.""" service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={}, + ) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_p2_depth2_component_ref_missing( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Path: task_spec -> [componentRef missing] -> spec -> name + Fails at: 'componentRef' key absent from task_spec.""" + service = api_server_sql.PipelineRunsApiService_Sql() run = _create_run( session_factory, service, - root_task=_make_task_spec(), - created_by="alice", + root_task=_make_task_spec(pipeline_name="some-name"), ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={"other_key": "value"}, + ) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) - assert ( - annotations[filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY] - == "alice" + assert key not in annotations + + def test_p2_depth2_component_ref_null( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Path: task_spec -> [componentRef=null] -> spec -> name + Fails at: componentRef is null, JSON extraction returns NULL.""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - def test_backfill_skips_null_created_by( + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={"componentRef": None}, + ) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_p2_depth3_spec_null( self, session_factory: orm.sessionmaker, ) -> None: + """Path: task_spec -> componentRef -> [spec=null] -> name + Fails at: spec is null, JSON extraction returns NULL.""" service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - run = _create_run(session_factory, service, root_task=_make_task_spec()) - assert run.created_by is None + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec( + session_factory=session_factory, + run_id=run.id, + task_spec=structures.TaskSpec( + component_ref=structures.ComponentReference( + name="placeholder", + spec=None, + ) + ), + ) - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() with session_factory() as session: annotations = service.list_annotations(session=session, id=run.id) - assert ( - filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY - not in annotations + assert key not in annotations + + def test_p2_depth4_name_null( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Path: task_spec -> componentRef -> spec -> [name=null] + Fails at: name is null, JSON extraction returns NULL.""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - def test_backfill_mixed_runs_and_repeated_backfills( + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec( + session_factory=session_factory, + run_id=run.id, + task_spec=structures.TaskSpec( + component_ref=structures.ComponentReference( + spec=structures.ComponentSpec( + name=None, + ) + ) + ), + ) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_p2_depth4_name_empty_string( self, session_factory: orm.sessionmaker, ) -> None: - """Simulates a realistic sequence: create runs, backfill, create more runs, backfill again. - Verifies all annotations are correct and no duplicates are created.""" + """Path: task_spec -> componentRef -> spec -> [name=""] + Passes: empty string is a valid name, annotation inserted with value="".""" service = api_server_sql.PipelineRunsApiService_Sql() - engine = session_factory.kw["bind"] - key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + run = _create_run( + session_factory, + service, + root_task=_make_task_spec(pipeline_name="some-name"), + ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) - run_alice = _create_run( + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + _set_execution_node_task_spec_raw( + session_factory=session_factory, + run_id=run.id, + task_spec_dict={ + "componentRef": {"spec": {"name": ""}}, + }, + ) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "" + + def test_p2_depth4_name_valid( + self, + session_factory: orm.sessionmaker, + ) -> None: + """Path: task_spec -> componentRef -> spec -> [name="fallback-name"] + Passes: valid name extracted, annotation inserted.""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( session_factory, service, - root_task=_make_task_spec(), - created_by="alice", + root_task=_make_task_spec(pipeline_name="fallback-name"), ) - run_no_user = _create_run( + key = filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + db_run = session.get(bts.PipelineRun, run.id) + db_run.extra_data = None + session.commit() + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + with session_factory() as session: + database_ops._backfill_pipeline_names_from_component_spec(session=session) + session.commit() + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "fallback-name" + + +class TestCreatedByBackfill: + def test_backfill_populates_created_by( + self, + session_factory: orm.sessionmaker, + ) -> None: + """The INSERT path produces the correct annotation value.""" + service = api_server_sql.PipelineRunsApiService_Sql() + engine = session_factory.kw["bind"] + run = _create_run( session_factory, service, root_task=_make_task_spec(), + created_by="alice", ) + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) - run_bob = _create_run( + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert annotations[key] == "alice" + + def test_backfill_skips_null_created_by(self, session_factory): + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run(session_factory, service, root_task=_make_task_spec()) + engine = session_factory.kw["bind"] + + database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert ( + filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + not in annotations + ) + + def test_backfill_skips_empty_created_by(self, session_factory): + """Runs with created_by='' are not backfilled.""" + service = api_server_sql.PipelineRunsApiService_Sql() + run = _create_run( session_factory, service, root_task=_make_task_spec(), - created_by="bob", + created_by="", ) - run_alice2 = _create_run( + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + _delete_annotation(session_factory=session_factory, run_id=run.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run.id) + + engine = session_factory.kw["bind"] + database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + + with session_factory() as session: + annotations = service.list_annotations(session=session, id=run.id) + assert key not in annotations + + def test_backfill_skips_when_key_already_exists(self, session_factory): + """Once any CREATED_BY annotation exists, subsequent backfill calls are no-ops.""" + service = api_server_sql.PipelineRunsApiService_Sql() + engine = session_factory.kw["bind"] + key = filter_query_sql.PipelineRunAnnotationSystemKey.CREATED_BY + + run_alice = _create_run( session_factory, service, root_task=_make_task_spec(), created_by="alice", ) + _delete_annotation( + session_factory=session_factory, run_id=run_alice.id, key=key + ) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_alice.id) - database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) with session_factory() as session: @@ -393,16 +918,23 @@ def test_backfill_mixed_runs_and_repeated_backfills( service.list_annotations(session=session, id=run_alice.id)[key] == "alice" ) - assert key not in service.list_annotations( - session=session, id=run_no_user.id - ) - assert ( - service.list_annotations(session=session, id=run_bob.id)[key] == "bob" - ) - assert ( - service.list_annotations(session=session, id=run_alice2.id)[key] - == "alice" - ) + + run_bob = _create_run( + session_factory, + service, + root_task=_make_task_spec(), + created_by="bob", + ) + _delete_annotation(session_factory=session_factory, run_id=run_bob.id, key=key) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_bob.id) + + # Backfill is a no-op because run_alice's annotation already exists + database_ops._backfill_pipeline_run_created_by_annotations(db_engine=engine) + + with session_factory() as session: + assert key not in service.list_annotations(session=session, id=run_bob.id) def test_backfill_uses_single_session( self, diff --git a/tests/test_filter_query_sql.py b/tests/test_filter_query_sql.py index 9c260e2..a881eed 100644 --- a/tests/test_filter_query_sql.py +++ b/tests/test_filter_query_sql.py @@ -535,3 +535,38 @@ def test_validate_and_resolve_predicate(self): current_user="charlie@example.com", ) assert resolved.value_equals.value == "charlie@example.com" + + def test_check_predicate_allowed_name_key_exists(self): + pred = filter_query_models.KeyExistsPredicate( + key_exists=filter_query_models.KeyExists( + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME + ) + ) + filter_query_sql._check_predicate_allowed(predicate=pred) + + def test_check_predicate_allowed_name_value_equals(self): + pred = filter_query_models.ValueEqualsPredicate( + value_equals=filter_query_models.ValueEquals( + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + value="my-pipeline", + ) + ) + filter_query_sql._check_predicate_allowed(predicate=pred) + + def test_check_predicate_allowed_name_value_contains(self): + pred = filter_query_models.ValueContainsPredicate( + value_contains=filter_query_models.ValueContains( + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + value_substring="nightly", + ) + ) + filter_query_sql._check_predicate_allowed(predicate=pred) + + def test_check_predicate_allowed_name_value_in(self): + pred = filter_query_models.ValueInPredicate( + value_in=filter_query_models.ValueIn( + key=filter_query_sql.PipelineRunAnnotationSystemKey.PIPELINE_NAME, + values=["a", "b"], + ) + ) + filter_query_sql._check_predicate_allowed(predicate=pred)