Skip to content

Commit

Permalink
Improve RemoteSequencer tracing and logging
Browse files Browse the repository at this point in the history
Also avoid sending over the open connection if the
inflight commits channel is closed
  • Loading branch information
muhamadazmy committed Nov 14, 2024
1 parent 3e98771 commit 8f3856b
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 102 deletions.
35 changes: 27 additions & 8 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,40 @@ pub type SendableLogletReadStream = Pin<Box<dyn LogletReadStream + Send>>;

#[allow(dead_code)]
pub(crate) struct LogletCommitResolver {
tx: oneshot::Sender<Result<LogletOffset, AppendError>>,
tx: Option<oneshot::Sender<Result<LogletOffset, AppendError>>>,
}

#[allow(dead_code)]
impl LogletCommitResolver {
pub fn sealed(self) {
let _ = self.tx.send(Err(AppendError::Sealed));
pub fn sealed(mut self) {
let _ = self
.tx
.take()
.expect("must be set")
.send(Err(AppendError::Sealed));
}

pub fn offset(self, offset: LogletOffset) {
let _ = self.tx.send(Ok(offset));
pub fn offset(mut self, offset: LogletOffset) {
let _ = self.tx.take().expect("must be set").send(Ok(offset));
}

pub fn error(self, err: AppendError) {
let _ = self.tx.send(Err(err));
pub fn error(mut self, err: AppendError) {
let _ = self.tx.take().expect("must be set").send(Err(err));
}
}

#[derive(Debug, Clone, Copy, thiserror::Error)]
#[error("Commit resolver was dropped")]
struct CommitCancelled;

/// If a LogletCommitResolver is dropped without being
/// 'resolved', we resolve it automatically as being cancelled
/// To make it distinguished from a Shutdown.
impl Drop for LogletCommitResolver {
fn drop(&mut self) {
if let Some(tx) = self.tx.take() {
let _ = tx.send(Err(AppendError::retryable(CommitCancelled)));
}
}
}

Expand All @@ -194,7 +213,7 @@ impl LogletCommit {
#[allow(dead_code)]
pub(crate) fn deferred() -> (Self, LogletCommitResolver) {
let (tx, rx) = oneshot::channel();
(Self { rx }, LogletCommitResolver { tx })
(Self { rx }, LogletCommitResolver { tx: Some(tx) })
}
}

Expand Down
Loading

0 comments on commit 8f3856b

Please sign in to comment.