Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17634: Tweak wakeup logic to match WakeupTrigger changes #17304

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ ShareFetchCollector<K, V> build(
public Set<String> subscription() {
acquireAndEnsureOpen();
try {
return subscriptions.subscription();
return Collections.unmodifiableSet(subscriptions.subscription());
} finally {
release();
}
Expand Down Expand Up @@ -594,7 +594,6 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
return ConsumerRecords.empty();
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
wakeupTrigger.clearTask();
release();
}
}
Expand All @@ -612,13 +611,16 @@ private ShareFetch<K, V> pollForFetches(final Timer timer) {

// Wait a bit - this is where we will fetch records
Timer pollTimer = time.timer(pollTimeout);
wakeupTrigger.setShareFetchAction(fetchBuffer);

try {
fetchBuffer.awaitNotEmpty(pollTimer);
} catch (InterruptException e) {
log.trace("Timeout during fetch", e);
throw e;
} finally {
timer.update(pollTimer.currentTimeMs());
wakeupTrigger.clearTask();
}

return collect(Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ public void wakeup() {
FetchAction fetchAction = (FetchAction) task;
fetchAction.fetchBuffer().wakeup();
return new WakeupFuture();
} else if (task instanceof ShareFetchAction) {
ShareFetchAction shareFetchAction = (ShareFetchAction) task;
shareFetchAction.fetchBuffer().wakeup();
return new WakeupFuture();
} else {
return task;
}
});
}

/**
* If there is no pending task, set the pending task active.
* If wakeup was called before setting an active task, the current task will complete exceptionally with
* WakeupException right
* away.
* if there is an active task, throw exception.
* If there is no pending task, set the pending task active.
* If wakeup was called before setting an active task, the current task will complete exceptionally with
* WakeupException right away.
* If there is an active task, throw exception.
* @param currentTask
* @param <T>
* @return
Expand Down Expand Up @@ -105,6 +108,25 @@ public void setFetchAction(final FetchBuffer fetchBuffer) {
}
}

public void setShareFetchAction(final ShareFetchBuffer fetchBuffer) {
final AtomicBoolean throwWakeupException = new AtomicBoolean(false);
pendingTask.getAndUpdate(task -> {
if (task == null) {
return new ShareFetchAction(fetchBuffer);
} else if (task instanceof WakeupFuture) {
throwWakeupException.set(true);
return null;
} else if (task instanceof DisabledWakeups) {
return task;
}
// last active state is still active
throw new IllegalStateException("Last active task is still active");
});
if (throwWakeupException.get()) {
throw new WakeupException();
}
}

public void disableWakeups() {
pendingTask.set(new DisabledWakeups());
}
Expand All @@ -113,7 +135,7 @@ public void clearTask() {
pendingTask.getAndUpdate(task -> {
if (task == null) {
return null;
} else if (task instanceof ActiveFuture || task instanceof FetchAction) {
} else if (task instanceof ActiveFuture || task instanceof FetchAction || task instanceof ShareFetchAction) {
return null;
}
return task;
Expand Down Expand Up @@ -172,4 +194,17 @@ public FetchBuffer fetchBuffer() {
return fetchBuffer;
}
}

static class ShareFetchAction implements Wakeupable {

private final ShareFetchBuffer fetchBuffer;

public ShareFetchAction(ShareFetchBuffer fetchBuffer) {
this.fetchBuffer = fetchBuffer;
}

public ShareFetchBuffer fetchBuffer() {
return fetchBuffer;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,45 @@ public void testWakeupBeforeCallingPoll() {
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testWakeupAfterEmptyFetch() {
consumer = newConsumer();
final String topicName = "foo";
final int partition = 3;
doAnswer(invocation -> {
consumer.wakeup();
return ShareFetch.empty();
}).doAnswer(invocation -> ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));

consumer.subscribe(singletonList(topicName));

assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@Test
public void testWakeupAfterNonEmptyFetch() {
consumer = newConsumer();
final String topicName = "foo";
final int partition = 3;
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), partition, topicName);
final ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(tip);
batch.addRecord(new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"));
doAnswer(invocation -> {
consumer.wakeup();
final ShareFetch<String, String> fetch = ShareFetch.empty();
fetch.add(tip, batch);
return fetch;
}).when(fetchCollector).collect(Mockito.any(ShareFetchBuffer.class));

consumer.subscribe(singletonList(topicName));

// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
}

@Test
public void testFailOnClosedConsumer() {
consumer = newConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,19 @@ public void testWakeupFromFetchAction() {
}
}

@Test
public void testWakeupFromShareFetchAction() {
try (final ShareFetchBuffer fetchBuffer = mock(ShareFetchBuffer.class)) {
wakeupTrigger.setShareFetchAction(fetchBuffer);

wakeupTrigger.wakeup();

verify(fetchBuffer).wakeup();
final WakeupTrigger.Wakeupable wakeupable = wakeupTrigger.getPendingTask();
assertInstanceOf(WakeupTrigger.WakeupFuture.class, wakeupable);
}
}

@Test
public void testManualTriggerWhenWakeupCalled() {
wakeupTrigger.wakeup();
Expand Down