Skip to content

Commit

Permalink
[privatemessaging] [operations] Ensure Transports are Networked for R…
Browse files Browse the repository at this point in the history
…etried Messages

Signed-off-by: hfuss <[email protected]>
  • Loading branch information
onelapahead committed Feb 28, 2025
1 parent 8a0d5c4 commit cd214c8
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 3 deletions.
1 change: 1 addition & 0 deletions internal/privatemessaging/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (pm *privateMessaging) PrepareOperation(ctx context.Context, op *core.Opera
return nil, err
}
transport := &core.TransportWrapper{Group: group, Batch: batch}
pm.prepareBatchForNetworkTransport(ctx, transport)
return opSendBatch(op, node, transport), nil

default:
Expand Down
1 change: 1 addition & 0 deletions internal/privatemessaging/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestPrepareAndRunBatchSend(t *testing.T) {
assert.Equal(t, node, po.Data.(batchSendData).Node)
assert.Equal(t, group, po.Data.(batchSendData).Transport.Group)
assert.Equal(t, batch, po.Data.(batchSendData).Transport.Batch)
assert.Equal(t, "ns1-remote", po.Data.(batchSendData).Transport.Batch.Namespace) // ensure its set to the network name not the local namespace name

_, phase, err := pm.RunOperation(context.Background(), po)

Expand Down
10 changes: 9 additions & 1 deletion internal/privatemessaging/privatemessaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,20 @@ func (pm *privateMessaging) dispatchUnpinnedBatch(ctx context.Context, payload *
return pm.dispatchBatchCommon(ctx, payload)
}

// prepareBatchForNetworkTransport is mainly for documentation purposes, we need to "normalize" a batch before sending
// it over the network to other parties, so that all nodes see the same batch, regardless of what they call their local
// namespace. This is used when we dispatch a batch per the regular messaging flow, and when we retry a private messaging
// send operation.
func (pm *privateMessaging) prepareBatchForNetworkTransport(ctx context.Context, tw *core.TransportWrapper) {
tw.Batch.Namespace = pm.namespace.NetworkName
}

func (pm *privateMessaging) dispatchBatchCommon(ctx context.Context, payload *batch.DispatchPayload) error {
batch := payload.Batch.GenInflight(payload.Messages, payload.Data)
batch.Namespace = pm.namespace.NetworkName
tw := &core.TransportWrapper{
Batch: batch,
}
pm.prepareBatchForNetworkTransport(ctx, tw)

// Retrieve the group
group, nodes, err := pm.groupManager.getGroupNodes(ctx, batch.Group, false /* fail if not found */)
Expand Down
2 changes: 1 addition & 1 deletion internal/privatemessaging/privatemessaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newTestPrivateMessagingCommon(t *testing.T, metricsEnabled bool) (*privateM
mmi.On("IsMetricsEnabled").Return(metricsEnabled)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1-remote"}
pm, err := NewPrivateMessaging(ctx, ns, mdi, mdx, mbi, mim, mba, mdm, msa, mmp, mmi, mom, cmi)
assert.NoError(t, err)
cmi.AssertCalled(t, "GetCache", cache.NewCacheConfig(
Expand Down
2 changes: 1 addition & 1 deletion internal/privatemessaging/recipients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestResolveMemberListNewGroupE2E(t *testing.T) {
um.RunFn = func(a mock.Arguments) {
msg := a[1].(*core.Message)
assert.Equal(t, core.MessageTypeGroupInit, msg.Header.Type)
assert.Equal(t, "ns1", msg.Header.Namespace)
assert.Equal(t, "ns1-remote", msg.Header.Namespace) // note this matches the remote network name, not the local namespace
assert.Len(t, msg.Data, 1)
assert.Equal(t, *dataID, *msg.Data[0].ID)
}
Expand Down

0 comments on commit cd214c8

Please sign in to comment.