Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter/googlecloudreceiver] Fix goroutines leak #37311

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .chloggen/fix-goleak-gcppubreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
change_type: bug_fix
component: googlecloudpubsubreceiver
note: Fix a goroutine leak during shutdown.
issues: [30438]
subtext: |
A goroutine leak was found in the googlecloudpubsubreceiver.
The goroutine leak was caused by the receiver not closing the underlying created gRPC client when using an insecure custom endpoint.
change_logs: []
53 changes: 53 additions & 0 deletions receiver/googlecloudpubsubreceiver/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion receiver/googlecloudpubsubreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
cloud.google.com/go/logging v1.13.0
cloud.google.com/go/pubsub v1.45.3
github.com/google/go-cmp v0.6.0
github.com/googleapis/gax-go/v2 v2.14.1
github.com/iancoleman/strcase v0.3.0
github.com/json-iterator/go v1.1.12
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -46,7 +47,6 @@ require (
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand Down
5 changes: 2 additions & 3 deletions receiver/googlecloudpubsubreceiver/internal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sync/atomic"
"time"

pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand All @@ -27,7 +26,7 @@ type StreamHandler struct {
pushMessage func(ctx context.Context, message *pubsubpb.ReceivedMessage) error
acks []string
mutex sync.Mutex
client *pubsub.SubscriberClient
client SubscriberClient

clientID string
subscription string
Expand All @@ -53,7 +52,7 @@ func (handler *StreamHandler) ack(ackID string) {
func NewHandler(
ctx context.Context,
logger *zap.Logger,
client *pubsub.SubscriberClient,
client SubscriberClient,
clientID string,
subscription string,
callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"

import (
"context"

"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"github.com/googleapis/gax-go/v2"
)

// subscriberClient subset of `pubsub.PublisherClient`
type SubscriberClient interface {
Close() error
StreamingPull(ctx context.Context, opts ...gax.CallOption) (pubsubpb.Subscriber_StreamingPullClient, error)
}
2 changes: 0 additions & 2 deletions receiver/googlecloudpubsubreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ tests:
timeout: 20s
subscription: projects/my-project/subscriptions/otlp-subscription
skip_lifecycle: true
skip_shutdown: true
goleak:
skip: false
ignore:
# See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
top: go.opencensus.io/stats/view.(*worker).start
Expand Down
53 changes: 12 additions & 41 deletions receiver/googlecloudpubsubreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"
"time"

pubsub "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -24,11 +23,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
)
Expand All @@ -42,7 +36,7 @@ type pubsubReceiver struct {
logsConsumer consumer.Logs
userAgent string
config *Config
client *pubsub.SubscriberClient
client internal.SubscriberClient
tracesUnmarshaler ptrace.Unmarshaler
metricsUnmarshaler pmetric.Unmarshaler
logsUnmarshaler plog.Unmarshaler
Expand All @@ -68,34 +62,14 @@ const (
gZip = iota
)

func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOption) {
if receiver.userAgent != "" {
copts = append(copts, option.WithUserAgent(receiver.userAgent))
}
if receiver.config.Endpoint != "" {
if receiver.config.Insecure {
var dialOpts []grpc.DialOption
if receiver.userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent))
}
conn, _ := grpc.NewClient(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
copts = append(copts, option.WithGRPCConn(conn))
} else {
copts = append(copts, option.WithEndpoint(receiver.config.Endpoint))
}
}
return copts
}

func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error {
if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil {
return errors.New("cannot start receiver: no consumers were specified")
}

var startErr error
receiver.startOnce.Do(func() {
copts := receiver.generateClientOptions()
client, err := pubsub.NewSubscriberClient(ctx, copts...)
client, err := newSubscriberClient(ctx, receiver.config, receiver.userAgent)
if err != nil {
startErr = fmt.Errorf("failed creating the gRPC client to Pubsub: %w", err)
return
Expand All @@ -115,21 +89,18 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err
}

func (receiver *pubsubReceiver) Shutdown(_ context.Context) error {
var err error
if receiver.client != nil {
// A canceled code means the client connection is already closed,
// Shutdown shouldn't return an error in that case.
if closeErr := receiver.client.Close(); status.Code(closeErr) != codes.Canceled {
err = closeErr
}
if receiver.handler != nil {
receiver.logger.Info("Stopping Google Pubsub receiver")
receiver.handler.CancelNow()
receiver.logger.Info("Stopped Google Pubsub receiver")
receiver.handler = nil
}
if receiver.handler == nil {
return err
if receiver.client == nil {
return nil
}
receiver.logger.Info("Stopping Google Pubsub receiver")
receiver.handler.CancelNow()
receiver.logger.Info("Stopped Google Pubsub receiver")
return err
client := receiver.client
receiver.client = nil
return client.Close()
}

func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type": "service_account",
"private_key_id": "abc",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDY3E8o1NEFcjMM\nHW/5ZfFJw29/8NEqpViNjQIx95Xx5KDtJ+nWn9+OW0uqsSqKlKGhAdAo+Q6bjx2c\nuXVsXTu7XrZUY5Kltvj94DvUa1wjNXs606r/RxWTJ58bfdC+gLLxBfGnB6CwK0YQ\nxnfpjNbkUfVVzO0MQD7UP0Hl5ZcY0Puvxd/yHuONQn/rIAieTHH1pqgW+zrH/y3c\n59IGThC9PPtugI9ea8RSnVj3PWz1bX2UkCDpy9IRh9LzJLaYYX9RUd7++dULUlat\nAaXBh1U6emUDzhrIsgApjDVtimOPbmQWmX1S60mqQikRpVYZ8u+NDD+LNw+/Eovn\nxCj2Y3z1AgMBAAECggEAWDBzoqO1IvVXjBA2lqId10T6hXmN3j1ifyH+aAqK+FVl\nGjyWjDj0xWQcJ9ync7bQ6fSeTeNGzP0M6kzDU1+w6FgyZqwdmXWI2VmEizRjwk+/\n/uLQUcL7I55Dxn7KUoZs/rZPmQDxmGLoue60Gg6z3yLzVcKiDc7cnhzhdBgDc8vd\nQorNAlqGPRnm3EqKQ6VQp6fyQmCAxrr45kspRXNLddat3AMsuqImDkqGKBmF3Q1y\nxWGe81LphUiRqvqbyUlh6cdSZ8pLBpc9m0c3qWPKs9paqBIvgUPlvOZMqec6x4S6\nChbdkkTRLnbsRr0Yg/nDeEPlkhRBhasXpxpMUBgPywKBgQDs2axNkFjbU94uXvd5\nznUhDVxPFBuxyUHtsJNqW4p/ujLNimGet5E/YthCnQeC2P3Ym7c3fiz68amM6hiA\nOnW7HYPZ+jKFnefpAtjyOOs46AkftEg07T9XjwWNPt8+8l0DYawPoJgbM5iE0L2O\nx8TU1Vs4mXc+ql9F90GzI0x3VwKBgQDqZOOqWw3hTnNT07Ixqnmd3dugV9S7eW6o\nU9OoUgJB4rYTpG+yFqNqbRT8bkx37iKBMEReppqonOqGm4wtuRR6LSLlgcIU9Iwx\nyfH12UWqVmFSHsgZFqM/cK3wGev38h1WBIOx3/djKn7BdlKVh8kWyx6uC8bmV+E6\nOoK0vJD6kwKBgHAySOnROBZlqzkiKW8c+uU2VATtzJSydrWm0J4wUPJifNBa/hVW\ndcqmAzXC9xznt5AVa3wxHBOfyKaE+ig8CSsjNyNZ3vbmr0X04FoV1m91k2TeXNod\njMTobkPThaNm4eLJMN2SQJuaHGTGERWC0l3T18t+/zrDMDCPiSLX1NAvAoGBAN1T\nVLJYdjvIMxf1bm59VYcepbK7HLHFkRq6xMJMZbtG0ryraZjUzYvB4q4VjHk2UDiC\nlhx13tXWDZH7MJtABzjyg+AI7XWSEQs2cBXACos0M4Myc6lU+eL+iA+OuoUOhmrh\nqmT8YYGu76/IBWUSqWuvcpHPpwl7871i4Ga/I3qnAoGBANNkKAcMoeAbJQK7a/Rn\nwPEJB+dPgNDIaboAsh1nZhVhN5cvdvCWuEYgOGCPQLYQF0zmTLcM+sVxOYgfy8mV\nfbNgPgsP5xmu6dw2COBKdtozw0HrWSRjACd1N4yGu75+wPCcX/gQarcjRcXXZeEa\nNtBLSfcqPULqD+h7br9lEJio\n-----END PRIVATE KEY-----\n",
"client_email": "[email protected]",
"client_id": "123-abc.apps.googleusercontent.com",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "http://localhost:8080/token"
}
76 changes: 76 additions & 0 deletions receiver/googlecloudpubsubreceiver/wrapped_subscriber_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package googlecloudpubsubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver"

import (
"context"
"fmt"

pubsub "cloud.google.com/go/pubsub/apiv1"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal"
)

// wrappedSubscriberClient allows to override the close function
type wrappedSubscriberClient struct {
internal.SubscriberClient
closeFn func() error
}

func (c *wrappedSubscriberClient) Close() error {
if c.closeFn != nil {
return c.closeFn()
}
return c.SubscriberClient.Close()
}

func newSubscriberClient(ctx context.Context, config *Config, userAgent string) (internal.SubscriberClient, error) {
clientOptions, closeFn, err := generateClientOptions(config, userAgent)
if err != nil {
return nil, fmt.Errorf("failed preparing the gRPC client options to PubSub: %w", err)
}

client, err := pubsub.NewSubscriberClient(ctx, clientOptions...)
if err != nil {
return nil, fmt.Errorf("failed creating the gRPC client to PubSub: %w", err)
}

if closeFn == nil {
return client, nil
}

return &wrappedSubscriberClient{
SubscriberClient: client,
closeFn: closeFn,
}, nil
}

func generateClientOptions(config *Config, userAgent string) ([]option.ClientOption, func() error, error) {
var copts []option.ClientOption
var closeFn func() error

if userAgent != "" {
copts = append(copts, option.WithUserAgent(userAgent))
}
if config.Endpoint != "" {
if config.Insecure {
var dialOpts []grpc.DialOption
if userAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(userAgent))
}
client, err := grpc.NewClient(config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...)
if err != nil {
return nil, nil, err
}
copts = append(copts, option.WithGRPCConn(client))
closeFn = client.Close // we need to be able to properly close the grpc client otherwise it'll leak goroutines
} else {
copts = append(copts, option.WithEndpoint(config.Endpoint))
}
}
return copts, closeFn, nil
}
Loading
Loading