Merge pull request #7 from JoshKCarroll/better-error-handling
Logging / error fixes in batchproducer
JoshuaC215 authored Feb 8, 2019
2 parents e54b0c3 + 3815db0 commit cb8a9ff
Showing 2 changed files with 25 additions and 11 deletions.
8 changes: 5 additions & 3 deletions batchproducer/batchproducer.go
@@ -288,7 +288,9 @@ func (b *batchProducer) Events() <-chan Event {
 // from/for interface Producer
 // TODO: send all batches in parallel, will require broader refactoring
 func (b *batchProducer) Flush(timeout time.Duration, sendStats bool) (int, int, error) {
-	b.Stop()
+	if err := b.Stop(); err != nil {
+		return 0, 0, err
+	}
 
 	timer := time.NewTimer(timeout)
 	if timeout == 0 {
@@ -365,14 +367,14 @@ func (b *batchProducer) sendBatch(batchSize int) int {
 	b.consecutiveErrors = 0
 	b.currentDelay = 0
 	var succeeded int
-	if res.FailedRecordCount == nil {
+	if res.FailedRecordCount == nil || *res.FailedRecordCount == 0 {
 		succeeded = len(records)
 		b.logger.Debug(fmt.Sprintf("PutRecords request succeeded: sent %v records to Kinesis stream %v", succeeded, b.streamName))
 	} else {
 		// note *int64 to int conversion - in practice we never expect 2 billion failed records
 		// in a single call since API only supports 500 records per call
 		succeeded = len(records) - int(*res.FailedRecordCount)
-		b.logger.Debug(fmt.Sprintf("Partial success when sending a PutRecords request to Kinesis stream %v: %v succeeded, %v failed. Re-enqueueing failed records.", b.streamName, succeeded, res.FailedRecordCount))
+		b.logger.Debug(fmt.Sprintf("Partial success when sending a PutRecords request to Kinesis stream %v: %v succeeded, %v failed. Re-enqueueing failed records.", b.streamName, succeeded, int(*res.FailedRecordCount)))
		// returnSomeFailedRecordsToBuffer can block if the buffer (channel) if full so we’ll
 		// call it in a goroutine. This might be problematic WRT ordering. TODO: revisit this.
 		go b.returnSomeFailedRecordsToBuffer(res, records)
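The second hunk's logging fix is easy to miss: res.FailedRecordCount is a *int64 in the AWS SDK, and fmt's %v verb formats a pointer to a numeric type as its address rather than dereferencing it, so the old partial-success message logged a hex address where the failure count should have been. A minimal standalone sketch of the difference (illustration only, not part of the commit):

package main

import "fmt"

func main() {
	failed := int64(3)
	count := &failed // stands in for res.FailedRecordCount (*int64)

	// Passing the pointer itself prints its address, e.g. "0xc000014090 failed".
	fmt.Println(fmt.Sprintf("%v failed", count))

	// Dereferencing and converting prints "3 failed", as the fixed log line now does.
	fmt.Println(fmt.Sprintf("%v failed", int(*count)))
}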
28 changes: 20 additions & 8 deletions batchproducer/batchproducer_test.go
@@ -154,6 +154,24 @@ func TestStopWhenStopped(t *testing.T) {
 	}
 }
 
+func TestFlushWhenStopped(t *testing.T) {
+	t.Parallel()
+	config := Config{
+		BufferSize:    100,
+		FlushInterval: 0,
+		BatchSize:     10,
+	}
+	b, err := New(&mockBatchingClient{}, "foo", config)
+	if err != nil {
+		t.Fatalf("%v != nil", err)
+	}
+
+	_, _, err = b.Flush(100, false)
+	if err == nil {
+		t.Errorf("%v == nil", err)
+	}
+}
+
 func TestSuccessiveStartsAndStops(t *testing.T) {
 	t.Parallel()
 	config := Config{
@@ -722,14 +740,11 @@ func TestFlushWithTimeout(t *testing.T) {
 	b := newProducer(c, 1000, 0, 10)
 
 	// set running to true so Add will succeed
-	b.running = true
+	b.Start()
 
 	// Adding 600 will enqueue 2 batches
 	b.addRecordsAndWait(600, 0)
 
-	// back to normal
-	b.running = false
-
 	// This should lead to only 1 batch of 500 being sent by Flush
 	timeout := 5 * time.Millisecond
 
@@ -763,14 +778,11 @@ func TestFlushWithoutTimeout(t *testing.T) {
 	b := newProducer(c, 1000, 0, 10)
 
 	// set running to true so Add will succeed
-	b.running = true
+	b.Start()
 
 	// Adding 600 will enqueue 2 batches
 	b.addRecordsAndWait(600, 0)
 
-	// back to normal
-	b.running = false
-
 	// This should lead to batches of 500 and 100 being sent by Flush
 	timeout := 0 * time.Millisecond
 
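Taken together, the change means a caller can now tell a Flush that did nothing because Stop failed (for example, the producer was never started, which the new TestFlushWhenStopped exercises) apart from a Flush that simply had nothing left to send. A hedged caller-side sketch follows, assuming only the Flush signature shown in the diff; the flusher interface, the drainOnShutdown helper, and the names sent and remaining for the two int returns are this sketch's own, not part of the package:

package example

import (
	"log"
	"time"
)

// flusher captures just the Flush method of the package's Producer interface,
// with the signature shown in the diff above; the name is local to this sketch.
type flusher interface {
	Flush(timeout time.Duration, sendStats bool) (int, int, error)
}

// drainOnShutdown is a hypothetical caller-side helper, not part of the commit.
func drainOnShutdown(p flusher) error {
	sent, remaining, err := p.Flush(5*time.Second, true)
	if err != nil {
		// With this change, an error from Stop (e.g. the producer was never
		// started) is reported here instead of being silently dropped.
		return err
	}
	log.Printf("flushed %d records, %d still buffered", sent, remaining)
	return nil
}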
