Add logging and optional callback to buffer fills (#297)
Slow-client-related issues:

- `client.maintainConnection` waits for the `messageProcessor` goroutine(s) to
  finish before reconnecting: this delays the reconnect, so more messages are missed
- whenever the client buffer `c.in` is full, the `connReader` goroutine blocks
  (see the sketch below)
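
For context, here is a minimal, self-contained sketch of the non-blocking send pattern this commit applies to `c.in`. The `send` helper and `onBufferFill` callback are hypothetical names, not the library's API: when the buffer is full, the message is dropped and an optional callback is invoked instead of blocking the reader.

```go
package main

import "fmt"

// send tries to enqueue msg without blocking. If buf is full, the message is
// dropped and the optional onBufferFill callback is notified instead.
func send(buf chan []byte, msg []byte, onBufferFill func([]byte)) {
	select {
	case buf <- msg: // fast path: the buffer has room
	default: // buffer full: drop instead of blocking the reader goroutine
		if onBufferFill != nil {
			onBufferFill(msg)
		}
	}
}

func main() {
	buf := make(chan []byte, 1)
	dropped := 0

	send(buf, []byte("first"), func([]byte) { dropped++ })  // fits in the buffer
	send(buf, []byte("second"), func([]byte) { dropped++ }) // buffer is full, dropped
	fmt.Println("dropped:", dropped)                        // prints: dropped: 1
}
```
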
leki75 authored Aug 9, 2024
1 parent 31f709b commit 1c54c5a
Showing 4 changed files with 128 additions and 9 deletions.
40 changes: 33 additions & 7 deletions marketdata/stream/client.go
@@ -33,6 +33,10 @@ type client struct {
in chan []byte
subChanges chan []byte

bufferFillCallback func([]byte)
lastBufferFill time.Time
droppedMsgCount int

sub subscriptions

handler msgHandler
@@ -58,6 +62,7 @@ func (c *client) configure(o options) {
c.reconnectLimit = o.reconnectLimit
c.reconnectDelay = o.reconnectDelay
c.connectCallback = o.connectCallback
c.bufferFillCallback = o.bufferFillCallback
c.disconnectCallback = o.disconnectCallback
c.processorCount = o.processorCount
c.bufferSize = o.bufferSize
@@ -387,6 +392,17 @@ func (c *client) maintainConnection(ctx context.Context, u url.URL, initialResul
}
}

c.in = make(chan []byte, c.bufferSize)
pwg := sync.WaitGroup{}
pwg.Add(c.processorCount)
for i := 0; i < c.processorCount; i++ {
go c.messageProcessor(ctx, &pwg)
}
defer func() {
close(c.in)
pwg.Wait()
}()

for {
select {
case <-ctx.Done():
@@ -450,17 +466,14 @@
}
failedAttemptsInARow = 0

c.in = make(chan []byte, c.bufferSize)
wg := sync.WaitGroup{}
wg.Add(c.processorCount + 3)
wg.Add(3)
closeCh := make(chan struct{})
for i := 0; i < c.processorCount; i++ {
go c.messageProcessor(ctx, &wg)
}
go c.connPinger(ctx, &wg, closeCh)
go c.connReader(ctx, &wg, closeCh)
go c.connWriter(ctx, &wg, closeCh)
wg.Wait()

if ctx.Err() != nil {
c.logger.Infof("datav2stream: disconnected")
} else {
@@ -573,7 +586,6 @@ func (c *client) connReader(
defer func() {
close(closeCh)
c.conn.close()
close(c.in)
wg.Done()
}()

@@ -586,7 +598,21 @@
return
}

c.in <- msg
select {
case c.in <- msg:
default:
c.droppedMsgCount++
now := time.Now()
// Reduce the number of logs to 1 msg/sec if client buffer is full
if now.Add(-1 * time.Second).After(c.lastBufferFill) {
c.logger.Warnf("datav2stream: writing to buffer failed, error: buffer full, dropped: %d", c.droppedMsgCount)
c.droppedMsgCount = 0
c.lastBufferFill = now
}
if c.bufferFillCallback != nil {
c.bufferFillCallback(msg)
}
}
}
}

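The warning added to `connReader` above is rate limited to roughly one log line per second while the buffer stays full, with the drop counter reset on each emitted warning. A standalone sketch of the same pattern, using a hypothetical `dropLogger` type rather than the client struct:

```go
package main

import (
	"log"
	"time"
)

// dropLogger counts dropped messages and emits at most one warning per second.
type dropLogger struct {
	lastLog time.Time
	dropped int
}

func (d *dropLogger) recordDrop() {
	d.dropped++
	now := time.Now()
	// Log only if at least one second has passed since the previous warning.
	if now.Add(-1 * time.Second).After(d.lastLog) {
		log.Printf("buffer full, dropped %d messages", d.dropped)
		d.dropped = 0
		d.lastLog = now
	}
}

func main() {
	d := &dropLogger{}
	for i := 0; i < 5; i++ {
		d.recordDrop() // only the first call logs; the others fall inside the 1s window
	}
}
```
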
79 changes: 79 additions & 0 deletions marketdata/stream/client_test.go
@@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vmihailenco/msgpack/v5"

"github.com/alpacahq/alpaca-trade-api-go/v3/marketdata"
)
@@ -841,6 +842,84 @@ func TestSubscribeFailsDueToError(t *testing.T) {
require.True(t, errors.Is(err, ErrSubscriptionChangeInvalidForFeed))
}

func assertBufferFills(t *testing.T, bufferFills, trades chan Trade, minID, maxID, minTrades int) {
timer := time.NewTimer(100 * time.Millisecond)
count := maxID - minID + 1
minFills := count - minTrades - 1

sumTrades := 0
sumFills := 0
for i := 0; i < minFills; i++ {
select {
case trade := <-bufferFills:
sumFills++
assert.LessOrEqual(t, int64(minID), trade.ID)
assert.GreaterOrEqual(t, int64(maxID), trade.ID)
case <-timer.C:
require.Fail(t, "buffer fill timeout")
}
}

for i := minFills; i < count; i++ {
select {
case trade := <-bufferFills:
sumFills++
assert.LessOrEqual(t, int64(minID), trade.ID)
assert.GreaterOrEqual(t, int64(maxID), trade.ID)
case trade := <-trades:
sumTrades++
assert.LessOrEqual(t, int64(minID), trade.ID)
assert.GreaterOrEqual(t, int64(maxID), trade.ID)
}
}

assert.LessOrEqual(t, minFills, sumFills)
assert.LessOrEqual(t, minTrades, sumTrades)
}

func TestCallbacksCalledOnBufferFill(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

connection := newMockConn()
defer connection.close()

writeInitialFlowMessagesToConn(t, connection, subscriptions{trades: []string{"ALPACA"}})

const bufferSize = 2
bufferFills := make(chan Trade, 10)
trades := make(chan Trade)

c := NewStocksClient(marketdata.IEX,
WithBufferSize(bufferSize),
WithBufferFillCallback(func(msg []byte) {
trades := []tradeWithT{}
require.NoError(t, msgpack.Unmarshal(msg, &trades))
bufferFills <- Trade{
ID: trades[0].ID,
Symbol: trades[0].Symbol,
}
}),
withConnCreator(func(ctx context.Context, u url.URL) (conn, error) { return connection, nil }),
WithTrades(func(t Trade) { trades <- t }, "ALPACA"),
)
require.NoError(t, c.Connect(ctx))

// The buffer size is 2 but we send at least 4 trades (2 for the buffer, 1 for
// a messageProcessor goroutine, 1 extra) to trigger a buffer fill. The
// messageProcessor goroutines can read from c.in while the rest of the
// messages are queued in the buffered channel.
for id := int64(1); id <= 4; id++ {
connection.readCh <- serializeToMsgpack(t, []any{tradeWithT{Type: "t", Symbol: "ALPACA", ID: id}})
}
assertBufferFills(t, bufferFills, trades, 1, 4, bufferSize)

for id := int64(5); id <= 10; id++ {
connection.readCh <- serializeToMsgpack(t, []any{tradeWithT{Type: "t", Symbol: "ALPACA", ID: id}})
}
assertBufferFills(t, bufferFills, trades, 5, 10, bufferSize)
}

func TestPingFails(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
6 changes: 4 additions & 2 deletions marketdata/stream/conn_test.go
@@ -6,8 +6,10 @@ import (
"sync"
)

var errClose = errors.New("closed")
var errPingDisabled = errors.New("ping disabled")
var (
errClose = errors.New("closed")
errPingDisabled = errors.New("ping disabled")
)

type mockConn struct {
pingCh chan struct{}
12 changes: 12 additions & 0 deletions marketdata/stream/options.go
@@ -41,6 +41,7 @@ type options struct {
reconnectLimit int
reconnectDelay time.Duration
connectCallback func()
bufferFillCallback func([]byte)
disconnectCallback func()
processorCount int
bufferSize int
@@ -123,6 +124,17 @@ func WithConnectCallback(callback func()) Option {
})
}

// WithBufferFillCallback runs the callback function whenever the buffer is full
// and a message cannot be delivered. This usually happens when the trade/quote
// handlers process messages too slowly to keep up with the pace at which they
// are received. The callback should run fast, so avoid any blocking
// calls in it.
func WithBufferFillCallback(callback func(msg []byte)) Option {
return newFuncOption(func(o *options) {
o.bufferFillCallback = callback
})
}

// WithDisconnectCallback runs the callback function after the streaming connection disconnects.
// If the stream is terminated and can't reconnect, the disconnect callback will timeout one second
// after reaching the end of the stream's maintenance (if it is still running). This is to avoid
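
For reference, a hedged usage sketch of the new option from an application's point of view. The option names, handler signature, and the `marketdata` import path come from the diffs above; the `stream` package import path and the rest of the setup are assumptions, not documented API guidance.

```go
package main

import (
	"context"
	"log"

	"github.com/alpacahq/alpaca-trade-api-go/v3/marketdata"
	"github.com/alpacahq/alpaca-trade-api-go/v3/marketdata/stream" // assumed import path
)

func main() {
	c := stream.NewStocksClient(marketdata.IEX,
		stream.WithBufferSize(100),
		stream.WithBufferFillCallback(func(msg []byte) {
			// Keep this fast and non-blocking: msg is the raw payload that
			// could not be queued because the buffer was full.
			log.Printf("client buffer full, dropped a %d-byte message", len(msg))
		}),
		stream.WithTrades(func(t stream.Trade) {
			// Slow handlers here are what cause the buffer to fill up.
			log.Printf("trade %d on %s", t.ID, t.Symbol)
		}, "AAPL"),
	)
	if err := c.Connect(context.Background()); err != nil {
		log.Fatal(err)
	}
	select {} // block forever; a real program would handle shutdown
}
```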
