Skip to content

Commit

Permalink
Self review
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 4, 2024
1 parent 9782dbe commit 3f44fc3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 19 deletions.
11 changes: 5 additions & 6 deletions crates/core/src/network/partition_processor_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,11 @@ where
.pinned()
.find_partition_id(inner_request.partition_key())?;

// TODO enable it once https://github.com/restatedev/restate/pull/2172 goes in
// let node_id = self
// .partition_routing
// .get_node_by_partition(partition_id)
// .ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;
let node_id = metadata().my_node_id();
let node_id = self
.partition_routing
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;

let response = self
.rpc_router
.call(
Expand Down
13 changes: 2 additions & 11 deletions crates/core/src/partitions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ use crate::{
cancellation_watcher, task_center, ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind,
};

// #[cfg(any(test))]
// pub use mocks::*;

pub type CommandSender = mpsc::Sender<Command>;
pub type CommandReceiver = mpsc::Receiver<Command>;

Expand All @@ -59,7 +56,7 @@ impl PartitionRouting {
/// A `None` response indicates that either we have no knowledge about this partition, or that
/// the routing table has not yet been refreshed for the cluster. The latter condition should be
/// brief and only on startup, so we can generally treat lack of response as a negative answer.
pub fn get_partition_location(&self, partition_id: PartitionId) -> Option<NodeId> {
pub fn get_node_by_partition(&self, partition_id: PartitionId) -> Option<NodeId> {
let mappings = self.partition_to_node_mappings.load();

// This check should ideally be strengthened to make sure we're using reasonably fresh lookup data
Expand All @@ -75,7 +72,7 @@ impl PartitionRouting {
/// Provide a hint that the partition-to-nodes view may be outdated. This is useful when a
/// caller discovers via some other mechanism that routing information may be invalid - for
/// example, when a request to a node previously returned by
/// [`PartitionRouting::get_partition_location`] indicates that it is no longer serving that
/// [`PartitionRouting::get_node_by_partition`] indicates that it is no longer serving that
/// partition.
///
/// This call returns immediately, while the refresh itself is performed asynchronously on a
Expand Down Expand Up @@ -262,16 +259,10 @@ pub mod mocks {
use arc_swap::ArcSwap;
use tokio::sync::mpsc;

// use crate::partitions::PartitionNodeLookup;
use crate::partitions::PartitionRouting;
use restate_types::identifiers::PartitionId;
use restate_types::{GenerationalNodeId, NodeId, Version};

// #[derive(Debug)]
// pub struct FixedPartitionNode {
// my_node_id: GenerationalNodeId,
// }

pub fn fixed_single_node(
node_id: GenerationalNodeId,
partition_id: PartitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl RemoteScannerManager {
) -> anyhow::Result<PartitionLocation> {
let my_node_id = self.my_node_id();

match self.partition_routing.get_partition_location(partition_id) {
match self.partition_routing.get_node_by_partition(partition_id) {
None => {
self.partition_routing.request_refresh();
bail!("node lookup for partition {} failed", partition_id)
Expand All @@ -153,7 +153,7 @@ impl RemoteScannerManager {
// Note: we initialize my_node_id lazily as it is not available at construction time. We should
// be careful since the call to metadata().my_node_id() will panic if the system configuration
// isn't fully initialized yet, but we shouldn't be able to accept queries until that is in fact
// the case. For testing, set the node id explicitly with new_with_fixed_node_id.
// the case.
#[inline]
fn my_node_id(&self) -> &GenerationalNodeId {
self.my_node_id.get_or_init(|| metadata().my_node_id())
Expand Down

0 comments on commit 3f44fc3

Please sign in to comment.