Skip to content

Commit

Permalink
Merge pull request #37 from codecrafters-io/F-3320
Browse files Browse the repository at this point in the history
Fix random topic ID collisions
  • Loading branch information
ryan-gang authored Dec 9, 2024
2 parents 586d4dd + 613a9f4 commit a27c6a6
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 164 deletions.
57 changes: 0 additions & 57 deletions protocol/api/api_versions.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,13 @@
package kafkaapi

import (
"fmt"

realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder"
realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"

"github.com/codecrafters-io/kafka-tester/protocol"
"github.com/codecrafters-io/kafka-tester/protocol/errors"
"github.com/codecrafters-io/tester-utils/logger"
)

func GetAPIVersions(prettyPrint bool) {
broker := protocol.NewBroker("localhost:9092")
if err := broker.Connect(); err != nil {
panic(err)
}
defer broker.Close()

response, err := ApiVersions(broker, &ApiVersionsRequestBody{Version: 4, ClientSoftwareName: "kafka-cli", ClientSoftwareVersion: "0.1"})
if err != nil {
panic(err)
}

if prettyPrint {
PrintAPIVersions(response)
}
}

func EncodeApiVersionsRequest(request *ApiVersionsRequest) []byte {
encoder := realencoder.RealEncoder{}
encoder.Init(make([]byte, 4096))
Expand Down Expand Up @@ -88,40 +68,3 @@ func DecodeApiVersionsHeaderAndResponse(response []byte, version int16, logger *

return &responseHeader, &apiVersionsResponse, nil
}

// ApiVersions returns api version response or error
func ApiVersions(b *protocol.Broker, requestBody *ApiVersionsRequestBody) (*ApiVersionsResponse, error) {
header := RequestHeader{
ApiKey: 18,
ApiVersion: requestBody.Version,
CorrelationId: 0,
ClientId: requestBody.ClientSoftwareName,
}
request := ApiVersionsRequest{
Header: header,
Body: *requestBody,
}
message := EncodeApiVersionsRequest(&request)

response, err := b.SendAndReceive(message)
if err != nil {
return nil, err
}

_, apiVersionsResponse, err := DecodeApiVersionsHeaderAndResponse(response.Payload, requestBody.Version, logger.GetLogger(true, ""))
if err != nil {
return nil, err
}

return apiVersionsResponse, nil
}

func PrintAPIVersions(response *ApiVersionsResponse) {
fmt.Printf("API versions supported by the broker are:\n")
fmt.Println("API Key\tMinVersion\tMaxVersion\t")
apiVersionKeys := response.ApiKeys
// For each API, the broker will return the minimum and maximum supported version
for _, key := range apiVersionKeys {
fmt.Println(key.ApiKey, "\t", key.MinVersion, "\t", key.MaxVersion)
}
}
19 changes: 0 additions & 19 deletions protocol/api/describe_topic_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,6 @@ func EncodeDescribeTopicPartitionsRequest(request *DescribeTopicPartitionsReques
return messageBytes
}

func DecodeDescribeTopicPartitionHeader(response []byte, version int16, logger *logger.Logger) (*ResponseHeader, error) {
decoder := realdecoder.RealDecoder{}
decoder.Init(response)
logger.UpdateSecondaryPrefix("Decoder")
defer logger.ResetSecondaryPrefix()

responseHeader := ResponseHeader{}
logger.Debugf("- .ResponseHeader")
// DescribeTopicPartitions always uses Header v0
if err := responseHeader.DecodeV0(&decoder, logger, 1); err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
return nil, decodingErr.WithAddedContext("Response Header").WithAddedContext("DescribeTopicPartitions v0")
}
return nil, err
}

return &responseHeader, nil
}

// DecodeDescribeTopicPartitionsHeaderAndResponse decodes the header and response
// If an error is encountered while decoding, the returned objects are nil
func DecodeDescribeTopicPartitionsHeaderAndResponse(response []byte, logger *logger.Logger) (*ResponseHeader, *DescribeTopicPartitionsResponse, error) {
Expand Down
79 changes: 0 additions & 79 deletions protocol/api/fetch.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,13 @@
package kafkaapi

import (
"fmt"

realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder"
realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"

"github.com/codecrafters-io/kafka-tester/protocol"
"github.com/codecrafters-io/kafka-tester/protocol/errors"
"github.com/codecrafters-io/tester-utils/logger"
)

func Fetch() {
broker := protocol.NewBroker("localhost:9092")
if err := broker.Connect(); err != nil {
panic(err)
}
defer broker.Close()

_, err := fetch(broker, &FetchRequestBody{
MaxWaitMS: 500,
MinBytes: 1,
MaxBytes: 52428800,
IsolationLevel: 0,
FetchSessionID: 0,
FetchSessionEpoch: 0,
Topics: []Topic{
{
TopicUUID: "0f62a58e-617b-462f-9161-132a1946d66a",
Partitions: []Partition{
{
ID: 0,
CurrentLeaderEpoch: 0,
FetchOffset: 0,
LastFetchedOffset: -1,
LogStartOffset: -1,
PartitionMaxBytes: 1048576,
},
},
},
},
ForgottenTopics: []ForgottenTopic{},
RackID: "",
}, logger.GetLogger(true, ""))
if err != nil {
panic(err)
}
}

func EncodeFetchRequest(request *FetchRequest) []byte {
encoder := realencoder.RealEncoder{}
// bytes.Buffer{}
Expand Down Expand Up @@ -106,42 +66,3 @@ func DecodeFetchHeaderAndResponse(response []byte, version int16, logger *logger

return &responseHeader, &fetchResponse, nil
}

// Fetch returns api version response or error
func fetch(b *protocol.Broker, requestBody *FetchRequestBody, logger *logger.Logger) ([]byte, error) {
request := FetchRequest{
Header: RequestHeader{
ApiKey: 1,
ApiVersion: 16,
CorrelationId: 0,
ClientId: "kafka-tester",
},
Body: *requestBody,
}

message := EncodeFetchRequest(&request)

response, err := b.SendAndReceive(message)
if err != nil {
return nil, err
}

protocol.PrintHexdump(response.RawBytes)

_, fetchResponse, err := DecodeFetchHeaderAndResponse(response.Payload, 16, logger)
if err != nil {
return nil, err
}

for _, topicResponse := range fetchResponse.TopicResponses {
for _, partitionResponse := range topicResponse.PartitionResponses {
for _, recordBatch := range partitionResponse.RecordBatches {
for _, r := range recordBatch.Records {
fmt.Printf("message: %s\n", r.Value)
}
}
}
}

return nil, nil
}
32 changes: 23 additions & 9 deletions protocol/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"fmt"
"slices"
"sort"

"github.com/codecrafters-io/tester-utils/random"
Expand All @@ -20,15 +21,16 @@ const (
)

var (
all_topic_names = []string{"foo", "bar", "baz", "qux", "quz", "pax", "paz", "saz"}
topic_names = GetSortedValues(random.RandomElementsFromArray(all_topic_names, 4))
TOPIC1_NAME = topic_names[0]
TOPIC2_NAME = topic_names[1]
TOPIC3_NAME = topic_names[2]
TOPIC1_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99))
TOPIC2_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99))
TOPIC3_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99))
TOPICX_UUID = fmt.Sprintf("00000000-0000-0000-0000-00000000%04d", random.RandomInt(1000, 9999)) // Unknown topic used in requests
all_topic_names = []string{"foo", "bar", "baz", "qux", "quz", "pax", "paz", "saz"}
topic_names = GetSortedValues(random.RandomElementsFromArray(all_topic_names, 4))
TOPIC1_NAME = topic_names[0]
TOPIC2_NAME = topic_names[1]
TOPIC3_NAME = topic_names[2]
random_topic_uuids = getUniqueRandomIntegers(10, 99, 3)
TOPIC1_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[0])
TOPIC2_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[1])
TOPIC3_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[2])
TOPICX_UUID = fmt.Sprintf("00000000-0000-0000-0000-00000000%04d", random.RandomInt(1000, 9999)) // Unknown topic used in requests

TOPIC_UNKOWN_NAME = fmt.Sprintf("unknown-topic-%s", topic_names[3])
TOPIC_UNKOWN_UUID = "00000000-0000-0000-0000-000000000000"
Expand All @@ -46,3 +48,15 @@ func GetSortedValues[T string](values []T) []T {
})
return values
}

func getUniqueRandomIntegers(min, max, count int) []int {
randomInts := []int{}
for i := 0; i < count; i++ {
randomInt := random.RandomInt(min, max)
for slices.Contains(randomInts, randomInt) {
randomInt = random.RandomInt(min, max)
}
randomInts = append(randomInts, randomInt)
}
return randomInts
}

0 comments on commit a27c6a6

Please sign in to comment.