Skip to content

Commit

Permalink
refactor: replace empty interface with any
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Jan 12, 2024
1 parent fef9ee5 commit 2e0a6b1
Show file tree
Hide file tree
Showing 24 changed files with 138 additions and 138 deletions.
12 changes: 6 additions & 6 deletions aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type AerospikeSource struct {
client *aero.Client
recordsChannel chan *aero.Result
scanPolicy *aero.ScanPolicy
out chan interface{}
out chan any
ctx context.Context
properties *AerospikeProperties
changeNotificationProperties *ChangeNotificationProperties
Expand All @@ -61,7 +61,7 @@ func NewAerospikeSource(ctx context.Context,
client: client,
recordsChannel: records,
scanPolicy: scanPolicy,
out: make(chan interface{}),
out: make(chan any),
ctx: ctx,
properties: properties,
changeNotificationProperties: changeNotificationProperties,
Expand Down Expand Up @@ -150,7 +150,7 @@ func (as *AerospikeSource) Via(_flow streams.Flow) streams.Flow {
}

// Out returns an output channel for sending data
func (as *AerospikeSource) Out() <-chan interface{} {
func (as *AerospikeSource) Out() <-chan any {
return as.out
}

Expand All @@ -164,7 +164,7 @@ type AerospikeKeyBins struct {
// AerospikeSink represents an Aerospike sink connector.
type AerospikeSink struct {
client *aero.Client
in chan interface{}
in chan any
ctx context.Context
properties *AerospikeProperties
writePolicy *aero.WritePolicy
Expand All @@ -184,7 +184,7 @@ func NewAerospikeSink(ctx context.Context,

source := &AerospikeSink{
client: client,
in: make(chan interface{}),
in: make(chan any),
ctx: ctx,
properties: properties,
writePolicy: writePolicy,
Expand Down Expand Up @@ -228,6 +228,6 @@ func (as *AerospikeSink) init() {
}

// In returns an input channel for receiving data
func (as *AerospikeSink) In() chan<- interface{} {
func (as *AerospikeSink) In() chan<- any {
return as.in
}
2 changes: 1 addition & 1 deletion examples/redis/stream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {

var toUpper = func(msg *redis.XMessage) *redis.XMessage {
fmt.Printf("Got: %v\n", msg.Values)
values := make(map[string]interface{}, len(msg.Values))
values := make(map[string]any, len(msg.Values))
for key, element := range msg.Values {
values[key] = strings.ToUpper(fmt.Sprintf("%v", element))
}
Expand Down
4 changes: 2 additions & 2 deletions examples/std/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ var addUTC = func(msg *message) *message {
return msg
}

func tickerChan(repeat time.Duration) chan interface{} {
func tickerChan(repeat time.Duration) chan any {
ticker := time.NewTicker(repeat)
oc := ticker.C
nc := make(chan interface{})
nc := make(chan any)
go func() {
for range oc {
nc <- &message{strconv.FormatInt(time.Now().UnixNano(), 10)}
Expand Down
4 changes: 2 additions & 2 deletions examples/ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

type wsServer struct {
clients map[*websocket.Conn]bool
broadcast chan interface{}
broadcast chan any
upgrader websocket.Upgrader
}

Expand All @@ -29,7 +29,7 @@ func startWsServer() {
}
server := &wsServer{
clients: make(map[*websocket.Conn]bool),
broadcast: make(chan interface{}),
broadcast: make(chan any),
upgrader: upgrader,
}

Expand Down
12 changes: 6 additions & 6 deletions extension/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (

// ChanSource represents an inbound connector that streams items from a channel.
type ChanSource struct {
in chan interface{}
in chan any
}

// NewChanSource returns a new ChanSource instance
func NewChanSource(in chan interface{}) *ChanSource {
func NewChanSource(in chan any) *ChanSource {
return &ChanSource{in}
}

Expand All @@ -22,21 +22,21 @@ func (cs *ChanSource) Via(_flow streams.Flow) streams.Flow {
}

// Out returns an output channel for sending data
func (cs *ChanSource) Out() <-chan interface{} {
func (cs *ChanSource) Out() <-chan any {
return cs.in
}

// ChanSink represents an outbound connector that streams items to a channel.
type ChanSink struct {
Out chan interface{}
Out chan any
}

// NewChanSink returns a new ChanSink instance
func NewChanSink(out chan interface{}) *ChanSink {
func NewChanSink(out chan any) *ChanSink {
return &ChanSink{out}
}

// In returns an input channel for receiving data
func (ch *ChanSink) In() chan<- interface{} {
func (ch *ChanSink) In() chan<- any {
return ch.Out
}
12 changes: 6 additions & 6 deletions extension/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
// FileSource represents an inbound connector that reads items from a file.
type FileSource struct {
fileName string
in chan interface{}
in chan any
}

// NewFileSource returns a new FileSource instance.
func NewFileSource(fileName string) *FileSource {
source := &FileSource{
fileName: fileName,
in: make(chan interface{}),
in: make(chan any),
}
source.init()
return source
Expand Down Expand Up @@ -60,21 +60,21 @@ func (fs *FileSource) Via(_flow streams.Flow) streams.Flow {
}

// Out returns an output channel for sending data
func (fs *FileSource) Out() <-chan interface{} {
func (fs *FileSource) Out() <-chan any {
return fs.in
}

// FileSink represents an outbound connector that writes items to a file.
type FileSink struct {
fileName string
in chan interface{}
in chan any
}

// NewFileSink returns a new FileSink instance.
func NewFileSink(fileName string) *FileSink {
sink := &FileSink{
fileName: fileName,
in: make(chan interface{}),
in: make(chan any),
}
sink.init()
return sink
Expand All @@ -97,6 +97,6 @@ func (fs *FileSink) init() {
}

// In returns an input channel for receiving data
func (fs *FileSink) In() chan<- interface{} {
func (fs *FileSink) In() chan<- any {
return fs.in
}
16 changes: 8 additions & 8 deletions extension/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ type NetSource struct {
conn net.Conn
listener net.Listener
connType ConnType
out chan interface{}
out chan any
}

// NewNetSource returns a new instance of NetSource.
func NewNetSource(ctx context.Context, connType ConnType, address string) (*NetSource, error) {
var err error
var conn net.Conn
var listener net.Listener
out := make(chan interface{})
out := make(chan any)

switch connType {
case TCP:
Expand Down Expand Up @@ -85,7 +85,7 @@ func (ns *NetSource) listenCtx() {
}

// acceptConnections accepts new TCP connections.
func acceptConnections(listener net.Listener, out chan<- interface{}) {
func acceptConnections(listener net.Listener, out chan<- any) {
for {
// accept a new connection
conn, err := listener.Accept()
Expand All @@ -100,7 +100,7 @@ func acceptConnections(listener net.Listener, out chan<- interface{}) {
}

// handleConnection handles new connections.
func handleConnection(conn net.Conn, out chan<- interface{}) {
func handleConnection(conn net.Conn, out chan<- any) {
log.Printf("NetSource connected on: %v", conn.LocalAddr())
reader := bufio.NewReader(conn)

Expand All @@ -127,15 +127,15 @@ func (ns *NetSource) Via(_flow streams.Flow) streams.Flow {
}

// Out returns an output channel for sending data
func (ns *NetSource) Out() <-chan interface{} {
func (ns *NetSource) Out() <-chan any {
return ns.out
}

// NetSink represents an outbound network socket connector.
type NetSink struct {
conn net.Conn
connType ConnType
in chan interface{}
in chan any
}

// NewNetSink returns a new instance of NetSink.
Expand All @@ -151,7 +151,7 @@ func NewNetSink(connType ConnType, address string) (*NetSink, error) {
sink := &NetSink{
conn: conn,
connType: connType,
in: make(chan interface{}),
in: make(chan any),
}

go sink.init()
Expand Down Expand Up @@ -180,6 +180,6 @@ func (ns *NetSink) init() {
}

// In returns an input channel for receiving data
func (ns *NetSink) In() chan<- interface{} {
func (ns *NetSink) In() chan<- any {
return ns.in
}
12 changes: 6 additions & 6 deletions extension/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import "fmt"
// StdoutSink represents a simple outbound connector that sends incoming
// items to standard output.
type StdoutSink struct {
in chan interface{}
in chan any
}

// NewStdoutSink returns a new StdoutSink instance.
func NewStdoutSink() *StdoutSink {
sink := &StdoutSink{
in: make(chan interface{}),
in: make(chan any),
}
sink.init()

Expand All @@ -27,20 +27,20 @@ func (stdout *StdoutSink) init() {
}

// In returns an input channel for receiving data
func (stdout *StdoutSink) In() chan<- interface{} {
func (stdout *StdoutSink) In() chan<- any {
return stdout.in
}

// IgnoreSink represents a simple outbound connector that discards
// all of the incoming items.
type IgnoreSink struct {
in chan interface{}
in chan any
}

// NewIgnoreSink returns a new IgnoreSink instance.
func NewIgnoreSink() *IgnoreSink {
sink := &IgnoreSink{
in: make(chan interface{}),
in: make(chan any),
}
sink.init()

Expand All @@ -59,6 +59,6 @@ func (ignore *IgnoreSink) init() {
}

// In returns an input channel for receiving data
func (ignore *IgnoreSink) In() chan<- interface{} {
func (ignore *IgnoreSink) In() chan<- any {
return ignore.in
}
12 changes: 6 additions & 6 deletions flow/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type FilterPredicate[T any] func(T) bool
// out -- 1 -- 2 ------------------ 5 --
type Filter[T any] struct {
filterPredicate FilterPredicate[T]
in chan interface{}
out chan interface{}
in chan any
out chan any
parallelism uint
}

Expand All @@ -33,8 +33,8 @@ var _ streams.Flow = (*Filter[any])(nil)
func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism uint) *Filter[T] {
filter := &Filter[T]{
filterPredicate: filterPredicate,
in: make(chan interface{}),
out: make(chan interface{}),
in: make(chan any),
out: make(chan any),
parallelism: parallelism,
}
go filter.doStream()
Expand All @@ -54,12 +54,12 @@ func (f *Filter[T]) To(sink streams.Sink) {
}

// Out returns an output channel for sending data
func (f *Filter[T]) Out() <-chan interface{} {
func (f *Filter[T]) Out() <-chan any {
return f.out
}

// In returns an input channel for receiving data
func (f *Filter[T]) In() chan<- interface{} {
func (f *Filter[T]) In() chan<- any {
return f.in
}

Expand Down
12 changes: 6 additions & 6 deletions flow/flat_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type FlatMapFunction[T, R any] func(T) []R
// out -- 1' - 2' -------- 4'- 4" - 5' -
type FlatMap[T, R any] struct {
flatMapFunction FlatMapFunction[T, R]
in chan interface{}
out chan interface{}
in chan any
out chan any
parallelism uint
}

Expand All @@ -31,8 +31,8 @@ var _ streams.Flow = (*FlatMap[any, any])(nil)
func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism uint) *FlatMap[T, R] {
flatMap := &FlatMap[T, R]{
flatMapFunction: flatMapFunction,
in: make(chan interface{}),
out: make(chan interface{}),
in: make(chan any),
out: make(chan any),
parallelism: parallelism,
}
go flatMap.doStream()
Expand All @@ -52,12 +52,12 @@ func (fm *FlatMap[T, R]) To(sink streams.Sink) {
}

// Out returns an output channel for sending data
func (fm *FlatMap[T, R]) Out() <-chan interface{} {
func (fm *FlatMap[T, R]) Out() <-chan any {
return fm.out
}

// In returns an input channel for receiving data
func (fm *FlatMap[T, R]) In() chan<- interface{} {
func (fm *FlatMap[T, R]) In() chan<- any {
return fm.in
}

Expand Down
Loading

0 comments on commit 2e0a6b1

Please sign in to comment.