diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index c01a3b6f4..7b5a1c223 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -36,7 +36,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - run: make test_integration_ethereum - run: make test_integration_vent_complete docker: diff --git a/acm/validator/ring.go b/acm/validator/ring.go index 56b37f864..f04c09040 100644 --- a/acm/validator/ring.go +++ b/acm/validator/ring.go @@ -7,7 +7,7 @@ import ( "github.com/hyperledger/burrow/crypto" ) -// Ring stores the validator power history in buckets as a riNng buffer. The primary storage is a the difference between +// Ring stores the validator power history in buckets as a ring buffer. The primary storage is the difference between // each rotation (commit - i.e. block) in 'delta' and the cumulative result of each delta in cum, where the result of // the delta at i is stored in the cum at i+1. For example suppose we have 4 buckets then graphically: // diff --git a/cmd/burrow/commands/vent.go b/cmd/burrow/commands/vent.go index 106f8e249..bb2dbbcb4 100644 --- a/cmd/burrow/commands/vent.go +++ b/cmd/burrow/commands/vent.go @@ -5,6 +5,8 @@ import ( "io/ioutil" "os" "os/signal" + "strconv" + "strings" "sync" "syscall" "time" @@ -56,6 +58,7 @@ func Vent(output Output) func(cmd *cli.Cmd) { watchAddressesOpt := cmd.StringsOpt("watch", nil, "Add contract address to global watch filter") minimumHeightOpt := cmd.IntOpt("minimum-height", 0, "Only process block greater than or equal to height passed") maxRetriesOpt := cmd.IntOpt("max-retries", int(cfg.BlockConsumerConfig.MaxRetries), "Maximum number of retries when consuming blocks") + maxRequestRateOpt := cmd.StringOpt("max-request-rate", "", "Maximum request rate given as (number of requests)/(time base), e.g. 
1000/24h for 1000 requests per day") backoffDurationOpt := cmd.StringOpt("backoff", "", "The minimum duration to wait before asking for new blocks - increases exponentially when errors occur. Values like 200ms, 1s, 2m") batchSizeOpt := cmd.IntOpt("batch-size", int(cfg.BlockConsumerConfig.MaxBlockBatchSize), @@ -77,6 +80,10 @@ func Vent(output Output) func(cmd *cli.Cmd) { cfg.HTTPListenAddress = *httpAddrOpt cfg.WatchAddresses = make([]crypto.Address, len(*watchAddressesOpt)) cfg.MinimumHeight = uint64(*minimumHeightOpt) + cfg.BlockConsumerConfig.MaxRequests, cfg.BlockConsumerConfig.TimeBase, err = parseRequestRate(*maxRequestRateOpt) + if err != nil { + output.Fatalf("Could not parse max request rate: %v", err) + } cfg.BlockConsumerConfig.MaxRetries = uint64(*maxRetriesOpt) cfg.BlockConsumerConfig.BaseBackoffDuration, err = parseDuration(*backoffDurationOpt) if err != nil { @@ -107,7 +114,7 @@ func Vent(output Output) func(cmd *cli.Cmd) { cmd.Spec = "--spec=... [--abi=...] " + "[--watch=...] 
[--minimum-height=] " + "[--max-retries=] [--backoff=] " + - "[--batch-size=] " + + "[--max-request-rate=] [--batch-size=] " + "[--db-adapter] [--db-url] [--db-schema] [--blocks] [--txs] [--chain-addr] [--http-addr] " + "[--log-level] [--announce-every=]" @@ -269,6 +276,25 @@ func parseDuration(duration string) (time.Duration, error) { return time.ParseDuration(duration) } +func parseRequestRate(rate string) (int, time.Duration, error) { + if rate == "" { + return 0, 0, nil + } + ratio := strings.Split(rate, "/") + if len(ratio) != 2 { + return 0, 0, fmt.Errorf("expected a ratio string separated by a '/' but got %s", rate) + } + requests, err := strconv.ParseInt(ratio[0], 10, 0) + if err != nil { + return 0, 0, fmt.Errorf("could not parse max requests as base 10 integer: %w", err) + } + timeBase, err := time.ParseDuration(ratio[1]) + if err != nil { + return 0, 0, fmt.Errorf("could not parse time base: %w", err) + } + return int(requests), timeBase, nil +} + type dbOpts struct { adapter *string url *string diff --git a/cmd/burrow/commands/vent_test.go b/cmd/burrow/commands/vent_test.go new file mode 100644 index 000000000..10dd5105b --- /dev/null +++ b/cmd/burrow/commands/vent_test.go @@ -0,0 +1,21 @@ +package commands + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseRequestRate(t *testing.T) { + requests, base, err := parseRequestRate("2334223/24h") + require.NoError(t, err) + assert.Equal(t, 2334223, requests) + assert.Equal(t, time.Hour*24, base) + + requests, base, err = parseRequestRate("99990/24h") + require.NoError(t, err) + assert.Equal(t, 99_990, requests) + assert.Equal(t, time.Hour*24, base) +} diff --git a/logging/logger.go b/logging/logger.go index a3d6219f1..4e697a6e1 100644 --- a/logging/logger.go +++ b/logging/logger.go @@ -59,6 +59,9 @@ func NewNoopLogger() *Logger { // Handle signals func (l *Logger) Sync() error { + if l == nil { + return nil + } // Send over 
input channels (to pass through any capture loggers) err := structure.Sync(l.Info) if err != nil { @@ -68,6 +71,9 @@ func (l *Logger) Sync() error { } func (l *Logger) Reload() error { + if l == nil { + return nil + } // Send directly to output logger return structure.Reload(l.Output) } @@ -81,6 +87,9 @@ func (l *Logger) Reload() error { // assumption about the name or signature of the logging method(s). // See InfoTraceLogger func (l *Logger) With(keyvals ...interface{}) *Logger { + if l == nil { + return nil + } return &Logger{ Output: l.Output, Info: log.With(l.Info, keyvals...), @@ -90,6 +99,9 @@ func (l *Logger) With(keyvals ...interface{}) *Logger { // Establish a context on the Info channel keeping Trace the same func (l *Logger) WithInfo(keyvals ...interface{}) *Logger { + if l == nil { + return nil + } return &Logger{ Output: l.Output, Info: log.With(l.Info, keyvals...), @@ -99,6 +111,9 @@ func (l *Logger) WithInfo(keyvals ...interface{}) *Logger { // Establish a context on the Trace channel keeping Info the same func (l *Logger) WithTrace(keyvals ...interface{}) *Logger { + if l == nil { + return nil + } return &Logger{ Output: l.Output, Info: l.Info, @@ -107,6 +122,9 @@ func (l *Logger) WithTrace(keyvals ...interface{}) *Logger { } func (l *Logger) WithPrefix(keyvals ...interface{}) *Logger { + if l == nil { + return nil + } return &Logger{ Output: l.Output, Info: log.WithPrefix(l.Info, keyvals...), @@ -116,19 +134,44 @@ func (l *Logger) WithPrefix(keyvals ...interface{}) *Logger { // Hot swap the underlying outputLogger with another one to re-route messages func (l *Logger) SwapOutput(infoLogger log.Logger) { + if l == nil { + return + } l.Output.Swap(infoLogger) } // Record structured Info log line with a message func (l *Logger) InfoMsg(message string, keyvals ...interface{}) error { + if l == nil { + return nil + } return Msg(l.Info, message, keyvals...) 
} // Record structured Trace log line with a message func (l *Logger) TraceMsg(message string, keyvals ...interface{}) error { + if l == nil { + return nil + } return Msg(l.Trace, message, keyvals...) } +// Record structured Info log line +func (l *Logger) InfoLog(keyvals ...interface{}) error { + if l == nil { + return nil + } + return l.Info.Log(keyvals...) +} + +// Record structured Trace log line +func (l *Logger) TraceLog(keyvals ...interface{}) error { + if l == nil { + return nil + } + return l.Trace.Log(keyvals...) +} + // Establish or extend the scope of this logger by appending scopeName to the Scope vector. // Like With the logging scope is append only but can be used to provide parenthetical scopes by hanging on to the // parent scope and using once the scope has been exited. The scope mechanism does is agnostic to the type of scope diff --git a/vent/chain/chain.go b/vent/chain/chain.go index 3ba965a78..eba6d4a36 100644 --- a/vent/chain/chain.go +++ b/vent/chain/chain.go @@ -16,8 +16,11 @@ import ( ) const ( + // Infura has free tier usage of 100,000 req/day + defaultMaxRequests = 99_990 + defaultTimeBase = time.Hour * 24 defaultMaxRetires = 5 - defaultBackoffBase = 250 * time.Millisecond + defaultBackoffBase = time.Second defaultMaxBlockBatchSize = 100 ) @@ -71,6 +74,10 @@ type Origin struct { // Client-side block consumer configuration. 
Requests are retried subject to backoff if a non-fatal error is detected type BlockConsumerConfig struct { + // The maximum number of requests to make per TimeBase before throttling requests + MaxRequests int + // The base duration over which to count requests to check for overage of MaxRequests + TimeBase time.Duration // The base backoff - we wait this amount of time between each batch and we increase the backoff exponentially // until we reach MaxRetries from BaseBackoffDuration BaseBackoffDuration time.Duration @@ -82,6 +89,12 @@ type BlockConsumerConfig struct { } func (config *BlockConsumerConfig) Complete() { + if config.MaxRequests == 0 { + config.MaxRequests = defaultMaxRequests + } + if config.TimeBase == 0 { + config.TimeBase = defaultTimeBase + } if config.MaxBlockBatchSize == 0 { config.MaxBlockBatchSize = defaultMaxBlockBatchSize } diff --git a/vent/chain/ethereum/consumer.go b/vent/chain/ethereum/consumer.go index 36f2bec6b..64d50da50 100644 --- a/vent/chain/ethereum/consumer.go +++ b/vent/chain/ethereum/consumer.go @@ -18,7 +18,7 @@ import ( const ConsumerScope = "EthereumConsumer" type consumer struct { - client EthClient + client ThrottleClient filter *chain.Filter blockRange *rpcevents.BlockRange logger *logging.Logger @@ -33,7 +33,7 @@ type consumer struct { blockBatchSize uint64 } -func Consume(client EthClient, filter *chain.Filter, blockRange *rpcevents.BlockRange, config *chain.BlockConsumerConfig, +func Consume(client ThrottleClient, filter *chain.Filter, blockRange *rpcevents.BlockRange, config *chain.BlockConsumerConfig, logger *logging.Logger, consume func(block chain.Block) error) error { c := consumer{ client: client, @@ -66,6 +66,7 @@ func (c *consumer) Consume() error { if err != nil { return err } + // Avoid spinning excessively where there may be no blocks available time.Sleep(c.backoffDuration) } @@ -75,6 +76,8 @@ func (c *consumer) Consume() error { func (c *consumer) ConsumeInBatches(start, end uint64) error { 
c.logger.TraceMsg("ConsumeInBatches", "start", start, "end", end) for batchStart := start; batchStart <= end; batchStart += c.blockBatchSize { + // Avoid breaching requests limit + c.client.Throttle() batchEnd := batchStart + c.blockBatchSize c.logger.TraceMsg("Consuming batch", "batch_start", batchStart, "batch_end", batchEnd) if batchEnd > end { diff --git a/vent/chain/ethereum/ethereum.go b/vent/chain/ethereum/ethereum.go index b0a983502..644ba930c 100644 --- a/vent/chain/ethereum/ethereum.go +++ b/vent/chain/ethereum/ethereum.go @@ -23,8 +23,10 @@ import ( "google.golang.org/grpc/connectivity" ) +const Scope = "Ethereum" + type Chain struct { - client EthClient + client ThrottleClient filter *chain.Filter chainID string version string @@ -46,16 +48,18 @@ type EthClient interface { // We rely on this failing if the chain is not an Ethereum Chain func New(client EthClient, filter *chain.Filter, consumerConfig *chain.BlockConsumerConfig, logger *logging.Logger) (*Chain, error) { - chainID, err := client.NetVersion() + logger = logger.WithScope(Scope) + throttleClient := NewThrottleClient(client, consumerConfig.MaxRequests, consumerConfig.TimeBase, logger) + chainID, err := throttleClient.NetVersion() if err != nil { return nil, fmt.Errorf("could not get Ethereum ChainID: %w", err) } - version, err := client.Web3ClientVersion() + version, err := throttleClient.Web3ClientVersion() if err != nil { return nil, fmt.Errorf("could not get Ethereum node version: %w", err) } return &Chain{ - client: client, + client: throttleClient, filter: filter, chainID: chainID, version: version, diff --git a/vent/chain/ethereum/throttle_client.go b/vent/chain/ethereum/throttle_client.go new file mode 100644 index 000000000..ac846f963 --- /dev/null +++ b/vent/chain/ethereum/throttle_client.go @@ -0,0 +1,58 @@ +package ethereum + +import ( + "time" + + "github.com/hyperledger/burrow/logging" + + "github.com/hyperledger/burrow/rpc/web3/ethclient" +) + +type ThrottleClient interface { + 
EthClient + Throttle() +} + +type throttleClient struct { + *Throttler + client EthClient +} + +func NewThrottleClient(client EthClient, maxRequests int, timeBase time.Duration, logger *logging.Logger) *throttleClient { + return &throttleClient{ + Throttler: NewThrottler(maxRequests, timeBase, timeBase, + logger.WithScope("ThrottleClient"). + With("max_requests", maxRequests, "time_base", timeBase.String())), + client: client, + } +} + +func (t *throttleClient) GetLogs(filter *ethclient.Filter) ([]*ethclient.EthLog, error) { + t.addNow() + return t.client.GetLogs(filter) +} + +func (t *throttleClient) BlockNumber() (uint64, error) { + t.addNow() + return t.client.BlockNumber() +} + +func (t *throttleClient) GetBlockByNumber(height string) (*ethclient.Block, error) { + t.addNow() + return t.client.GetBlockByNumber(height) +} + +func (t *throttleClient) NetVersion() (string, error) { + t.addNow() + return t.client.NetVersion() +} + +func (t *throttleClient) Web3ClientVersion() (string, error) { + t.addNow() + return t.client.Web3ClientVersion() +} + +func (t *throttleClient) Syncing() (bool, error) { + t.addNow() + return t.client.Syncing() +} diff --git a/vent/chain/ethereum/throttler.go b/vent/chain/ethereum/throttler.go new file mode 100644 index 000000000..bd705b1bd --- /dev/null +++ b/vent/chain/ethereum/throttler.go @@ -0,0 +1,70 @@ +package ethereum + +import ( + "math/big" + "time" + + "github.com/hyperledger/burrow/logging" +) + +type Throttler struct { + // Request timestamps as unix nanos (avoid space overhead of time.Time) + requests []int64 + maxRequestsPerNanosecond *big.Float + // Window over which to accumulate request times + window time.Duration + logger *logging.Logger +} + +func NewThrottler(maxRequests int, timeBase time.Duration, window time.Duration, logger *logging.Logger) *Throttler { + maxRequestsPerNanosecond := new(big.Float).SetInt64(int64(maxRequests)) + maxRequestsPerNanosecond.Quo(maxRequestsPerNanosecond, 
new(big.Float).SetInt64(int64(timeBase))) + return &Throttler{ + maxRequestsPerNanosecond: maxRequestsPerNanosecond, + window: window, + logger: logger, + } +} + +func (t *Throttler) Throttle() { + time.Sleep(t.calculateWait()) +} + +func (t *Throttler) calculateWait() time.Duration { + requests := len(t.requests) + if requests < 2 { + return 0 + } + delta := t.requests[requests-1] - t.requests[0] + deltaNanoseconds := new(big.Float).SetInt64(delta) + + allowedRequestsInDelta := new(big.Float).Mul(deltaNanoseconds, t.maxRequestsPerNanosecond) + + overage := allowedRequestsInDelta.Sub(new(big.Float).SetInt64(int64(requests)), allowedRequestsInDelta) + if overage.Sign() > 0 { + // Wait just long enough to eat our overage at max request rate + nanos, _ := new(big.Float).Quo(overage, t.maxRequestsPerNanosecond).Int64() + wait := time.Duration(nanos) + t.logger.InfoMsg("throttling connection", + "num_requests", requests, + "over_period", time.Duration(delta).String(), + "requests_overage", overage.String(), + "wait", wait.String(), + ) + return wait + } + return 0 +} + +func (t *Throttler) addNow() { + t.add(time.Now()) +} + +func (t *Throttler) add(now time.Time) { + cutoff := now.Add(-t.window) + // Remove expired requests + for len(t.requests) > 0 && t.requests[0] < cutoff.UnixNano() { + t.requests = t.requests[1:] + } + t.requests = append(t.requests, now.UnixNano()) +} diff --git a/vent/chain/ethereum/throttler_test.go b/vent/chain/ethereum/throttler_test.go new file mode 100644 index 000000000..d5c4fc63f --- /dev/null +++ b/vent/chain/ethereum/throttler_test.go @@ -0,0 +1,56 @@ +package ethereum + +import ( + "testing" + "time" + + "github.com/hyperledger/burrow/logging/logconfig" + + "github.com/stretchr/testify/assert" +) + +var logger = logconfig.Sink().Terminal().LoggingConfig().WithTrace().MustLogger() + +const delta = float64(time.Millisecond) + +func TestThrottler_Overage(t *testing.T) { + throttler := NewThrottler(100, time.Second, time.Minute, logger) + + 
now := doRequests(throttler, 100, time.Now(), time.Second) + assert.InDelta(t, time.Duration(0), throttler.calculateWait(), delta) + + doRequests(throttler, 200, now, time.Second) + assert.InDelta(t, time.Second, throttler.calculateWait(), delta) +} + +func TestThrottler_Expiry(t *testing.T) { + throttler := NewThrottler(100, time.Second, 2*time.Second, logger) + + now := doRequests(throttler, 200, time.Now(), time.Second) + assert.InDelta(t, time.Second, throttler.calculateWait(), delta) + + now = doRequests(throttler, 100, now, time.Second) + assert.InDelta(t, time.Second, throttler.calculateWait(), delta) + + now = doRequests(throttler, 100, now, time.Second) + assert.InDelta(t, time.Duration(0), throttler.calculateWait(), delta) +} + +func TestThrottler_Bursts(t *testing.T) { + throttler := NewThrottler(10_000, time.Hour, 2*time.Hour, logger) + + now := doRequests(throttler, 200, time.Now(), time.Millisecond) + assert.InDelta(t, time.Minute+12*time.Second, throttler.calculateWait(), delta) + + now = doRequests(throttler, 100, now, time.Second) + assert.InDelta(t, time.Minute+47*time.Second, throttler.calculateWait(), delta) +} + +// Do numRequests many requests from start within interval +func doRequests(throttler *Throttler, numRequests int, start time.Time, interval time.Duration) time.Time { + period := interval / time.Duration(numRequests-1) + for i := 0; i < numRequests; i++ { + throttler.add(start.Add(period * time.Duration(i))) + } + return start.Add(interval) +} diff --git a/vent/service/consumer_integration_test.go b/vent/service/consumer_integration_test.go index 52154a9fb..9ff6d7292 100644 --- a/vent/service/consumer_integration_test.go +++ b/vent/service/consumer_integration_test.go @@ -12,11 +12,12 @@ import ( "testing" "time" + "github.com/hyperledger/burrow/vent/chain/ethereum" + "github.com/hyperledger/burrow/crypto" "github.com/hyperledger/burrow/execution/exec" "github.com/hyperledger/burrow/logging/logconfig" 
"github.com/hyperledger/burrow/rpc/rpctransact" - "github.com/hyperledger/burrow/vent/chain/ethereum" "github.com/hyperledger/burrow/vent/config" "github.com/hyperledger/burrow/vent/service" "github.com/hyperledger/burrow/vent/sqldb" @@ -36,7 +37,7 @@ const ( var tables = types.DefaultSQLTableNames // Tweak logger for debug purposes here -var logger = logconfig.Sink().Terminal().FilterScope(ethereum.ConsumerScope).LoggingConfig().WithTrace().MustLogger() +var logger = logconfig.Sink().Terminal().FilterScope(ethereum.Scope).LoggingConfig().WithTrace().MustLogger() func testConsumer(t *testing.T, chainID string, cfg *config.VentConfig, tcli test.TransactClient, inputAddress crypto.Address) {