Skip to content
This repository has been archived by the owner on May 13, 2022. It is now read-only.

Commit

Permalink
Implement throttling for Ethereum consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Silas Davis <[email protected]>
  • Loading branch information
Silas Davis committed Mar 23, 2021
1 parent 9cfe126 commit c2133a9
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 12 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: make test_integration_ethereum
- run: make test_integration_vent_complete

docker:
Expand Down
2 changes: 1 addition & 1 deletion acm/validator/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/hyperledger/burrow/crypto"
)

// Ring stores the validator power history in buckets as a riNng buffer. The primary storage is a the difference between
// Ring stores the validator power history in buckets as a ring buffer. The primary storage is a the difference between
// each rotation (commit - i.e. block) in 'delta' and the cumulative result of each delta in cum, where the result of
// the delta at i is stored in the cum at i+1. For example suppose we have 4 buckets then graphically:
//
Expand Down
28 changes: 27 additions & 1 deletion cmd/burrow/commands/vent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io/ioutil"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -56,6 +58,7 @@ func Vent(output Output) func(cmd *cli.Cmd) {
watchAddressesOpt := cmd.StringsOpt("watch", nil, "Add contract address to global watch filter")
minimumHeightOpt := cmd.IntOpt("minimum-height", 0, "Only process block greater than or equal to height passed")
maxRetriesOpt := cmd.IntOpt("max-retries", int(cfg.BlockConsumerConfig.MaxRetries), "Maximum number of retries when consuming blocks")
maxRequestRateOpt := cmd.StringOpt("max-request-rate", "", "Maximum request rate given as (number of requests)/(time base), e.g. 1000/24h for 1000 requests per day")
backoffDurationOpt := cmd.StringOpt("backoff", "",
"The minimum duration to wait before asking for new blocks - increases exponentially when errors occur. Values like 200ms, 1s, 2m")
batchSizeOpt := cmd.IntOpt("batch-size", int(cfg.BlockConsumerConfig.MaxBlockBatchSize),
Expand All @@ -77,6 +80,10 @@ func Vent(output Output) func(cmd *cli.Cmd) {
cfg.HTTPListenAddress = *httpAddrOpt
cfg.WatchAddresses = make([]crypto.Address, len(*watchAddressesOpt))
cfg.MinimumHeight = uint64(*minimumHeightOpt)
cfg.BlockConsumerConfig.MaxRequests, cfg.BlockConsumerConfig.TimeBase, err = parseRequestRate(*maxRequestRateOpt)
if err != nil {
output.Fatalf("Could not parse max request rate: %w", err)
}
cfg.BlockConsumerConfig.MaxRetries = uint64(*maxRetriesOpt)
cfg.BlockConsumerConfig.BaseBackoffDuration, err = parseDuration(*backoffDurationOpt)
if err != nil {
Expand Down Expand Up @@ -107,7 +114,7 @@ func Vent(output Output) func(cmd *cli.Cmd) {
cmd.Spec = "--spec=<spec file or dir>... [--abi=<abi file or dir>...] " +
"[--watch=<contract address>...] [--minimum-height=<lowest height from which to read>] " +
"[--max-retries=<max block request retries>] [--backoff=<minimum backoff duration>] " +
"[--batch-size=<minimum block batch size>] " +
"[--max-request-rate=<requests / time base>] [--batch-size=<minimum block batch size>] " +
"[--db-adapter] [--db-url] [--db-schema] [--blocks] [--txs] [--chain-addr] [--http-addr] " +
"[--log-level] [--announce-every=<duration>]"

Expand Down Expand Up @@ -269,6 +276,25 @@ func parseDuration(duration string) (time.Duration, error) {
return time.ParseDuration(duration)
}

func parseRequestRate(rate string) (int, time.Duration, error) {
if rate == "" {
return 0, 0, nil
}
ratio := strings.Split(rate, "/")
if len(ratio) != 2 {
return 0, 0, fmt.Errorf("expected a ratio string separated by a '/' but got %s", rate)
}
requests, err := strconv.ParseInt(ratio[0], 10, 0)
if err != nil {
return 0, 0, fmt.Errorf("could not parse max requests as base 10 integer: %w", err)
}
timeBase, err := time.ParseDuration(ratio[1])
if err != nil {
return 0, 0, fmt.Errorf("could not parse time base: %w", err)
}
return int(requests), timeBase, nil
}

type dbOpts struct {
adapter *string
url *string
Expand Down
21 changes: 21 additions & 0 deletions cmd/burrow/commands/vent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package commands

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParseRequestRate(t *testing.T) {
requests, base, err := parseRequestRate("2334223/24h")
require.NoError(t, err)
assert.Equal(t, 2334223, requests)
assert.Equal(t, time.Hour*24, base)

requests, base, err = parseRequestRate("99990/24h")
require.NoError(t, err)
assert.Equal(t, 99_990, requests)
assert.Equal(t, time.Hour*24, base)
}
43 changes: 43 additions & 0 deletions logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func NewNoopLogger() *Logger {

// Handle signals
func (l *Logger) Sync() error {
if l == nil {
return nil
}
// Send over input channels (to pass through any capture loggers)
err := structure.Sync(l.Info)
if err != nil {
Expand All @@ -68,6 +71,9 @@ func (l *Logger) Sync() error {
}

func (l *Logger) Reload() error {
if l == nil {
return nil
}
// Send directly to output logger
return structure.Reload(l.Output)
}
Expand All @@ -81,6 +87,9 @@ func (l *Logger) Reload() error {
// assumption about the name or signature of the logging method(s).
// See InfoTraceLogger
func (l *Logger) With(keyvals ...interface{}) *Logger {
if l == nil {
return nil
}
return &Logger{
Output: l.Output,
Info: log.With(l.Info, keyvals...),
Expand All @@ -90,6 +99,9 @@ func (l *Logger) With(keyvals ...interface{}) *Logger {

// Establish a context on the Info channel keeping Trace the same
func (l *Logger) WithInfo(keyvals ...interface{}) *Logger {
if l == nil {
return nil
}
return &Logger{
Output: l.Output,
Info: log.With(l.Info, keyvals...),
Expand All @@ -99,6 +111,9 @@ func (l *Logger) WithInfo(keyvals ...interface{}) *Logger {

// Establish a context on the Trace channel keeping Info the same
func (l *Logger) WithTrace(keyvals ...interface{}) *Logger {
if l == nil {
return nil
}
return &Logger{
Output: l.Output,
Info: l.Info,
Expand All @@ -107,6 +122,9 @@ func (l *Logger) WithTrace(keyvals ...interface{}) *Logger {
}

func (l *Logger) WithPrefix(keyvals ...interface{}) *Logger {
if l == nil {
return nil
}
return &Logger{
Output: l.Output,
Info: log.WithPrefix(l.Info, keyvals...),
Expand All @@ -116,19 +134,44 @@ func (l *Logger) WithPrefix(keyvals ...interface{}) *Logger {

// Hot swap the underlying outputLogger with another one to re-route messages
func (l *Logger) SwapOutput(infoLogger log.Logger) {
if l == nil {
return
}
l.Output.Swap(infoLogger)
}

// Record structured Info log line with a message
func (l *Logger) InfoMsg(message string, keyvals ...interface{}) error {
if l == nil {
return nil
}
return Msg(l.Info, message, keyvals...)
}

// Record structured Trace log line with a message
func (l *Logger) TraceMsg(message string, keyvals ...interface{}) error {
if l == nil {
return nil
}
return Msg(l.Trace, message, keyvals...)
}

// Record structured Info log line
func (l *Logger) InfoLog(keyvals ...interface{}) error {
if l == nil {
return nil
}
return l.Info.Log(keyvals...)
}

// Record structured Trace log line
func (l *Logger) TraceLog(keyvals ...interface{}) error {
if l == nil {
return nil
}
return l.Trace.Log(keyvals...)
}

// Establish or extend the scope of this logger by appending scopeName to the Scope vector.
// Like With the logging scope is append only but can be used to provide parenthetical scopes by hanging on to the
// parent scope and using once the scope has been exited. The scope mechanism does is agnostic to the type of scope
Expand Down
15 changes: 14 additions & 1 deletion vent/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
)

const (
// Infura has free tier usage of 100,000 req/day
defaultMaxRequests = 99_990
defaultTimeBase = time.Hour * 24
defaultMaxRetires = 5
defaultBackoffBase = 250 * time.Millisecond
defaultBackoffBase = time.Second
defaultMaxBlockBatchSize = 100
)

Expand Down Expand Up @@ -71,6 +74,10 @@ type Origin struct {

// Client-side block consumer configuration. Requests are retried subject to backoff if a non-fatal error is detected
type BlockConsumerConfig struct {
// The maximum number of requests to make per TimeBase before throttling requests
MaxRequests int
// The base duration over which to count requests to check for overage of MaxRequests
TimeBase time.Duration
// The base backoff - we wait this amount of time between each batch and we increase the backoff exponentially
// until we reach MaxRetries from BaseBackoffDuration
BaseBackoffDuration time.Duration
Expand All @@ -82,6 +89,12 @@ type BlockConsumerConfig struct {
}

func (config *BlockConsumerConfig) Complete() {
if config.MaxRequests == 0 {
config.MaxRequests = defaultMaxRequests
}
if config.TimeBase == 0 {
config.TimeBase = defaultTimeBase
}
if config.MaxBlockBatchSize == 0 {
config.MaxBlockBatchSize = defaultMaxBlockBatchSize
}
Expand Down
7 changes: 5 additions & 2 deletions vent/chain/ethereum/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
const ConsumerScope = "EthereumConsumer"

type consumer struct {
client EthClient
client ThrottleClient
filter *chain.Filter
blockRange *rpcevents.BlockRange
logger *logging.Logger
Expand All @@ -33,7 +33,7 @@ type consumer struct {
blockBatchSize uint64
}

func Consume(client EthClient, filter *chain.Filter, blockRange *rpcevents.BlockRange, config *chain.BlockConsumerConfig,
func Consume(client ThrottleClient, filter *chain.Filter, blockRange *rpcevents.BlockRange, config *chain.BlockConsumerConfig,
logger *logging.Logger, consume func(block chain.Block) error) error {
c := consumer{
client: client,
Expand Down Expand Up @@ -66,6 +66,7 @@ func (c *consumer) Consume() error {
if err != nil {
return err
}
// Avoid spinning excessively where there may be no blocks available
time.Sleep(c.backoffDuration)
}

Expand All @@ -75,6 +76,8 @@ func (c *consumer) Consume() error {
func (c *consumer) ConsumeInBatches(start, end uint64) error {
c.logger.TraceMsg("ConsumeInBatches", "start", start, "end", end)
for batchStart := start; batchStart <= end; batchStart += c.blockBatchSize {
// Avoid breaching requests limit
c.client.Throttle()
batchEnd := batchStart + c.blockBatchSize
c.logger.TraceMsg("Consuming batch", "batch_start", batchStart, "batch_end", batchEnd)
if batchEnd > end {
Expand Down
12 changes: 8 additions & 4 deletions vent/chain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"google.golang.org/grpc/connectivity"
)

const Scope = "Ethereum"

type Chain struct {
client EthClient
client ThrottleClient
filter *chain.Filter
chainID string
version string
Expand All @@ -46,16 +48,18 @@ type EthClient interface {
// We rely on this failing if the chain is not an Ethereum Chain
func New(client EthClient, filter *chain.Filter, consumerConfig *chain.BlockConsumerConfig,
logger *logging.Logger) (*Chain, error) {
chainID, err := client.NetVersion()
logger = logger.WithScope(Scope)
throttleClient := NewThrottleClient(client, consumerConfig.MaxRequests, consumerConfig.TimeBase, logger)
chainID, err := throttleClient.NetVersion()
if err != nil {
return nil, fmt.Errorf("could not get Ethereum ChainID: %w", err)
}
version, err := client.Web3ClientVersion()
version, err := throttleClient.Web3ClientVersion()
if err != nil {
return nil, fmt.Errorf("could not get Ethereum node version: %w", err)
}
return &Chain{
client: client,
client: throttleClient,
filter: filter,
chainID: chainID,
version: version,
Expand Down
58 changes: 58 additions & 0 deletions vent/chain/ethereum/throttle_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ethereum

import (
"time"

"github.com/hyperledger/burrow/logging"

"github.com/hyperledger/burrow/rpc/web3/ethclient"
)

type ThrottleClient interface {
EthClient
Throttle()
}

type throttleClient struct {
*Throttler
client EthClient
}

func NewThrottleClient(client EthClient, maxRequests int, timeBase time.Duration, logger *logging.Logger) *throttleClient {
return &throttleClient{
Throttler: NewThrottler(maxRequests, timeBase, timeBase,
logger.WithScope("ThrottleClient").
With("max_requests", maxRequests, "time_base", timeBase.String())),
client: client,
}
}

func (t *throttleClient) GetLogs(filter *ethclient.Filter) ([]*ethclient.EthLog, error) {
t.addNow()
return t.client.GetLogs(filter)
}

func (t *throttleClient) BlockNumber() (uint64, error) {
t.addNow()
return t.client.BlockNumber()
}

func (t *throttleClient) GetBlockByNumber(height string) (*ethclient.Block, error) {
t.addNow()
return t.client.GetBlockByNumber(height)
}

func (t *throttleClient) NetVersion() (string, error) {
t.addNow()
return t.client.NetVersion()
}

func (t *throttleClient) Web3ClientVersion() (string, error) {
t.addNow()
return t.client.Web3ClientVersion()
}

func (t *throttleClient) Syncing() (bool, error) {
t.addNow()
return t.client.Syncing()
}
Loading

0 comments on commit c2133a9

Please sign in to comment.