Skip to content

Commit

Permalink
passing down the input context on awss3 input
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Feb 11, 2025
1 parent 97f4983 commit af58194
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 21 deletions.
13 changes: 0 additions & 13 deletions x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"

"github.com/elastic/beats/v7/libbeat/beat"
)

func (in *s3PollerInput) createS3API(ctx context.Context) (*awsS3API, error) {
Expand All @@ -31,17 +29,6 @@ func (in *s3PollerInput) createS3API(ctx context.Context) (*awsS3API, error) {
return newAWSs3API(s3Client), nil
}

func createPipelineClient(pipeline beat.Pipeline, acks *awsACKHandler) (beat.Client, error) {
return pipeline.ConnectWith(beat.ClientConfig{
EventListener: acks.pipelineEventListener(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
// is not required.
EventNormalization: boolPtr(false),
},
})
}

func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) {
// Skip region fetching if it's an Access Point ARN
if isValidAccessPointARN(bucketName) {
Expand Down
28 changes: 20 additions & 8 deletions x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,32 @@ func (in *s3PollerInput) Run(
in.config.getFileSelectors(),
in.config.BackupConfig)

in.run(ctx)
in.run(inputContext)

return nil
}

func (in *s3PollerInput) run(ctx context.Context) {
func (in *s3PollerInput) run(ctx v2.Context) {
// Scan the bucket in a loop, delaying by the configured interval each
// iteration.
for ctx.Err() == nil {
goctx := v2.GoContextFromCanceler(ctx.Cancelation)
for goctx.Err() == nil {
in.runPoll(ctx)
_ = timed.Wait(ctx, in.config.BucketListInterval)
_ = timed.Wait(goctx, in.config.BucketListInterval)
}
}

func (in *s3PollerInput) runPoll(ctx context.Context) {
func (in *s3PollerInput) runPoll(inputCtx v2.Context) {
var workerWg sync.WaitGroup
workChan := make(chan state)
ctx := v2.GoContextFromCanceler(inputCtx.Cancelation)

// Start the worker goroutines to listen on the work channel
for i := 0; i < in.config.NumberOfWorkers; i++ {
workerWg.Add(1)
go func() {
defer workerWg.Done()
in.workerLoop(ctx, workChan)
in.workerLoop(inputCtx, workChan)
}()
}

Expand All @@ -131,10 +133,20 @@ func (in *s3PollerInput) runPoll(ctx context.Context) {
}
}

func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) {
func (in *s3PollerInput) workerLoop(inputCtx v2.Context, workChan <-chan state) {
ctx := v2.GoContextFromCanceler(inputCtx.Cancelation)

acks := newAWSACKHandler()
// Create client for publishing events and receive notification of their ACKs.
client, err := createPipelineClient(in.pipeline, acks)
client, err := in.pipeline.ConnectWith(beat.ClientConfig{
InputID: inputCtx.ID,
EventListener: acks.pipelineEventListener(),
Processing: beat.ProcessingConfig{
// This input only produces events with basic types so normalization
// is not required.
EventNormalization: boolPtr(false),
},
})
if err != nil {
in.log.Errorf("failed to create pipeline client: %v", err.Error())
return
Expand Down

0 comments on commit af58194

Please sign in to comment.