Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Sep 28, 2024
1 parent 7ee1ed3 commit 6abb931
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,35 @@
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

/**
* Event to perform {@link AsyncKafkaConsumer#seekToBeginning(Collection)}
* Event to perform {@link AsyncKafkaConsumer#seekToBeginning(Collection)} and {@link AsyncKafkaConsumer#seekToEnd(Collection)}
* in the background thread. This can avoid race conditions when subscription state is updated.
*/
public class ResetOffsetEvent extends CompletableApplicationEvent<Boolean> {

private final Collection<TopicPartition> topicPartitions;

private final OffsetResetStrategy offsetStrategy;
private final OffsetResetStrategy offsetResetStrategy;


public ResetOffsetEvent(Collection<TopicPartition> topicPartitions, OffsetResetStrategy offsetStrategy, long deadline) {
public ResetOffsetEvent(Collection<TopicPartition> topicPartitions, OffsetResetStrategy offsetResetStrategy, long deadline) {
super(Type.RESET_OFFSET, deadline);
this.topicPartitions = topicPartitions;
this.offsetStrategy = offsetStrategy;
this.topicPartitions = Collections.unmodifiableCollection(topicPartitions);
this.offsetResetStrategy = Objects.requireNonNull(offsetResetStrategy);
}

public Collection<TopicPartition> topicPartitions() {
return topicPartitions;
}

public OffsetResetStrategy offsetResetStrategy() {
return offsetStrategy;
return offsetResetStrategy;
}

@Override
public String toStringBase() {
return super.toStringBase() + ", topicPartitions=" + topicPartitions + ", offsetStrategy=" + offsetResetStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1889,33 +1889,33 @@ public void testUnsubscribeWithoutGroupId() {
@Test
public void testSeekToBeginning() {
SubscriptionState subscriptions = mock(SubscriptionState.class);
TopicPartition topic = new TopicPartition("test", 0);
consumer = spy(newConsumer(
Collection<TopicPartition> topics = Collections.singletonList(new TopicPartition("test", 0));
consumer = newConsumer(
mock(FetchBuffer.class),
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id"));
"client-id");
completeResetOffsetEventSuccessfully();
consumer.seekToBeginning(Collections.singleton(topic));
verify(subscriptions).requestOffsetReset(Collections.singleton(topic), OffsetResetStrategy.EARLIEST);
consumer.seekToBeginning(topics);
verify(subscriptions).requestOffsetReset(Collections.unmodifiableCollection(topics), OffsetResetStrategy.EARLIEST);
}

@Test
public void testSeekToEnd() {
SubscriptionState subscriptions = mock(SubscriptionState.class);
TopicPartition topic = new TopicPartition("test", 0);
consumer = spy(newConsumer(
Collection<TopicPartition> topics = Collections.singletonList(new TopicPartition("test", 0));
consumer = newConsumer(
mock(FetchBuffer.class),
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id"));
"client-id");
completeResetOffsetEventSuccessfully();
consumer.seekToEnd(Collections.singleton(topic));
verify(subscriptions).requestOffsetReset(Collections.singleton(topic), OffsetResetStrategy.LATEST);
consumer.seekToEnd(topics);
verify(subscriptions).requestOffsetReset(Collections.unmodifiableCollection(topics), OffsetResetStrategy.LATEST);
}

private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
Expand Down Expand Up @@ -1977,8 +1977,8 @@ private void completeResetOffsetEventSuccessfully() {
doAnswer(invocation -> {
ResetOffsetEvent event = invocation.getArgument(0);
consumer.subscriptions().requestOffsetReset(event.topicPartitions(), event.offsetResetStrategy());
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(ResetOffsetEvent.class));
return true;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ResetOffsetEvent.class));
}

private void completeCommitAsyncApplicationEventSuccessfully() {
Expand Down

0 comments on commit 6abb931

Please sign in to comment.