Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions tests/test_python_patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
These tests run without a live database — all external calls are mocked.
"""
import ast
import glob
import inspect
import re
import textwrap
Expand Down Expand Up @@ -120,3 +121,64 @@ def test_tracks_failures(self):
"rollup_refresh.py should track failures"
assert "failures.append" in source, \
"rollup_refresh.py should append to failures list"


# ─────────────────────────────────────────────────────────────────────────────
# Schema-drift guard
# ─────────────────────────────────────────────────────────────────────────────

# Relative glob patterns for the SQL modules and the workflow job sources.
SQL_GLOB = "sql/*.sql"
WORKFLOW_GLOB = "databricks/workflows/*.py"

# Any `lakets.<identifier>` mention; group 1 captures the identifier.
_LAKETS_REF = re.compile(r"\blakets\.([a-zA-Z_][a-zA-Z0-9_]*)")

# A whole `getLogger(...)` call. Logger names such as
# getLogger("lakets.partition_manager") look like schema references but are
# not database objects, so callers strip these calls before scanning.
_GETLOGGER = re.compile(r"getLogger\([^)]*\)")

# A `CREATE` statement that defines a lakets object; group 1 captures the
# object name. Accepts an optional `OR REPLACE` and an optional
# `IF NOT EXISTS`, and is matched case-insensitively.
_DEFINITION = re.compile(
    r"CREATE\s+"
    r"(?:OR\s+REPLACE\s+)?"
    r"(?:TABLE|VIEW|MATERIALIZED\s+VIEW|FUNCTION|AGGREGATE|PROCEDURE)\s+"
    r"(?:IF\s+NOT\s+EXISTS\s+)?"
    r"lakets\.([a-zA-Z_][a-zA-Z0-9_]*)",
    flags=re.IGNORECASE,
)


def _defined_lakets_objects() -> set:
    """Collect every ``lakets.<object>`` name created in the SQL modules.

    Reads each file matched by ``SQL_GLOB`` and gathers the identifiers
    captured by ``_DEFINITION``.

    Returns:
        The set of object names without the ``lakets.`` prefix. Empty when
        no file matches ``SQL_GLOB`` or no definition is found.
    """
    defined = set()
    for path in glob.glob(SQL_GLOB):
        # Decode explicitly as UTF-8: the platform default encoding varies
        # with locale/OS, and a non-ASCII character in a SQL comment would
        # otherwise fail or mis-decode on a non-UTF-8 machine.
        with open(path, encoding="utf-8") as f:
            defined.update(_DEFINITION.findall(f.read()))
    return defined


def _referenced_lakets_objects(source: str) -> set:
    """Return the ``lakets.<object>`` names mentioned in *source*.

    ``getLogger(...)`` calls are removed before scanning, so a logger name
    that happens to start with ``lakets.`` is not reported as a reference.
    """
    without_logger_calls = _GETLOGGER.sub("", source)
    return {
        match.group(1)
        for match in _LAKETS_REF.finditer(without_logger_calls)
    }


class TestSchemaReferences:
    """Every ``lakets.<object>`` the workflow jobs reference must exist in the
    SQL modules. Guards against schema drift like the hypertable->chronotable
    rename that left ``lakets._hypertable_registry`` references behind in
    partition_manager.py / retention_job.py and only failed at runtime."""

    def test_sql_definitions_parse(self):
        """Sanity: the SQL modules define the core ``_chronotable_registry``.

        A failure here usually means ``SQL_GLOB`` matched nothing or the
        definition regex no longer fits the SQL.
        """
        defined = _defined_lakets_objects()
        assert "_chronotable_registry" in defined, \
            "expected core registries in sql/*.sql — check SQL_GLOB / regex"

    def test_workflow_lakets_references_are_defined(self):
        """No workflow file may reference a lakets object absent from sql/*.sql."""
        # Local import keeps this block self-contained; only basename is used.
        import os.path

        defined = _defined_lakets_objects()
        paths = sorted(glob.glob(WORKFLOW_GLOB))
        # Without this check the loop below never runs when the glob matches
        # nothing (e.g. wrong working directory) and the guard passes vacuously.
        assert paths, (
            f"no workflow files matched {WORKFLOW_GLOB!r} — "
            "check WORKFLOW_GLOB / working directory"
        )
        problems = []
        for path in paths:
            refs = _referenced_lakets_objects(_read_source(path))
            missing = sorted(refs - defined)
            if missing:
                # basename handles whichever separator the platform uses.
                problems.append(f"{os.path.basename(path)}: {missing}")
        assert not problems, (
            "Workflow jobs reference lakets objects not defined in sql/*.sql "
            "(schema drift):\n " + "\n ".join(problems)
        )