[rhythm] block-builder: optimize partition sorting and improve cycle handling #4750
base: main
Conversation
This commit includes two important improvements to the block builder:

1. Optimize partition sorting:
   - Move the sort operation outside the loop to avoid re-sorting all partitions on each iteration
   - Use a more efficient bubble-up approach to re-position only the updated partition
   - Add a safety check for an empty partitions array
2. Fix cycle handling when records stop mid-cycle:
   - Ensure the block builder correctly stops a consumption cycle when records stop coming in
   - Prevent re-consumption of the same records in subsequent cycles

The changes ensure that when a consumption cycle starts but records stop before the cycle ends, the block builder correctly handles this case, stops at the appropriate time, and doesn't consume the same records again in the next cycle.
```go
sort.Slice(ps, func(i, j int) bool {
	now := time.Now()
	return now.Sub(ps[i].lastRecordTs) > now.Sub(ps[j].lastRecordTs)
})
```
This is comparing two deltas, which I think can be simplified to comparing the timestamps directly: `ps[i].lastRecordTs.Before(ps[j].lastRecordTs)`.
```go
ps[0].startOffset = lastRecordOffset
ps[0].commitOffset = commitOffset

// Re-sort only if needed - find the correct position for the updated partition
```
In the case of no data, `consumePartition` returns `time.Time{}, -1`, and it looks like that partition will still be sorted to the top and break the loop, even if other partitions need servicing. I'm not sure whether this is an implausible edge case, because stepping back, a partition that is caught up would be skipped over initially because `hasRecords()` is false.
Following that path, I think there is a new unidentified issue (please check my thinking here). If we are assigned two partitions, one with lag (A) and one without (B), it will not work as expected. Flow:
- fetch initial state of all partitions
- the partition without lag (B) has `hasRecords() == false` and is skipped
- its `lastRecordTs` remains zero
- we sort A to the front and call `consumePartition(A)`
- then B is sorted to the front and the loop breaks
- we don't keep servicing partition A as long as needed, and we also never requery the state of B to see whether it gets new data while we're processing A
This could happen, but it would eventually fix itself:
- We stop the cycle when we reach the stop condition, meaning A has no lag anymore.
- We go back to the `running` method and initiate a new cycle.
- B has lag by then, and most likely A as well, since we have to wait `b.cfg.ConsumeCycleDuration - lagTime`.
We need to stop sorting; that's the problem. I propose no sort at all: iterate over the array and always choose the laggiest partition:
```go
	// Iterate over the laggiest partition until the lag is less than the
	// cycle duration or none of the partitions has records.
	for {
		laggiestPartitionIndex := 0
		for i := 1; i < len(ps); i++ {
			if ps[i].lastRecordTs.IsZero() {
				continue // no records for this partition
			}
			// Prefer any partition with records over a zero-valued current
			// choice, so an empty ps[0] doesn't mask real lag elsewhere;
			// otherwise pick the oldest (laggiest) timestamp.
			if ps[laggiestPartitionIndex].lastRecordTs.IsZero() ||
				ps[i].lastRecordTs.Before(ps[laggiestPartitionIndex].lastRecordTs) {
				laggiestPartitionIndex = i
			}
		}
		if ps[laggiestPartitionIndex].lastRecordTs.IsZero() {
			// No partition has records.
			return b.cfg.ConsumeCycleDuration, nil
		}
		lagTime := time.Since(ps[laggiestPartitionIndex].lastRecordTs)
		if lagTime < b.cfg.ConsumeCycleDuration {
			return b.cfg.ConsumeCycleDuration - lagTime, nil
		}
		lastRecordTs, lastRecordOffset, err := b.consumePartition(ctx, ps[laggiestPartitionIndex])
		if err != nil {
			return 0, err
		}
		ps[laggiestPartitionIndex].lastRecordTs = lastRecordTs
		ps[laggiestPartitionIndex].startOffset = lastRecordOffset
	}
}
```
This is even more efficient than sorting and easier to understand
```diff
@@ -101,20 +101,20 @@ type partitionState struct {
 	// Partition number
 	partition int32
 	// Start and end offset
-	startOffset, endOffset int64
+	commitOffset, endOffset int64
```
👍
What this PR does:
- Optimize partition sorting
- Fix cycle handling when records stop mid-cycle

The changes ensure that when a consumption cycle starts but records stop before the cycle ends, the block builder correctly handles this case, stops at the appropriate time, and doesn't consume the same records again in the next cycle.

Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`