Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] CDC log visible only when the data is already flushed to RocksDB from the pre-write buffer #179

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.admin;

import com.alibaba.fluss.client.admin.OffsetSpec.LatestSpec;
import com.alibaba.fluss.client.admin.OffsetSpec.TimestampSpec;
import com.alibaba.fluss.client.scanner.ScanRecord;
import com.alibaba.fluss.client.scanner.log.LogScan;
import com.alibaba.fluss.client.scanner.log.LogScanner;
import com.alibaba.fluss.client.scanner.log.ScanRecords;
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.types.RowType;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.alibaba.fluss.testutils.InternalRowAssert.assertThatRow;
import static org.assertj.core.api.Assertions.assertThat;

/**
* The base test class for client to server request and response. The servers include the
* CoordinatorServer and the TabletServer.
*/
public class ClientToServerITCaseUtils {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem to be used.


/**
 * Creates the database (if needed) and the table, then returns the new table's id.
 *
 * @param admin admin client used to issue the DDL requests
 * @param tablePath database and table name to create
 * @param tableDescriptor schema and distribution of the new table
 * @param ignoreIfExists whether an already-existing database/table is tolerated
 * @return the id assigned to the created table
 * @throws Exception if any of the admin calls fail
 */
public static long createTable(
        Admin admin,
        TablePath tablePath,
        TableDescriptor tableDescriptor,
        boolean ignoreIfExists)
        throws Exception {
    String databaseName = tablePath.getDatabaseName();
    admin.createDatabase(databaseName, ignoreIfExists).get();
    admin.createTable(tablePath, tableDescriptor, ignoreIfExists).get();
    return admin.getTable(tablePath).get().getTableId();
}

/**
 * Builds the server configuration shared by client-to-server integration tests.
 *
 * @return a configuration with shortened intervals and small writer buffers for fast tests
 */
public static Configuration initConfig() {
    Configuration configuration = new Configuration();
    configuration.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
    // a short snapshot interval lets tests observe kv snapshots quickly
    configuration.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
    // a short max lag time makes tests in FlussFailServerTableITCase faster
    configuration.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(10));
    // small writer buffers so batches are produced frequently in tests
    configuration.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb"));
    configuration.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb"));
    return configuration;
}

/** Creates a log scanner that reads all columns of the given table. */
public static LogScanner createLogScanner(Table table) {
    LogScan fullScan = new LogScan();
    return table.getLogScanner(fullScan);
}

/** Creates a log scanner that reads only the given projected field indexes. */
public static LogScanner createLogScanner(Table table, int[] projectFields) {
    LogScan projectedScan = new LogScan().withProjectedFields(projectFields);
    return table.getLogScanner(projectedScan);
}

/** Subscribes the scanner to every bucket of the (non-partitioned) table from offset zero. */
public static void subscribeFromBeginning(LogScanner logScanner, Table table) {
    int buckets = getBucketCount(table);
    for (int bucketId = 0; bucketId < buckets; bucketId++) {
        logScanner.subscribeFromBeginning(bucketId);
    }
}

/**
 * Looks up, for every bucket, the offset of the first record at or after {@code timestamp}
 * and subscribes the scanner at that offset.
 *
 * @param physicalTablePath physical path used for the offset lookup
 * @param partitionId when non-null, subscribe the partition's buckets instead of the table's
 * @param table table whose bucket count determines the buckets to subscribe
 * @param logScanner scanner to subscribe
 * @param admin admin client used for the offset lookup
 * @param timestamp timestamp (ms) to resolve to per-bucket offsets
 * @throws Exception if the offset lookup fails
 */
public static void subscribeFromTimestamp(
        PhysicalTablePath physicalTablePath,
        @Nullable Long partitionId,
        Table table,
        LogScanner logScanner,
        Admin admin,
        long timestamp)
        throws Exception {
    Map<Integer, Long> bucketOffsets =
            admin.listOffsets(physicalTablePath, getAllBuckets(table), new TimestampSpec(timestamp))
                    .all()
                    .get();
    for (Map.Entry<Integer, Long> entry : bucketOffsets.entrySet()) {
        if (partitionId == null) {
            logScanner.subscribe(entry.getKey(), entry.getValue());
        } else {
            logScanner.subscribe(partitionId, entry.getKey(), entry.getValue());
        }
    }
}

/**
 * Looks up every bucket's latest (log end) offset and subscribes the scanner there.
 *
 * @param physicalTablePath physical path used for the offset lookup
 * @param partitionId when non-null, subscribe the partition's buckets instead of the table's
 * @param table table whose bucket count determines the buckets to subscribe
 * @param logScanner scanner to subscribe
 * @param admin admin client used for the offset lookup
 * @throws Exception if the offset lookup fails
 */
public static void subscribeFromLatestOffset(
        PhysicalTablePath physicalTablePath,
        @Nullable Long partitionId,
        Table table,
        LogScanner logScanner,
        Admin admin)
        throws Exception {
    Map<Integer, Long> bucketOffsets =
            admin.listOffsets(physicalTablePath, getAllBuckets(table), new LatestSpec())
                    .all()
                    .get();
    for (Map.Entry<Integer, Long> entry : bucketOffsets.entrySet()) {
        if (partitionId == null) {
            logScanner.subscribe(entry.getKey(), entry.getValue());
        } else {
            logScanner.subscribe(partitionId, entry.getKey(), entry.getValue());
        }
    }
}

/** Returns the bucket ids {@code 0 .. bucketCount-1} of the given table. */
public static List<Integer> getAllBuckets(Table table) {
    int bucketCount = getBucketCount(table);
    List<Integer> bucketIds = new ArrayList<>(bucketCount);
    for (int bucketId = 0; bucketId < bucketCount; bucketId++) {
        bucketIds.add(bucketId);
    }
    return bucketIds;
}

/**
 * Returns the bucket count configured in the table's distribution, falling back to the
 * cluster default bucket number when the table does not specify one.
 */
public static int getBucketCount(Table table) {
return table.getDescriptor()
.getTableDistribution()
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
.orElse(ConfigOptions.DEFAULT_BUCKET_NUMBER.defaultValue());
}

/**
 * Scans bucket 0 of every expected partition from the beginning and asserts that the scanned
 * rows match {@code expectPartitionsRows}, partition by partition.
 *
 * @param table table to scan
 * @param rowType schema used to compare rows
 * @param expectPartitionsRows expected rows keyed by partition id
 * @throws Exception if scanning fails
 */
public static void verifyPartitionLogs(
        Table table, RowType rowType, Map<Long, List<InternalRow>> expectPartitionsRows)
        throws Exception {
    int expectedTotal = 0;
    for (List<InternalRow> partitionRows : expectPartitionsRows.values()) {
        expectedTotal += partitionRows.size();
    }
    int scannedCount = 0;
    Map<Long, List<InternalRow>> scannedRows = new HashMap<>();
    try (LogScanner logScanner = table.getLogScanner(new LogScan())) {
        for (Long partitionId : expectPartitionsRows.keySet()) {
            logScanner.subscribeFromBeginning(partitionId, 0);
        }
        // keep polling until as many records as expected have been seen
        while (scannedCount < expectedTotal) {
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            for (TableBucket bucket : scanRecords.buckets()) {
                for (ScanRecord scanRecord : scanRecords.records(bucket)) {
                    scannedRows
                            .computeIfAbsent(bucket.getPartitionId(), k -> new ArrayList<>())
                            .add(scanRecord.getRow());
                }
            }
            scannedCount += scanRecords.count();
        }
    }
    assertThat(scannedCount).isEqualTo(expectedTotal);
    verifyRows(rowType, scannedRows, expectPartitionsRows);
}

/**
 * Asserts that the actual rows per partition equal the expected rows, in order.
 *
 * @param rowType schema used to compare rows
 * @param actualRows scanned rows keyed by partition id
 * @param expectedRows expected rows keyed by partition id
 */
public static void verifyRows(
        RowType rowType,
        Map<Long, List<InternalRow>> actualRows,
        Map<Long, List<InternalRow>> expectedRows) {
    // the same set of partitions must be present
    assertThat(actualRows.size()).isEqualTo(expectedRows.size());
    for (Map.Entry<Long, List<InternalRow>> partitionEntry : actualRows.entrySet()) {
        List<InternalRow> actual = partitionEntry.getValue();
        List<InternalRow> expected = expectedRows.get(partitionEntry.getKey());
        // each partition must hold the same number of rows
        assertThat(actual.size()).isEqualTo(expected.size());
        int index = 0;
        for (InternalRow actualRow : actual) {
            assertThatRow(actualRow).withSchema(rowType).isEqualTo(expected.get(index));
            index++;
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public final class LogTablet {
private final Clock clock;
private final boolean isChangeLog;

@GuardedBy("lock")
private volatile LogOffsetMetadata ackedLogEndOffsetMetadata;

@GuardedBy("lock")
private volatile LogOffsetMetadata highWatermarkMetadata;

Expand Down Expand Up @@ -136,6 +139,7 @@ private LogTablet(
int writerExpirationCheckIntervalMs =
(int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL).toMillis();
this.writerStateManager = writerStateManager;
this.ackedLogEndOffsetMetadata = new LogOffsetMetadata(0L);
this.highWatermarkMetadata = new LogOffsetMetadata(0L);

this.scheduler = scheduler;
Expand Down Expand Up @@ -402,31 +406,46 @@ private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
LOG.trace("Setting high watermark {}", newHighWatermark);
}

/**
 * Replaces {@code ackedLogEndOffsetMetadata} with the given value under {@code lock}.
 *
 * <p>The new offset must be non-negative. A non-monotonic update (new offset below the
 * current one) is logged as a warning but still applied.
 */
private void updateAckedLogEndOffsetMetadata(LogOffsetMetadata newAckedLogEndOffset) {
    long newOffset = newAckedLogEndOffset.getMessageOffset();
    if (newOffset < 0) {
        throw new IllegalArgumentException("Acked log end offset should be non-negative");
    }
    synchronized (lock) {
        long currentOffset = ackedLogEndOffsetMetadata.getMessageOffset();
        if (newOffset < currentOffset) {
            LOG.warn(
                    "Non-monotonic update of Acked log end offset from {} to {}",
                    ackedLogEndOffsetMetadata,
                    newAckedLogEndOffset);
        }
        ackedLogEndOffsetMetadata = newAckedLogEndOffset;
    }
    LOG.trace("Setting acked log end offset to {}", newAckedLogEndOffset);
}

/**
* Update the highWatermark to a new value if and only if it is larger than the old value. It is
* an error to update to a value which is larger than the log end offset.
* Update the ackedLogEndOffset to a new value if and only if it is larger than the old value.
* It is an error to update to a value which is larger than the log end offset.
*
* <p>This method is intended to be used by the leader to update the highWatermark after
* <p>This method is intended to be used by the leader to update the ackedLogEndOffset after
* follower fetch offsets have been updated.
*/
public Optional<LogOffsetMetadata> maybeIncrementHighWatermark(
LogOffsetMetadata newHighWatermark) throws IOException {
if (newHighWatermark.getMessageOffset() > localLogEndOffset()) {
public Optional<LogOffsetMetadata> maybeIncrementAckedLogEndOffset(
LogOffsetMetadata newAckedLogEndOffset) throws IOException {
if (newAckedLogEndOffset.getMessageOffset() > localLogEndOffset()) {
throw new IllegalArgumentException(
String.format(
"HighWatermark %s update exceeds current log end offset %s",
newHighWatermark, localLog.getLocalLogEndOffsetMetadata()));
"AckedLogEndOffset %s update exceeds current log end offset %s",
newAckedLogEndOffset, localLog.getLocalLogEndOffsetMetadata()));
}
synchronized (lock) {
LogOffsetMetadata oldHighWatermark = fetchHighWatermarkMetadata();
// Ensure that the highWatermark increases monotonically. We also update the
// highWatermark when the new offset metadata is on a newer segment, which occurs
// whenever the log is rolled to a new segment.
if (oldHighWatermark.getMessageOffset() < newHighWatermark.getMessageOffset()
|| (oldHighWatermark.getMessageOffset() == newHighWatermark.getMessageOffset()
&& oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark);
return Optional.of(oldHighWatermark);
LogOffsetMetadata oldAckedLogEndOffset = fetchAckedLogEndOffsetMetadata();
if (oldAckedLogEndOffset.getMessageOffset() < newAckedLogEndOffset.getMessageOffset()
|| (oldAckedLogEndOffset.getMessageOffset()
== newAckedLogEndOffset.getMessageOffset()
&& oldAckedLogEndOffset.onOlderSegment(newAckedLogEndOffset))) {
updateAckedLogEndOffsetMetadata(newAckedLogEndOffset);
return Optional.of(oldAckedLogEndOffset);
} else {
return Optional.empty();
}
Expand Down Expand Up @@ -553,6 +572,26 @@ LogOffsetMetadata fetchHighWatermarkMetadata() throws IOException {
}
}

/**
 * Get the offset and metadata for the current acked log end offset. If offset metadata is not
 * known, this will do a lookup in the index and cache the result.
 *
 * @return the acked log end offset with its full metadata (segment/file position resolved)
 * @throws IOException if the index lookup fails
 */
LogOffsetMetadata fetchAckedLogEndOffsetMetadata() throws IOException {
// fail fast if the underlying log has already been closed
localLog.checkIfMemoryMappedBufferClosed();
// read the volatile field once so the check and the return use the same snapshot
LogOffsetMetadata offsetMetadata = ackedLogEndOffsetMetadata;
if (offsetMetadata.messageOffsetOnly()) {
// only the message offset is known: resolve the full metadata under the lock and
// cache it so subsequent callers skip the index lookup
synchronized (lock) {
LogOffsetMetadata fullOffset =
convertToOffsetMetadataOrThrow(
ackedLogEndOffsetMetadata.getMessageOffset());
updateAckedLogEndOffsetMetadata(fullOffset);
return fullOffset;
}
} else {
return offsetMetadata;
}
}

/**
* Given a message offset, find its corresponding offset metadata in the log. If the message
* offset is out of range, throw an {@link LogOffsetOutOfRangeException}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,22 +850,30 @@ public LogReadInfo fetchRecords(FetchParams fetchParams) throws IOException {
}

/**
* Check and maybe increment the high watermark of the replica (leader). this function can be
* triggered when:
* Check and maybe increment the high watermark of the replica (leader). We first try to
* increment the ackedLogEndOffset; if it is incremented, for a log table we directly increment
* the highWatermark in this function. For a kv table, we first flush the kv (see {@link
* #mayFlushKv(long)}) and then increment the highWatermark if the flush succeeds. This
* function can be triggered when:
*
* <pre>
* 1. bucket ISR changed.
* 2. any follower replica's LEO changed.
* </pre>
*
* <p>The HW is determined by the smallest log end offset among all follower replicas that are
* in sync. This way, if a replica is considered caught-up, but its log end offset is smaller
* than HW, we will wait for this replica to catch up to the HW before advancing the HW.
* <p>The high watermark is determined by the smallest log end offset among all follower
* replicas that are in sync, together with the kv data in the memory buffer that has already
* been flushed to RocksDB. This way, if a replica is considered caught-up but its log end
* offset is smaller than the high watermark, we will wait for this replica to catch up to the
* high watermark before advancing it.
*
* <p>Note There is no need to acquire the leaderIsrUpdate lock here since all callers of this
* private API acquires that lock.
*
* @return true if the high watermark is incremented, and false otherwise.
* <p>Note: the high watermark is equal to the ackedLogEndOffset for log table, but may not
* equal to the ackedLogEndOffset for kv table. For kv table, the HW is equal to
* min(ackedLogEndOffset, flushedLogOffset). The flushedLogOffset means the largest record
* offset which already be flushed to kv store (rocksDb).
*/
private boolean maybeIncrementLeaderHW(LogTablet leaderLog, long currentTimeMs)
throws IOException {
Expand All @@ -880,28 +888,34 @@ private boolean maybeIncrementLeaderHW(LogTablet leaderLog, long currentTimeMs)
// maybeIncrementLeaderHW is in the hot path, the following code is written to
// avoid unnecessary collection generation.
LogOffsetMetadata leaderLogEndOffset = leaderLog.getLocalEndOffsetMetadata();
LogOffsetMetadata newHighWatermark = leaderLogEndOffset;
LogOffsetMetadata newAckedLogEndOffset = leaderLogEndOffset;

for (FollowerReplica remoteFollowerReplica : followerReplicasMap.values()) {
// Note here we are using the "maximal", see explanation above.
FollowerReplica.FollowerReplicaState replicaState =
remoteFollowerReplica.stateSnapshot();
int followerId = remoteFollowerReplica.getFollowerId();
if (replicaState.getLogEndOffsetMetadata().getMessageOffset()
< newHighWatermark.getMessageOffset()
< newAckedLogEndOffset.getMessageOffset()
&& (isrState.maximalIsr().contains(followerId)
|| shouldWaitForReplicaToJoinIsr(
replicaState, leaderLogEndOffset, currentTimeMs, followerId))) {
newHighWatermark = replicaState.getLogEndOffsetMetadata();
newAckedLogEndOffset = replicaState.getLogEndOffsetMetadata();
}
}

Optional<LogOffsetMetadata> oldWatermark =
leaderLog.maybeIncrementHighWatermark(newHighWatermark);
if (oldWatermark.isPresent()) {
LOG.debug("High watermark update from {} to {}.", oldWatermark.get(), newHighWatermark);
// when watermark advanced, we may need to flush kv if it's kv replica
mayFlushKv(newHighWatermark.getMessageOffset());
Optional<LogOffsetMetadata> optOldAckedLogEndOffset =
leaderLog.maybeIncrementAckedLogEndOffset(newAckedLogEndOffset);
if (optOldAckedLogEndOffset.isPresent()) {
long messageOffset = optOldAckedLogEndOffset.get().getMessageOffset();
LOG.debug(
"AckedLogEndOffset update from {} to {}.", messageOffset, newAckedLogEndOffset);

// when ackedLogEndOffset advanced, we may need to flush kv if it's kv replica
mayFlushKv(newAckedLogEndOffset.getMessageOffset());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can simplify flush kv first, and then update high-watermark? Our purpose is just make KV ready before log is available. This can avoid introducing additional variable ackedLogEndOffsetMetadata (it is quite complex if we need to handle the restore of the new variable).


LOG.debug("Highwatermark update from {} to {}.", messageOffset, newAckedLogEndOffset);
leaderLog.updateHighWatermark(newAckedLogEndOffset.getMessageOffset());
return true;
} else {
return false;
Expand Down
Loading
Loading