From 02bbf305ba7a802637a11e427dd73d02fed5159b Mon Sep 17 00:00:00 2001 From: Troy Gilman <32723159+troygilman0@users.noreply.github.com> Date: Fri, 3 Jan 2025 10:05:33 -0500 Subject: [PATCH] [BUG FIX] Message not delivered to Processor (#174) * fixed inbox bug * trigger build * increased timeout duration * trigger build --------- Co-authored-by: TheTGKing <32723159+TheTGKing@users.noreply.github.com> --- actor/inbox.go | 6 +++++- actor/inbox_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/actor/inbox.go b/actor/inbox.go index 726c905..e47c113 100644 --- a/actor/inbox.go +++ b/actor/inbox.go @@ -72,7 +72,11 @@ func (in *Inbox) schedule() { func (in *Inbox) process() { in.run() - atomic.CompareAndSwapInt32(&in.procStatus, running, idle) + if atomic.CompareAndSwapInt32(&in.procStatus, running, idle) && in.rb.Len() > 0 { + // messages might have been added to the ring-buffer between the last pop and the transition to idle. + // if this is the case, then we should schedule again + in.schedule() + } } func (in *Inbox) run() { diff --git a/actor/inbox_test.go b/actor/inbox_test.go index 6a5a347..678fd5f 100644 --- a/actor/inbox_test.go +++ b/actor/inbox_test.go @@ -31,6 +31,33 @@ func TestInboxSendAndProcess(t *testing.T) { inbox.Stop() } +func TestInboxSendAndProcessMany(t *testing.T) { + for i := 0; i < 100000; i++ { + inbox := NewInbox(10) + processedMessages := make(chan Envelope, 10) + mockProc := MockProcesser{ + processFunc: func(envelopes []Envelope) { + for _, e := range envelopes { + processedMessages <- e + } + }, + } + inbox.Start(mockProc) + msg := Envelope{} + inbox.Send(msg) + + timer := time.NewTimer(time.Second) + select { + case <-processedMessages: // Message processed + case <-timer.C: + t.Errorf("Message was not processed in time") + } + timer.Stop() + + inbox.Stop() + } +} + type MockProcesser struct { processFunc func([]Envelope) }