From e7d34b069064aa93113ec4909cbcdb390160f94e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 1 Apr 2026 16:15:22 +0200 Subject: [PATCH 1/4] perf(spanner): inline BeginTransaction with first query (step 1) Adds support for inlining the BeginTransaction with the first query in a read-only transaction. This saves one round-trip to Spanner for multi-use read-only transactions. This implementation is intentionally simple: 1. It does not support parallel queries at the start of the transaction. 2. It does not include error handling for the first query. 3. It only supports read-only transactions. This is step 1. Follow-up pull requests address the above points. --- .../src/batch_read_only_transaction.rs | 11 +- src/spanner/src/read_only_transaction.rs | 269 ++++++++++++++++-- src/spanner/src/read_write_transaction.rs | 12 +- src/spanner/src/result_set.rs | 17 +- tests/spanner/src/query.rs | 29 +- 5 files changed, 299 insertions(+), 39 deletions(-) diff --git a/src/spanner/src/batch_read_only_transaction.rs b/src/spanner/src/batch_read_only_transaction.rs index c31447a5f3..179ac26dc2 100644 --- a/src/spanner/src/batch_read_only_transaction.rs +++ b/src/spanner/src/batch_read_only_transaction.rs @@ -43,7 +43,8 @@ pub struct BatchReadOnlyTransactionBuilder { impl BatchReadOnlyTransactionBuilder { pub(crate) fn new(client: DatabaseClient) -> Self { Self { - inner: MultiUseReadOnlyTransactionBuilder::new(client), + inner: MultiUseReadOnlyTransactionBuilder::new(client) + .with_explicit_begin_transaction(true), } } @@ -147,7 +148,7 @@ impl BatchReadOnlyTransaction { .clone() .into_partition_query_request() .set_session(self.inner.context.client.session.name.clone()) - .set_transaction(self.inner.context.transaction_selector.clone()) + .set_transaction(self.inner.context.transaction_selector.selector()) .set_partition_options(options); let response = self @@ -164,7 +165,7 @@ impl BatchReadOnlyTransaction { .map(|p| Partition { inner: 
PartitionedOperation::Query { partition_token: p.partition_token, - transaction_selector: self.inner.context.transaction_selector.clone(), + transaction_selector: self.inner.context.transaction_selector.selector(), session_name: self.inner.context.client.session.name.clone(), statement: statement.clone(), }, @@ -202,7 +203,7 @@ impl BatchReadOnlyTransaction { .clone() .into_partition_read_request() .set_session(self.inner.context.client.session.name.clone()) - .set_transaction(self.inner.context.transaction_selector.clone()) + .set_transaction(self.inner.context.transaction_selector.selector()) .set_partition_options(options); let response = self @@ -219,7 +220,7 @@ impl BatchReadOnlyTransaction { .map(|p| Partition { inner: PartitionedOperation::Read { partition_token: p.partition_token, - transaction_selector: self.inner.context.transaction_selector.clone(), + transaction_selector: self.inner.context.transaction_selector.selector(), session_name: self.inner.context.client.session.name.clone(), read_request: read.clone(), }, diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 3f0df51a5c..44782326f3 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -19,6 +19,7 @@ use crate::precommit::PrecommitTokenTracker; use crate::result_set::{ResultSet, StreamOperation}; use crate::statement::Statement; use crate::timestamp_bound::TimestampBound; +use std::sync::{Arc, Mutex}; /// A builder for [SingleUseReadOnlyTransaction]. 
/// @@ -91,7 +92,10 @@ impl SingleUseReadOnlyTransactionBuilder { SingleUseReadOnlyTransaction { context: ReadContext { client: self.client, - transaction_selector, + transaction_selector: ReadContextTransactionSelector::Fixed( + transaction_selector, + None, + ), precommit_token_tracker: PrecommitTokenTracker::new_noop(), transaction_tag: None, }, @@ -204,6 +208,7 @@ impl SingleUseReadOnlyTransaction { pub struct MultiUseReadOnlyTransactionBuilder { client: DatabaseClient, timestamp_bound: Option, + explicit_begin: bool, } impl MultiUseReadOnlyTransactionBuilder { @@ -211,9 +216,44 @@ impl MultiUseReadOnlyTransactionBuilder { Self { client, timestamp_bound: None, + explicit_begin: false, } } + /// Sets whether the transaction should be explicitly started using a `BeginTransaction` RPC. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::Spanner; + /// # use google_cloud_spanner::client::Statement; + /// # async fn set_explicit_begin(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> { + /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?; + /// let transaction = db_client.read_only_transaction().with_explicit_begin_transaction(true).build().await?; + /// let statement = Statement::builder("SELECT * FROM users").build(); + /// let result_set = transaction.execute_query(statement).await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// By default, the Spanner client will inline the `BeginTransaction` call with the first query + /// in the transaction. This reduces the number of round-trips to Spanner that are needed for a + /// transaction. Setting this option to `true` can be beneficial for specific transaction shapes: + /// + /// 1. When the transaction executes multiple parallel queries at the start of the transaction. 
+ /// Only one query can include a `BeginTransaction` option, and all other queries must wait for + /// the first query to return the first result before they can proceed to execute. A + /// `BeginTransaction` RPC will quickly return a transaction ID and allow all queries to start + /// execution in parallel once the transaction ID has been returned. + /// 2. When the first query in the transaction could fail. If the query fails, then it will also + /// not start a transaction and return a transaction ID. The transaction will then fall back to + /// executing a `BeginTransaction` RPC and retry the first query. + /// + /// Default is `false` (inline begin). + pub fn with_explicit_begin_transaction(mut self, explicit: bool) -> Self { + self.explicit_begin = explicit; + self + } + /// Sets the timestamp bound for the read-only transaction. /// /// # Example @@ -231,6 +271,29 @@ impl MultiUseReadOnlyTransactionBuilder { self } + async fn begin( + &self, + options: TransactionOptions, + ) -> crate::Result { + let request = crate::model::BeginTransactionRequest::default() + .set_session(self.client.session.name.clone()) + .set_options(options); + + // TODO(#4972): make request options configurable + let response = self + .client + .spanner + .begin_transaction(request, crate::RequestOptions::default()) + .await?; + + let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); + + Ok(ReadContextTransactionSelector::Fixed( + transaction_selector, + response.read_timestamp, + )) + } + /// Builds the [MultiUseReadOnlyTransaction] and starts the transaction /// by calling the `BeginTransaction` RPC. 
/// @@ -245,30 +308,27 @@ impl MultiUseReadOnlyTransactionBuilder { /// ``` pub async fn build(self) -> crate::Result { let read_only = ReadOnly::default().set_return_read_timestamp(true); - let read_only = match self.timestamp_bound { - Some(b) => read_only.set_timestamp_bound(b.0), + let read_only = match self.timestamp_bound.as_ref() { + Some(b) => read_only.set_timestamp_bound(b.0.clone()), None => read_only.set_strong(true), }; - let request = crate::model::BeginTransactionRequest::default() - .set_session(self.client.session.name.clone()) - .set_options(TransactionOptions::default().set_read_only(read_only)); + let options = TransactionOptions::default().set_read_only(read_only); - // TODO(#4972): make request options configurable - let response = self - .client - .spanner - .begin_transaction(request, crate::RequestOptions::default()) - .await?; + let selector = if self.explicit_begin { + self.begin(options).await? + } else { + ReadContextTransactionSelector::Lazy(Arc::new(Mutex::new( + TransactionState::NotStarted(options), + ))) + }; - let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); Ok(MultiUseReadOnlyTransaction { context: ReadContext { client: self.client, - transaction_selector, + transaction_selector: selector, precommit_token_tracker: PrecommitTokenTracker::new_noop(), transaction_tag: None, }, - read_timestamp: response.read_timestamp, }) } } @@ -297,13 +357,12 @@ impl MultiUseReadOnlyTransactionBuilder { #[derive(Debug)] pub struct MultiUseReadOnlyTransaction { pub(crate) context: ReadContext, - pub(crate) read_timestamp: Option, } impl MultiUseReadOnlyTransaction { /// Returns the read timestamp chosen for the transaction. pub fn read_timestamp(&self) -> Option { - self.read_timestamp + self.context.transaction_selector.read_timestamp() } /// Executes a query using this transaction. 
@@ -370,10 +429,71 @@ impl MultiUseReadOnlyTransaction { } } +#[derive(Clone, Debug)] +pub(crate) enum ReadContextTransactionSelector { + Fixed(crate::model::TransactionSelector, Option), + Lazy(Arc>), +} + +#[derive(Clone, Debug)] +pub(crate) enum TransactionState { + NotStarted(crate::model::TransactionOptions), + Started(crate::model::TransactionSelector, Option), +} + +impl TransactionState { + fn selector(&self) -> crate::model::TransactionSelector { + match self { + Self::Started(selector, _) => selector.clone(), + Self::NotStarted(options) => { + crate::model::TransactionSelector::default().set_begin(options.clone()) + } + } + } +} + +impl ReadContextTransactionSelector { + pub(crate) fn selector(&self) -> crate::model::TransactionSelector { + match self { + Self::Fixed(selector, _) => selector.clone(), + Self::Lazy(lazy) => lazy + .lock() + .expect("transaction state mutex poisoned") + .selector(), + } + } + + pub(crate) fn update(&self, id: bytes::Bytes, timestamp: Option) { + if let Self::Lazy(lazy) = self { + let mut guard = lazy.lock().expect("transaction state mutex poisoned"); + if matches!(&*guard, TransactionState::NotStarted(_)) { + *guard = TransactionState::Started( + crate::model::TransactionSelector::default().set_id(id), + timestamp, + ); + } + } + } + + pub(crate) fn read_timestamp(&self) -> Option { + match self { + Self::Fixed(_, timestamp) => *timestamp, + Self::Lazy(lazy) => { + let guard = lazy.lock().expect("transaction state mutex poisoned"); + if let TransactionState::Started(_, timestamp) = &*guard { + *timestamp + } else { + None + } + } + } + } +} + #[derive(Clone, Debug)] pub(crate) struct ReadContext { pub(crate) client: DatabaseClient, - pub(crate) transaction_selector: crate::model::TransactionSelector, + pub(crate) transaction_selector: ReadContextTransactionSelector, pub(crate) precommit_token_tracker: PrecommitTokenTracker, pub(crate) transaction_tag: Option, } @@ -405,7 +525,7 @@ impl ReadContext { .into() .into_request() 
.set_session(self.client.session.name.clone()) - .set_transaction(self.transaction_selector.clone()); + .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); let stream = self @@ -418,6 +538,7 @@ impl ReadContext { Ok(ResultSet::new( stream, + Some(self.transaction_selector.clone()), self.precommit_token_tracker.clone(), self.client.clone(), StreamOperation::Query(request), @@ -432,7 +553,7 @@ impl ReadContext { .into() .into_request() .set_session(self.client.session.name.clone()) - .set_transaction(self.transaction_selector.clone()); + .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); let stream = self @@ -445,6 +566,7 @@ impl ReadContext { Ok(ResultSet::new( stream, + Some(self.transaction_selector.clone()), self.precommit_token_tracker.clone(), self.client.clone(), StreamOperation::Read(request), @@ -525,9 +647,8 @@ pub(crate) mod tests { let (db_client, _server) = setup_db_client(mock).await; let tx = db_client.single_use().build(); - let ro = tx - .context - .transaction_selector + let selector = tx.context.transaction_selector.selector(); + let ro = selector .single_use() .expect("Expected SingleUse selector") .read_only() @@ -543,9 +664,8 @@ pub(crate) mod tests { std::time::Duration::from_secs(10), )) .build(); - let ro2 = tx2 - .context - .transaction_selector + let selector = tx2.context.transaction_selector.selector(); + let ro2 = selector .single_use() .expect("Expected SingleUse selector") .read_only() @@ -646,6 +766,7 @@ pub(crate) mod tests { let tx = db_client .read_only_transaction() + .with_explicit_begin_transaction(true) .build() .await .expect("Failed to start tx"); @@ -670,6 +791,102 @@ pub(crate) mod tests { } } + #[tokio::test] + async fn execute_multi_query_inline_begin() -> anyhow::Result<()> { + use super::super::result_set::tests::string_val; + use crate::client::Statement; 
+ use crate::value::Value; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + + // No explicit begin_transaction should be called. + mock.expect_begin_transaction().never(); + + let mut seq = mockall::Sequence::new(); + + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + + // First call: Should have Selector::Begin + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin"), + } + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: Some(prost_types::Timestamp { + seconds: 987654321, + nanos: 0, + }), + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + // Second call: Should have Selector::Id using the ID returned in the first call + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![4, 5, 6]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(setup_select1())]), + ))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // The read timestamp is not available until the first query is executed. 
+ assert!(tx.read_timestamp().is_none()); + + for i in 0..2 { + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs.next().await.expect("Expected a row")?; + assert_eq!(row.raw_values(), [Value(string_val("1"))]); + + let result = rs.next().await; + assert!(result.is_none(), "Expected None, got {result:?}"); + + if i == 0 { + // Read timestamp becomes available. + assert_eq!( + tx.read_timestamp() + .expect("Expected read timestamp") + .seconds(), + 987654321 + ); + } + } + + Ok(()) + } + #[tokio::test] async fn execute_single_read() { use super::super::result_set::tests::string_val; diff --git a/src/spanner/src/read_write_transaction.rs b/src/spanner/src/read_write_transaction.rs index 9a84b1bb87..920d69bfda 100644 --- a/src/spanner/src/read_write_transaction.rs +++ b/src/spanner/src/read_write_transaction.rs @@ -100,7 +100,11 @@ impl ReadWriteTransactionBuilder { .begin_transaction(request, RequestOptions::default()) .await?; - let transaction_selector = TransactionSelector::default().set_id(response.id); + let transaction_selector = + crate::read_only_transaction::ReadContextTransactionSelector::Fixed( + TransactionSelector::default().set_id(response.id), + None, + ); Ok(ReadWriteTransaction { context: ReadContext { client: self.client.clone(), @@ -144,7 +148,7 @@ impl ReadWriteTransaction { .into() .into_request() .set_session(self.context.client.session.name.clone()) - .set_transaction(self.context.transaction_selector.clone()) + .set_transaction(self.context.transaction_selector.selector()) .set_seqno(seqno); request.request_options = self.context.amend_request_options(request.request_options); @@ -245,7 +249,7 @@ impl ReadWriteTransaction { let request = ExecuteBatchDmlRequest::default() .set_session(self.context.client.session.name.clone()) - .set_transaction(self.context.transaction_selector.clone()) + .set_transaction(self.context.transaction_selector.selector()) .set_seqno(seqno) 
.set_statements(statements) .set_or_clear_request_options( @@ -271,7 +275,7 @@ impl ReadWriteTransaction { } pub(crate) fn transaction_id(&self) -> crate::Result { - match &self.context.transaction_selector.selector { + match &self.context.transaction_selector.selector().selector { Some(Selector::Id(id)) => Ok(id.clone()), _ => Err(internal_error("Transaction ID is missing")), } diff --git a/src/spanner/src/result_set.rs b/src/spanner/src/result_set.rs index cfe0397cae..6bd848b175 100644 --- a/src/spanner/src/result_set.rs +++ b/src/spanner/src/result_set.rs @@ -16,6 +16,7 @@ use crate::database_client::DatabaseClient; use crate::error::internal_error; use crate::google::spanner::v1::PartialResultSet; use crate::precommit::PrecommitTokenTracker; +use crate::read_only_transaction::ReadContextTransactionSelector; use crate::result_set_metadata::ResultSetMetadata; use crate::row::Row; use crate::server_streaming::stream::PartialResultSetStream; @@ -58,6 +59,7 @@ pub struct ResultSet { safe_to_retry: bool, max_buffered_partial_result_sets: usize, retry_count: usize, + transaction_selector: Option, } /// Errors that can occur when interacting with a [`ResultSet`]. @@ -84,6 +86,7 @@ impl ResultSet { /// Creates a new result set. 
pub(crate) fn new( stream: PartialResultSetStream, + transaction_selector: Option, precommit_token_tracker: PrecommitTokenTracker, client: DatabaseClient, operation: StreamOperation, @@ -102,6 +105,7 @@ impl ResultSet { safe_to_retry: true, max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS, retry_count: 0, + transaction_selector, } } @@ -274,8 +278,19 @@ impl ResultSet { (Some(_), Some(_)) => { return Err(internal_error("Additional metadata after first result set")); } - (None, Some(m)) => { + (None, Some(mut m)) => { + let transaction = m.transaction.take(); self.metadata = Some(ResultSetMetadata::new(Some(m))); + if let (Some(selector), Some(transaction)) = + (&self.transaction_selector, transaction) + { + selector.update( + transaction.id, + transaction + .read_timestamp + .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), + ); + } } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index e38b51b989..fe02427a19 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -194,17 +194,40 @@ pub async fn result_set_metadata(db_client: &DatabaseClient) -> anyhow::Result<( } pub async fn multi_use_read_only_transaction(db_client: &DatabaseClient) -> anyhow::Result<()> { + for explicit_begin in [false, true] { + test_multi_use_read_only_transaction(db_client, explicit_begin).await?; + } + Ok(()) +} + +async fn test_multi_use_read_only_transaction( + db_client: &DatabaseClient, + explicit_begin: bool, +) -> anyhow::Result<()> { // Start a multi-use read-only transaction. - let tx = db_client.read_only_transaction().build().await?; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(explicit_begin) + .build() + .await?; - // Expect a read timestamp to have been chosen. - assert!(tx.read_timestamp().is_some()); + if explicit_begin { + // Expect a read timestamp to have been chosen immediately. 
+ assert!(tx.read_timestamp().is_some()); + } else { + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + } // Execute the first query. let mut rs1 = tx .execute_query(Statement::builder("SELECT 1 AS col_int").build()) .await?; let row1 = rs1.next().await.transpose()?.expect("should yield a row"); + + // The read timestamp is now always available. + assert!(tx.read_timestamp().is_some()); + let val1 = row1.raw_values()[0].as_string(); assert_eq!(val1, "1"); let next1 = rs1.next().await.transpose()?; From 57eb4035dd1fc002e80ca3033aed9a8a7e5133c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 8 Apr 2026 08:56:33 +0200 Subject: [PATCH 2/4] fix(spanner): modify constructor call in BatchReadOnlyTransaction --- src/spanner/src/batch_read_only_transaction.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/spanner/src/batch_read_only_transaction.rs b/src/spanner/src/batch_read_only_transaction.rs index 44ddb8cd07..ff59050803 100644 --- a/src/spanner/src/batch_read_only_transaction.rs +++ b/src/spanner/src/batch_read_only_transaction.rs @@ -16,7 +16,7 @@ use crate::database_client::DatabaseClient; use crate::model::PartitionOptions; use crate::precommit::PrecommitTokenTracker; use crate::read_only_transaction::{ - MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder, + MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder, ReadContextTransactionSelector, }; use crate::result_set::{ResultSet, StreamOperation}; use crate::statement::Statement; @@ -345,6 +345,10 @@ impl Partition { Ok(ResultSet::new( stream, + Some(ReadContextTransactionSelector::Fixed( + transaction_selector.clone(), + None, + )), PrecommitTokenTracker::new_noop(), client.clone(), StreamOperation::Query(request), @@ -374,6 +378,10 @@ impl Partition { Ok(ResultSet::new( stream, + Some(ReadContextTransactionSelector::Fixed( + transaction_selector.clone(), + None, + )), 
PrecommitTokenTracker::new_noop(), client.clone(), StreamOperation::Read(request), From d8af76e44d5049df1326f19625ed2c28c9f8daaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 8 Apr 2026 09:35:55 +0200 Subject: [PATCH 3/4] test(spanner): add missing test for Read --- src/spanner/src/read_only_transaction.rs | 97 ++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index 44782326f3..ea57a75853 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -922,4 +922,101 @@ pub(crate) mod tests { let result = rs.next().await; assert!(result.is_none(), "expected None, got {result:?}"); } + + #[tokio::test] + async fn execute_multi_read() -> anyhow::Result<()> { + use super::super::result_set::tests::string_val; + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + + // No explicit begin_transaction should be called. 
+ mock.expect_begin_transaction().never(); + + let mut seq = mockall::Sequence::new(); + + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + + // First call: Should have Selector::Begin + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin"), + } + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: Some(prost_types::Timestamp { + seconds: 987654321, + nanos: 0, + }), + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(move |req| { + let req = req.into_inner(); + // Second call: Should have Selector::Id using the ID returned in the first call + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![4, 5, 6]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(setup_select1())]), + ))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // The read timestamp is not available until the first query is executed. 
+ assert!(tx.read_timestamp().is_none()); + + for i in 0..2 { + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs.next().await.expect("Expected a row")?; + assert_eq!(row.raw_values(), [Value(string_val("1"))]); + + let result = rs.next().await; + assert!(result.is_none(), "Expected None, got {result:?}"); + + if i == 0 { + // Read timestamp becomes available. + assert_eq!( + tx.read_timestamp() + .expect("Expected read timestamp") + .seconds(), + 987654321 + ); + } + } + + Ok(()) + } } From c3e8b332ca0629f0f439e7761a7d401d45fd934c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 2 Apr 2026 11:26:16 +0200 Subject: [PATCH 4/4] perf(spanner): inline begin transaction error handling Adds error handling for inline-begin-transaction. If the first statement in a transaction fails, and that statement included a BeginTransaction option, then the transaction has not been started. In order to keep the semantics of the transaction consistent for an 'outside observer', we need to do the following: 1. Catch the error that was thrown by the initial statement. 2. Start the transaction using an explicit BeginTransaction RPC. 3. Retry the initial statement, but now using the transaction ID from step 2. 4. Return the error or result for the retried initial statement. The above makes sure that: 1. The transaction is actually started when the first statement is executed, also when the statement failed. 2. The statement becomes part of the transaction, and the result of the statement is consistent with the read-timestamp of the transaction. The second part is important in order to comply with Spanner's strong consistency guarantees; If for example a statement returns a 'Table not found' error, then that error is only valid for the read timestamp that was used for executing the statement. 
This is the reason that we retry the statement after the BeginTransaction RPC to be able to return a result that is guaranteed to be consistent with any other queries/reads that will be executed in the same transaction. --- src/spanner/src/read_only_transaction.rs | 486 ++++++++++++++++++++-- src/spanner/src/result_set.rs | 503 ++++++++++++++++++++++- tests/spanner/src/query.rs | 39 ++ tests/spanner/tests/driver.rs | 4 + 4 files changed, 974 insertions(+), 58 deletions(-) diff --git a/src/spanner/src/read_only_transaction.rs b/src/spanner/src/read_only_transaction.rs index ea57a75853..67ca13feb4 100644 --- a/src/spanner/src/read_only_transaction.rs +++ b/src/spanner/src/read_only_transaction.rs @@ -275,16 +275,7 @@ impl MultiUseReadOnlyTransactionBuilder { &self, options: TransactionOptions, ) -> crate::Result { - let request = crate::model::BeginTransactionRequest::default() - .set_session(self.client.session.name.clone()) - .set_options(options); - - // TODO(#4972): make request options configurable - let response = self - .client - .spanner - .begin_transaction(request, crate::RequestOptions::default()) - .await?; + let response = execute_begin_transaction(&self.client, options).await?; let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id); @@ -429,6 +420,22 @@ impl MultiUseReadOnlyTransaction { } } +/// Executes an explicit `BeginTransaction` RPC on Spanner. 
+async fn execute_begin_transaction( + client: &crate::database_client::DatabaseClient, + options: crate::model::TransactionOptions, +) -> crate::Result { + let request = crate::model::BeginTransactionRequest::default() + .set_session(client.session.name.clone()) + .set_options(options); + + // TODO(#4972): make request options configurable + client + .spanner + .begin_transaction(request, crate::RequestOptions::default()) + .await +} + #[derive(Clone, Debug)] pub(crate) enum ReadContextTransactionSelector { Fixed(crate::model::TransactionSelector, Option), @@ -463,6 +470,32 @@ impl ReadContextTransactionSelector { } } + /// Explicitly begins a transaction if the transaction selector is a `Lazy` + /// selector and the transaction has not yet been started. This is used by + /// the client to force the start of a transaction if the first statement + /// failed. + pub(crate) async fn begin_explicitly( + &self, + client: &crate::database_client::DatabaseClient, + ) -> crate::Result<()> { + let Self::Lazy(lazy) = self else { + return Ok(()); + }; + + let options = { + let guard = lazy.lock().expect("transaction state mutex poisoned"); + let TransactionState::NotStarted(options) = &*guard else { + return Ok(()); + }; + options.clone() + }; + + let response = execute_begin_transaction(client, options).await?; + self.update(response.id, response.read_timestamp); + + Ok(()) + } + pub(crate) fn update(&self, id: bytes::Bytes, timestamp: Option) { if let Self::Lazy(lazy) = self { let mut guard = lazy.lock().expect("transaction state mutex poisoned"); @@ -517,6 +550,64 @@ impl ReadContext { options } + /// Attempts to execute an explicit `begin_transaction` RPC if the current transaction + /// selector is still in the `Lazy(NotStarted)` state. This is used as a + /// fallback mechanism when an initial implicit begin attempt failed. 
+ async fn begin_explicitly_if_not_started(&self) -> crate::Result { + let ReadContextTransactionSelector::Lazy(lazy) = &self.transaction_selector else { + return Ok(false); + }; + let is_started = matches!(&*lazy.lock().unwrap(), TransactionState::Started(_, _)); + if is_started { + return Ok(false); + } + + self.transaction_selector + .begin_explicitly(&self.client) + .await?; + Ok(true) + } +} + +/// Helper macro to execute a streaming SQL or streaming read RPC with retry logic. +macro_rules! execute_stream_with_retry { + ($self:expr, $request:ident, $rpc_method:ident, $operation_variant:path) => {{ + let stream = match $self + .client + .spanner + // TODO(#4972): make request options configurable + .$rpc_method($request.clone(), crate::RequestOptions::default()) + .send() + .await + { + Ok(s) => s, + Err(e) => { + if $self.begin_explicitly_if_not_started().await? { + $request.transaction = Some($self.transaction_selector.selector()); + $self + .client + .spanner + // TODO(#4972): make request options configurable + .$rpc_method($request.clone(), crate::RequestOptions::default()) + .send() + .await? 
+ } else { + return Err(e); + } + } + }; + + Ok(ResultSet::new( + stream, + Some($self.transaction_selector.clone()), + $self.precommit_token_tracker.clone(), + $self.client.clone(), + $operation_variant($request), + )) + }}; +} + +impl ReadContext { pub(crate) async fn execute_query>( &self, statement: T, @@ -528,21 +619,7 @@ impl ReadContext { .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); - let stream = self - .client - .spanner - // TODO(#4972): make request options configurable - .execute_streaming_sql(request.clone(), crate::RequestOptions::default()) - .send() - .await?; - - Ok(ResultSet::new( - stream, - Some(self.transaction_selector.clone()), - self.precommit_token_tracker.clone(), - self.client.clone(), - StreamOperation::Query(request), - )) + execute_stream_with_retry!(self, request, execute_streaming_sql, StreamOperation::Query) } pub(crate) async fn execute_read>( @@ -556,27 +633,15 @@ impl ReadContext { .set_transaction(self.transaction_selector.selector()); request.request_options = self.amend_request_options(request.request_options); - let stream = self - .client - .spanner - // TODO(#4972): make request options configurable - .streaming_read(request.clone(), crate::RequestOptions::default()) - .send() - .await?; - - Ok(ResultSet::new( - stream, - Some(self.transaction_selector.clone()), - self.precommit_token_tracker.clone(), - self.client.clone(), - StreamOperation::Read(request), - )) + execute_stream_with_retry!(self, request, streaming_read, StreamOperation::Read) } } #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::result_set::tests::string_val; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; #[test] fn auto_traits() { @@ -1019,4 +1084,345 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn inline_begin_failure_retry_success() -> anyhow::Result<()> { + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use 
gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + // Return a transaction with ID + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query succeeds + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The parsed row value safely matched the underlying stream chunk" + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = 
mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error first"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. Retry of the query fails again + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The failed execution bubbled upwards securely" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error second"), + "Secondary error message accurately propagates: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial query fails + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error query"))); + + // 2. 
Explicit begin transaction fails + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error begin tx"))); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!( + rs_result.is_err(), + "The explicitly errored fallback boot securely propagated outwards" + ); + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("Internal error begin tx"), + "Natively propagated specific BeginTx bounds: {}", + err_str + ); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> { + use crate::client::{KeySet, ReadRequest}; + use crate::value::Value; + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial read fails + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error"))); + + // 2. Explicit begin transaction succeeds + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + })) + }); + + // 3. 
Retry of the read succeeds + mock.expect_streaming_read() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure it uses the new transaction ID + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok( + setup_select1(), + )])))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + let read = ReadRequest::builder("Users", vec!["Id", "Name"]) + .with_keys(KeySet::all()) + .build(); + let mut rs = tx.execute_read(read).await?; + + let row = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??; + assert_eq!( + row.raw_values(), + [Value(string_val("1"))], + "The macro correctly unpacked read arrays seamlessly" + ); + + Ok(()) + } + + #[tokio::test] + async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> { + use crate::client::Statement; + use gaxi::grpc::tonic::Status; + + let mut mock = create_session_mock(); + + mock.expect_execute_streaming_sql() + .times(1) + .returning(|_| Err(Status::internal("Internal error single use query"))); + + mock.expect_begin_transaction().never(); + + let (db_client, _server) = setup_db_client(mock).await; + // single_use creates a Fixed selector + let tx = db_client.single_use().build(); + + let rs_result = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error single use query")); + + Ok(()) + } + + #[tokio::test] + async fn inline_begin_already_started_query_send_error_returns_immediately() + -> anyhow::Result<()> { + use crate::client::Statement; + use 
gaxi::grpc::tonic::Status; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + let mut mock = create_session_mock(); + let mut seq = mockall::Sequence::new(); + + mock.expect_begin_transaction().never(); + + // 1. First query executes successfully and implicitly starts the transaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(move |_req| { + let mut rs = setup_select1(); + rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction { + id: vec![4, 5, 6], + read_timestamp: None, + ..Default::default() + }); + Ok(gaxi::grpc::tonic::Response::new(Box::pin( + tokio_stream::iter(vec![Ok(rs)]), + ))) + }); + + // 2. Second query fails immediately upon send() + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_| Err(Status::internal("Internal error second query"))); + + let (db_client, _server) = setup_db_client(mock).await; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Run first query (starts tx) + let mut rs = tx + .execute_query(Statement::builder("SELECT 1").build()) + .await?; + let _ = rs.next().await.expect("has row")?; + + // Run second query (fails) + let rs_result = tx + .execute_query(Statement::builder("SELECT 2").build()) + .await; + + assert!(rs_result.is_err()); + let err_str = rs_result.unwrap_err().to_string(); + assert!(err_str.contains("Internal error second query")); + + Ok(()) + } } diff --git a/src/spanner/src/result_set.rs b/src/spanner/src/result_set.rs index 6bd848b175..2d775b7412 100644 --- a/src/spanner/src/result_set.rs +++ b/src/spanner/src/result_set.rs @@ -233,7 +233,29 @@ impl ResultSet { return Ok(()); } - Err(e) + // Check if this stream included an inlined BeginTransaction option + // and has not yet returned a transaction ID. If so, we explicitly + // begin the transaction and restart the stream. 
+ let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else { + return Err(e); + }; + let is_started = matches!( + &*lazy.lock().unwrap(), + crate::read_only_transaction::TransactionState::Started(_, _) + ); + if is_started { + return Err(e); + } + + self.transaction_selector + .as_ref() + .unwrap() + .begin_explicitly(&self.client) + .await?; + + self.partial_result_sets_buffer.clear(); + self.restart_stream().await?; + Ok(()) } fn handle_stream_end(&mut self) -> crate::Result> { @@ -281,15 +303,25 @@ impl ResultSet { (None, Some(mut m)) => { let transaction = m.transaction.take(); self.metadata = Some(ResultSetMetadata::new(Some(m))); - if let (Some(selector), Some(transaction)) = - (&self.transaction_selector, transaction) - { - selector.update( - transaction.id, - transaction - .read_timestamp - .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), - ); + if let Some(selector) = &self.transaction_selector { + if let Some(transaction) = transaction { + selector.update( + transaction.id, + transaction + .read_timestamp + .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()), + ); + } else if let ReadContextTransactionSelector::Lazy(lazy) = selector { + let is_started = matches!( + &*lazy.lock().expect("transaction state mutex poisoned"), + crate::read_only_transaction::TransactionState::Started(_, _) + ); + if !is_started { + return Err(internal_error( + "Spanner failed to return a transaction ID for a query that included a BeginTransaction option", + )); + } + } } } } @@ -336,9 +368,15 @@ impl ResultSet { } async fn restart_stream(&mut self) -> crate::Result<()> { + // Get the latest transaction selector for this transaction. 
+ let transaction_selector = self.transaction_selector.as_ref().map(|s| s.selector()); + match &mut self.operation { StreamOperation::Query(req) => { req.resume_token = self.last_resume_token.clone(); + req.transaction = transaction_selector + .clone() + .or_else(|| req.transaction.take()); let stream = self .client .spanner @@ -349,6 +387,9 @@ impl ResultSet { } StreamOperation::Read(req) => { req.resume_token = self.last_resume_token.clone(); + req.transaction = transaction_selector + .clone() + .or_else(|| req.transaction.take()); let stream = self .client .spanner @@ -465,6 +506,7 @@ pub(crate) mod tests { use super::*; use crate::client::Spanner; use gaxi::grpc::tonic::Response; + use google_cloud_auth::credentials::anonymous::Builder as Anonymous; use prost_types::Value; use spanner_grpc_mock::MockSpanner; use spanner_grpc_mock::google::spanner::v1::spanner_server::Spanner as SpannerTrait; @@ -528,7 +570,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await .expect("Failed to build client"); @@ -564,6 +606,39 @@ pub(crate) mod tests { assert!(next.is_none()); } + #[tokio::test] + async fn test_result_set_metadata() -> anyhow::Result<()> { + let mut rs = run_mock_query(vec![PartialResultSet { + metadata: metadata(2), + values: vec![string_val("a"), string_val("b")], + last: true, + ..Default::default() + }]) + .await; + + // Called before next() -> returns MetadataNotAvailable + let meta_err = rs.metadata(); + assert!(meta_err.is_err()); + assert!(matches!( + meta_err.unwrap_err(), + ResultSetError::MetadataNotAvailable + )); + + // Advance to fetch metadata + let _next = rs.next().await.expect("Expected a row")?; + + // Called after next() -> returns metadata + let meta = rs.metadata(); + assert!(meta.is_ok()); + let meta = meta.unwrap(); + assert_eq!( + meta.column_names(), 
+ &["col0".to_string(), "col1".to_string()] + ); + + Ok(()) + } + #[tokio::test] async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> { let mut rs = run_mock_query(vec![PartialResultSet { @@ -586,6 +661,34 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> { + let mut rs = run_mock_query(vec![ + PartialResultSet { + values: vec![string_val("row1")], + ..Default::default() + }, + PartialResultSet { + resume_token: b"token".to_vec(), + ..Default::default() + }, + ]) + .await; + + let res = rs.next().await; + assert!(res.is_some(), "Expected an error but got None"); + let res = res.expect("Expected some response but got None"); + assert!(res.is_err(), "Expected an error but got Ok"); + let err_str = res.expect_err("Expected should be an error").to_string(); + assert!( + err_str.contains("First PartialResultSet did not contain metadata"), + "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'", + err_str + ); + + Ok(()) + } + #[tokio::test] async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> { let mut rs = run_mock_query(vec![PartialResultSet { @@ -725,7 +828,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1081,7 +1184,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1137,7 +1240,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + 
.with_credentials(Anonymous::new().build()) .build() .await?; @@ -1207,7 +1310,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1296,7 +1399,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1375,7 +1478,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1427,7 +1530,7 @@ pub(crate) mod tests { let client: Spanner = Spanner::builder() .with_endpoint(address) - .with_credentials(google_cloud_auth::credentials::anonymous::Builder::new().build()) + .with_credentials(Anonymous::new().build()) .build() .await?; @@ -1448,4 +1551,368 @@ pub(crate) mod tests { Ok(()) } + + #[tokio::test] + async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Stream yields an error on the first chunk before returning transaction metadata. + // E.g., INVALID_ARGUMENT because the query is malformed. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = + tokio_stream::iter(vec![Err(Status::invalid_argument("Invalid query"))]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. 
The explicit BeginTransaction fallback gets triggered. + mock.expect_begin_transaction() + .times(1) + .in_sequence(&mut seq) + .returning(|_| { + Ok(Response::new(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: Some(prost_types::Timestamp { + seconds: 123456789, + nanos: 0, + }), + ..Default::default() + })) + }); + + // 3. The ResultSet gracefully restarts the stream using the transaction ID returned by BeginTransaction. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + // Ensure the explicitly yielded ID is routed into the new stream transaction selector + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id"), + } + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: metadata(1), + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs.next().await.ok_or_else(|| { + anyhow::anyhow!("Expected row returned successfully despite stream breaking") + })??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verify the returned stream successfully resumed with the correct payload" + ); + + Ok(()) + } + + #[tokio::test] + async fn 
result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Initial stream throws UNAVAILABLE before metadata. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = + tokio_stream::iter(vec![Err(Status::unavailable("Transient network issue"))]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. We retry the stream since it was a transient error. + // The retry should use the same transaction selector as the original request. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Begin(_) => {} + _ => panic!("Expected Selector::Begin on stream retry"), + } + + let mut meta = metadata(1).unwrap(); + meta.transaction = Some(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + }); + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: Some(meta), + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + 
.read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verify resumed stream returns data" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use gaxi::grpc::tonic::Status; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. Stream successfully returns metadata chunk then throws UNAVAILABLE on chunk 2. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let mut meta = metadata(1).unwrap(); + meta.transaction = Some(mock_v1::Transaction { + id: vec![7, 8, 9], + read_timestamp: None, + ..Default::default() + }); + let stream = tokio_stream::iter(vec![ + Ok(PartialResultSet { + metadata: Some(meta), + values: vec![string_val("1")], + resume_token: b"token1".to_vec(), + ..Default::default() + }), + Err(Status::unavailable("Transient mid-stream network issue")), + ]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + // 2. Stream resumes using Selector::Id. 
+ mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|req| { + let req = req.into_inner(); + match req.transaction.unwrap().selector.unwrap() { + mock_v1::transaction_selector::Selector::Id(id) => { + assert_eq!(id, vec![7, 8, 9]); + } + _ => panic!("Expected Selector::Id on stream retry"), + } + + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + values: vec![string_val("2")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let row1 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??; + assert_eq!( + row1.raw_values()[0].0, + string_val("1"), + "Verified chunk 1 payload" + ); + let row2 = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??; + assert_eq!( + row2.raw_values()[0].0, + string_val("2"), + "Verified chunk 2 reboot dynamically intercepted ID bounds correctly" + ); + + Ok(()) + } + + #[tokio::test] + async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> { + use gaxi::grpc::tonic::Response; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::start; + + let mut mock = MockSpanner::new(); + let mut seq = mockall::Sequence::new(); + + // 1. 
Initial stream successfully returns metadata chunk but completely lacks the `Transaction` entity. + mock.expect_execute_streaming_sql() + .times(1) + .in_sequence(&mut seq) + .returning(|_request| { + let stream = tokio_stream::iter(vec![Ok(PartialResultSet { + metadata: metadata(1), // Missing `.transaction` natively + values: vec![string_val("1")], + ..Default::default() + })]); + Ok(Response::new( + Box::pin(stream) as ::ExecuteStreamingSqlStream, + )) + }); + + mock.expect_create_session().returning(|_| { + Ok(Response::new(Session { + name: "session".to_string(), + multiplexed: true, + ..Default::default() + })) + }); + + let (address, _server) = start("127.0.0.1:0", mock).await?; + + let client: Spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await?; + + let db_client = client.database_client("db").build().await?; + + // Use explicitly deferred Lazy begin transaction! + let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + let mut rs = tx.execute_query("SELECT 1").await?; + + let rs_result = rs + .next() + .await + .ok_or_else(|| anyhow::anyhow!("Expected explicit crash bound properly"))?; + assert!( + rs_result.is_err(), + "Securely aborted when metadata failed to package internal bounds properly" + ); + + let err_str = rs_result.unwrap_err().to_string(); + assert!( + err_str.contains("failed to return a transaction ID"), + "Caught implicit gap boundary: {}", + err_str + ); + + Ok(()) + } } diff --git a/tests/spanner/src/query.rs b/tests/spanner/src/query.rs index fe02427a19..0f1a54553e 100644 --- a/tests/spanner/src/query.rs +++ b/tests/spanner/src/query.rs @@ -246,6 +246,45 @@ async fn test_multi_use_read_only_transaction( Ok(()) } +pub async fn multi_use_read_only_transaction_invalid_query_fallback( + db_client: &DatabaseClient, +) -> anyhow::Result<()> { + // Start a multi-use read-only transaction with implicit begin. 
+ let tx = db_client + .read_only_transaction() + .with_explicit_begin_transaction(false) + .build() + .await?; + + // Expect a read timestamp to NOT have been chosen yet. + assert!(tx.read_timestamp().is_none()); + + // Execute the first query with invalid syntax. + let rs_result = tx + .execute_query(Statement::builder("SELECT * FROM NonExistentTable").build()) + .await; + + assert!( + rs_result.is_err(), + "Expected an error from an invalid query" + ); + + // The read timestamp should now be available because the transaction + // fell back to an explicit BeginTransaction. + assert!(tx.read_timestamp().is_some()); + + // It should be possible to use the transaction. + let mut rs2 = tx + .execute_query(Statement::builder("SELECT 2 AS col_int").build()) + .await?; + + let row2 = rs2.next().await.transpose()?.expect("should yield a row"); + let val2 = row2.raw_values()[0].as_string(); + assert_eq!(val2, "2"); + + Ok(()) +} + fn verify_null_row(row: &google_cloud_spanner::client::Row) { let raw_values = row.raw_values(); assert_eq!(raw_values.len(), 20, "Row should have exactly 20 columns"); diff --git a/tests/spanner/tests/driver.rs b/tests/spanner/tests/driver.rs index d27cc10dc5..2492088f67 100644 --- a/tests/spanner/tests/driver.rs +++ b/tests/spanner/tests/driver.rs @@ -26,6 +26,10 @@ mod spanner { integration_tests_spanner::query::query_with_parameters(&db_client).await?; integration_tests_spanner::query::result_set_metadata(&db_client).await?; integration_tests_spanner::query::multi_use_read_only_transaction(&db_client).await?; + integration_tests_spanner::query::multi_use_read_only_transaction_invalid_query_fallback( + &db_client, + ) + .await?; Ok(()) }