Soft-leader election mechanism for cc based on observed cluster state #2252

Open · wants to merge 1 commit into base: main
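The change below makes every cluster controller (CC) derive the same "soft leader" independently from the observed cluster state: the alive node with the admin role that has the smallest plain node id acts as the active controller, and all other CCs stay passive; if no alive admin node is observed at all, a node assumes it is active. The following is a minimal, hypothetical sketch of that selection rule only, using simplified stand-in types (a plain u32 node id and a slice of alive admin ids) rather than the actual restate types; the real implementation is Service::is_active_controller in the diff further down.

// Hypothetical, simplified sketch of the soft-leader rule in this PR.
// PlainNodeId and alive_admin_nodes are stand-ins for the real restate types.
type PlainNodeId = u32;

// A node considers itself the active cluster controller if it observes no
// alive admin node at all, or if it holds the smallest plain node id among
// the admin nodes it currently observes as alive.
fn is_active_controller(my_id: PlainNodeId, alive_admin_nodes: &[PlainNodeId]) -> bool {
    match alive_admin_nodes.iter().min() {
        None => true,                     // no alive admin CC observed: act as leader
        Some(&leader) => leader == my_id, // smallest plain node id wins
    }
}

fn main() {
    // With admin nodes 1, 2 and 3 observed alive, only node 1 acts as the active controller.
    assert!(is_active_controller(1, &[1, 2, 3]));
    assert!(!is_active_controller(2, &[1, 2, 3]));
    // If node 1 is no longer observed alive, node 2 takes over.
    assert!(is_active_controller(2, &[2, 3]));
}

Because every CC evaluates the same rule over the same observed state, no extra coordination round is needed; disagreement is bounded by how quickly the observed cluster state converges.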
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/scheduler.rs
@@ -134,8 +134,8 @@ impl<T: TransportConnect> Scheduler<T> {
        let alive_workers = observed_cluster_state
            .alive_nodes
            .keys()
-           .cloned()
            .filter(|node_id| nodes_config.has_worker_role(node_id))
+           .cloned()
            .collect();

        self.update_scheduling_plan(&alive_workers, nodes_config, placement_hints)
70 changes: 58 additions & 12 deletions crates/admin/src/cluster_controller/service.rs
@@ -15,11 +15,12 @@ use std::time::Duration;
use anyhow::anyhow;
use codederror::CodedError;
use futures::future::OptionFuture;
+use itertools::Itertools;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tonic::codec::CompressionEncoding;
-use tracing::{debug, info, warn};
+use tracing::{debug, info, trace, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
@@ -40,6 +41,7 @@ use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
+use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::PartitionTable;
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version};
@@ -312,33 +314,35 @@ impl<T: TransportConnect> Service<T> {
        }
    }
    Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
-       let nodes_config = &nodes_config.live_load();
        observed_cluster_state.update(&cluster_state);
-       logs_controller.on_observed_cluster_state_update(
-           nodes_config,
-           &observed_cluster_state, SchedulingPlanNodeSetSelectorHints::from(&scheduler))?;
-       scheduler.on_observed_cluster_state(
-           &observed_cluster_state,
-           nodes_config,
-           LogsBasedPartitionProcessorPlacementHints::from(&logs_controller))
-           .await?;
+
+       if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) {
+           self.on_cluster_state_update(
+               nodes_config.live_load(),
+               &observed_cluster_state,
+               &mut logs_controller,
+               &mut scheduler,
+           ).await?;
+       }
    }
    result = logs_controller.run_async_operations() => {
        result?;
    }
-   Ok(_) = logs_watcher.changed() => {
+   Ok(_) = logs_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
        logs_controller.on_logs_update(self.metadata.logs_ref())?;
        // tell the scheduler about potentially newly provisioned logs
        scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
    }
-   Ok(_) = partition_table_watcher.changed() => {
+   Ok(_) = partition_table_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
        let partition_table = partition_table.live_load();
        let logs = logs.live_load();

        logs_controller.on_partition_table_update(partition_table);
        scheduler.on_logs_update(logs, partition_table).await?;
    }
    Some(cmd) = self.command_rx.recv() => {
+       // note: This branch is safe to enable on passive CCs
+       // since it works only as a gateway to leader PPs
        self.on_cluster_cmd(cmd, bifrost_admin).await;
    }
    _ = config_watcher.changed() => {
@@ -356,6 +360,48 @@ impl<T: TransportConnect> Service<T> {
        }
    }

+   fn is_active_controller(
+       &self,
+       nodes_config: &NodesConfiguration,
+       observed_cluster_state: &ObservedClusterState,
+   ) -> bool {
+       let maybe_leader = nodes_config
+           .get_admin_nodes()
+           .filter(|node| observed_cluster_state.is_node_alive(node.current_generation))
+           .map(|node| node.current_generation.as_plain())
+           .sorted()
+           .next();
+
+       // assume active if no leader CC (None) or self holds the smallest plain node id with role admin
+       !maybe_leader.is_some_and(|admin_id| admin_id != self.metadata.my_node_id().as_plain())
+   }
+
+   async fn on_cluster_state_update(
+       &self,
+       nodes_config: &NodesConfiguration,
+       observed_cluster_state: &ObservedClusterState,
+       logs_controller: &mut LogsController,
+       scheduler: &mut Scheduler<T>,
+   ) -> anyhow::Result<()> {
+       trace!("Acting like a cluster controller");
+
+       logs_controller.on_observed_cluster_state_update(
+           nodes_config,
+           observed_cluster_state,
+           SchedulingPlanNodeSetSelectorHints::from(scheduler as &Scheduler<T>),
+       )?;
+
+       scheduler
+           .on_observed_cluster_state(
+               observed_cluster_state,
+               nodes_config,
+               LogsBasedPartitionProcessorPlacementHints::from(logs_controller as &LogsController),
+           )
+           .await?;
+
+       Ok(())
+   }
+
    async fn init_partition_table(&mut self) -> anyhow::Result<()> {
        let configuration = self.configuration.live_load();
