Skip to content

Commit

Permalink
Introduce retry-policy to shuffle
Browse files Browse the repository at this point in the history
Fixes #2148
  • Loading branch information
muhamadazmy committed Nov 7, 2024
1 parent 67bae96 commit 2d01133
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
6 changes: 6 additions & 0 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ pub struct WorkerOptions {

#[serde(flatten)]
pub snapshots: SnapshotsOptions,

/// # Append retry policy
///
/// Retry policy for appending records to virtual log (bifrost)
pub append_retry_policy: RetryPolicy,
}

impl WorkerOptions {
Expand Down Expand Up @@ -98,6 +103,7 @@ impl Default for WorkerOptions {
invoker: Default::default(),
max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"),
snapshots: SnapshotsOptions::default(),
append_retry_policy: RetryPolicy::fixed_delay(Duration::from_secs(1), None),
}
}
}
Expand Down
51 changes: 43 additions & 8 deletions crates/worker/src/partition/shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,20 @@ where
}

mod state_machine {
use pin_project::pin_project;
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use pin_project::pin_project;
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, trace};

use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::config::Configuration;
use restate_types::message::MessageIndex;
use restate_types::retries::{RetryIter, RetryPolicy};
use restate_wal_protocol::Envelope;

use crate::partition::shuffle;
Expand All @@ -291,7 +294,12 @@ mod state_machine {
enum State<SendFuture> {
Idle,
ReadingOutbox,
Sending(#[pin] SendFuture, Arc<Envelope>),
Sending {
#[pin]
send_future: SendFuture,
envelope: Arc<Envelope>,
retry: Option<RetryIter<'static>>,
},
}

#[pin_project]
Expand All @@ -302,6 +310,7 @@ mod state_machine {
read_future: ReadFuture<OutboxReader>,
send_operation: SendOp,
hint_rx: &'a mut async_channel::Receiver<NewOutboxMessage>,
retry_policy: RetryPolicy,
#[pin]
state: State<SendFuture>,
}
Expand Down Expand Up @@ -342,6 +351,7 @@ mod state_machine {
read_future: ReusableBoxFuture::new(reading_future),
send_operation,
hint_rx,
retry_policy: Configuration::pinned().worker.append_retry_policy.clone(),
state: State::ReadingOutbox,
}
}
Expand Down Expand Up @@ -371,7 +381,11 @@ mod state_machine {
this.metadata,
));
let send_future = (this.send_operation)(Arc::clone(&envelope));
this.state.set(State::Sending(send_future, envelope));
this.state.set(State::Sending {
send_future,
envelope,
retry: Some(this.retry_policy.clone().into_iter()),
});
break;
}
Ordering::Greater => {
Expand Down Expand Up @@ -410,20 +424,41 @@ mod state_machine {
));
let send_future = (this.send_operation)(Arc::clone(&envelope));

this.state.set(State::Sending(send_future, envelope));
this.state.set(State::Sending {
send_future,
envelope,
retry: Some(this.retry_policy.clone().into_iter()),
});
} else {
this.state.set(State::Idle);
}
}
StateProj::Sending(send_future, envelope) => {
StateProj::Sending {
send_future,
envelope,
retry,
} => {
if let Err(err) = send_future.await {
debug!("Retrying failed shuffle attempt: {err}");

let send_future = (this.send_operation)(Arc::clone(envelope));
let envelope = Arc::clone(envelope);
this.state.set(State::Sending(send_future, envelope));
let mut retry = retry.take().expect("retry policy is set");

tokio::time::sleep(Duration::from_secs(1)).await;
match retry.next() {
Some(delay) => {
this.state.set(State::Sending {
send_future,
envelope,
retry: Some(retry),
});

tokio::time::sleep(delay).await;
}
None => {
return Err(err).context("Maximum number of retries exhausted");
}
}
} else {
let successfully_shuffled_sequence_number =
*this.current_sequence_number;
Expand Down

0 comments on commit 2d01133

Please sign in to comment.