Skip to content

Commit

Permalink
feat(connectors)!: use slog for structured logging
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Aug 19, 2024
1 parent cdcb6c0 commit 8b09356
Show file tree
Hide file tree
Showing 28 changed files with 1,056 additions and 1,131 deletions.
59 changes: 43 additions & 16 deletions aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"crypto/sha256"
"encoding/json"
"log"
"fmt"
"log/slog"
"time"

aero "github.com/aerospike/aerospike-client-go/v6"
aero "github.com/aerospike/aerospike-client-go/v7"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)
Expand All @@ -27,7 +28,7 @@ type PollingConfig struct {
Namespace string
// SetName determines query set name (optional).
SetName string
// BinNames detemines which bins to retrieve (optional).
// BinNames determines which bins to retrieve (optional).
BinNames []string

filterExpression *aero.Expression
Expand All @@ -41,13 +42,14 @@ type PollingSource struct {
statement *aero.Statement
recordsChan chan *aero.Result
out chan any
logger *slog.Logger
}

var _ streams.Source = (*PollingSource)(nil)

// NewPollingSource returns a new PollingSource instance.
// NewPollingSource returns a new [PollingSource] connector.
func NewPollingSource(ctx context.Context, client *aero.Client,
config PollingConfig) *PollingSource {
config PollingConfig, logger *slog.Logger) *PollingSource {
if config.QueryPolicy == nil {
config.QueryPolicy = aero.NewQueryPolicy()
} else {
Expand All @@ -59,12 +61,21 @@ func NewPollingSource(ctx context.Context, client *aero.Client,
Filter: config.SecondaryIndexFilter,
BinNames: config.BinNames,
}

if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "aerospike"),
slog.String("type", "source")))

source := &PollingSource{
client: client,
config: config,
statement: statement,
recordsChan: make(chan *aero.Result),
out: make(chan any),
logger: logger,
}

go source.pollChanges(ctx)
Expand Down Expand Up @@ -104,7 +115,7 @@ loop:
ps.config.filterExpression,
)
}
log.Printf("Polling records from: %s", lastUpdate)
ps.logger.Debug("Polling records", slog.Any("from", lastUpdate))
// execute the query command
ps.query()
}
Expand All @@ -114,7 +125,7 @@ loop:
func (ps *PollingSource) query() {
recordSet, err := ps.client.Query(ps.config.QueryPolicy, ps.statement)
if err != nil {
log.Printf("Aerospike polling query failed: %s", err)
ps.logger.Error("Polling query failed", slog.Any("error", err))
return
}
for result := range recordSet.Results() {
Expand All @@ -135,11 +146,12 @@ loop:
if result.Err == nil {
ps.out <- result.Record // send the record downstream
} else {
log.Printf("Aerospike query record error: %s", result.Err)
ps.logger.Error("Read record error",
slog.Any("error", result.Err))
}
}
}
log.Printf("Closing Aerospike polling connector")
ps.logger.Info("Closing connector")
close(ps.out)
}

Expand Down Expand Up @@ -201,21 +213,32 @@ type Sink struct {
config SinkConfig
buf []*Record
in chan any
logger *slog.Logger
}

var _ streams.Sink = (*Sink)(nil)

// NewSink returns a new Sink instance.
func NewSink(client *aero.Client, config SinkConfig) *Sink {
// NewSink returns a new [Sink] connector.
func NewSink(client *aero.Client, config SinkConfig, logger *slog.Logger) *Sink {
if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "aerospike"),
slog.String("type", "sink")))

sink := &Sink{
client: client,
config: config,
in: make(chan any),
logger: logger,
}

// initialize the buffer for batch writes
if config.BatchSize > 1 {
sink.buf = make([]*Record, 0, config.BatchSize)
}

// begin processing upstream records
go sink.processStream()

Expand Down Expand Up @@ -253,10 +276,12 @@ loop:
}
}
if err != nil {
log.Printf("Error parsing bin map: %s", err)
as.logger.Error("Error parsing bin map",
slog.Any("error", err))
}
default:
log.Printf("Unsupported message type: %T", message)
as.logger.Error("Unsupported message type",
slog.String("type", fmt.Sprintf("%T", message)))
}
case <-flushTickerChan:
as.flushBuffer()
Expand All @@ -275,7 +300,7 @@ func (as *Sink) writeRecord(record *Record) {
} else {
// use single record put operation
if err := as.client.Put(as.config.WritePolicy, record.Key, record.Bins); err != nil {
log.Printf("Failed to write record: %s", err)
as.logger.Error("Failed to write record", slog.Any("error", err))
}
}
}
Expand All @@ -287,9 +312,11 @@ func (as *Sink) flushBuffer() {
for _, rec := range as.buf {
records = append(records, rec.batchWrite(as.config.BatchWritePolicy))
}
log.Printf("Writing batch of %d records", len(records))
as.logger.Debug("Writing batch of records",
slog.Int("size", len(records)))
if err := as.client.BatchOperate(as.config.BatchPolicy, records); err != nil {
log.Printf("Failed to write batch of records: %s", err)
as.logger.Error("Failed to write batch of records",
slog.Any("error", err))
}
as.buf = as.buf[:0] // clear the buffer
}
Expand Down
16 changes: 8 additions & 8 deletions aerospike/go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
module github.com/reugn/go-streams/aerospike

go 1.18
go 1.21.0

require (
github.com/aerospike/aerospike-client-go/v6 v6.15.1
github.com/aerospike/aerospike-client-go/v7 v7.6.1
github.com/reugn/go-streams v0.10.0
)

require (
github.com/yuin/gopher-lua v1.1.1 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
google.golang.org/grpc v1.63.3 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
44 changes: 26 additions & 18 deletions aerospike/go.sum
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
github.com/aerospike/aerospike-client-go/v6 v6.15.1 h1:meQQ3dVNImi8+EcHJFe4f1+mF6wpg2qgv7dPNAp0L+4=
github.com/aerospike/aerospike-client-go/v6 v6.15.1/go.mod h1:8GzCrqAEvZig6Cr/dz5nwPucIOAZXJTHkt6L7WBZFaA=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/aerospike/aerospike-client-go/v7 v7.6.1 h1:VZK6S9YKq2w6ptTk3kXXjTxG2U9M9Y7Oi3YQ+3T7wQQ=
github.com/aerospike/aerospike-client-go/v7 v7.6.1/go.mod h1:uCbSYMpjlRcH/9f26VSF/luzDDXrcDaV8c6/WIcKtT4=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/pprof v0.0.0-20240319011627-a57c5dfe54fd h1:LjW4RcTwfcqOYGmD7UpFrn1gfBZ9mgu7QN5mSeFkCog=
github.com/onsi/ginkgo/v2 v2.17.0 h1:kdnunFXpBjbzN56hcJHrXZ8M+LOkenKA7NnBzTNigTI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240711041743-f6c9dda6c6da h1:xRmpO92tb8y+Z85iUOMOicpCfaYcv7o3Cg3wKrIpg8g=
github.com/google/pprof v0.0.0-20240711041743-f6c9dda6c6da/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/onsi/ginkgo/v2 v2.16.0 h1:7q1w9frJDzninhXxjZd+Y/x54XNjG/UlRLIYPZafsPM=
github.com/onsi/ginkgo/v2 v2.16.0/go.mod h1:llBI3WDLL9Z6taip6f33H76YcWtJv+7R3HigUjbIBOs=
github.com/onsi/gomega v1.32.0 h1:JRYU78fJ1LPxlckP6Txi/EYqJvjtMrDC04/MM5XRHPk=
github.com/onsi/gomega v1.32.0/go.mod h1:a4x4gW6Pz2yK1MAmvluYme5lvYTn61afQ2ETw/8n4Lg=
github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ=
github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d h1:JU0iKnSg02Gmb5ZdV8nYsKEKsP6o/FGVWTrw4i1DA9A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.63.3 h1:FGVegD7MHo/zhaGduk/R85WvSFJ+si70UQIJ0fg+BiU=
google.golang.org/grpc v1.63.3/go.mod h1:5FFeE/YiGPD2flWFCrCx8K3Ay7hALATnKiI8U3avIuw=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
6 changes: 3 additions & 3 deletions examples/aerospike/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"time"

aero "github.com/aerospike/aerospike-client-go/v6"
aero "github.com/aerospike/aerospike-client-go/v7"
"github.com/reugn/go-streams/aerospike"
"github.com/reugn/go-streams/flow"
)
Expand All @@ -30,7 +30,7 @@ func main() {
QueryPolicy: queryPolicy,
Namespace: "test",
SetName: "source",
})
}, nil)

mapFlow := flow.NewMap(transform, 1)

Expand All @@ -42,7 +42,7 @@ func main() {
BatchWritePolicy: batchWritePolicy,
Namespace: "test",
SetName: "sink",
})
}, nil)

source.
Via(mapFlow).
Expand Down
Loading

0 comments on commit 8b09356

Please sign in to comment.