Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListGroups API KIP 848 #1267

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions examples/admin_list_consumer_groups/admin_list_consumer_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,53 @@ func main() {

if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> [<state1> <state2> ...]\n", os.Args[0])
"Usage: %s <bootstrap-servers> [-states <state1> <state2> ...] [-types <type1> <type2> ...] \n", os.Args[0])
os.Exit(1)
}

bootstrapServers := os.Args[1]
var states []kafka.ConsumerGroupState
var groupTypes []kafka.ConsumerGroupType

if len(os.Args) > 2 {
statesStr := os.Args[2:]
for _, stateStr := range statesStr {
state, err := kafka.ConsumerGroupStateFromString(stateStr)
if err != nil {
fmt.Fprintf(os.Stderr,
"Given state %s is not a valid state\n", stateStr)
os.Exit(1)
args := os.Args[2:]
isState := false
isType := false
for _, arg := range args {
if arg == "-types" {
if isType {
fmt.Printf("Cannot pass the types flag (-types) more than once.\n")
os.Exit(1)
}
isType = true
} else if arg == "-states" {
if isState {
fmt.Printf("Cannot pass the states flag (-states) more than once.\n")
os.Exit(1)
}
isState = true
} else {
if isState {
state, err := kafka.ConsumerGroupStateFromString(arg)
if err != nil {
fmt.Fprintf(os.Stderr,
"Given state %s is not a valid state\n", arg)
os.Exit(1)
}
states = append(states, state)
} else if isType {
groupType, err := kafka.ConsumerGroupTypeFromString(arg)
if err != nil {
fmt.Fprintf(os.Stderr,
"Given group type %s is not a valid group type\n", arg)
os.Exit(1)
}
groupTypes = append(groupTypes, groupType)
} else {
fmt.Fprintf(os.Stderr,
"Usage: %s <bootstrap-servers> [-states <state1> <state2> ...] [-types <type1> <type2> ...] \n", os.Args[0])
os.Exit(1)
}
}
states = append(states, state)
}
}

Expand All @@ -61,7 +92,7 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
listGroupRes, err := a.ListConsumerGroups(
ctx, kafka.SetAdminMatchConsumerGroupStates(states))
ctx, kafka.SetAdminMatchConsumerGroupStates(states), kafka.SetAdminMatchConsumerGroupTypes(groupTypes))

if err != nil {
fmt.Printf("Failed to list groups with client-level error %s\n", err)
Expand All @@ -74,6 +105,7 @@ func main() {
for _, group := range groups {
fmt.Printf("GroupId: %s\n", group.GroupID)
fmt.Printf("State: %s\n", group.State)
fmt.Printf("Group Type: %s\n", group.GroupType)
fmt.Printf("IsSimpleConsumerGroup: %v\n", group.IsSimpleConsumerGroup)
fmt.Println()
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/confluentinc/confluent-kafka-go/v2
go 1.21

toolchain go1.21.0

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
Expand Down
35 changes: 32 additions & 3 deletions kafka/adminapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,41 @@ func (t ConsumerGroupState) String() string {
}

// ConsumerGroupStateFromString translates a consumer group state name/string to
// a ConsumerGroupStateFromString value.
// a ConsumerGroupState value.
func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error) {
cStr := C.CString(stateString)
defer C.free(unsafe.Pointer(cStr))
state := ConsumerGroupState(C.rd_kafka_consumer_group_state_code(cStr))
return state, nil
}

// ConsumerGroupType represents a consumer group type
type ConsumerGroupType int

const (
// ConsumerGroupTypeUnknown - Unknown ConsumerGroupType
ConsumerGroupTypeUnknown ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN
// ConsumerGroupTypeConsumer - Consumer ConsumerGroupType
ConsumerGroupTypeConsumer ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER
// ConsumerGroupTypeClassic - Classic ConsumerGroupType
ConsumerGroupTypeClassic ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC
)

// String returns the human-readable representation of a consumer_group_type
func (t ConsumerGroupType) String() string {
return C.GoString(C.rd_kafka_consumer_group_type_name(
C.rd_kafka_consumer_group_type_t(t)))
}

// ConsumerGroupTypeFromString translates a consumer group type name/string to
// a ConsumerGroupType value.
func ConsumerGroupTypeFromString(typeString string) (ConsumerGroupType, error) {
cStr := C.CString(typeString)
defer C.free(unsafe.Pointer(cStr))
groupType := ConsumerGroupType(C.rd_kafka_consumer_group_type_code(cStr))
return groupType, nil
}

// ConsumerGroupListing represents the result of ListConsumerGroups for a single
// group.
type ConsumerGroupListing struct {
Expand All @@ -248,6 +275,8 @@ type ConsumerGroupListing struct {
IsSimpleConsumerGroup bool
// Group state.
State ConsumerGroupState
// Group type.
GroupType ConsumerGroupType
}

// ListConsumerGroupsResult represents ListConsumerGroups results and errors.
Expand Down Expand Up @@ -1476,16 +1505,16 @@ func (a *AdminClient) cToConsumerGroupListings(
C.ConsumerGroupListing_by_idx(cGroups, cGroupCount, C.size_t(idx))
state := ConsumerGroupState(
C.rd_kafka_ConsumerGroupListing_state(cGroup))

groupType := ConsumerGroupType(C.rd_kafka_ConsumerGroupListing_type(cGroup))
result[idx] = ConsumerGroupListing{
GroupID: C.GoString(
C.rd_kafka_ConsumerGroupListing_group_id(cGroup)),
State: state,
IsSimpleConsumerGroup: cint2bool(
C.rd_kafka_ConsumerGroupListing_is_simple_consumer_group(cGroup)),
GroupType: groupType,
}
}

return result
}

Expand Down
38 changes: 36 additions & 2 deletions kafka/adminapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,15 @@ func testAdminAPIsDeleteACLs(what string, a *AdminClient, t *testing.T) {

func testAdminAPIsListConsumerGroups(
what string, a *AdminClient, expDuration time.Duration, t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()

state, err := ConsumerGroupStateFromString("Stable")
if err != nil || state != ConsumerGroupStateStable {
t.Fatalf("Expected ConsumerGroupStateFromString to work for Stable state")
}

ctx, cancel := context.WithTimeout(context.Background(), expDuration)
defer cancel()
listres, err := a.ListConsumerGroups(
ctx, SetAdminRequestTimeout(time.Second),
SetAdminMatchConsumerGroupStates([]ConsumerGroupState{state}))
Expand All @@ -456,6 +458,38 @@ func testAdminAPIsListConsumerGroups(
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
}

unknownGroupType, err := ConsumerGroupTypeFromString("Unknown")
if err != nil || unknownGroupType != ConsumerGroupTypeUnknown {
t.Fatalf("Expected ConsumerGroupTypeFromString to work for Unknown type")
}

listres, err := a.ListConsumerGroups(
milindl marked this conversation as resolved.
Show resolved Hide resolved
ctx, SetAdminRequestTimeout(time.Second),
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{unknownGroupType}))
if err == nil {
t.Fatalf("Expected ListConsumerGroups to fail, but got result: %v, err: %v",
listres, err)
}
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
}

classicGroupType, err := ConsumerGroupTypeFromString("Classic")
if err != nil || unknownGroupType != ConsumerGroupTypeUnknown {
t.Fatalf("Expected ConsumerGroupTypeFromString to work for Classic type")
}

listres, err := a.ListConsumerGroups(
ctx, SetAdminRequestTimeout(time.Second),
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{classicGroupType, classicGroupType}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{classicGroupType, classicGroupType}))
SetAdminMatchConsumerGroupTypes([]ConsumerGroupType{classicGroupType, classicGroupType}))

Why is the same group type included twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is to test the INVALID_ARGS error code with duplicate group types.

Copy link
Contributor

@milindl milindl Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But your test is not checking for invalid args error code below, so change that. You're checking for deadline exceeded still.

if err == nil {
t.Fatalf("Expected ListConsumerGroups to fail, but got result: %v, err: %v",
listres, err)
}
if ctx.Err() != context.DeadlineExceeded {
t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err())
}
}

func testAdminAPIsDescribeConsumerGroups(
Expand Down
76 changes: 63 additions & 13 deletions kafka/adminoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,68 @@ func (ao AdminOptionMatchConsumerGroupStates) apply(cOptions *C.rd_kafka_AdminOp
return nil
}

// SetAdminMatchConsumerGroupStates decides groups in which state(s) should be
// listed.
//
// Default: nil (lists groups in all states).
//
// Valid for ListConsumerGroups.
func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates) {
ao.isSet = true
ao.val = val
return ao
}

// AdminOptionMatchConsumerGroupTypes decides groups in which type(s) should be
// listed.
//
// Default: nil (lists groups in all types).
//
// Valid for ListConsumerGroups.
type AdminOptionMatchConsumerGroupTypes struct {
isSet bool
val []ConsumerGroupType
}

func (ao AdminOptionMatchConsumerGroupTypes) supportsListConsumerGroups() {
}

func (ao AdminOptionMatchConsumerGroupTypes) apply(cOptions *C.rd_kafka_AdminOptions_t) error {
if !ao.isSet || ao.val == nil {
return nil
}

// Convert types from Go slice to C pointer.
cTypes := make([]C.rd_kafka_consumer_group_type_t, len(ao.val))
cTypesCount := C.size_t(len(ao.val))

for idx, groupType := range ao.val {
cTypes[idx] = C.rd_kafka_consumer_group_type_t(groupType)
}

cTypesPtr := ((*C.rd_kafka_consumer_group_type_t)(&cTypes[0]))
cError := C.rd_kafka_AdminOptions_set_match_consumer_group_types(
cOptions, cTypesPtr, cTypesCount)
if cError != nil {
C.rd_kafka_AdminOptions_destroy(cOptions)
return newErrorFromCErrorDestroy(cError)
}

return nil
}

// SetAdminMatchConsumerGroupTypes decides groups in which type(s) should be
// listed.
//
// Default: nil (lists groups in all types).
//
// Valid for ListConsumerGroups.
func SetAdminMatchConsumerGroupTypes(val []ConsumerGroupType) (ao AdminOptionMatchConsumerGroupTypes) {
ao.isSet = true
ao.val = val
return ao
}

// AdminOptionIncludeAuthorizedOperations decides if the broker should return
// authorized operations.
//
Expand Down Expand Up @@ -411,18 +473,6 @@ func SetAdminOptionIncludeAuthorizedOperations(val bool) (ao AdminOptionIncludeA
return ao
}

// SetAdminMatchConsumerGroupStates decides groups in which state(s) should be
// listed.
//
// Default: nil (lists groups in all states).
//
// Valid for ListConsumerGroups.
func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates) {
ao.isSet = true
ao.val = val
return ao
}

// CreateTopicsAdminOption - see setters.
//
// See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly.
Expand Down Expand Up @@ -489,7 +539,7 @@ type DeleteACLsAdminOption interface {

// ListConsumerGroupsAdminOption - see setter.
//
// See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates.
// See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates, SetAdminMatchConsumerGroupTypes.
type ListConsumerGroupsAdminOption interface {
supportsListConsumerGroups()
apply(cOptions *C.rd_kafka_AdminOptions_t) error
Expand Down
2 changes: 1 addition & 1 deletion kafka/build_darwin_amd64.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build !dynamic
milindl marked this conversation as resolved.
Show resolved Hide resolved
// +build !dynamic


// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

package kafka
Expand Down
2 changes: 1 addition & 1 deletion kafka/build_darwin_arm64.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//go:build !dynamic
// +build !dynamic


// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

package kafka
Expand Down
4 changes: 2 additions & 2 deletions kafka/build_glibc_linux_amd64.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// +build !dynamic
// +build !musl
//go:build !dynamic && !musl
// +build !dynamic,!musl

// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

Expand Down
4 changes: 2 additions & 2 deletions kafka/build_glibc_linux_arm64.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// +build !dynamic
// +build !musl
//go:build !dynamic && !musl
// +build !dynamic,!musl

// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

Expand Down
4 changes: 2 additions & 2 deletions kafka/build_musl_linux_amd64.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// +build !dynamic
// +build musl
//go:build !dynamic && musl
// +build !dynamic,musl

// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

Expand Down
4 changes: 2 additions & 2 deletions kafka/build_musl_linux_arm64.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// +build !dynamic
// +build musl
//go:build !dynamic && musl
// +build !dynamic,musl

// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

Expand Down
4 changes: 2 additions & 2 deletions kafka/build_windows.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//go:build !dynamic
// +build !dynamic


// This file was auto-generated by librdkafka_vendor/bundle-import.sh, DO NOT EDIT.

package kafka

// #cgo CFLAGS: -DUSE_VENDORED_LIBRDKAFKA -DLIBRDKAFKA_STATICLIB
// #cgo LDFLAGS: ${SRCDIR}/librdkafka_vendor/librdkafka_windows.a -lws2_32 -lsecur32 -lcrypt32
// #cgo LDFLAGS: ${SRCDIR}/librdkafka_vendor/librdkafka_windows.a -lws2_32 -lsecur32 -lcrypt32
import "C"

// LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client
Expand Down
Loading