
Wire up PartitionRouting into RemoteScannerManager #2172

Merged (9 commits, Nov 5, 2024)
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
@@ -13,7 +13,7 @@ mod metadata;
pub mod metadata_store;
mod metric_definitions;
pub mod network;
pub mod routing_info;
pub mod partitions;
Contributor Author:

I think this is more intuitive and more general for the future.

mod task_center;
mod task_center_types;
pub mod worker_api;
15 changes: 7 additions & 8 deletions crates/core/src/network/partition_processor_rpc_client.rs
@@ -21,8 +21,8 @@ use restate_types::partition_table::{FindPartition, PartitionTable, PartitionTab

use crate::network::rpc_router::{ConnectionAwareRpcError, ConnectionAwareRpcRouter, RpcError};
use crate::network::{HasConnection, Networking, Outgoing, TransportConnect};
use crate::routing_info::PartitionRouting;
use crate::{metadata, ShutdownError};
use crate::partitions::PartitionRouting;
use crate::ShutdownError;

#[derive(Debug, thiserror::Error)]
pub enum PartitionProcessorRpcClientError {
@@ -319,12 +319,11 @@ where
.pinned()
.find_partition_id(inner_request.partition_key())?;

// TODO enable it once https://github.com/restatedev/restate/pull/2172 goes in
// let node_id = self
// .partition_routing
// .get_node_by_partition(partition_id)
// .ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;
let node_id = metadata().my_node_id();
let node_id = self
.partition_routing
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;
Comment on lines +322 to +325
Contributor:

How are we gonna refresh the partition_routing in the PartitionProcessorRpcClient?

Contributor Author:

I wonder if we shouldn't just inline this responsibility into the get_node_by_partition method; as you've suggested elsewhere, perhaps it's worth at least a single retry before giving up. Otherwise we'd have to align the retry strategy's delay with the refresh interval - the tension is between putting increased load on the metadata store by polling for refreshes (until we do something better), and slowing down pending unknown lookups by up to the refresh interval (3s at present).

My worry is that by blocking assorted requests throughout the system for up to 3s, we could massively increase the number of in-flight requests. This is why I didn't want to add a blocking version that waits for a refresh. Thoughts?

Contributor Author:

I don't like the idea of pushing every potential caller into having to remember to manually call request_refresh. Let me rethink that; it seemed more reasonable when we were operating on a user-issued SQL query, but the user has no influence over which partition ids we decide to execute against. The fact that we have a partition id that can't be routed to a node is a strong enough signal that something needs a refresh.

Contributor:

For the case where we don't have information for a given PartitionId, inlining the refresh into get_node_by_partition might work. However, if we return a node that is no longer the leader, it seems to me that the component using the PartitionRouting needs to handle the situation itself: deciding to call refresh, and then deciding how to react (failing, retrying without waiting for the update, awaiting the update and then retrying, etc.). By offering an API where one can wait for an update, higher-level components could opt in to this behavior if needed.

Regarding this very specific comment, it was more about who's calling refresh on the PartitionRouting struct.
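
For illustration, the opt-in "wait for an update" API suggested here could look roughly like the sketch below. The method name refresh_and_wait and the polling loop are hypothetical and not part of this PR; a real implementation would more likely use a watch channel than polling.

use std::time::Duration;
use tokio::time::Instant;

impl PartitionRouting {
    /// Hypothetical opt-in variant: request a refresh, then wait until the routing
    /// table version advances past `seen`, or give up after `timeout`. Returns true
    /// if fresher routing data became visible in time.
    pub async fn refresh_and_wait(&self, seen: Version, timeout: Duration) -> bool {
        self.request_refresh();
        let deadline = Instant::now() + timeout;
        while self.partition_to_node_mappings.load().version <= seen {
            if Instant::now() >= deadline {
                return false;
            }
            // Crude polling keeps the sketch short; a watch channel would avoid it.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        true
    }
}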

Contributor Author:

I think in time this should probably become async, and we can introduce a retry policy - or a retrying wrapper around the lookup - so we can control retries by call site. For now, I've added an automatic refresh request whenever we return None from the lookup, so that callers don't have to think about that. But that may take up to 3s with the current config, so we should probably keep failing to the caller immediately and let the client redrive.
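
Put differently, a caller such as the RPC client ends up with roughly the following shape - resolve or fail immediately, with the refresh happening in the background. This is a sketch mirroring the change above, not additional code from this PR:

// Fail fast when no route is known; get_node_by_partition has already scheduled
// a background refresh, so the client can simply redrive the request later.
fn route_to_partition(
    partition_routing: &PartitionRouting,
    partition_id: PartitionId,
) -> Result<NodeId, PartitionProcessorRpcClientError> {
    partition_routing
        .get_node_by_partition(partition_id)
        .ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))
}

Failing immediately keeps request latency bounded instead of tying it to the up-to-3s refresh interval.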


let response = self
.rpc_router
.call(
@@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::pin::pin;
use std::sync::Arc;

@@ -37,9 +38,8 @@ pub enum Command {
SyncRoutingInformation,
}

/// Holds a view of the known partition-to-node mappings. Use it to discover which node(s) to route
/// requests to for a given partition. Compared to the partition table, this view is more dynamic as
/// it changes based on cluster nodes' operational status. This handle can be cheaply cloned.
/// Discover cluster nodes for a given partition. Compared to the partition table, this view is more
/// dynamic as it changes based on cluster nodes' operational status. Can be cheaply cloned.
#[derive(Clone)]
pub struct PartitionRouting {
sender: CommandSender,
@@ -48,28 +48,61 @@ pub struct PartitionRouting {
}

impl PartitionRouting {
/// Look up a suitable node to answer requests for the given partition.
/// Look up a suitable node to process requests for a given partition. Answers are authoritative
/// though subject to propagation delays through the cluster in distributed deployments.
/// Generally, as a consumer of routing information, your options are limited to backing off and
/// retrying the request, or returning an error upstream when information is not available.
///
/// A `None` response indicates that either we have no knowledge about this partition, or that
/// the routing table has not yet been refreshed for the cluster. The latter condition should be
/// brief and only on startup, so we can generally treat lack of response as a negative answer.
/// An automatic refresh is scheduled any time a `None` response is returned.
pub fn get_node_by_partition(&self, partition_id: PartitionId) -> Option<NodeId> {
self.partition_to_node_mappings
.load()
.inner
.get(&partition_id)
.copied()
let mappings = self.partition_to_node_mappings.load();

// This check should ideally be strengthened to make sure we're using reasonably fresh lookup data
if mappings.version < Version::MIN {
debug!("Partition routing information not available - requesting refresh");
self.request_refresh();
return None;
}

let maybe_node = mappings.inner.get(&partition_id).cloned();
if maybe_node.is_none() {
debug!(
?partition_id,
"No known node for partition - requesting refresh"
);
self.request_refresh();
}
Comment on lines +71 to +77
Contributor:

Nit: Practically, this change will probably have no visible effect right now, since the SchedulingPlan with version Version::MIN should already contain an assignment for all partitions.

Contributor Author:

The earlier conditional will kick in before any load has happened, i.e. version = Version::INVALID. This one will kick in if there is a new partition id that we have no node for, once version >= Version::MIN. Might still be missing something, but that was my intent :-)

maybe_node
}

/// Call this to hint to the background refresher that its view may be outdated. This is useful
/// when a caller discovers via some other mechanism that routing infromation may be invalid -
/// for example, when a request to a node previously returned by `get_node_by_partition` fails
/// with a response that explicitly indicates that it is no longer serving that partition. The
/// call returns as soon as the request is enqueued. A refresh is not guaranteed to happen.
pub async fn request_refresh(&self) {
self.sender
.send(Command::SyncRoutingInformation)
.await
.expect("Failed to send refresh request");
/// Provide a hint that the partition-to-nodes view may be outdated. This is useful when a
/// caller discovers via some other mechanism that routing information may be invalid - for
/// example, when a request to a node previously returned by
/// [`PartitionRouting::get_node_by_partition`] indicates that it is no longer serving that
/// partition.
///
/// This call returns immediately, while the refresh itself is performed asynchronously on a
/// best-effort basis. Multiple calls will not result in multiple refresh attempts. You only
/// need to call this method if you get a node id, and later discover it's incorrect; a `None`
/// response to a lookup triggers a refresh automatically.
pub fn request_refresh(&self) {
// if the channel already contains an unconsumed message, it doesn't matter that we can't send another
let _ = self.sender.try_send(Command::SyncRoutingInformation).ok();
}
}
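
A brief usage sketch of the hint described above; the surrounding scenario and log line are illustrative assumptions, but the coalescing behaviour follows from try_send on the command channel:

// A node explicitly rejected a request for a partition it no longer serves:
// hint that the routing view is stale, then fail fast rather than block.
fn on_partition_moved(partition_routing: &PartitionRouting, partition_id: PartitionId) {
    partition_routing.request_refresh(); // returns immediately, best-effort
    // Repeating this for other failed requests is cheap: try_send drops the
    // extra command if one is already queued for the refresher.
    tracing::debug!(?partition_id, "partition routing looked stale, refresh requested");
}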

impl Debug for PartitionRouting {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("PartitionNodeResolver(")?;
self.partition_to_node_mappings.load().fmt(f)?;
f.write_str(")")
}
}

#[derive(Debug)]
struct PartitionToNodesRoutingTable {
version: Version,
/// A mapping of partition IDs to node IDs that are believed to be authoritative for that
@@ -158,7 +191,7 @@ impl PartitionRoutingRefresher {
let metadata_store_client = self.metadata_store_client.clone();

let task = task_center().spawn_unmanaged(
crate::TaskKind::Disposable,
TaskKind::Disposable,
"refresh-routing-information",
None,
{
@@ -228,3 +261,36 @@ async fn sync_routing_information(
}),
);
}

#[cfg(any(test, feature = "test-util"))]
pub mod mocks {
use std::collections::HashMap;
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::mpsc;

use crate::partitions::PartitionRouting;
use restate_types::identifiers::PartitionId;
use restate_types::{GenerationalNodeId, NodeId, Version};

pub fn fixed_single_node(
node_id: GenerationalNodeId,
partition_id: PartitionId,
) -> PartitionRouting {
let (sender, _) = mpsc::channel(1);

let mut mappings = HashMap::default();
mappings.insert(partition_id, NodeId::Generational(node_id));

PartitionRouting {
sender,
partition_to_node_mappings: Arc::new(ArcSwap::new(Arc::new(
super::PartitionToNodesRoutingTable {
version: Version::MIN,
inner: mappings,
},
))),
}
}
}
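
For completeness, a sketch of how this mock might be exercised in a test; the PartitionId and GenerationalNodeId constructor forms are assumptions about restate_types rather than code from this PR:

#[cfg(test)]
mod mock_usage {
    use super::mocks;
    use restate_types::identifiers::PartitionId;
    use restate_types::GenerationalNodeId;

    #[test]
    fn resolves_fixed_mapping() {
        // Constructor forms below are assumed for the sketch.
        let node = GenerationalNodeId::new(1, 1);
        let mapped = PartitionId::from(0u16);
        let routing = mocks::fixed_single_node(node, mapped);

        // The mapped partition resolves to the fixed node.
        assert!(routing.get_node_by_partition(mapped).is_some());
        // An unmapped partition returns None; the refresh hint it triggers is
        // silently dropped because the mock never keeps the receiver half.
        assert!(routing.get_node_by_partition(PartitionId::from(1u16)).is_none());
    }
}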
6 changes: 4 additions & 2 deletions crates/node/src/lib.rs
@@ -20,7 +20,7 @@ use restate_core::metadata_store::{retry_on_network_error, ReadWriteError};
use restate_core::network::{
GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking,
};
use restate_core::routing_info::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::{
spawn_metadata_manager, MetadataBuilder, MetadataKind, MetadataManager, TargetVersion,
};
@@ -219,6 +219,7 @@ impl Node {
WorkerRole::create(
health.worker_status(),
metadata.clone(),
partition_routing_refresher.partition_routing(),
updateable_config.clone(),
&mut router_builder,
networking.clone(),
@@ -240,6 +241,7 @@
bifrost.clone(),
updateable_config.clone(),
metadata,
partition_routing_refresher.partition_routing(),
networking.clone(),
metadata_manager.writer(),
&mut server_builder,
@@ -259,7 +261,7 @@
&mut router_builder,
worker_role
.as_ref()
.map(|role| role.parition_processor_manager_handle()),
.map(|role| role.partition_processor_manager_handle()),
);

// Ensures that message router is updated after all services have registered themselves in
19 changes: 14 additions & 5 deletions crates/node/src/roles/admin.rs
@@ -19,12 +19,14 @@ use restate_core::network::MessageRouterBuilder;
use restate_core::network::NetworkServerBuilder;
use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::partitions::PartitionRouting;
use restate_core::{task_center, Metadata, MetadataWriter, TaskCenter, TaskKind};
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_service_protocol::discovery::ServiceDiscovery;
use restate_storage_query_datafusion::context::{QueryContext, SelectPartitionsFromMetadata};
use restate_storage_query_datafusion::remote_invoker_status_handle::RemoteInvokerStatusHandle;
use restate_storage_query_datafusion::remote_query_scanner_client::create_remote_scanner_service;
use restate_storage_query_datafusion::remote_query_scanner_manager::RemoteScannerManager;
use restate_types::config::Configuration;
use restate_types::config::IngressOptions;
use restate_types::health::HealthStatus;
@@ -62,6 +64,7 @@ impl<T: TransportConnect> AdminRole<T> {
bifrost: Bifrost,
updateable_config: Live<Configuration>,
metadata: Metadata,
partition_routing: PartitionRouting,
networking: Networking<T>,
metadata_writer: MetadataWriter,
server_builder: &mut NetworkServerBuilder,
@@ -81,18 +84,24 @@
let query_context = if let Some(query_context) = local_query_context {
query_context
} else {
let remote_scanner_manager = RemoteScannerManager::new(
metadata.clone(),
partition_routing,
create_remote_scanner_service(
networking.clone(),
task_center.clone(),
router_builder,
),
);

// need to create a remote query context since we are not co-located with a worker role
QueryContext::create(
&config.admin.query_engine,
SelectPartitionsFromMetadata::new(metadata.clone()),
None,
RemoteInvokerStatusHandle,
metadata.updateable_schema(),
create_remote_scanner_service(
networking.clone(),
task_center.clone(),
router_builder,
),
remote_scanner_manager,
)
.await?
};
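
This wiring is the core of the PR: RemoteScannerManager now receives the PartitionRouting handle alongside the metadata. A rough sketch of the kind of decision this enables when planning a partition scan; ScanTarget and the helper function are illustrative assumptions, not the crate's actual API:

// Illustrative only: decide whether a partition can be scanned locally or must
// go through a remote scanner on the node currently believed to serve it.
enum ScanTarget {
    Local,
    Remote(NodeId),
}

fn plan_partition_scan(
    my_node_id: GenerationalNodeId,
    partition_routing: &PartitionRouting,
    partition_id: PartitionId,
) -> Option<ScanTarget> {
    let node = partition_routing.get_node_by_partition(partition_id)?;
    if node == NodeId::Generational(my_node_id) {
        Some(ScanTarget::Local)
    } else {
        Some(ScanTarget::Remote(node))
    }
}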
7 changes: 5 additions & 2 deletions crates/node/src/roles/worker.rs
@@ -14,6 +14,7 @@ use restate_bifrost::Bifrost;
use restate_core::network::MessageRouterBuilder;
use restate_core::network::Networking;
use restate_core::network::TransportConnect;
use restate_core::partitions::PartitionRouting;
use restate_core::worker_api::ProcessorsManagerHandle;
use restate_core::{cancellation_watcher, task_center, Metadata, MetadataKind};
use restate_core::{ShutdownError, TaskKind};
@@ -72,6 +73,7 @@ impl<T: TransportConnect> WorkerRole<T> {
pub async fn create(
health_status: HealthStatus<WorkerStatus>,
metadata: Metadata,
partition_routing: PartitionRouting,
updateable_config: Live<Configuration>,
router_builder: &mut MessageRouterBuilder,
networking: Networking<T>,
@@ -83,6 +85,7 @@
updateable_config,
health_status,
metadata.clone(),
partition_routing,
networking,
bifrost,
router_builder,
@@ -94,8 +97,8 @@
Ok(WorkerRole { worker, metadata })
}

pub fn parition_processor_manager_handle(&self) -> ProcessorsManagerHandle {
self.worker.parition_processor_manager_handle()
pub fn partition_processor_manager_handle(&self) -> ProcessorsManagerHandle {
self.worker.partition_processor_manager_handle()
}

pub fn storage_query_context(&self) -> &QueryContext {
7 changes: 3 additions & 4 deletions crates/storage-query-datafusion/src/context.rs
@@ -23,6 +23,7 @@ use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::sql::TableReference;

use restate_core::Metadata;
use restate_invoker_api::StatusHandle;
use restate_partition_store::PartitionStoreManager;
@@ -33,7 +34,6 @@ use restate_types::live::Live;
use restate_types::schema::deployment::DeploymentResolver;
use restate_types::schema::service::ServiceMetadataResolver;

use crate::remote_query_scanner_client::RemoteScannerService;
use crate::remote_query_scanner_manager::RemoteScannerManager;
use crate::table_providers::ScanPartition;
use crate::{analyzer, physical_optimizer};
@@ -105,6 +105,7 @@ pub struct QueryContext {
}

impl QueryContext {
#[allow(clippy::too_many_arguments)]
pub async fn create(
options: &QueryEngineOptions,
partition_selector: impl SelectPartitions + Clone,
@@ -113,10 +114,8 @@
schemas: Live<
impl DeploymentResolver + ServiceMetadataResolver + Send + Sync + Debug + Clone + 'static,
>,
remote_scanner_service: Arc<dyn RemoteScannerService>,
remote_scanner_manager: RemoteScannerManager,
) -> Result<QueryContext, BuildError> {
let remote_scanner_manager = RemoteScannerManager::new(remote_scanner_service);

let ctx = QueryContext::new(
options.memory_size.get(),
options.tmp_dir.clone(),
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/src/lib.rs
@@ -43,7 +43,7 @@ pub(crate) mod mocks;

pub mod remote_invoker_status_handle;
pub mod remote_query_scanner_client;
mod remote_query_scanner_manager;
pub mod remote_query_scanner_manager;
#[cfg(test)]
mod tests;
