Skip to content

Commit

Permalink
feat: add support for Azure Blob Storage connector
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Oct 17, 2024
1 parent d2f45f1 commit 088dda0
Show file tree
Hide file tree
Showing 10 changed files with 462 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Supported Connectors:
* [Apache Kafka](https://kafka.apache.org/)
* [Apache Pulsar](https://pulsar.apache.org/)
* AWS ([S3](https://aws.amazon.com/s3/))
* Azure ([Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/))
* GCP ([Storage](https://cloud.google.com/storage/))
* [NATS](https://nats.io/)
* [Redis](https://redis.io/)
Expand Down
243 changes: 243 additions & 0 deletions azure/blob_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package azure

import (
"context"
"fmt"
"io"
"log/slog"
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// BlobStorageSourceConfig represents the configuration options for the Azure
// Blob storage source connector.
type BlobStorageSourceConfig struct {
// The name of the Azure Blob storage container to read from.
Container string
// The path within the container to use. If empty, the root of the container will be used.
Prefix string
// Indicates whether to ignore blob prefixes (virtual directories) in blob segments.
Flat bool
}

// BlobStorageSource represents the Azure Blob storage source connector.
type BlobStorageSource struct {
client *azblob.Client
containerClient *container.Client
config *BlobStorageSourceConfig
out chan any
logger *slog.Logger
}

var _ streams.Source = (*BlobStorageSource)(nil)

// NewBlobStorageSource returns a new [BlobStorageSource].
// The connector reads all objects within the configured path and transmits
// them as a [BlobStorageObject] through the output channel.
func NewBlobStorageSource(ctx context.Context, client *azblob.Client,
config *BlobStorageSourceConfig, logger *slog.Logger) *BlobStorageSource {

if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "azure.blob"),
slog.String("type", "source")))

blobSource := &BlobStorageSource{
client: client,
containerClient: client.ServiceClient().NewContainerClient(config.Container),
config: config,
out: make(chan any),
logger: logger,
}

// read objects and send them downstream
go blobSource.listBlobs(ctx)

return blobSource
}

func (s *BlobStorageSource) listBlobs(ctx context.Context) {
s.listBlobsHierarchy(ctx, &s.config.Prefix, nil)

s.logger.Info("Blob iteration completed")
close(s.out)
}

func (s *BlobStorageSource) listBlobsHierarchy(ctx context.Context, prefix, marker *string) {
pager := s.containerClient.NewListBlobsHierarchyPager("/",
&container.ListBlobsHierarchyOptions{
Prefix: prefix,
Marker: marker,
})

for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
s.logger.Error("Error reading next page", slog.Any("error", err))
break
}

// set the continuation token
marker = resp.Marker

if !s.config.Flat && resp.Segment.BlobPrefixes != nil {
for _, prefix := range resp.Segment.BlobPrefixes {
s.logger.Debug("Virtual directory", slog.String("prefix", *prefix.Name))

// recursively list blobs in the prefix
s.listBlobsHierarchy(ctx, prefix.Name, nil)
}
}

// list blobs in the current page
for _, blob := range resp.Segment.BlobItems {
resp, err := s.client.DownloadStream(ctx, s.config.Container, *blob.Name, nil)
if err != nil {
s.logger.Error("Error reading blob", slog.Any("error", err))
continue
}

select {
// send the blob downstream
case s.out <- &BlobStorageObject{
Key: *blob.Name,
Data: resp.Body,
}:
case <-ctx.Done():
s.logger.Info("Blob reading terminated", slog.Any("error", ctx.Err()))
return
}
}
}

// retrieve the remainder if the continuation token is available
if marker != nil && *marker != "" {
s.logger.Info("Continuation token is available", slog.String("marker", *marker))
s.listBlobsHierarchy(ctx, prefix, marker)
}
}

// Via streams data to a specified operator and returns it.
func (s *BlobStorageSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(s, operator)
return operator
}

// Out returns the output channel of the BlobStorageSource connector.
func (s *BlobStorageSource) Out() <-chan any {
return s.out
}

// BlobStorageObject contains details of the Azure Blob storage object.
type BlobStorageObject struct {
// Key is the object name including any subdirectories.
// For example, "directory/file.json".
Key string
// Data is an [io.ReadCloser] representing the binary content of the blob object.
Data io.ReadCloser
}

// BlobStorageSinkConfig represents the configuration options for the Azure Blob
// storage sink connector.
type BlobStorageSinkConfig struct {
// The name of the Azure Blob storage container to write to.
Container string
// The number of concurrent workers to use when writing data to Azure Blob storage.
// The default is 1.
Parallelism int
// UploadOptions specifies set of configurations for the blob upload operation.
UploadOptions *blockblob.UploadStreamOptions
}

// BlobStorageSink represents the Azure Blob storage sink connector.
type BlobStorageSink struct {
client *azblob.Client
config *BlobStorageSinkConfig
in chan any
logger *slog.Logger
}

var _ streams.Sink = (*BlobStorageSink)(nil)

// NewBlobStorageSink returns a new [BlobStorageSink].
// Incoming elements are expected to be of the [BlobStorageObject] type. These will
// be uploaded to the configured container using their key field as the path.
func NewBlobStorageSink(ctx context.Context, client *azblob.Client,
config *BlobStorageSinkConfig, logger *slog.Logger) *BlobStorageSink {

if logger == nil {
logger = slog.Default()
}
logger = logger.With(slog.Group("connector",
slog.String("name", "azure.blob"),
slog.String("type", "sink")))

if config.Parallelism < 1 {
config.Parallelism = 1
}

blobSink := &BlobStorageSink{
client: client,
config: config,
in: make(chan any, config.Parallelism),
logger: logger,
}

// start writing incoming data
go blobSink.uploadBlobs(ctx)

return blobSink
}

// uploadBlobs writes incoming stream data elements to Azure Blob storage
// using the configured parallelism.
func (s *BlobStorageSink) uploadBlobs(ctx context.Context) {
var wg sync.WaitGroup
for i := 0; i < s.config.Parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range s.in {
var err error
switch object := data.(type) {
case BlobStorageObject:
err = s.uploadBlob(ctx, &object)
case *BlobStorageObject:
err = s.uploadBlob(ctx, object)
default:
s.logger.Error("Unsupported data type",
slog.String("type", fmt.Sprintf("%T", object)))
}

if err != nil {
s.logger.Error("Error uploading blob",
slog.Any("error", err))
}
}
}()
}

// wait for all writers to exit
wg.Wait()
s.logger.Info("All blob writers exited")
}

// uploadBlob uploads a single blob to Azure Blob storage.
func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObject) error {
defer object.Data.Close()
_, err := s.client.UploadStream(ctx, s.config.Container, object.Key, object.Data,
s.config.UploadOptions)
return err
}

// In returns the input channel of the BlobStorageSink connector.
func (s *BlobStorageSink) In() chan<- any {
return s.in
}
2 changes: 2 additions & 0 deletions azure/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package azure implements streaming connectors for Microsoft Azure services.
package azure
15 changes: 15 additions & 0 deletions azure/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/reugn/go-streams/azure

go 1.21.0

require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1
github.com/reugn/go-streams v0.10.0
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/text v0.16.0 // indirect
)
38 changes: 38 additions & 0 deletions azure/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ=
github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
1 change: 1 addition & 0 deletions examples/azure/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/blob/blob
Loading

0 comments on commit 088dda0

Please sign in to comment.