Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/spanner/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use crate::key::KeySet;
use crate::model::DirectedReadOptions;
use crate::model::request_options::Priority;

/// Represents an incomplete read operation that requires specifying keys.
///
Expand Down Expand Up @@ -143,6 +144,24 @@ impl ConfiguredReadRequestBuilder {
self
}

/// Sets the RPC priority to use for this read request.
///
/// # Example
/// ```
/// # use google_cloud_spanner::client::{ReadRequest, KeySet};
/// # use google_cloud_spanner::model::request_options::Priority;
/// let request = ReadRequest::builder("Users", vec!["Id"])
/// .with_keys(KeySet::all())
/// .with_priority(Priority::Low)
/// .build();
/// ```
pub fn with_priority(mut self, priority: Priority) -> Self {
self.request_options
.get_or_insert_with(crate::model::RequestOptions::default)
.priority = priority;
self
}

/// Sets the directed read options for this request.
///
/// ```
Expand Down Expand Up @@ -292,6 +311,20 @@ mod tests {
);
}

#[test]
fn with_priority() {
let req = ReadRequest::builder("MyTable", vec!["col1"])
.with_keys(KeySet::all())
.with_priority(Priority::High)
.build();
assert_eq!(
req.request_options
.expect("request options missing")
.priority,
Priority::High
);
}

#[test]
fn with_directed_read_options() {
let dro = DirectedReadOptions::default();
Expand Down
24 changes: 22 additions & 2 deletions src/spanner/src/read_write_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::model::RollbackRequest;
use crate::model::TransactionOptions;
use crate::model::TransactionSelector;
use crate::model::execute_batch_dml_request::Statement as ExecuteBatchDmlStatement;
use crate::model::request_options::Priority;
use crate::model::result_set_stats::RowCount;
use crate::model::transaction_options::IsolationLevel;
use crate::model::transaction_options::Mode;
Expand All @@ -45,6 +46,7 @@ pub(crate) struct ReadWriteTransactionBuilder {
transaction_tag: Option<String>,
max_commit_delay: Option<Duration>,
pub(crate) session_name: String,
commit_priority: Priority,
}

impl ReadWriteTransactionBuilder {
Expand All @@ -56,6 +58,7 @@ impl ReadWriteTransactionBuilder {
transaction_tag: None,
max_commit_delay: None,
session_name,
commit_priority: Priority::Unspecified,
}
}

Expand Down Expand Up @@ -89,6 +92,11 @@ impl ReadWriteTransactionBuilder {
self
}

pub(crate) fn with_commit_priority(mut self, priority: Priority) -> Self {
self.commit_priority = priority;
self
}

pub(crate) fn with_max_commit_delay(mut self, delay: Duration) -> Self {
self.max_commit_delay = Some(delay);
self
Expand Down Expand Up @@ -132,6 +140,7 @@ impl ReadWriteTransactionBuilder {
},
seqno: Arc::new(AtomicI64::new(1)),
max_commit_delay: self.max_commit_delay,
commit_priority: self.commit_priority.clone(),
})
}
}
Expand All @@ -142,6 +151,7 @@ pub struct ReadWriteTransaction {
pub(crate) context: ReadContext,
seqno: Arc<AtomicI64>,
max_commit_delay: Option<Duration>,
commit_priority: Priority,
}

impl ReadWriteTransaction {
Expand Down Expand Up @@ -301,6 +311,16 @@ impl ReadWriteTransaction {
}
}

fn commit_request_options(&self) -> Option<crate::model::RequestOptions> {
let mut options = self.context.amend_request_options(None);
if self.commit_priority != Priority::Unspecified {
options
.get_or_insert_with(crate::model::RequestOptions::default)
.priority = self.commit_priority.clone();
}
options
}

/// Commits the transaction.
pub(crate) async fn commit(self) -> crate::Result<wkt::Timestamp> {
let transaction_id = self.transaction_id()?;
Expand All @@ -309,7 +329,7 @@ impl ReadWriteTransaction {
.set_session(self.context.session_name.clone())
.set_transaction_id(transaction_id.clone())
.set_or_clear_precommit_token(precommit_token)
.set_or_clear_request_options(self.context.amend_request_options(None))
.set_or_clear_request_options(self.commit_request_options())
.set_or_clear_max_commit_delay(self.max_commit_delay);

let response = self
Expand All @@ -325,7 +345,7 @@ impl ReadWriteTransaction {
.set_session(self.context.session_name.clone())
.set_transaction_id(transaction_id)
.set_precommit_token(*new_precommit_token)
.set_or_clear_request_options(self.context.amend_request_options(None));
.set_or_clear_request_options(self.commit_request_options());

self.context
.client
Expand Down
31 changes: 31 additions & 0 deletions src/spanner/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::model::DirectedReadOptions;
use crate::model::execute_sql_request::QueryMode;
use crate::model::execute_sql_request::QueryOptions;
use crate::model::request_options::Priority;
use crate::to_value::ToValue;
use crate::types::Type;
use crate::value::Value;
Expand Down Expand Up @@ -99,6 +100,23 @@ impl StatementBuilder {
self
}

/// Sets the RPC priority to use for this statement.
///
/// # Example
/// ```
/// # use google_cloud_spanner::client::Statement;
/// # use google_cloud_spanner::model::request_options::Priority;
/// let statement = Statement::builder("SELECT * FROM users")
/// .with_priority(Priority::Low)
/// .build();
/// ```
pub fn with_priority(mut self, priority: Priority) -> Self {
self.request_options
.get_or_insert_with(crate::model::RequestOptions::default)
.priority = priority;
self
}

/// Sets the directed read options for this statement.
///
/// ```
Expand Down Expand Up @@ -419,6 +437,19 @@ mod tests {
);
}

#[test]
fn with_priority() {
let stmt = Statement::builder("SELECT * FROM users")
.with_priority(Priority::High)
.build();
assert_eq!(
stmt.request_options
.expect("request options missing")
.priority,
Priority::High
);
}

#[test]
fn with_directed_read_options() {
let dro = DirectedReadOptions::default();
Expand Down
22 changes: 22 additions & 0 deletions src/spanner/src/transaction_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::database_client::DatabaseClient;
use crate::model::request_options::Priority;
use crate::model::transaction_options::IsolationLevel;
use crate::model::transaction_options::read_write::ReadLockMode;
use crate::read_write_transaction::{ReadWriteTransaction, ReadWriteTransactionBuilder};
Expand Down Expand Up @@ -124,6 +125,27 @@ impl TransactionRunnerBuilder {
self
}

/// Sets the RPC priority to use for the commit of this transaction.
///
/// # Example
/// ```
/// # use google_cloud_spanner::client::Spanner;
/// # use google_cloud_spanner::model::request_options::Priority;
/// # async fn run(client: Spanner) -> Result<(), google_cloud_spanner::Error> {
/// let db_client = client.database_client("projects/p/instances/i/databases/d").build().await?;
/// let runner = db_client
/// .read_write_transaction()
/// .with_commit_priority(Priority::Low)
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn with_commit_priority(mut self, priority: Priority) -> Self {
self.builder = self.builder.with_commit_priority(priority);
self
}

/// Sets the maximum commit delay for the transaction.
///
/// # Example
Expand Down
42 changes: 35 additions & 7 deletions src/spanner/src/write_only_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::client::{DatabaseClient, Mutation};
use crate::model::request_options::Priority;
use crate::model::transaction_options::ReadWrite;
use crate::model::{
BeginTransactionRequest, CommitRequest, CommitResponse, RequestOptions, TransactionOptions,
Expand All @@ -31,6 +32,7 @@ pub struct WriteOnlyTransactionBuilder {
max_commit_delay: Option<Duration>,
retry_policy: Box<dyn TransactionRetryPolicy>,
exclude_txn_from_change_streams: bool,
commit_priority: Priority,
}

impl WriteOnlyTransactionBuilder {
Expand All @@ -41,6 +43,7 @@ impl WriteOnlyTransactionBuilder {
max_commit_delay: None,
retry_policy: Box::new(BasicTransactionRetryPolicy::default()),
exclude_txn_from_change_streams: false,
commit_priority: Priority::Unspecified,
}
}

Expand All @@ -64,6 +67,25 @@ impl WriteOnlyTransactionBuilder {
self
}

/// Sets the RPC priority to use for the commit of this transaction.
///
/// # Example
/// ```
/// # use google_cloud_spanner::client::Spanner;
/// # use google_cloud_spanner::model::request_options::Priority;
/// # async fn build_tx(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
/// let transaction = db_client.write_only_transaction()
/// .with_commit_priority(Priority::Low)
/// .build();
/// # Ok(())
/// # }
/// ```
pub fn with_commit_priority(mut self, priority: Priority) -> Self {
self.commit_priority = priority;
self
}

/// Sets the maximum commit delay for the transaction.
///
/// # Example
Expand Down Expand Up @@ -164,6 +186,7 @@ impl WriteOnlyTransactionBuilder {
max_commit_delay: self.max_commit_delay,
retry_policy: self.retry_policy,
exclude_txn_from_change_streams: self.exclude_txn_from_change_streams,
commit_priority: self.commit_priority,
}
}
}
Expand All @@ -178,6 +201,7 @@ pub struct WriteOnlyTransaction {
max_commit_delay: Option<Duration>,
retry_policy: Box<dyn TransactionRetryPolicy>,
exclude_txn_from_change_streams: bool,
commit_priority: Priority,
}

impl WriteOnlyTransaction {
Expand Down Expand Up @@ -213,8 +237,9 @@ impl WriteOnlyTransaction {
where
I: IntoIterator<Item = Mutation>,
{
let req_options =
RequestOptions::default().set_transaction_tag(self.transaction_tag.unwrap_or_default());
let req_options = RequestOptions::default()
.set_transaction_tag(self.transaction_tag.unwrap_or_default())
.set_priority(self.commit_priority.clone());

let mutations_proto: Vec<_> = mutations.into_iter().map(|m| m.build_proto()).collect();
let mutation_key = Mutation::select_mutation_key(&mutations_proto);
Expand Down Expand Up @@ -326,9 +351,9 @@ impl WriteOnlyTransaction {
let single_use = TransactionOptions::new()
.set_read_write(Box::new(ReadWrite::new()))
.set_exclude_txn_from_change_streams(self.exclude_txn_from_change_streams);
let req_options =
RequestOptions::new().set_transaction_tag(self.transaction_tag.unwrap_or_default());

let req_options = RequestOptions::default()
.set_transaction_tag(self.transaction_tag.unwrap_or_default())
.set_priority(self.commit_priority.clone());
let request = CommitRequest::new()
.set_session(self.session_name.clone())
.set_mutations(mutations.into_iter().map(|m| m.build_proto()))
Expand Down Expand Up @@ -405,9 +430,11 @@ mod tests {
let req = req.into_inner();
assert_eq!(req.session, "projects/p/instances/i/databases/d/sessions/123");

// Validate the custom request options contain the transaction tag
// Validate the custom request options contain the transaction tag and priority
assert!(req.request_options.is_some());
assert_eq!(req.request_options.as_ref().expect("request_options should be present").transaction_tag, "my_tag");
let req_opts = req.request_options.as_ref().expect("request_options should be present");
assert_eq!(req_opts.transaction_tag, "my_tag");
assert_eq!(Priority::from(req_opts.priority), Priority::High);

assert!(req.mutations.len() == 1);

Expand Down Expand Up @@ -440,6 +467,7 @@ mod tests {
let res = db_client
.write_only_transaction()
.with_transaction_tag("my_tag")
.with_commit_priority(Priority::High)
.build()
.write_at_least_once(vec![mutation])
.await;
Expand Down
Loading