diff --git a/gossip/handler.go b/gossip/handler.go index 3da134aed..9eeb93ecb 100644 --- a/gossip/handler.go +++ b/gossip/handler.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "math/rand" + "sort" "strings" "sync" "time" @@ -57,6 +58,7 @@ const ( softResponseLimitSize = 2 * 1024 * 1024 // Target maximum size of returned events, or other data. softLimitItems = 250 // Target maximum number of events or transactions to request/response hardLimitItems = softLimitItems * 4 // Maximum number of events or transactions to request/response + broadcastThreshold = 0.1 // The broadcast threshold metric for useless peer // txChanSize is the size of channel listening to NewTxsNotify. // The number is referenced from the size of tx pool. @@ -787,10 +789,6 @@ func (h *handler) handle(p *peer) error { useless = true discfilter.Ban(p.ID()) } - if !p.Peer.Info().Network.Trusted && useless && h.peers.UselessNum() >= h.maxPeers/10 { - // don't allow more than 10% of useless peers - return p2p.DiscTooManyPeers - } if !p.Peer.Info().Network.Trusted && useless { if h.peers.UselessNum() >= h.maxPeers/10 { // don't allow more than 10% of useless peers @@ -920,6 +918,10 @@ func (h *handler) handleTxs(p *peer, txs types.Transactions) { func (h *handler) handleEventHashes(p *peer, announces hash.Events) { // Mark the hashes as present at the remote node for _, id := range announces { + if !p.knownEvents.Contains(id) { + p.AddSentEvent(id) + h.peers.rates.Update(p.ID().String(), EventsMsg, p.GetBroadcastEvents()) + } p.MarkEvent(id) } // filter too high IDs @@ -947,6 +949,10 @@ func (h *handler) handleEventHashes(p *peer, announces hash.Events) { func (h *handler) handleEvents(p *peer, events dag.Events, ordered bool) { // Mark the hashes as present at the remote node for _, e := range events { + if !p.knownEvents.Contains(e.ID()) { + p.AddSentEvent(e.ID()) + h.peers.rates.Update(p.ID().String(), EventsMsg, p.GetBroadcastEvents()) + } p.MarkEvent(e.ID()) } // filter too high events @@ -1016,6 +1022,7 @@ func (h *handler) handleMsg(p *peer) error { p.SetProgress(progress) case msg.Code == EvmTxsMsg: + p.SetFullSync() // Transactions arrived, make sure we have a valid and fresh graph to handle them if !h.syncStatus.AcceptTxs() { break @@ -1036,6 +1043,7 @@ func (h *handler) handleMsg(p *peer) error { h.handleTxs(p, txs) case msg.Code == NewEvmTxHashesMsg: + p.SetFullSync() // Transactions arrived, make sure we have a valid and fresh graph to handle them if !h.syncStatus.AcceptTxs() { break @@ -1051,6 +1059,10 @@ func (h *handler) handleMsg(p *peer) error { h.handleTxHashes(p, txHashes) case msg.Code == GetEvmTxsMsg: + // If peer is doing snapsync (a.k.a not doing fullsync), don't broadcast txs and events to this peer + if !p.FullSync() { + break + } var requests []common.Hash if err := msg.Decode(&requests); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) @@ -1072,6 +1084,7 @@ func (h *handler) handleMsg(p *peer) error { }) case msg.Code == EventsMsg: + p.SetFullSync() if !h.syncStatus.AcceptEvents() { break } @@ -1087,6 +1100,7 @@ func (h *handler) handleMsg(p *peer) error { h.handleEvents(p, events.Bases(), events.Len() > 1) case msg.Code == NewEventIDsMsg: + p.SetFullSync() // Fresh events arrived, make sure we have a valid and fresh graph to handle them if !h.syncStatus.AcceptEvents() { break @@ -1101,6 +1115,10 @@ func (h *handler) handleMsg(p *peer) error { h.handleEventHashes(p, announces) case msg.Code == GetEventsMsg: + // If peer is doing snapsync (a.k.a not doing fullsync), don't broadcast txs and events to this peer + if !p.FullSync() { + break + } var requests hash.Events if err := msg.Decode(&requests); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -1153,6 +1171,7 @@ func (h *handler) handleMsg(p *peer) error { } case msg.Code == EventsStreamResponse: + p.SetFullSync() if !h.syncStatus.AcceptEvents() { break } @@ -1355,6 +1374,27 @@ func (h *handler) decideBroadcastAggressiveness(size int, passed time.Duration, return fullRecipients } +// nonProgress check if peer didn't advance its epoch for progressInterval seconds, and its epoch is lower than our +func (h *handler) nonProgress(p *peer) bool { + return time.Since(p.latestProgress.Time()) > progressInterval && p.progress.Less(h.myProgress()) +} + +// isMostNonActive check if peer is the most non active peer (a.k.a sending utmost least events) in the current tracking peers +func (h *handler) isMostNonActive(p *peer) bool { + peers := &capacitySort{ + ids: make([]string, 0, len(h.peers.peers)), + caps: make([]int, 0, len(h.peers.peers)), + } + + for id := range h.peers.peers { + peers.ids = append(peers.ids, id) + peers.caps = append(peers.caps, h.peers.rates.Capacity(id, EventsMsg)) + } + sort.Sort(peers) + + return len(peers.ids) > 0 && peers.ids[0] == p.id +} + // BroadcastEvent will either propagate a event to a subset of it's peers, or // will only announce it's availability (depending what's requested). func (h *handler) BroadcastEvent(event *inter.EventPayload, passed time.Duration) int { @@ -1374,6 +1414,15 @@ func (h *handler) BroadcastEvent(event *inter.EventPayload, passed time.Duration var fullBroadcast = make([]*peer, 0, fullRecipients) var hashBroadcast = make([]*peer, 0, len(peers)) for _, p := range peers { + // Update peers' broadcast metric + useless := discfilter.Banned(p.Node().ID(), p.Node().Record()) + if !useless && (!eligibleForSnap(p.Peer) || (p.FullSync() && (h.nonProgress(p) || h.isMostNonActive(p)))) { + useless = true + discfilter.Ban(p.ID()) + } + if !p.Peer.Info().Network.Trusted && useless { + p.SetUseless() + } if !p.Useless() && len(fullBroadcast) < fullRecipients { fullBroadcast = append(fullBroadcast, p) } else { diff --git a/gossip/msgrate.go b/gossip/msgrate.go new file mode 100644 index 000000000..39c36d768 --- /dev/null +++ b/gossip/msgrate.go @@ -0,0 +1,151 @@ +package gossip + +import ( + "errors" + "math" + "sync" +) + +type Tracker struct { + capacity map[uint64]float64 + + lock sync.RWMutex +} + +func NewTracker(caps map[uint64]float64) *Tracker { + if caps == nil { + caps = make(map[uint64]float64) + } + return &Tracker{ + capacity: caps, + } +} + +func (t *Tracker) Update(kind uint64, items int) { + t.lock.Lock() + defer t.lock.Unlock() + + t.capacity[kind] = float64(items) +} + +func (t *Tracker) Capacity(kind uint64) int { + t.lock.RLock() + defer t.lock.RUnlock() + + return roundCapacity(t.capacity[kind]) +} + +func roundCapacity(cap float64) int { + const maxInt32 = float64(1<<31 - 1) + return int(math.Min(maxInt32, math.Max(1, math.Ceil(cap)))) +} + +type Trackers struct { + trackers map[string]*Tracker + + lock sync.RWMutex +} + +// NewTrackers creates an empty set of trackers to be filled with peers. +func NewTrackers() *Trackers { + return &Trackers{ + trackers: make(map[string]*Tracker), + } +} + +// Untrack stops tracking a previously added peer. +func (t *Trackers) Untrack(id string) error { + t.lock.Lock() + defer t.lock.Unlock() + + if _, ok := t.trackers[id]; !ok { + return errors.New("not tracking") + } + delete(t.trackers, id) + return nil +} + +func (t *Trackers) Track(id string, tracker *Tracker) error { + t.lock.Lock() + defer t.lock.Unlock() + + if _, ok := t.trackers[id]; ok { + return errors.New("already tracking") + } + t.trackers[id] = tracker + + return nil +} + +// Update is a helper function to access a specific tracker without having to +// track it explicitly outside. +func (t *Trackers) Update(id string, kind uint64, items int) { + t.lock.RLock() + defer t.lock.RUnlock() + + if tracker := t.trackers[id]; tracker != nil { + tracker.Update(kind, items) + } +} + +// MeanCapacities returns the capacities averaged across all the added trackers. +// The purpose of the mean capacities are to initialize a new peer with some sane +// starting values that it will hopefully outperform. If the mean overshoots, the +// peer will be cut back to minimal capacity and given another chance. +func (t *Trackers) MeanCapacities() map[uint64]float64 { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.meanCapacities() +} + +// meanCapacities is the internal lockless version of MeanCapacities used for +// debug logging. +func (t *Trackers) meanCapacities() map[uint64]float64 { + capacities := make(map[uint64]float64) + for _, tt := range t.trackers { + tt.lock.RLock() + for key, val := range tt.capacity { + capacities[key] += val + } + tt.lock.RUnlock() + } + for key, val := range capacities { + capacities[key] = val / float64(len(t.trackers)) + } + return capacities +} + +// Capacity is a helper function to access a specific tracker without having to +// track it explicitly outside. +func (t *Trackers) Capacity(id string, kind uint64) int { + t.lock.RLock() + defer t.lock.RUnlock() + + tracker := t.trackers[id] + if tracker == nil { + return 1 + } + return tracker.Capacity(kind) +} + +// capacitySort implements the Sort interface, allowing sorting by peer message +// throughput. Note, callers should use sort.Reverse to get the desired effect +// of highest capacity being at the front. +type capacitySort struct { + ids []string + caps []int +} + +func (s *capacitySort) Len() int { + return len(s.ids) +} + +func (s *capacitySort) Less(i, j int) bool { + return s.caps[i] < s.caps[j] +} + +func (s *capacitySort) Swap(i, j int) { + s.ids[i], s.ids[j] = s.ids[j], s.ids[i] + s.caps[i], s.caps[j] = s.caps[j], s.caps[i] +} diff --git a/gossip/msgrate_test.go b/gossip/msgrate_test.go new file mode 100644 index 000000000..3444109a8 --- /dev/null +++ b/gossip/msgrate_test.go @@ -0,0 +1,12 @@ +package gossip + +import "testing" + +func TestCapacityOverflow(t *testing.T) { + tracker := NewTracker(nil) + tracker.Update(EventsMsg, 100000*10000000) + cap := tracker.Capacity(EventsMsg) + if int32(cap) < 0 { + t.Fatalf("Negative: %v", int32(cap)) + } +} diff --git a/gossip/peer.go b/gossip/peer.go index 95139b6ee..c70119ebe 100644 --- a/gossip/peer.go +++ b/gossip/peer.go @@ -13,6 +13,7 @@ import ( "github.com/Fantom-foundation/lachesis-base/utils/datasemaphore" mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -30,7 +31,9 @@ var ( ) const ( - handshakeTimeout = 5 * time.Second + handshakeTimeout = 5 * time.Second // handshake timeout + lastEpochNums = 2 // the numbers of epoch to calculate the peer broadcast metric + progressInterval = 200 * time.Second // peer should be marked as useless if it didn't advance epoch within this time duration ) // PeerInfo represents a short summary of the sub-protocol metadata known @@ -62,13 +65,17 @@ type peer struct { queuedDataSemaphore *datasemaphore.DataSemaphore term chan struct{} // Termination channel to stop the broadcaster - progress PeerProgress + progress PeerProgress + latestProgress inter.Timestamp + fullSync uint32 snapExt *snapPeer // Satellite `snap` connection syncDrop *time.Timer // Connection dropper if `eth` sync progress isn't validated in time snapWait chan struct{} // Notification channel for snap connections useless uint32 + tracker *prque.Prque // Priority queue mapping timestamp to number of broadcasted events to gc + rates *Tracker // Tracker to hone in on the number of items broadcasting in last hours sync.RWMutex } @@ -81,10 +88,55 @@ func (p *peer) SetUseless() { atomic.StoreUint32(&p.useless, 1) } +func (p *peer) FullSync() bool { + return atomic.LoadUint32(&p.fullSync) != 0 +} + +func (p *peer) SetFullSync() { + atomic.StoreUint32(&p.fullSync, 1) +} + +func (p *peer) AddSentEvent(event hash.Event) { + p.Lock() + defer p.Unlock() + + p.tracker.Push(event, -time.Now().UnixNano()) +} + +// return number of broadcasted events by the peer in the last 2 hours +func (p *peer) GetBroadcastEvents() int { + p.Lock() + defer p.Unlock() + + chosen := time.Now().Add(-2 * time.Hour).UnixNano() + + // prune all the event older than 2 hours + old := false + for !p.tracker.Empty() { + event, at := p.tracker.Pop() + if -at > chosen { + p.tracker.Push(event, at) + break + } else { + old = true + } + } + + // return the initial mean capacity when the metric is still young + if old { + return p.tracker.Size() + } else { + return p.rates.Capacity(EventsMsg) + } +} + func (p *peer) SetProgress(x PeerProgress) { p.Lock() defer p.Unlock() + if p.progress.Less(x) { + p.latestProgress = inter.Timestamp(time.Now().UnixNano()) + } p.progress = x } @@ -119,6 +171,7 @@ func newPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, cfg PeerCacheConfi queue: make(chan broadcastItem, cfg.MaxQueuedItems), queuedDataSemaphore: datasemaphore.New(dag.Metric{cfg.MaxQueuedItems, cfg.MaxQueuedSize}, getSemaphoreWarningFn("Peers queue")), term: make(chan struct{}), + tracker: prque.New(nil), } go peer.broadcast(peer.queue) diff --git a/gossip/peerset.go b/gossip/peerset.go index 2c6bc0094..06895275c 100644 --- a/gossip/peerset.go +++ b/gossip/peerset.go @@ -48,6 +48,7 @@ var ( // the `eth` protocol, with or without the `snap` extension. type peerSet struct { peers map[string]*peer // Peers connected on the `eth` protocol + rates *Trackers // Set of rate trackers snapPeers int // Number of `snap` compatible peers for connection prioritization snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension @@ -61,6 +62,7 @@ type peerSet struct { func newPeerSet() *peerSet { return &peerSet{ peers: make(map[string]*peer), + rates: NewTrackers(), snapWait: make(map[string]chan *snap.Peer), snapPend: make(map[string]*snap.Peer), } @@ -150,6 +152,11 @@ func (ps *peerSet) RegisterPeer(p *peer, ext *snap.Peer) error { ps.snapPeers++ } + p.rates = NewTracker(ps.rates.MeanCapacities()) + if err := ps.rates.Track(p.id, p.rates); err != nil { + ps.lock.Unlock() + return err + } ps.peers[id] = p return nil } @@ -165,6 +172,7 @@ func (ps *peerSet) UnregisterPeer(id string) error { return errPeerNotRegistered } delete(ps.peers, id) + ps.rates.Untrack(id) if peer.snapExt != nil { ps.snapPeers-- } diff --git a/gossip/protocol.go b/gossip/protocol.go index 828ff001b..b5b15b537 100644 --- a/gossip/protocol.go +++ b/gossip/protocol.go @@ -79,6 +79,7 @@ const ( ErrNoStatusMsg ErrExtraStatusMsg ErrSuspendedPeer + ErrUseLessPeer ErrEmptyMessage = 0xf00 )