diff --git a/jetstream/analysis.py b/jetstream/analysis.py index aa432934..f343c878 100644 --- a/jetstream/analysis.py +++ b/jetstream/analysis.py @@ -885,10 +885,15 @@ def _create_subset_metric_table_query_covariate( covariate_period.value, 1, analysis_basis=AnalysisBasis.ENROLLMENTS, metric=metric_name ) - if not self.bigquery.table_exists(covariate_table_name): + if not self.bigquery.column_exists_in_table(covariate_table_name, covariate_metric_name): normalized_slug = bq_normalize_name(self.config.experiment.normandy_slug) + log_msg = ( + f"Covariate adjustment table {covariate_table_name} does not exist " + f"(or `{covariate_metric_name}` not found in table), " + f"falling back to unadjusted inferences" + ) logger.warning( - f"Covariate adjustment table {covariate_table_name} does not exist, falling back to unadjusted inferences", # noqa:E501 + log_msg, extra={ "experiment": normalized_slug, "metric": metric.name, diff --git a/jetstream/bigquery_client.py b/jetstream/bigquery_client.py index 996e4522..82f4530d 100644 --- a/jetstream/bigquery_client.py +++ b/jetstream/bigquery_client.py @@ -81,6 +81,14 @@ def _current_timestamp_label(self) -> str: """Returns the current UTC timestamp as a valid BigQuery label.""" return str(int(time.time())) + def column_exists_in_table(self, table_name: str, col_name: str) -> bool: + try: + table = self.client.get_table(f"{self.project}.{self.dataset}.{table_name}") + columns = [schema.name for schema in table.schema] + return col_name in columns + except NotFound: + return False + def table_exists(self, table_name: str) -> bool: try: self.client.get_table(f"{self.project}.{self.dataset}.{table_name}") diff --git a/jetstream/tests/integration/test_bigquery_client.py b/jetstream/tests/integration/test_bigquery_client.py index c9268c82..db92cc10 100644 --- a/jetstream/tests/integration/test_bigquery_client.py +++ b/jetstream/tests/integration/test_bigquery_client.py @@ -64,6 +64,18 @@ def test_table_exists(self, client, temporary_dataset): client.client.create_table(f"{temporary_dataset}.dummy_table") assert client.table_exists("dummy_table") is True + def test_column_exists_in_table(self, client, temporary_dataset): + table = client.client.create_table(f"{temporary_dataset}.dummy_table") + assert client.column_exists_in_table("dummy_table", "dummy_column") is False + original_schema = table.schema + new_schema = original_schema[:] + new_schema.append(bigquery.SchemaField("dummy_column", "STRING")) + + table.schema = new_schema + table = client.client.update_table(table, ["schema"]) + + assert client.column_exists_in_table("dummy_table", "dummy_column") is True + def test_touch_tables(self, client, temporary_dataset): for table, experiment in { "enrollments_test_experiment": "test-experiment", diff --git a/jetstream/tests/test_analysis.py b/jetstream/tests/test_analysis.py index ca553cb0..0370e7d4 100644 --- a/jetstream/tests/test_analysis.py +++ b/jetstream/tests/test_analysis.py @@ -347,7 +347,7 @@ def test_create_subset_metric_table_query_covariate_basic(randomization_unit, mo "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) @@ -416,7 +416,7 @@ def test_create_subset_metric_table_query_covariate_missing_table_fallback( "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=False), ) @@ -450,8 +450,9 @@ def test_create_subset_metric_table_query_covariate_missing_table_fallback( # test that logging message was generated assert ( - "Covariate adjustment table table_pre does not exist, falling back to unadjusted inferences" - in caplog.text + "Covariate adjustment table table_pre does not exist " + "(or `metric_name` not found in table), " + "falling back to unadjusted inferences" in caplog.text ) @@ -490,7 +491,7 @@ def test_create_subset_metric_table_query_covariate_segment(randomization_unit, "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) @@ -587,7 +588,7 @@ def test_create_subset_metric_table_query_covariate_exposures(randomization_unit "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) @@ -781,7 +782,7 @@ def test_create_subset_metric_table_query_covariate_unsupported_analysis_basis( experiments, monkeypatch ): monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) metric = Metric( @@ -843,7 +844,7 @@ def test_create_subset_metric_table_query_use_covariate_explicit_metric( "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) @@ -917,7 +918,7 @@ def test_create_subset_metric_table_query_use_covariate_implicit_metric( "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) @@ -1013,7 +1014,7 @@ def test_create_subset_metric_table_query_complete_covariate(randomization_unit, "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), ) @@ -1088,7 +1089,7 @@ def test_create_subset_metric_table_query_covariate_fallback(randomization_unit, "jetstream.analysis.Analysis._table_name", MagicMock(return_value="table_pre") ) monkeypatch.setattr( - "jetstream.bigquery_client.BigQueryClient.table_exists", + "jetstream.bigquery_client.BigQueryClient.column_exists_in_table", MagicMock(return_value=True), )