Rename PartitionRouting to PartitionNodeResolver, simplify mocking
Also renames the routing_info module to partitions, which better reflects its purpose and is more general.
pcholakov committed Nov 4, 2024
1 parent 6657938 commit a8c8fc4
Showing 11 changed files with 168 additions and 158 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
@@ -13,7 +13,7 @@ mod metadata;
pub mod metadata_store;
mod metric_definitions;
pub mod network;
pub mod routing_info;
pub mod partitions;
mod task_center;
mod task_center_types;
pub mod worker_api;
crates/core/src/{routing_info.rs → partitions.rs}
@@ -9,8 +9,9 @@
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::mpsc;
@@ -22,16 +23,15 @@ use restate_types::cluster_controller::SchedulingPlan;
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::{GenerationalNodeId, NodeId, Version, Versioned};
use restate_types::{NodeId, Version, Versioned};

use crate::metadata_store::MetadataStoreClient;
use crate::{
cancellation_watcher, metadata, task_center, ShutdownError, TaskCenter, TaskHandle, TaskId,
TaskKind,
cancellation_watcher, task_center, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind,
};

#[cfg(any(test, feature = "test-util"))]
pub use test_util::*;
// #[cfg(any(test))]
// pub use mocks::*;

pub type CommandSender = mpsc::Sender<Command>;
pub type CommandReceiver = mpsc::Receiver<Command>;
@@ -41,37 +41,39 @@ pub enum Command {
SyncRoutingInformation,
}

pub enum PartitionAuthoritativeNode {
/// The local node is an authoritative source for this partition.
Local,
/// The partition resides on a remote node.
Remote(NodeId),
}

/// Holds a view of the known partition-to-node mappings. Use it to discover which node(s) to route
/// requests to for a given partition. Compared to the partition table, this view is more dynamic as
/// it changes based on cluster nodes' operational status. This handle can be cheaply cloned.
/// Discover cluster nodes for a given partition. Compared to the partition table, this view is more
/// dynamic as it changes based on cluster nodes' operational status. Can be cheaply cloned.
#[derive(Clone)]
pub struct PartitionRouting {
pub struct PartitionNodeResolver {
sender: CommandSender,
/// A mapping of partition IDs to node IDs that are believed to be authoritative for serving requests for that partition.
partition_to_node_mappings: Arc<ArcSwap<PartitionToNodesRoutingTable>>,
my_node_id: OnceLock<GenerationalNodeId>,
}

impl PartitionRouting {
/// Look up a suitable node to process requests for a given partition. Answers are authoritative
/// though subject to propagation delays through the cluster in distributed deployments.
/// Generally, as a consumer of routing information, your options are limited to backing off and
/// retrying the request, or returning an error upstream when information is not available.
///
/// A `None` response indicates that either we have no knowledge about this partition, or that
/// the routing table has not yet been refreshed for the cluster. The latter condition should be
/// brief and only on startup, so we can generally treat lack of response as a negative answer.
pub fn get_partition_location(
&self,
partition_id: PartitionId,
) -> Option<PartitionAuthoritativeNode> {
// pub trait PartitionNodeLookup: Send + Sync + Debug + 'static {
// /// Look up a suitable node to process requests for a given partition. Answers are authoritative
// /// though subject to propagation delays through the cluster in distributed deployments.
// /// Generally, as a consumer of routing information, your options are limited to backing off and
// /// retrying the request, or returning an error upstream when information is not available.
// ///
// /// A `None` response indicates that either we have no knowledge about this partition, or that
// /// the routing table has not yet been refreshed for the cluster. The latter condition should be
// /// brief and only on startup, so we can generally treat lack of response as a negative answer.
// fn get_partition_location(&self, partition_id: PartitionId) -> Option<NodeId>;
//
// /// Provide a hint that the partition-to-nodes view may be outdated. This is useful when a
// /// caller discovers via some other mechanism that routing information may be invalid - for
// /// example, when a request to a node previously returned by
// /// [`PartitionNodeLookup::get_partition_location`] indicates that it is no longer serving that
// /// partition.
// ///
// /// This call returns immediately, while the refresh itself is performed asynchronously on a
// /// best-effort basis. Multiple calls will not result in multiple refresh attempts.
// fn request_refresh(&self);
// }

impl PartitionNodeResolver {
pub fn get_partition_location(&self, partition_id: PartitionId) -> Option<NodeId> {
let mappings = self.partition_to_node_mappings.load();

// This check should ideally be strengthened to make sure we're using reasonably fresh lookup data
@@ -80,41 +82,24 @@ impl PartitionRouting {
self.request_refresh();
None
} else {
let partition_node = mappings.inner.get(&partition_id).cloned();
// Note: we defer calling my_node_id() until after we have a version of the routing
// table to ensure that we don't panic during server startup. Having loaded the routing
// table is an implicit but nonetheless strong signal that node initialization is done.
match partition_node {
Some(node_id)
if node_id
.as_generational()
.is_some_and(|id| id == *self.my_node_id()) =>
{
Some(PartitionAuthoritativeNode::Local)
}
Some(node_id) => Some(PartitionAuthoritativeNode::Remote(node_id)),
None => None,
}
mappings.inner.get(&partition_id).cloned()
}
}

#[inline]
fn my_node_id(&self) -> &GenerationalNodeId {
self.my_node_id.get_or_init(|| metadata().my_node_id())
}

/// Call this to hint to the background refresher that its view may be outdated. This is useful
/// when a caller discovers via some other mechanism that routing information may be invalid -
/// for example, when a request to a node previously returned by `get_node_by_partition` fails
/// with a response that explicitly indicates that it is no longer serving that partition.
///
/// This call returns immediately, while the refresh itself is performed asynchronously on a
/// best-effort basis. Multiple calls will not result in multiple refresh attempts.
pub fn request_refresh(&self) {
self.sender.try_send(Command::SyncRoutingInformation).ok();
}
}
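
A minimal caller-side sketch (not part of this commit) of how the resolver might be used; the route_to_partition helper is hypothetical, while get_partition_location and request_refresh are the methods shown above:

fn route_to_partition(
    resolver: &PartitionNodeResolver,
    partition_id: PartitionId,
) -> Option<NodeId> {
    match resolver.get_partition_location(partition_id) {
        Some(node_id) => Some(node_id),
        None => {
            // No mapping known yet (for example, before the first routing table
            // refresh on startup): hint that a refresh is wanted, then let the
            // caller back off and retry, or return an error upstream.
            resolver.request_refresh();
            None
        }
    }
}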

impl Debug for PartitionNodeResolver {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("PartitionNodeResolver(")?;
self.partition_to_node_mappings.load().fmt(f)?;
f.write_str(")")
}
}

#[derive(Debug)]
struct PartitionToNodesRoutingTable {
version: Version,
/// A mapping of partition IDs to node IDs that are believed to be authoritative for that
@@ -153,11 +138,10 @@ impl PartitionRoutingRefresher {
}

/// Get a handle to the partition-to-node routing table.
pub fn partition_routing(&self) -> PartitionRouting {
PartitionRouting {
pub fn partition_node_resolver(&self) -> PartitionNodeResolver {
PartitionNodeResolver {
sender: self.sender.clone(),
partition_to_node_mappings: self.inner.clone(),
my_node_id: OnceLock::new(), // looked up on use, so the lookup only happens after we observe a version of the routing table
}
}
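
For context, a minimal sketch of how a refresher can swap in an updated table; the publish_table helper is hypothetical, but ArcSwap::store is the real arc_swap API, and cloned PartitionNodeResolver handles observe the new table lock-free on their next load():

fn publish_table(
    mappings: &Arc<ArcSwap<PartitionToNodesRoutingTable>>,
    version: Version,
    inner: HashMap<PartitionId, NodeId>,
) {
    // Atomically replace the current table; readers never block.
    mappings.store(Arc::new(PartitionToNodesRoutingTable { version, inner }));
}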

@@ -276,57 +260,40 @@ async fn sync_routing_information(
}

#[cfg(any(test, feature = "test-util"))]
pub mod test_util {
pub mod mocks {
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, OnceLock};
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::mpsc;

use crate::routing_info::PartitionRouting;
// use crate::partitions::PartitionNodeLookup;
use crate::partitions::PartitionNodeResolver;
use restate_types::identifiers::PartitionId;
use restate_types::{GenerationalNodeId, NodeId, Version};

pub struct MockPartitionRouting {
my_node_id: GenerationalNodeId,
partition_routing: PartitionRouting,
}

impl MockPartitionRouting {
pub fn local_only() -> Self {
let (sender, _) = mpsc::channel(1);
let mut mappings = HashMap::default();

// Matches MockPartitionSelector's default
let my_node_id = GenerationalNodeId::new(0, 1);

mappings.insert(PartitionId::MIN, NodeId::Generational(my_node_id));
MockPartitionRouting {
my_node_id,
partition_routing: PartitionRouting {
sender,
partition_to_node_mappings: Arc::new(ArcSwap::new(Arc::new(
super::PartitionToNodesRoutingTable {
version: Version::MIN,
inner: mappings,
},
))),
my_node_id: OnceLock::from(my_node_id),
},
}
}
// #[derive(Debug)]
// pub struct FixedPartitionNode {
// my_node_id: GenerationalNodeId,
// }

pub fn my_node_id(&self) -> GenerationalNodeId {
self.my_node_id
}
}
pub fn fixed_single_node(
node_id: GenerationalNodeId,
partition_id: PartitionId,
) -> PartitionNodeResolver {
let (sender, _) = mpsc::channel(1);

impl Deref for MockPartitionRouting {
type Target = PartitionRouting;
let mut mappings = HashMap::default();
mappings.insert(partition_id, NodeId::Generational(node_id));

fn deref(&self) -> &Self::Target {
&self.partition_routing
PartitionNodeResolver {
sender,
partition_to_node_mappings: Arc::new(ArcSwap::new(Arc::new(
super::PartitionToNodesRoutingTable {
version: Version::MIN,
inner: mappings,
},
))),
}
}
}
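
A hedged test sketch (not part of this commit) of the simplified mocking; the test itself is hypothetical and assumes GenerationalNodeId is Copy + PartialEq, as its use above suggests, while fixed_single_node and the types it takes appear in the module above:

#[test]
fn fixed_single_node_resolves_statically() {
    use restate_core::partitions::mocks::fixed_single_node;
    use restate_types::identifiers::PartitionId;
    use restate_types::{GenerationalNodeId, NodeId};

    let node_id = GenerationalNodeId::new(0, 1);
    let resolver = fixed_single_node(node_id, PartitionId::MIN);

    // The fixed mapping is served straight from the pre-populated table.
    let location = resolver.get_partition_location(PartitionId::MIN);
    assert!(matches!(location, Some(NodeId::Generational(id)) if id == node_id));
}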
6 changes: 3 additions & 3 deletions crates/node/src/lib.rs
@@ -21,7 +21,7 @@ use restate_core::metadata_store::{retry_on_network_error, ReadWriteError};
use restate_core::network::{
GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking,
};
use restate_core::routing_info::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::{
spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion,
};
@@ -213,7 +213,7 @@ impl Node {
WorkerRole::create(
health.worker_status(),
metadata.clone(),
partition_routing_refresher.partition_routing(),
partition_routing_refresher.partition_node_resolver(),
updateable_config.clone(),
&mut router_builder,
networking.clone(),
@@ -235,7 +235,7 @@
bifrost.clone(),
updateable_config.clone(),
metadata,
partition_routing_refresher.partition_routing(),
partition_routing_refresher.partition_node_resolver(),
networking.clone(),
metadata_manager.writer(),
&mut server_builder,
21 changes: 13 additions & 8 deletions crates/node/src/roles/admin.rs
@@ -21,13 +21,14 @@ use restate_core::network::MessageRouterBuilder;
use restate_core::network::NetworkServerBuilder;
use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::routing_info::PartitionRouting;
use restate_core::partitions::PartitionNodeResolver;
use restate_core::{task_center, Metadata, MetadataWriter, TaskCenter, TaskKind};
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_service_protocol::discovery::ServiceDiscovery;
use restate_storage_query_datafusion::context::{QueryContext, SelectPartitionsFromMetadata};
use restate_storage_query_datafusion::remote_invoker_status_handle::RemoteInvokerStatusHandle;
use restate_storage_query_datafusion::remote_query_scanner_client::create_remote_scanner_service;
use restate_storage_query_datafusion::remote_query_scanner_manager::RemoteScannerManager;
use restate_types::config::Configuration;
use restate_types::config::IngressOptions;
use restate_types::health::HealthStatus;
@@ -65,7 +66,7 @@ impl<T: TransportConnect> AdminRole<T> {
bifrost: Bifrost,
updateable_config: Live<Configuration>,
metadata: Metadata,
partition_routing: PartitionRouting,
partition_node_resolver: PartitionNodeResolver,
networking: Networking<T>,
metadata_writer: MetadataWriter,
server_builder: &mut NetworkServerBuilder,
@@ -85,19 +86,23 @@
let query_context = if let Some(query_context) = local_query_context {
query_context
} else {
let remote_scanner_manager = RemoteScannerManager::new(
partition_node_resolver,
create_remote_scanner_service(
networking.clone(),
task_center.clone(),
router_builder,
),
);

// need to create a remote query context since we are not co-located with a worker role
QueryContext::create(
&config.admin.query_engine,
SelectPartitionsFromMetadata::new(metadata.clone()),
None,
RemoteInvokerStatusHandle,
metadata.updateable_schema(),
partition_routing,
create_remote_scanner_service(
networking.clone(),
task_center.clone(),
router_builder,
),
remote_scanner_manager,
)
.await?
};
6 changes: 3 additions & 3 deletions crates/node/src/roles/worker.rs
@@ -15,7 +15,7 @@ use restate_bifrost::Bifrost;
use restate_core::network::MessageRouterBuilder;
use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::routing_info::PartitionRouting;
use restate_core::partitions::PartitionNodeResolver;
use restate_core::worker_api::ProcessorsManagerHandle;
use restate_core::{cancellation_watcher, task_center, Metadata, MetadataKind};
use restate_core::{ShutdownError, TaskKind};
@@ -74,7 +74,7 @@ impl<T: TransportConnect> WorkerRole<T> {
pub async fn create(
health_status: HealthStatus<WorkerStatus>,
metadata: Metadata,
partition_routing: PartitionRouting,
partition_node_resolver: PartitionNodeResolver,
updateable_config: Live<Configuration>,
router_builder: &mut MessageRouterBuilder,
networking: Networking<T>,
@@ -86,7 +86,7 @@
updateable_config,
health_status,
metadata.clone(),
partition_routing,
partition_node_resolver,
networking,
bifrost,
router_builder,
8 changes: 1 addition & 7 deletions crates/storage-query-datafusion/src/context.rs
@@ -24,7 +24,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::sql::TableReference;

use restate_core::routing_info::PartitionRouting;
use restate_core::Metadata;
use restate_invoker_api::StatusHandle;
use restate_partition_store::PartitionStoreManager;
@@ -35,7 +34,6 @@ use restate_types::live::Live;
use restate_types::schema::deployment::DeploymentResolver;
use restate_types::schema::service::ServiceMetadataResolver;

use crate::remote_query_scanner_client::RemoteScannerService;
use crate::remote_query_scanner_manager::RemoteScannerManager;
use crate::table_providers::ScanPartition;
use crate::{analyzer, physical_optimizer};
@@ -116,12 +114,8 @@ impl QueryContext {
schemas: Live<
impl DeploymentResolver + ServiceMetadataResolver + Send + Sync + Debug + Clone + 'static,
>,
partition_routing: PartitionRouting,
remote_scanner_service: Arc<dyn RemoteScannerService>,
remote_scanner_manager: RemoteScannerManager,
) -> Result<QueryContext, BuildError> {
let remote_scanner_manager =
RemoteScannerManager::new(partition_routing, remote_scanner_service);

let ctx = QueryContext::new(
options.memory_size.get(),
options.tmp_dir.clone(),
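
Net effect: construction of the RemoteScannerManager moves out of QueryContext::create and into its callers, as the admin role diff above shows. A hedged sketch of the new call shape (variable names abbreviated and hypothetical; the signatures follow the diffs above):

let remote_scanner_manager = RemoteScannerManager::new(
    partition_node_resolver,
    create_remote_scanner_service(networking, task_center, router_builder),
);
let query_context = QueryContext::create(
    &query_engine_options,
    partition_selector,
    None, // no local partition store when not co-located with a worker
    invoker_status_handle,
    schemas,
    remote_scanner_manager,
)
.await?;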