From b6130e9250aef4fa99a0efc468d51db4238f034e Mon Sep 17 00:00:00 2001 From: Hongxiang Jiang Date: Wed, 16 Sep 2020 21:12:41 +0000 Subject: [PATCH 1/2] response path implementation 1. Change decoder filter to stream filter with functionality 1a. store the ack id 1b. if response is 200, ack that id 1c. else not ack 2. Change extension to receive Ack message instead of empty 2a. Ack message has one field string ack_id 2b. If gRPC success, function will return StatusOr with ok and the value is the ack_id 2c. Else, function will return StatusOr with not ok. --- api/BUILD | 1 + .../filters/http/gcp_events_convert/v3/BUILD | 8 +-- .../v3/gcp_events_convert.proto | 6 +- .../pubsub/v3alpha/received_message.proto | 7 +- api/versioning/BUILD | 1 - .../filters/http/gcp_events_convert/v3/BUILD | 8 +-- .../v3/gcp_events_convert.proto | 6 +- .../pubsub/v3alpha/received_message.proto | 7 +- .../filters/http/gcp_events_convert/BUILD | 1 + .../filters/http/gcp_events_convert/config.cc | 2 +- .../gcp_events_convert_filter.cc | 66 +++++++++++++++++++ .../gcp_events_convert_filter.h | 18 ++++- source/extensions/grpc_stream_demuxer/BUILD | 1 + .../grpc_stream_demuxer.cc | 17 +++-- .../grpc_stream_demuxer/grpc_stream_demuxer.h | 4 +- .../gcp_events_convert_filter_test.cc | 27 ++++++++ 16 files changed, 143 insertions(+), 37 deletions(-) diff --git a/api/BUILD b/api/BUILD index edb67420da3a..77bca5470b8a 100644 --- a/api/BUILD +++ b/api/BUILD @@ -257,6 +257,7 @@ proto_library( "//envoy/service/listener/v3:pkg", "//envoy/service/load_stats/v3:pkg", "//envoy/service/metrics/v3:pkg", + "//envoy/service/pubsub/v3alpha:pkg", "//envoy/service/ratelimit/v3:pkg", "//envoy/service/route/v3:pkg", "//envoy/service/runtime/v3:pkg", diff --git a/api/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD b/api/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD index f139cce54af2..ee92fb652582 100644 --- a/api/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD +++ b/api/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD @@ -1,13 +1,9 @@ -# DO NOT EDIT. This file is generated by tools/proto_sync.py. +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") licenses(["notice"]) # Apache 2 api_proto_package( - deps = [ - "//envoy/config/core/v3:pkg", - "//envoy/type/v3:pkg", - "@com_github_cncf_udpa//udpa/annotations:pkg", - ], + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], ) diff --git a/api/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto b/api/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto index dea188735435..3227ab65bd32 100644 --- a/api/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto +++ b/api/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto @@ -2,9 +2,6 @@ syntax = "proto3"; package envoy.extensions.filters.http.gcp_events_convert.v3; -import "envoy/config/core/v3/base.proto"; -import "envoy/type/v3/range.proto"; - import "google/api/annotations.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/wrappers.proto"; @@ -17,12 +14,11 @@ import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.extensions.filters.http.gcp_events_convert.v3"; option java_outer_classname = "GcpEventsConvertProto"; option java_multiple_files = true; -option (udpa.annotations.file_status).work_in_progress = true; option (udpa.annotations.file_status).package_version_status = ACTIVE; message GcpEventsConvert { // content-type to let filter use to match the incoming HTTP request // if content_type matches, trigger the functionality // else run as a pass-through - string content_type = 1 [(validate.rules).string.min_bytes = 1]; + string content_type = 1 [(validate.rules).string = {min_bytes: 1}]; } diff --git a/api/envoy/service/pubsub/v3alpha/received_message.proto b/api/envoy/service/pubsub/v3alpha/received_message.proto index 1876c1bfdf63..83fdc7f220f3 100644 --- a/api/envoy/service/pubsub/v3alpha/received_message.proto +++ b/api/envoy/service/pubsub/v3alpha/received_message.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package envoy.service.pubsub.v3alpha; -import "google/protobuf/empty.proto"; import "google/pubsub/v1/pubsub.proto"; import "udpa/annotations/migrate.proto"; @@ -16,6 +15,10 @@ option (udpa.annotations.file_status).work_in_progress = true; option (udpa.annotations.file_status).package_version_status = ACTIVE; service ReceivedMessageService { - rpc SendReceivedMessage(google.pubsub.v1.ReceivedMessage) returns (google.protobuf.Empty) { + rpc SendReceivedMessage(google.pubsub.v1.ReceivedMessage) returns (Ack) { } } + +message Ack { + string ack_id = 1; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 64d03355628e..05ecf9261a37 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -191,7 +191,6 @@ proto_library( "//envoy/config/filter/http/dynamo/v2:pkg", "//envoy/config/filter/http/ext_authz/v2:pkg", "//envoy/config/filter/http/fault/v2:pkg", - "//envoy/config/filter/http/gcp_events_convert/v2:pkg", "//envoy/config/filter/http/grpc_http1_bridge/v2:pkg", "//envoy/config/filter/http/grpc_http1_reverse_bridge/v2alpha1:pkg", "//envoy/config/filter/http/grpc_stats/v2alpha:pkg", diff --git a/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD b/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD index f139cce54af2..ee92fb652582 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD +++ b/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD @@ -1,13 +1,9 @@ -# DO NOT EDIT. This file is generated by tools/proto_sync.py. +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") licenses(["notice"]) # Apache 2 api_proto_package( - deps = [ - "//envoy/config/core/v3:pkg", - "//envoy/type/v3:pkg", - "@com_github_cncf_udpa//udpa/annotations:pkg", - ], + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], ) diff --git a/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto b/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto index dea188735435..3227ab65bd32 100644 --- a/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto +++ b/generated_api_shadow/envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.proto @@ -2,9 +2,6 @@ syntax = "proto3"; package envoy.extensions.filters.http.gcp_events_convert.v3; -import "envoy/config/core/v3/base.proto"; -import "envoy/type/v3/range.proto"; - import "google/api/annotations.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/wrappers.proto"; @@ -17,12 +14,11 @@ import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.extensions.filters.http.gcp_events_convert.v3"; option java_outer_classname = "GcpEventsConvertProto"; option java_multiple_files = true; -option (udpa.annotations.file_status).work_in_progress = true; option (udpa.annotations.file_status).package_version_status = ACTIVE; message GcpEventsConvert { // content-type to let filter use to match the incoming HTTP request // if content_type matches, trigger the functionality // else run as a pass-through - string content_type = 1 [(validate.rules).string.min_bytes = 1]; + string content_type = 1 [(validate.rules).string = {min_bytes: 1}]; } diff --git a/generated_api_shadow/envoy/service/pubsub/v3alpha/received_message.proto b/generated_api_shadow/envoy/service/pubsub/v3alpha/received_message.proto index 1876c1bfdf63..83fdc7f220f3 100644 --- a/generated_api_shadow/envoy/service/pubsub/v3alpha/received_message.proto +++ b/generated_api_shadow/envoy/service/pubsub/v3alpha/received_message.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package envoy.service.pubsub.v3alpha; -import "google/protobuf/empty.proto"; import "google/pubsub/v1/pubsub.proto"; import "udpa/annotations/migrate.proto"; @@ -16,6 +15,10 @@ option (udpa.annotations.file_status).work_in_progress = true; option (udpa.annotations.file_status).package_version_status = ACTIVE; service ReceivedMessageService { - rpc SendReceivedMessage(google.pubsub.v1.ReceivedMessage) returns (google.protobuf.Empty) { + rpc SendReceivedMessage(google.pubsub.v1.ReceivedMessage) returns (Ack) { } } + +message Ack { + string ack_id = 1; +} diff --git a/source/extensions/filters/http/gcp_events_convert/BUILD b/source/extensions/filters/http/gcp_events_convert/BUILD index e0c3491094dd..9ba878bba511 100644 --- a/source/extensions/filters/http/gcp_events_convert/BUILD +++ b/source/extensions/filters/http/gcp_events_convert/BUILD @@ -31,6 +31,7 @@ envoy_cc_library( "@com_github_cloudevents_sdk//v1/protocol_binding:pubsub_binder_lib", "@com_google_googleapis//google/pubsub/v1:pubsub_cc_proto", "@envoy_api//envoy/extensions/filters/http/gcp_events_convert/v3:pkg_cc_proto", + "@envoy_api//envoy/service/pubsub/v3alpha:pkg_cc_grpc", ], ) diff --git a/source/extensions/filters/http/gcp_events_convert/config.cc b/source/extensions/filters/http/gcp_events_convert/config.cc index 8e4cdee45c31..07e5ed3c135d 100644 --- a/source/extensions/filters/http/gcp_events_convert/config.cc +++ b/source/extensions/filters/http/gcp_events_convert/config.cc @@ -23,7 +23,7 @@ Http::FilterFactoryCb GcpEventsConvertFilterFactory::createFilterFactoryFromProt std::make_shared(proto_config); return [config](Http::FilterChainFactoryCallbacks& callbacks) -> void { - callbacks.addStreamDecoderFilter(std::make_shared(config)); + callbacks.addStreamFilter(std::make_shared(config)); }; } diff --git a/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.cc b/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.cc index cda914d940d0..2eab2ba68102 100644 --- a/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.cc +++ b/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.cc @@ -6,6 +6,7 @@ #include "envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.pb.h" #include "envoy/http/filter.h" #include "envoy/server/filter_config.h" +#include "envoy/service/pubsub/v3alpha/received_message.grpc.pb.h" #include "common/buffer/buffer_impl.h" #include "common/common/cleanup.h" @@ -25,6 +26,7 @@ namespace Extensions { namespace HttpFilters { namespace GcpEventsConvert { +using envoy::service::pubsub::v3alpha::Ack; using google::pubsub::v1::PubsubMessage; using google::pubsub::v1::ReceivedMessage; using io::cloudevents::v1::CloudEvent; @@ -42,6 +44,12 @@ GcpEventsConvertFilter::GcpEventsConvertFilter(GcpEventsConvertFilterConfigShare Http::RequestHeaderMap* headers) : request_headers_(headers), has_cloud_event_(has_cloud_event), config_(config) {} +GcpEventsConvertFilter::GcpEventsConvertFilter(GcpEventsConvertFilterConfigSharedPtr config, + bool has_cloud_event, + std::string ack_id, + bool acked) + : ack_id_(ack_id), acked_(acked), has_cloud_event_(has_cloud_event), config_(config) {} + void GcpEventsConvertFilter::onDestroy() {} Http::FilterHeadersStatus GcpEventsConvertFilter::decodeHeaders(Http::RequestHeaderMap& headers, @@ -63,6 +71,8 @@ Http::FilterDataStatus GcpEventsConvertFilter::decodeData(Buffer::Instance& buff if (!has_cloud_event_) return Http::FilterDataStatus::Continue; + has_cloud_event_ = false; + // For any request body that is not the end of HTTP request and not empty // Buffer the current HTTP request's body if (!end_stream) @@ -84,6 +94,8 @@ Http::FilterDataStatus GcpEventsConvertFilter::decodeData(Buffer::Instance& buff return Http::FilterDataStatus::Continue; } + ack_id_ = received_message.ack_id(); + const PubsubMessage& pubsub_message = received_message.message(); cloudevents::binding::PubsubBinder pubsub_binder; @@ -104,6 +116,7 @@ Http::FilterDataStatus GcpEventsConvertFilter::decodeData(Buffer::Instance& buff updateHeader(*req); updateBody(*req, buffer); + has_cloud_event_ = true; return Http::FilterDataStatus::Continue; } @@ -116,6 +129,59 @@ void GcpEventsConvertFilter::setDecoderFilterCallbacks( decoder_callbacks_ = &callbacks; } +Http::FilterHeadersStatus GcpEventsConvertFilter::encodeHeaders(Http::ResponseHeaderMap& headers, + bool) { + // for any requst header that is not related to cloud event. Pass through + if (!has_cloud_event_) + return Http::FilterHeadersStatus::Continue; + + uint64_t status; + if (!absl::SimpleAtoi(headers.getStatusValue(), &status) || status != 200) { + // invalid code, not acknowledged + ENVOY_LOG(warn, "Gcp Events Convert Filter log: upstream response with status {}", headers.getStatusValue()); + return Http::FilterHeadersStatus::Continue; + } + + headers.setContentType(config_->content_type_); + acked_ = true; + + return Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus GcpEventsConvertFilter::encodeData(Buffer::Instance& buffer, bool end_stream) { + // for any requst body that is not related to cloud event or status code is not 200. Pass through + if (!has_cloud_event_ || !acked_) + return Http::FilterDataStatus::Continue; + + if (!end_stream) + return Http::FilterDataStatus::StopIterationAndBuffer; + + Ack ack; + ack.set_ack_id(ack_id_); + std::string proto_string; + ack.SerializeToString(&proto_string); + + buffer.drain(buffer.length()); + if (encoder_callbacks_ == nullptr || encoder_callbacks_->encodingBuffer() == nullptr) { + buffer.add(proto_string); + } else { + encoder_callbacks_->modifyEncodingBuffer([&proto_string](Buffer::Instance& buffered){ + buffered.drain(buffered.length()); + buffered.add(proto_string); + }); + } + + return Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus GcpEventsConvertFilter::encodeTrailers(Http::ResponseTrailerMap&) { + return Http::FilterTrailersStatus::Continue; +} + +void GcpEventsConvertFilter::setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) { + encoder_callbacks_ = &callbacks; +} + std::string GcpEventsConvertFilter::buildBody(const Buffer::Instance* buffered, const Buffer::Instance& last) { std::string body; diff --git a/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.h b/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.h index 62ec3516f9d9..fc23ac1cb56f 100644 --- a/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.h +++ b/source/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter.h @@ -9,6 +9,8 @@ #include "common/common/logger.h" +#include "extensions/filters/http/common/pass_through_filter.h" + namespace Envoy { namespace Extensions { namespace HttpFilters { @@ -27,7 +29,7 @@ using GcpEventsConvertFilterConfigSharedPtr = std::shared_ptr { public: // normal constructor @@ -36,6 +38,10 @@ class GcpEventsConvertFilter : public Http::StreamDecoderFilter, GcpEventsConvertFilter(GcpEventsConvertFilterConfigSharedPtr config, bool has_cloud_event, Http::RequestHeaderMap* headers); + GcpEventsConvertFilter(GcpEventsConvertFilterConfigSharedPtr config, + bool has_cloud_event, + std::string ack_id, + bool acked); // Http::StreamFilterBase void onDestroy() override; @@ -46,6 +52,13 @@ class GcpEventsConvertFilter : public Http::StreamDecoderFilter, Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override; void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; + // Http::StreamEncoderFilter + Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers, + bool end_stream) override; + Http::FilterDataStatus encodeData(Buffer::Instance& buffer, bool end_stream) override; + Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override; + void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override; + private: using HttpRequest = boost::beast::http::request; @@ -64,9 +77,12 @@ class GcpEventsConvertFilter : public Http::StreamDecoderFilter, // 2. add cloud event information, ce-version, ce-type...... (except ce's data) void updateHeader(const HttpRequest& request); + std::string ack_id_; + bool acked_ = false; Http::RequestHeaderMap* request_headers_ = nullptr; bool has_cloud_event_ = false; const GcpEventsConvertFilterConfigSharedPtr config_; + Http::StreamEncoderFilterCallbacks* encoder_callbacks_ = nullptr; Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr; }; diff --git a/source/extensions/grpc_stream_demuxer/BUILD b/source/extensions/grpc_stream_demuxer/BUILD index 2ac546aaa435..19bff0f983b2 100644 --- a/source/extensions/grpc_stream_demuxer/BUILD +++ b/source/extensions/grpc_stream_demuxer/BUILD @@ -16,6 +16,7 @@ envoy_cc_library( deps = [ "//include/envoy/event:dispatcher_interface", "//source/common/common:minimal_logger_lib", + "//source/common/common:statusor_lib", "@com_google_googleapis//google/pubsub/v1:pubsub_cc_grpc", "@com_github_grpc_grpc//:grpc++", "@envoy_api//envoy/service/pubsub/v3alpha:pkg_cc_grpc", diff --git a/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.cc b/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.cc index 78d1bf3f71ef..d9d25d64506d 100644 --- a/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.cc +++ b/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.cc @@ -9,8 +9,8 @@ ReceivedMessageServiceClient::ReceivedMessageServiceClient(std::shared_ptr ReceivedMessageServiceClient::SendReceivedMessage(const ReceivedMessage &request) { + Ack reply; // Context for the client. It could be used to convey extra information to // the server and/or tweak certain RPC behaviors. ClientContext context; @@ -20,10 +20,10 @@ std::string ReceivedMessageServiceClient::SendReceivedMessage(const ReceivedMess // Act upon its status. if (status.ok()) { - return "RPC worked"; + return reply.ack_id(); } else { std::cout << status.error_code() << ": " << status.error_message() << std::endl; - return "RPC failed"; + return absl::Status(absl::StatusCode::kAborted, status.error_message()); } } @@ -65,9 +65,12 @@ void GrpcStreamDemuxer::start() { // Send the message using a unary grpc request. std::string target_uri = address_ + ":" + std::to_string(port_); ReceivedMessageServiceClient client(grpc::CreateChannel(target_uri, grpc::InsecureChannelCredentials())); - std::string reply = client.SendReceivedMessage(message); - ENVOY_LOG(info, "Unary request response: {}", reply); - ack_request.add_ack_ids(message.ack_id()); + Envoy::StatusOr reply = client.SendReceivedMessage(message); + if (reply.ok()){ + ack_request.add_ack_ids(reply.value()); + } else { + ENVOY_LOG(info, "Unary request response: {}", reply.status()); + } } stream->Write(ack_request); } diff --git a/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.h b/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.h index 7b9778bb196e..5d294609dffa 100644 --- a/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.h +++ b/source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.h @@ -4,6 +4,7 @@ #include "envoy/service/pubsub/v3alpha/received_message.grpc.pb.h" #include "common/common/logger.h" +#include "common/common/statusor.h" #include "google/pubsub/v1/pubsub.grpc.pb.h" #include "grpc++/grpc++.h" @@ -16,6 +17,7 @@ using google::pubsub::v1::Subscriber; using google::pubsub::v1::StreamingPullRequest; using google::pubsub::v1::StreamingPullResponse; using google::pubsub::v1::ReceivedMessage; +using envoy::service::pubsub::v3alpha::Ack; using envoy::service::pubsub::v3alpha::ReceivedMessageService; namespace Envoy { @@ -26,7 +28,7 @@ class ReceivedMessageServiceClient { public: ReceivedMessageServiceClient(std::shared_ptr channel); - std::string SendReceivedMessage(const ReceivedMessage &request); + Envoy::StatusOr SendReceivedMessage(const ReceivedMessage &request); private: std::unique_ptr stub_; diff --git a/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc b/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc index 5c218a63c7b0..2c10a93fb197 100644 --- a/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc +++ b/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc @@ -216,6 +216,33 @@ TEST(GcpEventsConvertFilterUnitTest, DecodeDataWithRandomBody) { EXPECT_EQ(Http::FilterDataStatus::Continue, filter.decodeData(data2, true)); } +// Unit test for Encode Headers +TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderWithNoCloudEvent) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert config; + config.set_content_type("application/grpc+cloudevent+json"); + GcpEventsConvertFilter filter(std::make_shared(config), + /*has_cloud_event=*/false, + /*headers =*/nullptr); + Http::TestResponseHeaderMapImpl headers( + {{"content-type", "application/grpc+cloudevent+json"}, {"content-length", "100"}}); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter.encodeHeaders(headers, false)); +} + +TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderWithCloudEvent) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert config; + config.set_content_type("application/grpc+cloudevent+json"); + GcpEventsConvertFilter filter(std::make_shared(config), + /*has_cloud_event=*/true, + /*headers =*/nullptr); + Http::TestResponseHeaderMapImpl headers( + {{"content-type", "application/html"}, {"content-length", "100"}}); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter.encodeHeaders(headers, false)); +} + +// Unit test for Encode Data + } // namespace GcpEventsConvert } // namespace HttpFilters } // namespace Extensions From 08cd8d2c02bb4fc3fb3b5d5033244ec8776709a3 Mon Sep 17 00:00:00 2001 From: Hongxiang Jiang Date: Thu, 17 Sep 2020 18:16:53 +0000 Subject: [PATCH 2/2] Add Unit Test for encoding filter 1. encode header under cases 1a. cloud event related or not 1b. response 200 / 404 2. encode data under cases 1a. cloud event related or not 1b. response 200 / 404 1c. decodingBuffer empty or not --- .../filters/http/gcp_events_convert/BUILD | 1 + .../gcp_events_convert_filter_test.cc | 130 +++++++++++++++++- 2 files changed, 126 insertions(+), 5 deletions(-) diff --git a/test/extensions/filters/http/gcp_events_convert/BUILD b/test/extensions/filters/http/gcp_events_convert/BUILD index b6de5f444b68..d8ff5fc9d2d3 100644 --- a/test/extensions/filters/http/gcp_events_convert/BUILD +++ b/test/extensions/filters/http/gcp_events_convert/BUILD @@ -30,6 +30,7 @@ envoy_extension_cc_test( "//test/test_common:test_runtime_lib", "@com_google_googleapis//google/pubsub/v1:pubsub_cc_proto", "@envoy_api//envoy/extensions/filters/http/gcp_events_convert/v3:pkg_cc_proto", + "@envoy_api//envoy/service/pubsub/v3alpha:pkg_cc_grpc", ], ) diff --git a/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc b/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc index 2c10a93fb197..d82766c988fb 100644 --- a/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc +++ b/test/extensions/filters/http/gcp_events_convert/gcp_events_convert_filter_test.cc @@ -2,6 +2,7 @@ #include "envoy/event/dispatcher.h" #include "envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.pb.h" +#include "envoy/service/pubsub/v3alpha/received_message.grpc.pb.h" #include "common/http/header_map_impl.h" #include "common/protobuf/utility.h" @@ -19,6 +20,7 @@ #include "google/pubsub/v1/pubsub.pb.h" #include "gtest/gtest.h" +using envoy::service::pubsub::v3alpha::Ack; using google::pubsub::v1::PubsubMessage; using google::pubsub::v1::ReceivedMessage; @@ -217,31 +219,149 @@ TEST(GcpEventsConvertFilterUnitTest, DecodeDataWithRandomBody) { } // Unit test for Encode Headers -TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderWithNoCloudEvent) { +TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderNoCloudEvent) { envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert config; config.set_content_type("application/grpc+cloudevent+json"); GcpEventsConvertFilter filter(std::make_shared(config), /*has_cloud_event=*/false, /*headers =*/nullptr); Http::TestResponseHeaderMapImpl headers( - {{"content-type", "application/grpc+cloudevent+json"}, {"content-length", "100"}}); + {{"content-type", "application/html"}, {"content-length", "100"}}); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter.encodeHeaders(headers, false)); } -TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderWithCloudEvent) { +TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderCloudEventStatus200) { envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert config; config.set_content_type("application/grpc+cloudevent+json"); GcpEventsConvertFilter filter(std::make_shared(config), /*has_cloud_event=*/true, - /*headers =*/nullptr); + /*ack_id=*/"ack id string", + /*acked=*/true); Http::TestResponseHeaderMapImpl headers( - {{"content-type", "application/html"}, {"content-length", "100"}}); + {{":status", "200"}, {"content-type", "application/html"}, {"content-length", "100"}}); EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter.encodeHeaders(headers, false)); + // filter should replace headers content-type with the content type user specified + EXPECT_EQ("application/grpc+cloudevent+json", headers.getContentTypeValue()); +} + +TEST(GcpEventsConvertFilterUnitTest, EncoderHeaderCloudEventStatus404) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert config; + config.set_content_type("application/grpc+cloudevent+json"); + GcpEventsConvertFilter filter(std::make_shared(config), + /*has_cloud_event=*/true, + /*ack_id=*/"ack id string", + /*acked=*/true); + Http::TestResponseHeaderMapImpl headers( + {{":status", "404"}, {"content-type", "application/html"}, {"content-length", "100"}}); + + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter.encodeHeaders(headers, false)); + // filter should replace headers content-type with the content type user specified + EXPECT_EQ("application/html", headers.getContentTypeValue()); } // Unit test for Encode Data +TEST(GcpEventsConvertFilterUnitTest, EncodeDataStatus200NotEndStream) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert proto_config; + proto_config.set_content_type("application/grpc+cloudevent+json"); + Http::TestRequestHeaderMapImpl headers; + GcpEventsConvertFilter filter(std::make_shared(proto_config), + /*has_cloud_event=*/true, + /*ack_id=*/"ack id string", + /*acked=*/true); + + Buffer::OwnedImpl data; + EXPECT_EQ(Http::FilterDataStatus::StopIterationAndBuffer, filter.encodeData(data, false)); +} + +TEST(GcpEventsConvertFilterUnitTest, EncodeDataStatus200EndStreamDecodingBuffer) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert proto_config; + proto_config.set_content_type("application/grpc+cloudevent+json"); + Http::TestRequestHeaderMapImpl headers; + GcpEventsConvertFilter filter(std::make_shared(proto_config), + /*has_cloud_event=*/true, + /*ack_id=*/"ack id string", + /*acked=*/true); + Http::MockStreamEncoderFilterCallbacks callbacks; + filter.setEncoderFilterCallbacks(callbacks); + + Buffer::OwnedImpl buffer; + EXPECT_CALL(callbacks, encodingBuffer).Times(1).WillOnce(testing::Return(&buffer)); + EXPECT_CALL(callbacks, modifyEncodingBuffer) + .Times(1) + .WillOnce([&buffer](std::function callback) { + // callback is the callback function parameter used to manipulate buffered data + // in our use case, we run the lambda function to manipulate buffered data manually + callback(buffer); + }); + + Buffer::OwnedImpl data; + EXPECT_EQ(Http::FilterDataStatus::Continue, filter.encodeData(data, true)); + + std::string proto_string; + Ack ack; + ack.set_ack_id("ack id string"); + bool status = ack.SerializeToString(&proto_string); + ASSERT_TRUE(status); + + // filter should replace body with ack proto format string + EXPECT_EQ(proto_string, buffer.toString()); + EXPECT_EQ("", data.toString()); +} + +TEST(GcpEventsConvertFilterUnitTest, EncodeDataStatus200EndStreamLastData) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert proto_config; + proto_config.set_content_type("application/grpc+cloudevent+json"); + + GcpEventsConvertFilter filter(std::make_shared(proto_config), + /*has_cloud_event=*/true, + /*ack_id=*/"ack id string", + /*acked=*/true); + Http::MockStreamEncoderFilterCallbacks callbacks; + filter.setEncoderFilterCallbacks(callbacks); + + EXPECT_CALL(callbacks, encodingBuffer).Times(1).WillOnce(testing::Return(nullptr)); + EXPECT_CALL(callbacks, modifyEncodingBuffer).Times(0); + + Buffer::OwnedImpl data; + EXPECT_EQ(Http::FilterDataStatus::Continue, filter.encodeData(data, true)); + + std::string proto_string; + Ack ack; + ack.set_ack_id("ack id string"); + bool status = ack.SerializeToString(&proto_string); + ASSERT_TRUE(status); + + // filter should replace body with ack proto format string + EXPECT_EQ(proto_string, data.toString()); +} + +TEST(GcpEventsConvertFilterUnitTest, EncodeDataStatusNot200) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert proto_config; + proto_config.set_content_type("application/grpc+cloudevent+json"); + + GcpEventsConvertFilter filter(std::make_shared(proto_config), + /*has_cloud_event=*/true, + /*ack_id=*/"ack id string", + /*acked=*/false); + + Buffer::OwnedImpl data; + EXPECT_EQ(Http::FilterDataStatus::Continue, filter.encodeData(data, true)); +} + +TEST(GcpEventsConvertFilterUnitTest, EncodeDataNoCloudEvent) { + envoy::extensions::filters::http::gcp_events_convert::v3::GcpEventsConvert proto_config; + proto_config.set_content_type("application/grpc+cloudevent+json"); + + GcpEventsConvertFilter filter(std::make_shared(proto_config), + /*has_cloud_event=*/false, + /*ack_id=*/"random string", + /*acked=*/true); + + Buffer::OwnedImpl data; + EXPECT_EQ(Http::FilterDataStatus::Continue, filter.encodeData(data, true)); +} } // namespace GcpEventsConvert } // namespace HttpFilters