Skip to content

Commit

Permalink
feat: implement batch with timeout flow (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 11, 2024
1 parent ac53b80 commit ec78c7f
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 1 deletion.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Building blocks:
* Flow - A Flow is a set of stream processing steps that has one open input and one open output.
* Sink - A Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.

Flow capabilities ([flow](https://github.com/reugn/go-streams/tree/master/flow) package):
Implemented Flows ([flow](https://github.com/reugn/go-streams/tree/master/flow) package):
* Map
* FlatMap
* Filter
Expand All @@ -27,6 +27,7 @@ Flow capabilities ([flow](https://github.com/reugn/go-streams/tree/master/flow)
* RoundRobin
* Merge
* Flatten
* Batch
* Throttler
* SlidingWindow
* TumblingWindow
Expand Down
106 changes: 106 additions & 0 deletions flow/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package flow

import (
"time"

"github.com/reugn/go-streams"
)

// Batch processor breaks a stream of elements into batches based on size or timing.
// When the maximum batch size is reached or the batch time is elapsed, and the current buffer
// is not empty, a new batch will be emitted.
// Note: once a batch is sent downstream, the timer will be reset.
type Batch struct {
maxBatchSize int
timeInterval time.Duration
in chan interface{}
out chan interface{}
}

// Verify Batch satisfies the Flow interface.
var _ streams.Flow = (*Batch)(nil)

// NewBatch returns a new Batch instance using the specified maximum batch size and the
// time interval.
// NewBatch will panic if the maxBatchSize argument is not positive.
func NewBatch(maxBatchSize int, timeInterval time.Duration) *Batch {
if maxBatchSize < 1 {
panic("maxBatchSize must be positive")
}
batchFlow := &Batch{
maxBatchSize: maxBatchSize,
timeInterval: timeInterval,
in: make(chan interface{}),
out: make(chan interface{}),
}
go batchFlow.batchStream()

return batchFlow
}

// Via streams data through the given flow
func (b *Batch) Via(flow streams.Flow) streams.Flow {
go b.transmit(flow)
return flow
}

// To streams data to the given sink
func (b *Batch) To(sink streams.Sink) {
b.transmit(sink)
}

// Out returns the output channel of the Batch.
func (b *Batch) Out() <-chan interface{} {
return b.out
}

// In returns the input channel of the Batch.
func (b *Batch) In() chan<- interface{} {
return b.in
}

// transmit submits batches of elements to the next Inlet.
func (b *Batch) transmit(inlet streams.Inlet) {
for batch := range b.out {
inlet.In() <- batch
}
close(inlet.In())
}

// batchStream buffers the incoming stream and emits a batch of elements
// if the maximum batch size reached or the batch times out.
func (b *Batch) batchStream() {
ticker := time.NewTicker(b.timeInterval)
defer ticker.Stop()

batch := make([]interface{}, 0, b.maxBatchSize)
for {
select {
case element, ok := <-b.in:
if ok {
batch = append(batch, element)
// dispatch the batch if the maximum batch size has been reached
if len(batch) >= b.maxBatchSize {
b.out <- batch
batch = make([]interface{}, 0, b.maxBatchSize)
}
// reset the ticker
ticker.Reset(b.timeInterval)
} else {
// send the available buffer elements as a new batch, close the
// output channel and return
if len(batch) > 0 {
b.out <- batch
}
close(b.out)
return
}
case <-ticker.C:
// timeout; dispatch and reset the buffer
if len(batch) > 0 {
b.out <- batch
batch = make([]interface{}, 0, b.maxBatchSize)
}
}
}
}
46 changes: 46 additions & 0 deletions flow/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package flow_test

import (
"fmt"
"testing"
"time"

ext "github.com/reugn/go-streams/extension"
"github.com/reugn/go-streams/flow"
)

func TestBatch(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})

source := ext.NewChanSource(in)
batch := flow.NewBatch(4, 40*time.Millisecond)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c", "d", "e", "f", "g"}
go func() {
for _, e := range inputValues {
ingestDeferred(e, in, 5*time.Millisecond)
}
}()
go ingestDeferred("h", in, 90*time.Millisecond)
go closeDeferred(in, 100*time.Millisecond)

go func() {
source.
Via(batch).
To(sink)
}()

var outputValues [][]interface{}
for e := range sink.Out {
outputValues = append(outputValues, e.([]interface{}))
}
fmt.Println(outputValues)

assertEquals(t, 3, len(outputValues)) // [[a b c d] [e f g] [h]]

assertEquals(t, []interface{}{"a", "b", "c", "d"}, outputValues[0])
assertEquals(t, []interface{}{"e", "f", "g"}, outputValues[1])
assertEquals(t, []interface{}{"h"}, outputValues[2])
}

0 comments on commit ec78c7f

Please sign in to comment.