From 980464ec0fb1959ce3de1022cd0e34f210eb6508 Mon Sep 17 00:00:00 2001 From: TPT Date: Fri, 26 Apr 2024 16:24:10 +0000 Subject: [PATCH] fix(kafka): added better error messaging to kafka driver Signed-off-by: TPT --- driver/otel/kafkadriver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/driver/otel/kafkadriver.go b/driver/otel/kafkadriver.go index 5332502e..abae5504 100644 --- a/driver/otel/kafkadriver.go +++ b/driver/otel/kafkadriver.go @@ -63,17 +63,17 @@ func (s *KafkaDriver) Register(pc plugins.SFPluginCache) { func (s *KafkaDriver) Init(pipeline plugins.SFPipeline, config map[string]interface{}) error { conf, err := CreateKafkaConfig(config) if err != nil { - return fmt.Errorf("caught error while reading kafka driver configuration") + return fmt.Errorf("caught error while reading kafka driver configuration %v", err) } consumer, err := kafka.NewConsumer(&conf.ConfigMap) if err != nil { - return fmt.Errorf("could not create kafka consumer") + return fmt.Errorf("could not create kafka consumer. Error: %v", err) } err = consumer.SubscribeTopics(conf.Topics, nil) if err != nil { - return fmt.Errorf("unable to subscribe to kafka topics: %v", conf.Topics) + return fmt.Errorf("unable to subscribe to kafka topics: %v. Error: %v", conf.Topics, err) } s.config = conf @@ -97,7 +97,7 @@ func (s *KafkaDriver) Run(path string, running *bool) error { /* reads the message from the topics */ msg, err := s.consumer.ReadMessage(-1) if err != nil { - return fmt.Errorf("error reading message %s", err) + return fmt.Errorf("error reading message %v", err) } /* parses the message into an otel record log */