Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions etl-destinations/src/bigquery/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use std::time::Duration;

use crate::bigquery::BigQueryDestination;
use crate::bigquery::table_name_to_bigquery_table_id;
use crate::retry::{RetryDecision, RetryPolicy, retry_with_backoff};
use etl::store::schema::SchemaStore;
use etl::store::state::StateStore;
use etl::types::{PipelineId, TableName};
use gcp_bigquery_client::Client;
use gcp_bigquery_client::client_builder::ClientBuilder;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::dataset::Dataset;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table_row::TableRow;
/// Maximum number of attempts when polling BigQuery for expected query results.
const BIGQUERY_QUERY_MAX_ATTEMPTS: u32 = 30;

/// Delay in milliseconds between verification attempts when querying BigQuery.
const BIGQUERY_QUERY_RETRY_DELAY_MS: u64 = 500;

/// Retry policy for BigQuery test setup operations (client creation, dataset creation).
///
/// NOTE(review): `max_retries: 4` presumably means up to 4 retries after the
/// first attempt (5 calls total), and the delay presumably grows from
/// `initial_delay` toward the `max_delay` cap between attempts — confirm the
/// exact semantics against `retry_with_backoff` in the `retry` module.
const SETUP_RETRY_POLICY: RetryPolicy = RetryPolicy {
    max_retries: 4,
    initial_delay: Duration::from_secs(1),
    // Cap keeps flaky-setup waits short so test startup never stalls for long.
    max_delay: Duration::from_secs(4),
};

/// Returns whether a `BQError` is transient and worth retrying.
fn is_transient_bq_error(err: &BQError) -> RetryDecision {
match err {
BQError::RequestError(_) => RetryDecision::Retry,
BQError::ResponseError { error } if error.error.code >= 500 => RetryDecision::Retry,
_ => RetryDecision::Stop,
}
}

/// Environment variable name for the BigQuery project ID.
///
/// NOTE(review): presumably read during test setup to select the GCP project
/// hosting the test datasets — verify against the callers of this constant.
pub const BIGQUERY_PROJECT_ID_ENV: &str = "TESTS_BIGQUERY_PROJECT_ID";

Expand Down Expand Up @@ -117,14 +135,30 @@ impl BigQueryDatabase {
}
}

/// Creates the dataset in BigQuery, retrying on transient errors.
///
/// Dataset creation is wrapped in [`retry_with_backoff`] with
/// [`SETUP_RETRY_POLICY`] so that request failures or 5xx responses from
/// BigQuery (as classified by [`is_transient_bq_error`]) do not fail the
/// whole test run on a transient hiccup.
///
/// # Panics
///
/// Panics if dataset creation still fails once all retry attempts are
/// exhausted, or if a non-transient error is returned.
pub async fn create_dataset(&self) {
    retry_with_backoff(
        SETUP_RETRY_POLICY,
        // Only transient errors (request failures, 5xx responses) are retried.
        is_transient_bq_error,
        // Identity delay transform: use the policy's computed delay unchanged.
        |delay| delay,
        // Surface each failed attempt on stderr so flaky infra is visible
        // in test logs even when the retry ultimately succeeds.
        |attempt| {
            eprintln!(
                "bigquery dataset creation failed (attempt {}/{}): {}",
                attempt.retry_index, attempt.max_retries, attempt.error,
            );
        },
        || async {
            let dataset = Dataset::new(&self.project_id, &self.dataset_id);
            // Discard the returned dataset handle; only success matters here.
            self.client.dataset().create(dataset).await.map(|_| ())
        },
    )
    .await
    .unwrap_or_else(|failure| {
        panic!(
            "Failed to create BigQuery dataset after {} attempts: {}",
            failure.total_attempts, failure.last_error,
        )
    });
}

/// Drops the dataset and all its contents.
Expand Down
Loading