diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java index 736733febe42c..cd2250d365fad 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/DeleteConsumerGroupOffsetsResultTest.java @@ -63,7 +63,7 @@ public void testTopLevelErrorConstructor() throws InterruptedException { partitionFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception()); DeleteConsumerGroupOffsetsResult topLevelErrorResult = new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions); - TestUtils.assertFutureThrows(topLevelErrorResult.all(), GroupAuthorizationException.class); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, topLevelErrorResult.all()); } @Test @@ -79,9 +79,9 @@ public void testPartitionMissingInResponseErrorConstructor() throws InterruptedE DeleteConsumerGroupOffsetsResult missingPartitionResult = new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions); - TestUtils.assertFutureThrows(missingPartitionResult.all(), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, missingPartitionResult.all()); assertNull(missingPartitionResult.partitionResult(tpZero).get()); - TestUtils.assertFutureThrows(missingPartitionResult.partitionResult(tpOne), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, missingPartitionResult.partitionResult(tpOne)); } @Test @@ -110,9 +110,9 @@ private DeleteConsumerGroupOffsetsResult createAndVerifyPartitionLevelError() th DeleteConsumerGroupOffsetsResult partitionLevelErrorResult = new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions); - TestUtils.assertFutureThrows(partitionLevelErrorResult.all(), UnknownTopicOrPartitionException.class); + TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, partitionLevelErrorResult.all()); assertNull(partitionLevelErrorResult.partitionResult(tpZero).get()); - TestUtils.assertFutureThrows(partitionLevelErrorResult.partitionResult(tpOne), UnknownTopicOrPartitionException.class); + TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, partitionLevelErrorResult.partitionResult(tpOne)); return partitionLevelErrorResult; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 411569644a5fb..72d1161c0924c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -862,7 +862,7 @@ public void testTimeoutWithoutMetadata() throws Exception { KafkaFuture future = env.adminClient().createTopics( singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); - TestUtils.assertFutureThrows(future, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, future); } } @@ -945,7 +945,7 @@ public void testPropagatedMetadataFetchException() throws Exception { KafkaFuture future = env.adminClient().createTopics( singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(1000)).all(); - TestUtils.assertFutureThrows(future, SaslAuthenticationException.class); + TestUtils.assertFutureThrows(SaslAuthenticationException.class, future); } } @@ -975,7 +975,7 @@ public void testCreateTopicsPartialResponse() throws Exception { new NewTopic("myTopic2", Collections.singletonMap(0, asList(0, 1, 2)))), new CreateTopicsOptions().timeoutMs(10000)); topicsResult.values().get("myTopic").get(); - TestUtils.assertFutureThrows(topicsResult.values().get("myTopic2"), ApiException.class); + TestUtils.assertFutureThrows(ApiException.class, topicsResult.values().get("myTopic2")); } } @@ -1079,7 +1079,7 @@ public void testCreateTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); - TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1124,10 +1124,10 @@ public void testCreateTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), - ThrottlingQuotaExceededException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2") + ); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1151,10 +1151,10 @@ public void testCreateTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex new CreateTopicsOptions().retryOnQuotaViolation(false)); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), - ThrottlingQuotaExceededException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2") + ); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -1189,14 +1189,14 @@ public void testDeleteTopics() throws Exception { prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(future, TopicDeletionDisabledException.class); + TestUtils.assertFutureThrows(TopicDeletionDisabledException.class, future); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopics("myTopic"), prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)); future = env.adminClient().deleteTopics(singletonList("myTopic"), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(future, UnknownTopicOrPartitionException.class); + TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, future); // With topic IDs Uuid topicId = Uuid.randomUuid(); @@ -1213,14 +1213,14 @@ public void testDeleteTopics() throws Exception { prepareDeleteTopicsResponseWithTopicId(topicId, Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(TopicCollection.ofTopicIds(singletonList(topicId)), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(future, TopicDeletionDisabledException.class); + TestUtils.assertFutureThrows(TopicDeletionDisabledException.class, future); env.kafkaClient().prepareResponse( expectDeleteTopicsRequestWithTopicIds(topicId), prepareDeleteTopicsResponseWithTopicId(topicId, Errors.UNKNOWN_TOPIC_ID)); future = env.adminClient().deleteTopics(TopicCollection.ofTopicIds(singletonList(topicId)), new DeleteTopicsOptions()).all(); - TestUtils.assertFutureThrows(future, UnknownTopicIdException.class); + TestUtils.assertFutureThrows(UnknownTopicIdException.class, future); } } @@ -1239,7 +1239,7 @@ public void testDeleteTopicsPartialResponse() throws Exception { asList("myTopic", "myOtherTopic"), new DeleteTopicsOptions()); result.topicNameValues().get("myTopic").get(); - TestUtils.assertFutureThrows(result.topicNameValues().get("myOtherTopic"), ApiException.class); + TestUtils.assertFutureThrows(ApiException.class, result.topicNameValues().get("myOtherTopic")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1253,7 +1253,7 @@ public void testDeleteTopicsPartialResponse() throws Exception { TopicCollection.ofTopicIds(asList(topicId1, topicId2)), new DeleteTopicsOptions()); resultIds.topicIdValues().get(topicId1).get(); - TestUtils.assertFutureThrows(resultIds.topicIdValues().get(topicId2), ApiException.class); + TestUtils.assertFutureThrows(ApiException.class, resultIds.topicIdValues().get(topicId2)); } } @@ -1285,7 +1285,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(result.topicNameValues().get("topic1").get()); assertNull(result.topicNameValues().get("topic2").get()); - TestUtils.assertFutureThrows(result.topicNameValues().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1315,7 +1315,7 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti assertNull(resultIds.topicIdValues().get(topicId1).get()); assertNull(resultIds.topicIdValues().get(topicId2).get()); - TestUtils.assertFutureThrows(resultIds.topicIdValues().get(topicId3), UnknownTopicIdException.class); + TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1357,10 +1357,10 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(result.topicNameValues().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.topicNameValues().get("topic2"), - ThrottlingQuotaExceededException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2") + ); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(result.topicNameValues().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1394,10 +1394,10 @@ public void testDeleteTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(resultIds.topicIdValues().get(topicId1).get()); - e = TestUtils.assertFutureThrows(resultIds.topicIdValues().get(topicId2), - ThrottlingQuotaExceededException.class); + e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2) + ); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(resultIds.topicIdValues().get(topicId3), UnknownTopicIdException.class); + TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1418,10 +1418,10 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex new DeleteTopicsOptions().retryOnQuotaViolation(false)); assertNull(result.topicNameValues().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.topicNameValues().get("topic2"), - ThrottlingQuotaExceededException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.topicNameValues().get("topic2") + ); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(result.topicNameValues().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.topicNameValues().get("topic3")); // With topic IDs Uuid topicId1 = Uuid.randomUuid(); @@ -1439,10 +1439,10 @@ public void testDeleteTopicsDontRetryThrottlingExceptionWhenDisabled() throws Ex new DeleteTopicsOptions().retryOnQuotaViolation(false)); assertNull(resultIds.topicIdValues().get(topicId1).get()); - e = TestUtils.assertFutureThrows(resultIds.topicIdValues().get(topicId2), - ThrottlingQuotaExceededException.class); + e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, resultIds.topicIdValues().get(topicId2) + ); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(resultIds.topicIdValues().get(topicId3), UnknownTopicIdException.class); + TestUtils.assertFutureThrows(UnknownTopicIdException.class, resultIds.topicIdValues().get(topicId3)); } } @@ -1474,14 +1474,14 @@ public void testInvalidTopicNames() throws Exception { List sillyTopicNames = asList("", null); Map> deleteFutures = env.adminClient().deleteTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(deleteFutures.get(sillyTopicName), InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, deleteFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); Map> describeFutures = env.adminClient().describeTopics(sillyTopicNames).topicNameValues(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(describeFutures.get(sillyTopicName), InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, describeFutures.get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); @@ -1492,7 +1492,7 @@ public void testInvalidTopicNames() throws Exception { Map> createFutures = env.adminClient().createTopics(newTopics).values(); for (String sillyTopicName : sillyTopicNames) { - TestUtils.assertFutureThrows(createFutures .get(sillyTopicName), InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, createFutures .get(sillyTopicName)); } assertEquals(0, env.kafkaClient().inFlightRequestCount()); } @@ -1788,7 +1788,7 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() thro asList(topicName1, topicName0), new DescribeTopicsOptions() ); - TestUtils.assertFutureThrows(result.allTopicNames(), TopicAuthorizationException.class); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.allTopicNames()); } } @@ -1888,11 +1888,11 @@ public void testDescribeAcls() throws Exception { env.kafkaClient().prepareResponse(new DescribeAclsResponse(new DescribeAclsResponseData() .setErrorCode(Errors.SECURITY_DISABLED.code()) .setErrorMessage("Security is disabled"), ApiKeys.DESCRIBE_ACLS.latestVersion())); - TestUtils.assertFutureThrows(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class); + TestUtils.assertFutureThrows(SecurityDisabledException.class, env.adminClient().describeAcls(FILTER2).values()); // Test a call where we supply an invalid filter. - TestUtils.assertFutureThrows(env.adminClient().describeAcls(UNKNOWN_FILTER).values(), - InvalidRequestException.class); + TestUtils.assertFutureThrows(InvalidRequestException.class, env.adminClient().describeAcls(UNKNOWN_FILTER).values() + ); } } @@ -1984,9 +1984,9 @@ public void testCreateAcls() throws Exception { new CreateAclsResponseData.AclCreationResult())))); results = env.adminClient().createAcls(asList(ACL1, ACL2)); assertCollectionIs(results.values().keySet(), ACL1, ACL2); - TestUtils.assertFutureThrows(results.values().get(ACL1), SecurityDisabledException.class); + TestUtils.assertFutureThrows(SecurityDisabledException.class, results.values().get(ACL1)); results.values().get(ACL2).get(); - TestUtils.assertFutureThrows(results.all(), SecurityDisabledException.class); + TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); } } @@ -2014,8 +2014,8 @@ public void testDeleteAcls() throws Exception { assertEquals(ACL1, filter1Results.values().get(0).binding()); assertNull(filter1Results.values().get(1).exception()); assertEquals(ACL2, filter1Results.values().get(1).binding()); - TestUtils.assertFutureThrows(filterResults.get(FILTER2), SecurityDisabledException.class); - TestUtils.assertFutureThrows(results.all(), SecurityDisabledException.class); + TestUtils.assertFutureThrows(SecurityDisabledException.class, filterResults.get(FILTER2)); + TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); // Test a call where one deletion result has an error. env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() @@ -2035,7 +2035,7 @@ public void testDeleteAcls() throws Exception { ApiKeys.DELETE_ACLS.latestVersion())); results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2)); assertTrue(results.values().get(FILTER2).get().values().isEmpty()); - TestUtils.assertFutureThrows(results.all(), SecurityDisabledException.class); + TestUtils.assertFutureThrows(SecurityDisabledException.class, results.all()); // Test a call where there are no errors. env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() @@ -2106,7 +2106,7 @@ public void testElectLeaders() throws Exception { electionType, new HashSet<>(asList(topic1, topic2)), new ElectLeadersOptions().timeoutMs(100)); - TestUtils.assertFutureThrows(results.partitions(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, results.partitions()); } } } @@ -2171,7 +2171,7 @@ public void testDescribeConfigsPartialResponse() { topic2)).values(); assertEquals(new HashSet<>(asList(topic, topic2)), result.keySet()); result.get(topic); - TestUtils.assertFutureThrows(result.get(topic2), ApiException.class); + TestUtils.assertFutureThrows(ApiException.class, result.get(topic2)); } } @@ -2579,7 +2579,7 @@ public void testCreatePartitionsRetryThrottlingExceptionWhenEnabled() throws Exc assertNull(result.values().get("topic1").get()); assertNull(result.values().get("topic2").get()); - TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2625,10 +2625,10 @@ public void testCreatePartitionsRetryThrottlingExceptionWhenEnabledUntilRequestT time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), - ThrottlingQuotaExceededException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2") + ); assertEquals(0, e.throttleTimeMs()); - TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2653,10 +2653,10 @@ public void testCreatePartitionsDontRetryThrottlingExceptionWhenDisabled() throw counts, new CreatePartitionsOptions().retryOnQuotaViolation(false)); assertNull(result.values().get("topic1").get()); - ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), - ThrottlingQuotaExceededException.class); + ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(ThrottlingQuotaExceededException.class, result.values().get("topic2") + ); assertEquals(1000, e.throttleTimeMs()); - TestUtils.assertFutureThrows(result.values().get("topic3"), TopicExistsException.class); + TestUtils.assertFutureThrows(TopicExistsException.class, result.values().get("topic3")); } } @@ -2691,7 +2691,7 @@ public void testDeleteRecordsTopicAuthorizationError() { recordsToDelete.put(partition, RecordsToDelete.beforeOffset(10L)); DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); - TestUtils.assertFutureThrows(results.lowWatermarks().get(partition), TopicAuthorizationException.class); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, results.lowWatermarks().get(partition)); } } @@ -2737,7 +2737,7 @@ public void testDeleteRecordsMultipleSends() throws Exception { DeleteRecordsResult results = env.adminClient().deleteRecords(recordsToDelete); assertEquals(3L, results.lowWatermarks().get(tp0).get().lowWatermark()); - TestUtils.assertFutureThrows(results.lowWatermarks().get(tp1), SaslAuthenticationException.class); + TestUtils.assertFutureThrows(SaslAuthenticationException.class, results.lowWatermarks().get(tp1)); } } @@ -2859,13 +2859,13 @@ public void testDescribeTopicsByIds() throws ExecutionException, InterruptedExce DescribeTopicsResult result1 = env.adminClient().describeTopics( TopicCollection.ofTopicIds(singletonList(nonExistID))); - TestUtils.assertFutureThrows(result1.allTopicIds(), UnknownTopicIdException.class); + TestUtils.assertFutureThrows(UnknownTopicIdException.class, result1.allTopicIds()); Exception e = assertThrows(Exception.class, () -> result1.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); assertEquals(String.format("org.apache.kafka.common.errors.UnknownTopicIdException: TopicId %s not found.", nonExistID), e.getMessage()); DescribeTopicsResult result2 = env.adminClient().describeTopics( TopicCollection.ofTopicIds(singletonList(Uuid.ZERO_UUID))); - TestUtils.assertFutureThrows(result2.allTopicIds(), InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, result2.allTopicIds()); e = assertThrows(Exception.class, () -> result2.allTopicIds().get(), "describe with non-exist topic ID should throw exception"); assertEquals("The given topic id 'AAAAAAAAAAAAAAAAAAAAAA' cannot be represented in a request.", e.getCause().getMessage()); @@ -2927,14 +2927,14 @@ public void testDescribeClusterHandleError() { .setErrorMessage(errorMessage))); final DescribeClusterResult result = env.adminClient().describeCluster(); - TestUtils.assertFutureThrows(result.clusterId(), - InvalidRequestException.class, errorMessage); - TestUtils.assertFutureThrows(result.controller(), - InvalidRequestException.class, errorMessage); - TestUtils.assertFutureThrows(result.nodes(), - InvalidRequestException.class, errorMessage); - TestUtils.assertFutureThrows(result.authorizedOperations(), - InvalidRequestException.class, errorMessage); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.clusterId(), + errorMessage); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.controller(), + errorMessage); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.nodes(), + errorMessage); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.authorizedOperations(), + errorMessage); } } @@ -3095,7 +3095,7 @@ public void testListGroups() throws Exception { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(); - TestUtils.assertFutureThrows(result.all(), UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(6, listings.size()); @@ -3129,7 +3129,7 @@ public void testListGroupsMetadataFailure() throws Exception { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(); - TestUtils.assertFutureThrows(result.all(), KafkaException.class); + TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -3252,7 +3252,7 @@ public void testListGroupsWithTypesOlderBrokerVersion() { ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); ListGroupsResult result = env.adminClient().listGroups(options); - TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3269,7 +3269,7 @@ public void testDescribeClusterHandleUnsupportedVersionForIncludingFencedBrokers request -> request instanceof DescribeClusterRequest); final DescribeClusterResult result = env.adminClient().describeCluster(new DescribeClusterOptions().includeFencedBrokers(true)); - TestUtils.assertFutureThrows(result.nodes(), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.nodes()); } } @@ -3366,7 +3366,7 @@ public void testListConsumerGroups() throws Exception { env.cluster().nodeById(3)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - TestUtils.assertFutureThrows(result.all(), UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(3, listings.size()); @@ -3401,7 +3401,7 @@ public void testListConsumerGroupsMetadataFailure() throws Exception { Collections.emptyList())); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - TestUtils.assertFutureThrows(result.all(), KafkaException.class); + TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -3534,7 +3534,7 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); result = env.adminClient().listConsumerGroups(options); - TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3579,7 +3579,7 @@ public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception options = new ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); result = env.adminClient().listConsumerGroups(options); - TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3616,7 +3616,7 @@ public void testOffsetCommitNumRetries() throws Exception { offsets.put(tp1, new OffsetAndMetadata(123L)); final AlterConsumerGroupOffsetsResult result = env.adminClient().alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(result.all(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.all()); } } @@ -3647,9 +3647,9 @@ public void testOffsetCommitWithMultipleErrors() throws Exception { .alterConsumerGroupOffsets(GROUP_ID, offsets); assertNull(result.partitionResult(foo0).get()); - TestUtils.assertFutureThrows(result.partitionResult(foo1), UnknownTopicOrPartitionException.class); + TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.partitionResult(foo1)); - TestUtils.assertFutureThrows(result.all(), UnknownTopicOrPartitionException.class); + TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, result.all()); } } @@ -3728,7 +3728,7 @@ public void testDescribeConsumerGroupNumRetries() throws Exception { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(result.all(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4092,7 +4092,7 @@ public void testDescribeNonConsumerGroups() throws Exception { final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(result.describedGroups().get(GROUP_ID), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, result.describedGroups().get(GROUP_ID)); } } @@ -4113,7 +4113,7 @@ public void testDescribeGroupsWithBothUnsupportedApis() throws InterruptedExcept request -> request instanceof DescribeGroupsRequest); DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(singletonList(GROUP_ID)); - TestUtils.assertFutureThrows(result.describedGroups().get(GROUP_ID), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.describedGroups().get(GROUP_ID)); } } @@ -4317,7 +4317,7 @@ public void testListConsumerGroupOffsetsNumRetries() throws Exception { final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureThrows(result.partitionsToOffsetAndMetadata(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.partitionsToOffsetAndMetadata()); } } @@ -4424,7 +4424,7 @@ public void testListConsumerGroupOffsetsNonRetriableErrors() throws Exception { ListConsumerGroupOffsetsResult errorResult = env.adminClient().listConsumerGroupOffsets(GROUP_ID); - TestUtils.assertFutureThrows(errorResult.partitionsToOffsetAndMetadata(), error.exception().getClass()); + TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionsToOffsetAndMetadata()); } } } @@ -4644,7 +4644,7 @@ public void testDeleteConsumerGroupsNumRetries() throws Exception { final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds); - TestUtils.assertFutureThrows(result.all(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4745,7 +4745,7 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); - TestUtils.assertFutureThrows(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.deletedGroups().get("groupId")); // Retriable errors should be retried env.kafkaClient().prepareResponse( @@ -4867,7 +4867,7 @@ public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception { final DeleteConsumerGroupOffsetsResult result = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(result.all(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.all()); } } @@ -4958,8 +4958,8 @@ public void testDeleteConsumerGroupOffsets() throws Exception { GROUP_ID, Stream.of(tp1, tp2).collect(Collectors.toSet())); assertNull(errorResult.partitionResult(tp1).get()); - TestUtils.assertFutureThrows(errorResult.all(), GroupSubscribedToTopicException.class); - TestUtils.assertFutureThrows(errorResult.partitionResult(tp2), GroupSubscribedToTopicException.class); + TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.all()); + TestUtils.assertFutureThrows(GroupSubscribedToTopicException.class, errorResult.partitionResult(tp2)); assertThrows(IllegalArgumentException.class, () -> errorResult.partitionResult(tp3)); } } @@ -5030,8 +5030,8 @@ public void testDeleteConsumerGroupOffsetsNonRetriableErrors() throws Exception DeleteConsumerGroupOffsetsResult errorResult = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(errorResult.all(), error.exception().getClass()); - TestUtils.assertFutureThrows(errorResult.partitionResult(tp1), error.exception().getClass()); + TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); + TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -5079,8 +5079,8 @@ public void testDeleteConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() th final DeleteConsumerGroupOffsetsResult errorResult = env.adminClient() .deleteConsumerGroupOffsets(GROUP_ID, Stream.of(tp1).collect(Collectors.toSet())); - TestUtils.assertFutureThrows(errorResult.all(), GroupAuthorizationException.class); - TestUtils.assertFutureThrows(errorResult.partitionResult(tp1), GroupAuthorizationException.class); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -5396,7 +5396,7 @@ public void testListShareGroups() throws Exception { env.cluster().nodeById(3)); final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); - TestUtils.assertFutureThrows(result.all(), UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); assertEquals(4, listings.size()); @@ -5431,7 +5431,7 @@ public void testListShareGroupsMetadataFailure() throws Exception { Collections.emptyList())); final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); - TestUtils.assertFutureThrows(result.all(), KafkaException.class); + TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -5490,7 +5490,7 @@ public void testListShareGroupsWithStatesOlderBrokerVersion() { .setGroupId("share-group-1")))), env.cluster().nodeById(0)); ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); - TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -5726,10 +5726,10 @@ public void testIncrementalAlterConfigs() throws Exception { configs.put(groupResource, singletonList(alterConfigOp4)); AlterConfigsResult result = env.adminClient().incrementalAlterConfigs(configs); - TestUtils.assertFutureThrows(result.values().get(brokerResource), ClusterAuthorizationException.class); - TestUtils.assertFutureThrows(result.values().get(topicResource), InvalidRequestException.class); - TestUtils.assertFutureThrows(result.values().get(metricResource), InvalidRequestException.class); - TestUtils.assertFutureThrows(result.values().get(groupResource), InvalidConfigurationException.class); + TestUtils.assertFutureThrows(ClusterAuthorizationException.class, result.values().get(brokerResource)); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(topicResource)); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(metricResource)); + TestUtils.assertFutureThrows(InvalidConfigurationException.class, result.values().get(groupResource)); // Test a call where there are no errors. responseData = new IncrementalAlterConfigsResponseData(); @@ -5823,7 +5823,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception { final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - TestUtils.assertFutureThrows(result.all(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.all()); } } @@ -5959,8 +5959,8 @@ public void testRemoveMembersFromGroupNonRetriableErrors() throws Exception { final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - TestUtils.assertFutureThrows(result.all(), error.exception().getClass()); - TestUtils.assertFutureThrows(result.memberResult(memberToRemove), error.exception().getClass()); + TestUtils.assertFutureThrows(error.exception().getClass(), result.all()); + TestUtils.assertFutureThrows(error.exception().getClass(), result.memberResult(memberToRemove)); } } } @@ -5995,8 +5995,8 @@ public void testRemoveMembersFromGroup() throws Exception { MemberToRemove memberOne = new MemberToRemove(instanceOne); MemberToRemove memberTwo = new MemberToRemove(instanceTwo); - TestUtils.assertFutureThrows(unknownErrorResult.memberResult(memberOne), UnknownServerException.class); - TestUtils.assertFutureThrows(unknownErrorResult.memberResult(memberTwo), UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberOne)); + TestUtils.assertFutureThrows(UnknownServerException.class, unknownErrorResult.memberResult(memberTwo)); MemberResponse responseOne = new MemberResponse() .setGroupInstanceId(instanceOne) @@ -6017,8 +6017,8 @@ public void testRemoveMembersFromGroup() throws Exception { new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); - TestUtils.assertFutureThrows(memberLevelErrorResult.all(), UnknownMemberIdException.class); - TestUtils.assertFutureThrows(memberLevelErrorResult.memberResult(memberOne), UnknownMemberIdException.class); + TestUtils.assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.all()); + TestUtils.assertFutureThrows(UnknownMemberIdException.class, memberLevelErrorResult.memberResult(memberOne)); assertNull(memberLevelErrorResult.memberResult(memberTwo).get()); // Return with missing member. @@ -6032,9 +6032,9 @@ public void testRemoveMembersFromGroup() throws Exception { new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); - TestUtils.assertFutureThrows(missingMemberResult.all(), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.all()); // The memberOne was not included in the response. - TestUtils.assertFutureThrows(missingMemberResult.memberResult(memberOne), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.memberResult(memberOne)); assertNull(missingMemberResult.memberResult(memberTwo).get()); @@ -6186,8 +6186,8 @@ public void testAlterPartitionReassignments() throws Exception { AlterPartitionReassignmentsResult result1 = env.adminClient().alterPartitionReassignments(reassignments); Future future1 = result1.all(); Future future2 = result1.values().get(tp1); - TestUtils.assertFutureThrows(future1, UnknownServerException.class); - TestUtils.assertFutureThrows(future2, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future1); + TestUtils.assertFutureThrows(UnknownServerException.class, future2); // 2. NOT_CONTROLLER error handling AlterPartitionReassignmentsResponseData controllerErrResponseData = @@ -6238,7 +6238,7 @@ public void testAlterPartitionReassignments() throws Exception { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(partitionLevelErrData)); AlterPartitionReassignmentsResult partitionLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); - TestUtils.assertFutureThrows(partitionLevelErrResult.values().get(tp1), Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass()); + TestUtils.assertFutureThrows(Errors.INVALID_REPLICA_ASSIGNMENT.exception().getClass(), partitionLevelErrResult.values().get(tp1)); partitionLevelErrResult.values().get(tp2).get(); // 4. top-level error @@ -6257,9 +6257,9 @@ public void testAlterPartitionReassignments() throws Exception { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(topLevelErrResponseData)); AlterPartitionReassignmentsResult topLevelErrResult = env.adminClient().alterPartitionReassignments(reassignments); - assertEquals(errorMessage, TestUtils.assertFutureThrows(topLevelErrResult.all(), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass()).getMessage()); - assertEquals(errorMessage, TestUtils.assertFutureThrows(topLevelErrResult.values().get(tp1), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass()).getMessage()); - assertEquals(errorMessage, TestUtils.assertFutureThrows(topLevelErrResult.values().get(tp2), Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass()).getMessage()); + assertEquals(errorMessage, TestUtils.assertFutureThrows(Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass(), topLevelErrResult.all()).getMessage()); + assertEquals(errorMessage, TestUtils.assertFutureThrows(Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass(), topLevelErrResult.values().get(tp1)).getMessage()); + assertEquals(errorMessage, TestUtils.assertFutureThrows(Errors.CLUSTER_AUTHORIZATION_FAILED.exception().getClass(), topLevelErrResult.values().get(tp2)).getMessage()); // 5. unrepresentable topic name error TopicPartition invalidTopicTP = new TopicPartition("", 0); @@ -6278,8 +6278,8 @@ public void testAlterPartitionReassignments() throws Exception { ); env.kafkaClient().prepareResponse(new AlterPartitionReassignmentsResponse(singlePartResponseData)); AlterPartitionReassignmentsResult unrepresentableTopicResult = env.adminClient().alterPartitionReassignments(invalidTopicReassignments); - TestUtils.assertFutureThrows(unrepresentableTopicResult.values().get(invalidTopicTP), InvalidTopicException.class); - TestUtils.assertFutureThrows(unrepresentableTopicResult.values().get(invalidPartitionTP), InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidTopicTP)); + TestUtils.assertFutureThrows(InvalidTopicException.class, unrepresentableTopicResult.values().get(invalidPartitionTP)); unrepresentableTopicResult.values().get(tp1).get(); // Test success scenario @@ -6348,7 +6348,7 @@ public void testListPartitionReassignments() throws Exception { env.kafkaClient().prepareResponse(new ListPartitionReassignmentsResponse(unknownTpData)); ListPartitionReassignmentsResult unknownTpResult = env.adminClient().listPartitionReassignments(new HashSet<>(asList(tp1, tp2))); - TestUtils.assertFutureThrows(unknownTpResult.reassignments(), UnknownTopicOrPartitionException.class); + TestUtils.assertFutureThrows(UnknownTopicOrPartitionException.class, unknownTpResult.reassignments()); // 3. Success ListPartitionReassignmentsResponseData responseData = new ListPartitionReassignmentsResponseData() @@ -6399,7 +6399,7 @@ public void testAlterConsumerGroupOffsets() throws Exception { assertNull(result.all().get()); assertNull(result.partitionResult(tp1).get()); assertNull(result.partitionResult(tp2).get()); - TestUtils.assertFutureThrows(result.partitionResult(tp3), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, result.partitionResult(tp3)); } } @@ -6468,8 +6468,8 @@ public void testAlterConsumerGroupOffsetsNonRetriableErrors() throws Exception { AlterConsumerGroupOffsetsResult errorResult = env.adminClient() .alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(errorResult.all(), error.exception().getClass()); - TestUtils.assertFutureThrows(errorResult.partitionResult(tp1), error.exception().getClass()); + TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.all()); + TestUtils.assertFutureThrows(error.exception().getClass(), errorResult.partitionResult(tp1)); } } } @@ -6521,8 +6521,8 @@ public void testAlterConsumerGroupOffsetsFindCoordinatorNonRetriableErrors() thr final AlterConsumerGroupOffsetsResult errorResult = env.adminClient() .alterConsumerGroupOffsets(GROUP_ID, offsets); - TestUtils.assertFutureThrows(errorResult.all(), GroupAuthorizationException.class); - TestUtils.assertFutureThrows(errorResult.partitionResult(tp1), GroupAuthorizationException.class); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.all()); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, errorResult.partitionResult(tp1)); } } @@ -6696,7 +6696,7 @@ public void testListOffsetsNonRetriableErrors() throws Exception { partitions.put(tp0, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureThrows(result.all(), TopicAuthorizationException.class); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, result.all()); } } @@ -6724,7 +6724,7 @@ public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() { ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp())); - TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -6772,7 +6772,7 @@ public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Ex put(tp1, OffsetSpec.latest()); }}); - TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get(); assertEquals(345L, tp1Offset.offset()); @@ -6834,7 +6834,7 @@ public void testListOffsetsHandlesFulfillmentTimeouts() throws Exception { put(tp1, OffsetSpec.latest()); } }); - TestUtils.assertFutureThrows(result.partitionResult(tp0), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.partitionResult(tp0)); ListOffsetsResultInfo tp1Result = result.partitionResult(tp1).get(); assertEquals(345L, tp1Result.offset()); assertEquals(543, tp1Result.leaderEpoch().get().intValue()); @@ -6897,7 +6897,7 @@ public void testListOffsetsUnsupportedNonMaxTimestamp() { ListOffsetsResult result = env.adminClient().listOffsets( Collections.singletonMap(tp0, OffsetSpec.latest())); - TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.partitionResult(tp0)); } } @@ -7196,56 +7196,56 @@ public void testDescribeMetadataQuorumFailure() { body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.NONE, false, false, false, false, false)); KafkaFuture future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, InvalidRequestException.class); + TestUtils.assertFutureThrows(InvalidRequestException.class, future); // Test incorrect topic count env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, false, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future); // Test incorrect topic name env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, true, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future); // Test incorrect partition count env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, true, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future); // Test incorrect partition index env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, false, false, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future); // Test partition level error env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.INVALID_REQUEST, false, false, false, false, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, InvalidRequestException.class); + TestUtils.assertFutureThrows(InvalidRequestException.class, future); // Test all incorrect and no errors env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, true, true, true, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future); // Test all incorrect and both errors env.kafkaClient().prepareResponse( body -> body instanceof DescribeQuorumRequest, prepareDescribeQuorumResponse(Errors.INVALID_REQUEST, Errors.INVALID_REQUEST, true, true, true, true, false)); future = env.adminClient().describeMetadataQuorum().quorumInfo(); - TestUtils.assertFutureThrows(future, Errors.INVALID_REQUEST.exception().getClass()); + TestUtils.assertFutureThrows(Errors.INVALID_REQUEST.exception().getClass(), future); } } @@ -7460,7 +7460,7 @@ public void testListOffsetsMetadataNonRetriableErrors( partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureThrows(result.all(), expectedFailure); + TestUtils.assertFutureThrows(expectedFailure, result.all()); } } @@ -7528,8 +7528,8 @@ public void testListOffsetsPartialResponse() throws Exception { partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); assertNotNull(result.partitionResult(tp0).get()); - TestUtils.assertFutureThrows(result.partitionResult(tp1), ApiException.class); - TestUtils.assertFutureThrows(result.all(), ApiException.class); + TestUtils.assertFutureThrows(ApiException.class, result.partitionResult(tp1)); + TestUtils.assertFutureThrows(ApiException.class, result.all()); } } @@ -7648,7 +7648,7 @@ private void testApiTimeout(int requestTimeoutMs, }, "Timed out waiting for Metadata request to be sent"); time.sleep(requestTimeoutMs + 1); - TestUtils.assertFutureThrows(result.future, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.future); } } @@ -7688,7 +7688,7 @@ public void testRequestTimeoutExceedingDefaultApiTimeout() throws Exception { // Now sleep the remaining time for the request timeout to expire time.sleep(60000); - TestUtils.assertFutureThrows(result.future, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.future); } } @@ -7778,8 +7778,8 @@ public void testAlterClientQuotas() throws Exception { AlterClientQuotasResult result = env.adminClient().alterClientQuotas(entries); result.values().get(goodEntity); - TestUtils.assertFutureThrows(result.values().get(unauthorizedEntity), ClusterAuthorizationException.class); - TestUtils.assertFutureThrows(result.values().get(invalidEntity), InvalidRequestException.class); + TestUtils.assertFutureThrows(ClusterAuthorizationException.class, result.values().get(unauthorizedEntity)); + TestUtils.assertFutureThrows(InvalidRequestException.class, result.values().get(invalidEntity)); // ensure immutable assertThrows(UnsupportedOperationException.class, () -> result.values().put(newClientQuotaEntity(ClientQuotaEntity.USER, "user-3"), null)); @@ -7818,7 +7818,7 @@ public void testAlterReplicaLogDirsLogDirNotFound() throws Exception { logDirs.put(tpr1, "/data1"); AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); assertNull(result.values().get(tpr0).get()); - TestUtils.assertFutureThrows(result.values().get(tpr1), LogDirNotFoundException.class); + TestUtils.assertFutureThrows(LogDirNotFoundException.class, result.values().get(tpr1)); } } @@ -7849,7 +7849,7 @@ public void testAlterReplicaLogDirsPartialResponse() throws Exception { logDirs.put(tpr2, "/data1"); AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); assertNull(result.values().get(tpr1).get()); - TestUtils.assertFutureThrows(result.values().get(tpr2), ApiException.class); + TestUtils.assertFutureThrows(ApiException.class, result.values().get(tpr2)); } } @@ -7885,7 +7885,7 @@ public void testAlterReplicaLogDirsPartialFailure() throws Exception { // Advance time past the default api timeout to time out the inflight request time.sleep(defaultApiTimeout + 1); - TestUtils.assertFutureThrows(result.values().get(tpr1), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.values().get(tpr1)); assertNull(result.values().get(tpr2).get()); } } @@ -8073,7 +8073,7 @@ public void testDescribeLogDirsPartialFailure() throws Exception { // Advance time past the default api timeout to time out the inflight request time.sleep(defaultApiTimeout + 1); - TestUtils.assertFutureThrows(result.descriptions().get(0), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.descriptions().get(0)); assertNotNull(result.descriptions().get(1).get()); } } @@ -8132,7 +8132,7 @@ public void testUnregisterBrokerFailure() { UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId); // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(result.all(), Errors.UNKNOWN_SERVER_ERROR.exception().getClass()); + TestUtils.assertFutureThrows(Errors.UNKNOWN_SERVER_ERROR.exception().getClass(), result.all()); } } @@ -8166,7 +8166,7 @@ public void testUnregisterBrokerTimeoutAndFailureRetry() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(result.all(), Errors.UNKNOWN_SERVER_ERROR.exception().getClass()); + TestUtils.assertFutureThrows(Errors.UNKNOWN_SERVER_ERROR.exception().getClass(), result.all()); } } @@ -8183,7 +8183,7 @@ public void testUnregisterBrokerTimeoutMaxRetry() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(result.all(), Errors.REQUEST_TIMED_OUT.exception().getClass()); + TestUtils.assertFutureThrows(Errors.REQUEST_TIMED_OUT.exception().getClass(), result.all()); } } @@ -8200,7 +8200,7 @@ public void testUnregisterBrokerTimeoutMaxWait() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(result.all(), Errors.REQUEST_TIMED_OUT.exception().getClass()); + TestUtils.assertFutureThrows(Errors.REQUEST_TIMED_OUT.exception().getClass(), result.all()); } } @@ -8260,7 +8260,7 @@ public void testDescribeProducersTimeout(boolean timeoutInMetadataLookup) throws "Future failed to timeout after expiration of timeout"); assertTrue(result.all().isCompletedExceptionally()); - TestUtils.assertFutureThrows(result.all(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, result.all()); assertFalse(env.kafkaClient().hasInFlightRequests()); } } @@ -8895,7 +8895,7 @@ public void testListClientMetricsResourcesNotSupported() { // Validate response assertNotNull(result.all()); - TestUtils.assertFutureThrows(result.all(), Errors.UNSUPPORTED_VERSION.exception().getClass()); + TestUtils.assertFutureThrows(Errors.UNSUPPORTED_VERSION.exception().getClass(), result.all()); } } @@ -9041,7 +9041,7 @@ public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId) throws options); assertNotNull(result.all()); if (fail) { - TestUtils.assertFutureThrows(result.all(), Errors.DUPLICATE_VOTER.exception().getClass()); + TestUtils.assertFutureThrows(Errors.DUPLICATE_VOTER.exception().getClass(), result.all()); } else { result.all().get(); } @@ -9126,7 +9126,7 @@ public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) thro options); assertNotNull(result.all()); if (fail) { - TestUtils.assertFutureThrows(result.all(), Errors.VOTER_NOT_FOUND.exception().getClass()); + TestUtils.assertFutureThrows(Errors.VOTER_NOT_FOUND.exception().getClass(), result.all()); } else { result.all().get(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ListTransactionsResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ListTransactionsResultTest.java index 853602b24f4d2..01556391ad313 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/ListTransactionsResultTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ListTransactionsResultTest.java @@ -42,9 +42,9 @@ public class ListTransactionsResultTest { @Test public void testAllFuturesFailIfLookupFails() { future.completeExceptionally(new KafkaException()); - assertFutureThrows(result.all(), KafkaException.class); - assertFutureThrows(result.allByBrokerId(), KafkaException.class); - assertFutureThrows(result.byBrokerId(), KafkaException.class); + assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.allByBrokerId()); + assertFutureThrows(KafkaException.class, result.byBrokerId()); } @Test @@ -111,9 +111,9 @@ public void testPartialFailure() throws Exception { assertEquals(broker1Listings, resultBrokerFutures.get(1).get()); // Everything else should fail - assertFutureThrows(result.all(), KafkaException.class); - assertFutureThrows(result.allByBrokerId(), KafkaException.class); - assertFutureThrows(resultBrokerFutures.get(2), KafkaException.class); + assertFutureThrows(KafkaException.class, result.all()); + assertFutureThrows(KafkaException.class, result.allByBrokerId()); + assertFutureThrows(KafkaException.class, resultBrokerFutures.get(2)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java index 1b0ac0ba0c462..dfc12c578c8b1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResultTest.java @@ -62,7 +62,7 @@ public void testTopLevelErrorConstructor() throws InterruptedException { memberFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception()); RemoveMembersFromConsumerGroupResult topLevelErrorResult = new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove); - TestUtils.assertFutureThrows(topLevelErrorResult.all(), GroupAuthorizationException.class); + TestUtils.assertFutureThrows(GroupAuthorizationException.class, topLevelErrorResult.all()); } @Test @@ -78,9 +78,9 @@ public void testMemberMissingErrorInRequestConstructor() throws InterruptedExcep RemoveMembersFromConsumerGroupResult missingMemberResult = new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove); - TestUtils.assertFutureThrows(missingMemberResult.all(), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.all()); assertNull(missingMemberResult.memberResult(instanceOne).get()); - TestUtils.assertFutureThrows(missingMemberResult.memberResult(instanceTwo), IllegalArgumentException.class); + TestUtils.assertFutureThrows(IllegalArgumentException.class, missingMemberResult.memberResult(instanceTwo)); } @Test @@ -111,9 +111,9 @@ private RemoveMembersFromConsumerGroupResult createAndVerifyMemberLevelError() t RemoveMembersFromConsumerGroupResult memberLevelErrorResult = new RemoveMembersFromConsumerGroupResult(memberFutures, membersToRemove); - TestUtils.assertFutureThrows(memberLevelErrorResult.all(), FencedInstanceIdException.class); + TestUtils.assertFutureThrows(FencedInstanceIdException.class, memberLevelErrorResult.all()); assertNull(memberLevelErrorResult.memberResult(instanceOne).get()); - TestUtils.assertFutureThrows(memberLevelErrorResult.memberResult(instanceTwo), FencedInstanceIdException.class); + TestUtils.assertFutureThrows(FencedInstanceIdException.class, memberLevelErrorResult.memberResult(instanceTwo)); return memberLevelErrorResult; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java index 25ba77d1b4dae..af69717375d24 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategyIntegrationTest.java @@ -77,7 +77,7 @@ public void testFatalLookupError() { driver.onFailure(time.milliseconds(), spec, new UnknownServerException()); assertTrue(result.all().isDone()); - TestUtils.assertFutureThrows(result.all(), UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); assertEquals(Collections.emptyList(), driver.poll()); } @@ -200,7 +200,7 @@ public void testFatalFulfillmentError() throws Exception { driver.onFailure(time.milliseconds(), requestSpec, new UnknownServerException()); assertTrue(future.isDone()); - TestUtils.assertFutureThrows(future, UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, future); assertEquals(Collections.emptyList(), driver.poll()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java index 4e03ae7d952f6..778502505fb80 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/PartitionLeaderStrategyIntegrationTest.java @@ -367,7 +367,7 @@ public void testFatalLookupError() { driver.onFailure(time.milliseconds(), spec, new UnknownServerException()); assertTrue(result.all().get(tp0).isDone()); - TestUtils.assertFutureThrows(result.all().get(tp0), UnknownServerException.class); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all().get(tp0)); assertEquals(Collections.emptyList(), driver.poll()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 1b7ed8599238d..a25c9b95c02a6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -507,7 +507,7 @@ public void testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() { NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds()); assertEquals(0, res.unsentRequests.size()); assertTrue(commitResult.isDone()); - assertFutureThrows(commitResult, CommitFailedException.class); + assertFutureThrows(CommitFailedException.class, commitResult); } @Test @@ -528,7 +528,7 @@ public void testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() { // Commit should fail with CommitFailedException assertTrue(commitResult.isDone()); - assertFutureThrows(commitResult, CommitFailedException.class); + assertFutureThrows(CommitFailedException.class, commitResult); } /** @@ -583,7 +583,7 @@ public void testCommitAsyncFailsWithRetriableOnCoordinatorDisconnected() { // Commit should mark the coordinator unknown and fail with RetriableCommitFailedException. assertTrue(commitResult.isDone()); - assertFutureThrows(commitResult, RetriableCommitFailedException.class); + assertFutureThrows(RetriableCommitFailedException.class, commitResult); assertCoordinatorDisconnectHandling(); } @@ -806,10 +806,10 @@ public void testOffsetFetchRequestTimeoutRequests(final Errors error, assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); NetworkClientDelegate.PollResult poll = commitRequestManager.poll(time.milliseconds()); mimicResponse(error, poll); - futures.forEach(f -> assertFutureThrows(f, expectedExceptionClass)); + futures.forEach(f -> assertFutureThrows(expectedExceptionClass, f)); assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); } else { - futures.forEach(f -> assertFutureThrows(f, expectedExceptionClass)); + futures.forEach(f -> assertFutureThrows(expectedExceptionClass, f)); assertEmptyPendingRequests(commitRequestManager); } } @@ -931,7 +931,7 @@ public void testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final assertTrue(commitResult.isDone()); assertTrue(commitResult.isCompletedExceptionally()); if (error.exception() instanceof RetriableException) { - assertFutureThrows(commitResult, RetriableCommitFailedException.class); + assertFutureThrows(RetriableCommitFailedException.class, commitResult); } // We expect that the request should not have been retried on this async commit. @@ -994,7 +994,7 @@ public void testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExp assertEquals(0, res.unsentRequests.size()); assertTrue(commitResult.isDone()); - assertFutureThrows(commitResult, expectedExceptionClass); + assertFutureThrows(expectedExceptionClass, commitResult); } /** @@ -1022,7 +1022,7 @@ public void testOffsetCommitAsyncFailedWithRetriableThrowsRetriableCommitExcepti assertExceptionHandling(commitRequestManager, retriableError, false); // Request should complete with a RetriableCommitException - assertFutureThrows(commitResult, RetriableCommitFailedException.class); + assertFutureThrows(RetriableCommitFailedException.class, commitResult); } @ParameterizedTest @@ -1372,7 +1372,7 @@ private void testRetriable(final CommitRequestManager commitRequestManager, mimicResponse(error, poll); futures.forEach(f -> { assertTrue(f.isCompletedExceptionally()); - assertFutureThrows(f, TimeoutException.class); + assertFutureThrows(TimeoutException.class, f); }); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 3cdc0ac4845cb..4ee865b42b7d9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -3412,7 +3412,7 @@ public void testPollWithCreateFetchRequestsError() { assertFalse(future.isDone()); assertDoesNotThrow(() -> sendFetches(false)); - assertFutureThrows(future, AuthenticationException.class); + assertFutureThrows(AuthenticationException.class, future); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index 81eb5187fecfb..4ff967e1f021a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -129,7 +129,7 @@ public void testTimeoutBeforeSend() throws Exception { time.sleep(REQUEST_TIMEOUT_MS); ncd.poll(0, time.milliseconds()); assertTrue(unsentRequest.future().isDone()); - TestUtils.assertFutureThrows(unsentRequest.future(), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, unsentRequest.future()); } } @@ -142,7 +142,7 @@ public void testTimeoutAfterSend() throws Exception { time.sleep(REQUEST_TIMEOUT_MS); ncd.poll(0, time.milliseconds()); assertTrue(unsentRequest.future().isDone()); - TestUtils.assertFutureThrows(unsentRequest.future(), DisconnectException.class); + TestUtils.assertFutureThrows(DisconnectException.class, unsentRequest.future()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index 2f92740c41411..cfbf13a1dab89 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -565,7 +565,7 @@ public void testResetOffsetsAuthorizationFailure() { CompletableFuture nextReset = assertDoesNotThrow(() -> requestManager.resetPositionsIfNeeded()); assertEquals(0, requestManager.requestsToSend()); assertTrue(nextReset.isCompletedExceptionally()); - assertFutureThrows(nextReset, TopicAuthorizationException.class); + assertFutureThrows(TopicAuthorizationException.class, nextReset); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 911c028f728b7..e5a69b3d6e9c5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -438,7 +438,7 @@ public void testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() { processor.process(event); verify(subscriptionState).subscribe(pattern, listener); - Exception thrown = assertFutureThrows(event.future(), mixedSubscriptionError.getClass()); + Exception thrown = assertFutureThrows(mixedSubscriptionError.getClass(), event.future()); assertEquals(mixedSubscriptionError, thrown); } @@ -462,7 +462,7 @@ public void testSyncCommitEventWithoutCommitRequestManager() { setupProcessor(false); processor.process(event); - assertFutureThrows(event.future(), KafkaException.class); + assertFutureThrows(KafkaException.class, event.future()); } @Test @@ -476,7 +476,7 @@ public void testSyncCommitEventWithException() { processor.process(event); verify(commitRequestManager).commitSync(Optional.empty(), 12345); - assertFutureThrows(event.future(), IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, event.future()); } @ParameterizedTest @@ -499,7 +499,7 @@ public void testAsyncCommitEventWithoutCommitRequestManager() { setupProcessor(false); processor.process(event); - assertFutureThrows(event.future(), KafkaException.class); + assertFutureThrows(KafkaException.class, event.future()); } @Test @@ -513,7 +513,7 @@ public void testAsyncCommitEventWithException() { processor.process(event); verify(commitRequestManager).commitAsync(Optional.empty()); - assertFutureThrows(event.future(), IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, event.future()); } private static Stream offsetsGenerator() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 3af4a7185cc0e..02525989283d3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -1294,7 +1294,7 @@ public void testInitTransactionsResponseAfterTimeout() throws Exception { "Timed out while waiting for expected `InitProducerId` request to be sent"); time.sleep(maxBlockMs); - TestUtils.assertFutureThrows(future, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, future); client.respond(initProducerIdResponse(1L, (short) 5, Errors.NONE)); @@ -1532,7 +1532,7 @@ public void testCommitTransactionWithRecordTooLargeException() throws Exception client.prepareResponse(endTxnResponse(Errors.NONE)); producer.beginTransaction(); - TestUtils.assertFutureThrows(producer.send(largeRecord), RecordTooLargeException.class); + TestUtils.assertFutureThrows(RecordTooLargeException.class, producer.send(largeRecord)); assertThrows(KafkaException.class, producer::commitTransaction); } } @@ -1569,7 +1569,7 @@ public void testCommitTransactionWithMetadataTimeoutForMissingTopic() throws Exc producer.initTransactions(); producer.beginTransaction(); - TestUtils.assertFutureThrows(producer.send(record), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, producer.send(record)); assertThrows(KafkaException.class, producer::commitTransaction); } } @@ -1606,7 +1606,7 @@ public void testCommitTransactionWithMetadataTimeoutForPartitionOutOfRange() thr producer.initTransactions(); producer.beginTransaction(); - TestUtils.assertFutureThrows(producer.send(record), TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, producer.send(record)); assertThrows(KafkaException.class, producer::commitTransaction); } } @@ -1645,7 +1645,7 @@ public void testCommitTransactionWithSendToInvalidTopic() throws Exception { producer.initTransactions(); producer.beginTransaction(); - TestUtils.assertFutureThrows(producer.send(record), InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, producer.send(record)); assertThrows(KafkaException.class, producer::commitTransaction); } } @@ -2080,7 +2080,7 @@ public void testSendToInvalidTopic() throws Exception { assertEquals(Collections.singleton(invalidTopicName), metadata.fetch().invalidTopics(), "Cluster has incorrect invalid topic list."); - TestUtils.assertFutureThrows(future, InvalidTopicException.class); + TestUtils.assertFutureThrows(InvalidTopicException.class, future); producer.close(Duration.ofMillis(0)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index ab9a56f2b3eb7..38ac88e95ca38 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -352,7 +352,7 @@ private void testCompleteExceptionally( for (int i = 0; i < futures.size(); i++) { FutureRecordMetadata future = futures.get(i); - RuntimeException caughtException = TestUtils.assertFutureThrows(future, RuntimeException.class); + RuntimeException caughtException = TestUtils.assertFutureThrows(RuntimeException.class, future); RuntimeException expectedException = recordExceptions.apply(i); assertEquals(expectedException, caughtException); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index f66ce1f906076..56daddfd53ca8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -2534,15 +2534,15 @@ public void testRecordErrorPropagatedToApplication() throws InterruptedException Integer index = futureEntry.getKey(); if (index == 0 || index == 2) { - InvalidRecordException exception = TestUtils.assertFutureThrows(future, InvalidRecordException.class); + InvalidRecordException exception = TestUtils.assertFutureThrows(InvalidRecordException.class, future); assertInstanceOf(InvalidRecordException.class, exception); assertEquals(index.toString(), exception.getMessage()); } else if (index == 3) { - InvalidRecordException exception = TestUtils.assertFutureThrows(future, InvalidRecordException.class); + InvalidRecordException exception = TestUtils.assertFutureThrows(InvalidRecordException.class, future); assertInstanceOf(InvalidRecordException.class, exception); assertEquals(Errors.INVALID_RECORD.message(), exception.getMessage()); } else { - KafkaException exception = TestUtils.assertFutureThrows(future, KafkaException.class); + KafkaException exception = TestUtils.assertFutureThrows(KafkaException.class, future); assertEquals(KafkaException.class, exception.getClass()); } } @@ -2938,7 +2938,7 @@ public void testTransactionAbortedExceptionOnAbortWithoutError() throws Interrup // drain all the unsent batches with a TransactionAbortedException. sender.runOnce(); // Now attempt to fetch the result for the record. - TestUtils.assertFutureThrows(metadata, TransactionAbortedException.class); + TestUtils.assertFutureThrows(TransactionAbortedException.class, metadata); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 579c8d7fbb251..0d582bf80168d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -769,7 +769,7 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), thi // is reached even though the delivery timeout has expired and the // future has completed exceptionally. assertTrue(responseFuture1.isDone()); - TestUtils.assertFutureThrows(responseFuture1, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, responseFuture1); assertFalse(transactionManager.hasInFlightRequest()); assertEquals(1, client.inFlightRequestCount()); @@ -1527,8 +1527,8 @@ public void testTopicAuthorizationFailureInAddPartitions() throws InterruptedExc assertAbortableError(TopicAuthorizationException.class); sender.runOnce(); - TestUtils.assertFutureThrows(firstPartitionAppend, TransactionAbortedException.class); - TestUtils.assertFutureThrows(secondPartitionAppend, TransactionAbortedException.class); + TestUtils.assertFutureThrows(TransactionAbortedException.class, firstPartitionAppend); + TestUtils.assertFutureThrows(TransactionAbortedException.class, secondPartitionAppend); } @Test @@ -1575,8 +1575,8 @@ public void testCommitWithTopicAuthorizationFailureInAddPartitionsInFlight() thr // the pending transaction commit. sender.runOnce(); assertTrue(commitResult.isCompleted()); - TestUtils.assertFutureThrows(firstPartitionAppend, TopicAuthorizationException.class); - TestUtils.assertFutureThrows(secondPartitionAppend, TopicAuthorizationException.class); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, firstPartitionAppend); + TestUtils.assertFutureThrows(TopicAuthorizationException.class, secondPartitionAppend); assertInstanceOf(TopicAuthorizationException.class, commitResult.error()); } @@ -2184,7 +2184,7 @@ public void testDisallowCommitOnProduceFailure() throws InterruptedException { runUntil(commitResult::isCompleted); // commit should be cancelled with exception without being sent. assertThrows(KafkaException.class, commitResult::await); - TestUtils.assertFutureThrows(responseFuture, OutOfOrderSequenceException.class); + TestUtils.assertFutureThrows(OutOfOrderSequenceException.class, responseFuture); // Commit is not allowed, so let's abort and try again. TransactionalRequestResult abortResult = transactionManager.beginAbort(); @@ -2373,7 +2373,7 @@ public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedE assertTrue(abortResult.isSuccessful()); assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. - TestUtils.assertFutureThrows(responseFuture, TransactionAbortedException.class); + TestUtils.assertFutureThrows(TransactionAbortedException.class, responseFuture); } @Test @@ -2399,7 +2399,7 @@ public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedExcep assertTrue(abortResult.isSuccessful()); assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now. - TestUtils.assertFutureThrows(responseFuture, TransactionAbortedException.class); + TestUtils.assertFutureThrows(TransactionAbortedException.class, responseFuture); } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java index 821f99a3e4317..68bd5e7879fe1 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java @@ -106,7 +106,7 @@ public void testProducerWithInvalidCredentials() { try (KafkaProducer producer = new KafkaProducer<>(props, serializer, serializer)) { ProducerRecord record = new ProducerRecord<>(topic, "message"); Future future = producer.send(record); - TestUtils.assertFutureThrows(future, SaslAuthenticationException.class); + TestUtils.assertFutureThrows(SaslAuthenticationException.class, future); } } @@ -116,7 +116,7 @@ public void testAdminClientWithInvalidCredentials() { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); try (Admin client = Admin.create(props)) { KafkaFuture> future = client.describeTopics(Collections.singleton("test")).allTopicNames(); - TestUtils.assertFutureThrows(future, SaslAuthenticationException.class); + TestUtils.assertFutureThrows(SaslAuthenticationException.class, future); } } diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index f0d0a37b6a8cb..47b21353a2d68 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -583,12 +583,12 @@ public static Set generateRandomTopicPartitions(int numTopic, in * Assert that a future raises an expected exception cause type. Return the exception cause * if the assertion succeeds; otherwise raise AssertionError. * - * @param future The future to await - * @param exceptionCauseClass Class of the expected exception cause * @param Exception cause type parameter + * @param exceptionCauseClass Class of the expected exception cause + * @param future The future to await * @return The caught exception cause */ - public static T assertFutureThrows(Future future, Class exceptionCauseClass) { + public static T assertFutureThrows(Class exceptionCauseClass, Future future) { ExecutionException exception = assertThrows(ExecutionException.class, future::get); Throwable cause = exception.getCause(); assertEquals(exceptionCauseClass, cause.getClass(), @@ -598,11 +598,10 @@ public static T assertFutureThrows(Future future, Class } public static void assertFutureThrows( - Future future, - Class expectedCauseClassApiException, - String expectedMessage + Class expectedCauseClassApiException, Future future, + String expectedMessage ) { - T receivedException = assertFutureThrows(future, expectedCauseClassApiException); + T receivedException = assertFutureThrows(expectedCauseClassApiException, future); assertEquals(expectedMessage, receivedException.getMessage()); } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 9e4e6f7bb9b44..c57bfcaae3992 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -1258,9 +1258,9 @@ public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE assertTrue(write1.isDone()); assertTrue(write2.isDone()); assertTrue(write3.isDone()); - assertFutureThrows(write1, NotCoordinatorException.class); - assertFutureThrows(write2, NotCoordinatorException.class); - assertFutureThrows(write3, NotCoordinatorException.class); + assertFutureThrows(NotCoordinatorException.class, write1); + assertFutureThrows(NotCoordinatorException.class, write2); + assertFutureThrows(NotCoordinatorException.class, write3); // Verify that onUnloaded is called. verify(coordinator, times(1)).onUnloaded(); @@ -1415,7 +1415,7 @@ public void testScheduleWriteOpWhenInactive() { // does not exist. CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult<>(Collections.emptyList(), "response1")); - assertFutureThrows(write, NotCoordinatorException.class); + assertFutureThrows(NotCoordinatorException.class, write); } @Test @@ -1444,7 +1444,7 @@ public void testScheduleWriteOpWhenOpFails() { CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> { throw new KafkaException("error"); }); - assertFutureThrows(write, KafkaException.class); + assertFutureThrows(KafkaException.class, write); } @Test @@ -1497,7 +1497,7 @@ public void replay( // Write. It should fail. CompletableFuture write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")); - assertFutureThrows(write, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, write); // Verify that the state has not changed. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); @@ -1549,7 +1549,7 @@ public void testScheduleWriteOpWhenWriteFails() { // accept 1 write. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, state -> new CoordinatorResult<>(List.of("record3", "record4", "record5"), "response2")); - assertFutureThrows(write2, KafkaException.class); + assertFutureThrows(KafkaException.class, write2); // Verify that the state has not changed. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); @@ -1594,7 +1594,7 @@ public void testScheduleWriteOpWhenWriteTimesOut() throws InterruptedException { timer.advanceClock(4); - assertFutureThrows(timedOutWrite, org.apache.kafka.common.errors.TimeoutException.class); + assertFutureThrows(org.apache.kafka.common.errors.TimeoutException.class, timedOutWrite); } @Test @@ -1811,7 +1811,7 @@ public CoordinatorShardBuilder get() { ); // Verify that the future is failed with the expected exception. - assertFutureThrows(future, NotEnoughReplicasException.class); + assertFutureThrows(NotEnoughReplicasException.class, future); // Verify that the writer is not called. verify(writer, times(0)).append( @@ -1983,7 +1983,7 @@ public void testScheduleTransactionCompletionWhenWriteTimesOut() throws Interrup // Advance clock to timeout Complete #1. timer.advanceClock(4); - assertFutureThrows(timedOutCompletion, org.apache.kafka.common.errors.TimeoutException.class); + assertFutureThrows(org.apache.kafka.common.errors.TimeoutException.class, timedOutCompletion); // Verify that the state is still the same. We don't revert when the // operation timeouts because the record has been written to the log. @@ -2051,7 +2051,7 @@ public void testScheduleTransactionCompletionWhenWriteFails() { TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT ); - assertFutureThrows(complete1, KafkaException.class); + assertFutureThrows(KafkaException.class, complete1); // Verify that the state has not changed. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); @@ -2140,7 +2140,7 @@ public void replayEndTransactionMarker( TransactionResult.COMMIT, DEFAULT_WRITE_TIMEOUT ); - assertFutureThrows(complete1, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, complete1); // Verify that the state has not changed. assertEquals(2L, ctx.coordinator.lastWrittenOffset()); @@ -2235,7 +2235,7 @@ public void testScheduleReadOpWhenPartitionInactive() { // Schedule a read. It fails because the coordinator does not exist. CompletableFuture read = runtime.scheduleReadOperation("read", TP, (state, offset) -> "read-response"); - assertFutureThrows(read, NotCoordinatorException.class); + assertFutureThrows(NotCoordinatorException.class, read); } @Test @@ -2282,7 +2282,7 @@ public void testScheduleReadOpWhenOpsFails() { assertEquals(ctx.coordinator.lastCommittedOffset(), offset); throw new IllegalArgumentException("error"); }); - assertFutureThrows(read, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, read); } @Test @@ -2393,8 +2393,8 @@ public void testClose() throws Exception { runtime.close(); // All the pending operations are completed with NotCoordinatorException. - assertFutureThrows(write1, NotCoordinatorException.class); - assertFutureThrows(write2, NotCoordinatorException.class); + assertFutureThrows(NotCoordinatorException.class, write1); + assertFutureThrows(NotCoordinatorException.class, write2); // Verify that the loader was closed. verify(loader).close(); @@ -3690,7 +3690,7 @@ public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() { state -> new CoordinatorResult<>(records, "response1") ); - assertFutureThrows(write, RecordTooLargeException.class); + assertFutureThrows(RecordTooLargeException.class, write); } @Test @@ -3766,11 +3766,11 @@ public void testScheduleWriteOperationWithBatchingWhenWriteFails() { state -> new CoordinatorResult<>(records.subList(3, 4), "response4")); // Verify the futures. - assertFutureThrows(write1, KafkaException.class); - assertFutureThrows(write2, KafkaException.class); - assertFutureThrows(write3, KafkaException.class); + assertFutureThrows(KafkaException.class, write1); + assertFutureThrows(KafkaException.class, write2); + assertFutureThrows(KafkaException.class, write3); // Write #4 is also expected to fail. - assertFutureThrows(write4, KafkaException.class); + assertFutureThrows(KafkaException.class, write4); // Verify the state. The state should be reverted to the initial state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); @@ -3867,8 +3867,8 @@ public void replay( state -> new CoordinatorResult<>(records.subList(1, 2), "response2")); // Verify the futures. - assertFutureThrows(write1, IllegalArgumentException.class); - assertFutureThrows(write2, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, write1); + assertFutureThrows(IllegalArgumentException.class, write2); // Verify the state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); @@ -4086,11 +4086,11 @@ public long append( state -> new CoordinatorResult<>(records.subList(3, 4), "response4")); // Verify the futures. - assertFutureThrows(write1, NotCoordinatorException.class); - assertFutureThrows(write2, NotCoordinatorException.class); - assertFutureThrows(write3, NotCoordinatorException.class); + assertFutureThrows(NotCoordinatorException.class, write1); + assertFutureThrows(NotCoordinatorException.class, write2); + assertFutureThrows(NotCoordinatorException.class, write3); // Write #4 is also expected to fail. - assertFutureThrows(write4, NotCoordinatorException.class); + assertFutureThrows(NotCoordinatorException.class, write4); // Verify that the state machine was loaded twice. verify(loader, times(2)).load(eq(TP), any()); @@ -4234,7 +4234,7 @@ public void testScheduleNonAtomicWriteOperation() throws ExecutionException, Int state -> new CoordinatorResult<>(records, "write#1") ); - assertFutureThrows(write1, RecordTooLargeException.class); + assertFutureThrows(RecordTooLargeException.class, write1); // Let's try to write the same records non-atomically. CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), @@ -4370,7 +4370,7 @@ public void testScheduleNonAtomicWriteOperationWithRecordTooLarge() throws Inter timer.advanceClock(11); // The write should have failed... - assertFutureThrows(write2, RecordTooLargeException.class); + assertFutureThrows(RecordTooLargeException.class, write2); // ... but write#1 should be left intact. assertFalse(write1.isDone()); @@ -4462,11 +4462,11 @@ public void testScheduleNonAtomicWriteOperationWhenWriteFails() { state -> new CoordinatorResult<>(records.subList(3, 4), "response4", null, true, false)); // Verify the futures. - assertFutureThrows(write1, KafkaException.class); - assertFutureThrows(write2, KafkaException.class); - assertFutureThrows(write3, KafkaException.class); + assertFutureThrows(KafkaException.class, write1); + assertFutureThrows(KafkaException.class, write2); + assertFutureThrows(KafkaException.class, write3); // Write #4 is also expected to fail. - assertFutureThrows(write4, KafkaException.class); + assertFutureThrows(KafkaException.class, write4); // Verify the state. The state should be reverted to the initial state. assertEquals(0L, ctx.coordinator.lastWrittenOffset()); @@ -4511,7 +4511,7 @@ public void testEmptyBatch() throws Exception { state -> new CoordinatorResult<>(List.of("1"), "response1")); // Write #1 should fail and leave an empty batch. - assertFutureThrows(write1, BufferOverflowException.class); + assertFutureThrows(BufferOverflowException.class, write1); assertNotNull(ctx.currentBatch); // Write #2, with no records. diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index f4a04b3bb42b3..9e0b45c3145e0 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -649,7 +649,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock COORDINATOR_NOT_AVAILABLE error. @@ -663,7 +663,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock COORDINATOR_LOAD_IN_PROGRESS error. @@ -677,7 +677,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock GROUP_ID_NOT_FOUND error. @@ -691,7 +691,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, GroupIdNotFoundException.class); + assertFutureThrows(GroupIdNotFoundException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock UNKNOWN_TOPIC_OR_PARTITION error. @@ -705,7 +705,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, UnknownTopicOrPartitionException.class); + assertFutureThrows(UnknownTopicOrPartitionException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock FENCED_STATE_EPOCH error. @@ -719,7 +719,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, FencedStateEpochException.class); + assertFutureThrows(FencedStateEpochException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock FENCED_LEADER_EPOCH error. @@ -733,7 +733,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, NotLeaderOrFollowerException.class); + assertFutureThrows(NotLeaderOrFollowerException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock UNKNOWN_SERVER_ERROR error. @@ -747,7 +747,7 @@ public void testMaybeInitializeWithErrorPartitionResponse() { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, UnknownServerException.class); + assertFutureThrows(UnknownServerException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -767,7 +767,7 @@ public void testMaybeInitializeWithInvalidStartOffsetStateBatches() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -787,7 +787,7 @@ public void testMaybeInitializeWithInvalidTopicIdResponse() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -807,7 +807,7 @@ public void testMaybeInitializeWithInvalidPartitionResponse() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -841,7 +841,7 @@ public void testMaybeInitializeWithNullResponse() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -856,7 +856,7 @@ public void testMaybeInitializeWithNullTopicsData() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -871,7 +871,7 @@ public void testMaybeInitializeWithEmptyTopicsData() { CompletableFuture result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @@ -885,7 +885,7 @@ public void testMaybeInitializeWithReadException() { CompletableFuture result = sharePartition1.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, RuntimeException.class); + assertFutureThrows(RuntimeException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition1.partitionState()); persister = Mockito.mock(Persister.class); @@ -1781,7 +1781,7 @@ public void testAcknowledgeOutOfRangeCachedData() { MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(0, 15, Collections.singletonList((byte) 3)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); MemoryRecords records = memoryRecords(5, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -1801,7 +1801,7 @@ public void testAcknowledgeOutOfRangeCachedData() { MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(20, 25, Collections.singletonList((byte) 3)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRequestException.class); + assertFutureThrows(InvalidRequestException.class, ackResult); } @Test @@ -1839,7 +1839,7 @@ public void testAcknowledgeOutOfRangeCachedDataFirstBatch() { CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, acknowledgeBatches); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRequestException.class); + assertFutureThrows(InvalidRequestException.class, ackResult); // Create data for the batch with offsets 5-10. records = memoryRecords(6, 5); @@ -1881,7 +1881,7 @@ public void testAcknowledgeWithAnotherMember() { "member-2", Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 3)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); } @Test @@ -1912,7 +1912,7 @@ public void testAcknowledgeWhenOffsetNotAcquired() { MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); // Re-acquire the same batch and then acknowledge subset with ACCEPT type. acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -1936,7 +1936,7 @@ public void testAcknowledgeWhenOffsetNotAcquired() { MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(6, 8, Collections.singletonList((byte) 3)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); } @Test @@ -1986,7 +1986,7 @@ public void testAcknowledgeRollbackWithFullBatchError() { // Add another batch which should fail the request. new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); // Check the state of the cache. The state should be acquired itself. assertEquals(3, sharePartition.cachedState().size()); @@ -2042,7 +2042,7 @@ public void testAcknowledgeRollbackWithSubsetError() { // Add another batch which should fail the request. new ShareAcknowledgementBatch(16, 19, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); // Check the state of the cache. The state should be acquired itself. assertEquals(3, sharePartition.cachedState().size()); @@ -3680,7 +3680,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); assertEquals(0, sharePartition.timer().size()); @@ -3688,7 +3688,7 @@ public void testAcknowledgeAfterAcquisitionLockTimeout() throws InterruptedExcep ackResult = sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 3)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); assertNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); assertEquals(0, sharePartition.timer().size()); } @@ -4368,7 +4368,7 @@ public void testReleaseBatchWithWriteShareGroupStateFailure() { CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertTrue(releaseResult.isCompletedExceptionally()); - assertFutureThrows(releaseResult, GroupIdNotFoundException.class); + assertFutureThrows(GroupIdNotFoundException.class, releaseResult); // Due to failure in writeShareGroupState, the cached state should not be updated. assertEquals(1, sharePartition.cachedState().size()); @@ -4407,7 +4407,7 @@ public void testReleaseOffsetWithWriteShareGroupStateFailure() { CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertTrue(releaseResult.isCompletedExceptionally()); - assertFutureThrows(releaseResult, GroupIdNotFoundException.class); + assertFutureThrows(GroupIdNotFoundException.class, releaseResult); // Due to failure in writeShareGroupState, the cached state should not be updated. assertEquals(1, sharePartition.cachedState().size()); @@ -5682,7 +5682,7 @@ public void testWriteShareGroupStateWithNullResponse() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(null)); CompletableFuture result = sharePartition.writeShareGroupState(Collections.emptyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); } @Test @@ -5696,7 +5696,7 @@ public void testWriteShareGroupStateWithNullTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); CompletableFuture result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, result); } @Test @@ -5711,7 +5711,7 @@ public void testWriteShareGroupStateWithInvalidTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); CompletableFuture writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); // TopicsData contains more results than expected. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Arrays.asList( @@ -5720,7 +5720,7 @@ public void testWriteShareGroupStateWithInvalidTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); // TopicsData contains no partition data. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5728,7 +5728,7 @@ public void testWriteShareGroupStateWithInvalidTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); // TopicsData contains wrong topicId. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5737,7 +5737,7 @@ public void testWriteShareGroupStateWithInvalidTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); // TopicsData contains more partition data than expected. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5747,7 +5747,7 @@ public void testWriteShareGroupStateWithInvalidTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); // TopicsData contains wrong partition. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5756,7 +5756,7 @@ public void testWriteShareGroupStateWithInvalidTopicsData() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); writeResult = sharePartition.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); } @Test @@ -5768,7 +5768,7 @@ public void testWriteShareGroupStateWithWriteException() { Mockito.when(persister.writeState(Mockito.any())).thenReturn(FutureUtils.failedFuture(new RuntimeException("Write exception"))); CompletableFuture writeResult = sharePartition1.writeShareGroupState(anyList()); assertTrue(writeResult.isCompletedExceptionally()); - assertFutureThrows(writeResult, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, writeResult); persister = Mockito.mock(Persister.class); // Throw exception for write state. @@ -5813,7 +5813,7 @@ public void testWriteShareGroupStateFailure() { CompletableFuture result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, result); // Mock Write state RPC to return error response, COORDINATOR_NOT_AVAILABLE. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5823,7 +5823,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, result); // Mock Write state RPC to return error response, COORDINATOR_LOAD_IN_PROGRESS. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5833,7 +5833,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, result); // Mock Write state RPC to return error response, GROUP_ID_NOT_FOUND. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5843,7 +5843,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, GroupIdNotFoundException.class); + assertFutureThrows(GroupIdNotFoundException.class, result); // Mock Write state RPC to return error response, UNKNOWN_TOPIC_OR_PARTITION. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5853,7 +5853,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, UnknownTopicOrPartitionException.class); + assertFutureThrows(UnknownTopicOrPartitionException.class, result); // Mock Write state RPC to return error response, FENCED_STATE_EPOCH. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5863,7 +5863,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, FencedStateEpochException.class); + assertFutureThrows(FencedStateEpochException.class, result); // Mock Write state RPC to return error response, FENCED_LEADER_EPOCH. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5873,7 +5873,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, NotLeaderOrFollowerException.class); + assertFutureThrows(NotLeaderOrFollowerException.class, result); // Mock Write state RPC to return error response, UNKNOWN_SERVER_ERROR. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( @@ -5883,7 +5883,7 @@ public void testWriteShareGroupStateFailure() { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, UnknownServerException.class); + assertFutureThrows(UnknownServerException.class, result); } @Test @@ -6361,7 +6361,7 @@ public void testAcknowledgeBatchWithWriteShareGroupStateFailure() { CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new ShareAcknowledgementBatch(5, 14, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, UnknownTopicOrPartitionException.class); + assertFutureThrows(UnknownTopicOrPartitionException.class, ackResult); // Due to failure in writeShareGroupState, the cached state should not be updated. assertEquals(1, sharePartition.cachedState().size()); @@ -6426,7 +6426,7 @@ public void testAcknowledgeSubsetWithAnotherMember() { CompletableFuture ackResult = sharePartition.acknowledge("member-2", Collections.singletonList(new ShareAcknowledgementBatch(9, 11, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); } @Test @@ -6452,7 +6452,7 @@ public void testAcknowledgeWithAnotherMemberRollbackBatchError() { new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); // State should be rolled back to the previous state for any changes. assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); @@ -6488,7 +6488,7 @@ public void testAcknowledgeWithAnotherMemberRollbackSubsetError() { // Acknowledging subset with another member will cause failure and rollback. new ShareAcknowledgementBatch(16, 18, Collections.singletonList((byte) 1)))); assertTrue(ackResult.isCompletedExceptionally()); - assertFutureThrows(ackResult, InvalidRecordStateException.class); + assertFutureThrows(InvalidRecordStateException.class, ackResult); assertEquals(3, sharePartition.cachedState().size()); // Check the state of the cache. State should be rolled back to the previous state for any changes. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 04181258f92b7..ece65fe0b110d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -1915,7 +1915,7 @@ public void testCompleteTransactionWhenNotCoordinatorServiceStarted() { Duration.ofMillis(100) ); - assertFutureThrows(future, CoordinatorNotAvailableException.class); + assertFutureThrows(CoordinatorNotAvailableException.class, future); } @Test @@ -1935,7 +1935,7 @@ public void testCompleteTransactionWithUnexpectedPartition() { Duration.ofMillis(100) ); - assertFutureThrows(future, IllegalStateException.class); + assertFutureThrows(IllegalStateException.class, future); } @Test @@ -2316,7 +2316,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError() { CompletableFuture future = service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); - assertFutureThrows(future, Exception.class, "Unable to validate read state summary request"); + assertFutureThrows(Exception.class, future, "Unable to validate read state summary request"); } @Test @@ -2343,7 +2343,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() { CompletableFuture future = service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); - assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); + assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary"); } @Test @@ -2373,7 +2373,7 @@ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() { CompletableFuture future = service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); - assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); + assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary"); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 79c255efbc8e0..fdd20c9cd39f1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -3666,7 +3666,7 @@ public void testLeaderGracefulShutdownTimeout(boolean withKip853Rpc) throws Exce context.client.poll(); assertFalse(context.client.isRunning()); assertTrue(shutdownFuture.isCompletedExceptionally()); - assertFutureThrows(shutdownFuture, TimeoutException.class); + assertFutureThrows(TimeoutException.class, shutdownFuture); } @ParameterizedTest diff --git a/raft/src/test/java/org/apache/kafka/raft/MockExpirationServiceTest.java b/raft/src/test/java/org/apache/kafka/raft/MockExpirationServiceTest.java index 57fb701304743..887fc40868e3c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockExpirationServiceTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockExpirationServiceTest.java @@ -39,18 +39,18 @@ public void testFailAfter() { CompletableFuture future4 = expirationService.failAfter(50); time.sleep(25); - TestUtils.assertFutureThrows(future2, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, future2); assertFalse(future1.isDone()); assertFalse(future3.isDone()); assertFalse(future4.isDone()); time.sleep(25); - TestUtils.assertFutureThrows(future1, TimeoutException.class); - TestUtils.assertFutureThrows(future4, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, future1); + TestUtils.assertFutureThrows(TimeoutException.class, future4); assertFalse(future3.isDone()); time.sleep(25); - TestUtils.assertFutureThrows(future3, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, future3); } } \ No newline at end of file diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/ThresholdPurgatoryTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/ThresholdPurgatoryTest.java index cb745239bca39..6860e8181c232 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/ThresholdPurgatoryTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/ThresholdPurgatoryTest.java @@ -88,7 +88,7 @@ public void testExpiration() { assertEquals(5, purgatory.numWaiting()); time.sleep(50); - assertFutureThrows(future3, TimeoutException.class); + assertFutureThrows(TimeoutException.class, future3); assertFalse(future1.isDone()); assertFalse(future2.isDone()); assertFalse(future4.isDone()); @@ -96,9 +96,9 @@ public void testExpiration() { assertEquals(4, purgatory.numWaiting()); time.sleep(50); - assertFutureThrows(future1, TimeoutException.class); - assertFutureThrows(future2, TimeoutException.class); - assertFutureThrows(future5, TimeoutException.class); + assertFutureThrows(TimeoutException.class, future1); + assertFutureThrows(TimeoutException.class, future2); + assertFutureThrows(TimeoutException.class, future5); assertFalse(future4.isDone()); assertEquals(1, purgatory.numWaiting()); @@ -107,7 +107,7 @@ public void testExpiration() { assertEquals(1, purgatory.numWaiting()); time.sleep(50); - assertFutureThrows(future4, TimeoutException.class); + assertFutureThrows(TimeoutException.class, future4); assertEquals(0, purgatory.numWaiting()); } @@ -134,9 +134,9 @@ public void testCompleteAllExceptionally() { assertEquals(3, purgatory.numWaiting()); purgatory.completeAllExceptionally(new NotLeaderOrFollowerException()); - assertFutureThrows(future1, NotLeaderOrFollowerException.class); - assertFutureThrows(future2, NotLeaderOrFollowerException.class); - assertFutureThrows(future3, NotLeaderOrFollowerException.class); + assertFutureThrows(NotLeaderOrFollowerException.class, future1); + assertFutureThrows(NotLeaderOrFollowerException.class, future2); + assertFutureThrows(NotLeaderOrFollowerException.class, future3); assertEquals(0, purgatory.numWaiting()); } diff --git a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java index 50ffcf90f812d..708de117ad0f6 100644 --- a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java @@ -142,14 +142,14 @@ public void testWriteStateValidate() { CompletableFuture result = defaultStatePersister.writeState(null); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupTopicPartitionData is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); result = defaultStatePersister.writeState(new WriteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -158,7 +158,7 @@ public void testWriteStateValidate() { .setGroupId(null).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicsData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -168,7 +168,7 @@ public void testWriteStateValidate() { .setTopicsData(Collections.emptyList()).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -180,7 +180,7 @@ public void testWriteStateValidate() { partition, 1, 0, 0, null))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partitionData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -190,7 +190,7 @@ public void testWriteStateValidate() { .setTopicsData(Collections.singletonList(new TopicData<>(topicId, Collections.emptyList()))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partition value is incorrect defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -202,7 +202,7 @@ public void testWriteStateValidate() { incorrectPartition, 1, 0, 0, null))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); } @Test @@ -218,14 +218,14 @@ public void testReadStateValidate() { CompletableFuture result = defaultStatePersister.readState(null); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupTopicPartitionData is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); result = defaultStatePersister.readState(new ReadShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -234,7 +234,7 @@ public void testReadStateValidate() { .setGroupId(null).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicsData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -244,7 +244,7 @@ public void testReadStateValidate() { .setTopicsData(Collections.emptyList()).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -256,7 +256,7 @@ public void testReadStateValidate() { ).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partitionData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -266,7 +266,7 @@ public void testReadStateValidate() { .setTopicsData(Collections.singletonList(new TopicData<>(topicId, Collections.emptyList()))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partition value is incorrect defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -277,7 +277,7 @@ public void testReadStateValidate() { Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); } @Test @@ -293,14 +293,14 @@ public void testReadStateSummaryValidate() { CompletableFuture result = defaultStatePersister.readSummary(null); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupTopicPartitionData is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); result = defaultStatePersister.readSummary(new ReadShareGroupStateSummaryParameters.Builder().setGroupTopicPartitionData(null).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -309,7 +309,7 @@ public void testReadStateSummaryValidate() { .setGroupId(null).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicsData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -319,7 +319,7 @@ public void testReadStateSummaryValidate() { .setTopicsData(Collections.emptyList()).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -331,7 +331,7 @@ public void testReadStateSummaryValidate() { ).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partitionData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -341,7 +341,7 @@ public void testReadStateSummaryValidate() { .setTopicsData(List.of(new TopicData<>(topicId, Collections.emptyList()))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partition value is incorrect defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -352,7 +352,7 @@ public void testReadStateSummaryValidate() { List.of(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); } @Test @@ -367,14 +367,14 @@ public void testDeleteStateValidate() { CompletableFuture result = defaultStatePersister.deleteState(null); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupTopicPartitionData is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); result = defaultStatePersister.deleteState(new DeleteShareGroupStateParameters.Builder().setGroupTopicPartitionData(null).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // groupId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -383,7 +383,7 @@ public void testDeleteStateValidate() { .setGroupId(null).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicsData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -393,7 +393,7 @@ public void testDeleteStateValidate() { .setTopicsData(List.of()).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // topicId is null defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -405,7 +405,7 @@ public void testDeleteStateValidate() { partition, 1, 0, 0, null))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partitionData is empty defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -415,7 +415,7 @@ public void testDeleteStateValidate() { .setTopicsData(List.of(new TopicData<>(topicId, List.of()))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); // partition value is incorrect defaultStatePersister = DefaultStatePersisterBuilder.builder().build(); @@ -427,7 +427,7 @@ public void testDeleteStateValidate() { incorrectPartition))))).build()).build()); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(result, IllegalArgumentException.class); + assertFutureThrows(IllegalArgumentException.class, result); } @Test diff --git a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java index a87368e378dce..a340bc5229bd7 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java @@ -53,9 +53,9 @@ public void testReaper() throws Exception { CompletableFuture t1 = add(timer, 100L); CompletableFuture t2 = add(timer, 200L); CompletableFuture t3 = add(timer, 300L); - TestUtils.assertFutureThrows(t1, TimeoutException.class); - TestUtils.assertFutureThrows(t2, TimeoutException.class); - TestUtils.assertFutureThrows(t3, TimeoutException.class); + TestUtils.assertFutureThrows(TimeoutException.class, t1); + TestUtils.assertFutureThrows(TimeoutException.class, t2); + TestUtils.assertFutureThrows(TimeoutException.class, t3); } }