Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions jetstream/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,10 +885,15 @@ def _create_subset_metric_table_query_covariate(
covariate_period.value, 1, analysis_basis=AnalysisBasis.ENROLLMENTS, metric=metric_name
)

if not self.bigquery.table_exists(covariate_table_name):
if not self.bigquery.column_exists_in_table(covariate_table_name, covariate_metric_name):
normalized_slug = bq_normalize_name(self.config.experiment.normandy_slug)
log_msg = (
f"Covariate adjustment table {covariate_table_name} does not exist "
f"(or `{covariate_metric_name}` not found in table), "
f"falling back to unadjusted inferences"
)
logger.warning(
f"Covariate adjustment table {covariate_table_name} does not exist, falling back to unadjusted inferences", # noqa:E501
log_msg,
extra={
"experiment": normalized_slug,
"metric": metric.name,
Expand Down
8 changes: 8 additions & 0 deletions jetstream/bigquery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ def _current_timestamp_label(self) -> str:
"""Returns the current UTC timestamp as a valid BigQuery label."""
return str(int(time.time()))

def column_exists_in_table(self, table_name: str, col_name: str) -> bool:
try:
table = self.client.get_table(f"{self.project}.{self.dataset}.{table_name}")
columns = [schema.name for schema in table.schema]
return col_name in columns
except NotFound:
return False

def table_exists(self, table_name: str) -> bool:
try:
self.client.get_table(f"{self.project}.{self.dataset}.{table_name}")
Expand Down
12 changes: 12 additions & 0 deletions jetstream/tests/integration/test_bigquery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ def test_table_exists(self, client, temporary_dataset):
client.client.create_table(f"{temporary_dataset}.dummy_table")
assert client.table_exists("dummy_table") is True

def test_column_exists_in_table(self, client, temporary_dataset):
table = client.client.create_table(f"{temporary_dataset}.dummy_table")
assert client.column_exists_in_table("dummy_table", "dummy_column") is False
original_schema = table.schema
new_schema = original_schema[:]
new_schema.append(bigquery.SchemaField("dummy_column", "STRING"))

table.schema = new_schema
table = client.client.update_table(table, ["schema"])

assert client.column_exists_in_table("dummy_table", "dummy_column") is True

def test_touch_tables(self, client, temporary_dataset):
for table, experiment in {
"enrollments_test_experiment": "test-experiment",
Expand Down
23 changes: 12 additions & 11 deletions jetstream/tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def test_create_subset_metric_table_query_covariate_basic(randomization_unit, mo
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down Expand Up @@ -416,7 +416,7 @@ def test_create_subset_metric_table_query_covariate_missing_table_fallback(
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=False),
)

Expand Down Expand Up @@ -450,8 +450,9 @@ def test_create_subset_metric_table_query_covariate_missing_table_fallback(

# test that logging message was generated
assert (
"Covariate adjustment table table_pre does not exist, falling back to unadjusted inferences"
in caplog.text
"Covariate adjustment table table_pre does not exist "
"(or `metric_name` not found in table), "
"falling back to unadjusted inferences" in caplog.text
)


Expand Down Expand Up @@ -490,7 +491,7 @@ def test_create_subset_metric_table_query_covariate_segment(randomization_unit,
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down Expand Up @@ -587,7 +588,7 @@ def test_create_subset_metric_table_query_covariate_exposures(randomization_unit
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down Expand Up @@ -781,7 +782,7 @@ def test_create_subset_metric_table_query_covariate_unsupported_analysis_basis(
experiments, monkeypatch
):
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)
metric = Metric(
Expand Down Expand Up @@ -843,7 +844,7 @@ def test_create_subset_metric_table_query_use_covariate_explicit_metric(
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down Expand Up @@ -917,7 +918,7 @@ def test_create_subset_metric_table_query_use_covariate_implicit_metric(
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down Expand Up @@ -1013,7 +1014,7 @@ def test_create_subset_metric_table_query_complete_covariate(randomization_unit,
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down Expand Up @@ -1088,7 +1089,7 @@ def test_create_subset_metric_table_query_covariate_fallback(randomization_unit,
"jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre")
)
monkeypatch.setattr(
"jetstream.bigquery_client.BigQueryClient.table_exists",
"jetstream.bigquery_client.BigQueryClient.column_exists_in_table",
MagicMock(return_value=True),
)

Expand Down
Loading