Skip to content

Commit

Permalink
expose commit stats
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnorberg committed May 31, 2024
1 parent f4b6c62 commit cfa9ff3
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
38 changes: 36 additions & 2 deletions spanner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::session::{ManagedSession, SessionConfig, SessionError, SessionManager
use crate::statement::Statement;
use crate::transaction::{CallOptions, QueryOptions};
use crate::transaction_ro::{BatchReadOnlyTransaction, ReadOnlyTransaction};
use crate::transaction_rw::{commit, CommitOptions, ReadWriteTransaction};
use crate::transaction_rw::{commit, CommitOptions, CommitResult, ReadWriteTransaction};
use crate::value::{Timestamp, TimestampBound};

#[derive(Clone, Default)]
Expand Down Expand Up @@ -517,6 +517,38 @@ impl Client {
f: F,
options: ReadWriteTransactionOption,
) -> Result<(Option<Timestamp>, T), E>
where
E: TryAs<Status> + From<SessionError> + From<Status>,
F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
{
self.read_write_transaction_with_stats(f, options)
.await
.map(|(r, v)| (r.and_then(|r| r.timestamp), v))
}

/// ReadWriteTransaction executes a read-write transaction, with retries as
/// necessary.
///
/// The function f will be called one or more times. It must not maintain
/// any state between calls.
///
/// If the transaction cannot be committed or if f returns an ABORTED error,
/// ReadWriteTransaction will call f again. It will continue to call f until the
/// transaction can be committed or the Context times out or is cancelled. If f
/// returns an error other than ABORTED, ReadWriteTransaction will abort the
/// transaction and return the error.
///
/// To limit the number of retries, set a deadline on the Context rather than
/// using a fixed limit on the number of attempts. ReadWriteTransaction will
/// retry as needed until that deadline is met.
///
/// See <https://godoc.org/cloud.google.com/go/spanner#ReadWriteTransaction> for
/// more details.
pub async fn read_write_transaction_with_stats<'a, T, E, F>(
&'a self,
f: F,
options: ReadWriteTransactionOption,
) -> Result<(Option<CommitResult>, T), E>
where
E: TryAs<Status> + From<SessionError> + From<Status>,
F: for<'tx> Fn(&'tx mut ReadWriteTransaction) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'tx>>,
Expand Down Expand Up @@ -610,7 +642,9 @@ impl Client {
|session| async {
let mut tx = self.create_read_write_transaction::<E>(session, bo.clone()).await?;
let result = f(&mut tx);
tx.finish(result, Some(co.clone())).await
tx.finish(result, Some(co.clone()))
.await
.map(|(r, v)| (r.and_then(|r| r.timestamp), v))
},
session,
)
Expand Down
34 changes: 31 additions & 3 deletions spanner/src/transaction_rw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ pub struct CommitOptions {
pub call_options: CallOptions,
}

#[derive(Clone)]
pub struct CommitResult {
pub timestamp: Option<Timestamp>,
pub mutation_count: Option<u64>,
}

impl From<CommitResponse> for CommitResult {
fn from(value: CommitResponse) -> Self {
Self {
timestamp: value.commit_timestamp.map(|v| v.into()),
mutation_count: value.commit_stats.map(|s| s.mutation_count as u64),
}
}
}

/// ReadWriteTransaction provides a locking read-write transaction.
///
/// This type of transaction is the only way to write data into Cloud Spanner;
Expand Down Expand Up @@ -230,14 +245,27 @@ impl ReadWriteTransaction {
result: Result<S, E>,
options: Option<CommitOptions>,
) -> Result<(Option<Timestamp>, S), E>
where
E: TryAs<Status> + From<Status>,
{
self.end_with_stats(result, options)
.await
.map(|(r, v)| (r.and_then(|r| r.timestamp), v))
}

pub async fn end_with_stats<S, E>(
&mut self,
result: Result<S, E>,
options: Option<CommitOptions>,
) -> Result<(Option<CommitResult>, S), E>
where
E: TryAs<Status> + From<Status>,
{
let opt = options.unwrap_or_default();
match result {
Ok(success) => {
let cr = self.commit(opt).await?;
Ok((cr.commit_timestamp.map(|e| e.into()), success))
Ok((Some(cr.into()), success))
}
Err(err) => {
if let Some(status) = err.try_as() {
Expand All @@ -256,15 +284,15 @@ impl ReadWriteTransaction {
&mut self,
result: Result<T, E>,
options: Option<CommitOptions>,
) -> Result<(Option<Timestamp>, T), (E, Option<ManagedSession>)>
) -> Result<(Option<CommitResult>, T), (E, Option<ManagedSession>)>
where
E: TryAs<Status> + From<Status>,
{
let opt = options.unwrap_or_default();

return match result {
Ok(s) => match self.commit(opt).await {
Ok(c) => Ok((c.commit_timestamp.map(|ts| ts.into()), s)),
Ok(c) => Ok((Some(c.into()), s)),
// Retry the transaction using the same session on ABORT error.
// Cloud Spanner will create the new transaction with the previous
// one's wound-wait priority.
Expand Down

0 comments on commit cfa9ff3

Please sign in to comment.