Always clean up leadership state if the partition processor stops #2295

Open · wants to merge 2 commits into base: main
crates/types/src/identifiers.rs (11 changes: 9 additions & 2 deletions)
@@ -15,6 +15,7 @@
 use bytestring::ByteString;
 use rand::RngCore;
 use sha2::{Digest, Sha256};
 use std::fmt;
+use std::fmt::Formatter;
 use std::hash::Hash;
 use std::mem::size_of;
 use std::str::FromStr;

@@ -855,7 +856,6 @@ impl FromStr for LambdaARN
 }

 #[derive(
-    Debug,
     PartialOrd,
     PartialEq,
     Eq,

@@ -897,7 +897,14 @@ impl Default for PartitionProcessorRpcRequestId
 impl fmt::Display for PartitionProcessorRpcRequestId {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}", self.0)
+        fmt::Display::fmt(&self.0, f)
     }
 }

+impl fmt::Debug for PartitionProcessorRpcRequestId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        // use the same formatting for debug and display to show a consistent representation
+        fmt::Display::fmt(self, f)
+    }
+}
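Aside: the pattern applied here, delegating Debug to Display so logs show one consistent representation, looks like this in isolation. This is a minimal sketch with a hypothetical RequestId newtype, not the actual Restate type. Note that fmt::Display::fmt(&self.0, f) also forwards formatter flags such as width and fill, which write!(f, "{}", self.0) would discard:

use std::fmt;

// Hypothetical newtype standing in for PartitionProcessorRpcRequestId.
struct RequestId(u64);

impl fmt::Display for RequestId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the inner value's Display so formatter flags
        // (width, fill, alignment) are forwarded, not swallowed.
        fmt::Display::fmt(&self.0, f)
    }
}

impl fmt::Debug for RequestId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same representation for Debug and Display, so `{:?}` in logs
        // matches `{}` output exactly.
        fmt::Display::fmt(self, f)
    }
}

fn main() {
    let id = RequestId(42);
    assert_eq!(format!("{id}"), format!("{id:?}"));
}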
crates/worker/src/partition/leadership.rs (20 changes: 10 additions & 10 deletions)
@@ -208,7 +208,7 @@
     #[instrument(level = "debug", skip_all, fields(leader_epoch = %leader_epoch))]
     pub async fn run_for_leader(&mut self, leader_epoch: LeaderEpoch) -> Result<(), Error> {
         if self.is_new_leader_epoch(leader_epoch) {
-            self.become_follower().await?;
+            self.become_follower().await;
             self.announce_leadership(leader_epoch).await?;
             debug!("Running for leadership.");
         } else {

@@ -305,7 +305,7 @@
         Ok(())
     }

-    pub async fn step_down(&mut self) -> Result<(), Error> {
+    pub async fn step_down(&mut self) {
         debug!("Stepping down. Being a role model for Joe.");
         self.become_follower().await
     }

@@ -326,7 +326,7 @@
         match leader_epoch.cmp(&announce_leader.leader_epoch) {
             Ordering::Less => {
                 debug!("Lost leadership campaign. Becoming an obedient follower.");
-                self.become_follower().await?;
+                self.become_follower().await;
             }
             Ordering::Equal => {
                 debug!("Won the leadership campaign. Becoming the strong leader now.");

@@ -345,7 +345,7 @@
                     new_leader_epoch = %announce_leader.leader_epoch,
                     "Every reign must end. Stepping down and becoming an obedient follower."
                 );
-                self.become_follower().await?;
+                self.become_follower().await;
             }
             Ordering::Equal => {
                 warn!("Observed another leadership announcement for my own leadership. This should never happen and indicates a bug!");

@@ -491,7 +491,7 @@
         Ok(invoker_rx)
     }

-    async fn become_follower(&mut self) -> Result<(), Error> {
+    async fn become_follower(&mut self) {
         match &mut self.state {
             State::Follower => {}
             State::Candidate { appender_task, .. } => {

@@ -510,7 +510,10 @@
                 let cleaner_handle =
                     OptionFuture::from(task_center().cancel_task(*cleaner_task_id));

-                let (shuffle_result, cleaner_result, abort_result) = tokio::join!(
+                // It's ok to not check the abort_result because either it succeeded or the invoker
+                // is not running. If the invoker is not running, and we are not shutting down, then
+                // we will fail the next time we try to invoke.
+                let (shuffle_result, cleaner_result, _abort_result) = tokio::join!(
                     shuffle_handle,
                     cleaner_handle,
                     self.invoker_tx.abort_all_partition((

@@ -519,8 +522,6 @@
                     )),
                 );

-                abort_result.map_err(Error::Invoker)?;
-
                 if let Some(shuffle_result) = shuffle_result {
                     shuffle_result.expect("graceful termination of shuffle task");
                 }

@@ -545,7 +546,6 @@
         }

         self.state = State::Follower;
-        Ok(())
     }

     pub async fn handle_actions(

@@ -1199,7 +1199,7 @@ mod tests
         assert!(matches!(state.state, State::Leader(_)));

-        state.step_down().await?;
+        state.step_down().await;

         assert!(matches!(state.state, State::Follower));
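Aside: the essence of the become_follower change is that teardown is now infallible; the invoker abort result is still awaited via join! but deliberately discarded. A minimal sketch of that idea, using hypothetical futures rather than the actual Restate invoker API:

use std::future::Future;

// Hypothetical stand-ins for the shuffle task handle and the invoker
// abort future; the real types live in the restate worker crates.
async fn become_follower(
    shuffle_handle: tokio::task::JoinHandle<()>,
    abort_all: impl Future<Output = Result<(), std::io::Error>>,
) {
    // Run all teardown steps concurrently. The abort result is bound to
    // `_abort_result` and dropped: if the abort failed, the invoker was
    // not running, and the next invocation attempt surfaces the problem.
    let (shuffle_result, _abort_result) = tokio::join!(shuffle_handle, abort_all);
    shuffle_result.expect("graceful termination of shuffle task");
    // No Result is returned, so callers like step_down() can never be
    // interrupted by an error half-way through cleanup.
}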
crates/worker/src/partition/mod.rs (35 changes: 20 additions & 15 deletions)
@@ -23,7 +23,7 @@
 use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

 use restate_bifrost::{Bifrost, FindTailAttributes};
 use restate_core::network::{HasConnection, Incoming, Outgoing};
-use restate_core::{cancellation_watcher, metadata, TaskCenter, TaskHandle, TaskKind};
+use restate_core::{cancellation_watcher, TaskCenter, TaskHandle, TaskKind};
 use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
 use restate_storage_api::deduplication_table::{
     DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,

@@ -264,8 +264,24 @@
 {
     #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
     pub async fn run(mut self) -> anyhow::Result<()> {
-        info!("Starting the partition processor");
-        let res = self.run_inner().await;
+        info!("Starting the partition processor.");
+
+        let res = tokio::select! {
+            res = self.run_inner() => {
+                match res.as_ref() {
+                    Ok(_) => warn!("Shutting partition processor down because it stopped unexpectedly."),
+                    Err(err) => warn!("Shutting partition processor down because it failed: {err}"),
+                }
+                res
+            },
+            _ = cancellation_watcher() => {
+                debug!("Shutting partition processor down because it was cancelled.");
+                Ok(())
+            },
+        };
+
+        // clean up pending rpcs and stop child tasks
+        self.leadership_state.step_down().await;

         // Drain control_rx
         self.control_rx.close();

Review comment on lines +272 to +273 (the two warn! lines):

[Reviewer] Should these be error! or is there a case where this is okay to happen?

[Author] It can happen if we fail to find the tail, for example. In this case, it's up to the PPM to handle this situation. Right now it would stop the PP and wait for further instructions from the CC. But maybe warn is too high, since it can also happen when shutting down. The PPM would have the context to distinguish a shutdown case from an unexpected termination.

@@ -372,7 +388,6 @@
             tokio::time::interval(Duration::from_millis(500 + rand::random::<u64>() % 524));
         status_update_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

-        let mut cancellation = std::pin::pin!(cancellation_watcher());
         let partition_id_str: &'static str = Box::leak(Box::new(self.partition_id.to_string()));
         // Telemetry setup
         let apply_command_latency =

@@ -389,7 +404,6 @@
         loop {
             tokio::select! {
-                _ = &mut cancellation => break,
                 Some(command) = self.control_rx.recv() => {
                     if let Err(err) = self.on_command(command).await {
                         warn!("Failed executing command: {err}");

@@ -476,12 +490,6 @@
                 // budget.
                 tokio::task::consume_budget().await;
             }
-
-            debug!(restate.node = %metadata().my_node_id(), %self.partition_id, "Shutting partition processor down.");
-            // ignore errors that happen during shut down
-            let _ = self.leadership_state.step_down().await;
-
-            Ok(())
     }

@@ -498,10 +506,7 @@ async fn on_command
             }
             PartitionProcessorControlCommand::StepDown => {
                 self.status.planned_mode = RunMode::Follower;
-                self.leadership_state
-                    .step_down()
-                    .await
-                    .context("failed handling StepDown command")?;
+                self.leadership_state.step_down().await;
                 self.status.effective_mode = RunMode::Follower;
             }
             PartitionProcessorControlCommand::CreateSnapshot(maybe_sender) => {
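Aside: the shutdown flow introduced in run() amounts to "race the main loop against cancellation, then always clean up". A minimal sketch of that shape under stated assumptions: the parameter names are hypothetical, `cancelled` stands in for restate_core::cancellation_watcher(), `cleanup` stands in for leadership_state.step_down(), and the real code also drains its control channel and logs why it stopped:

use std::future::Future;

async fn run(
    main_loop: impl Future<Output = anyhow::Result<()>>,
    cancelled: impl Future<Output = ()>,
    cleanup: impl Future<Output = ()>,
) -> anyhow::Result<()> {
    let res = tokio::select! {
        res = main_loop => res,  // stopped unexpectedly or failed
        _ = cancelled => Ok(()), // orderly shutdown was requested
    };
    // Cleanup runs on every exit path and is infallible, so leadership
    // state can never be left behind when the processor stops.
    cleanup.await;
    res
}

Because step_down() no longer returns a Result, the cleanup step cannot itself abort the shutdown, which is what makes the "always clean up" guarantee in the PR title hold on the error path as well as the cancellation path.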