KAFKA-18522: Slice records for share fetch #18804
base: trunk
Conversation
Copilot reviewed 5 out of 9 changed files in this pull request and generated 1 comment.
Files not reviewed (4)
- gradle/spotbugs-exclude.xml: Language not supported
- core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java: Evaluated as low risk
- core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: Evaluated as low risk
- core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: Evaluated as low risk
Comments suppressed due to low confidence (3)
core/src/test/java/kafka/server/share/SharePartitionTest.java:5898
- Consider overloading the fetchAcquiredRecords method instead of adding a boolean flag for subsetAcquired.
private List<AcquiredRecords> fetchAcquiredRecords(
core/src/test/java/kafka/server/share/SharePartitionTest.java:5914
- Ensure that the memoryRecordsBuilder method from ShareFetchTestUtils is used consistently across all tests.
return memoryRecordsBuilder(numOfRecords, startOffset).build();
server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java:45
- Ensure that the subsetAcquired flag is properly tested to verify the slicing logic works correctly when this flag is true or false.
private final boolean subsetAcquired;
server/src/main/java/org/apache/kafka/server/share/fetch/ShareAcquiredRecords.java (outdated comment, resolved)
Copilot reviewed 5 out of 9 changed files in this pull request and generated 1 comment.
Files not reviewed (4)
- gradle/spotbugs-exclude.xml: Language not supported
- core/src/main/java/kafka/server/share/ShareFetchUtils.java: Evaluated as low risk
- core/src/main/java/kafka/server/share/SharePartition.java: Evaluated as low risk
- core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: Evaluated as low risk
server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java (outdated comment, resolved)
Carried out an initial pass 😊, thank you, this has been an interesting read!
Thanks for taking a look, indeed it's an interesting change to optimize the transferred bytes.
@apoorvmittal10 : Thanks for the PR. Left a few comments.
 * of the fetched records. Otherwise, the original records are returned.
 */
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
    if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
Hmm, with remote storage, it's possible for records to be MemoryRecords. It would be useful to slice those too.
I see; I don't find any specific slicing API in MemoryRecords. Do you think I should add one? Or does some way already exist?
It seems there is no slicing API in memory records. So, we will need to add one.
Should I do it in this PR itself or in another PR/task? Otherwise this PR will get too long, adding a new API in MemoryRecords and its own tests while integrating it here. I'd prefer to do it separately.
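(For illustration only: a MemoryRecords slicing helper along the lines discussed above could track cumulative byte positions of the batches and slice the backing buffer. This is a rough sketch under the assumption that such a helper lands in a follow-up; the method name sliceMemoryRecords and its exact semantics are not taken from the PR.)

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;

// Sketch: keep only the batches overlapping [firstAcquiredOffset, lastAcquiredOffset]
// by accumulating their byte sizes into a start position and a slice length.
static MemoryRecords sliceMemoryRecords(MemoryRecords records, long firstAcquiredOffset, long lastAcquiredOffset) {
    int startPosition = 0;
    int size = 0;
    for (MutableRecordBatch batch : records.batches()) {
        if (batch.lastOffset() < firstAcquiredOffset) {
            // Batch ends before the acquired range, skip its bytes entirely.
            startPosition += batch.sizeInBytes();
        } else if (batch.baseOffset() <= lastAcquiredOffset) {
            // Batch overlaps the acquired range, include it whole.
            size += batch.sizeInBytes();
        } else {
            // Batches are ordered by offset, so nothing further can overlap.
            break;
        }
    }
    ByteBuffer slice = records.buffer().duplicate();
    slice.position(startPosition);
    slice.limit(startPosition + size);
    return MemoryRecords.readableRecords(slice.slice());
}

(Unlike the FileRecords path discussed below, calling lastOffset() here is cheap because the batch data is already in memory.)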
for (FileChannelRecordBatch batch : fileRecords.batches()) {
    // If the batch base offset is less than the first acquired offset, then the start position
    // should be updated to skip the batch.
    if (batch.baseOffset() < firstAcquiredOffset) {
Hmm, not sure why we need to maintain previousBatch below. Could we just set startPosition when batch.lastOffset() is >= firstAcquiredOffset for the first time?
Yes, you are correct, that would have been the easiest way. But since lastOffset() of a batch loads the headers, I have avoided that call by maintaining previousBatch.
Thanks for the explanation. Got it.
I was thinking if we could make the code a bit easier to understand. Specifically, rename previousBatch to something like mayOverlapBatch. Instead of first increasing startPosition and later decreasing it, we only increase startPosition when we are sure mayOverlapBatch indeed overlaps in the next iteration.
@junrao I re-thought the solution and tried to simplify it, also getting rid of the lastOffset() method call completely. See if it makes better sense now. I have also skipped any calculation when there is a single fetched batch; in that case we should not consider slicing at all, as the acquired records should always be within the fetched data.
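(For readers following along, a rough sketch of the simplified approach described here, assuming FileRecords.slice(int, int) and the FileChannelRecordBatch accessors used in the hunk above; the single-batch short-circuit is omitted and the details may not match the PR's final code.)

import java.io.IOException;

import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Records;

// Sketch: compute the byte range covering every batch that may overlap
// [firstAcquiredOffset, lastAcquiredOffset] without calling lastOffset(),
// which would force each batch header to be loaded.
static Records sliceForAcquiredRange(FileRecords fileRecords, long firstAcquiredOffset, long lastAcquiredOffset) throws IOException {
    int startPosition = 0;
    int size = 0;
    // The latest batch whose base offset is at or before the first acquired offset;
    // it may contain that offset, so its bytes are only skipped once a later candidate appears.
    FileChannelRecordBatch candidateBatch = null;
    for (FileChannelRecordBatch batch : fileRecords.batches()) {
        if (batch.baseOffset() <= firstAcquiredOffset) {
            if (candidateBatch != null) {
                // The previous candidate ends before this batch starts, so it cannot
                // contain the first acquired offset and its bytes can be skipped.
                startPosition += candidateBatch.sizeInBytes();
            }
            candidateBatch = batch;
        } else if (batch.baseOffset() <= lastAcquiredOffset) {
            // Batch starts inside the acquired range, include it whole.
            size += batch.sizeInBytes();
        } else {
            // Batches are ordered, so nothing further can overlap the range.
            break;
        }
    }
    if (candidateBatch != null) {
        // Always include the candidate: acquisition may start mid-batch and the
        // whole batch has to be wired to the client in that case.
        size += candidateBatch.sizeInBytes();
    }
    return fileRecords.slice(startPosition, size);
}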
assertEquals(7, recordBatches.get(0).baseOffset());
assertEquals(10, recordBatches.get(0).lastOffset());

// Acquire including gaps between batches, should return 2 batches.
Hmm, there are no gaps between batches, right?
The gap is at offsets 5 and 6, hence the check just validates that no issue occurs when records are acquired near gap boundaries.
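(Purely as an illustration of the scenario being discussed: the hypothetical helper below builds a log with batches at offsets 0-4 and 7-10, leaving a gap at 5-6. It assumes the MemoryRecords.withRecords(long, Compression, SimpleRecord...) and FileRecords.open(File) helpers available on trunk; it is not the test code from the PR.)

import java.io.File;
import java.io.IOException;

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;

// Sketch: two batches with base offsets 0 and 7 leave a gap at offsets 5-6,
// the shape of data used to exercise acquisition near gap boundaries.
static FileRecords buildGappedLog() throws IOException {
    File logFile = File.createTempFile("share-fetch-slice", ".log");
    FileRecords fileRecords = FileRecords.open(logFile);
    fileRecords.append(MemoryRecords.withRecords(0L, Compression.NONE,
        new SimpleRecord("r0".getBytes()), new SimpleRecord("r1".getBytes()),
        new SimpleRecord("r2".getBytes()), new SimpleRecord("r3".getBytes()),
        new SimpleRecord("r4".getBytes())));
    fileRecords.append(MemoryRecords.withRecords(7L, Compression.NONE,
        new SimpleRecord("r7".getBytes()), new SimpleRecord("r8".getBytes()),
        new SimpleRecord("r9".getBytes()), new SimpleRecord("r10".getBytes())));
    fileRecords.flush();
    // Acquiring offsets 3 through 8 spans the gap, so slicing should keep both batches.
    return fileRecords;
}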
This is not an area of code that I feel qualified to review authoritatively. However, I see that the changes seem compatible with my thoughts about how to handle isolation level when we tackle that feature.
Thanks for the review @junrao, I have addressed and replied to the comments. I also have one doubt regarding MemoryRecords, please guide me if you can.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
for (FileChannelRecordBatch batch : fileRecords.batches()) {
    // If the batch base offset is less than the first acquired offset, then the start position
    // should be updated to skip the batch.
    if (batch.baseOffset() < firstAcquiredOffset) {
Thanks for the explanation. Got it.
I was thinking if we could make the code a bit easier to understand. Specifically, rename previousBatch to something like mayOverlapBatch. Instead of first increasing startPosition and later decreasing it, we only increase startPosition when we are sure mayOverlapBatch indeed overlaps in the next iteration.
server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java (outdated comment, resolved)
 * of the fetched records. Otherwise, the original records are returned.
 */
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
    if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
It seems there is no slicing API in memory records. So, we will need to add one.
Hi @junrao, can you please re-review the simplified solution?
The PR handles slicing of fetched records based on the acquire response for share fetch. Additional bytes may be fetched from the log while the acquired offsets are only a subset, typically with the max fetch records configuration. Rather than sending the additional bytes of fetched data to the client, we should slice the file and wire only the needed batches.

Note: If the acquired offsets fall within a batch, then we need to send the entire batch within the file record. Hence, rather than checking individual batches, the PR finds the first and last acquired offset and trims the file to all batches between (and including) these two offsets.
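(A small illustrative sketch of the "first and last acquired offset" step described above, assuming the acquired-records entries expose firstOffset() and lastOffset() accessors and are returned in offset order; the helper names are assumptions, not the PR's exact code.)

import java.util.List;

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;

// Sketch: the overall acquired range is bounded by the first offset of the
// first entry and the last offset of the last entry.
static long firstAcquiredOffset(List<AcquiredRecords> acquiredRecords) {
    return acquiredRecords.get(0).firstOffset();
}

static long lastAcquiredOffset(List<AcquiredRecords> acquiredRecords) {
    return acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
}

These bounds then feed the batch-level slicing sketched earlier in the conversation: every batch overlapping [firstAcquiredOffset, lastAcquiredOffset] is kept whole, and everything else is dropped from the wire.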