Skip to content

Commit

Permalink
Merge pull request #33 from reugn/develop
Browse files Browse the repository at this point in the history
v0.5.2
  • Loading branch information
reugn authored Oct 14, 2020
2 parents 67e21cc + e1674a1 commit b9f7274
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 58 deletions.
3 changes: 3 additions & 0 deletions flow/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Filter struct {
parallelism uint
}

// Verify Filter satisfies the Flow interface.
var _ streams.Flow = (*Filter)(nil)

// NewFilter returns a new Filter instance.
// filterFunc is the filter predicate function.
// parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.
Expand Down
3 changes: 3 additions & 0 deletions flow/flat_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type FlatMap struct {
parallelism uint
}

// Verify FlatMap satisfies the Flow interface.
var _ streams.Flow = (*FlatMap)(nil)

// NewFlatMap returns a new FlatMap instance.
// flatMapFunc is the FlatMap transformation function.
// parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.
Expand Down
3 changes: 3 additions & 0 deletions flow/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type Map struct {
parallelism uint
}

// Verify Map satisfies the Flow interface.
var _ streams.Flow = (*Map)(nil)

// NewMap returns a new Map instance.
// mapFunc is the Map transformation function.
// parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.
Expand Down
3 changes: 3 additions & 0 deletions flow/pass_through.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type PassThrough struct {
out chan interface{}
}

// Verify PassThrough satisfies the Flow interface.
var _ streams.Flow = (*PassThrough)(nil)

// NewPassThrough returns a new PassThrough instance.
func NewPassThrough() *PassThrough {
passThrough := &PassThrough{
Expand Down
42 changes: 24 additions & 18 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"github.com/reugn/go-streams"
)

// SlidingWindow flow
// Generates windows of a specified fixed size
// Slides by a slide interval with records overlap
// SlidingWindow assigns elements to windows of fixed length configured by the window size parameter.
// An additional window slide parameter controls how frequently a sliding window is started.
// Hence, sliding windows can be overlapping if the slide is smaller than the window size.
// In this case elements are assigned to multiple windows.
type SlidingWindow struct {
sync.Mutex
size time.Duration
Expand All @@ -22,26 +23,31 @@ type SlidingWindow struct {
timestampExtractor func(interface{}) int64
}

// NewSlidingWindow returns a new Processing time sliding window
// size - The size of the generated windows
// slide - The slide interval of the generated windows
// Verify SlidingWindow satisfies the Flow interface.
var _ streams.Flow = (*SlidingWindow)(nil)

// NewSlidingWindow returns a new processing time based SlidingWindow.
// Processing time refers to the system time of the machine that is executing the respective operation.
// size is the size of the generated windows.
// slide is the sliding interval of the generated windows.
func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow {
return NewSlidingWindowWithTsExtractor(size, slide, nil)
}

// NewSlidingWindowWithTsExtractor returns a new Event time sliding window
// Gives correct results on out-of-order events, late events, or on replays of data
// size - The size of the generated windows
// slide - The slide interval of the generated windows
// timestampExtractor - The record timestamp (in nanoseconds) extractor
// NewSlidingWindowWithTsExtractor returns a new event time based SlidingWindow.
// Event time is the time that each individual event occurred on its producing device.
// Gives correct results on out-of-order events, late events, or on replays of data.
// size is the size of the generated windows.
// slide is the sliding interval of the generated windows.
// timestampExtractor is the record timestamp (in nanoseconds) extractor.
func NewSlidingWindowWithTsExtractor(size time.Duration, slide time.Duration,
timestampExtractor func(interface{}) int64) *SlidingWindow {
window := &SlidingWindow{
size: size,
slide: slide,
queue: &PriorityQueue{},
in: make(chan interface{}),
out: make(chan interface{}), //windows channel
out: make(chan interface{}), // windows channel
done: make(chan struct{}),
timestampExtractor: timestampExtractor,
}
Expand All @@ -50,13 +56,13 @@ func NewSlidingWindowWithTsExtractor(size time.Duration, slide time.Duration,
return window
}

// Via streams a data through the given flow
// Via streams data through the given flow
func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow {
go sw.transmit(flow)
return flow
}

// To streams a data to the given sink
// To streams data to the given sink
func (sw *SlidingWindow) To(sink streams.Sink) {
sw.transmit(sink)
}
Expand All @@ -71,15 +77,15 @@ func (sw *SlidingWindow) In() chan<- interface{} {
return sw.in
}

// retransmit an emitted window to the next Inlet
// submit emitted windows to the next Inlet
func (sw *SlidingWindow) transmit(inlet streams.Inlet) {
for elem := range sw.Out() {
inlet.In() <- elem
}
close(inlet.In())
}

// extract a timestamp from the record if timestampExtractor is set
// extract timestamp from the record if the timestampExtractor is set
// return the system clock time otherwise
func (sw *SlidingWindow) timestamp(elem interface{}) int64 {
if sw.timestampExtractor == nil {
Expand All @@ -99,7 +105,7 @@ func (sw *SlidingWindow) receive() {
close(sw.out)
}

// triggered by the slide interval
// emit is triggered by the sliding interval
func (sw *SlidingWindow) emit() {
for {
select {
Expand Down Expand Up @@ -130,7 +136,7 @@ func (sw *SlidingWindow) emit() {
}
sw.Unlock()

// send to the out chan
// send window slice to the out chan
if len(windowSlice) > 0 {
sw.out <- windowSlice
}
Expand Down
3 changes: 3 additions & 0 deletions flow/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Throttler struct {
counter uint
}

// Verify Throttler satisfies the Flow interface.
var _ streams.Flow = (*Throttler)(nil)

// NewThrottler returns a new Throttler instance.
// elements is the maximum number of elements to be produced per the given period of time.
// bufferSize defines the incoming elements buffer size.
Expand Down
22 changes: 12 additions & 10 deletions flow/tumbling_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"github.com/reugn/go-streams"
)

// TumblingWindow flow
// Generates windows of a specified window size
// Tumbling windows have a fixed size and do not overlap
// TumblingWindow assigns each element to a window of a specified window size.
// Tumbling windows have a fixed size and do not overlap.
type TumblingWindow struct {
sync.Mutex
size time.Duration
Expand All @@ -19,27 +18,30 @@ type TumblingWindow struct {
buffer []interface{}
}

// NewTumblingWindow returns a new TumblingWindow instance
// size - The size of the generated windows
// Verify TumblingWindow satisfies the Flow interface.
var _ streams.Flow = (*TumblingWindow)(nil)

// NewTumblingWindow returns a new TumblingWindow instance.
// size is the size of the generated windows.
func NewTumblingWindow(size time.Duration) *TumblingWindow {
window := &TumblingWindow{
size: size,
in: make(chan interface{}),
out: make(chan interface{}), //windows channel
out: make(chan interface{}), // windows channel
done: make(chan struct{}),
}
go window.receive()
go window.emit()
return window
}

// Via streams a data through the given flow
// Via streams data through the given flow
func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow {
go tw.transmit(flow)
return flow
}

// To streams a data to the given sink
// To streams data to the given sink
func (tw *TumblingWindow) To(sink streams.Sink) {
tw.transmit(sink)
}
Expand All @@ -54,7 +56,7 @@ func (tw *TumblingWindow) In() chan<- interface{} {
return tw.in
}

// retransmit the emitted window to the next Inlet
// submit emitted windows to the next Inlet
func (tw *TumblingWindow) transmit(inlet streams.Inlet) {
for elem := range tw.Out() {
inlet.In() <- elem
Expand All @@ -81,7 +83,7 @@ func (tw *TumblingWindow) emit() {
windowSlice := append(tw.buffer[:0:0], tw.buffer...)
tw.buffer = nil
tw.Unlock()
// send to the out chan
// send window slice to the out chan
if len(windowSlice) > 0 {
tw.out <- windowSlice
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ module github.com/reugn/go-streams
go 1.14

require (
github.com/Shopify/sarama v1.27.0
github.com/aerospike/aerospike-client-go v3.0.5+incompatible
github.com/apache/pulsar-client-go v0.1.1
github.com/Shopify/sarama v1.27.1
github.com/aerospike/aerospike-client-go v3.1.0+incompatible
github.com/apache/pulsar-client-go v0.2.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/onsi/gomega v1.10.1 // indirect
github.com/onsi/gomega v1.10.3 // indirect
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
)
Loading

0 comments on commit b9f7274

Please sign in to comment.