Wire up PartitionRouting into RemoteScannerManager
pcholakov committed Oct 30, 2024
1 parent 68ecf55 commit bd4ea08
Showing 12 changed files with 154 additions and 60 deletions.
117 changes: 93 additions & 24 deletions crates/core/src/routing_info/mod.rs
@@ -22,13 +22,17 @@ use restate_types::cluster_controller::SchedulingPlan;
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::{NodeId, Version, Versioned};
use restate_types::NodeId::Generational;
use restate_types::{GenerationalNodeId, NodeId, Version, Versioned};

use crate::metadata_store::MetadataStoreClient;
use crate::{
cancellation_watcher, task_center, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind,
};

#[cfg(any(test, feature = "test-util"))]
pub use test_util::*;

pub type CommandSender = mpsc::Sender<Command>;
pub type CommandReceiver = mpsc::Receiver<Command>;

@@ -37,6 +41,15 @@ pub enum Command {
SyncRoutingInformation,
}

pub enum PartitionLookup {
/// The local node is an authoritative source for this partition.
Local,
/// The partition resides on a remote node.
Remote(NodeId),
/// An authoritative answer is not available at this time.
Unavailable,
}

/// Holds a view of the known partition-to-node mappings. Use it to discover which node(s) to route
/// requests to for a given partition. Compared to the partition table, this view is more dynamic as
/// it changes based on cluster nodes' operational status. This handle can be cheaply cloned.
@@ -48,29 +61,43 @@ pub struct PartitionRouting {
}

impl PartitionRouting {
/// Look up a suitable node to answer requests for the given partition.
pub fn get_node_by_partition(&self, partition_id: PartitionId) -> Option<NodeId> {
self.partition_to_node_mappings
.load()
.inner
.get(&partition_id)
.copied()
/// Look up a suitable node to process requests for a given partition. Answers are authoritative
/// though subject to propagation delays through the cluster in distributed deployments.
/// Generally, as a consumer of routing information, your options are limited to backing off and
/// retrying the request, or returning an error upstream when information is not available.
pub fn get_partition_location(&self, partition_id: PartitionId) -> PartitionLookup {
let mappings = self.partition_to_node_mappings.load();

// This check should ideally be strengthened to make sure we're using reasonably fresh lookup data
if mappings.version < Version::MIN {
debug!("Partition routing information not available - awaiting refresh");
self.request_refresh();
PartitionLookup::Unavailable
} else {
match mappings.inner.get(&partition_id) {
Some(node_id) if mappings.my_node_id.is_some_and(|my_id| my_id == *node_id) => {
PartitionLookup::Local
}
Some(node_id) => PartitionLookup::Remote(*node_id),
None => PartitionLookup::Unavailable,
}
}
}

/// Call this to hint to the background refresher that its view may be outdated. This is useful
/// when a caller discovers via some other mechanism that routing infromation may be invalid -
/// when a caller discovers via some other mechanism that routing information may be invalid -
/// for example, when a request to a node previously returned by `get_node_by_partition` fails
/// with a response that explicitly indicates that it is no longer serving that partition. The
/// call returns as soon as the request is enqueued. A refresh is not guaranteed to happen.
pub async fn request_refresh(&self) {
self.sender
.send(Command::SyncRoutingInformation)
.await
.expect("Failed to send refresh request");
/// with a response that explicitly indicates that it is no longer serving that partition.
///
/// This call returns immediately, while the refresh itself is performed asynchronously on a
/// best-effort basis. Multiple calls will not result in multiple refresh attempts.
pub fn request_refresh(&self) {
self.sender.try_send(Command::SyncRoutingInformation).ok();
}
}

struct PartitionToNodesRoutingTable {
my_node_id: Option<NodeId>,
version: Version,
/// A mapping of partition IDs to node IDs that are believed to be authoritative for
/// serving requests for that partition.
@@ -101,6 +128,7 @@ impl PartitionRoutingRefresher {
sender,
inflight_refresh_task: None,
inner: Arc::new(ArcSwap::new(Arc::new(PartitionToNodesRoutingTable {
my_node_id: None,
version: Version::INVALID,
inner: HashMap::default(),
}))),
@@ -115,7 +143,7 @@ impl PartitionRoutingRefresher {
}
}

async fn run(mut self) -> anyhow::Result<()> {
async fn run(mut self, my_node_id: GenerationalNodeId) -> anyhow::Result<()> {
debug!("Routing information refresher started");

let update_interval = Configuration::pinned()
@@ -135,20 +163,20 @@
Some(cmd) = self.receiver.recv() => {
match cmd {
Command::SyncRoutingInformation => {
self.spawn_sync_routing_information_task();
self.spawn_sync_routing_information_task(my_node_id);
}
}
}
_ = update_interval.tick() => {
trace!("Refreshing routing information...");
self.spawn_sync_routing_information_task();
self.spawn_sync_routing_information_task(my_node_id);
}
}
}
Ok(())
}

fn spawn_sync_routing_information_task(&mut self) {
fn spawn_sync_routing_information_task(&mut self, my_node_id: GenerationalNodeId) {
if !self
.inflight_refresh_task
.as_ref()
@@ -158,13 +186,17 @@ impl PartitionRoutingRefresher {
let metadata_store_client = self.metadata_store_client.clone();

let task = task_center().spawn_unmanaged(
crate::TaskKind::Disposable,
TaskKind::Disposable,
"refresh-routing-information",
None,
{
async move {
sync_routing_information(partition_to_node_mappings, metadata_store_client)
.await;
sync_routing_information(
my_node_id,
partition_to_node_mappings,
metadata_store_client,
)
.await;
}
},
);
@@ -175,17 +207,19 @@

pub fn spawn_partition_routing_refresher(
tc: &TaskCenter,
my_node_id: GenerationalNodeId,
partition_routing_refresher: PartitionRoutingRefresher,
) -> Result<TaskId, ShutdownError> {
tc.spawn(
TaskKind::MetadataBackgroundSync,
"partition-routing-refresher",
None,
partition_routing_refresher.run(),
partition_routing_refresher.run(my_node_id),
)
}

async fn sync_routing_information(
my_node_id: GenerationalNodeId,
partition_to_node_mappings: Arc<ArcSwap<PartitionToNodesRoutingTable>>,
metadata_store_client: MetadataStoreClient,
) {
@@ -223,8 +257,43 @@ async fn sync_routing_information(
let _ = partition_to_node_mappings.compare_and_swap(
current_mappings,
Arc::new(PartitionToNodesRoutingTable {
my_node_id: Some(Generational(my_node_id)),
version: scheduling_plan.version(),
inner: partition_nodes,
}),
);
}

#[cfg(any(test, feature = "test-util"))]
pub mod test_util {
use std::collections::HashMap;
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::mpsc;

use crate::routing_info::PartitionRouting;
use restate_types::identifiers::PartitionId;
use restate_types::{NodeId, Version};

pub struct MockPartitionRouting();

impl MockPartitionRouting {
pub fn all_local() -> PartitionRouting {
let (sender, _) = mpsc::channel(1);
let mut mappings = HashMap::default();
// When used with MockPartitionSelector, all partition lookups will come back as local
mappings.insert(PartitionId::MIN, NodeId::new_generational(0, 1));
PartitionRouting {
sender,
partition_to_node_mappings: Arc::new(ArcSwap::new(Arc::new(
super::PartitionToNodesRoutingTable {
my_node_id: Some(NodeId::new_generational(0, 1)),
version: Version::MIN,
inner: mappings,
},
))),
}
}
}
}
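
The snippet below is a minimal consumption sketch for the new lookup API, not part of this commit: the helper name, return shape, and error message are illustrative assumptions, while PartitionRouting, PartitionLookup, and request_refresh are the items added above.

use restate_core::routing_info::{PartitionLookup, PartitionRouting};
use restate_types::identifiers::PartitionId;
use restate_types::NodeId;

// Hypothetical helper for illustration only: decide where a request for `partition_id`
// should be served, or return a retryable error when routing data is not yet known.
fn resolve_target(
    routing: &PartitionRouting,
    partition_id: PartitionId,
) -> anyhow::Result<Option<NodeId>> {
    match routing.get_partition_location(partition_id) {
        // This node is authoritative for the partition: serve the request locally.
        PartitionLookup::Local => Ok(None),
        // Another node is authoritative: forward the request to it.
        PartitionLookup::Remote(node_id) => Ok(Some(node_id)),
        // No authoritative answer right now: nudge the refresher and let the caller back off.
        PartitionLookup::Unavailable => {
            routing.request_refresh();
            anyhow::bail!("no routing information for {partition_id:?}; retry later")
        }
    }
}
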
8 changes: 4 additions & 4 deletions crates/node/src/lib.rs
@@ -213,6 +213,7 @@ impl Node {
WorkerRole::create(
health.worker_status(),
metadata.clone(),
partition_routing_refresher.partition_routing(),
updateable_config.clone(),
&mut router_builder,
networking.clone(),
@@ -234,6 +235,7 @@ impl Node {
bifrost.clone(),
updateable_config.clone(),
metadata,
partition_routing_refresher.partition_routing(),
networking.clone(),
metadata_manager.writer(),
&mut server_builder,
@@ -253,7 +255,7 @@ impl Node {
&mut router_builder,
worker_role
.as_ref()
.map(|role| role.parition_processor_manager_handle()),
.map(|role| role.partition_processor_manager_handle()),
);

// Ensures that message router is updated after all services have registered themselves in
@@ -306,9 +308,6 @@ impl Node {
// Start metadata manager
spawn_metadata_manager(&tc, self.metadata_manager)?;

// Start partition routing information refresher
spawn_partition_routing_refresher(&tc, self.partition_routing_refresher)?;

let nodes_config =
Self::upsert_node_config(&self.metadata_store_client, &config.common).await?;
metadata_writer.update(nodes_config).await?;
@@ -419,6 +418,7 @@ impl Node {
let _ = all_partitions_started_tx.send(());
all_partitions_started_rx
};
spawn_partition_routing_refresher(&tc, my_node_id, self.partition_routing_refresher)?;

if let Some(worker_role) = self.worker_role {
tc.spawn(
3 changes: 3 additions & 0 deletions crates/node/src/roles/admin.rs
@@ -21,6 +21,7 @@ use restate_core::network::MessageRouterBuilder;
use restate_core::network::NetworkServerBuilder;
use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::routing_info::PartitionRouting;
use restate_core::{task_center, Metadata, MetadataWriter, TaskCenter, TaskKind};
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_service_protocol::discovery::ServiceDiscovery;
@@ -64,6 +65,7 @@ impl<T: TransportConnect> AdminRole<T> {
bifrost: Bifrost,
updateable_config: Live<Configuration>,
metadata: Metadata,
partition_routing: PartitionRouting,
networking: Networking<T>,
metadata_writer: MetadataWriter,
server_builder: &mut NetworkServerBuilder,
@@ -90,6 +92,7 @@ impl<T: TransportConnect> AdminRole<T> {
None,
RemoteInvokerStatusHandle,
metadata.updateable_schema(),
partition_routing,
create_remote_scanner_service(
networking.clone(),
task_center.clone(),
7 changes: 5 additions & 2 deletions crates/node/src/roles/worker.rs
@@ -15,6 +15,7 @@ use restate_bifrost::Bifrost;
use restate_core::network::MessageRouterBuilder;
use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::routing_info::PartitionRouting;
use restate_core::worker_api::ProcessorsManagerHandle;
use restate_core::{cancellation_watcher, task_center, Metadata, MetadataKind};
use restate_core::{ShutdownError, TaskKind};
@@ -73,6 +74,7 @@ impl<T: TransportConnect> WorkerRole<T> {
pub async fn create(
health_status: HealthStatus<WorkerStatus>,
metadata: Metadata,
partition_routing: PartitionRouting,
updateable_config: Live<Configuration>,
router_builder: &mut MessageRouterBuilder,
networking: Networking<T>,
@@ -84,6 +86,7 @@ impl<T: TransportConnect> WorkerRole<T> {
updateable_config,
health_status,
metadata.clone(),
partition_routing,
networking,
bifrost,
router_builder,
@@ -95,8 +98,8 @@ impl<T: TransportConnect> WorkerRole<T> {
Ok(WorkerRole { worker, metadata })
}

pub fn parition_processor_manager_handle(&self) -> ProcessorsManagerHandle {
self.worker.parition_processor_manager_handle()
pub fn partition_processor_manager_handle(&self) -> ProcessorsManagerHandle {
self.worker.partition_processor_manager_handle()
}

pub fn storage_query_context(&self) -> &QueryContext {
5 changes: 4 additions & 1 deletion crates/storage-query-datafusion/src/context.rs
@@ -23,6 +23,7 @@ use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::sql::TableReference;
use restate_core::routing_info::PartitionRouting;
use restate_core::Metadata;
use restate_invoker_api::StatusHandle;
use restate_partition_store::PartitionStoreManager;
@@ -113,9 +114,11 @@ impl QueryContext {
schemas: Live<
impl DeploymentResolver + ServiceMetadataResolver + Send + Sync + Debug + Clone + 'static,
>,
partition_routing: PartitionRouting,
remote_scanner_service: Arc<dyn RemoteScannerService>,
) -> Result<QueryContext, BuildError> {
let remote_scanner_manager = RemoteScannerManager::new(remote_scanner_service);
let remote_scanner_manager =
RemoteScannerManager::new(partition_routing, remote_scanner_service);

let ctx = QueryContext::new(
options.memory_size.get(),
11 changes: 7 additions & 4 deletions crates/storage-query-datafusion/src/mocks.rs
@@ -8,17 +8,19 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use googletest::matcher::{Matcher, MatcherResult};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use std::sync::Arc;

use restate_core::routing_info::MockPartitionRouting;
use restate_core::task_center;
use restate_invoker_api::status_handle::test_util::MockStatusHandle;
use restate_invoker_api::StatusHandle;
@@ -172,6 +174,7 @@ impl MockQueryEngine {
Some(manager),
status,
Live::from_value(schemas),
MockPartitionRouting::all_local(),
Arc::new(NoopSvc),
)
.await
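
As a side note on the mock added in routing_info above, a rough sketch of exercising it directly in a test; the test name and assertion are illustrative, and it relies on the test-util exports of restate-core:

use restate_core::routing_info::{MockPartitionRouting, PartitionLookup};
use restate_types::identifiers::PartitionId;

#[test]
fn mock_routing_reports_partition_as_local() {
    let routing = MockPartitionRouting::all_local();
    // The mock maps PartitionId::MIN to the node's own generational id,
    // so the lookup should resolve as a local partition.
    assert!(matches!(
        routing.get_partition_location(PartitionId::MIN),
        PartitionLookup::Local
    ));
}
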
@@ -64,7 +64,7 @@
partition_id: PartitionId,
range: RangeInclusive<PartitionKey>,
projection: SchemaRef,
) -> SendableRecordBatchStream {
) -> anyhow::Result<SendableRecordBatchStream> {
let mut stream_builder = RecordBatchReceiverStream::builder(projection.clone(), 16);
let tx = stream_builder.tx();
let partition_store_manager = self.partition_store_manager.clone();
@@ -103,6 +103,6 @@
Ok(())
};
stream_builder.spawn(background_task);
stream_builder.build()
Ok(stream_builder.build())
}
}
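
The hunk above makes the local scan builder fallible, returning anyhow::Result<SendableRecordBatchStream> instead of a bare stream. Below is a small, hypothetical adapter sketch of how a caller might surface that error to DataFusion rather than panicking; the function name stands in for whichever call site consumes the changed method (not shown in this diff):

use datafusion::common::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;

// Illustrative only: map the scan builder's anyhow::Error into a DataFusion error
// so execution plans can propagate it.
fn into_datafusion_result(
    scan: anyhow::Result<SendableRecordBatchStream>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
    scan.map_err(|err| DataFusionError::External(err.into()))
}
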