[CORE-8933] Setup for translation scheduling port #25126
base: dev
Conversation
Signed-off-by: Oren Leiman <[email protected]>
This method loops through the column writers to check if any of them are flush-worthy and computes the memory usage in the same loop. Useful for a later commit that avoids repeating this loop and needs stats right after append. Signed-off-by: Oren Leiman <[email protected]>
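A minimal, self-contained sketch of the single-pass check described above. The column_writer interface, flush_stats struct, and threshold policy are illustrative assumptions, not the actual Redpanda types.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical, minimal writer interface for illustration only; the real
// datalake/parquet writer classes look different.
struct column_writer {
    virtual ~column_writer() = default;
    virtual size_t buffered_bytes() const = 0;
};

struct flush_stats {
    bool requires_flush{false};
    size_t memory_usage{0};
};

// Single pass over the column writers: sum the buffered memory and record
// whether any writer has crossed the flush threshold, so a caller gets both
// answers right after an append without a second loop.
flush_stats check_writers(
  const std::vector<std::unique_ptr<column_writer>>& writers,
  size_t per_writer_flush_threshold) {
    flush_stats stats;
    for (const auto& w : writers) {
        const size_t buffered = w->buffered_bytes();
        stats.memory_usage += buffered;
        // "Flush worthy" is assumed here to mean buffered bytes above a
        // per-writer threshold; the real policy may differ.
        stats.requires_flush = stats.requires_flush
                               || buffered >= per_writer_flush_threshold;
    }
    return stats;
}
```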
The interface implementations keep track of the current memory used by the writer and related reservations. Signed-off-by: Oren Leiman <[email protected]>
Adds the following:
- flush() - flushes all the buffered bytes to the output stream
- methods to fetch buffered/flushed bytes

Signed-off-by: Oren Leiman <[email protected]>
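Taken together, the two commits above suggest a writer interface roughly like the sketch below. Only flush(), buffered_bytes(), and flushed_bytes() come from the commit text; the class name and the use of seastar::future are assumptions.

```cpp
#include <cstddef>
#include <seastar/core/future.hh>

// Rough sketch of the buffering-aware writer interface described above.
// Only flush(), buffered_bytes(), and flushed_bytes() are named in the
// commits; everything else here is assumed for illustration.
class buffering_writer {
public:
    virtual ~buffering_writer() = default;

    // Flush all currently buffered bytes to the output stream.
    virtual seastar::future<> flush() = 0;

    // Bytes appended but not yet written to the output stream; implementations
    // track this alongside their memory reservations.
    virtual size_t buffered_bytes() const = 0;

    // Bytes already written to the output stream.
    virtual size_t flushed_bytes() const = 0;
};
```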
Signed-off-by: Oren Leiman <[email protected]>
force-pushed from b929834 to 2098d37
Signed-off-by: Oren Leiman <[email protected]>
.. instead of lazy_abort_source. To be used later; they are both connected anyway. Signed-off-by: Oren Leiman <[email protected]>
force-pushed from 2098d37 to f2b3275
/ci-repeat 1
CI test results:
- test results on build#62085
- test results on build#62166

CI Failure:
Nice! Thanks for pulling this out
@@ -0,0 +1,24 @@
/*
Curious, what's the intuition for things that go in here?
Not 100% sure on the details to give a one-sentence description, but the bulk of it is here: 82bcd17
@@ -303,11 +303,6 @@ partition_translator::do_translation_for_range(
const auto& ntp = _partition->ntp();
auto remote_path_prefix = remote_path{
  fmt::format("{}/{}/{}", iceberg_data_path_prefix, ntp.path(), _term)};
lazy_abort_source las{[this] {
    return can_continue() ? std::nullopt
Just curious, this used to account for term changes. Is that still covered with _as?
I think the answer is that it'll be up to the scheduler/executor to abort during term changes via this abort source
yeah i believe that's right
Looks great, just one comment which could be a follow-up
size_t buffered_bytes() const final;

size_t flushed_bytes() const final;
Should we just amend the interface of the parquet writer to be the same? So we don't have to duplicate the state in both layers?
yeah makes sense to me. will do in a follow up so I can boy scout it a little bit.
Currently the multiplexer is a one-shot class with a pattern as follows:

mux = create_mux();
co_await reader.consume(mux...)

With the new changes, we want the multiplexer to multiplex across scheduling iterations and release resources in between. This commit makes changes to the API to support this port. The new pattern would look something like this:

mux = create_mux();
mux.multiplex(reader1...)
mux.flush_writers(); // optional
mux.multiplex(reader2..)
mux.flush_writers(); // optional
...
...
result = co_await std::move(mux).finish();

The ability to temporarily flush all the intermediate state and multiplex across multiple readers enables porting to the new scheduler API.

Signed-off-by: Oren Leiman <[email protected]>
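A hedged sketch of how the multi-iteration pattern above might be driven from a coroutine. The scheduler, next_reader(), record_multiplexer, and translation_result names are placeholders, not the actual datalake API; only the multiplex/flush_writers/finish shape comes from the commit message.

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

// Sketch only: the types below (scheduler, record_multiplexer,
// translation_result) are placeholders for the real datalake types; the
// point is the shape of the multi-reader protocol described above.
seastar::future<translation_result>
translate_across_iterations(scheduler& sched, record_multiplexer mux) {
    // The multiplexer now outlives a single reader instead of being consumed
    // by one co_await reader.consume(mux) call.
    while (auto reader = co_await sched.next_reader()) {
        co_await mux.multiplex(std::move(*reader));
        // Optionally release buffered writer state between iterations so
        // memory stays bounded across the whole translation.
        co_await mux.flush_writers();
    }
    // finish() remains a one-shot, rvalue-qualified call that produces the
    // final result.
    co_return co_await std::move(mux).finish();
}
```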
Make the task long-running to support batching of data across multiple iterations of the scheduler. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
force-pushed from f2b3275 to ffdcfe0
Datalake API changes setting us up for porting the translation path to the new scheduler infrastructure. As written, this should be functionally equivalent to the current world, but some APIs have changed shape significantly.
Pulled from #25077
Backports Required
Release Notes