Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ wrappers = [
"grpc-server",
"integration-tests-grpc-mock",
"integration-tests-o11y",
"integration-tests-spanner",
"pubsub-grpc-mock",
"spanner-grpc-mock",
"storage-grpc-mock",
Expand Down
3 changes: 3 additions & 0 deletions tests/spanner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ google-cloud-test-utils = { workspace = true }
prost-types.workspace = true
reqwest = { workspace = true, features = ["json"] }
serde_json = { workspace = true }
spanner-grpc-mock = { path = "../../src/spanner/grpc-mock" }
tokio = { workspace = true, features = ["sync"] }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing.workspace = true

[lints]
Expand Down
99 changes: 90 additions & 9 deletions tests/spanner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

use google_cloud_spanner::client::{KeySet, Mutation, Spanner};
use google_cloud_test_utils::resource_names::LowercaseAlphanumeric;
use spanner_grpc_mock::google::spanner::v1::spanner_server::Spanner as MockSpannerTrait;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;

const PROJECT_ID: &str = "test-project";
const INSTANCE_ID: &str = "test-instance";
Expand All @@ -40,7 +44,7 @@ pub async fn wait_for_emulator(endpoint: &str) {
static PROVISION_EMULATOR: tokio::sync::OnceCell<()> = tokio::sync::OnceCell::const_new();
static DATABASE_ID: tokio::sync::OnceCell<String> = tokio::sync::OnceCell::const_new();

async fn get_database_id() -> &'static str {
pub async fn get_database_id() -> &'static str {
DATABASE_ID
.get_or_init(|| async {
std::env::var("SPANNER_EMULATOR_TEST_DB")
Expand All @@ -59,16 +63,19 @@ pub async fn provision_emulator(endpoint: &str) {
.await;
}

pub fn get_emulator_rest_endpoint(grpc_endpoint: &str) -> String {
let rest_endpoint = std::env::var("SPANNER_EMULATOR_REST_HOST")
.unwrap_or_else(|_| grpc_endpoint.replace("9010", "9020"));
if rest_endpoint.starts_with("http://") || rest_endpoint.starts_with("https://") {
rest_endpoint
} else {
format!("http://{}", rest_endpoint)
}
}

async fn do_provision_emulator(endpoint: &str) {
// TODO(#4973): Re-write this to use the admin clients once those also support the Emulator.
let rest_endpoint = std::env::var("SPANNER_EMULATOR_REST_HOST")
.unwrap_or_else(|_| endpoint.replace("9010", "9020"));
let rest_endpoint =
if rest_endpoint.starts_with("http://") || rest_endpoint.starts_with("https://") {
rest_endpoint
} else {
format!("http://{}", rest_endpoint)
};
let rest_endpoint = get_emulator_rest_endpoint(endpoint);
let client = reqwest::Client::new();

// Create a test instance and ignore any ALREADY_EXISTS errors.
Expand Down Expand Up @@ -196,3 +203,77 @@ pub async fn create_database_client() -> Option<google_cloud_spanner::client::Da

Some(db_client)
}

/// Updates the database DDL by executing the given statement on the Spanner Emulator.
///
/// This method uses the emulator's REST API directly. It includes a retry loop to handle
/// transient "Schema change operation rejected" errors that can occur in the emulator
/// if multiple schema changes are executed in parallel, or if schema changes are executed
/// in parallel with read/write transactions.
pub async fn update_database_ddl(statement: String) -> anyhow::Result<()> {
let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set");
let rest_endpoint = get_emulator_rest_endpoint(&emulator_host);
let db_path = format!(
"projects/{}/instances/{}/databases/{}",
PROJECT_ID,
INSTANCE_ID,
get_database_id().await
);
let url = format!("{}/v1/{}/ddl", rest_endpoint, db_path);
let client = reqwest::Client::new();
let payload = serde_json::json!({
"statements": [statement]
});

let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 25;

loop {
attempts += 1;
let res = client.patch(&url).json(&payload).send().await?;

let status = res.status();
let text = res.text().await?;

if status.is_success() {
return Ok(());
}

// Check if the error is the specific one we want to retry.
// Code 9 is FailedPrecondition.
if text.contains("\"code\":9") && text.contains("Schema change operation rejected") {
if attempts >= MAX_ATTEMPTS {
anyhow::bail!(
"Failed to update DDL after {} attempts. Last error: {}",
attempts,
text
);
}
sleep(Duration::from_millis(100)).await;
continue;
}

anyhow::bail!("Failed to update DDL: status={}, body={}", status, text);
}
}

/// A guard that aborts the server task when dropped.
pub struct ServerGuard(JoinHandle<()>);

impl Drop for ServerGuard {
fn drop(&mut self) {
self.0.abort();
}
}

/// Helper to start a mock server and return a drop guard.
pub async fn start_guarded_server<T>(
address: &str,
service: T,
) -> anyhow::Result<(String, ServerGuard)>
where
T: MockSpannerTrait + Send + 'static,
{
let (uri, handle) = spanner_grpc_mock::start(address, service).await?;
Ok((uri, ServerGuard(handle)))
}
1 change: 1 addition & 0 deletions tests/spanner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ pub mod partitioned_dml;
pub mod query;
pub mod read;
pub mod read_write_transaction;
pub mod test_proxy;
pub mod write;
139 changes: 138 additions & 1 deletion tests/spanner/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use google_cloud_spanner::client::{DatabaseClient, Kind, QueryOptions, Statement};
use crate::client::{get_database_id, get_emulator_host};
use crate::test_proxy::{InterceptedSpanner, SpannerInterceptor};
use google_cloud_spanner::client::{DatabaseClient, Kind, QueryOptions, Spanner, Statement};
use google_cloud_test_utils::resource_names::LowercaseAlphanumeric;
use spanner_grpc_mock::google::spanner::v1 as spanner_v1;
use spanner_grpc_mock::google::spanner::v1::spanner_client::SpannerClient;
use std::sync::Arc;
use tokio::sync::Notify;
use tonic::transport::Channel;

pub async fn simple_query(db_client: &DatabaseClient) -> anyhow::Result<()> {
let rot = db_client.single_use().build();
Expand Down Expand Up @@ -409,6 +417,135 @@ fn verify_row_2(row: &google_cloud_spanner::client::Row) {
);
}

/// A test proxy that intercepts and delays `BeginTransaction` requests.
///
/// It notifies `begin_transaction_entered_latch` when a `BeginTransaction` request is received,
/// and blocks the request execution until `latch` is notified. This allows tests to
/// synchronize the order of execution of `BeginTransaction` with other operations.
struct DelayedBeginProxy {
emulator_client: SpannerClient<Channel>,
latch: Arc<Notify>,
begin_transaction_entered_latch: Arc<Notify>,
}

#[tonic::async_trait]
impl SpannerInterceptor for DelayedBeginProxy {
fn emulator_client(&self) -> SpannerClient<Channel> {
self.emulator_client.clone()
}

async fn begin_transaction(
&self,
request: tonic::Request<spanner_v1::BeginTransactionRequest>,
) -> std::result::Result<tonic::Response<spanner_v1::Transaction>, tonic::Status> {
self.begin_transaction_entered_latch.notify_one();
self.latch.notified().await;
self.emulator_client().begin_transaction(request).await
}
}

// This test verifies that the client correctly falls back to `BeginTransaction` when the
// first statement in a transaction fails. It also shows that the statement is retried and
// could (theoretically) succeed during this retry. It achieves this by doing the following:
// 1. It uses a proxy that allows it to intercept the RPCs that are being sent to Spanner.
// 2. It creates a read-only transaction that uses inline-begin-transaction.
// 3. It executes a query that tries to read from a table that does not exist.
// 4. As the first statement in the transaction fails, the client falls back to using
// an explicit BeginTransaction RPC.
// 5. The proxy blocks this BeginTransaction RPC, and in the meantime the test creates
// the missing table.
// 6. The proxy unblocks the BeginTransaction RPC.
// 7. The statement is retried and succeeds. The test never sees the error.
//
// This test might seem like an extreme corner case for a read-only transaction like this.
// However, for read/write transactions, similar types of failures are more likely to occur,
// for example if a transaction tries to insert a row that violates the primary key. Another
// transaction could delete the row in the time between the first attempt failed, and the
// BeginTransaction RPC has been executed.
pub async fn inline_begin_fallback(_db_client: &DatabaseClient) -> anyhow::Result<()> {
let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set");
let latch = Arc::new(Notify::new());
let begin_transaction_entered_latch = Arc::new(Notify::new());

// Create a raw gRPC client that connects to the Spanner Emulator.
// This will be used by the proxy server to forward requests to the Emulator.
let endpoint = Channel::from_shared(format!("http://{}", emulator_host))?
.connect()
.await?;
let raw_client = SpannerClient::new(endpoint);

let proxy = DelayedBeginProxy {
emulator_client: raw_client,
latch: Arc::clone(&latch),
begin_transaction_entered_latch: Arc::clone(&begin_transaction_entered_latch),
};

let (proxy_uri, _guard) =
crate::client::start_guarded_server("127.0.0.1:0", InterceptedSpanner(proxy)).await?;

// We build the Spanner DatabaseClient pointing directly to our proxy address over HTTP.
let proxy_db_client = Spanner::builder()
.with_endpoint(proxy_uri)
.build()
.await?
.database_client(format!(
"projects/test-project/instances/test-instance/databases/{}",
get_database_id().await
))
.build()
.await?;

let tx = proxy_db_client
.read_only_transaction()
.with_explicit_begin_transaction(false)
.build()
.await?;

let table_name = LowercaseAlphanumeric.random_string(10);
let table_name = format!("LateLoadedTable_{}", table_name);

// Create a task that tries to query the table before it exists.
// This will initially fail, and the client will fall back to using
// an explicit BeginTransaction RPC. The table will then be created
// BEFORE the BeginTransaction RPC is executed, which will cause the
// query to succeed when it is retried using the transaction ID that
// was returned by BeginTransaction. This task will never see the
// initial error, and instead it will seem like the query simply
// succeeded.
let query_task = tokio::spawn({
let table_name = table_name.clone();
async move {
let stmt = Statement::builder(format!("SELECT * FROM {}", table_name)).build();
let mut rs = tx.execute_query(stmt).await?;
let _ = rs.next().await;
Ok::<_, anyhow::Error>(tx)
}
});

// Wait until the query task above has been executed and has triggered an
// explicit BeginTransaction RPC. The BeginTransaction RPC is blocked until
// `latch` is notified.
begin_transaction_entered_latch.notified().await;

// Create the table on the emulator while the BeginTransaction RPC is blocked.
let statement = format!("CREATE TABLE {} (Id INT64) PRIMARY KEY (Id)", table_name);
crate::client::update_database_ddl(statement).await?;

// Unblock the BeginTransaction RPC.
latch.notify_one();

// Wait for the query task to complete. It should succeed and never see
// the initial error.
let tx = query_task.await??;

assert!(
tx.read_timestamp().is_some(),
"The transaction should have a read timestamp"
);

Ok(())
}

pub async fn query_with_options(db_client: &DatabaseClient) -> anyhow::Result<()> {
let rot = db_client.single_use().build();

Expand Down
Loading
Loading