
Wire up PartitionRouting into RemoteScannerManager #2172

Merged
merged 9 commits into from
Nov 5, 2024

Conversation

pcholakov
Contributor

@pcholakov pcholakov commented Oct 24, 2024

As per title, this enables DF queries to run in remote scan mode based on partition-node mappings (themselves derived from the cluster scheduling plan for the time being).

Note this PR is based on #2182 (I've set the base branch to my own copy so you only see relevant commits here).


github-actions bot commented Oct 24, 2024

Test Results

  5 files  ±0    5 suites  ±0   2m 52s ⏱️ +37s
 47 tests +2   46 ✅ +1  1 💤 +1  0 ❌ ±0 
119 runs  +5  117 ✅ +3  2 💤 +2  0 ❌ ±0 

Results for commit 5621d20. ± Comparison against base commit fdb95c5.

♻️ This comment has been updated with latest results.

Base automatically changed from feat/partition-node-routing to main October 25, 2024 04:30
Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for wiring up the PartitionRouting with the RemoteScannerManager @pcholakov. I think due to the changes a few tests are failing. One question I had: what happens if we panic in the get_partition_target_node method? Will this abort the process? If so, then we have to handle the error situation properly.

@pcholakov
Contributor Author

Apologies @tillrohrmann, I will move this back to draft; this PR was just a quick sketch to illustrate the wiring, and not intended to be a complete solution. Panic is absolutely not the answer here. I iterated on this briefly on Friday but didn't manage to get it into a ready state. My thinking is as follows:

  • the node should know whether it's the local authority for the partition in any event, without any reliance on metadata sync; then it can default to local execution without waiting for the node routing info to be available
  • otherwise, queries may fail if we determine that a routing decision is required but the information about where to route the read request isn't available

I will come back to finish this, now that we have the distributed query merged.

@pcholakov pcholakov marked this pull request as draft October 28, 2024 10:25
@pcholakov pcholakov changed the title Wire up PartitionRouting into RemoteScannerManager [WIP] Wire up PartitionRouting into RemoteScannerManager Oct 29, 2024
@pcholakov pcholakov force-pushed the feat/wire-up-partition-lookup branch 2 times, most recently from 7b707c9 to bd4ea08 Compare October 30, 2024 11:07
@pcholakov pcholakov changed the title [WIP] Wire up PartitionRouting into RemoteScannerManager Wire up PartitionRouting into RemoteScannerManager Oct 30, 2024
@pcholakov pcholakov changed the base branch from main to pcholakov/pr2173 October 30, 2024 11:14
@pcholakov pcholakov force-pushed the feat/wire-up-partition-lookup branch 2 times, most recently from 2269b51 to 5113c0f Compare October 31, 2024 10:04
crates/core/src/routing_info/mod.rs (two outdated review threads, resolved)
@pcholakov pcholakov marked this pull request as ready for review October 31, 2024 10:05
Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for creating this PR @pcholakov. The changes look good to me. There are two things that I would like to discuss:

  1. Should the PartitionRouting tell callers whether it is Local or Remote? What if a user of the PartitionRouting is only interested in the node_id?
  2. Do you think that the usage of Deref for the MockPartitionRouting is required? I fear that it leads to less predictable code.

crates/core/src/routing_info/mod.rs (three outdated review threads, resolved)
slinkydeveloper added a commit to slinkydeveloper/restate that referenced this pull request Nov 1, 2024
@tillrohrmann
Contributor

We probably can include in this PR the wiring of #2111 with the PartitionRouting.

Also renames the routing_info module to partitions, which is more reflective of
its purpose as well as more general.
slinkydeveloper added a commit that referenced this pull request Nov 4, 2024
* Another squash

WIP wire up new PartitionRouting

A bit of docs

Retries and timeouts in the RpcRequestDispatcher

Fix names in RequestDispatcher abstraction and remove method we don't need

Fix bad change that I've done before to how we handle actions

Fix comment

Use PartitionProcessorRequestId as "physical deduplication id" for RPC requests. We do this because we don't want to rely on the different semantics between idempotency id/workflow id when dealing with duplicates.

Rebase changes

Many things:

* Remove node awareness from ServiceInvocationResponseSink::Ingress and SubmitNotificationSink::Ingress
* Inline ActionEffect inside leadership.rs
* Now for append_invocation and append_invocation_response we await bifrost to ack the append
* Various renamings

Propagate completion expiry time. There are some cases in the PP state machine where we could have the completion expiry time, but for now I set None to avoid too many conflicts with #2035

Types cleanup

Cleanup ingress-dispatcher. This is now used only by kafka ingress

The HTTP ingress no longer uses the ingress-dispatcher. The InvocationStorageReader interface became RequestDispatcher and just encapsulates all the RPC features.

Moved inside the PartitionProcessor the setting of the "_sink" fields of the ServiceInvocation.

In theory the ServiceInvocation data structure shouldn't be used here anymore for this RPC contract, but for now we keep it for simplicity.

Implement all the Partition Processor RPC commands on the PP side.

Move the business logic of self proposing from action effect handler into `SelfProposer`, part of the Leader state.
Implement SubmitInvocationResponse rpc

Define the contract of interaction between ingress <-> PP (see partition_processor_rpc_client and types/net/partition_processor).
Reorganize a bit types here and there.

Decouple ingress from partition store

This commit replaces the direct dependency of the ingress on the partition
store with rpcs. The ingress now looks up the corresponding partition processor
and asks it for the output result of the queried invocation.

This fixes #1811.

* rebase

* Fix PartitionProcessor business logic:

* On the same invocation, we now just run the same business logic we would with idempotent requests.
* I still kept some form of "unexpected duplicate invocation id" check, this might turn out to be useful to debug issues related to rpc and/or routing
* Added the business logic to correctly handle the /send notification case with duplicate RPCs

* Miscellaneous changes related to the previous one. I'll deal with back-compat problems in one of the next commits.

* PartitionProcessorRpcClient now uses the ConnectionAwareRpcRouter, and has a little method on the error to discriminate what is safe to retry or not from the rpc client perspective

* Various renaming of RpcRequestDispatcher methods to make it more obvious what they're needed for, plus generics massaging.

* Add a new data structure for defining invocation requests.

Now the InvocationRequest is just the invocation parameters, while the ServiceInvocation is the "wal" invocation request, which contains additional metadata that is added while the invocation flows through the system.

* Adapt the business logic of the RpcRequestDispatcher to the various changes

* Sort out situations when it's safe to retry or not.

* PR Feedback

* Still need to wait for PR #2172 to enable partition routing

* Feedback

* Remove assertion for bad duplicate situations

* Fix test

* Feedback

---------

Co-authored-by: Till Rohrmann <[email protected]>
@pcholakov pcholakov force-pushed the feat/wire-up-partition-lookup branch from 5113c0f to a8c8fc4 Compare November 4, 2024 08:23
@pcholakov pcholakov changed the base branch from pcholakov/pr2173 to main November 4, 2024 09:03
@pcholakov pcholakov force-pushed the feat/wire-up-partition-lookup branch from 22052eb to 3f44fc3 Compare November 4, 2024 09:09
@@ -13,7 +13,7 @@ mod metadata;
pub mod metadata_store;
mod metric_definitions;
pub mod network;
pub mod routing_info;
pub mod partitions;
Contributor Author


I think this is more intuitive and more general for the future.

@pcholakov
Contributor Author

@tillrohrmann

We probably can include in this PR the wiring of #2111 with the PartitionRouting.

Done, I've addressed the open TODOs from that PR and ready for re-review :-)

@igalshilman igalshilman removed their request for review November 4, 2024 13:27
Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for updating this PR @pcholakov. I think it looks really nice. I have one remaining question regarding the PartitionRouting integration with the PartitionProcessorRpcClient: how are we triggering the refresh of the routing information?

Comment on lines 135 to 136
self.partition_routing.request_refresh();
bail!("node lookup for partition {} failed", partition_id)
Contributor

@tillrohrmann tillrohrmann Nov 4, 2024


I am wondering whether retrying once after the refresh has completed would give a noticeably better experience. While this can't catch all problems, it will catch the common one where there was a change in the cluster but not all nodes have realized it yet. When a query then comes in, we are still operating with outdated information, which could be solved by refreshing the routing information.

Contributor


I think my comment does not make a lot of sense here. Ignore it.

Contributor

@igalshilman igalshilman Nov 4, 2024


@tillrohrmann can you explain why it is not making sense? Because I was thinking the same.
The way I resolve this (in an inner monologue 😅) is that it is not a big deal if the query fails, since the user can retry. Did you have a similar reason in mind?

Contributor Author


I think where that was going is that there's no guarantee that, even though we've requested a refresh, it will have completed by the time the next user-driven retry inevitably comes around.

Contributor Author


I'm also not too pleased with this, to be honest; but I think the right fix is to make sure we either have push-based updates (gossip, metadata store update subscriptions) or make polling cheap enough that we can do it more frequently than every 3s, and block to retry after roughly one refresh interval.

Contributor


When I wrote the comment, I was thinking about the case where a query fails because we are talking to a PP that is not the leader or to a node from which the partition has been removed. Since we don't care about who's the leader and we also don't move partitions away from nodes, we shouldn't really run into this situation (at least not right now). If we do, then I guess we need to handle this situation based on some response we receive from the remote server.

The case where a retry could help for this specific method is when we don't know about the partition yet (i.e. at the beginning of the cluster's lifetime).

Comment on lines +322 to +325
let node_id = self
.partition_routing
.get_node_by_partition(partition_id)
.ok_or(PartitionProcessorRpcClientError::UnknownNode(partition_id))?;
Contributor


How are we gonna refresh the partition_routing in the PartitionProcessorRpcClient?

Contributor Author


I wonder if we shouldn't just inline this responsibility into the get_node_by_partition method; as you've suggested elsewhere, perhaps it's worth at least a single retry before giving up. We'd otherwise have to align the retry strategy's delay with the refresh interval: the tension is between putting increased load on the store by polling for refreshes (until we do something better), and slowing down pending unknown lookups by up to the refresh interval (3s at present).

My worry is that by blocking assorted requests throughout the system for up to 3s, we could massively increase concurrency. This is why I didn't want to add a blocking version that waits for a refresh. Thoughts?

Contributor Author


I don't like the idea of pushing every potential caller into having to remember to manually call request_refresh. Let me rethink that; it seemed more reasonable when we were operating on a user-issued SQL query, but the user has no influence over which partition ids we decide to execute against. The fact that we have a partition id that can't be routed to a node is a strong enough signal that something needs a refresh.

Contributor


For the case where we don't have information for a given PartitionId, inlining into get_node_by_partition might work. However, if the case is that we are returning a node that is no longer the leader, it seems to me that we need to handle this in the component which is using the PartitionRouting: deciding to call refresh, and then deciding how to handle the situation (failing, retrying without waiting for the update, awaiting the update and then retrying, etc.). By offering an API where one can wait for an update, higher-level components could opt in to this behavior if needed.

Regarding this very specific comment, it was more about who's calling refresh on the PartitionRouting struct.

Contributor Author


I think in time this should probably become async, and we can introduce a retry policy, or a retrying wrapper around the lookup, so we can control retries per call site. For now, I've added an automatic refresh request whenever the lookup returns None, so that callers don't have to think about it. But that may take up to 3s with the current config, so we should probably keep failing to the caller immediately and let the client redrive.

@igalshilman
Contributor

@pcholakov I've rebased on your PR, but when I try to query I get

2024-11-04T16:18:27.498200Z ERROR tracing_panic
  A panic occurred
    panic.payload: "metadata() called outside task-center scope: AccessError"
    panic.location: "crates/storage-query-datafusion/src/remote_query_scanner_manager.rs:159:40"
    panic.backtrace: disabled backtrace
    panic.note: "run with RUST_BACKTRACE=1 environment variable to display a backtrace"
on rs:worker-12
thread 'rs:worker-12' panicked at crates/storage-query-datafusion/src/remote_query_scanner_manager.rs:159:40:
metadata() called outside task-center scope: AccessError
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Aborted (core dumped)

@igalshilman
Contributor

The relevant code is this:

    // Note: we initialize my_node_id lazily as it is not available at construction time. We should
    // be careful since the call to metadata().my_node_id() will panic if the system configuration
    // isn't fully initialized yet, but we shouldn't be able to accept queries until that is in fact
    // the case.
    #[inline]
    fn my_node_id(&self) -> &GenerationalNodeId {
        self.my_node_id.get_or_init(|| metadata().my_node_id())
    }

It seems like this is being called outside a task center, which actually makes sense, because I think it's happening during the DataFusion planning, which runs somewhere in the tokio-verse :-) just manually tokio::spawn left and right.
So accessing a thread-local like that is not going to work, I'm afraid.

So what I end up doing in a similar situation is this:
https://github.com/restatedev/restate/blob/main/crates/storage-query-datafusion/src/remote_query_scanner_client.rs#L203,L211

@igalshilman
Contributor

Let me fix this on my PR that is based on this, I think I got it.

@pcholakov
Contributor Author

It seems like this is being called outside a task center, which actually makes sense, because I think it's happening during the DataFusion planning, which runs somewhere in the tokio-verse :-)

Ah! I didn't think that through very well. I can also wire a metadata handle into the PartitionRouting; no idea why I didn't just do that in the first place. There's no hard reason to rely on the thread-context helper.

@igalshilman
Contributor

@pcholakov I've verified that the changes at least don't panic now on the followup PR.

@pcholakov
Contributor Author

Pushed one more update, which eliminates the assumption that this will only be called in a Task Center context. Sorry for the breakage @igalshilman, I think it's the better long-term solution.

@pcholakov
Contributor Author

@tillrohrmann

I have one remaining question regarding the PartitionRouting integration with the PartitionProcessorRpcClient: how are we triggering the refresh of the routing information?

I hope I've addressed that above: PartitionRouting will now request a refresh any time it returns None, but won't block for an answer. Let's get this merged to unblock Igal, if that's OK, and I'd be happy to rework this to be async + retrying and update all the call sites if we feel it's important.

Contributor

@tillrohrmann tillrohrmann left a comment


Thanks for updating the PR @pcholakov. The changes look good to me. What is still missing is a way for the ingress to signal that we need to refresh the partition routing. We can go ahead and merge this PR to unblock @igalshilman and do the missing part as a follow-up.

Comment on lines +71 to +77
if maybe_node.is_none() {
debug!(
?partition_id,
"No known node for partition - requesting refresh"
);
self.request_refresh();
}
Contributor


Nit: practically, this change will probably have no visible effect right now, since the SchedulingPlan with version Version::MIN should contain an assignment for all partitions.

Contributor Author


The earlier conditional will kick in before any load has happened, i.e. version = Version::INVALID. This one will kick in if there is a new partition id that we have no node for, when version >= Version::MIN. Might still be missing something, but that was my intent :-)
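The two-stage check described here can be sketched as follows; the Version constants mirror the discussion, while the enum and helper are illustrative, not the actual code:

```rust
#[derive(Clone, Copy, PartialEq)]
struct Version(u32);

impl Version {
    const INVALID: Version = Version(0); // nothing loaded yet
    const MIN: Version = Version(1);     // first scheduling plan seen
}

#[derive(Debug)]
enum LookupMiss {
    NotLoadedYet,     // earlier conditional: no scheduling plan at all
    UnknownPartition, // this conditional: plan loaded, partition absent
}

fn classify_miss(version: Version, has_partition: bool) -> Option<LookupMiss> {
    if version == Version::INVALID {
        Some(LookupMiss::NotLoadedYet)
    } else if !has_partition {
        Some(LookupMiss::UnknownPartition)
    } else {
        None
    }
}

fn main() {
    assert!(matches!(
        classify_miss(Version::INVALID, false),
        Some(LookupMiss::NotLoadedYet)
    ));
    assert!(matches!(
        classify_miss(Version::MIN, false),
        Some(LookupMiss::UnknownPartition)
    ));
    assert!(classify_miss(Version::MIN, true).is_none());
    println!("ok");
}
```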

Comment on lines +161 to +166
self.my_node_id.get_or_init(|| {
self.metadata
.as_ref()
.expect("must have metadata if no node id is set")
.my_node_id()
})
Contributor


Why not always using metadata.my_node_id()?

Contributor Author


There is no functional need to cache it here, but @igalshilman is already taking care of this in his PR.

@pcholakov
Contributor Author

I've opened #2215 for the follow-up Ingress update.

@pcholakov pcholakov merged commit da5f8f7 into main Nov 5, 2024
11 checks passed
@pcholakov pcholakov deleted the feat/wire-up-partition-lookup branch November 5, 2024 12:54