Skip to content

Commit

Permalink
add sender test
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Nov 14, 2024
1 parent 56723be commit c2c7b19
Showing 1 changed file with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,51 @@ public void testSimple() throws Exception {
assertEquals(offset, future.get().offset());
}

@Test
public void testDifferentAcksInSamePartition() throws Exception {
long offset = 0;
Future<RecordMetadata> futureWithDefaultAcks = appendToAccumulator(tp0, 0L, "key", "value");
Future<RecordMetadata> futureWithAcksOne = appendToAccumulator(tp0, 0L, "key", "value", (short) 1);
sender.runOnce(); // connect
sender.runOnce(); // send produce request
assertEquals(2, client.inFlightRequestCount(), "We should have two produce request in flight.");
assertEquals(2, sender.inFlightBatches(tp0).size());
assertTrue(client.hasInFlightRequests());
client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
sender.runOnce();
assertEquals(0, client.inFlightRequestCount(), "All requests completed.");
assertEquals(0, sender.inFlightBatches(tp0).size());
assertFalse(client.hasInFlightRequests());
sender.runOnce();
assertTrue(futureWithDefaultAcks.isDone(), "Request should be completed");
assertEquals(offset, futureWithDefaultAcks.get().offset());
assertTrue(futureWithAcksOne.isDone(), "Request should be completed");
assertEquals(offset, futureWithAcksOne.get().offset());
}

@Test
public void testSameAcksInDifferentPartition() throws Exception {
long offset = 0;
Future<RecordMetadata> futureWithDefaultAcks = appendToAccumulator(tp0, 0L, "key", "value");
Future<RecordMetadata> futureWithAcksOne = appendToAccumulator(tp1, 0L, "key", "value");
sender.runOnce(); // connect
sender.runOnce(); // send produce request
assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight.");
assertEquals(1, sender.inFlightBatches(tp0).size());
assertTrue(client.hasInFlightRequests());
client.respond(produceResponse(List.of(tp0, tp1), offset, Errors.NONE, 0));
sender.runOnce();
assertEquals(0, client.inFlightRequestCount(), "All requests completed.");
assertEquals(0, sender.inFlightBatches(tp0).size());
assertFalse(client.hasInFlightRequests());
sender.runOnce();
assertTrue(futureWithDefaultAcks.isDone(), "Request should be completed");
assertEquals(offset, futureWithDefaultAcks.get().offset());
assertTrue(futureWithAcksOne.isDone(), "Request should be completed");
assertEquals(offset, futureWithAcksOne.get().offset());
}

@Test
public void testMessageFormatDownConversion() throws Exception {
// this test case verifies the behavior when the version of the produce request supported by the
Expand Down Expand Up @@ -3599,8 +3644,27 @@ private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws Inter
}

private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException {
return appendToAccumulator(tp, timestamp, key, value, ACKS_ALL);
}

private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value, short acks) throws InterruptedException {
return accumulator.append(tp.topic(), tp.partition(), timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS,
ACKS_ALL, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), TestUtils.singletonCluster()).future;
acks, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), TestUtils.singletonCluster()).future;
}

private ProduceResponse produceResponse(List<TopicPartition> tps, long offset, Errors error, int throttleTimeMs) {
return produceResponse(tps, offset, error, throttleTimeMs, -1L, null);
}

private ProduceResponse produceResponse(List<TopicPartition> tps, long offset, Errors error, int throttleTimeMs, long logStartOffset, String errorMessage) {
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = new HashMap<>();
for (TopicPartition tp : tps) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset,
RecordBatch.NO_TIMESTAMP, logStartOffset, Collections.emptyList(), errorMessage);
partResp.put(tp, resp);
}

return new ProduceResponse(partResp, throttleTimeMs);
}

@SuppressWarnings("deprecation")
Expand Down

0 comments on commit c2c7b19

Please sign in to comment.