diff --git a/Cargo.lock b/Cargo.lock index d8ed3c4cd7..eb4ec1469b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5966,7 +5966,10 @@ dependencies = [ "prost-types", "reqwest 0.13.2", "serde_json", + "spanner-grpc-mock", "tokio", + "tokio-stream", + "tonic", "tracing", ] diff --git a/deny.toml b/deny.toml index a3942fcf86..2e78103efb 100644 --- a/deny.toml +++ b/deny.toml @@ -117,6 +117,7 @@ wrappers = [ "grpc-server", "integration-tests-grpc-mock", "integration-tests-o11y", + "integration-tests-spanner", "pubsub-grpc-mock", "spanner-grpc-mock", "storage-grpc-mock", diff --git a/tests/spanner/Cargo.toml b/tests/spanner/Cargo.toml index 461c18d110..30b6625f58 100644 --- a/tests/spanner/Cargo.toml +++ b/tests/spanner/Cargo.toml @@ -36,7 +36,10 @@ google-cloud-test-utils = { workspace = true } prost-types.workspace = true reqwest = { workspace = true, features = ["json"] } serde_json = { workspace = true } +spanner-grpc-mock = { path = "../../src/spanner/grpc-mock" } tokio = { workspace = true, features = ["sync"] } +tokio-stream = { workspace = true } +tonic = { workspace = true } tracing.workspace = true [lints] diff --git a/tests/spanner/src/client.rs b/tests/spanner/src/client.rs index 7b05ecf3c5..90b0026836 100644 --- a/tests/spanner/src/client.rs +++ b/tests/spanner/src/client.rs @@ -14,6 +14,10 @@ use google_cloud_spanner::client::{KeySet, Mutation, Spanner}; use google_cloud_test_utils::resource_names::LowercaseAlphanumeric; +use spanner_grpc_mock::google::spanner::v1::spanner_server::Spanner as MockSpannerTrait; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio::time::sleep; const PROJECT_ID: &str = "test-project"; const INSTANCE_ID: &str = "test-instance"; @@ -40,7 +44,7 @@ pub async fn wait_for_emulator(endpoint: &str) { static PROVISION_EMULATOR: tokio::sync::OnceCell<()> = tokio::sync::OnceCell::const_new(); static DATABASE_ID: tokio::sync::OnceCell = tokio::sync::OnceCell::const_new(); -async fn get_database_id() -> &'static str { +pub async fn get_database_id() -> &'static str { DATABASE_ID .get_or_init(|| async { std::env::var("SPANNER_EMULATOR_TEST_DB") @@ -59,16 +63,19 @@ pub async fn provision_emulator(endpoint: &str) { .await; } +pub fn get_emulator_rest_endpoint(grpc_endpoint: &str) -> String { + let rest_endpoint = std::env::var("SPANNER_EMULATOR_REST_HOST") + .unwrap_or_else(|_| grpc_endpoint.replace("9010", "9020")); + if rest_endpoint.starts_with("http://") || rest_endpoint.starts_with("https://") { + rest_endpoint + } else { + format!("http://{}", rest_endpoint) + } +} + async fn do_provision_emulator(endpoint: &str) { // TODO(#4973): Re-write this to use the admin clients once those also support the Emulator. - let rest_endpoint = std::env::var("SPANNER_EMULATOR_REST_HOST") - .unwrap_or_else(|_| endpoint.replace("9010", "9020")); - let rest_endpoint = - if rest_endpoint.starts_with("http://") || rest_endpoint.starts_with("https://") { - rest_endpoint - } else { - format!("http://{}", rest_endpoint) - }; + let rest_endpoint = get_emulator_rest_endpoint(endpoint); let client = reqwest::Client::new(); // Create a test instance and ignore any ALREADY_EXISTS errors. @@ -196,3 +203,77 @@ pub async fn create_database_client() -> Option anyhow::Result<()> { + let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set"); + let rest_endpoint = get_emulator_rest_endpoint(&emulator_host); + let db_path = format!( + "projects/{}/instances/{}/databases/{}", + PROJECT_ID, + INSTANCE_ID, + get_database_id().await + ); + let url = format!("{}/v1/{}/ddl", rest_endpoint, db_path); + let client = reqwest::Client::new(); + let payload = serde_json::json!({ + "statements": [statement] + }); + + let mut attempts = 0; + const MAX_ATTEMPTS: u32 = 25; + + loop { + attempts += 1; + let res = client.patch(&url).json(&payload).send().await?; + + let status = res.status(); + let text = res.text().await?; + + if status.is_success() { + return Ok(()); + } + + // Check if the error is the specific one we want to retry. + // Code 9 is FailedPrecondition. + if text.contains("\"code\":9") && text.contains("Schema change operation rejected") { + if attempts >= MAX_ATTEMPTS { + anyhow::bail!( + "Failed to update DDL after {} attempts. Last error: {}", + attempts, + text + ); + } + sleep(Duration::from_millis(100)).await; + continue; + } + + anyhow::bail!("Failed to update DDL: status={}, body={}", status, text); + } +} + +/// A guard that aborts the server task when dropped. +pub struct ServerGuard(JoinHandle<()>); + +impl Drop for ServerGuard { + fn drop(&mut self) { + self.0.abort(); + } +} + +/// Helper to start a mock server and return a drop guard. +pub async fn start_guarded_server( + address: &str, + service: T, +) -> anyhow::Result<(String, ServerGuard)> +where + T: MockSpannerTrait + Send + 'static, +{ + let (uri, handle) = spanner_grpc_mock::start(address, service).await?; + Ok((uri, ServerGuard(handle))) +} diff --git a/tests/spanner/src/lib.rs b/tests/spanner/src/lib.rs index 2be88b7a18..d8a95bf9c1 100644 --- a/tests/spanner/src/lib.rs +++ b/tests/spanner/src/lib.rs @@ -18,4 +18,5 @@ pub mod partitioned_dml; pub mod query; pub mod read; pub mod read_write_transaction; +pub mod test_proxy; pub mod write; diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index ea6facddd2..59371704f3 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -12,7 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use google_cloud_spanner::client::{DatabaseClient, Kind, QueryOptions, Statement}; +use crate::client::{get_database_id, get_emulator_host}; +use crate::test_proxy::{InterceptedSpanner, SpannerInterceptor}; +use google_cloud_spanner::client::{DatabaseClient, Kind, QueryOptions, Spanner, Statement}; +use google_cloud_test_utils::resource_names::LowercaseAlphanumeric; +use spanner_grpc_mock::google::spanner::v1 as spanner_v1; +use spanner_grpc_mock::google::spanner::v1::spanner_client::SpannerClient; +use std::sync::Arc; +use tokio::sync::Notify; +use tonic::transport::Channel; pub async fn simple_query(db_client: &DatabaseClient) -> anyhow::Result<()> { let rot = db_client.single_use().build(); @@ -409,6 +417,135 @@ fn verify_row_2(row: &google_cloud_spanner::client::Row) { ); } +/// A test proxy that intercepts and delays `BeginTransaction` requests. +/// +/// It notifies `begin_transaction_entered_latch` when a `BeginTransaction` request is received, +/// and blocks the request execution until `latch` is notified. This allows tests to +/// synchronize the order of execution of `BeginTransaction` with other operations. +struct DelayedBeginProxy { + emulator_client: SpannerClient, + latch: Arc, + begin_transaction_entered_latch: Arc, +} + +#[tonic::async_trait] +impl SpannerInterceptor for DelayedBeginProxy { + fn emulator_client(&self) -> SpannerClient { + self.emulator_client.clone() + } + + async fn begin_transaction( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.begin_transaction_entered_latch.notify_one(); + self.latch.notified().await; + self.emulator_client().begin_transaction(request).await + } +} + +// This test verifies that the client correctly falls back to `BeginTransaction` when the +// first statement in a transaction fails. It also shows that the statement is retried and +// could (theoretically) succeed during this retry. It achieves this by doing the following: +// 1. It uses a proxy that allows it to intercept the RPCs that are being sent to Spanner. +// 2. It creates a read-only transaction that uses inline-begin-transaction. +// 3. It executes a query that tries to read from a table that does not exist. +// 4. As the first statement in the transaction fails, the client falls back to using +// an explicit BeginTransaction RPC. +// 5. The proxy blocks this BeginTransaction RPC, and in the meantime the test creates +// the missing table. +// 6. The proxy unblocks the BeginTransaction RPC. +// 7. The statement is retried and succeeds. The test never sees the error. +// +// This test might seem like an extreme corner case for a read-only transaction like this. +// However, for read/write transactions, similar types of failures are more likely to occur, +// for example if a transaction tries to insert a row that violates the primary key. Another +// transaction could delete the row in the time between the first attempt failed, and the +// BeginTransaction RPC has been executed. +pub async fn inline_begin_fallback(_db_client: &DatabaseClient) -> anyhow::Result<()> { + let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set"); + let latch = Arc::new(Notify::new()); + let begin_transaction_entered_latch = Arc::new(Notify::new()); + + // Create a raw gRPC client that connects to the Spanner Emulator. + // This will be used by the proxy server to forward requests to the Emulator. + let endpoint = Channel::from_shared(format!("http://{}", emulator_host))? + .connect() + .await?; + let raw_client = SpannerClient::new(endpoint); + + let proxy = DelayedBeginProxy { + emulator_client: raw_client, + latch: Arc::clone(&latch), + begin_transaction_entered_latch: Arc::clone(&begin_transaction_entered_latch), + }; + + let (proxy_uri, _guard) = + crate::client::start_guarded_server("127.0.0.1:0", InterceptedSpanner(proxy)).await?; + + // We build the Spanner DatabaseClient pointing directly to our proxy address over HTTP. + let proxy_db_client = Spanner::builder() + .with_endpoint(proxy_uri) + .build() + .await? + .database_client(format!( + "projects/test-project/instances/test-instance/databases/{}", + get_database_id().await + )) + .build() + .await?; + + let tx = proxy_db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let table_name = LowercaseAlphanumeric.random_string(10); + let table_name = format!("LateLoadedTable_{}", table_name); + + // Create a task that tries to query the table before it exists. + // This will initially fail, and the client will fall back to using + // an explicit BeginTransaction RPC. The table will then be created + // BEFORE the BeginTransaction RPC is executed, which will cause the + // query to succeed when it is retried using the transaction ID that + // was returned by BeginTransaction. This task will never see the + // initial error, and instead it will seem like the query simply + // succeeded. + let query_task = tokio::spawn({ + let table_name = table_name.clone(); + async move { + let stmt = Statement::builder(format!("SELECT * FROM {}", table_name)).build(); + let mut rs = tx.execute_query(stmt).await?; + let _ = rs.next().await; + Ok::<_, anyhow::Error>(tx) + } + }); + + // Wait until the query task above has been executed and has triggered an + // explicit BeginTransaction RPC. The BeginTransaction RPC is blocked until + // `latch` is notified. + begin_transaction_entered_latch.notified().await; + + // Create the table on the emulator while the BeginTransaction RPC is blocked. + let statement = format!("CREATE TABLE {} (Id INT64) PRIMARY KEY (Id)", table_name); + crate::client::update_database_ddl(statement).await?; + + // Unblock the BeginTransaction RPC. + latch.notify_one(); + + // Wait for the query task to complete. It should succeed and never see + // the initial error. + let tx = query_task.await??; + + assert!( + tx.read_timestamp().is_some(), + "The transaction should have a read timestamp" + ); + + Ok(()) +} + pub async fn query_with_options(db_client: &DatabaseClient) -> anyhow::Result<()> { let rot = db_client.single_use().build(); diff --git a/tests/spanner/src/test_proxy.rs b/tests/spanner/src/test_proxy.rs new file mode 100644 index 0000000000..7f0a5837a2 --- /dev/null +++ b/tests/spanner/src/test_proxy.rs @@ -0,0 +1,269 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use spanner_grpc_mock::google::spanner::v1 as spanner_v1; +use spanner_grpc_mock::google::spanner::v1::spanner_client::SpannerClient; + +#[tonic::async_trait] +pub trait SpannerInterceptor: Send + Sync + 'static { + fn emulator_client(&self) -> SpannerClient; + + async fn create_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().create_session(request).await + } + + async fn batch_create_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.emulator_client().batch_create_sessions(request).await + } + + async fn get_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().get_session(request).await + } + + async fn list_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().list_sessions(request).await + } + + async fn delete_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().delete_session(request).await + } + + async fn execute_sql( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().execute_sql(request).await + } + + async fn execute_streaming_sql( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.emulator_client().execute_streaming_sql(request).await + } + + async fn execute_batch_dml( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.emulator_client().execute_batch_dml(request).await + } + + async fn read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().read(request).await + } + + async fn streaming_read( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.emulator_client().streaming_read(request).await + } + + async fn begin_transaction( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().begin_transaction(request).await + } + + async fn commit( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().commit(request).await + } + + async fn rollback( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().rollback(request).await + } + + async fn partition_query( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().partition_query(request).await + } + + async fn partition_read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().partition_read(request).await + } + + async fn batch_write( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.emulator_client().batch_write(request).await + } +} + +pub struct InterceptedSpanner(pub T); + +#[tonic::async_trait] +impl spanner_v1::spanner_server::Spanner for InterceptedSpanner { + async fn create_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.create_session(request).await + } + + async fn batch_create_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.0.batch_create_sessions(request).await + } + + async fn get_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.get_session(request).await + } + + async fn list_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.list_sessions(request).await + } + + async fn delete_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.delete_session(request).await + } + + async fn execute_sql( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.execute_sql(request).await + } + + type ExecuteStreamingSqlStream = tonic::codec::Streaming; + + async fn execute_streaming_sql( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.execute_streaming_sql(request).await + } + + async fn execute_batch_dml( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.0.execute_batch_dml(request).await + } + + async fn read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.read(request).await + } + + type StreamingReadStream = tonic::codec::Streaming; + + async fn streaming_read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.streaming_read(request).await + } + + async fn begin_transaction( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.begin_transaction(request).await + } + + async fn commit( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.commit(request).await + } + + async fn rollback( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.rollback(request).await + } + + async fn partition_query( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.partition_query(request).await + } + + async fn partition_read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.partition_read(request).await + } + + type BatchWriteStream = tonic::codec::Streaming; + + async fn batch_write( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.batch_write(request).await + } +} diff --git a/tests/spanner/tests/driver.rs b/tests/spanner/tests/driver.rs index a8a3c8547d..4dcb4fcd02 100644 --- a/tests/spanner/tests/driver.rs +++ b/tests/spanner/tests/driver.rs @@ -30,6 +30,7 @@ mod spanner { &db_client, ) .await?; + integration_tests_spanner::query::inline_begin_fallback(&db_client).await?; integration_tests_spanner::query::query_with_options(&db_client).await?; Ok(())