diff --git a/README.md b/README.md index 07e9d86..f6e90b6 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ Supported Connectors: * [Apache Kafka](https://kafka.apache.org/) * [Apache Pulsar](https://pulsar.apache.org/) * AWS ([S3](https://aws.amazon.com/s3/)) +* Azure ([Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/)) * GCP ([Storage](https://cloud.google.com/storage/)) * [NATS](https://nats.io/) * [Redis](https://redis.io/) diff --git a/azure/blob_storage.go b/azure/blob_storage.go new file mode 100644 index 0000000..3735ffa --- /dev/null +++ b/azure/blob_storage.go @@ -0,0 +1,243 @@ +package azure + +import ( + "context" + "fmt" + "io" + "log/slog" + "sync" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/reugn/go-streams" + "github.com/reugn/go-streams/flow" +) + +// BlobStorageSourceConfig represents the configuration options for the Azure +// Blob storage source connector. +type BlobStorageSourceConfig struct { + // The name of the Azure Blob storage container to read from. + Container string + // The path within the container to use. If empty, the root of the container will be used. + Prefix string + // Indicates whether to ignore blob prefixes (virtual directories) in blob segments. + Flat bool +} + +// BlobStorageSource represents the Azure Blob storage source connector. +type BlobStorageSource struct { + client *azblob.Client + containerClient *container.Client + config *BlobStorageSourceConfig + out chan any + logger *slog.Logger +} + +var _ streams.Source = (*BlobStorageSource)(nil) + +// NewBlobStorageSource returns a new [BlobStorageSource]. +// The connector reads all objects within the configured path and transmits +// them as a [BlobStorageObject] through the output channel. +func NewBlobStorageSource(ctx context.Context, client *azblob.Client, + config *BlobStorageSourceConfig, logger *slog.Logger) *BlobStorageSource { + + if logger == nil { + logger = slog.Default() + } + logger = logger.With(slog.Group("connector", + slog.String("name", "azure.blob"), + slog.String("type", "source"))) + + blobSource := &BlobStorageSource{ + client: client, + containerClient: client.ServiceClient().NewContainerClient(config.Container), + config: config, + out: make(chan any), + logger: logger, + } + + // read objects and send them downstream + go blobSource.listBlobs(ctx) + + return blobSource +} + +func (s *BlobStorageSource) listBlobs(ctx context.Context) { + s.listBlobsHierarchy(ctx, &s.config.Prefix, nil) + + s.logger.Info("Blob iteration completed") + close(s.out) +} + +func (s *BlobStorageSource) listBlobsHierarchy(ctx context.Context, prefix, marker *string) { + pager := s.containerClient.NewListBlobsHierarchyPager("/", + &container.ListBlobsHierarchyOptions{ + Prefix: prefix, + Marker: marker, + }) + + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + s.logger.Error("Error reading next page", slog.Any("error", err)) + break + } + + // set the continuation token + marker = resp.Marker + + if !s.config.Flat && resp.Segment.BlobPrefixes != nil { + for _, prefix := range resp.Segment.BlobPrefixes { + s.logger.Debug("Virtual directory", slog.String("prefix", *prefix.Name)) + + // recursively list blobs in the prefix + s.listBlobsHierarchy(ctx, prefix.Name, nil) + } + } + + // list blobs in the current page + for _, blob := range resp.Segment.BlobItems { + resp, err := s.client.DownloadStream(ctx, s.config.Container, *blob.Name, nil) + if err != nil { + s.logger.Error("Error reading blob", slog.Any("error", err)) + continue + } + + select { + // send the blob downstream + case s.out <- &BlobStorageObject{ + Key: *blob.Name, + Data: resp.Body, + }: + case <-ctx.Done(): + s.logger.Info("Blob reading terminated", slog.Any("error", ctx.Err())) + return + } + } + } + + // retrieve the remainder if the continuation token is available + if marker != nil && *marker != "" { + s.logger.Info("Continuation token is available", slog.String("marker", *marker)) + s.listBlobsHierarchy(ctx, prefix, marker) + } +} + +// Via streams data to a specified operator and returns it. +func (s *BlobStorageSource) Via(operator streams.Flow) streams.Flow { + flow.DoStream(s, operator) + return operator +} + +// Out returns the output channel of the BlobStorageSource connector. +func (s *BlobStorageSource) Out() <-chan any { + return s.out +} + +// BlobStorageObject contains details of the Azure Blob storage object. +type BlobStorageObject struct { + // Key is the object name including any subdirectories. + // For example, "directory/file.json". + Key string + // Data is an [io.ReadCloser] representing the binary content of the blob object. + Data io.ReadCloser +} + +// BlobStorageSinkConfig represents the configuration options for the Azure Blob +// storage sink connector. +type BlobStorageSinkConfig struct { + // The name of the Azure Blob storage container to write to. + Container string + // The number of concurrent workers to use when writing data to Azure Blob storage. + // The default is 1. + Parallelism int + // UploadOptions specifies set of configurations for the blob upload operation. + UploadOptions *blockblob.UploadStreamOptions +} + +// BlobStorageSink represents the Azure Blob storage sink connector. +type BlobStorageSink struct { + client *azblob.Client + config *BlobStorageSinkConfig + in chan any + logger *slog.Logger +} + +var _ streams.Sink = (*BlobStorageSink)(nil) + +// NewBlobStorageSink returns a new [BlobStorageSink]. +// Incoming elements are expected to be of the [BlobStorageObject] type. These will +// be uploaded to the configured container using their key field as the path. +func NewBlobStorageSink(ctx context.Context, client *azblob.Client, + config *BlobStorageSinkConfig, logger *slog.Logger) *BlobStorageSink { + + if logger == nil { + logger = slog.Default() + } + logger = logger.With(slog.Group("connector", + slog.String("name", "azure.blob"), + slog.String("type", "sink"))) + + if config.Parallelism < 1 { + config.Parallelism = 1 + } + + blobSink := &BlobStorageSink{ + client: client, + config: config, + in: make(chan any, config.Parallelism), + logger: logger, + } + + // start writing incoming data + go blobSink.uploadBlobs(ctx) + + return blobSink +} + +// uploadBlobs writes incoming stream data elements to Azure Blob storage +// using the configured parallelism. +func (s *BlobStorageSink) uploadBlobs(ctx context.Context) { + var wg sync.WaitGroup + for i := 0; i < s.config.Parallelism; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for data := range s.in { + var err error + switch object := data.(type) { + case BlobStorageObject: + err = s.uploadBlob(ctx, &object) + case *BlobStorageObject: + err = s.uploadBlob(ctx, object) + default: + s.logger.Error("Unsupported data type", + slog.String("type", fmt.Sprintf("%T", object))) + } + + if err != nil { + s.logger.Error("Error uploading blob", + slog.Any("error", err)) + } + } + }() + } + + // wait for all writers to exit + wg.Wait() + s.logger.Info("All blob writers exited") +} + +// uploadBlob uploads a single blob to Azure Blob storage. +func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObject) error { + defer object.Data.Close() + _, err := s.client.UploadStream(ctx, s.config.Container, object.Key, object.Data, + s.config.UploadOptions) + return err +} + +// In returns the input channel of the BlobStorageSink connector. +func (s *BlobStorageSink) In() chan<- any { + return s.in +} diff --git a/azure/doc.go b/azure/doc.go new file mode 100644 index 0000000..2079391 --- /dev/null +++ b/azure/doc.go @@ -0,0 +1,2 @@ +// Package azure implements streaming connectors for Microsoft Azure services. +package azure diff --git a/azure/go.mod b/azure/go.mod new file mode 100644 index 0000000..907fdc3 --- /dev/null +++ b/azure/go.mod @@ -0,0 +1,15 @@ +module github.com/reugn/go-streams/azure + +go 1.21.0 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 + github.com/reugn/go-streams v0.10.0 +) + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/text v0.16.0 // indirect +) diff --git a/azure/go.sum b/azure/go.sum new file mode 100644 index 0000000..46e9f0a --- /dev/null +++ b/azure/go.sum @@ -0,0 +1,38 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ= +github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= +golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/azure/.gitignore b/examples/azure/.gitignore new file mode 100644 index 0000000..73ba4e3 --- /dev/null +++ b/examples/azure/.gitignore @@ -0,0 +1 @@ +/blob/blob \ No newline at end of file diff --git a/examples/azure/blob/main.go b/examples/azure/blob/main.go new file mode 100644 index 0000000..8a6e7fa --- /dev/null +++ b/examples/azure/blob/main.go @@ -0,0 +1,135 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/reugn/go-streams" + "github.com/reugn/go-streams/azure" + "github.com/reugn/go-streams/extension" + "github.com/reugn/go-streams/flow" +) + +const ( + azuriteAccountName = "devstoreaccount1" + azuriteAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + + testServiceAddress = "http://127.0.0.1:10000/devstoreaccount1" + testContainerName = "container1" +) + +// Use the Azurite emulator for local Azure Blob storage environment. +// +// docker run -p 10000:10000 mcr.microsoft.com/azure-storage/azurite \ +// azurite-blob --blobHost 0.0.0.0 --blobPort 10000 +func main() { + ctx := context.Background() + client := newBlobClient() + + if _, err := client.CreateContainer(ctx, testContainerName, nil); err != nil { + log.Print(err) + } + + // load blobs from a channel source + newChanSource(). + Via(toBlobStorageFlow()). + To(newBlobSink(ctx, client)) + + awaitFlowCompletion() + + // read blob data to stdout + newBlobSource(ctx, client). + Via(readBlobStorageFlow()). + To(extension.NewStdoutSink()) + + awaitFlowCompletion() + + // clean up the container + cleanUp(ctx, client) +} + +func newBlobClient() *azblob.Client { + cred, err := azblob.NewSharedKeyCredential(azuriteAccountName, azuriteAccountKey) + if err != nil { + log.Fatal(err) + } + client, err := azblob.NewClientWithSharedKeyCredential(testServiceAddress, cred, nil) + if err != nil { + log.Fatal(err) + } + return client +} + +func newBlobSource(ctx context.Context, client *azblob.Client) streams.Source { + return azure.NewBlobStorageSource(ctx, client, &azure.BlobStorageSourceConfig{ + Container: testContainerName, + }, nil) +} + +func newBlobSink(ctx context.Context, client *azblob.Client) streams.Sink { + return azure.NewBlobStorageSink(ctx, client, &azure.BlobStorageSinkConfig{ + Container: testContainerName, + }, nil) +} + +func newChanSource() streams.Source { + data := []string{"aa", "b", "c"} + inputCh := make(chan any, 3) + for _, in := range data { + inputCh <- in + } + close(inputCh) + return extension.NewChanSource(inputCh) +} + +func toBlobStorageFlow() streams.Flow { + return flow.NewMap(func(data string) *azure.BlobStorageObject { + return &azure.BlobStorageObject{ + Key: getKey(data), + Data: io.NopCloser(strings.NewReader(data)), + } + }, 1) +} + +func getKey(value string) string { + if len(value) > 1 { + // add a subfolder + return fmt.Sprintf("%s/%s.txt", value, value) + } + return fmt.Sprintf("%s.txt", value) +} + +func readBlobStorageFlow() streams.Flow { + return flow.NewMap(func(blob *azure.BlobStorageObject) string { + defer blob.Data.Close() + data, err := io.ReadAll(blob.Data) + if err != nil { + log.Fatal(err) + } + return strings.ToUpper(string(data)) + }, 1) +} + +func cleanUp(ctx context.Context, client *azblob.Client) { + log.Print("Deleting blobs...") + + newBlobSource(ctx, client). + Via(flow.NewMap(func(blob *azure.BlobStorageObject) string { + if _, err := client.DeleteBlob(ctx, testContainerName, blob.Key, nil); err != nil { + log.Fatal(err) + } + return blob.Key + }, 1)). + To(extension.NewStdoutSink()) + + awaitFlowCompletion() +} + +func awaitFlowCompletion() { + time.Sleep(500 * time.Millisecond) +} diff --git a/examples/go.mod b/examples/go.mod index 3b4d9aa..92124f1 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( cloud.google.com/go/storage v1.43.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 github.com/IBM/sarama v1.43.3 github.com/aerospike/aerospike-client-go/v7 v7.6.1 github.com/apache/pulsar-client-go v0.13.1 @@ -17,6 +18,7 @@ require ( github.com/reugn/go-streams v0.10.0 github.com/reugn/go-streams/aerospike v0.0.0 github.com/reugn/go-streams/aws v0.0.0 + github.com/reugn/go-streams/azure v0.0.0 github.com/reugn/go-streams/gcp v0.0.0 github.com/reugn/go-streams/kafka v0.0.0 github.com/reugn/go-streams/nats v0.0.0 @@ -35,6 +37,8 @@ require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect github.com/AthenZ/athenz v1.11.63 // indirect + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect github.com/DataDog/zstd v1.5.6 // indirect github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect @@ -150,6 +154,7 @@ require ( replace ( github.com/reugn/go-streams/aerospike => ../aerospike github.com/reugn/go-streams/aws => ../aws + github.com/reugn/go-streams/azure => ../azure github.com/reugn/go-streams/gcp => ../gcp github.com/reugn/go-streams/kafka => ../kafka github.com/reugn/go-streams/nats => ../nats diff --git a/examples/go.sum b/examples/go.sum index 25a60ba..5f2d2f2 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -21,8 +21,20 @@ github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XB github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk= github.com/AthenZ/athenz v1.11.63 h1:jVJFJRMsZg2tIF62Ax4E5GAAxrSouLY00wO4q201wew= github.com/AthenZ/athenz v1.11.63/go.mod h1:5rBew3PH1ZUV/FegFMhe/vgEbn9lgOF56gWXq3wxVqs= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.5.6 h1:LbEglqepa/ipmmQJUDnSsfvA8e8IStVcGaFWDuxvGOY= @@ -258,6 +270,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -328,6 +342,8 @@ github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9F github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -354,8 +370,8 @@ github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0 github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ= github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4= github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= diff --git a/examples/nats/main.go b/examples/nats/main.go index c4bed72..5435b03 100644 --- a/examples/nats/main.go +++ b/examples/nats/main.go @@ -105,7 +105,7 @@ func jetStream() { } fetchJetMsgMapFlow := flow.NewMap(fetchJetMsg, 1) - stdOutSInk := extension.NewStdoutSink() + stdOutSink := extension.NewStdoutSink() fileSource. Via(toUpperMapFlow). @@ -113,7 +113,7 @@ func jetStream() { jetSource. Via(fetchJetMsgMapFlow). - To(stdOutSInk) + To(stdOutSink) } // docker run --rm --name nats-streaming -p 4223:4223 -p 8223:8223 nats-streaming -p 4223 -m 8223 @@ -139,7 +139,7 @@ func streaming() { streamingSource := ext.NewStreamingSource(ctx, subConn, stan.StartWithLastReceived(), []string{"topic1"}, nil) fetchStanMsgMapFlow := flow.NewMap(fetchStanMsg, 1) - stdOutSInk := extension.NewStdoutSink() + stdOutSink := extension.NewStdoutSink() fileSource. Via(toUpperMapFlow). @@ -147,7 +147,7 @@ func streaming() { streamingSource. Via(fetchStanMsgMapFlow). - To(stdOutSInk) + To(stdOutSink) } var toUpperString = func(msg string) []byte {