Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmy committed Feb 7, 2025
1 parent e464b1a commit 093c838
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -48,7 +65,7 @@ AlterShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<Coor
});

AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection requestTopics = new AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection();
for(Map.Entry<String, Map<TopicPartition, Long>> topicEntry : offsetsByTopic.entrySet()) {
for (Map.Entry<String, Map<TopicPartition, Long>> topicEntry : offsetsByTopic.entrySet()) {
String topic = topicEntry.getKey();
Map<TopicPartition, Long> offsetsByPartition = topicEntry.getValue();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;

import java.util.HashMap;
Expand Down Expand Up @@ -33,7 +49,7 @@ public int throttleTimeMs() {

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
data.setThrottleTimeMs(throttleTimeMs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirPartitionResult;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
Expand Down Expand Up @@ -168,6 +169,7 @@
import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.AlterShareGroupOffsetsResponse;
import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsRequest;
Expand Down Expand Up @@ -9296,4 +9298,35 @@ public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception
assertEquals(500, partitionToOffsetAndMetadata.get(myTopicPartition3));
}
}

@Test
public void testAlterShareGroupOffsets() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData().setResponses(
List.of(
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("foo").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0), new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(1))),
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName("bar").setPartitions(List.of(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(0)))
)
);
env.kafkaClient().prepareResponse(new AlterShareGroupOffsetsResponse(data));

TopicPartition fooTopicPartition0 = new TopicPartition("foo", 0);
TopicPartition fooTopicPartition1 = new TopicPartition("foo", 1);
TopicPartition barPartition0 = new TopicPartition("bar", 0);
TopicPartition zooTopicPartition0 = new TopicPartition("zoo", 0);

final String groupId = "group";
final AlterShareGroupOffsetsResult result = env.adminClient().alterShareGroupOffsets(groupId, Map.of(fooTopicPartition0, 1L, fooTopicPartition1, 2L, barPartition0, 1L));

result.partitionResult(fooTopicPartition0).get();
assertNull(result.all().get());
assertNull(result.partitionResult(fooTopicPartition0).get());
assertNull(result.partitionResult(fooTopicPartition1).get());
assertNull(result.partitionResult(barPartition0).get());
TestUtils.assertFutureThrows(result.partitionResult(zooTopicPartition0), IllegalArgumentException.class);
}
}
}

0 comments on commit 093c838

Please sign in to comment.