Skip to content
This repository has been archived by the owner on May 13, 2022. It is now read-only.

Commit

Permalink
Fix finitely bounded event streams in the presence of empty blocks.
Browse files Browse the repository at this point in the history
ExecutionEventsServer assumes every block will be represented in
state, but this is not true for empty blocks.

Signed-off-by: Silas Davis <[email protected]>
  • Loading branch information
Silas Davis committed May 20, 2021
1 parent 394d62a commit baaaf87
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
28 changes: 28 additions & 0 deletions integration/rpcevents/execution_events_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,34 @@ func TestExecutionEventsTest(t *testing.T) {
n := countEventsAndCheckConsecutive(t, evs)
assert.Equal(t, 0, n, "should not see reverted events")
})

// This test triggered a bug when using 'latest' as the end bound and where the latest block is an empty block
// leading to streaming until another block is emitted causing clients to hang around much longer than they
// should
t.Run("GetEventsWithLatestBlockEmpty", func(t *testing.T) {
numSends := 5
blockRange := doSends(t, numSends, tcli, kern, inputAddress0, 999)
request := &rpcevents.BlocksRequest{
BlockRange: &rpcevents.BlockRange{
Start: blockRange.Start,
End: rpcevents.LatestBound(),
},
}

n := 400
wait := time.Millisecond
before := time.Now()
for i := 0; i < n; i++ {
time.Sleep(wait)
responses, err := getEvents(t, request, ecli)
require.NoError(t, err)
assert.Equal(t, 2*numSends, countEventsAndCheckConsecutive(t, responses), "should receive every single input event per send")
}
elapsed := time.Now().Sub(before)
// This should complete very quickly unless it accidentally starts streaming in which case it will keep
// waiting for an empty block at each iteration after the bug is triggered
require.Less(t, elapsed, time.Duration(n)*wait*10)
})
})
}

Expand Down
7 changes: 4 additions & 3 deletions rpc/rpcevents/execution_events_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,16 @@ func (ees *executionEventsServer) Events(request *BlocksRequest, stream Executio
func (ees *executionEventsServer) streamEvents(ctx context.Context, blockRange *BlockRange,
consumer func(execution *exec.StreamEvent) error) error {

start, end, streaming := blockRange.Bounds(ees.tip.LastBlockHeight())
lastBlockHeight := ees.tip.LastBlockHeight()
start, end, streaming := blockRange.Bounds(lastBlockHeight)
ees.logger.TraceMsg("Streaming blocks", "start", start, "end", end, "streaming", streaming)

// Pull blocks from state and receive the upper bound (exclusive) on the what we were able to send
// Set this to start since it will be the start of next streaming batch (if needed)
start, err := ees.iterateStreamEvents(start, end, consumer)

// If we are not streaming and all blocks requested were retrieved from state then we are done
if !streaming && start > end {
// If we are not streaming and all (non-empty) blocks up to and including end are available from state then we are done
if !streaming && end <= lastBlockHeight {
return err
}

Expand Down

0 comments on commit baaaf87

Please sign in to comment.