diff --git a/storage/internal/benchmarks/client_pool.go b/storage/internal/benchmarks/client_pool.go index 651ee1c23e73..648fe3f01b79 100644 --- a/storage/internal/benchmarks/client_pool.go +++ b/storage/internal/benchmarks/client_pool.go @@ -22,6 +22,7 @@ import ( "time" "cloud.google.com/go/storage" + "cloud.google.com/go/storage/experimental" "golang.org/x/net/http2" "google.golang.org/api/option" htransport "google.golang.org/api/transport/http" @@ -137,6 +138,7 @@ func initializeClientPools(ctx context.Context, opts *benchmarkOptions) func() { readBufferSize: opts.readBufferSize, connectionPoolSize: opts.connPoolSize, endpoint: opts.endpoint, + useGRPCBidiReads: opts.gRPCBidiReads, }) }, opts.numClients, @@ -179,6 +181,7 @@ type clientConfig struct { useJSON bool // only applicable to HTTP Clients setGCSFuseOpts bool // only applicable to HTTP Clients connectionPoolSize int // only applicable to GRPC Clients + useGRPCBidiReads bool // only applicable to GRPC Clients } func initializeHTTPClient(ctx context.Context, config clientConfig) (*storage.Client, error) { @@ -247,6 +250,9 @@ func initializeGRPCClient(ctx context.Context, config clientConfig) (*storage.Cl if config.readBufferSize != useDefault { opts = append(opts, option.WithGRPCDialOption(grpc.WithReadBufferSize(config.readBufferSize))) } + if config.useGRPCBidiReads { + opts = append(opts, experimental.WithGRPCBidiReads()) + } client, err := storage.NewGRPCClient(ctx, opts...) diff --git a/storage/internal/benchmarks/main.go b/storage/internal/benchmarks/main.go index 39b2a76418b3..2fff4d90d7f6 100644 --- a/storage/internal/benchmarks/main.go +++ b/storage/internal/benchmarks/main.go @@ -82,7 +82,8 @@ type benchmarkOptions struct { minChunkSize int64 maxChunkSize int64 - appendWrites bool + appendWrites bool + gRPCBidiReads bool forceGC bool connPoolSize int @@ -145,6 +146,7 @@ func (b *benchmarkOptions) String() string { fmt.Sprintf("range offset:\t\t%d - %d bytes ", b.minReadOffset, b.maxReadOffset), fmt.Sprintf("range size:\t\t%d bytes (0 -> full object)", b.rangeSize), fmt.Sprintf("append writes:\t\t%t", b.appendWrites), + fmt.Sprintf("gRPC bidi reads:\t%t", b.gRPCBidiReads), fmt.Sprintf("connection pool size:\t%d (GRPC)", b.connPoolSize), fmt.Sprintf("num workers:\t\t%d (max number of concurrent benchmark runs at a time)", b.numWorkers), fmt.Sprintf("force garbage collection:%t", b.forceGC), @@ -189,6 +191,7 @@ func parseFlags() { flag.Int64Var(&opts.maxChunkSize, "max_chunksize", useDefault, "max chunksize in bytes") flag.BoolVar(&opts.appendWrites, "append_writes", false, "use the append writer") + flag.BoolVar(&opts.gRPCBidiReads, "grpc_bidi_reads", false, "use BidiReadObject for gRPC reads") flag.IntVar(&opts.connPoolSize, "connection_pool_size", 4, "GRPC connection pool size") @@ -233,8 +236,14 @@ func parseFlags() { } } - if opts.appendWrites && (opts.api != grpcAPI && opts.api != directPath) { - log.Fatalf("--append_writes requires GRPC or DirectPath; got %v", opts.api) + if opts.api != grpcAPI && opts.api != directPath { + if opts.appendWrites { + log.Fatalf("--append_writes requires GRPC or DirectPath; got %v", opts.api) + } + + if opts.gRPCBidiReads { + log.Fatalf("--grpc_bidi_reads requires GRPC or DirectPath; got %v", opts.api) + } } }