Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for AWS S3 connector #143

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Supported Connectors:
* [Aerospike](https://www.aerospike.com/)
* [Apache Kafka](https://kafka.apache.org/)
* [Apache Pulsar](https://pulsar.apache.org/)
* AWS ([S3](https://aws.amazon.com/s3/))
* [NATS](https://nats.io/)
* [Redis](https://redis.io/)

Expand Down
2 changes: 2 additions & 0 deletions aws/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package aws implements streaming connectors for Amazon Web Services.
package aws
21 changes: 21 additions & 0 deletions aws/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module github.com/reugn/go-streams/aws

go 1.21.0

require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2
github.com/reugn/go-streams v0.10.0
)

require (
github.com/aws/aws-sdk-go-v2 v1.30.5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 // indirect
github.com/aws/smithy-go v1.20.4 // indirect
)
24 changes: 24 additions & 0 deletions aws/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g=
github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 h1:Roo69qTpfu8OlJ2Tb7pAYVuF0CpuUMB0IYWwYP/4DZM=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17/go.mod h1:NcWPxQzGM1USQggaTVwz6VpqMZPX1CvDJLDh6jnOCa4=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 h1:FLMkfEiRjhgeDTCjjLoc3URo/TBkgeQbocA78lfkzSI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19/go.mod h1:Vx+GucNSsdhaxs3aZIKfSUjKVGsxN25nX2SRcdhuw08=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 h1:rfprUlsdzgl7ZL2KlXiUAoJnI/VxfHCvDFr2QDFj6u4=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19/go.mod h1:SCWkEdRq8/7EK60NcvvQ6NXKuTcchAD4ROAsC37VEZE=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 h1:u+EfGmksnJc/x5tq3A+OD7LrMbSSR/5TrKLvkdy/fhY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17/go.mod h1:VaMx6302JHax2vHJWgRo+5n9zvbacs3bLU/23DNQrTY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2 h1:Kp6PWAlXwP1UvIflkIP6MFZYBNDCa4mFCGtxrpICVOg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2/go.mod h1:5FmD/Dqq57gP+XwaUnd5WFPipAuzrf0HmupX27Gvjvc=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ=
github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
293 changes: 293 additions & 0 deletions aws/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
package aws

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"log/slog"
"sync"

"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// s3DefaultChunkSize is the default value applied to S3SourceConfig.ChunkSize
// when it is not set explicitly (see NewS3Source).
const s3DefaultChunkSize = 5 * 1024 * 1024 // 5 MB

// S3SourceConfig represents the configuration options for the S3 source
// connector. Zero values for Parallelism and ChunkSize are replaced with
// defaults by NewS3Source.
type S3SourceConfig struct {
	// The name of the S3 bucket to read from.
	Bucket string
	// The path within the bucket to use. If empty, the root of the
	// bucket will be used.
	Path string
	// The number of concurrent workers to use when reading data from S3.
	// The default is 1.
	Parallelism int
	// The size of chunks in bytes to use when reading data from S3.
	// The default is 5 MB.
	ChunkSize int
}

// S3Source represents the AWS S3 source connector.
//
// It lists all objects under the configured path, reads their contents
// with the configured number of concurrent workers, and emits each one
// downstream as an [S3Object].
type S3Source struct {
	client   *s3.Client      // AWS S3 API client
	config   *S3SourceConfig // source configuration options
	objectCh chan string     // object keys pending retrieval
	out      chan any        // output channel of the connector
	logger   *slog.Logger
}

// compile-time check that S3Source implements the streams.Source interface
var _ streams.Source = (*S3Source)(nil)

// NewS3Source returns a new [S3Source].
// The connector reads all objects within the configured path and transmits
// them as an [S3Object] through the output channel.
func NewS3Source(ctx context.Context, client *s3.Client,
	config *S3SourceConfig, logger *slog.Logger) *S3Source {

	// normalize the configuration defaults
	if config.Parallelism < 1 {
		config.Parallelism = 1
	}
	if config.ChunkSize < 1 {
		config.ChunkSize = s3DefaultChunkSize
	}

	if logger == nil {
		logger = slog.Default()
	}

	source := &S3Source{
		client:   client,
		config:   config,
		objectCh: make(chan string, config.Parallelism),
		out:      make(chan any),
		logger: logger.With(slog.Group("connector",
			slog.String("name", "aws.s3"),
			slog.String("type", "source"))),
	}

	// start listing objects in the configured path
	go source.listObjects(ctx)

	// start reading the objects and sending data downstream
	go source.getObjects(ctx)

	return source
}

// listObjects streams the keys of all objects under the configured path
// to the objectCh channel, paginating with the S3 continuation token.
// The channel is closed on every exit path so that the getObjects
// workers can drain it and terminate.
func (s *S3Source) listObjects(ctx context.Context) {
	// guarantee the channel is closed regardless of how we exit
	defer close(s.objectCh)

	var continuationToken *string
	for {
		listResponse, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
			Bucket:            &s.config.Bucket,
			Prefix:            &s.config.Path,
			ContinuationToken: continuationToken,
		})
		if err != nil {
			s.logger.Error("Failed to list objects", slog.Any("error", err),
				slog.Any("continuationToken", continuationToken))
			return
		}

		for _, object := range listResponse.Contents {
			select {
			case s.objectCh <- *object.Key:
			case <-ctx.Done():
				// the readers stop consuming on cancellation; abort here
				// instead of blocking on the send forever (goroutine leak)
				s.logger.Debug("Object listing canceled",
					slog.Any("error", ctx.Err()))
				return
			}
		}

		continuationToken = listResponse.NextContinuationToken
		if continuationToken == nil {
			return // no more pages
		}
	}
}

// getObjects reads the data of the objects received via objectCh using
// the configured number of parallel workers, and sends each object
// downstream as an *[S3Object]. The output channel is closed once all
// workers have exited.
func (s *S3Source) getObjects(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < s.config.Parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
		loop:
			for {
				select {
				case key, ok := <-s.objectCh:
					if !ok {
						break loop // object listing completed
					}
					objectOutput, err := s.client.GetObject(ctx, &s3.GetObjectInput{
						Bucket: &s.config.Bucket,
						Key:    &key,
					})
					if err != nil {
						s.logger.Error("Failed to get object", slog.Any("error", err),
							slog.String("key", key))
						// objectOutput is nil on error; skip this object
						// to avoid a nil pointer dereference below
						continue
					}

					// read the entire object body; the buffered reader
					// performs the underlying reads in ChunkSize chunks
					var data bytes.Buffer
					n, err := io.Copy(&data,
						bufio.NewReaderSize(objectOutput.Body, s.config.ChunkSize))
					_ = objectOutput.Body.Close() // release the connection
					if err != nil {
						s.logger.Error("Failed to read object", slog.Any("error", err),
							slog.String("key", key))
						continue
					}

					s.logger.Debug("Successfully read object", slog.String("key", key),
						slog.Int64("size", n))

					// send the read data downstream as an S3Object,
					// aborting on cancellation to avoid blocking forever
					// on a stalled consumer
					select {
					case s.out <- &S3Object{
						Key:  key,
						Data: &data,
					}:
					case <-ctx.Done():
						s.logger.Debug("Object reading finished",
							slog.Any("error", ctx.Err()))
						break loop
					}
				case <-ctx.Done():
					s.logger.Debug("Object reading finished", slog.Any("error", ctx.Err()))
					break loop
				}
			}
		}()
	}

	// wait for all object readers to exit
	wg.Wait()
	s.logger.Info("Closing connector")
	// close the output channel
	close(s.out)
}

// Via streams data to a specified operator and returns it.
// It connects the source's output channel to the operator's input,
// enabling fluent pipeline composition.
func (s *S3Source) Via(operator streams.Flow) streams.Flow {
	flow.DoStream(s, operator)
	return operator
}

// Out returns the output channel of the S3Source connector.
// Elements emitted on this channel are of the *[S3Object] type.
func (s *S3Source) Out() <-chan any {
	return s.out
}

// S3Object contains details of the S3 object.
// It is the element type emitted by [S3Source] and consumed by [S3Sink].
type S3Object struct {
	// Key is the object name including any subdirectories.
	// For example, "directory/file.json".
	Key string
	// Data is an [io.Reader] representing the binary content of the object.
	// This can be a file, a buffer, or any other type that implements the
	// io.Reader interface.
	Data io.Reader
}

// S3SinkConfig represents the configuration options for the S3 sink
// connector. A zero Parallelism is replaced with the default by NewS3Sink.
type S3SinkConfig struct {
	// The name of the S3 bucket to write to.
	Bucket string
	// The number of concurrent workers to use when writing data to S3.
	// The default is 1.
	Parallelism int
}

// S3Sink represents the AWS S3 sink connector.
//
// It uploads incoming stream elements to the configured bucket using
// a pool of concurrent writers.
type S3Sink struct {
	client *s3.Client    // AWS S3 API client
	config *S3SinkConfig // sink configuration options
	in     chan any      // input channel of the connector
	logger *slog.Logger
}

// compile-time check that S3Sink implements the streams.Sink interface
var _ streams.Sink = (*S3Sink)(nil)

// NewS3Sink returns a new [S3Sink].
// Incoming elements are expected to be of the [S3Object] type. These will
// be uploaded to the configured bucket using their key field as the path.
func NewS3Sink(ctx context.Context, client *s3.Client,
	config *S3SinkConfig, logger *slog.Logger) *S3Sink {

	// normalize the configuration defaults
	if config.Parallelism < 1 {
		config.Parallelism = 1
	}

	if logger == nil {
		logger = slog.Default()
	}

	sink := &S3Sink{
		client: client,
		config: config,
		in:     make(chan any, config.Parallelism),
		logger: logger.With(slog.Group("connector",
			slog.String("name", "aws.s3"),
			slog.String("type", "sink"))),
	}

	// start consuming and uploading incoming stream elements
	go sink.writeObjects(ctx)

	return sink
}

// writeObjects uploads incoming stream elements to S3 using the
// configured number of parallel workers. It returns after the input
// channel has been closed and all workers have drained it.
func (s *S3Sink) writeObjects(ctx context.Context) {
	var wg sync.WaitGroup
	for worker := 0; worker < s.config.Parallelism; worker++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// consume elements until the input channel is closed
			for element := range s.in {
				var err error
				switch object := element.(type) {
				case *S3Object:
					err = s.writeObject(ctx, object)
				case S3Object:
					err = s.writeObject(ctx, &object)
				default:
					s.logger.Error("Unsupported data type",
						slog.String("type", fmt.Sprintf("%T", object)))
				}
				if err != nil {
					s.logger.Error("Error writing object",
						slog.Any("error", err))
				}
			}
		}()
	}

	// wait for all writers to exit
	wg.Wait()
	s.logger.Info("All object writers exited")
}

// writeObject uploads a single object to the configured S3 bucket,
// using the object's key as its path within the bucket.
func (s *S3Sink) writeObject(ctx context.Context, putObject *S3Object) error {
	input := &s3.PutObjectInput{
		Bucket: &s.config.Bucket,
		Key:    &putObject.Key,
		Body:   putObject.Data,
	}

	output, err := s.client.PutObject(ctx, input)
	if err != nil {
		return fmt.Errorf("failed to put object: %w", err)
	}

	s.logger.Debug("Successfully put object", slog.String("key", putObject.Key),
		slog.Any("etag", output.ETag))

	return nil
}

// In returns the input channel of the S3Sink connector.
// Elements sent to this channel are expected to be of the [S3Object]
// type (value or pointer).
func (s *S3Sink) In() chan<- any {
	return s.in
}
1 change: 1 addition & 0 deletions examples/aws/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/s3/s3
33 changes: 33 additions & 0 deletions examples/aws/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
services:
minio:
image: minio/minio:latest
container_name: minio-server
ports:
- "9000:9000"
- "9001:9001"
expose:
- "9000"
- "9001"
healthcheck:
test: [ "CMD", "mc", "ready", "local" ]
interval: 10s
timeout: 10s
retries: 3
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data/ --console-address :9001

minio-client:
image: minio/mc
container_name: minio-client
depends_on:
minio:
condition: service_healthy
entrypoint: >
/bin/sh -c "
/usr/bin/mc alias set myminio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc mb myminio/stream-test;
/usr/bin/mc anonymous set public myminio/stream-test;
exit 0;
"
Loading
Loading