Skip to content

Commit

Permalink
Merge pull request #19 from h9jiang/response_path
Browse files Browse the repository at this point in the history
Response path implementation
  • Loading branch information
hrasadi authored Sep 24, 2020
2 parents c6eaf26 + 08cd8d2 commit 367edfb
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 37 deletions.
1 change: 1 addition & 0 deletions api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ proto_library(
"//envoy/service/listener/v3:pkg",
"//envoy/service/load_stats/v3:pkg",
"//envoy/service/metrics/v3:pkg",
"//envoy/service/pubsub/v3alpha:pkg",
"//envoy/service/ratelimit/v3:pkg",
"//envoy/service/route/v3:pkg",
"//envoy/service/runtime/v3:pkg",
Expand Down
8 changes: 2 additions & 6 deletions api/envoy/extensions/filters/http/gcp_events_convert/v3/BUILD
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
# DO NOT EDIT. This file is generated by tools/proto_sync.py.
# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"//envoy/config/core/v3:pkg",
"//envoy/type/v3:pkg",
"@com_github_cncf_udpa//udpa/annotations:pkg",
],
deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ syntax = "proto3";

package envoy.extensions.filters.http.gcp_events_convert.v3;

import "envoy/config/core/v3/base.proto";
import "envoy/type/v3/range.proto";

import "google/api/annotations.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";
Expand All @@ -17,12 +14,11 @@ import "validate/validate.proto";
option java_package = "io.envoyproxy.envoy.extensions.filters.http.gcp_events_convert.v3";
option java_outer_classname = "GcpEventsConvertProto";
option java_multiple_files = true;
option (udpa.annotations.file_status).work_in_progress = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

message GcpEventsConvert {
// content-type to let filter use to match the incoming HTTP request
// if content_type matches, trigger the functionality
// else run as a pass-through
string content_type = 1 [(validate.rules).string.min_bytes = 1];
string content_type = 1 [(validate.rules).string = {min_bytes: 1}];
}
7 changes: 5 additions & 2 deletions api/envoy/service/pubsub/v3alpha/received_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package envoy.service.pubsub.v3alpha;

import "google/protobuf/empty.proto";
import "google/pubsub/v1/pubsub.proto";

import "udpa/annotations/migrate.proto";
Expand All @@ -16,6 +15,10 @@ option (udpa.annotations.file_status).work_in_progress = true;
option (udpa.annotations.file_status).package_version_status = ACTIVE;

service ReceivedMessageService {
rpc SendReceivedMessage(google.pubsub.v1.ReceivedMessage) returns (google.protobuf.Empty) {
rpc SendReceivedMessage(google.pubsub.v1.ReceivedMessage) returns (Ack) {
}
}

message Ack {
string ack_id = 1;
}
1 change: 0 additions & 1 deletion api/versioning/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ proto_library(
"//envoy/config/filter/http/dynamo/v2:pkg",
"//envoy/config/filter/http/ext_authz/v2:pkg",
"//envoy/config/filter/http/fault/v2:pkg",
"//envoy/config/filter/http/gcp_events_convert/v2:pkg",
"//envoy/config/filter/http/grpc_http1_bridge/v2:pkg",
"//envoy/config/filter/http/grpc_http1_reverse_bridge/v2alpha1:pkg",
"//envoy/config/filter/http/grpc_stats/v2alpha:pkg",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions source/extensions/filters/http/gcp_events_convert/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ envoy_cc_library(
"@com_github_cloudevents_sdk//v1/protocol_binding:pubsub_binder_lib",
"@com_google_googleapis//google/pubsub/v1:pubsub_cc_proto",
"@envoy_api//envoy/extensions/filters/http/gcp_events_convert/v3:pkg_cc_proto",
"@envoy_api//envoy/service/pubsub/v3alpha:pkg_cc_grpc",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Http::FilterFactoryCb GcpEventsConvertFilterFactory::createFilterFactoryFromProt
std::make_shared<GcpEventsConvertFilterConfig>(proto_config);

return [config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(std::make_shared<GcpEventsConvertFilter>(config));
callbacks.addStreamFilter(std::make_shared<GcpEventsConvertFilter>(config));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/extensions/filters/http/gcp_events_convert/v3/gcp_events_convert.pb.h"
#include "envoy/http/filter.h"
#include "envoy/server/filter_config.h"
#include "envoy/service/pubsub/v3alpha/received_message.grpc.pb.h"

#include "common/buffer/buffer_impl.h"
#include "common/common/cleanup.h"
Expand All @@ -25,6 +26,7 @@ namespace Extensions {
namespace HttpFilters {
namespace GcpEventsConvert {

using envoy::service::pubsub::v3alpha::Ack;
using google::pubsub::v1::PubsubMessage;
using google::pubsub::v1::ReceivedMessage;
using io::cloudevents::v1::CloudEvent;
Expand All @@ -42,6 +44,12 @@ GcpEventsConvertFilter::GcpEventsConvertFilter(GcpEventsConvertFilterConfigShare
Http::RequestHeaderMap* headers)
: request_headers_(headers), has_cloud_event_(has_cloud_event), config_(config) {}

GcpEventsConvertFilter::GcpEventsConvertFilter(GcpEventsConvertFilterConfigSharedPtr config,
bool has_cloud_event,
std::string ack_id,
bool acked)
: ack_id_(ack_id), acked_(acked), has_cloud_event_(has_cloud_event), config_(config) {}

void GcpEventsConvertFilter::onDestroy() {}

Http::FilterHeadersStatus GcpEventsConvertFilter::decodeHeaders(Http::RequestHeaderMap& headers,
Expand All @@ -63,6 +71,8 @@ Http::FilterDataStatus GcpEventsConvertFilter::decodeData(Buffer::Instance& buff
if (!has_cloud_event_)
return Http::FilterDataStatus::Continue;

has_cloud_event_ = false;

// For any request body that is not the end of HTTP request and not empty
// Buffer the current HTTP request's body
if (!end_stream)
Expand All @@ -84,6 +94,8 @@ Http::FilterDataStatus GcpEventsConvertFilter::decodeData(Buffer::Instance& buff
return Http::FilterDataStatus::Continue;
}

ack_id_ = received_message.ack_id();

const PubsubMessage& pubsub_message = received_message.message();
cloudevents::binding::PubsubBinder pubsub_binder;

Expand All @@ -104,6 +116,7 @@ Http::FilterDataStatus GcpEventsConvertFilter::decodeData(Buffer::Instance& buff
updateHeader(*req);
updateBody(*req, buffer);

has_cloud_event_ = true;
return Http::FilterDataStatus::Continue;
}

Expand All @@ -116,6 +129,59 @@ void GcpEventsConvertFilter::setDecoderFilterCallbacks(
decoder_callbacks_ = &callbacks;
}

Http::FilterHeadersStatus GcpEventsConvertFilter::encodeHeaders(Http::ResponseHeaderMap& headers,
bool) {
// for any requst header that is not related to cloud event. Pass through
if (!has_cloud_event_)
return Http::FilterHeadersStatus::Continue;

uint64_t status;
if (!absl::SimpleAtoi(headers.getStatusValue(), &status) || status != 200) {
// invalid code, not acknowledged
ENVOY_LOG(warn, "Gcp Events Convert Filter log: upstream response with status {}", headers.getStatusValue());
return Http::FilterHeadersStatus::Continue;
}

headers.setContentType(config_->content_type_);
acked_ = true;

return Http::FilterHeadersStatus::Continue;
}

Http::FilterDataStatus GcpEventsConvertFilter::encodeData(Buffer::Instance& buffer, bool end_stream) {
// for any requst body that is not related to cloud event or status code is not 200. Pass through
if (!has_cloud_event_ || !acked_)
return Http::FilterDataStatus::Continue;

if (!end_stream)
return Http::FilterDataStatus::StopIterationAndBuffer;

Ack ack;
ack.set_ack_id(ack_id_);
std::string proto_string;
ack.SerializeToString(&proto_string);

buffer.drain(buffer.length());
if (encoder_callbacks_ == nullptr || encoder_callbacks_->encodingBuffer() == nullptr) {
buffer.add(proto_string);
} else {
encoder_callbacks_->modifyEncodingBuffer([&proto_string](Buffer::Instance& buffered){
buffered.drain(buffered.length());
buffered.add(proto_string);
});
}

return Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus GcpEventsConvertFilter::encodeTrailers(Http::ResponseTrailerMap&) {
return Http::FilterTrailersStatus::Continue;
}

void GcpEventsConvertFilter::setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) {
encoder_callbacks_ = &callbacks;
}

std::string GcpEventsConvertFilter::buildBody(const Buffer::Instance* buffered,
const Buffer::Instance& last) {
std::string body;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "common/common/logger.h"

#include "extensions/filters/http/common/pass_through_filter.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand All @@ -27,7 +29,7 @@ using GcpEventsConvertFilterConfigSharedPtr = std::shared_ptr<GcpEventsConvertFi
/**
* The filter instance for convert Cloud Event Pubsub Binding to HTTP binding
*/
class GcpEventsConvertFilter : public Http::StreamDecoderFilter,
class GcpEventsConvertFilter : public Envoy::Http::PassThroughFilter,
public Logger::Loggable<Logger::Id::filter> {
public:
// normal constructor
Expand All @@ -36,6 +38,10 @@ class GcpEventsConvertFilter : public Http::StreamDecoderFilter,
GcpEventsConvertFilter(GcpEventsConvertFilterConfigSharedPtr config,
bool has_cloud_event,
Http::RequestHeaderMap* headers);
GcpEventsConvertFilter(GcpEventsConvertFilterConfigSharedPtr config,
bool has_cloud_event,
std::string ack_id,
bool acked);
// Http::StreamFilterBase
void onDestroy() override;

Expand All @@ -46,6 +52,13 @@ class GcpEventsConvertFilter : public Http::StreamDecoderFilter,
Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override;

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap& headers,
bool end_stream) override;
Http::FilterDataStatus encodeData(Buffer::Instance& buffer, bool end_stream) override;
Http::FilterTrailersStatus encodeTrailers(Http::ResponseTrailerMap& trailers) override;
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override;

private:
using HttpRequest = boost::beast::http::request<boost::beast::http::string_body>;

Expand All @@ -64,9 +77,12 @@ class GcpEventsConvertFilter : public Http::StreamDecoderFilter,
// 2. add cloud event information, ce-version, ce-type...... (except ce's data)
void updateHeader(const HttpRequest& request);

std::string ack_id_;
bool acked_ = false;
Http::RequestHeaderMap* request_headers_ = nullptr;
bool has_cloud_event_ = false;
const GcpEventsConvertFilterConfigSharedPtr config_;
Http::StreamEncoderFilterCallbacks* encoder_callbacks_ = nullptr;
Http::StreamDecoderFilterCallbacks* decoder_callbacks_ = nullptr;
};

Expand Down
1 change: 1 addition & 0 deletions source/extensions/grpc_stream_demuxer/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ envoy_cc_library(
deps = [
"//include/envoy/event:dispatcher_interface",
"//source/common/common:minimal_logger_lib",
"//source/common/common:statusor_lib",
"@com_google_googleapis//google/pubsub/v1:pubsub_cc_grpc",
"@com_github_grpc_grpc//:grpc++",
"@envoy_api//envoy/service/pubsub/v3alpha:pkg_cc_grpc",
Expand Down
17 changes: 10 additions & 7 deletions source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ ReceivedMessageServiceClient::ReceivedMessageServiceClient(std::shared_ptr<Chann

// Assembles the client's payload, sends it and presents the response back
// from the server.
std::string ReceivedMessageServiceClient::SendReceivedMessage(const ReceivedMessage &request) {
google::protobuf::Empty reply;
Envoy::StatusOr<std::string> ReceivedMessageServiceClient::SendReceivedMessage(const ReceivedMessage &request) {
Ack reply;
// Context for the client. It could be used to convey extra information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
Expand All @@ -20,10 +20,10 @@ std::string ReceivedMessageServiceClient::SendReceivedMessage(const ReceivedMess

// Act upon its status.
if (status.ok()) {
return "RPC worked";
return reply.ack_id();
} else {
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
return "RPC failed";
return absl::Status(absl::StatusCode::kAborted, status.error_message());
}
}

Expand Down Expand Up @@ -65,9 +65,12 @@ void GrpcStreamDemuxer::start() {
// Send the message using a unary grpc request.
std::string target_uri = address_ + ":" + std::to_string(port_);
ReceivedMessageServiceClient client(grpc::CreateChannel(target_uri, grpc::InsecureChannelCredentials()));
std::string reply = client.SendReceivedMessage(message);
ENVOY_LOG(info, "Unary request response: {}", reply);
ack_request.add_ack_ids(message.ack_id());
Envoy::StatusOr<std::string> reply = client.SendReceivedMessage(message);
if (reply.ok()){
ack_request.add_ack_ids(reply.value());
} else {
ENVOY_LOG(info, "Unary request response: {}", reply.status());
}
}
stream->Write(ack_request);
}
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/grpc_stream_demuxer/grpc_stream_demuxer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "envoy/service/pubsub/v3alpha/received_message.grpc.pb.h"

#include "common/common/logger.h"
#include "common/common/statusor.h"

#include "google/pubsub/v1/pubsub.grpc.pb.h"
#include "grpc++/grpc++.h"
Expand All @@ -16,6 +17,7 @@ using google::pubsub::v1::Subscriber;
using google::pubsub::v1::StreamingPullRequest;
using google::pubsub::v1::StreamingPullResponse;
using google::pubsub::v1::ReceivedMessage;
using envoy::service::pubsub::v3alpha::Ack;
using envoy::service::pubsub::v3alpha::ReceivedMessageService;

namespace Envoy {
Expand All @@ -26,7 +28,7 @@ class ReceivedMessageServiceClient {
public:
ReceivedMessageServiceClient(std::shared_ptr<Channel> channel);

std::string SendReceivedMessage(const ReceivedMessage &request);
Envoy::StatusOr<std::string> SendReceivedMessage(const ReceivedMessage &request);

private:
std::unique_ptr<ReceivedMessageService::Stub> stub_;
Expand Down
Loading

0 comments on commit 367edfb

Please sign in to comment.