add event broadcast metric to determine useless peer #408

Draft · wants to merge 10 commits into develop
57 changes: 53 additions & 4 deletions gossip/handler.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
@@ -57,6 +58,7 @@ const (
softResponseLimitSize = 2 * 1024 * 1024 // Target maximum size of returned events, or other data.
softLimitItems = 250 // Target maximum number of events or transactions to request/response
hardLimitItems = softLimitItems * 4 // Maximum number of events or transactions to request/response
broadcastThreshold = 0.1 // Broadcast threshold used to classify a peer as useless

// txChanSize is the size of channel listening to NewTxsNotify.
// The number is referenced from the size of tx pool.
@@ -787,10 +789,6 @@ func (h *handler) handle(p *peer) error {
useless = true
discfilter.Ban(p.ID())
}
-	if !p.Peer.Info().Network.Trusted && useless && h.peers.UselessNum() >= h.maxPeers/10 {
-		// don't allow more than 10% of useless peers
-		return p2p.DiscTooManyPeers
-	}
if !p.Peer.Info().Network.Trusted && useless {
if h.peers.UselessNum() >= h.maxPeers/10 {
// don't allow more than 10% of useless peers
@@ -920,6 +918,10 @@ func (h *handler) handleTxs(p *peer, txs types.Transactions) {
func (h *handler) handleEventHashes(p *peer, announces hash.Events) {
// Mark the hashes as present at the remote node
for _, id := range announces {
if !p.knownEvents.Contains(id) {
p.AddSentEvent(id)
h.peers.rates.Update(p.ID().String(), EventsMsg, p.GetBroadcastEvents())
}
p.MarkEvent(id)
}
// filter too high IDs
@@ -947,6 +949,10 @@ func (h *handler) handleEventHashes(p *peer, announces hash.Events) {
func (h *handler) handleEvents(p *peer, events dag.Events, ordered bool) {
// Mark the hashes as present at the remote node
for _, e := range events {
if !p.knownEvents.Contains(e.ID()) {
p.AddSentEvent(e.ID())
h.peers.rates.Update(p.ID().String(), EventsMsg, p.GetBroadcastEvents())
}
p.MarkEvent(e.ID())
}
// filter too high events
@@ -1016,6 +1022,7 @@ func (h *handler) handleMsg(p *peer) error {
p.SetProgress(progress)

case msg.Code == EvmTxsMsg:
p.SetFullSync()
// Transactions arrived, make sure we have a valid and fresh graph to handle them
if !h.syncStatus.AcceptTxs() {
break
@@ -1036,6 +1043,7 @@ func (h *handler) handleMsg(p *peer) error {
h.handleTxs(p, txs)

case msg.Code == NewEvmTxHashesMsg:
p.SetFullSync()
// Transactions arrived, make sure we have a valid and fresh graph to handle them
if !h.syncStatus.AcceptTxs() {
break
@@ -1051,6 +1059,10 @@ func (h *handler) handleMsg(p *peer) error {
h.handleTxHashes(p, txHashes)

case msg.Code == GetEvmTxsMsg:
// If the peer is snap-syncing (i.e. not full-syncing), don't send txs and events to it
if !p.FullSync() {
break
}
var requests []common.Hash
if err := msg.Decode(&requests); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
@@ -1072,6 +1084,7 @@ func (h *handler) handleMsg(p *peer) error {
})

case msg.Code == EventsMsg:
p.SetFullSync()
if !h.syncStatus.AcceptEvents() {
break
}
@@ -1087,6 +1100,7 @@ func (h *handler) handleMsg(p *peer) error {
h.handleEvents(p, events.Bases(), events.Len() > 1)

case msg.Code == NewEventIDsMsg:
p.SetFullSync()
// Fresh events arrived, make sure we have a valid and fresh graph to handle them
if !h.syncStatus.AcceptEvents() {
break
@@ -1101,6 +1115,10 @@ func (h *handler) handleMsg(p *peer) error {
h.handleEventHashes(p, announces)

case msg.Code == GetEventsMsg:
// If the peer is snap-syncing (i.e. not full-syncing), don't send txs and events to it
if !p.FullSync() {
break
}
var requests hash.Events
if err := msg.Decode(&requests); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
@@ -1153,6 +1171,7 @@ func (h *handler) handleMsg(p *peer) error {
}

case msg.Code == EventsStreamResponse:
p.SetFullSync()
if !h.syncStatus.AcceptEvents() {
break
}
@@ -1355,6 +1374,27 @@ func (h *handler) decideBroadcastAggressiveness(size int, passed time.Duration,
return fullRecipients
}

// nonProgress checks whether the peer hasn't advanced its epoch for progressInterval and its progress is behind ours
func (h *handler) nonProgress(p *peer) bool {
return time.Since(p.latestProgress.Time()) > progressInterval && p.progress.Less(h.myProgress())
}

// isMostNonActive checks whether the peer is the least active one (i.e. the peer that has sent the fewest events) among the currently tracked peers
func (h *handler) isMostNonActive(p *peer) bool {
peers := &capacitySort{
ids: make([]string, 0, len(h.peers.peers)),
caps: make([]int, 0, len(h.peers.peers)),
}

for id := range h.peers.peers {
peers.ids = append(peers.ids, id)
peers.caps = append(peers.caps, h.peers.rates.Capacity(id, EventsMsg))
}
sort.Sort(peers)

return len(peers.ids) > 0 && peers.ids[0] == p.id
}

// BroadcastEvent will either propagate an event to a subset of its peers, or
// will only announce its availability (depending on what's requested).
func (h *handler) BroadcastEvent(event *inter.EventPayload, passed time.Duration) int {
Expand All @@ -1374,6 +1414,15 @@ func (h *handler) BroadcastEvent(event *inter.EventPayload, passed time.Duration
var fullBroadcast = make([]*peer, 0, fullRecipients)
var hashBroadcast = make([]*peer, 0, len(peers))
for _, p := range peers {
// Re-evaluate the peer's usefulness based on its broadcast metric
useless := discfilter.Banned(p.Node().ID(), p.Node().Record())
if !useless && (!eligibleForSnap(p.Peer) || (p.FullSync() && (h.nonProgress(p) || h.isMostNonActive(p)))) {
useless = true
discfilter.Ban(p.ID())
}
if !p.Peer.Info().Network.Trusted && useless {
p.SetUseless()
}
if !p.Useless() && len(fullBroadcast) < fullRecipients {
fullBroadcast = append(fullBroadcast, p)
} else {
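
To make the new demotion path in BroadcastEvent easier to follow, below is a minimal, self-contained sketch of the two checks it combines. The fakePeer type, its behind field, and the progressInterval value are illustrative assumptions, not taken from the PR; only the shape of nonProgress and isMostNonActive mirrors the diff.

package main

import (
	"fmt"
	"sort"
	"time"
)

// fakePeer is a hypothetical stand-in for the real peer type; the fields
// approximate p.GetBroadcastEvents(), p.latestProgress.Time(), and
// p.progress.Less(h.myProgress()) from the diff.
type fakePeer struct {
	id             string
	eventsReceived int
	lastProgress   time.Time
	behind         bool
}

const progressInterval = 10 * time.Minute // assumed value, not from the PR

// nonProgress mirrors handler.nonProgress: the peer's progress has stalled
// for progressInterval AND it is behind our own progress.
func nonProgress(p *fakePeer) bool {
	return time.Since(p.lastProgress) > progressInterval && p.behind
}

// isMostNonActive mirrors handler.isMostNonActive: sort peers by their
// events capacity ascending and check whether p ends up at the front.
func isMostNonActive(p *fakePeer, peers []*fakePeer) bool {
	sorted := append([]*fakePeer(nil), peers...)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].eventsReceived < sorted[j].eventsReceived
	})
	return len(sorted) > 0 && sorted[0].id == p.id
}

func main() {
	now := time.Now()
	peers := []*fakePeer{
		{id: "a", eventsReceived: 120, lastProgress: now},
		{id: "b", eventsReceived: 3, lastProgress: now.Add(-time.Hour), behind: true},
		{id: "c", eventsReceived: 45, lastProgress: now},
	}
	for _, p := range peers {
		// A peer is demoted when either check fires (and it isn't trusted).
		fmt.Printf("peer %s useless=%v\n", p.id, nonProgress(p) || isMostNonActive(p, peers))
	}
}

Note that exactly one peer sorts first at a time, so the activity check demotes at most one peer per broadcast; ties between equally inactive peers resolve in map-iteration order.
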
151 changes: 151 additions & 0 deletions gossip/msgrate.go
@@ -0,0 +1,151 @@
package gossip

import (
"errors"
"math"
"sync"
)

// Tracker keeps the last measured capacity of a single peer for each message kind.
type Tracker struct {
capacity map[uint64]float64

lock sync.RWMutex
}

// NewTracker creates a tracker, optionally pre-seeded with starting capacities.
func NewTracker(caps map[uint64]float64) *Tracker {
if caps == nil {
caps = make(map[uint64]float64)
}
return &Tracker{
capacity: caps,
}
}

// Update records the latest item count observed for the given message kind.
func (t *Tracker) Update(kind uint64, items int) {
t.lock.Lock()
defer t.lock.Unlock()

t.capacity[kind] = float64(items)
}

// Capacity returns the recorded capacity for the given message kind, rounded to an int.
func (t *Tracker) Capacity(kind uint64) int {
t.lock.RLock()
defer t.lock.RUnlock()

return roundCapacity(t.capacity[kind])
}

// roundCapacity rounds a capacity up to an integer, clamped to the [1, maxInt32] range.
func roundCapacity(cap float64) int {
const maxInt32 = float64(1<<31 - 1)
return int(math.Min(maxInt32, math.Max(1, math.Ceil(cap))))
}

// Trackers is a set of per-peer trackers, keyed by peer ID.
type Trackers struct {
trackers map[string]*Tracker

lock sync.RWMutex
}

// NewTrackers creates an empty set of trackers to be filled with peers.
func NewTrackers() *Trackers {
return &Trackers{
trackers: make(map[string]*Tracker),
}
}

// Untrack stops tracking a previously added peer.
func (t *Trackers) Untrack(id string) error {
t.lock.Lock()
defer t.lock.Unlock()

if _, ok := t.trackers[id]; !ok {
return errors.New("not tracking")
}
delete(t.trackers, id)
return nil
}

// Track starts tracking a new peer with the given tracker.
func (t *Trackers) Track(id string, tracker *Tracker) error {
t.lock.Lock()
defer t.lock.Unlock()

if _, ok := t.trackers[id]; ok {
return errors.New("already tracking")
}
t.trackers[id] = tracker

return nil
}

// Update is a helper function to access a specific tracker without having to
// track it explicitly outside.
func (t *Trackers) Update(id string, kind uint64, items int) {
t.lock.RLock()
defer t.lock.RUnlock()

if tracker := t.trackers[id]; tracker != nil {
tracker.Update(kind, items)
}
}

// MeanCapacities returns the capacities averaged across all the added trackers.
// The purpose of the mean capacities is to initialize a new peer with some sane
// starting values that it will hopefully outperform. If the mean overshoots, the
// peer will be cut back to minimal capacity and given another chance.
func (t *Trackers) MeanCapacities() map[uint64]float64 {
t.lock.RLock()
defer t.lock.RUnlock()

return t.meanCapacities()
}

// meanCapacities is the internal lockless version of MeanCapacities.
func (t *Trackers) meanCapacities() map[uint64]float64 {
capacities := make(map[uint64]float64)
for _, tt := range t.trackers {
tt.lock.RLock()
for key, val := range tt.capacity {
capacities[key] += val
}
tt.lock.RUnlock()
}
for key, val := range capacities {
capacities[key] = val / float64(len(t.trackers))
}
return capacities
}

// Capacity is a helper function to access a specific tracker without having to
// track it explicitly outside.
func (t *Trackers) Capacity(id string, kind uint64) int {
t.lock.RLock()
defer t.lock.RUnlock()

tracker := t.trackers[id]
if tracker == nil {
return 1
}
return tracker.Capacity(kind)
}

// capacitySort implements the Sort interface, allowing sorting by peer message
// throughput. The base order is ascending (lowest capacity first); callers that
// want the highest capacity at the front should wrap it in sort.Reverse.
type capacitySort struct {
ids []string
caps []int
}

func (s *capacitySort) Len() int {
return len(s.ids)
}

func (s *capacitySort) Less(i, j int) bool {
return s.caps[i] < s.caps[j]
}

func (s *capacitySort) Swap(i, j int) {
s.ids[i], s.ids[j] = s.ids[j], s.ids[i]
s.caps[i], s.caps[j] = s.caps[j], s.caps[i]
}
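
As a quick orientation for the API above, here is a usage sketch as it might sit inside the gossip package. The peer IDs are made up; EventsMsg is the existing protocol message code that the handler uses as the capacity key. This is illustrative only, not part of the PR.

package gossip

import "fmt"

// ExampleTrackers is an illustrative sketch of the Trackers API.
func ExampleTrackers() {
	trackers := NewTrackers()

	// Seed a new peer with the current mean capacities so it starts from a
	// sane baseline (empty here, so it falls back to the defaults).
	_ = trackers.Track("peer-a", NewTracker(trackers.MeanCapacities()))
	_ = trackers.Track("peer-b", NewTracker(nil))

	// Record the latest broadcast-event counts observed from each peer.
	trackers.Update("peer-a", EventsMsg, 40)
	trackers.Update("peer-b", EventsMsg, 3)

	fmt.Println(trackers.Capacity("peer-a", EventsMsg)) // 40
	fmt.Println(trackers.Capacity("peer-b", EventsMsg)) // 3

	// An untracked peer reports the minimal capacity of 1.
	fmt.Println(trackers.Capacity("peer-x", EventsMsg)) // 1

	_ = trackers.Untrack("peer-b")
}

Because roundCapacity floors every reading at 1, an untracked or idle peer reports 1 rather than 0, which keeps the sort in isMostNonActive well defined.
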
12 changes: 12 additions & 0 deletions gossip/msgrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package gossip

import "testing"

func TestCapacityOverflow(t *testing.T) {
tracker := NewTracker(nil)
tracker.Update(EventsMsg, 100000*10000000)
cap := tracker.Capacity(EventsMsg)
if int32(cap) < 0 {
t.Fatalf("Negative: %v", int32(cap))
}
}
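
Since isMostNonActive relies on capacitySort ordering ascending (lowest capacity first), a hypothetical companion test along these lines would pin that ordering down; it is not part of this PR and assumes adding "sort" to the test file's imports.

// Hypothetical companion test, not part of this PR: verifies that
// capacitySort orders ascending, so index 0 is the least active peer.
func TestCapacitySortAscending(t *testing.T) {
	s := &capacitySort{
		ids:  []string{"a", "b", "c"},
		caps: []int{40, 3, 17},
	}
	sort.Sort(s)
	if s.ids[0] != "b" || s.caps[0] != 3 {
		t.Fatalf("expected least active peer first, got ids=%v caps=%v", s.ids, s.caps)
	}
}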