Skip to content

Commit

Permalink
[BUG FIX] Message not delivered to Processor (#174)
Browse files Browse the repository at this point in the history
* fixed inbox bug

* trigger  build

* increased timeout duration

* trigger build

---------

Co-authored-by: TheTGKing <[email protected]>
  • Loading branch information
troygilman0 and troygilman0 authored Jan 3, 2025
1 parent 9721d9c commit 02bbf30
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
6 changes: 5 additions & 1 deletion actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ func (in *Inbox) schedule() {

func (in *Inbox) process() {
in.run()
atomic.CompareAndSwapInt32(&in.procStatus, running, idle)
if atomic.CompareAndSwapInt32(&in.procStatus, running, idle) && in.rb.Len() > 0 {
// messages might have been added to the ring-buffer between the last pop and the transition to idle.
// if this is the case, then we should schedule again
in.schedule()
}
}

func (in *Inbox) run() {
Expand Down
27 changes: 27 additions & 0 deletions actor/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,33 @@ func TestInboxSendAndProcess(t *testing.T) {
inbox.Stop()
}

func TestInboxSendAndProcessMany(t *testing.T) {
for i := 0; i < 100000; i++ {
inbox := NewInbox(10)
processedMessages := make(chan Envelope, 10)
mockProc := MockProcesser{
processFunc: func(envelopes []Envelope) {
for _, e := range envelopes {
processedMessages <- e
}
},
}
inbox.Start(mockProc)
msg := Envelope{}
inbox.Send(msg)

timer := time.NewTimer(time.Second)
select {
case <-processedMessages: // Message processed
case <-timer.C:
t.Errorf("Message was not processed in time")
}
timer.Stop()

inbox.Stop()
}
}

type MockProcesser struct {
processFunc func([]Envelope)
}
Expand Down

0 comments on commit 02bbf30

Please sign in to comment.