From ec730c6ffe5ba85b18828df0a3f7f6c33d93f082 Mon Sep 17 00:00:00 2001
From: Muhamad Awad
Date: Thu, 14 Nov 2024 13:47:17 +0100
Subject: [PATCH] Improve RemoteSequencer tracing and logging

Also avoid sending over the open connection if the inflight commits
channel is closed
---
 crates/bifrost/src/loglet/mod.rs              |  35 +++-
 .../replicated_loglet/remote_sequencer.rs     | 195 +++++++++---------
 2 files changed, 129 insertions(+), 101 deletions(-)

diff --git a/crates/bifrost/src/loglet/mod.rs b/crates/bifrost/src/loglet/mod.rs
index 76b129968..b4b0298a9 100644
--- a/crates/bifrost/src/loglet/mod.rs
+++ b/crates/bifrost/src/loglet/mod.rs
@@ -156,21 +156,40 @@ pub type SendableLogletReadStream = Pin<Box<dyn LogletReadStream + Send>>;
 
 #[allow(dead_code)]
 pub(crate) struct LogletCommitResolver {
-    tx: oneshot::Sender<Result<LogletOffset, AppendError>>,
+    tx: Option<oneshot::Sender<Result<LogletOffset, AppendError>>>,
 }
 
 #[allow(dead_code)]
 impl LogletCommitResolver {
-    pub fn sealed(self) {
-        let _ = self.tx.send(Err(AppendError::Sealed));
+    pub fn sealed(mut self) {
+        let _ = self
+            .tx
+            .take()
+            .expect("must be set")
+            .send(Err(AppendError::Sealed));
     }
 
-    pub fn offset(self, offset: LogletOffset) {
-        let _ = self.tx.send(Ok(offset));
+    pub fn offset(mut self, offset: LogletOffset) {
+        let _ = self.tx.take().expect("must be set").send(Ok(offset));
     }
 
-    pub fn error(self, err: AppendError) {
-        let _ = self.tx.send(Err(err));
+    pub fn error(mut self, err: AppendError) {
+        let _ = self.tx.take().expect("must be set").send(Err(err));
+    }
+}
+
+#[derive(Debug, Clone, Copy, thiserror::Error)]
+#[error("Commit resolver was dropped")]
+struct CommitCancelled;
+
+/// If a LogletCommitResolver is dropped without being
+/// 'resolved', we resolve it automatically as cancelled,
+/// to distinguish it from a Shutdown.
+impl Drop for LogletCommitResolver {
+    fn drop(&mut self) {
+        if let Some(tx) = self.tx.take() {
+            let _ = tx.send(Err(AppendError::retryable(CommitCancelled)));
+        }
     }
 }
 
@@ -194,7 +213,7 @@ impl LogletCommit {
     #[allow(dead_code)]
     pub(crate) fn deferred() -> (Self, LogletCommitResolver) {
         let (tx, rx) = oneshot::channel();
-        (Self { rx }, LogletCommitResolver { tx })
+        (Self { rx }, LogletCommitResolver { tx: Some(tx) })
     }
 }
 
diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
index 2c486da84..d4b5f3042 100644
--- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
@@ -17,8 +17,10 @@ use std::{
 };
 
 use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore};
+use tracing::{debug, instrument, Span};
 
 use restate_core::{
+    cancellation_watcher,
     network::{
         rpc_router::{RpcRouter, RpcToken},
         NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection,
@@ -33,7 +35,6 @@ use restate_types::{
     replicated_loglet::ReplicatedLogletParams,
     GenerationalNodeId,
 };
-use tracing::instrument;
 
 use super::rpc_routers::SequencersRpc;
 use crate::loglet::{
@@ -107,7 +108,7 @@ where
     }
 
     #[instrument(
-        level="trace",
+        level="debug",
         skip_all,
         fields(
             otel.name = "replicated_loglet::remote_sequencer: append",
@@ -122,13 +123,6 @@ where
         let len = u32::try_from(payloads.len()).expect("batch sizes fit in u32");
 
-        let permits = self
-            .record_permits
-            .clone()
-            .acquire_many_owned(len)
-            .await
-            .unwrap();
-
         let mut connection = self.get_connection().await?;
 
         let mut msg = Append {
@@ -140,36 +134,43 @@ where
             payloads,
         };
 
-        let rpc_token = loop {
+        let loglet_commit = loop {
+            let permits = self
+                .record_permits
+                .clone()
+                .acquire_many_owned(len)
+                .await
+                .unwrap();
             match connection
-                .send(&self.sequencers_rpc.append, self.params.sequencer, msg)
+                .send(
+                    permits,
+                    &self.sequencers_rpc.append,
+                    self.params.sequencer,
+                    msg,
+                )
                 .await
             {
-                Ok(token) => break token,
+                Ok(loglet_commit) => break loglet_commit,
                 Err(err) => {
                     match err.source {
-                        NetworkError::ConnectError(_)
-                        | NetworkError::ConnectionClosed(_)
-                        | NetworkError::Timeout(_) => {
-                            // we retry to re-connect one time
+                        err @ NetworkError::Full => return Err(err.into()),
+                        _ => {
+                            // we retry on any other network error
                             connection = self.renew_connection(connection).await?;
                             msg = err.original;
                             continue;
                         }
-                        err => return Err(err.into()),
                     }
                 }
             };
         };
 
-        let (commit_token, commit_resolver) = LogletCommit::deferred();
-
-        connection.resolve_on_appended(permits, rpc_token, commit_resolver);
-        Ok(commit_token)
+        Ok(loglet_commit)
     }
 
     /// Gets or starts a new remote sequencer connection
+    #[instrument(level = "debug", skip_all)]
     async fn get_connection(&self) -> Result<RemoteSequencerConnection, NetworkError> {
         let mut guard = self.connection.lock().await;
         if let Some(connection) = guard.deref() {
@@ -190,6 +191,7 @@ where
 
     /// Renew a connection to a remote sequencer. This guarantees that only a single connection
     /// to the sequencer is available.
+    #[instrument(level = "debug", skip_all, fields(renewed = false))]
    async fn renew_connection(
         &self,
         old: RemoteSequencerConnection,
@@ -207,6 +209,8 @@ where
             .node_connection(self.params.sequencer)
             .await?;
 
+        Span::current().record("renewed", true);
+
         let connection =
             RemoteSequencerConnection::start(self.known_global_tail.clone(), connection)?;
 
@@ -228,18 +232,19 @@ where
 #[derive(Clone)]
 struct RemoteSequencerConnection {
     inner: WeakConnection,
-    tx: mpsc::UnboundedSender<RemoteInflightAppend>,
+    tx: mpsc::Sender<RemoteInflightAppend>,
 }
 
 impl RemoteSequencerConnection {
+    #[instrument(level = "debug", name = "connection_start", skip_all)]
     fn start(
         known_global_tail: TailOffsetWatch,
         connection: WeakConnection,
     ) -> Result<Self, ShutdownError> {
-        let (tx, rx) = mpsc::unbounded_channel();
+        let (tx, rx) = mpsc::channel(128);
 
         task_center().spawn(
-            TaskKind::NetworkMessageHandler,
+            TaskKind::SequencerAppender,
             "remote-sequencer-connection",
             None,
             Self::handle_appended_responses(known_global_tail, connection.clone(), rx),
@@ -256,39 +261,33 @@ impl RemoteSequencerConnection {
     /// It's up to the caller to retry on [`NetworkError`]
     pub async fn send(
         &self,
+        permit: OwnedSemaphorePermit,
         rpc_router: &RpcRouter<Append>,
         sequencer: GenerationalNodeId,
         msg: Append,
-    ) -> Result<RpcToken<Appended>, NetworkSendError<Append>> {
+    ) -> Result<LogletCommit, NetworkSendError<Append>> {
+        let send_permit = self.tx.clone().reserve_owned().await.map_err(|_| {
+            NetworkSendError::new(
+                msg.clone(),
+                NetworkError::ConnectionClosed(self.inner.peer()),
+            )
+        })?;
+
         let outgoing = Outgoing::new(sequencer, msg).assign_connection(self.inner.clone());
 
-        rpc_router
+        let rpc_token = rpc_router
             .send_on_connection(outgoing)
             .await
-            .map_err(|err| NetworkSendError::new(err.original.into_body(), err.source))
-    }
+            .map_err(|err| NetworkSendError::new(err.original.into_body(), err.source))?;
 
-    pub fn resolve_on_appended(
-        &self,
-        permit: OwnedSemaphorePermit,
-        rpc_token: RpcToken<Appended>,
-        commit_resolver: LogletCommitResolver,
-    ) {
-        let inflight_append = RemoteInflightAppend {
-            rpc_token,
-            commit_resolver,
+        let (commit_token, commit_resolver) = LogletCommit::deferred();
+        send_permit.send(RemoteInflightAppend {
+            commit_resolver,
             permit,
-        };
+            rpc_token,
+        });
 
-        if let Err(err) = self.tx.send(inflight_append) {
-            // if we failed to push this to be processed by the connection reactor task
-            // then we need to notify the caller
-            err.0
-                .commit_resolver
-                .error(AppendError::retryable(NetworkError::ConnectionClosed(
-                    self.inner.peer(),
-                )));
-        }
+        Ok(commit_token)
     }
 
     /// Handle all [`Appended`] responses
@@ -296,55 +295,84 @@ impl RemoteSequencerConnection {
     /// This task will run until the [`AppendStream`] is dropped. Once dropped
     /// all pending commits will be resolved with an error. it's up to the enqueuer
     /// to retry if needed.
+    #[instrument(level = "debug", skip_all)]
     async fn handle_appended_responses(
         known_global_tail: TailOffsetWatch,
         connection: WeakConnection,
-        mut rx: mpsc::UnboundedReceiver<RemoteInflightAppend>,
+        mut rx: mpsc::Receiver<RemoteInflightAppend>,
     ) -> anyhow::Result<()> {
         let mut closed = std::pin::pin!(connection.closed());
+        let cancelled = cancellation_watcher();
+
+        let termination_err = tokio::select! {
+            _ = &mut closed => {
+                AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()))
+            }
+            _ = cancelled => {
+                AppendError::Shutdown(ShutdownError)
+            }
+            err = Self::handler_loop(known_global_tail, connection.peer(), &mut rx) => {
+                err
+            }
+        };
+
+        // close channel to stop any further append calls on the same connection
+        rx.close();
+        // Drain and resolve ALL pending appends on this connection.
+        //
+        // todo(azmy): The order of the RemoteInflightAppend's on the channel
+        // does not necessarily match the actual append calls, because
+        // sending on the connection and pushing onto the rx channel is not an atomic
+        // operation. This means that, while draining
+        // the pending requests here, we may also end up cancelling some inflight appends
+        // that have already received a positive response from the sequencer.
+        //
+        // For now this should not be a problem since they can (possibly) retry
+        // to do the write again later.
+        debug!(cause=%termination_err, "Draining inflight channel");
+        let mut count = 0;
+        while let Some(inflight) = rx.recv().await {
+            inflight.commit_resolver.error(termination_err.clone());
+            count += 1;
+        }
+
+        debug!("Drained/Cancelled {count} inflight commits");
+
+        Ok(())
+    }
+
+    async fn handler_loop(
+        known_global_tail: TailOffsetWatch,
+        peer: GenerationalNodeId,
+        rx: &mut mpsc::Receiver<RemoteInflightAppend>,
+    ) -> AppendError {
         // handle all rpc tokens in an infinite loop
         // this loop only breaks when it encounters a terminal
         // AppendError.
         // When this happens, the receiver channel is closed
         // and drained. The same error is then used to resolve
         // all pending tokens
-        let err = loop {
-            let inflight = tokio::select! {
-                inflight = rx.recv() => {
-                    inflight
-                }
-                _ = &mut closed => {
-                    break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()));
+        loop {
+            let inflight = match rx.recv().await {
+                Some(inflight) => inflight,
+                None => {
+                    return AppendError::retryable(NetworkError::ConnectionClosed(peer));
                 }
             };
 
-            let Some(inflight) = inflight else {
-                // connection was dropped.
-                break AppendError::retryable(NetworkError::ConnectionClosed(connection.peer()));
-            };
-
             let RemoteInflightAppend {
                 rpc_token,
                 commit_resolver,
                 permit: _permit,
             } = inflight;
 
-            let appended = tokio::select! {
-                incoming = rpc_token.recv() => {
-                    incoming.map_err(AppendError::Shutdown)
-                },
-                _ = &mut closed => {
-                    Err(AppendError::retryable(NetworkError::ConnectionClosed(connection.peer())))
-                }
-            };
-
-            let appended = match appended {
+            let appended = match rpc_token.recv().await {
                 Ok(appended) => appended.into_body(),
                 Err(err) => {
                     // this can only be a terminal error (either shutdown or connection is closing)
-                    commit_resolver.error(err.clone());
-                    break err;
+                    commit_resolver.error(AppendError::Shutdown(err));
+                    return AppendError::Shutdown(err);
                 }
             };
 
@@ -363,7 +391,7 @@ impl RemoteSequencerConnection {
                     // A sealed status returns a terminal error since we can immediately cancel
                     // all inflight append jobs.
                     commit_resolver.sealed();
-                    break AppendError::Sealed;
+                    return AppendError::Sealed;
                 }
                 SequencerStatus::UnknownLogId
                 | SequencerStatus::UnknownSegmentIndex
@@ -375,30 +403,11 @@ impl RemoteSequencerConnection {
                     // While the UnknownLoglet status is non-terminal for the connection
                     // (since only one request is bad),
                     // the AppendError for the caller is terminal
+                    debug!(error=%err, "Resolve commit with error");
                     commit_resolver.error(AppendError::other(err));
                 }
             }
-        };
-
-        // close channel to stop any further appends calls on the same connection
-        rx.close();
-
-        // Drain and resolve ALL pending appends on this connection.
-        //
-        // todo(azmy): The order of the RemoteInflightAppend's on the channel
-        // does not necessary matches the actual append calls. This is
-        // since sending on the connection and pushing on the rx channel is not an atomic
-        // operation. Which means that, it's possible when we are draining
-        // the pending requests here that we also end up cancelling some inflight appends
-        // that has already received a positive response from the sequencer.
-        //
-        // For now this should not be a problem since they can (possibly) retry
-        // to do the write again later.
-        while let Some(inflight) = rx.recv().await {
-            inflight.commit_resolver.error(err.clone());
-        }
-
-        Ok(())
+        }
     }
 }
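
The core pattern in the loglet/mod.rs change is drop-based resolution: the
oneshot sender now lives in an Option so the resolving methods can consume it,
and Drop resolves anything left unresolved as an explicit cancellation instead
of silently closing the channel. A minimal standalone sketch of that pattern,
assuming only tokio's oneshot (Resolver and Outcome are invented names for the
example, not the crate's types):

use tokio::sync::oneshot;

#[derive(Debug)]
enum Outcome {
    Committed(u64),
    Cancelled,
}

struct Resolver {
    tx: Option<oneshot::Sender<Outcome>>,
}

impl Resolver {
    fn offset(mut self, offset: u64) {
        // Taking the sender means the Drop impl below sees None and stays quiet.
        let _ = self
            .tx
            .take()
            .expect("must be set")
            .send(Outcome::Committed(offset));
    }
}

impl Drop for Resolver {
    fn drop(&mut self) {
        // Only fires if the resolver was dropped without being resolved.
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(Outcome::Cancelled);
        }
    }
}

#[tokio::main]
async fn main() {
    // Resolved normally: the waiter observes the committed offset.
    let (tx, rx) = oneshot::channel();
    Resolver { tx: Some(tx) }.offset(42);
    assert!(matches!(rx.await, Ok(Outcome::Committed(42))));

    // Dropped without resolving: the waiter sees an explicit Cancelled value
    // rather than a closed-channel error.
    let (tx, rx) = oneshot::channel();
    drop(Resolver { tx: Some(tx) });
    assert!(matches!(rx.await, Ok(Outcome::Cancelled)));
}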
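
The remote_sequencer.rs change avoids writing an append onto the connection
when the inflight-commits channel is already closed by reserving a slot in the
bounded channel before performing the network send. A rough standalone sketch
of that reserve-before-send idea with tokio's bounded mpsc (send_tracked,
Inflight and EnqueueError are invented names for the example):

use tokio::sync::mpsc;

#[derive(Debug)]
struct Inflight {
    payload: String,
}

#[derive(Debug)]
enum EnqueueError {
    ChannelClosed,
}

async fn send_tracked(tx: &mpsc::Sender<Inflight>, payload: String) -> Result<(), EnqueueError> {
    // reserve_owned consumes a Sender clone and yields an owned permit, or fails
    // immediately if the receiver side is gone, so nothing hits the wire.
    let permit = tx
        .clone()
        .reserve_owned()
        .await
        .map_err(|_| EnqueueError::ChannelClosed)?;

    // ... the actual network send would happen here, after the slot is secured ...

    // Enqueue the inflight record; this cannot fail because the slot is reserved.
    let _ = permit.send(Inflight { payload });
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Inflight>(8);

    assert!(send_tracked(&tx, "record-1".into()).await.is_ok());
    assert_eq!(rx.recv().await.unwrap().payload, "record-1");

    // Once the receiver is dropped (e.g. the response-handler task terminated),
    // further sends fail up front instead of being written to the connection.
    drop(rx);
    assert!(matches!(
        send_tracked(&tx, "record-2".into()).await,
        Err(EnqueueError::ChannelClosed)
    ));
}

Reserving first also gives natural backpressure: when the channel is full, the
caller waits before the record ever reaches the wire.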