Add a PartitionRouting partition-id-to-node mapping holder and background refresher #2166
Conversation
```diff
@@ -45,9 +45,6 @@ use crate::task_center;
 pub(super) type CommandSender = mpsc::UnboundedSender<Command>;
 pub(super) type CommandReceiver = mpsc::UnboundedReceiver<Command>;
-
-#[derive(Debug, thiserror::Error)]
-pub enum SyncError {}
```
Noticed this is unused, so removed it.
```rust
let update_interval = Configuration::pinned()
    .common
    .metadata_update_interval
    .into();
```
Wasn't sure what to pick for this. It seems reasonable to start with; it might be a nice optimisation to add a `get_if_newer_version` to the metadata store to make it cheaper to poll. Eventually we should gossip/push this info to nodes without needing to poll all the time.
Wondering whether the periodic update is really required if users of this struct refresh explicitly on outdated leader information.
I don't believe we have any explicit mechanism today to provide outdated feedback; I exposed the refresh method just in case, but let's keep this for now please. Happy to reduce the refresh interval to something like 3-5x the heartbeat interval initially.
Let's see whether the load on the metadata store will be too high. Something to keep an eye on.
Yeah! Not entirely certain what signals to monitor that would tell us this is a problem before it becomes an actual problem, if that makes sense. Probably some metrics around the metadata store service?
I'm pretty keen on the `get_if_version_newer` optimisation - I think basically any backing store we would choose for metadata should easily support this feature, and worst case, it can just fall back on always doing a full read. At least it saves some bytes over the wire.
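A conditional-read API along these lines could look like the sketch below, written against a hypothetical in-memory store (neither this store type nor `get_if_version_newer` exist in the codebase yet), including the full-read fallback for backends that can't answer version-conditional gets:

```rust
use std::collections::HashMap;

// Hypothetical in-memory metadata store, for illustration only.
struct MetadataStore {
    entries: HashMap<String, (u64, Vec<u8>)>, // key -> (version, value)
}

impl MetadataStore {
    fn get(&self, key: &str) -> Option<(u64, Vec<u8>)> {
        self.entries.get(key).cloned()
    }

    // Returns the value only if its version is strictly newer than `known`,
    // saving the payload bytes on the wire when the caller is up to date.
    fn get_if_version_newer(&self, key: &str, known: u64) -> Option<(u64, Vec<u8>)> {
        match self.entries.get(key) {
            Some((version, value)) if *version > known => Some((*version, value.clone())),
            _ => None,
        }
    }
}

// A store that cannot answer conditional reads falls back to a full read.
fn fetch_if_newer(
    store: &MetadataStore,
    key: &str,
    known: u64,
    supports_conditional: bool,
) -> Option<(u64, Vec<u8>)> {
    if supports_conditional {
        store.get_if_version_newer(key, known)
    } else {
        // Worst case: full read, then discard if not actually newer.
        store.get(key).filter(|(version, _)| *version > known)
    }
}
```

Either path preserves the same caller-visible contract; the conditional variant only changes how many bytes cross the wire.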
```rust
let partition_to_node_mappings = self.inner.clone();
let metadata_store_client = self.metadata_store_client.clone();

let task = task_center().spawn_unmanaged(
```
I use `spawn_unmanaged` to get a task handle, which allows me to check if the previous one is finished.
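The pattern can be sketched with a plain std thread standing in for `task_center().spawn_unmanaged` (the `Refresher` type and its fields are illustrative, not from the PR):

```rust
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Keep a handle to the previous refresh; only spawn a new one when the
// previous run has finished, so refreshes never pile up.
struct Refresher {
    previous: Option<JoinHandle<()>>,
}

impl Refresher {
    // Returns true if a new refresh was started, false if one is in flight.
    fn trigger_refresh(&mut self) -> bool {
        if let Some(handle) = &self.previous {
            if !handle.is_finished() {
                // Previous refresh still running; skip this request.
                return false;
            }
        }
        self.previous = Some(thread::spawn(|| {
            // Placeholder for the actual metadata fetch.
            thread::sleep(Duration::from_millis(100));
        }));
        true
    }
}
```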
Good idea 👍
```rust
/// requests to for a given partition. Compared to the partition table, this view is more dynamic as
/// it changes based on cluster nodes' operational status. This handle can be cheaply cloned.
#[derive(Clone)]
pub struct PartitionRouting {
```
I'm not in love with this name but hate everything else I've come up with more. Any suggestions? `PartitionNodeRoutes` is a more verbose version I also like.
Thanks for creating this PR @pcholakov. I think this goes exactly in the direction I had in mind for the simple solution to unblock Igal's and Francesco's work. I left a few comments and suggestions we could do before merging.
crates/core/src/routing_info/mod.rs (outdated)
```rust
/// Request a refresh without waiting for it to complete. This is useful when the caller
/// discovers via some other mechanism that the local view might be outdated - for example, when
/// a request to a node previously returned by `get_node_by_partition` fails with a response
/// that indicates that this routing information is no longer valid.
```
How will a user of `PartitionRouting` use it if they encounter invalid routing information?
Just call it? :-) It's super dumb right now, just a fire-and-forget to trigger the refresh. We don't yet have any errors that would signify that one is working with outdated routing info, but I imagine that instead of `TargetVersion::Latest`, the caller might specify something they got from a peer, such as a monotonic partition mapping version/epoch number. But the expectation is that whoever calls this has already encountered an unrecoverable error, and calling this is just a nice-to-have signal. Let me see if I can work this into a doc comment.
What do I do after I call this method? Right now, I cannot wait until the update has been processed, so I can only continue and hope that by the time I retry, the information has been updated. If not, then I'll talk to the outdated leader again.
My thinking here was that the caller has likely already given up, and this is a pure courtesy notification. There's no guarantee whatsoever that waiting for a change will fix anything - and conversely, the same node might later respond successfully even if nothing has changed from the routing table's point of view. Happy to add a blocking option here that returns when the version changes; I just didn't see it as a very useful thing to do right now. What do you think we could do differently?
I was imagining the caller having a number of retries. Whenever it receives a response indicating that it is talking to the wrong node, it would like to refresh the routing information, hoping that it had been operating on stale data. Once new information is obtained, it could continue, hoping that this time the information is up to date.
If the caller stopped retrying, then only notifying the `RoutingInformation` that the data is outdated is probably enough.
So I guess I am wondering whether an API that allows us to await for a refresh to complete is missing or not.
@tillrohrmann I think this is the main outstanding concern with this PR - would you like to see some specific improvements before we can merge this? I thought about this a fair amount yesterday and don't believe we can meaningfully improve on "just back off and retry" at the moment, though we'll definitely be able to do better in the future once we have a better view of cluster node status.
We can take this as a follow-up. One idea could be to have a watch that gets updated after the refresh task completes. A caller could then await this update, for example. I think this might be relevant for the work that @slinkydeveloper is doing (depending a bit on how he builds the ingress retries on wrong leadership information).
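The follow-up idea could be sketched with std primitives as a version counter that the refresh task bumps on completion; `tokio::sync::watch` would play this role in the real async code, and all names here are hypothetical:

```rust
use std::sync::{Arc, Condvar, Mutex};

// A refresh-completion signal: the background task bumps the version after it
// has installed fresh mappings; retrying callers can block until that happens.
#[derive(Clone)]
struct RefreshSignal {
    inner: Arc<(Mutex<u64>, Condvar)>,
}

impl RefreshSignal {
    fn new() -> Self {
        RefreshSignal { inner: Arc::new((Mutex::new(0), Condvar::new())) }
    }

    fn current_version(&self) -> u64 {
        *self.inner.0.lock().unwrap()
    }

    // Called by the background task after it has swapped in fresh mappings.
    fn notify_refreshed(&self) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() += 1;
        cvar.notify_all();
    }

    // A retrying caller blocks here until at least one refresh has completed
    // since `seen`, then retries against the (hopefully) updated table.
    fn wait_newer_than(&self, seen: u64) -> u64 {
        let (lock, cvar) = &*self.inner;
        let mut version = lock.lock().unwrap();
        while *version <= seen {
            version = cvar.wait(version).unwrap();
        }
        *version
    }
}
```

Because the caller captures `current_version()` before requesting a refresh, the wait cannot miss a completion that races ahead of it.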
Sounds good - I'd love to do that follow-up!
```rust
// Implementation note: the sole authority for node-level routing is currently the metadata store,
// and in particular, the cluster scheduling plan. This will change in the future so avoid leaking
// implementation details or assumptions about the source of truth.
```
👍 thanks for stating it here.
crates/core/src/routing_info/mod.rs (outdated)
```rust
let scheduling_plan: SchedulingPlan = metadata_store_client
    .get(SCHEDULING_PLAN_KEY.clone())
    .await?
    .context("Scheduling plan not found")?;
```
What if there is no scheduler yet that has created a `SchedulingPlan`?
I figure we'll just fail some requests until we have one - if a caller is trying to look up routing info for a partition, and we don't yet know how to address any of its nodes, it should return an appropriate error. E.g. the ingress layer might just return a 500 server-internal retryable error indicating to the upstream caller that the condition should be transient.
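Surfacing the missing plan as a retryable error rather than a panic might look like this sketch (the error type and lookup function are hypothetical, not from the PR):

```rust
use std::collections::HashMap;

// "Not ready" is distinct from "unknown partition": the former is transient
// (no scheduling plan published yet) and maps naturally to a retryable 500.
#[derive(Debug, PartialEq)]
enum RoutingError {
    // Routing info not (yet) available; callers should back off and retry.
    NotReady,
    UnknownPartition(u32),
}

fn node_for_partition(
    mappings: Option<&HashMap<u32, String>>,
    partition_id: u32,
) -> Result<String, RoutingError> {
    let mappings = mappings.ok_or(RoutingError::NotReady)?;
    mappings
        .get(&partition_id)
        .cloned()
        .ok_or(RoutingError::UnknownPartition(partition_id))
}
```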
I will add a debug-level log message about this at least.
In this PR I can't find where I would be able to access this …
I guess it will be passed into the …
@slinkydeveloper see how I've wired it up in this PR: #2172
Thanks for implementing the `PartitionRouting` struct @pcholakov. The changes look really good. +1 for merging.
crates/core/src/routing_info/mod.rs (outdated)
```rust
) -> anyhow::Result<()> {
    let scheduling_plan: Option<SchedulingPlan> = metadata_store_client
        .get(SCHEDULING_PLAN_KEY.clone())
        .await?;
```
Maybe we should also log at debug level if an error occurs here. That way, things become easier to debug if the routing does not update.
```rust
let _ = partition_to_node_mappings.compare_and_swap(
    current_mappings,
    Arc::new(PartitionToNodesRoutingTable {
        version: scheduling_plan.version(),
        inner: partition_nodes,
    }),
);
```
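The version-guarded swap can be illustrated with a std-only stand-in; the real code uses an atomic `Arc` swap via `compare_and_swap`, whereas a `Mutex` keeps this sketch self-contained, and all names besides the pattern are illustrative:

```rust
use std::sync::{Arc, Mutex};

// Only install a snapshot whose version is newer than the current one, so a
// slow concurrent refresh can never overwrite fresher mappings.
struct RoutingTable {
    version: u64,
    // partition id -> node name; contents are illustrative.
    mappings: Vec<(u32, String)>,
}

struct Holder {
    inner: Mutex<Arc<RoutingTable>>,
}

impl Holder {
    // Returns true if the candidate replaced the current table.
    fn update_if_newer(&self, candidate: RoutingTable) -> bool {
        let mut current = self.inner.lock().unwrap();
        if candidate.version > current.version {
            *current = Arc::new(candidate);
            true
        } else {
            false
        }
    }

    // Readers take a cheap Arc clone and never block writers for long.
    fn snapshot(&self) -> Arc<RoutingTable> {
        self.inner.lock().unwrap().clone()
    }
}
```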
Nice :-)
```rust
self.sender
    .send(Command::SyncRoutingInformation)
    .await
    .expect("Failed to send refresh request");
```
Nit: Since the `Command` no longer carries any specific value, one could use a watch to signal the refresh request. That way, the `request_refresh` call could become synchronous, and users of this method wouldn't block on the availability of send permits.
That's a neat use of watch! I do think the sync command will grow to carry some information again in the future - perhaps a node id, partition id, or some other context from the caller - as we learn more about the conditions that should trigger a refresh. So, I'd rather keep it as is for the time being, even though it's a totally valid nit!
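For reference, the watch-style alternative can be approximated with a coalescing dirty flag: callers set it synchronously and never block on channel capacity, while the background loop drains it on each tick (all names here are illustrative, not from the PR):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Edge-triggered refresh requests: any number of concurrent requests between
// two background-loop ticks coalesce into a single refresh.
#[derive(Clone)]
struct RefreshRequest {
    dirty: Arc<AtomicBool>,
}

impl RefreshRequest {
    fn new() -> Self {
        RefreshRequest { dirty: Arc::new(AtomicBool::new(false)) }
    }

    // Synchronous and lock-free; never blocks the caller.
    fn request_refresh(&self) {
        self.dirty.store(true, Ordering::Release);
    }

    // Called by the background task each tick; returns true if a refresh was
    // requested since the last check, and resets the flag.
    fn take_request(&self) -> bool {
        self.dirty.swap(false, Ordering::AcqRel)
    }
}
```

The trade-off versus the mpsc command channel is exactly the one noted above: a flag cannot carry per-request context such as a node or partition id.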
New approach as an alternative to #2149, which exposes a dedicated `PartitionRouting` lookup interface. Its implementation is initially backed by the metadata store, but it is not expected to stay that way for long.