From 9a25d4ad4ec1a4d7ad499887117cb6889af804b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 24 Apr 2026 16:48:20 +0200 Subject: [PATCH] feat(spanner): add options for RPC priority Adds support for setting the RPC priority for SQL statements, reads, and the CommitRequest of a transaction. --- src/spanner/src/read.rs | 33 ++++++++++++++++++ src/spanner/src/read_write_transaction.rs | 24 +++++++++++-- src/spanner/src/statement.rs | 31 +++++++++++++++++ src/spanner/src/transaction_runner.rs | 22 ++++++++++++ src/spanner/src/write_only_transaction.rs | 42 +++++++++++++++++++---- 5 files changed, 143 insertions(+), 9 deletions(-) diff --git a/src/spanner/src/read.rs b/src/spanner/src/read.rs index 4e086372b8..871eef275c 100644 --- a/src/spanner/src/read.rs +++ b/src/spanner/src/read.rs @@ -14,6 +14,7 @@ use crate::key::KeySet; use crate::model::DirectedReadOptions; +use crate::model::request_options::Priority; /// Represents an incomplete read operation that requires specifying keys. /// @@ -143,6 +144,24 @@ impl ConfiguredReadRequestBuilder { self } + /// Sets the RPC priority to use for this read request. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::{ReadRequest, KeySet}; + /// # use google_cloud_spanner::model::request_options::Priority; + /// let request = ReadRequest::builder("Users", vec!["Id"]) + /// .with_keys(KeySet::all()) + /// .with_priority(Priority::Low) + /// .build(); + /// ``` + pub fn with_priority(mut self, priority: Priority) -> Self { + self.request_options + .get_or_insert_with(crate::model::RequestOptions::default) + .priority = priority; + self + } + /// Sets the directed read options for this request. /// /// ``` @@ -292,6 +311,20 @@ mod tests { ); } + #[test] + fn with_priority() { + let req = ReadRequest::builder("MyTable", vec!["col1"]) + .with_keys(KeySet::all()) + .with_priority(Priority::High) + .build(); + assert_eq!( + req.request_options + .expect("request options missing") + .priority, + Priority::High + ); + } + #[test] fn with_directed_read_options() { let dro = DirectedReadOptions::default(); diff --git a/src/spanner/src/read_write_transaction.rs b/src/spanner/src/read_write_transaction.rs index fd3c7b28cc..90bc11a035 100644 --- a/src/spanner/src/read_write_transaction.rs +++ b/src/spanner/src/read_write_transaction.rs @@ -23,6 +23,7 @@ use crate::model::RollbackRequest; use crate::model::TransactionOptions; use crate::model::TransactionSelector; use crate::model::execute_batch_dml_request::Statement as ExecuteBatchDmlStatement; +use crate::model::request_options::Priority; use crate::model::result_set_stats::RowCount; use crate::model::transaction_options::IsolationLevel; use crate::model::transaction_options::Mode; @@ -45,6 +46,7 @@ pub(crate) struct ReadWriteTransactionBuilder { transaction_tag: Option, max_commit_delay: Option, pub(crate) session_name: String, + commit_priority: Priority, } impl ReadWriteTransactionBuilder { @@ -56,6 +58,7 @@ impl ReadWriteTransactionBuilder { transaction_tag: None, max_commit_delay: None, session_name, + commit_priority: Priority::Unspecified, } } @@ -89,6 +92,11 @@ impl ReadWriteTransactionBuilder { self } + pub(crate) fn with_commit_priority(mut self, priority: Priority) -> Self { + self.commit_priority = priority; + self + } + pub(crate) fn with_max_commit_delay(mut self, delay: Duration) -> Self { self.max_commit_delay = Some(delay); self @@ -132,6 +140,7 @@ impl ReadWriteTransactionBuilder { }, seqno: Arc::new(AtomicI64::new(1)), max_commit_delay: self.max_commit_delay, + commit_priority: self.commit_priority.clone(), }) } } @@ -142,6 +151,7 @@ pub struct ReadWriteTransaction { pub(crate) context: ReadContext, seqno: Arc, max_commit_delay: Option, + commit_priority: Priority, } impl ReadWriteTransaction { @@ -301,6 +311,16 @@ impl ReadWriteTransaction { } } + fn commit_request_options(&self) -> Option { + let mut options = self.context.amend_request_options(None); + if self.commit_priority != Priority::Unspecified { + options + .get_or_insert_with(crate::model::RequestOptions::default) + .priority = self.commit_priority.clone(); + } + options + } + /// Commits the transaction. pub(crate) async fn commit(self) -> crate::Result { let transaction_id = self.transaction_id()?; @@ -309,7 +329,7 @@ impl ReadWriteTransaction { .set_session(self.context.session_name.clone()) .set_transaction_id(transaction_id.clone()) .set_or_clear_precommit_token(precommit_token) - .set_or_clear_request_options(self.context.amend_request_options(None)) + .set_or_clear_request_options(self.commit_request_options()) .set_or_clear_max_commit_delay(self.max_commit_delay); let response = self @@ -325,7 +345,7 @@ impl ReadWriteTransaction { .set_session(self.context.session_name.clone()) .set_transaction_id(transaction_id) .set_precommit_token(*new_precommit_token) - .set_or_clear_request_options(self.context.amend_request_options(None)); + .set_or_clear_request_options(self.commit_request_options()); self.context .client diff --git a/src/spanner/src/statement.rs b/src/spanner/src/statement.rs index f3d882b5b5..8b35d23499 100644 --- a/src/spanner/src/statement.rs +++ b/src/spanner/src/statement.rs @@ -15,6 +15,7 @@ use crate::model::DirectedReadOptions; use crate::model::execute_sql_request::QueryMode; use crate::model::execute_sql_request::QueryOptions; +use crate::model::request_options::Priority; use crate::to_value::ToValue; use crate::types::Type; use crate::value::Value; @@ -99,6 +100,23 @@ impl StatementBuilder { self } + /// Sets the RPC priority to use for this statement. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::Statement; + /// # use google_cloud_spanner::model::request_options::Priority; + /// let statement = Statement::builder("SELECT * FROM users") + /// .with_priority(Priority::Low) + /// .build(); + /// ``` + pub fn with_priority(mut self, priority: Priority) -> Self { + self.request_options + .get_or_insert_with(crate::model::RequestOptions::default) + .priority = priority; + self + } + /// Sets the directed read options for this statement. /// /// ``` @@ -419,6 +437,19 @@ mod tests { ); } + #[test] + fn with_priority() { + let stmt = Statement::builder("SELECT * FROM users") + .with_priority(Priority::High) + .build(); + assert_eq!( + stmt.request_options + .expect("request options missing") + .priority, + Priority::High + ); + } + #[test] fn with_directed_read_options() { let dro = DirectedReadOptions::default(); diff --git a/src/spanner/src/transaction_runner.rs b/src/spanner/src/transaction_runner.rs index b96b0da549..72cc36b22f 100644 --- a/src/spanner/src/transaction_runner.rs +++ b/src/spanner/src/transaction_runner.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::database_client::DatabaseClient; +use crate::model::request_options::Priority; use crate::model::transaction_options::IsolationLevel; use crate::model::transaction_options::read_write::ReadLockMode; use crate::read_write_transaction::{ReadWriteTransaction, ReadWriteTransactionBuilder}; @@ -124,6 +125,27 @@ impl TransactionRunnerBuilder { self } + /// Sets the RPC priority to use for the commit of this transaction. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::Spanner; + /// # use google_cloud_spanner::model::request_options::Priority; + /// # async fn run(client: Spanner) -> Result<(), google_cloud_spanner::Error> { + /// let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?; + /// let runner = db_client + /// .read_write_transaction() + /// .with_commit_priority(Priority::Low) + /// .build() + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn with_commit_priority(mut self, priority: Priority) -> Self { + self.builder = self.builder.with_commit_priority(priority); + self + } + /// Sets the maximum commit delay for the transaction. /// /// # Example diff --git a/src/spanner/src/write_only_transaction.rs b/src/spanner/src/write_only_transaction.rs index 5615fec052..df381770e0 100644 --- a/src/spanner/src/write_only_transaction.rs +++ b/src/spanner/src/write_only_transaction.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::client::{DatabaseClient, Mutation}; +use crate::model::request_options::Priority; use crate::model::transaction_options::ReadWrite; use crate::model::{ BeginTransactionRequest, CommitRequest, CommitResponse, RequestOptions, TransactionOptions, @@ -31,6 +32,7 @@ pub struct WriteOnlyTransactionBuilder { max_commit_delay: Option, retry_policy: Box, exclude_txn_from_change_streams: bool, + commit_priority: Priority, } impl WriteOnlyTransactionBuilder { @@ -41,6 +43,7 @@ impl WriteOnlyTransactionBuilder { max_commit_delay: None, retry_policy: Box::new(BasicTransactionRetryPolicy::default()), exclude_txn_from_change_streams: false, + commit_priority: Priority::Unspecified, } } @@ -64,6 +67,25 @@ impl WriteOnlyTransactionBuilder { self } + /// Sets the RPC priority to use for the commit of this transaction. + /// + /// # Example + /// ``` + /// # use google_cloud_spanner::client::Spanner; + /// # use google_cloud_spanner::model::request_options::Priority; + /// # async fn build_tx(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> { + /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?; + /// let transaction = db_client.write_only_transaction() + /// .with_commit_priority(Priority::Low) + /// .build(); + /// # Ok(()) + /// # } + /// ``` + pub fn with_commit_priority(mut self, priority: Priority) -> Self { + self.commit_priority = priority; + self + } + /// Sets the maximum commit delay for the transaction. /// /// # Example @@ -164,6 +186,7 @@ impl WriteOnlyTransactionBuilder { max_commit_delay: self.max_commit_delay, retry_policy: self.retry_policy, exclude_txn_from_change_streams: self.exclude_txn_from_change_streams, + commit_priority: self.commit_priority, } } } @@ -178,6 +201,7 @@ pub struct WriteOnlyTransaction { max_commit_delay: Option, retry_policy: Box, exclude_txn_from_change_streams: bool, + commit_priority: Priority, } impl WriteOnlyTransaction { @@ -213,8 +237,9 @@ impl WriteOnlyTransaction { where I: IntoIterator, { - let req_options = - RequestOptions::default().set_transaction_tag(self.transaction_tag.unwrap_or_default()); + let req_options = RequestOptions::default() + .set_transaction_tag(self.transaction_tag.unwrap_or_default()) + .set_priority(self.commit_priority.clone()); let mutations_proto: Vec<_> = mutations.into_iter().map(|m| m.build_proto()).collect(); let mutation_key = Mutation::select_mutation_key(&mutations_proto); @@ -326,9 +351,9 @@ impl WriteOnlyTransaction { let single_use = TransactionOptions::new() .set_read_write(Box::new(ReadWrite::new())) .set_exclude_txn_from_change_streams(self.exclude_txn_from_change_streams); - let req_options = - RequestOptions::new().set_transaction_tag(self.transaction_tag.unwrap_or_default()); - + let req_options = RequestOptions::default() + .set_transaction_tag(self.transaction_tag.unwrap_or_default()) + .set_priority(self.commit_priority.clone()); let request = CommitRequest::new() .set_session(self.session_name.clone()) .set_mutations(mutations.into_iter().map(|m| m.build_proto())) @@ -405,9 +430,11 @@ mod tests { let req = req.into_inner(); assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/123"); - // Validate the custom request options contain the transaction tag + // Validate the custom request options contain the transaction tag and priority assert!(req.request_options.is_some()); - assert_eq!(req.request_options.as_ref().expect("request_options should be present").transaction_tag, "my_tag"); + let req_opts = req.request_options.as_ref().expect("request_options should be present"); + assert_eq!(req_opts.transaction_tag, "my_tag"); + assert_eq!(Priority::from(req_opts.priority), Priority::High); assert!(req.mutations.len() == 1); @@ -440,6 +467,7 @@ mod tests { let res = db_client .write_only_transaction() .with_transaction_tag("my_tag") + .with_commit_priority(Priority::High) .build() .write_at_least_once(vec![mutation]) .await;