KAFKA-18522: Slice records for share fetch #18804

Open · wants to merge 10 commits into trunk
86 changes: 79 additions & 7 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -25,8 +25,11 @@
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -39,6 +42,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
@@ -113,13 +117,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
.setAcquiredRecords(Collections.emptyList());
} else {
partitionData
// We set the records to the fetchPartitionData records. We do not alter the records
// fetched from the replica manager as they follow zero copy buffer. The acquired records
// might be a subset of the records fetched from the replica manager, depending
// on the max fetch records or available records in the share partition. The client
// sends the max bytes in request which should limit the bytes sent to the client
// in the response.
.setRecords(fetchPartitionData.records)
.setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
.setAcquiredRecords(shareAcquiredRecords.acquiredRecords());
acquiredRecordsCount += shareAcquiredRecords.count();
}
@@ -187,4 +185,78 @@ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
}
return partition;
}

/**
* Slice the fetch records based on the acquired records. The slicing is done based on the first
* and last offsets of the acquired records list. The slicing does not consider individual
* acquired batches, only the boundaries of the acquired list.
*
* @param records The records to be sliced.
* @param shareAcquiredRecords The share acquired records containing the non-empty acquired records.
* @return The sliced records, if the records are of type FileRecords and the acquired records are a subset
* of the fetched records. Otherwise, the original records are returned.
*/
static Records maybeSliceFetchRecords(Records records, ShareAcquiredRecords shareAcquiredRecords) {
if (!shareAcquiredRecords.subsetAcquired() || !(records instanceof FileRecords fileRecords)) {
Review thread:
Contributor: Hmm, with remote storage, it's possible for records to be of type MemoryRecords. It would be useful to slice those too.
apoorvmittal10 (Collaborator, Author): I see. I do not see any specific slicing API in MemoryRecords. Do you think I should add one, or does some way already exist?
Contributor: It seems there is no slicing API in MemoryRecords, so we will need to add one.
apoorvmittal10 (Collaborator, Author, Feb 10, 2025): Should I do it in this PR itself or in another PR/task? Otherwise this PR will get too long, having to include a new API in MemoryRecords and the respective individual tests while integrating here. I'd prefer to do it separately.

return records;
}
// The acquired records should be non-empty; no need to check here, as the method is called
// only when the acquired records are non-empty.
List<AcquiredRecords> acquiredRecords = shareAcquiredRecords.acquiredRecords();
try {
final long firstAcquiredOffset = acquiredRecords.get(0).firstOffset();
final long lastAcquiredOffset = acquiredRecords.get(acquiredRecords.size() - 1).lastOffset();
int startPosition = 0;
int size = 0;
// Track the previous batch to adjust the start position in case the first acquired offset
// falls within that batch.
FileChannelRecordBatch previousBatch = null;
for (FileChannelRecordBatch batch : fileRecords.batches()) {
// If the batch base offset is less than the first acquired offset, then the start position
// should be updated to skip the batch.
if (batch.baseOffset() < firstAcquiredOffset) {
Review thread:
Contributor: Hmm, not sure why we need to maintain previousBatch below. Could we just set startPosition when batch.lastOffset() is >= firstAcquiredOffset for the first time?
apoorvmittal10 (Collaborator, Author, Feb 7, 2025): Yes, you are correct, that would have been the easiest way. But since lastOffset() on a batch loads the headers, I have avoided that call by maintaining previousBatch.
Contributor: Thanks for the explanation, got it. I was thinking we could make the code a bit easier to understand. Specifically, rename previousBatch to something like mayOverlapBatch. Instead of first increasing startPosition and later decreasing it, we only increase startPosition when we are sure, in the next iteration, that mayOverlapBatch indeed overlaps.
apoorvmittal10 (Collaborator, Author): @junrao I re-thought the solution and tried to simplify it, and also got rid of the lastOffset() method call entirely. See if it makes better sense now. I have also skipped any calculation when there is a single fetched batch; in that case we should not consider any slicing, as the acquired records should always be within the fetched data.

startPosition += batch.sizeInBytes();
previousBatch = batch;
continue;
}
// If the first acquired offset falls within the previous batch, then adjust the start position
// so the previous batch is not skipped, i.e. if the batch spans 10-15 and the first acquired
// offset is 12, then the start position should be adjusted to include the batch containing
// the first acquired offset. Generally, the first acquired offset should be the first offset
// of the batch, but there can be cases where the batch is split because of the initial load
// from the persister, which holds only a subset of the acknowledged records from the batch.
// This adjustment should only be done once, for the batch containing the first acquired offset,
// hence post the adjustment the previous batch is set to null.
if (previousBatch != null && batch.baseOffset() != firstAcquiredOffset) {
startPosition -= previousBatch.sizeInBytes();
size += previousBatch.sizeInBytes();
}
previousBatch = null;
// Consider the full batch size for slicing irrespective of the batch last offset i.e.
// if the batch last offset is greater than the last offset of the acquired records,
// we still consider the full batch size for slicing.
if (batch.baseOffset() <= lastAcquiredOffset) {
size += batch.sizeInBytes();
} else {
break;
}
}
// If the fetch resulted in a single batch and the first acquired offset is not the base offset
// of that batch, then the position and size should be adjusted to include the batch. This can
// happen, rarely, when the batch is split because of the initial load from the persister. In
// such cases, check the last offset of the previous batch to decide whether to include it. As
// the lastOffset() call on a batch is expensive, the code is optimized to avoid it in the loop,
// but it must be considered for this edge case.
if (previousBatch != null && previousBatch.lastOffset() >= lastAcquiredOffset) {
startPosition -= previousBatch.sizeInBytes();
size += previousBatch.sizeInBytes();
}
return fileRecords.slice(startPosition, size);
} catch (Exception e) {
log.error("Error while checking batches for acquired records: {}, skipping slicing.", acquiredRecords, e);
// If there is an exception while slicing, return the original records so that the fetch
// can continue with the original records.
return records;
}
}
}
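
For illustration, here is a minimal sketch of the slicing arithmetic performed by maybeSliceFetchRecords above, modeled over plain batch descriptors rather than the real FileRecords/FileChannelRecordBatch API. The BatchInfo record, the sliceRange helper, and the offsets and sizes in main are hypothetical and exist only for this example.

import java.util.List;

public class SliceArithmeticSketch {

    // Hypothetical stand-in for a file record batch: base offset, last offset and on-disk size in bytes.
    record BatchInfo(long baseOffset, long lastOffset, int sizeInBytes) {}

    // Returns {startPosition, size} of the byte range covering every batch that overlaps
    // the acquired offset range [firstAcquiredOffset, lastAcquiredOffset].
    static int[] sliceRange(List<BatchInfo> batches, long firstAcquiredOffset, long lastAcquiredOffset) {
        int startPosition = 0;
        int size = 0;
        BatchInfo previousBatch = null;
        for (BatchInfo batch : batches) {
            if (batch.baseOffset() < firstAcquiredOffset) {
                // Tentatively skip the batch; it is pulled back in below if the first
                // acquired offset turns out to fall inside it.
                startPosition += batch.sizeInBytes();
                previousBatch = batch;
                continue;
            }
            if (previousBatch != null && batch.baseOffset() != firstAcquiredOffset) {
                // The first acquired offset lies inside the previous batch: include it.
                startPosition -= previousBatch.sizeInBytes();
                size += previousBatch.sizeInBytes();
            }
            previousBatch = null;
            if (batch.baseOffset() <= lastAcquiredOffset) {
                size += batch.sizeInBytes();
            } else {
                break;
            }
        }
        // Edge case: the acquired range sits entirely inside the last skipped batch.
        if (previousBatch != null && previousBatch.lastOffset() >= lastAcquiredOffset) {
            startPosition -= previousBatch.sizeInBytes();
            size += previousBatch.sizeInBytes();
        }
        return new int[]{startPosition, size};
    }

    public static void main(String[] args) {
        // Three batches covering offsets 0-9, 10-19 and 20-29, each 100 bytes on disk.
        List<BatchInfo> batches = List.of(
            new BatchInfo(0, 9, 100), new BatchInfo(10, 19, 100), new BatchInfo(20, 29, 100));
        // Acquired range 12-25: the first batch is skipped, the two overlapping ones are kept.
        int[] range = sliceRange(batches, 12, 25);
        System.out.println(range[0] + ", " + range[1]); // prints "100, 200"
    }
}
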
15 changes: 12 additions & 3 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -638,12 +638,17 @@ public ShareAcquiredRecords acquire(
List<AcquiredRecords> result = new ArrayList<>();
// The acquired count is used to track the number of records acquired for the request.
int acquiredCount = 0;
// Tracks whether only a subset of the fetched batches is acquired.
boolean subsetAcquired = false;
// The fetched records are already part of the in-flight records. The records might
// be available for re-delivery hence try acquiring same. The request batches could
// be an exact match, subset or span over multiple already fetched batches.
for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
// If the acquired count is equal to the max fetch records then break the loop.
if (acquiredCount >= maxFetchRecords) {
// If the limit to acquire records is reached, then there exist additional fetch
// batches which cannot be acquired.
subsetAcquired = true;
break;
}

@@ -714,8 +719,9 @@ public ShareAcquiredRecords acquire(
lastBatch.lastOffset(), batchSize, maxFetchRecords - acquiredCount);
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
subsetAcquired = shareAcquiredRecords.subsetAcquired();
}
return new ShareAcquiredRecords(result, acquiredCount);
return new ShareAcquiredRecords(result, acquiredCount, subsetAcquired);
} finally {
lock.writeLock().unlock();
}
@@ -1213,7 +1219,10 @@ private ShareAcquiredRecords acquireNewBatchRecords(
startOffset = firstAcquiredOffset;
}
endOffset = lastAcquiredOffset;
return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
return new ShareAcquiredRecords(
acquiredRecords,
(int) (lastAcquiredOffset - firstAcquiredOffset + 1) /* acquired records count */,
lastOffset > lastAcquiredOffset /* subset acquired */);
} finally {
lock.writeLock().unlock();
}
@@ -2138,7 +2147,7 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
}
}

private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
private long startOffsetDuringInitialization(long partitionDataStartOffset) {
// Set the state epoch and end offset from the persisted state.
if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
return partitionDataStartOffset;
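
The SharePartition changes above thread the new subsetAcquired flag through ShareAcquiredRecords so that ShareFetchUtils can decide whether slicing the fetched records is worthwhile. Below is a rough sketch of the shape such a value class could take after this change; it is a simplified assumption, not the actual org.apache.kafka.server.share.fetch.ShareAcquiredRecords source.

// Simplified sketch (an assumption, not the actual Kafka class) of ShareAcquiredRecords
// after this PR: the added subsetAcquired flag tells callers such as ShareFetchUtils whether
// the acquired records cover only part of the fetched data.
import java.util.List;

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;

public final class ShareAcquiredRecordsSketch {
    private final List<AcquiredRecords> acquiredRecords;
    private final int count;
    private final boolean subsetAcquired;

    public ShareAcquiredRecordsSketch(List<AcquiredRecords> acquiredRecords, int count, boolean subsetAcquired) {
        this.acquiredRecords = acquiredRecords;
        this.count = count;
        this.subsetAcquired = subsetAcquired;
    }

    // The acquired batches returned in the share fetch response.
    public List<AcquiredRecords> acquiredRecords() {
        return acquiredRecords;
    }

    // Number of records acquired across all batches.
    public int count() {
        return count;
    }

    // True when the share partition could not acquire everything that was fetched,
    // e.g. the max fetch records limit was hit or only part of a new batch was acquired.
    public boolean subsetAcquired() {
        return subsetAcquired;
    }
}
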
28 changes: 14 additions & 14 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -34,7 +34,6 @@
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
@@ -71,6 +70,7 @@
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -174,7 +174,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// We are testing the case when the share partition is getting fetched for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
@@ -236,7 +236,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// We are testing the case when the share partition has been fetched before, hence we are mocking positionDiff
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
@@ -289,7 +289,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
@@ -382,7 +382,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Collections.singleton(tp0));
@@ -530,7 +530,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
when(sp1.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch
// requests, it should add a "check and complete" action for request key tp1 on the purgatory.
@@ -628,7 +628,7 @@ public void testExceptionInMinBytesCalculation() {

when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

// Mocking partition object to throw an exception during min bytes calculation while calling fetchOffsetSnapshot
@@ -824,15 +824,15 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab
BROKER_TOPIC_STATS);

when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp2.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp3.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp4.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// All 5 partitions are acquirable.
doAnswer(invocation -> buildLogReadResult(sharePartitions.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@@ -922,9 +922,9 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab
BROKER_TOPIC_STATS);

when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
when(sp1.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));

// Only 2 out of 5 partitions are acquirable.
Set<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<>();
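
The test changes above swap ShareAcquiredRecords.fromAcquiredRecords for a createShareAcquiredRecords helper imported from ShareFetchTestUtils. The following is a hypothetical sketch of what such a helper might look like, assuming the three-argument ShareAcquiredRecords constructor shown earlier in this PR; the real helper may differ.

// Hypothetical sketch of the createShareAcquiredRecords test helper; the real helper may differ.
// It wraps a single acquired batch into a ShareAcquiredRecords, deriving the record count from
// the batch offsets and marking the fixture as fully acquired (subsetAcquired = false).
import java.util.List;

import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;

final class ShareFetchTestUtilsSketch {

    static ShareAcquiredRecords createShareAcquiredRecords(AcquiredRecords acquiredRecords) {
        return new ShareAcquiredRecords(
            List.of(acquiredRecords),
            (int) (acquiredRecords.lastOffset() - acquiredRecords.firstOffset() + 1) /* acquired records count */,
            false /* subset acquired */);
    }
}
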