Skip to content

Commit

Permalink
feat!: make window and batch flows generic (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Jan 11, 2024
1 parent ec78c7f commit fef9ee5
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 147 deletions.
4 changes: 2 additions & 2 deletions examples/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func main() {
hosts := []string{"127.0.0.1:9092"}
ctx := context.Background()
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Producer.Return.Successes = true
config.Version, _ = sarama.ParseKafkaVersion("2.8.1")
Expand All @@ -35,7 +35,7 @@ func main() {
}

throttler := flow.NewThrottler(1, time.Second, 50, flow.Discard)
tumblingWindow := flow.NewTumblingWindow(time.Second * 5)
tumblingWindow := flow.NewTumblingWindow[*sarama.ConsumerMessage](time.Second * 5)

source.
Via(toUpperMapFlow).
Expand Down
42 changes: 22 additions & 20 deletions flow/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,57 +10,59 @@ import (
// When the maximum batch size is reached or the batch time is elapsed, and the current buffer
// is not empty, a new batch will be emitted.
// Note: once a batch is sent downstream, the timer will be reset.
type Batch struct {
// T indicates the incoming element type, and the outgoing element type is []T.
type Batch[T any] struct {
maxBatchSize int
timeInterval time.Duration
in chan interface{}
out chan interface{}
in chan any
out chan any
}

// Verify Batch satisfies the Flow interface.
var _ streams.Flow = (*Batch)(nil)
var _ streams.Flow = (*Batch[any])(nil)

// NewBatch returns a new Batch instance using the specified maximum batch size and the
// time interval.
// T specifies the incoming element type, and the outgoing element type is []T.
// NewBatch will panic if the maxBatchSize argument is not positive.
func NewBatch(maxBatchSize int, timeInterval time.Duration) *Batch {
func NewBatch[T any](maxBatchSize int, timeInterval time.Duration) *Batch[T] {
if maxBatchSize < 1 {
panic("maxBatchSize must be positive")
}
batchFlow := &Batch{
batchFlow := &Batch[T]{
maxBatchSize: maxBatchSize,
timeInterval: timeInterval,
in: make(chan interface{}),
out: make(chan interface{}),
in: make(chan any),
out: make(chan any),
}
go batchFlow.batchStream()

return batchFlow
}

// Via streams data through the given flow
func (b *Batch) Via(flow streams.Flow) streams.Flow {
// Via streams data to a specified Flow and returns it.
func (b *Batch[T]) Via(flow streams.Flow) streams.Flow {
go b.transmit(flow)
return flow
}

// To streams data to the given sink
func (b *Batch) To(sink streams.Sink) {
// To streams data to a specified Sink.
func (b *Batch[T]) To(sink streams.Sink) {
b.transmit(sink)
}

// Out returns the output channel of the Batch.
func (b *Batch) Out() <-chan interface{} {
func (b *Batch[T]) Out() <-chan any {
return b.out
}

// In returns the input channel of the Batch.
func (b *Batch) In() chan<- interface{} {
func (b *Batch[T]) In() chan<- any {
return b.in
}

// transmit submits batches of elements to the next Inlet.
func (b *Batch) transmit(inlet streams.Inlet) {
func (b *Batch[T]) transmit(inlet streams.Inlet) {
for batch := range b.out {
inlet.In() <- batch
}
Expand All @@ -69,20 +71,20 @@ func (b *Batch) transmit(inlet streams.Inlet) {

// batchStream buffers the incoming stream and emits a batch of elements
// if the maximum batch size reached or the batch times out.
func (b *Batch) batchStream() {
func (b *Batch[T]) batchStream() {
ticker := time.NewTicker(b.timeInterval)
defer ticker.Stop()

batch := make([]interface{}, 0, b.maxBatchSize)
batch := make([]T, 0, b.maxBatchSize)
for {
select {
case element, ok := <-b.in:
if ok {
batch = append(batch, element)
batch = append(batch, element.(T))
// dispatch the batch if the maximum batch size has been reached
if len(batch) >= b.maxBatchSize {
b.out <- batch
batch = make([]interface{}, 0, b.maxBatchSize)
batch = make([]T, 0, b.maxBatchSize)
}
// reset the ticker
ticker.Reset(b.timeInterval)
Expand All @@ -99,7 +101,7 @@ func (b *Batch) batchStream() {
// timeout; dispatch and reset the buffer
if len(batch) > 0 {
b.out <- batch
batch = make([]interface{}, 0, b.maxBatchSize)
batch = make([]T, 0, b.maxBatchSize)
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions flow/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
)

func TestBatch(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})
in := make(chan any)
out := make(chan any)

source := ext.NewChanSource(in)
batch := flow.NewBatch(4, 40*time.Millisecond)
batch := flow.NewBatch[string](4, 40*time.Millisecond)
sink := ext.NewChanSink(out)

inputValues := []string{"a", "b", "c", "d", "e", "f", "g"}
Expand All @@ -29,18 +29,19 @@ func TestBatch(t *testing.T) {
go func() {
source.
Via(batch).
Via(flow.NewMap(retransmitStringSlice, 1)). // test generic return type
To(sink)
}()

var outputValues [][]interface{}
var outputValues [][]string
for e := range sink.Out {
outputValues = append(outputValues, e.([]interface{}))
outputValues = append(outputValues, e.([]string))
}
fmt.Println(outputValues)

assertEquals(t, 3, len(outputValues)) // [[a b c d] [e f g] [h]]

assertEquals(t, []interface{}{"a", "b", "c", "d"}, outputValues[0])
assertEquals(t, []interface{}{"e", "f", "g"}, outputValues[1])
assertEquals(t, []interface{}{"h"}, outputValues[2])
assertEquals(t, []string{"a", "b", "c", "d"}, outputValues[0])
assertEquals(t, []string{"e", "f", "g"}, outputValues[1])
assertEquals(t, []string{"h"}, outputValues[2])
}
41 changes: 27 additions & 14 deletions flow/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand All @@ -26,31 +27,43 @@ var reduceSum = func(a int, b int) int {
return a + b
}

func ingestSlice[T any](source []T, in chan interface{}) {
var retransmitStringSlice = func(in []string) []string {
return in
}

var mtx sync.Mutex

func ingestSlice[T any](source []T, in chan any) {
mtx.Lock()
defer mtx.Unlock()
for _, e := range source {
in <- e
}
}

func ingestDeferred[T any](item T, in chan interface{}, wait time.Duration) {
func ingestDeferred[T any](item T, in chan any, wait time.Duration) {
time.Sleep(wait)
mtx.Lock()
defer mtx.Unlock()
in <- item
}

func closeDeferred[T any](in chan T, wait time.Duration) {
time.Sleep(wait)
mtx.Lock()
defer mtx.Unlock()
close(in)
}

func TestComplexFlow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})
in := make(chan any)
out := make(chan any)

source := ext.NewChanSource(in)
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
appendAsteriskFlatMapFlow := flow.NewFlatMap(addAsterisk, 1)
throttler := flow.NewThrottler(10, 200*time.Millisecond, 50, flow.Backpressure)
tumblingWindow := flow.NewTumblingWindow(200 * time.Millisecond)
tumblingWindow := flow.NewTumblingWindow[string](200 * time.Millisecond)
filterFlow := flow.NewFilter(filterNotContainsA, 1)
sink := ext.NewChanSink(out)

Expand All @@ -63,7 +76,7 @@ func TestComplexFlow(t *testing.T) {
Via(toUpperMapFlow).
Via(appendAsteriskFlatMapFlow).
Via(tumblingWindow).
Via(flow.Flatten(1)).
Via(flow.Flatten[string](1)).
Via(throttler).
Via(filterFlow).
To(sink)
Expand All @@ -79,8 +92,8 @@ func TestComplexFlow(t *testing.T) {
}

func TestSplitFlow(t *testing.T) {
in := make(chan interface{}, 3)
out := make(chan interface{}, 3)
in := make(chan any, 3)
out := make(chan any, 3)

source := ext.NewChanSource(in)
toUpperMapFlow := flow.NewMap(strings.ToUpper, 1)
Expand Down Expand Up @@ -108,8 +121,8 @@ func TestSplitFlow(t *testing.T) {
}

func TestFanOutFlow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})
in := make(chan any)
out := make(chan any)

source := ext.NewChanSource(in)
filterFlow := flow.NewFilter(filterNotContainsA, 1)
Expand Down Expand Up @@ -141,8 +154,8 @@ func TestFanOutFlow(t *testing.T) {
}

func TestRoundRobinFlow(t *testing.T) {
in := make(chan interface{})
out := make(chan interface{})
in := make(chan any)
out := make(chan any)

source := ext.NewChanSource(in)
filterFlow := flow.NewFilter(filterNotContainsA, 1)
Expand Down Expand Up @@ -174,8 +187,8 @@ func TestRoundRobinFlow(t *testing.T) {
}

func TestReduceFlow(t *testing.T) {
in := make(chan interface{}, 5)
out := make(chan interface{}, 5)
in := make(chan any, 5)
out := make(chan any, 5)

source := ext.NewChanSource(in)
reduceFlow := flow.NewReduce(reduceSum)
Expand Down
48 changes: 25 additions & 23 deletions flow/session_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@ import (

// SessionWindow generates groups of elements by sessions of activity.
// Session windows do not overlap and do not have a fixed start and end time.
type SessionWindow struct {
// T indicates the incoming element type, and the outgoing element type is []T.
type SessionWindow[T any] struct {
sync.Mutex
inactivityGap time.Duration
in chan interface{}
out chan interface{}
in chan any
out chan any
reset chan struct{}
done chan struct{}
buffer []interface{}
buffer []T
}

// Verify SessionWindow satisfies the Flow interface.
var _ streams.Flow = (*SessionWindow)(nil)
var _ streams.Flow = (*SessionWindow[any])(nil)

// NewSessionWindow returns a new SessionWindow instance.
// T specifies the incoming element type, and the outgoing element type is []T.
//
// inactivityGap is the gap of inactivity that closes a session window when occurred.
func NewSessionWindow(inactivityGap time.Duration) *SessionWindow {
sessionWindow := &SessionWindow{
func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T] {
sessionWindow := &SessionWindow[T]{
inactivityGap: inactivityGap,
in: make(chan interface{}),
out: make(chan interface{}),
in: make(chan any),
out: make(chan any),
reset: make(chan struct{}),
done: make(chan struct{}),
}
Expand All @@ -39,29 +41,29 @@ func NewSessionWindow(inactivityGap time.Duration) *SessionWindow {
return sessionWindow
}

// Via streams data through the given flow
func (sw *SessionWindow) Via(flow streams.Flow) streams.Flow {
// Via streams data to a specified Flow and returns it.
func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow {
go sw.transmit(flow)
return flow
}

// To streams data to the given sink
func (sw *SessionWindow) To(sink streams.Sink) {
// To streams data to a specified Sink.
func (sw *SessionWindow[T]) To(sink streams.Sink) {
sw.transmit(sink)
}

// Out returns an output channel for sending data
func (sw *SessionWindow) Out() <-chan interface{} {
// Out returns the output channel of the SessionWindow.
func (sw *SessionWindow[T]) Out() <-chan any {
return sw.out
}

// In returns an input channel for receiving data
func (sw *SessionWindow) In() chan<- interface{} {
// In returns the input channel of the SessionWindow.
func (sw *SessionWindow[T]) In() chan<- any {
return sw.in
}

// transmit submits closed windows to the next Inlet.
func (sw *SessionWindow) transmit(inlet streams.Inlet) {
func (sw *SessionWindow[T]) transmit(inlet streams.Inlet) {
for window := range sw.out {
inlet.In() <- window
}
Expand All @@ -70,18 +72,18 @@ func (sw *SessionWindow) transmit(inlet streams.Inlet) {

// receive buffers the incoming elements.
// It resets the inactivity timer on each new element.
func (sw *SessionWindow) receive() {
func (sw *SessionWindow[T]) receive() {
for element := range sw.in {
sw.Lock()
sw.buffer = append(sw.buffer, element)
sw.buffer = append(sw.buffer, element.(T))
sw.Unlock()
sw.notifyTimerReset() // signal to reset the inactivity timer
}
close(sw.done)
}

// notifyTimerReset sends a notification to reset the inactivity timer.
func (sw *SessionWindow) notifyTimerReset() {
func (sw *SessionWindow[T]) notifyTimerReset() {
select {
case sw.reset <- struct{}{}:
default:
Expand All @@ -91,7 +93,7 @@ func (sw *SessionWindow) notifyTimerReset() {
// emit captures and emits a session window based on the gap of inactivity.
// When this period expires, the current session closes and subsequent elements
// are assigned to a new session window.
func (sw *SessionWindow) emit() {
func (sw *SessionWindow[T]) emit() {
timer := time.NewTimer(sw.inactivityGap)
for {
select {
Expand All @@ -118,7 +120,7 @@ func (sw *SessionWindow) emit() {

// dispatchWindow creates a window from buffered elements and resets the buffer.
// It sends the slice of elements to the output channel if the window is not empty.
func (sw *SessionWindow) dispatchWindow() {
func (sw *SessionWindow[T]) dispatchWindow() {
sw.Lock()
windowElements := sw.buffer
sw.buffer = nil
Expand Down
Loading

0 comments on commit fef9ee5

Please sign in to comment.