modifies DelayingConsumer to use a message header instead of a static delay
Instead of a static delay, the consumer uses a "not before" time carried in a Kafka message
header. Consumption of the message is not attempted until that time has passed. This allows
for more accurate delays, because the time spent processing an earlier message doesn't
further delay the current message's processing.

BACK-2449
ewollesen committed Oct 2, 2024
1 parent 1df3ef9 commit ddc4513
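
For illustration, here is a minimal, hedged sketch (not part of this commit) of how a producer might stamp a message with the "not before" header the new consumer reads. The header key and time format mirror the HeaderNotBefore and NotBeforeTimeFormat values introduced below; the package name, helper name, topic, and the github.com/IBM/sarama import path are assumptions for the example.

package example

import (
	"time"

	"github.com/IBM/sarama" // assumed import path for the sarama client
)

// notBeforeMessage builds a producer message carrying an absolute
// "not before" timestamp; NotBeforeConsumer holds the message until
// that time has passed. The header key and time format match
// HeaderNotBefore and NotBeforeTimeFormat from this change.
func notBeforeMessage(topic string, value []byte, delay time.Duration) *sarama.ProducerMessage {
	notBefore := time.Now().Add(delay) // earliest time consumption should be attempted
	return &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(value),
		Headers: []sarama.RecordHeader{{
			Key:   []byte("x-tidepool-not-before"),
			Value: []byte(notBefore.Format(time.RFC3339Nano)),
		}},
	}
}

Because the header carries an absolute timestamp rather than a duration, the consumer's remaining wait shrinks automatically when earlier messages take time to process, which is the behavior described above.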
Showing 2 changed files with 216 additions and 25 deletions.
102 changes: 93 additions & 9 deletions data/events/events.go
@@ -1,9 +1,11 @@
package events

import (
"bytes"
"context"
"fmt"
"os"
"strconv"
"sync"
"time"

@@ -341,9 +343,8 @@ func (r *CascadingSaramaEventsRunner) buildConsumer(ctx context.Context, idx int
}
}
if delay > 0 {
consumer = &DelayingConsumer{
consumer = &NotBeforeConsumer{
Consumer: consumer,
Delay: delay,
Logger: r.Logger,
}
}
@@ -370,28 +371,73 @@ func (r *CascadingSaramaEventsRunner) logger(ctx context.Context) log.Logger {
return r.Logger
}

// DelayingConsumer injects a delay before consuming a message.
type DelayingConsumer struct {
// NotBeforeConsumer delays consumption until a specified time.
type NotBeforeConsumer struct {
Consumer asyncevents.SaramaMessageConsumer
Delay time.Duration
Logger log.Logger
}

func (c *DelayingConsumer) Consume(ctx context.Context, session sarama.ConsumerGroupSession,
func (c *NotBeforeConsumer) Consume(ctx context.Context, session sarama.ConsumerGroupSession,
msg *sarama.ConsumerMessage) error {

notBefore, err := c.notBeforeFromMsgHeaders(msg)
if err != nil {
c.Logger.WithError(err).Info("Unable to parse kafka header not-before value")
}
delay := time.Until(notBefore)

select {
case <-ctx.Done():
if ctxErr := ctx.Err(); ctxErr != context.Canceled {
return ctxErr
}
return nil
case <-time.After(c.Delay):
c.Logger.WithFields(log.Fields{"topic": msg.Topic, "delay": c.Delay}).Debugf("delayed")
case <-time.After(time.Until(notBefore)):
if !notBefore.IsZero() {
fields := log.Fields{"topic": msg.Topic, "not-before": notBefore, "delay": delay}
c.Logger.WithFields(fields).Debugf("delayed")
}
return c.Consumer.Consume(ctx, session, msg)
}
}

// HeaderNotBefore tells consumers not to consume a message before a certain time.
var HeaderNotBefore = []byte("x-tidepool-not-before")

// NotBeforeTimeFormat specifies the [time.Parse] format to use for HeaderNotBefore.
var NotBeforeTimeFormat = time.RFC3339Nano

// HeaderFailures counts the number of failures encountered trying to consume the message.
var HeaderFailures = []byte("x-tidepool-failures")

// FailuresToDelay maps the number of consumption failures to the next delay.
//
// Rather than using a failures header, the name of the topic could be used as a lookup, if
// so desired.
var FailuresToDelay = map[int]time.Duration{
0: 0,
1: 1 * time.Second,
2: 2 * time.Second,
3: 3 * time.Second,
4: 5 * time.Second,
}

func (c *NotBeforeConsumer) notBeforeFromMsgHeaders(msg *sarama.ConsumerMessage) (
time.Time, error) {

for _, header := range msg.Headers {
if bytes.Equal(header.Key, HeaderNotBefore) {
notBefore, err := time.Parse(NotBeforeTimeFormat, string(header.Value))
if err != nil {
return time.Time{}, fmt.Errorf("parsing not before header: %s", err)
} else {
return notBefore, nil
}
}
}
return time.Time{}, fmt.Errorf("header not found: x-tidepool-not-before")
}

// CascadingConsumer cascades messages that failed to be consumed to another topic.
type CascadingConsumer struct {
Consumer asyncevents.SaramaMessageConsumer
@@ -444,6 +490,7 @@ func (c *CascadingConsumer) withTxn(f func() error) (err error) {
return f()
}

// cascadeMessage to the next topic.
func (c *CascadingConsumer) cascadeMessage(msg *sarama.ConsumerMessage) *sarama.ProducerMessage {
pHeaders := make([]sarama.RecordHeader, len(msg.Headers))
for idx, header := range msg.Headers {
@@ -453,6 +500,43 @@ func (c *CascadingConsumer) cascadeMessage(msg *sarama.ConsumerMessage) *sarama.
Key: sarama.ByteEncoder(msg.Key),
Value: sarama.ByteEncoder(msg.Value),
Topic: c.NextTopic,
Headers: pHeaders,
Headers: c.updateCascadeHeaders(pHeaders),
}
}

// updateCascadeHeaders calculates not before and failures header values.
//
// Existing not-before and failures headers are dropped and replaced with new ones.
func (c *CascadingConsumer) updateCascadeHeaders(headers []sarama.RecordHeader) []sarama.RecordHeader {
failures := 0
notBefore := time.Now()

keep := make([]sarama.RecordHeader, 0, len(headers))
for _, header := range headers {
switch {
case bytes.Equal(header.Key, HeaderNotBefore):
continue // Drop this header, we'll add a new version below.
case bytes.Equal(header.Key, HeaderFailures):
parsed, err := strconv.ParseInt(string(header.Value), 10, 32)
if err != nil {
c.Logger.WithError(err).Info("Unable to parse consumption failures count")
} else {
failures = int(parsed)
notBefore = notBefore.Add(FailuresToDelay[failures])
}
continue // Drop this header, we'll add a new version below.
}
keep = append(keep, header)
}

keep = append(keep, sarama.RecordHeader{
Key: HeaderNotBefore,
Value: []byte(notBefore.Format(NotBeforeTimeFormat)),
})
keep = append(keep, sarama.RecordHeader{
Key: HeaderFailures,
Value: []byte(strconv.Itoa(failures + 1)),
})

return keep
}
139 changes: 123 additions & 16 deletions data/events/events_test.go
@@ -1,8 +1,10 @@
package events

import (
"bytes"
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -110,24 +112,32 @@ var _ = DescribeTable("CappedExponentialBinaryDelay",
Entry("cap: 1m; tries: 20", time.Minute, 20, time.Minute),
)

var _ = Describe("DelayingConsumer", func() {
var _ = Describe("NotBeforeConsumer", func() {
Describe("Consume", func() {
var testMsg = &sarama.ConsumerMessage{
Topic: "test.topic",
var newTestMsg = func(notBefore time.Time) *sarama.ConsumerMessage {
headers := []*sarama.RecordHeader{}
if !notBefore.IsZero() {
headers = append(headers, &sarama.RecordHeader{
Key: HeaderNotBefore,
Value: []byte(notBefore.Format(NotBeforeTimeFormat)),
})
}
return &sarama.ConsumerMessage{Topic: "test.topic", Headers: headers}
}

It("delays by the configured duration", func() {
It("delays based on the x-tidepool-not-before header", func() {
logger := newTestDevlog()
testDelay := 10 * time.Millisecond
ctx := context.Background()
start := time.Now()
dc := &DelayingConsumer{
notBefore := start.Add(testDelay)
msg := newTestMsg(notBefore)
dc := &NotBeforeConsumer{
Consumer: &mockSaramaMessageConsumer{Logger: logger},
Delay: testDelay,
Logger: logger,
}

err := dc.Consume(ctx, nil, testMsg)
err := dc.Consume(ctx, nil, msg)

Expect(err).To(BeNil())
Expect(time.Since(start)).To(BeNumerically(">", testDelay))
@@ -137,9 +147,10 @@ var _ = Describe("DelayingConsumer", func() {
logger := newTestDevlog()
testDelay := 10 * time.Millisecond
abortAfter := 1 * time.Millisecond
dc := &DelayingConsumer{
notBefore := time.Now().Add(testDelay)
msg := newTestMsg(notBefore)
dc := &NotBeforeConsumer{
Consumer: &mockSaramaMessageConsumer{Delay: time.Minute, Logger: logger},
Delay: testDelay,
Logger: logger,
}
ctx, cancel := context.WithCancel(context.Background())
@@ -149,7 +160,7 @@ var _ = Describe("DelayingConsumer", func() {
}()
start := time.Now()

err := dc.Consume(ctx, nil, testMsg)
err := dc.Consume(ctx, nil, msg)

Expect(err).To(BeNil())
Expect(time.Since(start)).To(BeNumerically(">", abortAfter))
@@ -158,14 +169,14 @@ var _ = Describe("DelayingConsumer", func() {
})
})

var _ = Describe("ShiftingConsumer", func() {
var _ = Describe("CascadingConsumer", func() {
Describe("Consume", func() {
var testMsg = &sarama.ConsumerMessage{
Topic: "test.topic",
}

Context("on failure", func() {
It("shifts topics", func() {
It("cascades topics", func() {
t := GinkgoT()
logger := newTestDevlog()
ctx := context.Background()
@@ -195,6 +206,102 @@ var _ = Describe("ShiftingConsumer", func() {
Expect(mockProducer.Close()).To(Succeed())
Expect(err).To(BeNil())
})

It("increments the failures header", func() {
t := GinkgoT()
logger := newTestDevlog()
ctx := context.Background()
testConfig := mocks.NewTestConfig()
mockProducer := mocks.NewAsyncProducer(t, testConfig)
msg := &sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{
Key: HeaderFailures, Value: []byte("3"),
},
},
}
nextTopic := "text-next"
sc := &CascadingConsumer{
Consumer: &mockSaramaMessageConsumer{
Err: fmt.Errorf("test error"),
Logger: logger,
},
NextTopic: nextTopic,
Producer: mockProducer,
Logger: logger,
}

cf := func(msg *sarama.ProducerMessage) error {
failures := 0
for _, header := range msg.Headers {
if !bytes.Equal(header.Key, HeaderFailures) {
continue
}
parsed, err := strconv.ParseInt(string(header.Value), 10, 32)
Expect(err).To(Succeed())
failures = int(parsed)
if failures != 4 {
return fmt.Errorf("expected failures == 4, got %d", failures)
}
return nil
}
return fmt.Errorf("expected failures header wasn't found")
}
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(cf)

err := sc.Consume(ctx, nil, msg)
Expect(mockProducer.Close()).To(Succeed())
Expect(err).To(BeNil())
})

It("updates the not before header", func() {
t := GinkgoT()
logger := newTestDevlog()
ctx := context.Background()
testConfig := mocks.NewTestConfig()
mockProducer := mocks.NewAsyncProducer(t, testConfig)
msg := &sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{
Key: HeaderFailures, Value: []byte("2"),
},
},
}
nextTopic := "text-next"
sc := &CascadingConsumer{
Consumer: &mockSaramaMessageConsumer{
Err: fmt.Errorf("test error"),
Logger: logger,
},
NextTopic: nextTopic,
Producer: mockProducer,
Logger: logger,
}

cf := func(msg *sarama.ProducerMessage) error {
for _, header := range msg.Headers {
if !bytes.Equal(header.Key, HeaderNotBefore) {
continue
}
parsed, err := time.Parse(NotBeforeTimeFormat, string(header.Value))
if err != nil {
return err
}
until := time.Until(parsed)
delta := 10 * time.Millisecond
if until < 2*time.Second-delta || until > 2*time.Second+delta {
return fmt.Errorf("expected 2 seconds' delay, got: %s", until)
}
return nil
}
return fmt.Errorf("expected failures header wasn't found")
}
mockProducer.ExpectInputWithMessageCheckerFunctionAndSucceed(cf)

err := sc.Consume(ctx, nil, msg)
Expect(mockProducer.Close()).To(Succeed())
Expect(err).To(BeNil())
})
})

Context("on success", func() {
@@ -244,8 +351,8 @@
})
})

var _ = Describe("ShiftingSaramaEventsRunner", func() {
It("shifts through configured delays", func() {
var _ = Describe("CascadingSaramaEventsRunner", func() {
It("cascades through configured delays", func() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
testDelays := []time.Duration{0, 1, 2, 3, 5}
@@ -256,7 +363,7 @@ var _ = Describe("ShiftingSaramaEventsRunner", func() {
Logger: testLogger,
}
testConfig := SaramaRunnerConfig{
Topics: []string{"test.shifting"},
Topics: []string{"test.cascading"},
MessageConsumer: testMessageConsumer,
Sarama: mocks.NewTestConfig(),
}
@@ -338,7 +445,7 @@
})
})

// testSaramaBuilders injects mocks into the ShiftingSaramaEventsRunner
// testSaramaBuilders injects mocks into the CascadingSaramaEventsRunner
type testSaramaBuilders struct {
consumerGroup func([]string, string, *sarama.Config) (sarama.ConsumerGroup, error)
producer func([]string, *sarama.Config) (sarama.AsyncProducer, error)