From 726eb82e7874dde6d0b851034727d84fffe18394 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 22 Apr 2026 16:28:26 +0200 Subject: [PATCH] chore(spanner): foundation for batch write transactions Adds the foundations for batch write transactions. The public API will be added in a follow-up pull request. --- src/spanner/src/batch_write_transaction.rs | 146 +++++++++++++++++++++ src/spanner/src/database_client.rs | 8 ++ src/spanner/src/lib.rs | 1 + src/spanner/src/mutation.rs | 88 +++++++++++++ 4 files changed, 243 insertions(+) create mode 100644 src/spanner/src/batch_write_transaction.rs diff --git a/src/spanner/src/batch_write_transaction.rs b/src/spanner/src/batch_write_transaction.rs new file mode 100644 index 0000000000..ed6973b943 --- /dev/null +++ b/src/spanner/src/batch_write_transaction.rs @@ -0,0 +1,146 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::client::DatabaseClient; +use crate::model::BatchWriteRequest; +use crate::mutation::MutationGroup; +use crate::server_streaming::stream::BatchWriteStream; + +/// A builder for [BatchWriteTransaction]. +#[allow(dead_code)] +pub struct BatchWriteTransactionBuilder { + client: DatabaseClient, +} + +impl BatchWriteTransactionBuilder { + pub(crate) fn new(client: DatabaseClient) -> Self { + Self { client } + } + + /// Builds the [BatchWriteTransaction]. + #[allow(dead_code)] + pub fn build(self) -> BatchWriteTransaction { + let session_name = self.client.session_name(); + BatchWriteTransaction { + session_name, + client: self.client, + } + } +} + +/// A transaction for executing batch writes. +/// +/// Batch writes are not guaranteed to be atomic across mutation groups. +/// All mutations within a group are applied atomically. +#[allow(dead_code)] +pub struct BatchWriteTransaction { + session_name: String, + client: DatabaseClient, +} + +impl BatchWriteTransaction { + /// Executes the batch write and returns a stream of responses. + #[allow(dead_code)] + pub(crate) async fn execute_streaming(self, groups: I) -> crate::Result + where + I: IntoIterator, + { + let req = BatchWriteRequest::new() + .set_session(self.session_name.clone()) + .set_mutation_groups(groups.into_iter().map(|g| g.build_proto())); + + self.client + .spanner + .batch_write(req, crate::RequestOptions::default()) + .send() + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::{Mutation, Spanner}; + use crate::result_set::tests::adapt; + use gaxi::grpc::tonic::Response; + use spanner_grpc_mock::MockSpanner; + use spanner_grpc_mock::google::spanner::v1 as mock_v1; + + pub(crate) async fn setup_db_client( + mock: MockSpanner, + ) -> (DatabaseClient, tokio::task::JoinHandle<()>) { + use google_cloud_auth::credentials::anonymous::Builder as Anonymous; + let (address, server) = spanner_grpc_mock::start("0.0.0.0:0", mock) + .await + .expect("Failed to start mock server"); + let spanner = Spanner::builder() + .with_endpoint(address) + .with_credentials(Anonymous::new().build()) + .build() + .await + .expect("Failed to build client"); + + let db_client = spanner + .database_client("projects/p/instances/i/databases/d") + .build() + .await + .expect("Failed to create DatabaseClient"); + + (db_client, server) + } + + #[tokio::test] + async fn test_execute_streaming() { + let mut mock = MockSpanner::new(); + mock.expect_create_session().returning(|_| { + Ok(Response::new(mock_v1::Session { + name: "projects/p/instances/i/databases/d/sessions/123".to_string(), + ..Default::default() + })) + }); + + mock.expect_batch_write().once().returning(|req| { + let req = req.into_inner(); + assert_eq!( + req.session, + "projects/p/instances/i/databases/d/sessions/123" + ); + assert_eq!(req.mutation_groups.len(), 1); + + let response = mock_v1::BatchWriteResponse { + indexes: vec![0], + status: None, + commit_timestamp: None, + }; + + Ok(Response::from(adapt([Ok(response)]))) + }); + + let (db_client, _server) = setup_db_client(mock).await; + + let mutation = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&1) + .build(); + let group = MutationGroup::new(vec![mutation]); + + let tx = db_client.batch_write_transaction().build(); + let mut stream = tx.execute_streaming(vec![group]).await.unwrap(); + + let result = stream.next_message().await; + assert!(result.is_some()); + let result = result.unwrap().unwrap(); + assert_eq!(result.indexes, vec![0]); + } +} diff --git a/src/spanner/src/database_client.rs b/src/spanner/src/database_client.rs index 92a6b50de8..c3467c3213 100644 --- a/src/spanner/src/database_client.rs +++ b/src/spanner/src/database_client.rs @@ -193,6 +193,14 @@ impl DatabaseClient { crate::write_only_transaction::WriteOnlyTransactionBuilder::new(self.clone()) } + /// Returns a builder for a batch write transaction. + #[allow(dead_code)] + pub(crate) fn batch_write_transaction( + &self, + ) -> crate::batch_write_transaction::BatchWriteTransactionBuilder { + crate::batch_write_transaction::BatchWriteTransactionBuilder::new(self.clone()) + } + pub(crate) fn session_name(&self) -> String { self.session_maintainer.session_name() } diff --git a/src/spanner/src/lib.rs b/src/spanner/src/lib.rs index e9ff6f4937..3541a0651e 100644 --- a/src/spanner/src/lib.rs +++ b/src/spanner/src/lib.rs @@ -44,6 +44,7 @@ pub mod batch_read_only_transaction; pub mod model { pub use crate::generated::gapic_dataplane::model::*; } +pub(crate) mod batch_write_transaction; pub(crate) mod database_client; pub(crate) mod from_value; pub(crate) mod key; diff --git a/src/spanner/src/mutation.rs b/src/spanner/src/mutation.rs index 9267772d5c..8df1bc093e 100644 --- a/src/spanner/src/mutation.rs +++ b/src/spanner/src/mutation.rs @@ -13,9 +13,12 @@ // limitations under the License. use crate::key::KeySet; +use crate::model::batch_write_request::MutationGroup as ProtoMutationGroup; use crate::model::mutation::Operation; use crate::to_value::ToValue; use crate::value::Value; +use std::slice::Iter; +use std::vec::IntoIter; /// Represents an individual table modification to be applied to Cloud Spanner. /// @@ -266,6 +269,42 @@ impl ValueBinder { } } +/// A group of mutations that are applied atomically in a [BatchWriteTransaction]. +#[derive(Clone, Debug, PartialEq)] +pub struct MutationGroup { + pub mutations: Vec, +} + +impl MutationGroup { + /// Creates a new mutation group from a list of mutations. + pub fn new(mutations: Vec) -> Self { + Self { mutations } + } + + #[allow(dead_code)] + pub(crate) fn build_proto(self) -> ProtoMutationGroup { + ProtoMutationGroup::new().set_mutations(self.mutations.into_iter().map(|m| m.build_proto())) + } +} + +impl IntoIterator for MutationGroup { + type Item = Mutation; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.mutations.into_iter() + } +} + +impl<'a> IntoIterator for &'a MutationGroup { + type Item = &'a Mutation; + type IntoIter = Iter<'a, Mutation>; + + fn into_iter(self) -> Self::IntoIter { + self.mutations.iter() + } +} + #[cfg(test)] mod tests { use super::*; @@ -277,6 +316,55 @@ mod tests { static_assertions::assert_impl_all!(Delete: Send, Sync, Clone, std::fmt::Debug); static_assertions::assert_impl_all!(WriteBuilder: Send, Sync); static_assertions::assert_impl_all!(ValueBinder: Send, Sync); + static_assertions::assert_impl_all!(MutationGroup: Send, Sync, Clone, std::fmt::Debug); + } + + #[test] + fn mutation_group() { + let mutation1 = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&1) + .build(); + let mutation2 = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&2) + .build(); + let group = MutationGroup::new(vec![mutation1.clone(), mutation2.clone()]); + assert_eq!(group.mutations.len(), 2); + assert_eq!(group.mutations[0], mutation1); + assert_eq!(group.mutations[1], mutation2); + } + + #[test] + fn mutation_group_into_iter() { + let mutation1 = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&1) + .build(); + let mutation2 = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&2) + .build(); + let group = MutationGroup::new(vec![mutation1.clone(), mutation2.clone()]); + + let mutations: Vec<_> = group.into_iter().collect(); + assert_eq!(mutations, vec![mutation1, mutation2]); + } + + #[test] + fn mutation_group_iter_ref() { + let mutation1 = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&1) + .build(); + let mutation2 = Mutation::new_insert_builder("Users") + .set("UserId") + .to(&2) + .build(); + let group = MutationGroup::new(vec![mutation1.clone(), mutation2.clone()]); + + let mutations: Vec<_> = (&group).into_iter().collect(); + assert_eq!(mutations, vec![&mutation1, &mutation2]); } #[test]