From e7d34b069064aa93113ec4909cbcdb390160f94e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 1 Apr 2026 16:15:22 +0200 Subject: [PATCH 1/8] perf(spanner): inline BeginTransaction with first query (step 1) Adds support for inlining the BeginTransaction with the first query in a read-only transaction. This saves one round-trip to Spanner for multi-use read-only transactions. This implementation is intentionally simple: 1. It does not support parallel queries at the start of the transaction. 2. It does not include error handling for the first query. 3. It only supports read-only transactions. This is step 1. Follow-up pull requests addresses the above points. --- .../src/batch_read_only_transaction.rs | 11 +- src/spanner/src/read_only_transaction.rs | 269 ++++++++++++++++-- src/spanner/src/read_write_transaction.rs | 12 +- src/spanner/src/result_set.rs | 17 +- tests/spanner/src/query.rs | 29 +- 5 files changed, 299 insertions(+), 39 deletions(-) diff --git a/src/spanner/src/batch_read_only_transaction.rs b/src/spanner/src/batch_read_only_transaction.rs index c31447a5f3..179ac26dc2 100644 --- a/src/spanner/src/batch_read_only_transaction.rs +++ b/src/spanner/src/batch_read_only_transaction.rs @@ -43,7 +43,8 @@ pub struct BatchReadOnlyTransactionBuilder { impl BatchReadOnlyTransactionBuilder { pub(crate) fn new(client: DatabaseClient) -> Self { Self { - inner: MultiUseReadOnlyTransactionBuilder::new(client), + inner: MultiUseReadOnlyTransactionBuilder::new(client) + .with_explicit_begin_transaction(true), } } @@ -147,7 +148,7 @@ impl BatchReadOnlyTransaction { .clone() .into_partition_query_request() .set_session(self.inner.context.client.session.name.clone()) - .set_transaction(self.inner.context.transaction_selector.clone()) + .set_transaction(self.inner.context.transaction_selector.selector()) .set_partition_options(options); let response = self @@ -164,7 +165,7 @@ impl BatchReadOnlyTransaction { .map(|p| Partition { inner: PartitionedOperation::Query { partition_token: p.partition_token, - transaction_selector: self.inner.context.transaction_selector.clone(), + transaction_selector: self.inner.context.transaction_selector.selector(), session_name: self.inner.context.client.session.name.clone(), statement: statement.clone(), }, @@ -202,7 +203,7 @@ impl BatchReadOnlyTransaction { .clone() .into_partition_read_request() .set_session(self.inner.context.client.session.name.clone()) - .set_transaction(self.inner.context.transaction_selector.clone()) + .set_transaction(self.inner.context.transaction_selector.selector()) .set_partition_options(options); let response = self @@ -219,7 +220,7 @@ impl BatchReadOnlyTransaction { .map(|p| Partition { inner: PartitionedOperation::Read { partition_token: p.partition_token, - transaction_selector: self.inner.context.transaction_selector.clone(), + transaction_selector: self.inner.context.transaction_selector.selector(), session_name: self.inner.context.client.session.name.clone(), read_request: read.clone(), }, diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 3f0df51a5c..44782326f3 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -19,6 +19,7 @@ use crate::precommit::PrecommitTokenTracker; use crate::result_set::{ResultSet, StreamOperation}; use crate::statement::Statement; use crate::timestamp_bound::TimestampBound; +use std::sync::{Arc, Mutex}; /// A builder for [SingleUseReadOnlyTransaction]. /// @@ -91,7 +92,10 @@ impl SingleUseReadOnlyTransactionBuilder { SingleUseReadOnlyTransaction { context: ReadContext { client: self.client, - transaction_selector, + transaction_selector: ReadContextTransactionSelector::Fixed( + transaction_selector, + None, + ), precommit_token_tracker: PrecommitTokenTracker::new_noop(), transaction_tag: None, }, @@ -204,6 +208,7 @@ impl SingleUseReadOnlyTransaction { pub struct MultiUseReadOnlyTransactionBuilder { client: DatabaseClient, timestamp_bound: Option, + explicit_begin: bool, } impl MultiUseReadOnlyTransactionBuilder { @@ -211,9 +216,44 @@ impl MultiUseReadOnlyTransactionBuilder { Self { client, timestamp_bound: None, + explicit_begin: false, } } + /// Sets whether the transaction should be explicitly started using a `BeginTransaction` RPC. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::Spanner; + /// # use google_cloud_spanner::client::Statement; + /// # async fn set_explicit_begin(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> { + /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?; + /// let transaction = db_client.read_only_transaction().with_explicit_begin_transaction(true).build().await?; + /// let statement = Statement::builder("SELECT * FROM users").build(); + /// let result_set = transaction.execute_query(statement).await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// By default, the Spanner client will inline the `BeginTransaction` call with the first query + /// in the transaction. This reduces the number of round-trips to Spanner that are needed for a + /// transaction. Setting this option to `true` can be beneficial for specific transaction shapes: + /// + /// 1. When the transaction executes multiple parallel queries at the start of the transaction. + /// Only one query can include a `BeginTransaction` option, and all other queries must wait for + /// the first query to return the first result before they can proceed to execute. A + /// `BeginTransaction` RPC will quickly return a transaction ID and allow all queries to start + /// execution in parallel once the transaction ID has been returned. + /// 2. When the first query in the transaction could fail. If the query fails, then it will also + /// not start a transaction and return a transaction ID. The transaction will then fall back to + /// executing a `BeginTransaction` RPC and retry the first query. + /// + /// Default is `false` (inline begin). + pub fn with_explicit_begin_transaction(mut self, explicit: bool) -> Self { + self.explicit_begin = explicit; + self + } + /// Sets the timestamp bound for the read-only transaction. /// /// # Example @@ -231,6 +271,29 @@ impl MultiUseReadOnlyTransactionBuilder { self } + async fn begin( + &self, + options: TransactionOptions, + ) -> crate::Result { + let request = crate::model::BeginTransactionRequest::default() + .set_session(self.client.session.name.clone()) + .set_options(options); + + // TODO(#4972): make request options configurable + let response = self + .client + .spanner + .begin_transaction(request, crate::RequestOptions::default()) + .await?; + + let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); + + Ok(ReadContextTransactionSelector::Fixed( + transaction_selector, + response.read_timestamp, + )) + } + /// Builds the [MultiUseReadOnlyTransaction] and starts the transaction /// by calling the `BeginTransaction` RPC. /// @@ -245,30 +308,27 @@ impl MultiUseReadOnlyTransactionBuilder { /// ``` pub async fn build(self) -> crate::Result { let read_only = ReadOnly::default().set_return_read_timestamp(true); - let read_only = match self.timestamp_bound { - Some(b) => read_only.set_timestamp_bound(b.0), + let read_only = match self.timestamp_bound.as_ref() { + Some(b) => read_only.set_timestamp_bound(b.0.clone()), None => read_only.set_strong(true), }; - let request = crate::model::BeginTransactionRequest::default() - .set_session(self.client.session.name.clone()) - .set_options(TransactionOptions::default().set_read_only(read_only)); + let options = TransactionOptions::default().set_read_only(read_only); - // TODO(#4972): make request options configurable - let response = self - .client - .spanner - .begin_transaction(request, crate::RequestOptions::default()) - .await?; + let selector = if self.explicit_begin { + self.begin(options).await? + } else { + ReadContextTransactionSelector::Lazy(Arc::new(Mutex::new( + TransactionState::NotStarted(options), + ))) + }; - let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); Ok(MultiUseReadOnlyTransaction { context: ReadContext { client: self.client, - transaction_selector, + transaction_selector: selector, precommit_token_tracker: PrecommitTokenTracker::new_noop(), transaction_tag: None, }, - read_timestamp: response.read_timestamp, }) } } @@ -297,13 +357,12 @@ impl MultiUseReadOnlyTransactionBuilder { #[derive(Debug)] pub struct MultiUseReadOnlyTransaction { pub(crate) context: ReadContext, - pub(crate) read_timestamp: Option, } impl MultiUseReadOnlyTransaction { /// Returns the read timestamp chosen for the transaction. pub fn read_timestamp(&self) -> Option { - self.read_timestamp + self.context.transaction_selector.read_timestamp() } /// Executes a query using this transaction. @@ -370,10 +429,71 @@ impl MultiUseReadOnlyTransaction { } } +#[derive(Clone, Debug)] +pub(crate) enum ReadContextTransactionSelector { + Fixed(crate::model::TransactionSelector, Option), + Lazy(Arc>), +} + +#[derive(Clone, Debug)] +pub(crate) enum TransactionState { + NotStarted(crate::model::TransactionOptions), + Started(crate::model::TransactionSelector, Option), +} + +impl TransactionState { + fn selector(&self) -> crate::model::TransactionSelector { + match self { + Self::Started(selector, _) => selector.clone(), + Self::NotStarted(options) => { + crate::model::TransactionSelector::default().set_begin(options.clone()) + } + } + } +} + +impl ReadContextTransactionSelector { + pub(crate) fn selector(&self) -> crate::model::TransactionSelector { + match self { + Self::Fixed(selector, _) => selector.clone(), + Self::Lazy(lazy) => lazy + .lock() + .expect("transaction state mutex poisoned") + .selector(), + } + } + + pub(crate) fn update(&self, id: bytes::Bytes, timestamp: Option) { + if let Self::Lazy(lazy) = self { + let mut guard = lazy.lock().expect("transaction state mutex poisoned"); + if matches!(&*guard, TransactionState::NotStarted(_)) { + *guard = TransactionState::Started( + crate::model::TransactionSelector::default().set_id(id), + timestamp, + ); + } + } + } + + pub(crate) fn read_timestamp(&self) -> Option { + match self { + Self::Fixed(_, timestamp) => *timestamp, + Self::Lazy(lazy) => { + let guard = lazy.lock().expect("transaction state mutex poisoned"); + if let TransactionState::Started(_, timestamp) = &*guard { + *timestamp + } else { + None + } + } + } + } +} + #[derive(Clone, Debug)] pub(crate) struct ReadContext { pub(crate) client: DatabaseClient, - pub(crate) transaction_selector: crate::model::TransactionSelector, + pub(crate) transaction_selector: ReadContextTransactionSelector, pub(crate) precommit_token_tracker: PrecommitTokenTracker, pub(crate) transaction_tag: Option, } @@ -405,7 +525,7 @@ impl ReadContext { .into() .into_request() .set_session(self.client.session.name.clone()) - .set_transaction(self.transaction_selector.clone()); + .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); let stream = self @@ -418,6 +538,7 @@ impl ReadContext { Ok(ResultSet::new( stream, + Some(self.transaction_selector.clone()), self.precommit_token_tracker.clone(), self.client.clone(), StreamOperation::Query(request), @@ -432,7 +553,7 @@ impl ReadContext { .into() .into_request() .set_session(self.client.session.name.clone()) - .set_transaction(self.transaction_selector.clone()); + .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); let stream = self @@ -445,6 +566,7 @@ impl ReadContext { Ok(ResultSet::new( stream, + Some(self.transaction_selector.clone()), self.precommit_token_tracker.clone(), self.client.clone(), StreamOperation::Read(request), @@ -525,9 +647,8 @@ pub(crate) mod tests { let (db_client, _server) = setup_db_client(mock).await; let tx = db_client.single_use().build(); - let ro = tx - .context - .transaction_selector + let selector = tx.context.transaction_selector.selector(); + let ro = selector .single_use() .expect("Expected SingleUse selector") .read_only() @@ -543,9 +664,8 @@ pub(crate) mod tests { std::time::Duration::from_secs(10), )) .build(); - let ro2 = tx2 - .context - .transaction_selector + let selector = tx2.context.transaction_selector.selector(); + let ro2 = selector .single_use() .expect("Expected SingleUse selector") .read_only() @@ -646,6 +766,7 @@ pub(crate) mod tests { let tx = db_client .read_only_transaction() + .with_explicit_begin_transaction(true) .build() .await .expect("Failed to start tx"); @@ -670,6 +791,102 @@ pub(crate) mod tests { } } + #[tokio::test] + async fn execute_multi_query_inline_begin() -> anyhow::Result<()> { + use super::super::result_set::tests::string_val; + use crate::client::Statement; + use crate::value::Value; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + + // No explicit begin_transaction should be called. + mock.expect_begin_transaction().never(); + + let mut seq = mockall::Sequence::new(); + + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + + // First call: Should have Selector::Begin + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin"), + } + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: Some(prost_types::Timestamp { + seconds: 987654321, + nanos: 0, + }), + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + // Second call: Should have Selector::Id using the ID returned in the first call + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![4, 5, 6]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(setup_select1())]), + ))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // The read timestamp is not available until the first query is executed. + assert!(tx.read_timestamp().is_none()); + + for i in 0..2 { + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs.next().await.expect("Expected a row")?; + assert_eq!(row.raw_values(), [Value(string_val("1"))]); + + let result = rs.next().await; + assert!(result.is_none(), "Expected None, got {result:?}"); + + if i == 0 { + // Read timestamp becomes available. + assert_eq!( + tx.read_timestamp() + .expect("Expected read timestamp") + .seconds(), + 987654321 + ); + } + } + + Ok(()) + } + #[tokio::test] async fn execute_single_read() { use super::super::result_set::tests::string_val; diff --git a/src/spanner/src/read_write_transaction.rs b/src/spanner/src/read_write_transaction.rs index 9a84b1bb87..920d69bfda 100644 --- a/src/spanner/src/read_write_transaction.rs +++ b/src/spanner/src/read_write_transaction.rs @@ -100,7 +100,11 @@ impl ReadWriteTransactionBuilder { .begin_transaction(request, RequestOptions::default()) .await?; - let transaction_selector = TransactionSelector::default().set_id(response.id); + let transaction_selector = + crate::read_only_transaction::ReadContextTransactionSelector::Fixed( + TransactionSelector::default().set_id(response.id), + None, + ); Ok(ReadWriteTransaction { context: ReadContext { client: self.client.clone(), @@ -144,7 +148,7 @@ impl ReadWriteTransaction { .into() .into_request() .set_session(self.context.client.session.name.clone()) - .set_transaction(self.context.transaction_selector.clone()) + .set_transaction(self.context.transaction_selector.selector()) .set_seqno(seqno); request.request_options = self.context.amend_request_options(request.request_options); @@ -245,7 +249,7 @@ impl ReadWriteTransaction { let request = ExecuteBatchDmlRequest::default() .set_session(self.context.client.session.name.clone()) - .set_transaction(self.context.transaction_selector.clone()) + .set_transaction(self.context.transaction_selector.selector()) .set_seqno(seqno) .set_statements(statements) .set_or_clear_request_options( @@ -271,7 +275,7 @@ impl ReadWriteTransaction { } pub(crate) fn transaction_id(&self) -> crate::Result { - match &self.context.transaction_selector.selector { + match &self.context.transaction_selector.selector().selector { Some(Selector::Id(id)) => Ok(id.clone()), _ => Err(internal_error("Transaction ID is missing")), } diff --git a/src/spanner/src/result_set.rs b/src/spanner/src/result_set.rs index cfe0397cae..6bd848b175 100644 --- a/src/spanner/src/result_set.rs +++ b/src/spanner/src/result_set.rs @@ -16,6 +16,7 @@ use crate::database_client::DatabaseClient; use crate::error::internal_error; use crate::google::spanner::v1::PartialResultSet; use crate::precommit::PrecommitTokenTracker; +use crate::read_only_transaction::ReadContextTransactionSelector; use crate::result_set_metadata::ResultSetMetadata; use crate::row::Row; use crate::server_streaming::stream::PartialResultSetStream; @@ -58,6 +59,7 @@ pub struct ResultSet { safe_to_retry: bool, max_buffered_partial_result_sets: usize, retry_count: usize, + transaction_selector: Option, } /// Errors that can occur when interacting with a [`ResultSet`]. @@ -84,6 +86,7 @@ impl ResultSet { /// Creates a new result set. pub(crate) fn new( stream: PartialResultSetStream, + transaction_selector: Option, precommit_token_tracker: PrecommitTokenTracker, client: DatabaseClient, operation: StreamOperation, @@ -102,6 +105,7 @@ impl ResultSet { safe_to_retry: true, max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS, retry_count: 0, + transaction_selector, } } @@ -274,8 +278,19 @@ impl ResultSet { (Some(_), Some(_)) => { return Err(internal_error("Additional metadata after first result set")); } - (None, Some(m)) => { + (None, Some(mut m)) => { + let transaction = m.transaction.take(); self.metadata = Some(ResultSetMetadata::new(Some(m))); + if let (Some(selector), Some(transaction)) = + (&self.transaction_selector, transaction) + { + selector.update( + transaction.id, + transaction + .read_timestamp + .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), + ); + } } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index e38b51b989..fe02427a19 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -194,17 +194,40 @@ pub async fn result_set_metadata(db_client: &DatabaseClient) -> anyhow::Result<( } pub async fn multi_use_read_only_transaction(db_client: &DatabaseClient) -> anyhow::Result<()> { + for explicit_begin in [false, true] { + test_multi_use_read_only_transaction(db_client, explicit_begin).await?; + } + Ok(()) +} + +async fn test_multi_use_read_only_transaction( + db_client: &DatabaseClient, + explicit_begin: bool, +) -> anyhow::Result<()> { // Start a multi-use read-only transaction. - let tx = db_client.read_only_transaction().build().await?; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(explicit_begin) + .build() + .await?; - // Expect a read timestamp to have been chosen. - assert!(tx.read_timestamp().is_some()); + if explicit_begin { + // Expect a read timestamp to have been chosen immediately. + assert!(tx.read_timestamp().is_some()); + } else { + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + } // Execute the first query. let mut rs1 = tx .execute_query(Statement::builder("SELECT 1 AS col_int").build()) .await?; let row1 = rs1.next().await.transpose()?.expect("should yield a row"); + + // The read timestamp is now always available. + assert!(tx.read_timestamp().is_some()); + let val1 = row1.raw_values()[0].as_string(); assert_eq!(val1, "1"); let next1 = rs1.next().await.transpose()?; From 57eb4035dd1fc002e80ca3033aed9a8a7e5133c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 8 Apr 2026 08:56:33 +0200 Subject: [PATCH 2/8] fix(spanner): modify constructor call in BatchReadOnlyTransaction --- src/spanner/src/batch_read_only_transaction.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/spanner/src/batch_read_only_transaction.rs b/src/spanner/src/batch_read_only_transaction.rs index 44ddb8cd07..ff59050803 100644 --- a/src/spanner/src/batch_read_only_transaction.rs +++ b/src/spanner/src/batch_read_only_transaction.rs @@ -16,7 +16,7 @@ use crate::database_client::DatabaseClient; use crate::model::PartitionOptions; use crate::precommit::PrecommitTokenTracker; use crate::read_only_transaction::{ - MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder, + MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder, ReadContextTransactionSelector, }; use crate::result_set::{ResultSet, StreamOperation}; use crate::statement::Statement; @@ -345,6 +345,10 @@ impl Partition { Ok(ResultSet::new( stream, + Some(ReadContextTransactionSelector::Fixed( + transaction_selector.clone(), + None, + )), PrecommitTokenTracker::new_noop(), client.clone(), StreamOperation::Query(request), @@ -374,6 +378,10 @@ impl Partition { Ok(ResultSet::new( stream, + Some(ReadContextTransactionSelector::Fixed( + transaction_selector.clone(), + None, + )), PrecommitTokenTracker::new_noop(), client.clone(), StreamOperation::Read(request), From d8af76e44d5049df1326f19625ed2c28c9f8daaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 8 Apr 2026 09:35:55 +0200 Subject: [PATCH 3/8] test(spanner): add missing test for Read --- src/spanner/src/read_only_transaction.rs | 97 ++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 44782326f3..ea57a75853 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -922,4 +922,101 @@ pub(crate) mod tests { let result = rs.next().await; assert!(result.is_none(), "expected None, got {result:?}"); } + + #[tokio::test] + async fn execute_multi_read() -> anyhow::Result<()> { + use super::super::result_set::tests::string_val; + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + + // No explicit begin_transaction should be called. + mock.expect_begin_transaction().never(); + + let mut seq = mockall::Sequence::new(); + + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + + // First call: Should have Selector::Begin + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin"), + } + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: Some(prost_types::Timestamp { + seconds: 987654321, + nanos: 0, + }), + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + // Second call: Should have Selector::Id using the ID returned in the first call + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![4, 5, 6]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(setup_select1())]), + ))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // The read timestamp is not available until the first query is executed. + assert!(tx.read_timestamp().is_none()); + + for i in 0..2 { + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs.next().await.expect("Expected a row")?; + assert_eq!(row.raw_values(), [Value(string_val("1"))]); + + let result = rs.next().await; + assert!(result.is_none(), "Expected None, got {result:?}"); + + if i == 0 { + // Read timestamp becomes available. + assert_eq!( + tx.read_timestamp() + .expect("Expected read timestamp") + .seconds(), + 987654321 + ); + } + } + + Ok(()) + } } From c3e8b332ca0629f0f439e7761a7d401d45fd934c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 2 Apr 2026 11:26:16 +0200 Subject: [PATCH 4/8] perf(spanner): inline begin transaction error handling Adds error handling for inline-begin-transaction. If the first statement in a transaction fails, and that statement included a BeginTransaction option, then the transaction has not been started. In order to keep the semantics of the transaction consistent for an 'outside observer', we need to do the following: 1. Catch the error that was thrown by the initial statement. 2. Start the transaction using an explicit BeginTransaction RPC. 3. Retry the initial statement, but now using the transaction ID from step 2. 4. Return the error or result for the retried initial statement. The above makes sure that: 1. The transaction is actually started when the first statement is executed, also when the statement failed. 2. The statement becomes part of the transaction, and the result of the statement is consistent with the read-timestamp of the transaction. The second part is important in order to comply with Spanner's strong consistency guarantees; If for example a statement returns a 'Table not found' error, then that error is only valid for the read timestamp that was used for executing the statement. This is the reason that we retry the statement after the BeginTransaction RPC to be able to return a result that is guaranteed to be consistent with any other queries/reads that will be executed in the same transaction. --- src/spanner/src/read_only_transaction.rs | 486 ++++++++++++++++++++-- src/spanner/src/result_set.rs | 503 ++++++++++++++++++++++- tests/spanner/src/query.rs | 39 ++ tests/spanner/tests/driver.rs | 4 + 4 files changed, 974 insertions(+), 58 deletions(-) diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index ea57a75853..67ca13feb4 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -275,16 +275,7 @@ impl MultiUseReadOnlyTransactionBuilder { &self, options: TransactionOptions, ) -> crate::Result { - let request = crate::model::BeginTransactionRequest::default() - .set_session(self.client.session.name.clone()) - .set_options(options); - - // TODO(#4972): make request options configurable - let response = self - .client - .spanner - .begin_transaction(request, crate::RequestOptions::default()) - .await?; + let response = execute_begin_transaction(&self.client, options).await?; let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); @@ -429,6 +420,22 @@ impl MultiUseReadOnlyTransaction { } } +/// Executes an explicit `BeginTransaction` RPC on Spanner. +async fn execute_begin_transaction( + client: &crate::database_client::DatabaseClient, + options: crate::model::TransactionOptions, +) -> crate::Result { + let request = crate::model::BeginTransactionRequest::default() + .set_session(client.session.name.clone()) + .set_options(options); + + // TODO(#4972): make request options configurable + client + .spanner + .begin_transaction(request, crate::RequestOptions::default()) + .await +} + #[derive(Clone, Debug)] pub(crate) enum ReadContextTransactionSelector { Fixed(crate::model::TransactionSelector, Option), @@ -463,6 +470,32 @@ impl ReadContextTransactionSelector { } } + /// Explicitly begins a transaction if the transaction selector is a `Lazy` + /// selector and the transaction has not yet been started. This is used by + /// the client to force the start of a transaction if the first statement + /// failed. + pub(crate) async fn begin_explicitly( + &self, + client: &crate::database_client::DatabaseClient, + ) -> crate::Result<()> { + let Self::Lazy(lazy) = self else { + return Ok(()); + }; + + let options = { + let guard = lazy.lock().expect("transaction state mutex poisoned"); + let TransactionState::NotStarted(options) = &*guard else { + return Ok(()); + }; + options.clone() + }; + + let response = execute_begin_transaction(client, options).await?; + self.update(response.id, response.read_timestamp); + + Ok(()) + } + pub(crate) fn update(&self, id: bytes::Bytes, timestamp: Option) { if let Self::Lazy(lazy) = self { let mut guard = lazy.lock().expect("transaction state mutex poisoned"); @@ -517,6 +550,64 @@ impl ReadContext { options } + /// Attempts to execute an explicit `begin_transaction` RPC if the current transaction + /// selector is still in the `Lazy(NotStarted)` state. This is used as a + /// fallback mechanism when an initial implicit begin attempt failed. + async fn begin_explicitly_if_not_started(&self) -> crate::Result { + let ReadContextTransactionSelector::Lazy(lazy) = &self.transaction_selector else { + return Ok(false); + }; + let is_started = matches!(&*lazy.lock().unwrap(), TransactionState::Started(_, _)); + if is_started { + return Ok(false); + } + + self.transaction_selector + .begin_explicitly(&self.client) + .await?; + Ok(true) + } +} + +/// Helper macro to execute a streaming SQL or streaming read RPC with retry logic. +macro_rules! execute_stream_with_retry { + ($self:expr, $request:ident, $rpc_method:ident, $operation_variant:path) => {{ + let stream = match $self + .client + .spanner + // TODO(#4972): make request options configurable + .$rpc_method($request.clone(), crate::RequestOptions::default()) + .send() + .await + { + Ok(s) => s, + Err(e) => { + if $self.begin_explicitly_if_not_started().await? { + $request.transaction = Some($self.transaction_selector.selector()); + $self + .client + .spanner + // TODO(#4972): make request options configurable + .$rpc_method($request.clone(), crate::RequestOptions::default()) + .send() + .await? + } else { + return Err(e); + } + } + }; + + Ok(ResultSet::new( + stream, + Some($self.transaction_selector.clone()), + $self.precommit_token_tracker.clone(), + $self.client.clone(), + $operation_variant($request), + )) + }}; +} + +impl ReadContext { pub(crate) async fn execute_query>( &self, statement: T, @@ -528,21 +619,7 @@ impl ReadContext { .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); - let stream = self - .client - .spanner - // TODO(#4972): make request options configurable - .execute_streaming_sql(request.clone(), crate::RequestOptions::default()) - .send() - .await?; - - Ok(ResultSet::new( - stream, - Some(self.transaction_selector.clone()), - self.precommit_token_tracker.clone(), - self.client.clone(), - StreamOperation::Query(request), - )) + execute_stream_with_retry!(self, request, execute_streaming_sql, StreamOperation::Query) } pub(crate) async fn execute_read>( @@ -556,27 +633,15 @@ impl ReadContext { .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); - let stream = self - .client - .spanner - // TODO(#4972): make request options configurable - .streaming_read(request.clone(), crate::RequestOptions::default()) - .send() - .await?; - - Ok(ResultSet::new( - stream, - Some(self.transaction_selector.clone()), - self.precommit_token_tracker.clone(), - self.client.clone(), - StreamOperation::Read(request), - )) + execute_stream_with_retry!(self, request, streaming_read, StreamOperation::Read) } } #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::result_set::tests::string_val; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; #[test] fn auto_traits() { @@ -1019,4 +1084,345 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn inline_begin_failure_retry_success() -> anyhow::Result<()> { + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + // Return a transaction with ID + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query succeeds + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The parsed row value safely matched the underlying stream chunk" + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error first"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query fails again + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The failed execution bubbled upwards securely" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error second"), + "Secondary error message accurately propagates: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error query"))); + + // 2. Explicit begin transaction fails + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error begin tx"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The explicitly errored fallback boot securely propagated outwards" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error begin tx"), + "Natively propagated specific BeginTx bounds: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> { + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial read fails + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + })) + }); + + // 3. Retry of the read succeeds + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The macro correctly unpacked read arrays seamlessly" + ); + + Ok(()) + } + + #[tokio::test] + async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + + mock.expect_execute_streaming_sql() + .times(1) + .returning(|_| Err(Status::internal("Internal error single use query"))); + + mock.expect_begin_transaction().never(); + + let (db_client, _server) = setup_db_client(mock).await; + // single_use creates a Fixed selector + let tx = db_client.single_use().build(); + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error single use query")); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_already_started_query_send_error_returns_immediately() + -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + mock.expect_begin_transaction().never(); + + // 1. First query executes successfully and implicitly starts the transaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |_req| { + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: None, + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + // 2. Second query fails immediately upon send() + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second query"))); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Run first query (starts tx) + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + let _ = rs.next().await.expect("has row")?; + + // Run second query (fails) + let rs_result = tx + .execute_query(Statement::builder("SELECT 2").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error second query")); + + Ok(()) + } } diff --git a/src/spanner/src/result_set.rs b/src/spanner/src/result_set.rs index 6bd848b175..2d775b7412 100644 --- a/src/spanner/src/result_set.rs +++ b/src/spanner/src/result_set.rs @@ -233,7 +233,29 @@ impl ResultSet { return Ok(()); } - Err(e) + // Check if this stream included an inlined BeginTransaction option + // and has not yet returned a transaction ID. If so, we explicitly + // begin the transaction and restart the stream. + let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else { + return Err(e); + }; + let is_started = matches!( + &*lazy.lock().unwrap(), + crate::read_only_transaction::TransactionState::Started(_, _) + ); + if is_started { + return Err(e); + } + + self.transaction_selector + .as_ref() + .unwrap() + .begin_explicitly(&self.client) + .await?; + + self.partial_result_sets_buffer.clear(); + self.restart_stream().await?; + Ok(()) } fn handle_stream_end(&mut self) -> crate::Result> { @@ -281,15 +303,25 @@ impl ResultSet { (None, Some(mut m)) => { let transaction = m.transaction.take(); self.metadata = Some(ResultSetMetadata::new(Some(m))); - if let (Some(selector), Some(transaction)) = - (&self.transaction_selector, transaction) - { - selector.update( - transaction.id, - transaction - .read_timestamp - .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), - ); + if let Some(selector) = &self.transaction_selector { + if let Some(transaction) = transaction { + selector.update( + transaction.id, + transaction + .read_timestamp + .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), + ); + } else if let ReadContextTransactionSelector::Lazy(lazy) = selector { + let is_started = matches!( + &*lazy.lock().expect("transaction state mutex poisoned"), + crate::read_only_transaction::TransactionState::Started(_, _) + ); + if !is_started { + return Err(internal_error( + "Spanner failed to return a transaction ID for a query that included a BeginTransaction option", + )); + } + } } } } @@ -336,9 +368,15 @@ impl ResultSet { } async fn restart_stream(&mut self) -> crate::Result<()> { + // Get the latest transaction selector for this transaction. + let transaction_selector = self.transaction_selector.as_ref().map(|s| s.selector()); + match &mut self.operation { StreamOperation::Query(req) => { req.resume_token = self.last_resume_token.clone(); + req.transaction = transaction_selector + .clone() + .or_else(|| req.transaction.take()); let stream = self .client .spanner @@ -349,6 +387,9 @@ impl ResultSet { } StreamOperation::Read(req) => { req.resume_token = self.last_resume_token.clone(); + req.transaction = transaction_selector + .clone() + .or_else(|| req.transaction.take()); let stream = self .client .spanner @@ -465,6 +506,7 @@ pub(crate) mod tests { use super::*; use crate::client::Spanner; use gaxi::grpc::tonic::Response; + use google_cloud_auth::credentials::anonymous::Builder as Anonymous; use prost_types::Value; use spanner_grpc_mock::MockSpanner; use spanner_grpc_mock::google::spanner::v1::spanner_server::Spanner as SpannerTrait; @@ -528,7 +570,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await .expect("Failed to build client"); @@ -564,6 +606,39 @@ pub(crate) mod tests { assert!(next.is_none()); } + #[tokio::test] + async fn test_result_set_metadata() -> anyhow::Result<()> { + let mut rs = run_mock_query(vec![PartialResultSet { + metadata: metadata(2), + values: vec![string_val("a"), string_val("b")], + last: true, + ..Default::default() + }]) + .await; + + // Called before next() -> returns MetadataNotAvailable + let meta_err = rs.metadata(); + assert!(meta_err.is_err()); + assert!(matches!( + meta_err.unwrap_err(), + ResultSetError::MetadataNotAvailable + )); + + // Advance to fetch metadata + let _next = rs.next().await.expect("Expected a row")?; + + // Called after next() -> returns metadata + let meta = rs.metadata(); + assert!(meta.is_ok()); + let meta = meta.unwrap(); + assert_eq!( + meta.column_names(), + &["col0".to_string(), "col1".to_string()] + ); + + Ok(()) + } + #[tokio::test] async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> { let mut rs = run_mock_query(vec![PartialResultSet { @@ -586,6 +661,34 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> { + let mut rs = run_mock_query(vec![ + PartialResultSet { + values: vec![string_val("row1")], + ..Default::default() + }, + PartialResultSet { + resume_token: b"token".to_vec(), + ..Default::default() + }, + ]) + .await; + + let res = rs.next().await; + assert!(res.is_some(), "Expected an error but got None"); + let res = res.expect("Expected some response but got None"); + assert!(res.is_err(), "Expected an error but got Ok"); + let err_str = res.expect_err("Expected should be an error").to_string(); + assert!( + err_str.contains("First PartialResultSet did not contain metadata"), + "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'", + err_str + ); + + Ok(()) + } + #[tokio::test] async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> { let mut rs = run_mock_query(vec![PartialResultSet { @@ -725,7 +828,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1081,7 +1184,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1137,7 +1240,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1207,7 +1310,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1296,7 +1399,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1375,7 +1478,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1427,7 +1530,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1448,4 +1551,368 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Stream yields an error on the first chunk before returning transaction metadata. + // E.g., INVALID_ARGUMENT because the query is malformed. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = + tokio_stream::iter(vec![Err(Status::invalid_argument("Invalid query"))]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. The explicit BeginTransaction fallback gets triggered. + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. The ResultSet gracefully restarts the stream using the transaction ID returned by BeginTransaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure the explicitly yielded ID is routed into the new stream transaction selector + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: metadata(1), + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs.next().await.ok_or_else(|| { + anyhow::anyhow!("Expected row returned successfully despite stream breaking") + })??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verify the returned stream successfully resumed with the correct payload" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial stream throws UNAVAILABLE before metadata. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = + tokio_stream::iter(vec![Err(Status::unavailable("Transient network issue"))]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. We retry the stream since it was a transient error. + // The retry should use the same transaction selector as the original request. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin on stream retry"), + } + + let mut meta = metadata(1).unwrap(); + meta.transaction = Some(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + }); + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: Some(meta), + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verify resumed stream returns data" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Stream successfully returns metadata chunk then throws UNAVAILABLE on chunk 2. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let mut meta = metadata(1).unwrap(); + meta.transaction = Some(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + }); + let stream = tokio_stream::iter(vec![ + Ok(PartialResultSet { + metadata: Some(meta), + values: vec![string_val("1")], + resume_token: b"token1".to_vec(), + ..Default::default() + }), + Err(Status::unavailable("Transient mid-stream network issue")), + ]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. Stream resumes using Selector::Id. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id on stream retry"), + } + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + values: vec![string_val("2")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verified chunk 1 payload" + ); + let row2 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??; + assert_eq!( + row2.raw_values()[0].0, + string_val("2"), + "Verified chunk 2 reboot dynamically intercepted ID bounds correctly" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial stream successfully returns metadata chunk but completely lacks the `Transaction` entity. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: metadata(1), // Missing `.transaction` natively + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + // Use explicitly deferred Lazy begin transaction! + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let rs_result = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected explicit crash bound properly"))?; + assert!( + rs_result.is_err(), + "Securely aborted when metadata failed to package internal bounds properly" + ); + + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("failed to return a transaction ID"), + "Caught implicit gap boundary: {}", + err_str + ); + + Ok(()) + } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index fe02427a19..0f1a54553e 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -246,6 +246,45 @@ async fn test_multi_use_read_only_transaction( Ok(()) } +pub async fn multi_use_read_only_transaction_invalid_query_fallback( + db_client: &DatabaseClient, +) -> anyhow::Result<()> { + // Start a multi-use read-only transaction with implicit begin. + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + + // Execute the first query with invalid syntax. + let rs_result = tx + .execute_query(Statement::builder("SELECT * FROM NonExistentTable").build()) + .await; + + assert!( + rs_result.is_err(), + "Expected an error from an invalid query" + ); + + // The read timestamp should now be available because the transaction + // fell back to an explicit BeginTransaction. + assert!(tx.read_timestamp().is_some()); + + // It should be possible to use the transaction. + let mut rs2 = tx + .execute_query(Statement::builder("SELECT 2 AS col_int").build()) + .await?; + + let row2 = rs2.next().await.transpose()?.expect("should yield a row"); + let val2 = row2.raw_values()[0].as_string(); + assert_eq!(val2, "2"); + + Ok(()) +} + fn verify_null_row(row: &google_cloud_spanner::client::Row) { let raw_values = row.raw_values(); assert_eq!(raw_values.len(), 20, "Row should have exactly 20 columns"); diff --git a/tests/spanner/tests/driver.rs b/tests/spanner/tests/driver.rs index d27cc10dc5..2492088f67 100644 --- a/tests/spanner/tests/driver.rs +++ b/tests/spanner/tests/driver.rs @@ -26,6 +26,10 @@ mod spanner { integration_tests_spanner::query::query_with_parameters(&db_client).await?; integration_tests_spanner::query::result_set_metadata(&db_client).await?; integration_tests_spanner::query::multi_use_read_only_transaction(&db_client).await?; + integration_tests_spanner::query::multi_use_read_only_transaction_invalid_query_fallback( + &db_client, + ) + .await?; Ok(()) } From a0cfa919c19fbb23142f4c503ec46a755ccbb6ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 2 Apr 2026 11:26:16 +0200 Subject: [PATCH 5/8] perf(spanner): inline begin transaction error handling Adds error handling for inline-begin-transaction. If the first statement in a transaction fails, and that statement included a BeginTransaction option, then the transaction has not been started. In order to keep the semantics of the transaction consistent for an 'outside observer', we need to do the following: 1. Catch the error that was thrown by the initial statement. 2. Start the transaction using an explicit BeginTransaction RPC. 3. Retry the initial statement, but now using the transaction ID from step 2. 4. Return the error or result for the retried initial statement. The above makes sure that: 1. The transaction is actually started when the first statement is executed, also when the statement failed. 2. The statement becomes part of the transaction, and the result of the statement is consistent with the read-timestamp of the transaction. The second part is important in order to comply with Spanner's strong consistency guarantees; If for example a statement returns a 'Table not found' error, then that error is only valid for the read timestamp that was used for executing the statement. This is the reason that we retry the statement after the BeginTransaction RPC to be able to return a result that is guaranteed to be consistent with any other queries/reads that will be executed in the same transaction. --- src/spanner/src/read_only_transaction.rs | 341 +++++++++++++++++++++++ tests/spanner/src/query.rs | 36 +++ 2 files changed, 377 insertions(+) diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 67ca13feb4..4856a0495f 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -1425,4 +1425,345 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn inline_begin_failure_retry_success() -> anyhow::Result<()> { + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + // Return a transaction with ID + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query succeeds + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The parsed row value safely matched the underlying stream chunk" + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error first"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query fails again + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The failed execution bubbled upwards securely" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error second"), + "Secondary error message accurately propagates: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error query"))); + + // 2. Explicit begin transaction fails + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error begin tx"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The explicitly errored fallback boot securely propagated outwards" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error begin tx"), + "Natively propagated specific BeginTx bounds: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> { + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial read fails + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + })) + }); + + // 3. Retry of the read succeeds + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The macro correctly unpacked read arrays seamlessly" + ); + + Ok(()) + } + + #[tokio::test] + async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + + mock.expect_execute_streaming_sql() + .times(1) + .returning(|_| Err(Status::internal("Internal error single use query"))); + + mock.expect_begin_transaction().never(); + + let (db_client, _server) = setup_db_client(mock).await; + // single_use creates a Fixed selector + let tx = db_client.single_use().build(); + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error single use query")); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_already_started_query_send_error_returns_immediately() + -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + mock.expect_begin_transaction().never(); + + // 1. First query executes successfully and implicitly starts the transaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |_req| { + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: None, + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + // 2. Second query fails immediately upon send() + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second query"))); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Run first query (starts tx) + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + let _ = rs.next().await.expect("has row")?; + + // Run second query (fails) + let rs_result = tx + .execute_query(Statement::builder("SELECT 2").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error second query")); + + Ok(()) + } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index 0f1a54553e..b62b84a99c 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -285,6 +285,42 @@ pub async fn multi_use_read_only_transaction_invalid_query_fallback( Ok(()) } +pub async fn multi_use_read_only_transaction_invalid_query_fallback( + db_client: &DatabaseClient, +) -> anyhow::Result<()> { + // Start a multi-use read-only transaction with implicit begin. + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + + // Execute the first query with invalid syntax. + let rs_result = tx + .execute_query(Statement::builder("SELECT * FROM NonExistentTable").build()) + .await; + + assert!(rs_result.is_err(), "Expected an error from an invalid query"); + + // The read timestamp should now be available because the transaction + // fell back to an explicit BeginTransaction. + assert!(tx.read_timestamp().is_some()); + + // It should be possible to use the transaction. + let mut rs2 = tx + .execute_query(Statement::builder("SELECT 2 AS col_int").build()) + .await?; + + let row2 = rs2.next().await.transpose()?.expect("should yield a row"); + let val2 = row2.raw_values()[0].as_string(); + assert_eq!(val2, "2"); + + Ok(()) +} + fn verify_null_row(row: &google_cloud_spanner::client::Row) { let raw_values = row.raw_values(); assert_eq!(raw_values.len(), 20, "Row should have exactly 20 columns"); From 0efcab82a6697fb7e2cc4b979ab83c253e9d8628 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 2 Apr 2026 20:49:19 +0200 Subject: [PATCH 6/8] test(spanner): add integration test for inline-begin error handling Adds an integration test for error handling for inline-begin-transaction. This test uses a gRPC proxy to intercept calls from the client to Spanner to be able to deterministically emulate specific concurrency issues. This test shows how a query that failed during the first attempt, and thereby also failed to start the transaction, could succeed during a retry after the transaction has been started with an explicit BeginTransaction RPC. --- Cargo.lock | 3 + deny.toml | 1 + src/spanner/grpc-mock/src/lib.rs | 1 + src/spanner/src/read_only_transaction.rs | 341 ----------------------- tests/spanner/Cargo.toml | 3 + tests/spanner/src/client.rs | 70 ++++- tests/spanner/src/lib.rs | 1 + tests/spanner/src/query.rs | 184 +++++++++--- tests/spanner/src/test_proxy.rs | 269 ++++++++++++++++++ tests/spanner/tests/driver.rs | 1 + 10 files changed, 487 insertions(+), 387 deletions(-) create mode 100644 tests/spanner/src/test_proxy.rs diff --git a/Cargo.lock b/Cargo.lock index ec1f0475c9..52c1732b1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5914,7 +5914,10 @@ dependencies = [ "prost-types", "reqwest 0.13.2", "serde_json", + "spanner-grpc-mock", "tokio", + "tokio-stream", + "tonic", "tracing", ] diff --git a/deny.toml b/deny.toml index fa61a89904..67b8359d88 100644 --- a/deny.toml +++ b/deny.toml @@ -115,6 +115,7 @@ wrappers = [ # Use in tests is fine. "grpc-server", "integration-tests-o11y", + "integration-tests-spanner", "pubsub-grpc-mock", "spanner-grpc-mock", "storage-grpc-mock", diff --git a/src/spanner/grpc-mock/src/lib.rs b/src/spanner/grpc-mock/src/lib.rs index 72b19b07bd..10a3fb74c6 100644 --- a/src/spanner/grpc-mock/src/lib.rs +++ b/src/spanner/grpc-mock/src/lib.rs @@ -64,6 +64,7 @@ pub mod google { include!("generated/protos/google.rpc.rs"); } pub mod spanner { + #[allow(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)] pub mod v1 { include!("generated/protos/google.spanner.v1.rs"); } diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 4856a0495f..67ca13feb4 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -1425,345 +1425,4 @@ pub(crate) mod tests { Ok(()) } - - #[tokio::test] - async fn inline_begin_failure_retry_success() -> anyhow::Result<()> { - use crate::value::Value; - use gaxi::grpc::tonic::Response; - use gaxi::grpc::tonic::Status; - - let mut mock = create_session_mock(); - let mut seq = mockall::Sequence::new(); - - // 1. Initial query fails - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error"))); - - // 2. Explicit begin transaction succeeds - mock.expect_begin_transaction() - .times(1) - .in_sequence(&mut seq) - .returning(|req| { - let req = req.into_inner(); - assert_eq!( - req.session, - "projects/p/instances/i/databases/d/sessions/123" - ); - // Return a transaction with ID - Ok(Response::new(mock_v1::Transaction { - id: vec![7, 8, 9], - read_timestamp: Some(prost_types::Timestamp { - seconds: 123456789, - nanos: 0, - }), - ..Default::default() - })) - }); - - // 3. Retry of the query succeeds - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(|req| { - let req = req.into_inner(); - // Ensure it uses the new transaction ID - match req.transaction.unwrap().selector.unwrap() { - mock_v1::transaction_selector::Selector::Id(id) => { - assert_eq!(id, vec![7, 8, 9]); - } - _ => panic!("Expected Selector::Id"), - } - Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( - setup_select1(), - )])))) - }); - - let (db_client, _server) = setup_db_client(mock).await; - let tx = db_client - .read_only_transaction() - .with_explicit_begin_transaction(false) - .build() - .await?; - - let mut rs = tx - .execute_query(Statement::builder("SELECT 1").build()) - .await?; - - let row = rs - .next() - .await - .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??; - assert_eq!( - row.raw_values(), - [Value(string_val("1"))], - "The parsed row value safely matched the underlying stream chunk" - ); - - Ok(()) - } - - #[tokio::test] - async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> { - use gaxi::grpc::tonic::Response; - use gaxi::grpc::tonic::Status; - - let mut mock = create_session_mock(); - let mut seq = mockall::Sequence::new(); - - // 1. Initial query fails - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error first"))); - - // 2. Explicit begin transaction succeeds - mock.expect_begin_transaction() - .times(1) - .in_sequence(&mut seq) - .returning(|_| { - Ok(Response::new(mock_v1::Transaction { - id: vec![7, 8, 9], - read_timestamp: Some(prost_types::Timestamp { - seconds: 123456789, - nanos: 0, - }), - ..Default::default() - })) - }); - - // 3. Retry of the query fails again - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error second"))); - - let (db_client, _server) = setup_db_client(mock).await; - let tx = db_client - .read_only_transaction() - .with_explicit_begin_transaction(false) - .build() - .await?; - - let rs_result = tx - .execute_query(Statement::builder("SELECT 1").build()) - .await; - - assert!( - rs_result.is_err(), - "The failed execution bubbled upwards securely" - ); - let err_str = rs_result.unwrap_err().to_string(); - assert!( - err_str.contains("Internal error second"), - "Secondary error message accurately propagates: {}", - err_str - ); - - Ok(()) - } - - #[tokio::test] - async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> { - use gaxi::grpc::tonic::Status; - - let mut mock = create_session_mock(); - let mut seq = mockall::Sequence::new(); - - // 1. Initial query fails - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error query"))); - - // 2. Explicit begin transaction fails - mock.expect_begin_transaction() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error begin tx"))); - - let (db_client, _server) = setup_db_client(mock).await; - let tx = db_client - .read_only_transaction() - .with_explicit_begin_transaction(false) - .build() - .await?; - - let rs_result = tx - .execute_query(Statement::builder("SELECT 1").build()) - .await; - - assert!( - rs_result.is_err(), - "The explicitly errored fallback boot securely propagated outwards" - ); - let err_str = rs_result.unwrap_err().to_string(); - assert!( - err_str.contains("Internal error begin tx"), - "Natively propagated specific BeginTx bounds: {}", - err_str - ); - - Ok(()) - } - - #[tokio::test] - async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> { - use crate::client::{KeySet, ReadRequest}; - use crate::value::Value; - use gaxi::grpc::tonic::Response; - use gaxi::grpc::tonic::Status; - - let mut mock = create_session_mock(); - let mut seq = mockall::Sequence::new(); - - // 1. Initial read fails - mock.expect_streaming_read() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error"))); - - // 2. Explicit begin transaction succeeds - mock.expect_begin_transaction() - .times(1) - .in_sequence(&mut seq) - .returning(|_| { - Ok(Response::new(mock_v1::Transaction { - id: vec![7, 8, 9], - read_timestamp: None, - ..Default::default() - })) - }); - - // 3. Retry of the read succeeds - mock.expect_streaming_read() - .times(1) - .in_sequence(&mut seq) - .returning(|req| { - let req = req.into_inner(); - // Ensure it uses the new transaction ID - match req.transaction.unwrap().selector.unwrap() { - mock_v1::transaction_selector::Selector::Id(id) => { - assert_eq!(id, vec![7, 8, 9]); - } - _ => panic!("Expected Selector::Id"), - } - Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( - setup_select1(), - )])))) - }); - - let (db_client, _server) = setup_db_client(mock).await; - let tx = db_client - .read_only_transaction() - .with_explicit_begin_transaction(false) - .build() - .await?; - - let read = ReadRequest::builder("Users", vec!["Id", "Name"]) - .with_keys(KeySet::all()) - .build(); - let mut rs = tx.execute_read(read).await?; - - let row = rs - .next() - .await - .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??; - assert_eq!( - row.raw_values(), - [Value(string_val("1"))], - "The macro correctly unpacked read arrays seamlessly" - ); - - Ok(()) - } - - #[tokio::test] - async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> { - use crate::client::Statement; - use gaxi::grpc::tonic::Status; - - let mut mock = create_session_mock(); - - mock.expect_execute_streaming_sql() - .times(1) - .returning(|_| Err(Status::internal("Internal error single use query"))); - - mock.expect_begin_transaction().never(); - - let (db_client, _server) = setup_db_client(mock).await; - // single_use creates a Fixed selector - let tx = db_client.single_use().build(); - - let rs_result = tx - .execute_query(Statement::builder("SELECT 1").build()) - .await; - - assert!(rs_result.is_err()); - let err_str = rs_result.unwrap_err().to_string(); - assert!(err_str.contains("Internal error single use query")); - - Ok(()) - } - - #[tokio::test] - async fn inline_begin_already_started_query_send_error_returns_immediately() - -> anyhow::Result<()> { - use crate::client::Statement; - use gaxi::grpc::tonic::Status; - use spanner_grpc_mock::google::spanner::v1 as mock_v1; - - let mut mock = create_session_mock(); - let mut seq = mockall::Sequence::new(); - - mock.expect_begin_transaction().never(); - - // 1. First query executes successfully and implicitly starts the transaction. - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(move |_req| { - let mut rs = setup_select1(); - rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { - id: vec![4, 5, 6], - read_timestamp: None, - ..Default::default() - }); - Ok(gaxi::grpc::tonic::Response::new(Box::pin( - tokio_stream::iter(vec![Ok(rs)]), - ))) - }); - - // 2. Second query fails immediately upon send() - mock.expect_execute_streaming_sql() - .times(1) - .in_sequence(&mut seq) - .returning(|_| Err(Status::internal("Internal error second query"))); - - let (db_client, _server) = setup_db_client(mock).await; - - let tx = db_client - .read_only_transaction() - .with_explicit_begin_transaction(false) - .build() - .await?; - - // Run first query (starts tx) - let mut rs = tx - .execute_query(Statement::builder("SELECT 1").build()) - .await?; - let _ = rs.next().await.expect("has row")?; - - // Run second query (fails) - let rs_result = tx - .execute_query(Statement::builder("SELECT 2").build()) - .await; - - assert!(rs_result.is_err()); - let err_str = rs_result.unwrap_err().to_string(); - assert!(err_str.contains("Internal error second query")); - - Ok(()) - } } diff --git a/tests/spanner/Cargo.toml b/tests/spanner/Cargo.toml index 461c18d110..30b6625f58 100644 --- a/tests/spanner/Cargo.toml +++ b/tests/spanner/Cargo.toml @@ -36,7 +36,10 @@ google-cloud-test-utils = { workspace = true } prost-types.workspace = true reqwest = { workspace = true, features = ["json"] } serde_json = { workspace = true } +spanner-grpc-mock = { path = "../../src/spanner/grpc-mock" } tokio = { workspace = true, features = ["sync"] } +tokio-stream = { workspace = true } +tonic = { workspace = true } tracing.workspace = true [lints] diff --git a/tests/spanner/src/client.rs b/tests/spanner/src/client.rs index 7b05ecf3c5..3516bc011c 100644 --- a/tests/spanner/src/client.rs +++ b/tests/spanner/src/client.rs @@ -14,6 +14,8 @@ use google_cloud_spanner::client::{KeySet, Mutation, Spanner}; use google_cloud_test_utils::resource_names::LowercaseAlphanumeric; +use std::time::Duration; +use tokio::time::sleep; const PROJECT_ID: &str = "test-project"; const INSTANCE_ID: &str = "test-instance"; @@ -40,7 +42,7 @@ pub async fn wait_for_emulator(endpoint: &str) { static PROVISION_EMULATOR: tokio::sync::OnceCell<()> = tokio::sync::OnceCell::const_new(); static DATABASE_ID: tokio::sync::OnceCell = tokio::sync::OnceCell::const_new(); -async fn get_database_id() -> &'static str { +pub async fn get_database_id() -> &'static str { DATABASE_ID .get_or_init(|| async { std::env::var("SPANNER_EMULATOR_TEST_DB") @@ -59,16 +61,19 @@ pub async fn provision_emulator(endpoint: &str) { .await; } +pub fn get_emulator_rest_endpoint(grpc_endpoint: &str) -> String { + let rest_endpoint = std::env::var("SPANNER_EMULATOR_REST_HOST") + .unwrap_or_else(|_| grpc_endpoint.replace("9010", "9020")); + if rest_endpoint.starts_with("http://") || rest_endpoint.starts_with("https://") { + rest_endpoint + } else { + format!("http://{}", rest_endpoint) + } +} + async fn do_provision_emulator(endpoint: &str) { // TODO(#4973): Re-write this to use the admin clients once those also support the Emulator. - let rest_endpoint = std::env::var("SPANNER_EMULATOR_REST_HOST") - .unwrap_or_else(|_| endpoint.replace("9010", "9020")); - let rest_endpoint = - if rest_endpoint.starts_with("http://") || rest_endpoint.starts_with("https://") { - rest_endpoint - } else { - format!("http://{}", rest_endpoint) - }; + let rest_endpoint = get_emulator_rest_endpoint(endpoint); let client = reqwest::Client::new(); // Create a test instance and ignore any ALREADY_EXISTS errors. @@ -196,3 +201,50 @@ pub async fn create_database_client() -> Option anyhow::Result<()> { + let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set"); + let rest_endpoint = get_emulator_rest_endpoint(&emulator_host); + let db_path = format!( + "projects/{}/instances/{}/databases/{}", + PROJECT_ID, + INSTANCE_ID, + get_database_id().await + ); + let url = format!("{}/v1/{}/ddl", rest_endpoint, db_path); + let client = reqwest::Client::new(); + let payload = serde_json::json!({ + "statements": [statement] + }); + + let mut attempts = 0; + const MAX_ATTEMPTS: u32 = 25; + + loop { + attempts += 1; + let res = client.patch(&url).json(&payload).send().await?; + + let status = res.status(); + let text = res.text().await?; + + if status.is_success() { + return Ok(()); + } + + // Check if the error is the specific one we want to retry. + // Code 9 is FailedPrecondition. + if text.contains("\"code\":9") && text.contains("Schema change operation rejected") { + if attempts >= MAX_ATTEMPTS { + anyhow::bail!( + "Failed to update DDL after {} attempts. Last error: {}", + attempts, + text + ); + } + sleep(Duration::from_millis(100)).await; + continue; + } + + anyhow::bail!("Failed to update DDL: status={}, body={}", status, text); + } +} diff --git a/tests/spanner/src/lib.rs b/tests/spanner/src/lib.rs index 2be88b7a18..d8a95bf9c1 100644 --- a/tests/spanner/src/lib.rs +++ b/tests/spanner/src/lib.rs @@ -18,4 +18,5 @@ pub mod partitioned_dml; pub mod query; pub mod read; pub mod read_write_transaction; +pub mod test_proxy; pub mod write; diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index b62b84a99c..58c785c0e1 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -12,7 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use google_cloud_spanner::client::{DatabaseClient, Kind, Statement}; +use crate::client::{get_database_id, get_emulator_host}; +use crate::test_proxy::{InterceptedSpanner, SpannerInterceptor}; +use google_cloud_spanner::client::{DatabaseClient, Kind, Spanner, Statement}; +use google_cloud_test_utils::resource_names::LowercaseAlphanumeric; +use spanner_grpc_mock::google::spanner::v1 as spanner_v1; +use spanner_grpc_mock::google::spanner::v1::spanner_client::SpannerClient; +use spanner_grpc_mock::google::spanner::v1::spanner_server::SpannerServer; +use std::sync::Arc; +use tokio::net::TcpListener; +use tokio::sync::Notify; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::transport::{Channel, Server}; pub async fn simple_query(db_client: &DatabaseClient) -> anyhow::Result<()> { let rot = db_client.single_use().build(); @@ -285,42 +296,6 @@ pub async fn multi_use_read_only_transaction_invalid_query_fallback( Ok(()) } -pub async fn multi_use_read_only_transaction_invalid_query_fallback( - db_client: &DatabaseClient, -) -> anyhow::Result<()> { - // Start a multi-use read-only transaction with implicit begin. - let tx = db_client - .read_only_transaction() - .with_explicit_begin_transaction(false) - .build() - .await?; - - // Expect a read timestamp to NOT have been chosen yet. - assert!(tx.read_timestamp().is_none()); - - // Execute the first query with invalid syntax. - let rs_result = tx - .execute_query(Statement::builder("SELECT * FROM NonExistentTable").build()) - .await; - - assert!(rs_result.is_err(), "Expected an error from an invalid query"); - - // The read timestamp should now be available because the transaction - // fell back to an explicit BeginTransaction. - assert!(tx.read_timestamp().is_some()); - - // It should be possible to use the transaction. - let mut rs2 = tx - .execute_query(Statement::builder("SELECT 2 AS col_int").build()) - .await?; - - let row2 = rs2.next().await.transpose()?.expect("should yield a row"); - let val2 = row2.raw_values()[0].as_string(); - assert_eq!(val2, "2"); - - Ok(()) -} - fn verify_null_row(row: &google_cloud_spanner::client::Row) { let raw_values = row.raw_values(); assert_eq!(raw_values.len(), 20, "Row should have exactly 20 columns"); @@ -444,3 +419,138 @@ fn verify_row_2(row: &google_cloud_spanner::client::Row) { "2026-03-11T16:20:00Z" ); } + +struct DelayedBeginProxy { + emulator_client: SpannerClient, + latch: Arc, + begin_transaction_entered_latch: Arc, +} + +#[tonic::async_trait] +impl SpannerInterceptor for DelayedBeginProxy { + fn emulator_client(&self) -> SpannerClient { + self.emulator_client.clone() + } + + async fn begin_transaction( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.begin_transaction_entered_latch.notify_one(); + self.latch.notified().await; + self.emulator_client().begin_transaction(request).await + } +} + +// This test verifies that the client correctly falls back to `BeginTransaction` when the +// first statement in a transaction fails. It also shows that the statement is retried and +// could (theoretically) succeed during this retry. It achieves this by doing the following: +// 1. It uses a proxy that allows it to intercept the RPCs that are being sent to Spanner. +// 2. It creates a read-only transaction that uses inline-begin-transaction. +// 3. It executes a query that tries to read from a table that does not exist. +// 4. As the first statement in the transaction fails, the client falls back to using +// an explicit BeginTransaction RPC. +// 5. The proxy blocks this BeginTransaction RPC, and in the meantime the test creates +// the missing table. +// 6. The proxy unblocks the BeginTransaction RPC. +// 7. The statement is retried and succeeds. The test never sees the error. +// +// This test might seem like an extreme corner case for a read-only transaction like this. +// However, for read/write transactions, similar types of failures are more likely to occur, +// for example if a transaction tries to insert a row that violates the primary key. Another +// transaction could delete the row in the time between the first attempt failed, and the +// BeginTransaction RPC has been executed. +pub async fn inline_begin_fallback(_db_client: &DatabaseClient) -> anyhow::Result<()> { + let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set"); + let latch = Arc::new(Notify::new()); + let begin_transaction_entered_latch = Arc::new(Notify::new()); + + // Create a raw gRPC client that connects to the Spanner Emulator. + // This will be used by the proxy server to forward requests to the Emulator. + let endpoint = Channel::from_shared(format!("http://{}", emulator_host))? + .connect() + .await?; + let raw_client = SpannerClient::new(endpoint); + + // Create a local TCP listener to bind our proxy server to. + let listener = TcpListener::bind("127.0.0.1:0").await?; + let local_addr = listener.local_addr()?; + let proxy_address = format!("{}:{}", local_addr.ip(), local_addr.port()); + + let proxy = DelayedBeginProxy { + emulator_client: raw_client, + latch: Arc::clone(&latch), + begin_transaction_entered_latch: Arc::clone(&begin_transaction_entered_latch), + }; + + let _server_handle = tokio::spawn(async move { + let stream = TcpListenerStream::new(listener); + Server::builder() + .add_service(SpannerServer::new(InterceptedSpanner(proxy))) + .serve_with_incoming(stream) + .await + .expect("Proxy server failed"); + }); + + // We build the Spanner DatabaseClient pointing directly to our proxy address over HTTP. + let proxy_db_client = Spanner::builder() + .with_endpoint(format!("http://{}", proxy_address)) + .build() + .await? + .database_client(format!( + "projects/test-project/instances/test-instance/databases/{}", + get_database_id().await + )) + .build() + .await?; + + let tx = proxy_db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let table_name = LowercaseAlphanumeric.random_string(10); + let table_name = format!("LateLoadedTable_{}", table_name); + + // Create a task that tries to query the table before it exists. + // This will initially fail, and the client will fall back to using + // an explicit BeginTransaction RPC. The table will then be created + // BEFORE the BeginTransaction RPC is executed, which will cause the + // query to succeed when it is retried using the transaction ID that + // was returned by BeginTransaction. This task will never see the + // initial error, and instead it will seem like the query simply + // succeeded. + let query_task = tokio::spawn({ + let table_name = table_name.clone(); + async move { + let stmt = Statement::builder(format!("SELECT * FROM {}", table_name)).build(); + let mut rs = tx.execute_query(stmt).await?; + let _ = rs.next().await; + Ok::<_, anyhow::Error>(tx) + } + }); + + // Wait until the query task above has been executed and has triggered an + // explicit BeginTransaction RPC. The BeginTransaction RPC is blocked until + // `latch` is notified. + begin_transaction_entered_latch.notified().await; + + // Create the table on the emulator while the BeginTransaction RPC is blocked. + let statement = format!("CREATE TABLE {} (Id INT64) PRIMARY KEY (Id)", table_name); + crate::client::update_database_ddl(statement).await?; + + // Unblock the BeginTransaction RPC. + latch.notify_one(); + + // Wait for the query task to complete. It should succeed and never see + // the initial error. + let tx = query_task.await??; + + assert!( + tx.read_timestamp().is_some(), + "The transaction should have a read timestamp" + ); + + Ok(()) +} diff --git a/tests/spanner/src/test_proxy.rs b/tests/spanner/src/test_proxy.rs new file mode 100644 index 0000000000..7f0a5837a2 --- /dev/null +++ b/tests/spanner/src/test_proxy.rs @@ -0,0 +1,269 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use spanner_grpc_mock::google::spanner::v1 as spanner_v1; +use spanner_grpc_mock::google::spanner::v1::spanner_client::SpannerClient; + +#[tonic::async_trait] +pub trait SpannerInterceptor: Send + Sync + 'static { + fn emulator_client(&self) -> SpannerClient; + + async fn create_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().create_session(request).await + } + + async fn batch_create_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.emulator_client().batch_create_sessions(request).await + } + + async fn get_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().get_session(request).await + } + + async fn list_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().list_sessions(request).await + } + + async fn delete_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().delete_session(request).await + } + + async fn execute_sql( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().execute_sql(request).await + } + + async fn execute_streaming_sql( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.emulator_client().execute_streaming_sql(request).await + } + + async fn execute_batch_dml( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.emulator_client().execute_batch_dml(request).await + } + + async fn read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().read(request).await + } + + async fn streaming_read( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.emulator_client().streaming_read(request).await + } + + async fn begin_transaction( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().begin_transaction(request).await + } + + async fn commit( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().commit(request).await + } + + async fn rollback( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().rollback(request).await + } + + async fn partition_query( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().partition_query(request).await + } + + async fn partition_read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.emulator_client().partition_read(request).await + } + + async fn batch_write( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.emulator_client().batch_write(request).await + } +} + +pub struct InterceptedSpanner(pub T); + +#[tonic::async_trait] +impl spanner_v1::spanner_server::Spanner for InterceptedSpanner { + async fn create_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.create_session(request).await + } + + async fn batch_create_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.0.batch_create_sessions(request).await + } + + async fn get_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.get_session(request).await + } + + async fn list_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.list_sessions(request).await + } + + async fn delete_session( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.delete_session(request).await + } + + async fn execute_sql( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.execute_sql(request).await + } + + type ExecuteStreamingSqlStream = tonic::codec::Streaming; + + async fn execute_streaming_sql( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.execute_streaming_sql(request).await + } + + async fn execute_batch_dml( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + self.0.execute_batch_dml(request).await + } + + async fn read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.read(request).await + } + + type StreamingReadStream = tonic::codec::Streaming; + + async fn streaming_read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.streaming_read(request).await + } + + async fn begin_transaction( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.begin_transaction(request).await + } + + async fn commit( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.commit(request).await + } + + async fn rollback( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.rollback(request).await + } + + async fn partition_query( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.partition_query(request).await + } + + async fn partition_read( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.partition_read(request).await + } + + type BatchWriteStream = tonic::codec::Streaming; + + async fn batch_write( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + self.0.batch_write(request).await + } +} diff --git a/tests/spanner/tests/driver.rs b/tests/spanner/tests/driver.rs index 2492088f67..4d45413e13 100644 --- a/tests/spanner/tests/driver.rs +++ b/tests/spanner/tests/driver.rs @@ -30,6 +30,7 @@ mod spanner { &db_client, ) .await?; + integration_tests_spanner::query::inline_begin_fallback(&db_client).await?; Ok(()) } From 2d24bcdfa00eabd51cae90a87052f778dc7c7051 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Tue, 14 Apr 2026 17:11:12 +0200 Subject: [PATCH 7/8] chore(spanner): add some additional comments --- tests/spanner/src/client.rs | 6 ++++++ tests/spanner/src/query.rs | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/tests/spanner/src/client.rs b/tests/spanner/src/client.rs index 3516bc011c..53f5acf67e 100644 --- a/tests/spanner/src/client.rs +++ b/tests/spanner/src/client.rs @@ -202,6 +202,12 @@ pub async fn create_database_client() -> Option anyhow::Result<()> { let emulator_host = get_emulator_host().expect("SPANNER_EMULATOR_HOST must be set"); let rest_endpoint = get_emulator_rest_endpoint(&emulator_host); diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index 58c785c0e1..e90bdecb95 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -420,6 +420,11 @@ fn verify_row_2(row: &google_cloud_spanner::client::Row) { ); } +/// A test proxy that intercepts and delays `BeginTransaction` requests. +/// +/// It notifies `begin_transaction_entered_latch` when a `BeginTransaction` request is received, +/// and blocks the request execution until `latch` is notified. This allows tests to +/// synchronize the order of execution of `BeginTransaction` with other operations. struct DelayedBeginProxy { emulator_client: SpannerClient, latch: Arc, From 1d21817e054dc334ac866b65bb00660484ac20c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 17 Apr 2026 10:38:37 +0200 Subject: [PATCH 8/8] test(spanner): automatically stop server when test finishes --- tests/spanner/src/client.rs | 23 +++++++++++++++++++++++ tests/spanner/src/query.rs | 22 ++++------------------ 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/tests/spanner/src/client.rs b/tests/spanner/src/client.rs index 53f5acf67e..90b0026836 100644 --- a/tests/spanner/src/client.rs +++ b/tests/spanner/src/client.rs @@ -14,7 +14,9 @@ use google_cloud_spanner::client::{KeySet, Mutation, Spanner}; use google_cloud_test_utils::resource_names::LowercaseAlphanumeric; +use spanner_grpc_mock::google::spanner::v1::spanner_server::Spanner as MockSpannerTrait; use std::time::Duration; +use tokio::task::JoinHandle; use tokio::time::sleep; const PROJECT_ID: &str = "test-project"; @@ -254,3 +256,24 @@ pub async fn update_database_ddl(statement: String) -> anyhow::Result<()> { anyhow::bail!("Failed to update DDL: status={}, body={}", status, text); } } + +/// A guard that aborts the server task when dropped. +pub struct ServerGuard(JoinHandle<()>); + +impl Drop for ServerGuard { + fn drop(&mut self) { + self.0.abort(); + } +} + +/// Helper to start a mock server and return a drop guard. +pub async fn start_guarded_server( + address: &str, + service: T, +) -> anyhow::Result<(String, ServerGuard)> +where + T: MockSpannerTrait + Send + 'static, +{ + let (uri, handle) = spanner_grpc_mock::start(address, service).await?; + Ok((uri, ServerGuard(handle))) +} diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index e90bdecb95..1e1cc67270 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -18,12 +18,9 @@ use google_cloud_spanner::client::{DatabaseClient, Kind, Spanner, Statement}; use google_cloud_test_utils::resource_names::LowercaseAlphanumeric; use spanner_grpc_mock::google::spanner::v1 as spanner_v1; use spanner_grpc_mock::google::spanner::v1::spanner_client::SpannerClient; -use spanner_grpc_mock::google::spanner::v1::spanner_server::SpannerServer; use std::sync::Arc; -use tokio::net::TcpListener; use tokio::sync::Notify; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::transport::{Channel, Server}; +use tonic::transport::Channel; pub async fn simple_query(db_client: &DatabaseClient) -> anyhow::Result<()> { let rot = db_client.single_use().build(); @@ -477,29 +474,18 @@ pub async fn inline_begin_fallback(_db_client: &DatabaseClient) -> anyhow::Resul .await?; let raw_client = SpannerClient::new(endpoint); - // Create a local TCP listener to bind our proxy server to. - let listener = TcpListener::bind("127.0.0.1:0").await?; - let local_addr = listener.local_addr()?; - let proxy_address = format!("{}:{}", local_addr.ip(), local_addr.port()); - let proxy = DelayedBeginProxy { emulator_client: raw_client, latch: Arc::clone(&latch), begin_transaction_entered_latch: Arc::clone(&begin_transaction_entered_latch), }; - let _server_handle = tokio::spawn(async move { - let stream = TcpListenerStream::new(listener); - Server::builder() - .add_service(SpannerServer::new(InterceptedSpanner(proxy))) - .serve_with_incoming(stream) - .await - .expect("Proxy server failed"); - }); + let (proxy_uri, _guard) = + crate::client::start_guarded_server("127.0.0.1:0", InterceptedSpanner(proxy)).await?; // We build the Spanner DatabaseClient pointing directly to our proxy address over HTTP. let proxy_db_client = Spanner::builder() - .with_endpoint(format!("http://{}", proxy_address)) + .with_endpoint(proxy_uri) .build() .await? .database_client(format!(