diff --git a/tests/test_python_patterns.py b/tests/test_python_patterns.py index 329813e..f86416a 100644 --- a/tests/test_python_patterns.py +++ b/tests/test_python_patterns.py @@ -7,6 +7,7 @@ These tests run without a live database — all external calls are mocked. """ import ast +import glob import inspect import re import textwrap @@ -120,3 +121,64 @@ def test_tracks_failures(self): "rollup_refresh.py should track failures" assert "failures.append" in source, \ "rollup_refresh.py should append to failures list" + + +# ───────────────────────────────────────────────────────────────────────────── +# Schema-drift guard +# ───────────────────────────────────────────────────────────────────────────── + +WORKFLOW_GLOB = "databricks/workflows/*.py" +SQL_GLOB = "sql/*.sql" + +# `lakets.` references in Python (registries, functions used in SQL). +_LAKETS_REF = re.compile(r"\blakets\.([a-zA-Z_][a-zA-Z0-9_]*)") +# Logger names like getLogger("lakets.partition_manager") are not DB objects. +_GETLOGGER = re.compile(r"getLogger\([^)]*\)") +# `CREATE [OR REPLACE] TABLE|VIEW|FUNCTION|... [IF NOT EXISTS] lakets.`. +_DEFINITION = re.compile( + r"CREATE\s+(?:OR\s+REPLACE\s+)?" + r"(?:TABLE|VIEW|MATERIALIZED\s+VIEW|FUNCTION|AGGREGATE|PROCEDURE)\s+" + r"(?:IF\s+NOT\s+EXISTS\s+)?lakets\.([a-zA-Z_][a-zA-Z0-9_]*)", + re.IGNORECASE, +) + + +def _defined_lakets_objects() -> set: + """All lakets.* tables/views/functions/aggregates defined across sql/*.sql.""" + defined = set() + for path in glob.glob(SQL_GLOB): + with open(path) as f: + defined.update(_DEFINITION.findall(f.read())) + return defined + + +def _referenced_lakets_objects(source: str) -> set: + """lakets.* objects referenced in a workflow file, excluding logger names.""" + return set(_LAKETS_REF.findall(_GETLOGGER.sub("", source))) + + +class TestSchemaReferences: + """Every ``lakets.`` the workflow jobs reference must exist in the + SQL modules. Guards against schema drift like the hypertable->chronotable + rename that left ``lakets._hypertable_registry`` references behind in + partition_manager.py / retention_job.py and only failed at runtime.""" + + def test_sql_definitions_parse(self): + """Sanity: the SQL modules yield a non-empty set of lakets objects.""" + defined = _defined_lakets_objects() + assert "_chronotable_registry" in defined, \ + "expected core registries in sql/*.sql — check SQL_GLOB / regex" + + def test_workflow_lakets_references_are_defined(self): + """No workflow file may reference a lakets object absent from sql/*.sql.""" + defined = _defined_lakets_objects() + problems = [] + for path in sorted(glob.glob(WORKFLOW_GLOB)): + refs = _referenced_lakets_objects(_read_source(path)) + missing = sorted(r for r in refs if r not in defined) + if missing: + problems.append(f"{path.split('/')[-1]}: {missing}") + assert not problems, ( + "Workflow jobs reference lakets objects not defined in sql/*.sql " + "(schema drift):\n " + "\n ".join(problems) + )