diff --git a/protocol/api/api_versions.go b/protocol/api/api_versions.go index 2b1660d..c5ea378 100644 --- a/protocol/api/api_versions.go +++ b/protocol/api/api_versions.go @@ -1,33 +1,13 @@ package kafkaapi import ( - "fmt" - realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder" realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder" - "github.com/codecrafters-io/kafka-tester/protocol" "github.com/codecrafters-io/kafka-tester/protocol/errors" "github.com/codecrafters-io/tester-utils/logger" ) -func GetAPIVersions(prettyPrint bool) { - broker := protocol.NewBroker("localhost:9092") - if err := broker.Connect(); err != nil { - panic(err) - } - defer broker.Close() - - response, err := ApiVersions(broker, &ApiVersionsRequestBody{Version: 4, ClientSoftwareName: "kafka-cli", ClientSoftwareVersion: "0.1"}) - if err != nil { - panic(err) - } - - if prettyPrint { - PrintAPIVersions(response) - } -} - func EncodeApiVersionsRequest(request *ApiVersionsRequest) []byte { encoder := realencoder.RealEncoder{} encoder.Init(make([]byte, 4096)) @@ -88,40 +68,3 @@ func DecodeApiVersionsHeaderAndResponse(response []byte, version int16, logger * return &responseHeader, &apiVersionsResponse, nil } - -// ApiVersions returns api version response or error -func ApiVersions(b *protocol.Broker, requestBody *ApiVersionsRequestBody) (*ApiVersionsResponse, error) { - header := RequestHeader{ - ApiKey: 18, - ApiVersion: requestBody.Version, - CorrelationId: 0, - ClientId: requestBody.ClientSoftwareName, - } - request := ApiVersionsRequest{ - Header: header, - Body: *requestBody, - } - message := EncodeApiVersionsRequest(&request) - - response, err := b.SendAndReceive(message) - if err != nil { - return nil, err - } - - _, apiVersionsResponse, err := DecodeApiVersionsHeaderAndResponse(response.Payload, requestBody.Version, logger.GetLogger(true, "")) - if err != nil { - return nil, err - } - - return apiVersionsResponse, nil -} - -func PrintAPIVersions(response *ApiVersionsResponse) { - fmt.Printf("API versions supported by the broker are:\n") - fmt.Println("API Key\tMinVersion\tMaxVersion\t") - apiVersionKeys := response.ApiKeys - // For each API, the broker will return the minimum and maximum supported version - for _, key := range apiVersionKeys { - fmt.Println(key.ApiKey, "\t", key.MinVersion, "\t", key.MaxVersion) - } -} diff --git a/protocol/api/describe_topic_partitions.go b/protocol/api/describe_topic_partitions.go index a81109f..6910740 100644 --- a/protocol/api/describe_topic_partitions.go +++ b/protocol/api/describe_topic_partitions.go @@ -34,25 +34,6 @@ func EncodeDescribeTopicPartitionsRequest(request *DescribeTopicPartitionsReques return messageBytes } -func DecodeDescribeTopicPartitionHeader(response []byte, version int16, logger *logger.Logger) (*ResponseHeader, error) { - decoder := realdecoder.RealDecoder{} - decoder.Init(response) - logger.UpdateSecondaryPrefix("Decoder") - defer logger.ResetSecondaryPrefix() - - responseHeader := ResponseHeader{} - logger.Debugf("- .ResponseHeader") - // DescribeTopicPartitions always uses Header v0 - if err := responseHeader.DecodeV0(&decoder, logger, 1); err != nil { - if decodingErr, ok := err.(*errors.PacketDecodingError); ok { - return nil, decodingErr.WithAddedContext("Response Header").WithAddedContext("DescribeTopicPartitions v0") - } - return nil, err - } - - return &responseHeader, nil -} - // DecodeDescribeTopicPartitionsHeaderAndResponse decodes the header and response // If an error is encountered while decoding, the returned objects are nil func DecodeDescribeTopicPartitionsHeaderAndResponse(response []byte, logger *logger.Logger) (*ResponseHeader, *DescribeTopicPartitionsResponse, error) { diff --git a/protocol/api/fetch.go b/protocol/api/fetch.go index 0d3e611..5b30a51 100644 --- a/protocol/api/fetch.go +++ b/protocol/api/fetch.go @@ -1,53 +1,13 @@ package kafkaapi import ( - "fmt" - realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder" realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder" - "github.com/codecrafters-io/kafka-tester/protocol" "github.com/codecrafters-io/kafka-tester/protocol/errors" "github.com/codecrafters-io/tester-utils/logger" ) -func Fetch() { - broker := protocol.NewBroker("localhost:9092") - if err := broker.Connect(); err != nil { - panic(err) - } - defer broker.Close() - - _, err := fetch(broker, &FetchRequestBody{ - MaxWaitMS: 500, - MinBytes: 1, - MaxBytes: 52428800, - IsolationLevel: 0, - FetchSessionID: 0, - FetchSessionEpoch: 0, - Topics: []Topic{ - { - TopicUUID: "0f62a58e-617b-462f-9161-132a1946d66a", - Partitions: []Partition{ - { - ID: 0, - CurrentLeaderEpoch: 0, - FetchOffset: 0, - LastFetchedOffset: -1, - LogStartOffset: -1, - PartitionMaxBytes: 1048576, - }, - }, - }, - }, - ForgottenTopics: []ForgottenTopic{}, - RackID: "", - }, logger.GetLogger(true, "")) - if err != nil { - panic(err) - } -} - func EncodeFetchRequest(request *FetchRequest) []byte { encoder := realencoder.RealEncoder{} // bytes.Buffer{} @@ -106,42 +66,3 @@ func DecodeFetchHeaderAndResponse(response []byte, version int16, logger *logger return &responseHeader, &fetchResponse, nil } - -// Fetch returns api version response or error -func fetch(b *protocol.Broker, requestBody *FetchRequestBody, logger *logger.Logger) ([]byte, error) { - request := FetchRequest{ - Header: RequestHeader{ - ApiKey: 1, - ApiVersion: 16, - CorrelationId: 0, - ClientId: "kafka-tester", - }, - Body: *requestBody, - } - - message := EncodeFetchRequest(&request) - - response, err := b.SendAndReceive(message) - if err != nil { - return nil, err - } - - protocol.PrintHexdump(response.RawBytes) - - _, fetchResponse, err := DecodeFetchHeaderAndResponse(response.Payload, 16, logger) - if err != nil { - return nil, err - } - - for _, topicResponse := range fetchResponse.TopicResponses { - for _, partitionResponse := range topicResponse.PartitionResponses { - for _, recordBatch := range partitionResponse.RecordBatches { - for _, r := range recordBatch.Records { - fmt.Printf("message: %s\n", r.Value) - } - } - } - } - - return nil, nil -} diff --git a/protocol/common/constants.go b/protocol/common/constants.go index c15ee0a..510c6ed 100644 --- a/protocol/common/constants.go +++ b/protocol/common/constants.go @@ -2,6 +2,7 @@ package common import ( "fmt" + "slices" "sort" "github.com/codecrafters-io/tester-utils/random" @@ -20,15 +21,16 @@ const ( ) var ( - all_topic_names = []string{"foo", "bar", "baz", "qux", "quz", "pax", "paz", "saz"} - topic_names = GetSortedValues(random.RandomElementsFromArray(all_topic_names, 4)) - TOPIC1_NAME = topic_names[0] - TOPIC2_NAME = topic_names[1] - TOPIC3_NAME = topic_names[2] - TOPIC1_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99)) - TOPIC2_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99)) - TOPIC3_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99)) - TOPICX_UUID = fmt.Sprintf("00000000-0000-0000-0000-00000000%04d", random.RandomInt(1000, 9999)) // Unknown topic used in requests + all_topic_names = []string{"foo", "bar", "baz", "qux", "quz", "pax", "paz", "saz"} + topic_names = GetSortedValues(random.RandomElementsFromArray(all_topic_names, 4)) + TOPIC1_NAME = topic_names[0] + TOPIC2_NAME = topic_names[1] + TOPIC3_NAME = topic_names[2] + random_topic_uuids = getUniqueRandomIntegers(10, 99, 3) + TOPIC1_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[0]) + TOPIC2_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[1]) + TOPIC3_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[2]) + TOPICX_UUID = fmt.Sprintf("00000000-0000-0000-0000-00000000%04d", random.RandomInt(1000, 9999)) // Unknown topic used in requests TOPIC_UNKOWN_NAME = fmt.Sprintf("unknown-topic-%s", topic_names[3]) TOPIC_UNKOWN_UUID = "00000000-0000-0000-0000-000000000000" @@ -46,3 +48,15 @@ func GetSortedValues[T string](values []T) []T { }) return values } + +func getUniqueRandomIntegers(min, max, count int) []int { + randomInts := []int{} + for i := 0; i < count; i++ { + randomInt := random.RandomInt(min, max) + for slices.Contains(randomInts, randomInt) { + randomInt = random.RandomInt(min, max) + } + randomInts = append(randomInts, randomInt) + } + return randomInts +}