From bb1925414b8671ae06e95385ba03ad4ad5f01c02 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Thu, 16 Jan 2025 11:09:21 +0100 Subject: [PATCH] get rid of github.com/gogo/protobuf (#526) --- components/cqrs/marshaler_protobuf.go | 23 +- .../marshaler_protobuf_events_new_test.go | 483 ++++++++++++++++++ components/cqrs/marshaler_protobuf_gogo.go | 127 +++++ .../cqrs/marshaler_protobuf_gogo_test.go | 142 +++++ components/cqrs/marshaler_protobuf_test.go | 102 +++- components/cqrs/testdata/events.proto | 31 +- go.sum | 4 + 7 files changed, 882 insertions(+), 30 deletions(-) create mode 100644 components/cqrs/marshaler_protobuf_events_new_test.go create mode 100644 components/cqrs/marshaler_protobuf_gogo.go create mode 100644 components/cqrs/marshaler_protobuf_gogo_test.go diff --git a/components/cqrs/marshaler_protobuf.go b/components/cqrs/marshaler_protobuf.go index 474861094..8e9f66afb 100644 --- a/components/cqrs/marshaler_protobuf.go +++ b/components/cqrs/marshaler_protobuf.go @@ -5,13 +5,13 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" + "google.golang.org/protobuf/proto" - "github.com/gogo/protobuf/proto" "github.com/pkg/errors" ) -// ProtobufMarshaler is the default Protocol Buffers marshaler. -type ProtobufMarshaler struct { +// ProtoMarshaler is the default Protocol Buffers marshaler. +type ProtoMarshaler struct { NewUUID func() string GenerateName func(v interface{}) string } @@ -31,7 +31,7 @@ func (e NoProtoMessageError) Error() string { } // Marshal marshals the given protobuf's message into watermill's Message. -func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error) { +func (m ProtoMarshaler) Marshal(v interface{}) (*message.Message, error) { protoMsg, ok := v.(proto.Message) if !ok { return nil, errors.WithStack(NoProtoMessageError{v}) @@ -51,7 +51,7 @@ func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error) { return msg, nil } -func (m ProtobufMarshaler) newUUID() string { +func (m ProtoMarshaler) newUUID() string { if m.NewUUID != nil { return m.NewUUID() } @@ -61,12 +61,17 @@ func (m ProtobufMarshaler) newUUID() string { } // Unmarshal unmarshals given watermill's Message into protobuf's message. -func (ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error) { - return proto.Unmarshal(msg.Payload, v.(proto.Message)) +func (ProtoMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error) { + protoV, ok := v.(proto.Message) + if !ok { + return errors.WithStack(NoProtoMessageError{v}) + } + + return proto.Unmarshal(msg.Payload, protoV) } // Name returns the command or event's name. -func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string { +func (m ProtoMarshaler) Name(cmdOrEvent interface{}) string { if m.GenerateName != nil { return m.GenerateName(cmdOrEvent) } @@ -75,6 +80,6 @@ func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string { } // NameFromMessage returns the metadata name value for a given Message. -func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string { +func (m ProtoMarshaler) NameFromMessage(msg *message.Message) string { return msg.Metadata.Get("name") } diff --git a/components/cqrs/marshaler_protobuf_events_new_test.go b/components/cqrs/marshaler_protobuf_events_new_test.go new file mode 100644 index 000000000..c7fb8e26c --- /dev/null +++ b/components/cqrs/marshaler_protobuf_events_new_test.go @@ -0,0 +1,483 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.4 +// source: events.proto + +package cqrs_test + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Status int32 + +const ( + Status_STATUS_UNSPECIFIED Status = 0 + Status_ACTIVE Status = 1 + Status_DELETED Status = 2 +) + +// Enum value maps for Status. +var ( + Status_name = map[int32]string{ + 0: "STATUS_UNSPECIFIED", + 1: "ACTIVE", + 2: "DELETED", + } + Status_value = map[string]int32{ + "STATUS_UNSPECIFIED": 0, + "ACTIVE": 1, + "DELETED": 2, + } +) + +func (x Status) Enum() *Status { + p := new(Status) + *p = x + return p +} + +func (x Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Status) Descriptor() protoreflect.EnumDescriptor { + return file_events_proto_enumTypes[0].Descriptor() +} + +func (Status) Type() protoreflect.EnumType { + return &file_events_proto_enumTypes[0] +} + +func (x Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Status.Descriptor instead. +func (Status) EnumDescriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{0} +} + +type TestProtobufLegacyEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + When *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=when,proto3" json:"when,omitempty"` +} + +func (x *TestProtobufLegacyEvent) Reset() { + *x = TestProtobufLegacyEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_events_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestProtobufLegacyEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestProtobufLegacyEvent) ProtoMessage() {} + +func (x *TestProtobufLegacyEvent) ProtoReflect() protoreflect.Message { + mi := &file_events_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestProtobufLegacyEvent.ProtoReflect.Descriptor instead. +func (*TestProtobufLegacyEvent) Descriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{0} +} + +func (x *TestProtobufLegacyEvent) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *TestProtobufLegacyEvent) GetWhen() *timestamppb.Timestamp { + if x != nil { + return x.When + } + return nil +} + +type SubEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tags []string `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty"` + Flags map[string]bool `protobuf:"bytes,2,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` +} + +func (x *SubEvent) Reset() { + *x = SubEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_events_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubEvent) ProtoMessage() {} + +func (x *SubEvent) ProtoReflect() protoreflect.Message { + mi := &file_events_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubEvent.ProtoReflect.Descriptor instead. +func (*SubEvent) Descriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{1} +} + +func (x *SubEvent) GetTags() []string { + if x != nil { + return x.Tags + } + return nil +} + +func (x *SubEvent) GetFlags() map[string]bool { + if x != nil { + return x.Flags + } + return nil +} + +type TestComplexProtobufEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + When *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=when,proto3" json:"when,omitempty"` + // Complex fields to test edge cases + NestedMap map[string]*SubEvent `protobuf:"bytes,4,rep,name=nested_map,json=nestedMap,proto3" json:"nested_map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Events []*SubEvent `protobuf:"bytes,5,rep,name=events,proto3" json:"events,omitempty"` + // Types that are assignable to Result: + // + // *TestComplexProtobufEvent_Success + // *TestComplexProtobufEvent_Error + // *TestComplexProtobufEvent_Fallback + Result isTestComplexProtobufEvent_Result `protobuf_oneof:"result"` +} + +func (x *TestComplexProtobufEvent) Reset() { + *x = TestComplexProtobufEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_events_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestComplexProtobufEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestComplexProtobufEvent) ProtoMessage() {} + +func (x *TestComplexProtobufEvent) ProtoReflect() protoreflect.Message { + mi := &file_events_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestComplexProtobufEvent.ProtoReflect.Descriptor instead. +func (*TestComplexProtobufEvent) Descriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{2} +} + +func (x *TestComplexProtobufEvent) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *TestComplexProtobufEvent) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *TestComplexProtobufEvent) GetWhen() *timestamppb.Timestamp { + if x != nil { + return x.When + } + return nil +} + +func (x *TestComplexProtobufEvent) GetNestedMap() map[string]*SubEvent { + if x != nil { + return x.NestedMap + } + return nil +} + +func (x *TestComplexProtobufEvent) GetEvents() []*SubEvent { + if x != nil { + return x.Events + } + return nil +} + +func (m *TestComplexProtobufEvent) GetResult() isTestComplexProtobufEvent_Result { + if m != nil { + return m.Result + } + return nil +} + +func (x *TestComplexProtobufEvent) GetSuccess() *SubEvent { + if x, ok := x.GetResult().(*TestComplexProtobufEvent_Success); ok { + return x.Success + } + return nil +} + +func (x *TestComplexProtobufEvent) GetError() string { + if x, ok := x.GetResult().(*TestComplexProtobufEvent_Error); ok { + return x.Error + } + return "" +} + +func (x *TestComplexProtobufEvent) GetFallback() Status { + if x, ok := x.GetResult().(*TestComplexProtobufEvent_Fallback); ok { + return x.Fallback + } + return Status_STATUS_UNSPECIFIED +} + +type isTestComplexProtobufEvent_Result interface { + isTestComplexProtobufEvent_Result() +} + +type TestComplexProtobufEvent_Success struct { + Success *SubEvent `protobuf:"bytes,6,opt,name=success,proto3,oneof"` +} + +type TestComplexProtobufEvent_Error struct { + Error string `protobuf:"bytes,7,opt,name=error,proto3,oneof"` +} + +type TestComplexProtobufEvent_Fallback struct { + Fallback Status `protobuf:"varint,8,opt,name=fallback,proto3,enum=cqrs_test.Status,oneof"` +} + +func (*TestComplexProtobufEvent_Success) isTestComplexProtobufEvent_Result() {} + +func (*TestComplexProtobufEvent_Error) isTestComplexProtobufEvent_Result() {} + +func (*TestComplexProtobufEvent_Fallback) isTestComplexProtobufEvent_Result() {} + +var File_events_proto protoreflect.FileDescriptor + +var file_events_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, + 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x17, 0x54, 0x65, + 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x4c, 0x65, 0x67, 0x61, 0x63, 0x79, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2e, 0x0a, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x04, 0x77, 0x68, 0x65, 0x6e, 0x22, 0x8e, 0x01, 0x0a, 0x08, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x12, 0x34, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, + 0x74, 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x46, 0x6c, 0x61, 0x67, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x1a, 0x38, 0x0a, 0x0a, + 0x46, 0x6c, 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xcb, 0x03, 0x0a, 0x18, 0x54, 0x65, 0x73, 0x74, 0x43, + 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x78, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x12, 0x51, 0x0a, 0x0a, 0x6e, 0x65, 0x73, 0x74, 0x65, + 0x64, 0x5f, 0x6d, 0x61, 0x70, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x63, 0x71, + 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6d, 0x70, + 0x6c, 0x65, 0x78, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x2e, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, + 0x09, 0x6e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x4d, 0x61, 0x70, 0x12, 0x2b, 0x0a, 0x06, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x71, 0x72, + 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, + 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, + 0x74, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, + 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x12, 0x2f, 0x0a, 0x08, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x18, 0x08, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x11, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x00, 0x52, 0x08, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, + 0x6b, 0x1a, 0x51, 0x0a, 0x0e, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x4d, 0x61, 0x70, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x29, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, + 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x4a, 0x04, + 0x08, 0x17, 0x10, 0x1f, 0x2a, 0x39, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, + 0x0a, 0x12, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, + 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, + 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x42, + 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_events_proto_rawDescOnce sync.Once + file_events_proto_rawDescData = file_events_proto_rawDesc +) + +func file_events_proto_rawDescGZIP() []byte { + file_events_proto_rawDescOnce.Do(func() { + file_events_proto_rawDescData = protoimpl.X.CompressGZIP(file_events_proto_rawDescData) + }) + return file_events_proto_rawDescData +} + +var file_events_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_events_proto_goTypes = []interface{}{ + (Status)(0), // 0: cqrs_test.Status + (*TestProtobufLegacyEvent)(nil), // 1: cqrs_test.TestProtobufLegacyEvent + (*SubEvent)(nil), // 2: cqrs_test.SubEvent + (*TestComplexProtobufEvent)(nil), // 3: cqrs_test.TestComplexProtobufEvent + nil, // 4: cqrs_test.SubEvent.FlagsEntry + nil, // 5: cqrs_test.TestComplexProtobufEvent.NestedMapEntry + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp +} +var file_events_proto_depIdxs = []int32{ + 6, // 0: cqrs_test.TestProtobufLegacyEvent.when:type_name -> google.protobuf.Timestamp + 4, // 1: cqrs_test.SubEvent.flags:type_name -> cqrs_test.SubEvent.FlagsEntry + 6, // 2: cqrs_test.TestComplexProtobufEvent.when:type_name -> google.protobuf.Timestamp + 5, // 3: cqrs_test.TestComplexProtobufEvent.nested_map:type_name -> cqrs_test.TestComplexProtobufEvent.NestedMapEntry + 2, // 4: cqrs_test.TestComplexProtobufEvent.events:type_name -> cqrs_test.SubEvent + 2, // 5: cqrs_test.TestComplexProtobufEvent.success:type_name -> cqrs_test.SubEvent + 0, // 6: cqrs_test.TestComplexProtobufEvent.fallback:type_name -> cqrs_test.Status + 2, // 7: cqrs_test.TestComplexProtobufEvent.NestedMapEntry.value:type_name -> cqrs_test.SubEvent + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name +} + +func init() { file_events_proto_init() } +func file_events_proto_init() { + if File_events_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_events_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestProtobufLegacyEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_events_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_events_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestComplexProtobufEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_events_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*TestComplexProtobufEvent_Success)(nil), + (*TestComplexProtobufEvent_Error)(nil), + (*TestComplexProtobufEvent_Fallback)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_events_proto_rawDesc, + NumEnums: 1, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_events_proto_goTypes, + DependencyIndexes: file_events_proto_depIdxs, + EnumInfos: file_events_proto_enumTypes, + MessageInfos: file_events_proto_msgTypes, + }.Build() + File_events_proto = out.File + file_events_proto_rawDesc = nil + file_events_proto_goTypes = nil + file_events_proto_depIdxs = nil +} diff --git a/components/cqrs/marshaler_protobuf_gogo.go b/components/cqrs/marshaler_protobuf_gogo.go new file mode 100644 index 000000000..af3047564 --- /dev/null +++ b/components/cqrs/marshaler_protobuf_gogo.go @@ -0,0 +1,127 @@ +package cqrs + +import ( + "fmt" + "runtime/debug" + + stderrors "errors" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + stdproto "google.golang.org/protobuf/proto" +) + +// ProtobufMarshaler a protobuf marshaler using github.com/gogo/protobuf/proto (deprecated). +// +// DEPRECATED: Use ProtoMarshaler instead. This marshaler will not work with newer protobuf files. +// IMPORTANT: This marshaler is backward and forward compatible with ProtoMarshaler. +// ProtobufMarshaler from Watermill versions until v1.4.3 are not forward compatible with ProtoMarshaler. +// Suggested migration steps: +// 1. Update Watermill to v1.4.4 or newer, so all publishers and subscribers will be forward and backward compatible. +// 2. Change all usages of ProtobufMarshaler to ProtoMarshaler. +type ProtobufMarshaler struct { + NewUUID func() string + GenerateName func(v interface{}) string + + // DisableStdProtoFallback disables fallback to github.com/golang/protobuf/proto when github.com/gogo/protobuf/proto + // because receiving a message that was marshaled with github.com/golang/protobuf/proto. + // Fallback is enabled by default to enable migration to ProtoMarshaler. + DisableStdProtoFallback bool +} + +// Marshal marshals the given protobuf's message into watermill's Message. +func (m ProtobufMarshaler) Marshal(v interface{}) (msg *message.Message, err error) { + defer func() { + // gogo proto can panic on unmarshal (for example, because it received a message from ProtoMarshaler with oneof) + if r := recover(); r != nil { + err = stderrors.Join(err, fmt.Errorf( + "github.com/gogo/protobuf/proto panic (we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that): %v\n%s", + r, + string(debug.Stack()), + )) + } + + if err != nil && !m.DisableStdProtoFallback { + _, isStdProtoMsg := v.(stdproto.Message) + + if isStdProtoMsg { + msg, err = m.ToProtoMarshaler().Marshal(v) + } + } + }() + + protoMsg, ok := v.(proto.Message) + if !ok { + return nil, errors.WithStack(NoProtoMessageError{v}) + } + + b, err := proto.Marshal(protoMsg) + if err != nil { + return nil, err + } + + msg = message.NewMessage( + m.newUUID(), + b, + ) + msg.Metadata.Set("name", m.Name(v)) + + return msg, nil +} + +func (m ProtobufMarshaler) newUUID() string { + if m.NewUUID != nil { + return m.NewUUID() + } + + // default + return watermill.NewUUID() +} + +// Unmarshal unmarshals given watermill's Message into protobuf's message. +func (m ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error) { + protoV, ok := v.(proto.Message) + if !ok { + return errors.WithStack(NoProtoMessageError{v}) + } + + defer func() { + // gogo proto can panic on unmarshal (for example, because it received a message from ProtoMarshaler with oneof) + if r := recover(); r != nil { + err = stderrors.Join(err, fmt.Errorf( + "github.com/gogo/protobuf/proto panic (we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that): %v\n%s", + r, + string(debug.Stack()), + )) + } + + if err != nil && !m.DisableStdProtoFallback { + err = m.ToProtoMarshaler().Unmarshal(msg, v) + } + }() + + return proto.Unmarshal(msg.Payload, protoV) +} + +func (m ProtobufMarshaler) ToProtoMarshaler() ProtoMarshaler { + return ProtoMarshaler{ + NewUUID: m.NewUUID, + GenerateName: m.GenerateName, + } +} + +// Name returns the command or event's name. +func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string { + if m.GenerateName != nil { + return m.GenerateName(cmdOrEvent) + } + + return FullyQualifiedStructName(cmdOrEvent) +} + +// NameFromMessage returns the metadata name value for a given Message. +func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string { + return msg.Metadata.Get("name") +} diff --git a/components/cqrs/marshaler_protobuf_gogo_test.go b/components/cqrs/marshaler_protobuf_gogo_test.go new file mode 100644 index 000000000..625c91e8a --- /dev/null +++ b/components/cqrs/marshaler_protobuf_gogo_test.go @@ -0,0 +1,142 @@ +package cqrs_test + +import ( + "testing" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProtobufMarshaler_with_fallback(t *testing.T) { + marshaler := cqrs.ProtobufMarshaler{} + + assertProtoMarshalUnmarshal( + t, + marshaler, + marshaler, + newProtoTestComplexEvent(), + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) + + legacyEvent, _ := newProtoLegacyTestEvent() + assertProtoMarshalUnmarshal( + t, + marshaler, + marshaler, + legacyEvent, + &TestProtobufEvent{}, + "cqrs_test.TestProtobufEvent", + ) +} + +func TestProtobufMarshaler_without_fallback_legacy_event(t *testing.T) { + legacyEvent, _ := newProtoLegacyTestEvent() + + marshaler := cqrs.ProtobufMarshaler{ + DisableStdProtoFallback: true, + } + + assertProtoMarshalUnmarshal( + t, + marshaler, + marshaler, + legacyEvent, + &TestProtobufEvent{}, + "cqrs_test.TestProtobufEvent", + ) +} + +func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) { + marshaler := cqrs.ProtobufMarshaler{ + NewUUID: func() string { + return "foo" + }, + } + + msg, err := marshaler.Marshal(newProtoTestComplexEvent()) + require.NoError(t, err) + + assert.Equal(t, msg.UUID, "foo") +} + +func TestProtobufMarshaler_catch_panic(t *testing.T) { + marshalerNoFallback := cqrs.ProtobufMarshaler{ + DisableStdProtoFallback: true, + } + marshalerWithFallback := cqrs.ProtobufMarshaler{ + DisableStdProtoFallback: false, + } + + complexEvent := newProtoTestComplexEvent() + + msg, err := marshalerNoFallback.Marshal(complexEvent) + assert.Nil(t, msg) + assert.ErrorContains(t, err, "(we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that)") + assert.ErrorContains(t, err, "invalid memory address or nil pointer dereference") + assert.ErrorContains(t, err, "github.com/gogo/protobuf/proto panic") + assert.ErrorContains(t, err, "runtime/debug.Stack()", "error should contain stack trace") + + // let's simulate situation when publishing service uses fallback and consuming service does not + msg, err = marshalerWithFallback.Marshal(complexEvent) + require.NoError(t, err) + + err = marshalerNoFallback.Unmarshal(msg, &TestComplexProtobufEvent{}) + assert.ErrorContains(t, err, "(we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that)") + assert.ErrorContains(t, err, "protobuf tag not enough fields in TestComplexProtobufEvent.state") + assert.ErrorContains(t, err, "github.com/gogo/protobuf/proto panic") + assert.ErrorContains(t, err, "runtime/debug.Stack()", "error should contain stack trace") + + // marshaler with fallback should handle this message + err = marshalerWithFallback.Unmarshal(msg, &TestComplexProtobufEvent{}) + require.NoError(t, err) +} + +func TestProtobufMarshaler_compatible_with_ProtoMarshaler(t *testing.T) { + legacyEvent, legacyEventRegenerated := newProtoLegacyTestEvent() + complexEvent := newProtoTestComplexEvent() + + deprecatedMarshaler := cqrs.ProtobufMarshaler{} + newMarshaler := cqrs.ProtoMarshaler{} + + t.Run("from_deprecated_to_new", func(t *testing.T) { + assertProtoMarshalUnmarshal( + t, + deprecatedMarshaler, + newMarshaler, + complexEvent, + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) + + assertProtoMarshalUnmarshal( + t, + deprecatedMarshaler, + newMarshaler, + legacyEvent, + &TestProtobufLegacyEvent{}, + "cqrs_test.TestProtobufEvent", + ) + }) + + t.Run("from_new_to_deprecated", func(t *testing.T) { + assertProtoMarshalUnmarshal( + t, + newMarshaler, + deprecatedMarshaler, + complexEvent, + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) + + assertProtoMarshalUnmarshal( + t, + newMarshaler, + deprecatedMarshaler, + legacyEventRegenerated, + &TestProtobufEvent{}, + "cqrs_test.TestProtobufLegacyEvent", + ) + }) +} diff --git a/components/cqrs/marshaler_protobuf_test.go b/components/cqrs/marshaler_protobuf_test.go index 276ea83dc..d6ae116e5 100644 --- a/components/cqrs/marshaler_protobuf_test.go +++ b/components/cqrs/marshaler_protobuf_test.go @@ -1,6 +1,8 @@ package cqrs_test import ( + "encoding/json" + "fmt" "testing" "time" @@ -12,41 +14,101 @@ import ( "github.com/ThreeDotsLabs/watermill/components/cqrs" ) -func TestProtobufMarshaler(t *testing.T) { - marshaler := cqrs.ProtobufMarshaler{} +func TestProtoMarshaler(t *testing.T) { + assertProtoMarshalUnmarshal( + t, + cqrs.ProtoMarshaler{}, + cqrs.ProtoMarshaler{}, + newProtoTestComplexEvent(), + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) +} - when := timestamppb.New(time.Now()) - eventToMarshal := &TestProtobufEvent{ - Id: watermill.NewULID(), - When: when, +func TestProtoMarshaler_Marshal_generated_name(t *testing.T) { + marshaler := cqrs.ProtoMarshaler{ + NewUUID: func() string { + return "foo" + }, } - msg, err := marshaler.Marshal(eventToMarshal) - require.NoError(t, err) - - eventToUnmarshal := &TestProtobufEvent{} - err = marshaler.Unmarshal(msg, eventToUnmarshal) + msg, err := marshaler.Marshal(newProtoTestComplexEvent()) require.NoError(t, err) - assert.EqualValues(t, eventToMarshal.String(), eventToUnmarshal.String()) - assert.Equal(t, msg.Metadata.Get("name"), "cqrs_test.TestProtobufEvent") + assert.Equal(t, msg.UUID, "foo") } -func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) { - marshaler := cqrs.ProtobufMarshaler{ - NewUUID: func() string { - return "foo" - }, +// newProtoLegacyTestEvent returns the same event in two different protobuf versions +func newProtoLegacyTestEvent() (*TestProtobufEvent, *TestProtobufLegacyEvent) { + when := timestamppb.New(time.Now()) + id := watermill.NewULID() + + legacy := &TestProtobufEvent{ + Id: id, + When: when, } + regenerated := &TestProtobufLegacyEvent{ + Id: id, + When: when, + } + + return legacy, regenerated +} + +func newProtoTestComplexEvent() *TestComplexProtobufEvent { when := timestamppb.New(time.Now()) - eventToMarshal := &TestProtobufEvent{ + + eventToMarshal := &TestComplexProtobufEvent{ Id: watermill.NewULID(), + Data: []byte("data"), When: when, + NestedMap: map[string]*SubEvent{ + "foo": { + Tags: []string{"tag1", "tag2"}, + Flags: map[string]bool{"flag1": true, "flag2": false}, + }, + }, + Events: []*SubEvent{ + { + Tags: []string{"tag1", "tag2"}, + }, + { + Tags: []string{"tag3", "tag4"}, + }, + }, + Result: &TestComplexProtobufEvent_Success{ + Success: &SubEvent{ + Tags: []string{"tag10"}, + Flags: map[string]bool{"flag10": true}, + }, + }, } + return eventToMarshal +} + +func assertProtoMarshalUnmarshal[T1, T2 fmt.Stringer]( + t *testing.T, + marshaler cqrs.CommandEventMarshaler, + unmarshaler cqrs.CommandEventMarshaler, + eventToMarshal T1, + eventToUnmarshal T2, + expectedEventName string, +) { + t.Helper() msg, err := marshaler.Marshal(eventToMarshal) require.NoError(t, err) - assert.Equal(t, msg.UUID, "foo") + err = unmarshaler.Unmarshal(msg, eventToUnmarshal) + require.NoError(t, err) + + eventToMarshalJson, err := json.Marshal(eventToMarshal) + require.NoError(t, err) + + eventToUnmarshalJson, err := json.Marshal(eventToUnmarshal) + require.NoError(t, err) + + assert.JSONEq(t, string(eventToMarshalJson), string(eventToUnmarshalJson)) + assert.Equal(t, expectedEventName, msg.Metadata.Get("name")) } diff --git a/components/cqrs/testdata/events.proto b/components/cqrs/testdata/events.proto index b4df7d89f..4de0c9827 100644 --- a/components/cqrs/testdata/events.proto +++ b/components/cqrs/testdata/events.proto @@ -1,9 +1,38 @@ syntax = "proto3"; package cqrs_test; +option go_package = "./cqrs_test"; import "google/protobuf/timestamp.proto"; -message TestProtobufEvent { +message TestProtobufLegacyEvent { string id = 1; google.protobuf.Timestamp when = 3; } + +enum Status { + STATUS_UNSPECIFIED = 0; + ACTIVE = 1; + DELETED = 2; +} + +message SubEvent { + repeated string tags = 1; + map flags = 2; +} + +message TestComplexProtobufEvent { + string id = 1; + bytes data = 2; + google.protobuf.Timestamp when = 3; + + map nested_map = 4; + repeated SubEvent events = 5; + + oneof result { + SubEvent success = 6; + string error = 7; + Status fallback = 8; + } + + reserved 23 to 30; +} diff --git a/go.sum b/go.sum index fd1e8ca4f..ef28ac9a7 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,11 @@ github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -89,6 +92,7 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=