Skip to content

Commit

Permalink
Merge pull request #1648 from kaleido-io/fix-transportwrapper-network…
Browse files Browse the repository at this point in the history
…name

[privatemessaging] [operations] Ensure Transports are Networked for Retried Messages
  • Loading branch information
peterbroadhurst authored Feb 28, 2025
2 parents 75bb921 + 0569b53 commit 490539e
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
3 changes: 2 additions & 1 deletion internal/privatemessaging/operations.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -130,6 +130,7 @@ func (pm *privateMessaging) PrepareOperation(ctx context.Context, op *core.Opera
return nil, err
}
transport := &core.TransportWrapper{Group: group, Batch: batch}
pm.prepareBatchForNetworkTransport(ctx, transport)
return opSendBatch(op, node, transport), nil

default:
Expand Down
3 changes: 2 additions & 1 deletion internal/privatemessaging/operations_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -149,6 +149,7 @@ func TestPrepareAndRunBatchSend(t *testing.T) {
assert.Equal(t, node, po.Data.(batchSendData).Node)
assert.Equal(t, group, po.Data.(batchSendData).Transport.Group)
assert.Equal(t, batch, po.Data.(batchSendData).Transport.Batch)
assert.Equal(t, "ns1-remote", po.Data.(batchSendData).Transport.Batch.Namespace) // ensure its set to the network name not the local namespace name

_, phase, err := pm.RunOperation(context.Background(), po)

Expand Down
12 changes: 10 additions & 2 deletions internal/privatemessaging/privatemessaging.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -179,12 +179,20 @@ func (pm *privateMessaging) dispatchUnpinnedBatch(ctx context.Context, payload *
return pm.dispatchBatchCommon(ctx, payload)
}

// prepareBatchForNetworkTransport is mainly for documentation purposes, we need to "normalize" a batch before sending
// it over the network to other parties, so that all nodes see the same batch, regardless of what they call their local
// namespace. This is used when we dispatch a batch per the regular messaging flow, and when we retry a private messaging
// send operation.
func (pm *privateMessaging) prepareBatchForNetworkTransport(ctx context.Context, tw *core.TransportWrapper) {
tw.Batch.Namespace = pm.namespace.NetworkName
}

func (pm *privateMessaging) dispatchBatchCommon(ctx context.Context, payload *batch.DispatchPayload) error {
batch := payload.Batch.GenInflight(payload.Messages, payload.Data)
batch.Namespace = pm.namespace.NetworkName
tw := &core.TransportWrapper{
Batch: batch,
}
pm.prepareBatchForNetworkTransport(ctx, tw)

// Retrieve the group
group, nodes, err := pm.groupManager.getGroupNodes(ctx, batch.Group, false /* fail if not found */)
Expand Down
4 changes: 2 additions & 2 deletions internal/privatemessaging/privatemessaging_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -84,7 +84,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM
mmi.On("IsMetricsEnabled").Return(metricsEnabled)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1-remote"}
pm, err := NewPrivateMessaging(ctx, ns, mdi, mdx, mbi, mim, mba, mdm, msa, mmp, mmi, mom, cmi)
assert.NoError(t, err)
cmi.AssertCalled(t, "GetCache", cache.NewCacheConfig(
Expand Down
4 changes: 2 additions & 2 deletions internal/privatemessaging/recipients_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestResolveMemberListNewGroupE2E(t *testing.T) {
um.RunFn = func(a mock.Arguments) {
msg := a[1].(*core.Message)
assert.Equal(t, core.MessageTypeGroupInit, msg.Header.Type)
assert.Equal(t, "ns1", msg.Header.Namespace)
assert.Equal(t, "ns1-remote", msg.Header.Namespace) // note this matches the remote network name, not the local namespace
assert.Len(t, msg.Data, 1)
assert.Equal(t, *dataID, *msg.Data[0].ID)
}
Expand Down

0 comments on commit 490539e

Please sign in to comment.