-
Notifications
You must be signed in to change notification settings - Fork 163
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add support for Azure Blob Storage connector (#147)
- Loading branch information
Showing
10 changed files
with
462 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
package azure | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io" | ||
"log/slog" | ||
"sync" | ||
|
||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" | ||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" | ||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" | ||
"github.com/reugn/go-streams" | ||
"github.com/reugn/go-streams/flow" | ||
) | ||
|
||
// BlobStorageSourceConfig represents the configuration options for the Azure | ||
// Blob storage source connector. | ||
type BlobStorageSourceConfig struct { | ||
// The name of the Azure Blob storage container to read from. | ||
Container string | ||
// The path within the container to use. If empty, the root of the container will be used. | ||
Prefix string | ||
// Indicates whether to ignore blob prefixes (virtual directories) in blob segments. | ||
Flat bool | ||
} | ||
|
||
// BlobStorageSource represents the Azure Blob storage source connector. | ||
type BlobStorageSource struct { | ||
client *azblob.Client | ||
containerClient *container.Client | ||
config *BlobStorageSourceConfig | ||
out chan any | ||
logger *slog.Logger | ||
} | ||
|
||
var _ streams.Source = (*BlobStorageSource)(nil) | ||
|
||
// NewBlobStorageSource returns a new [BlobStorageSource]. | ||
// The connector reads all objects within the configured path and transmits | ||
// them as a [BlobStorageObject] through the output channel. | ||
func NewBlobStorageSource(ctx context.Context, client *azblob.Client, | ||
config *BlobStorageSourceConfig, logger *slog.Logger) *BlobStorageSource { | ||
|
||
if logger == nil { | ||
logger = slog.Default() | ||
} | ||
logger = logger.With(slog.Group("connector", | ||
slog.String("name", "azure.blob"), | ||
slog.String("type", "source"))) | ||
|
||
blobSource := &BlobStorageSource{ | ||
client: client, | ||
containerClient: client.ServiceClient().NewContainerClient(config.Container), | ||
config: config, | ||
out: make(chan any), | ||
logger: logger, | ||
} | ||
|
||
// read objects and send them downstream | ||
go blobSource.listBlobs(ctx) | ||
|
||
return blobSource | ||
} | ||
|
||
func (s *BlobStorageSource) listBlobs(ctx context.Context) { | ||
s.listBlobsHierarchy(ctx, &s.config.Prefix, nil) | ||
|
||
s.logger.Info("Blob iteration completed") | ||
close(s.out) | ||
} | ||
|
||
func (s *BlobStorageSource) listBlobsHierarchy(ctx context.Context, prefix, marker *string) { | ||
pager := s.containerClient.NewListBlobsHierarchyPager("/", | ||
&container.ListBlobsHierarchyOptions{ | ||
Prefix: prefix, | ||
Marker: marker, | ||
}) | ||
|
||
for pager.More() { | ||
resp, err := pager.NextPage(ctx) | ||
if err != nil { | ||
s.logger.Error("Error reading next page", slog.Any("error", err)) | ||
break | ||
} | ||
|
||
// set the continuation token | ||
marker = resp.Marker | ||
|
||
if !s.config.Flat && resp.Segment.BlobPrefixes != nil { | ||
for _, prefix := range resp.Segment.BlobPrefixes { | ||
s.logger.Debug("Virtual directory", slog.String("prefix", *prefix.Name)) | ||
|
||
// recursively list blobs in the prefix | ||
s.listBlobsHierarchy(ctx, prefix.Name, nil) | ||
} | ||
} | ||
|
||
// list blobs in the current page | ||
for _, blob := range resp.Segment.BlobItems { | ||
resp, err := s.client.DownloadStream(ctx, s.config.Container, *blob.Name, nil) | ||
if err != nil { | ||
s.logger.Error("Error reading blob", slog.Any("error", err)) | ||
continue | ||
} | ||
|
||
select { | ||
// send the blob downstream | ||
case s.out <- &BlobStorageObject{ | ||
Key: *blob.Name, | ||
Data: resp.Body, | ||
}: | ||
case <-ctx.Done(): | ||
s.logger.Info("Blob reading terminated", slog.Any("error", ctx.Err())) | ||
return | ||
} | ||
} | ||
} | ||
|
||
// retrieve the remainder if the continuation token is available | ||
if marker != nil && *marker != "" { | ||
s.logger.Info("Continuation token is available", slog.String("marker", *marker)) | ||
s.listBlobsHierarchy(ctx, prefix, marker) | ||
} | ||
} | ||
|
||
// Via streams data to a specified operator and returns it. | ||
func (s *BlobStorageSource) Via(operator streams.Flow) streams.Flow { | ||
flow.DoStream(s, operator) | ||
return operator | ||
} | ||
|
||
// Out returns the output channel of the BlobStorageSource connector. | ||
func (s *BlobStorageSource) Out() <-chan any { | ||
return s.out | ||
} | ||
|
||
// BlobStorageObject contains details of the Azure Blob storage object. | ||
type BlobStorageObject struct { | ||
// Key is the object name including any subdirectories. | ||
// For example, "directory/file.json". | ||
Key string | ||
// Data is an [io.ReadCloser] representing the binary content of the blob object. | ||
Data io.ReadCloser | ||
} | ||
|
||
// BlobStorageSinkConfig represents the configuration options for the Azure Blob | ||
// storage sink connector. | ||
type BlobStorageSinkConfig struct { | ||
// The name of the Azure Blob storage container to write to. | ||
Container string | ||
// The number of concurrent workers to use when writing data to Azure Blob storage. | ||
// The default is 1. | ||
Parallelism int | ||
// UploadOptions specifies set of configurations for the blob upload operation. | ||
UploadOptions *blockblob.UploadStreamOptions | ||
} | ||
|
||
// BlobStorageSink represents the Azure Blob storage sink connector. | ||
type BlobStorageSink struct { | ||
client *azblob.Client | ||
config *BlobStorageSinkConfig | ||
in chan any | ||
logger *slog.Logger | ||
} | ||
|
||
var _ streams.Sink = (*BlobStorageSink)(nil) | ||
|
||
// NewBlobStorageSink returns a new [BlobStorageSink]. | ||
// Incoming elements are expected to be of the [BlobStorageObject] type. These will | ||
// be uploaded to the configured container using their key field as the path. | ||
func NewBlobStorageSink(ctx context.Context, client *azblob.Client, | ||
config *BlobStorageSinkConfig, logger *slog.Logger) *BlobStorageSink { | ||
|
||
if logger == nil { | ||
logger = slog.Default() | ||
} | ||
logger = logger.With(slog.Group("connector", | ||
slog.String("name", "azure.blob"), | ||
slog.String("type", "sink"))) | ||
|
||
if config.Parallelism < 1 { | ||
config.Parallelism = 1 | ||
} | ||
|
||
blobSink := &BlobStorageSink{ | ||
client: client, | ||
config: config, | ||
in: make(chan any, config.Parallelism), | ||
logger: logger, | ||
} | ||
|
||
// start writing incoming data | ||
go blobSink.uploadBlobs(ctx) | ||
|
||
return blobSink | ||
} | ||
|
||
// uploadBlobs writes incoming stream data elements to Azure Blob storage | ||
// using the configured parallelism. | ||
func (s *BlobStorageSink) uploadBlobs(ctx context.Context) { | ||
var wg sync.WaitGroup | ||
for i := 0; i < s.config.Parallelism; i++ { | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
for data := range s.in { | ||
var err error | ||
switch object := data.(type) { | ||
case BlobStorageObject: | ||
err = s.uploadBlob(ctx, &object) | ||
case *BlobStorageObject: | ||
err = s.uploadBlob(ctx, object) | ||
default: | ||
s.logger.Error("Unsupported data type", | ||
slog.String("type", fmt.Sprintf("%T", object))) | ||
} | ||
|
||
if err != nil { | ||
s.logger.Error("Error uploading blob", | ||
slog.Any("error", err)) | ||
} | ||
} | ||
}() | ||
} | ||
|
||
// wait for all writers to exit | ||
wg.Wait() | ||
s.logger.Info("All blob writers exited") | ||
} | ||
|
||
// uploadBlob uploads a single blob to Azure Blob storage. | ||
func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObject) error { | ||
defer object.Data.Close() | ||
_, err := s.client.UploadStream(ctx, s.config.Container, object.Key, object.Data, | ||
s.config.UploadOptions) | ||
return err | ||
} | ||
|
||
// In returns the input channel of the BlobStorageSink connector. | ||
func (s *BlobStorageSink) In() chan<- any { | ||
return s.in | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
// Package azure implements streaming connectors for Microsoft Azure services. | ||
package azure |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
module github.com/reugn/go-streams/azure | ||
|
||
go 1.21.0 | ||
|
||
require ( | ||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 | ||
github.com/reugn/go-streams v0.10.0 | ||
) | ||
|
||
require ( | ||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect | ||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect | ||
golang.org/x/net v0.27.0 // indirect | ||
golang.org/x/text v0.16.0 // indirect | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8= | ||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= | ||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= | ||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= | ||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= | ||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= | ||
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= | ||
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= | ||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM= | ||
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek= | ||
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= | ||
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= | ||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= | ||
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= | ||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= | ||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | ||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= | ||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= | ||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= | ||
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= | ||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ= | ||
github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= | ||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= | ||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= | ||
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= | ||
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= | ||
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= | ||
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= | ||
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= | ||
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= | ||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= | ||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
/blob/blob |
Oops, something went wrong.