Skip to content

Commit

Permalink
perf: various improvements in pusher, pushsync, salud, reacher (#4958)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Feb 3, 2025
1 parent 5456d14 commit 92057ff
Show file tree
Hide file tree
Showing 16 changed files with 257 additions and 128 deletions.
2 changes: 1 addition & 1 deletion openapi/Swarm.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
openapi: 3.0.3

info:
version: 7.2.0
version: 7.3.0
title: Bee API
description: "A list of the currently provided Interfaces to interact with the swarm, implementing file operations and sending messages"

Expand Down
2 changes: 1 addition & 1 deletion openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
openapi: 3.0.3
info:
version: 4.2.0
version: 4.3.0
title: Common Data Types
description: |
\*****bzzz*****
Expand Down
14 changes: 10 additions & 4 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,15 @@ func NewBee(
}
}(b)

if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 {
if !o.FullNodeMode && o.ReserveCapacityDoubling != 0 {
return nil, fmt.Errorf("reserve capacity doubling is only allowed for full nodes")
}

const maxAllowedDoubling = 1
if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > maxAllowedDoubling {
return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1")
}
var shallowReceiptTolerance = maxAllowedDoubling - o.ReserveCapacityDoubling

reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity

Expand Down Expand Up @@ -723,7 +729,7 @@ func NewBee(
Batchstore: batchStore,
StateStore: stateStore,
RadiusSetter: kad,
WarmupDuration: o.WarmupTime,
WarmupDuration: warmupTime,
Logger: logger,
Tracer: tracer,
CacheMinEvictCount: cacheMinEvictCount,
Expand Down Expand Up @@ -955,7 +961,7 @@ func NewBee(
}
}

pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime)
pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime, uint8(shallowReceiptTolerance))
b.pushSyncCloser = pushSyncProtocol

// set the pushSyncer in the PSS
Expand All @@ -964,7 +970,7 @@ func NewBee(
retrieval := retrieval.New(swarmAddress, waitNetworkRFunc, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching)
localStore.SetRetrievalService(retrieval)

pusherService := pusher.New(networkID, localStore, pushSyncProtocol, validStamp, logger, warmupTime, pusher.DefaultRetryCount)
pusherService := pusher.New(networkID, localStore, pushSyncProtocol, batchStore, logger, warmupTime, pusher.DefaultRetryCount)
b.pusherCloser = pusherService

pusherService.AddFeed(localStore.PusherFeed())
Expand Down
36 changes: 13 additions & 23 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

const (
pingTimeout = time.Second * 15
workers = 8
workers = 16
retryAfterDuration = time.Minute * 5
)

Expand All @@ -32,8 +32,8 @@ type reacher struct {
mu sync.Mutex
peers map[string]*peer

work chan struct{}
quit chan struct{}
newPeer chan struct{}
quit chan struct{}

pinger p2p.Pinger
notifier p2p.ReachableNotifier
Expand All @@ -53,7 +53,7 @@ type Options struct {
func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher {

r := &reacher{
work: make(chan struct{}, 1),
newPeer: make(chan struct{}, 1),
quit: make(chan struct{}),
pinger: streamer,
peers: make(map[string]*peer),
Expand Down Expand Up @@ -103,7 +103,7 @@ func (r *reacher) manage() {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
case <-time.After(tryAfter):
continue
Expand All @@ -115,12 +115,12 @@ func (r *reacher) manage() {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
}
}

// send p to channel
// ping peer
select {
case <-r.quit:
return
Expand All @@ -135,10 +135,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {

for p := range c {

r.mu.Lock()
overlay := p.overlay
r.mu.Unlock()

now := time.Now()

ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout)
Expand All @@ -149,14 +145,12 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
if err == nil {
r.metrics.Pings.WithLabelValues("success").Inc()
r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic)
} else {
r.metrics.Pings.WithLabelValues("failure").Inc()
r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate)
}

r.notifyManage()
}
}

Expand Down Expand Up @@ -191,13 +185,6 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
return nil, time.Until(nextClosest)
}

// notifyManage signals the work channel without blocking: if the send
// cannot proceed immediately, the signal is dropped instead of waiting.
func (r *reacher) notifyManage() {
select {
case r.work <- struct{}{}:
default: // a send that would block is skipped
}
}

// Connected adds a new peer to the queue for testing reachability.
func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
r.mu.Lock()
Expand All @@ -207,7 +194,10 @@ func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr}
}

r.notifyManage()
select {
case r.newPeer <- struct{}{}:
default:
}
}

// Disconnected removes a peer from the queue.
Expand Down
9 changes: 6 additions & 3 deletions pkg/postage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ type ChainSnapshot struct {
// on the current (highest available) block.
type Storer interface {
ChainStateGetter
BatchExist

Radius() uint8

// Get returns a batch from the store with the given ID.
Get([]byte) (*Batch, error)

// Exists reports whether the batch referenced by the given id exists.
Exists([]byte) (bool, error)

// Iterate iterates through stored batches.
Iterate(func(*Batch) (bool, error)) error

Expand All @@ -73,6 +71,11 @@ type Storer interface {
SetBatchExpiryHandler(BatchExpiryHandler)
}

// BatchExist exposes a single existence check for batches, so that callers
// which only need to know whether a batch is present can depend on this
// one method.
type BatchExist interface {
// Exists reports whether the batch referenced by the given id exists.
Exists([]byte) (bool, error)
}

// CommitmentGetter is used to calculate total batch commitment of the network.
type CommitmentGetter interface {
Commitment() (uint64, error)
Expand Down
26 changes: 11 additions & 15 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Service struct {
networkID uint64
storer Storer
pushSyncer pushsync.PushSyncer
validStamp postage.ValidStampFn
batchExist postage.BatchExist
logger log.Logger
metrics metrics
quit chan struct{}
Expand All @@ -63,19 +63,15 @@ type Service struct {

const (
traceDuration = 30 * time.Second // duration for every root tracing span
ConcurrentPushes = 100 // how many chunks to push simultaneously
ConcurrentPushes = swarm.Branches // how many chunks to push simultaneously
DefaultRetryCount = 6
)

var (
ErrInvalidAddress = errors.New("invalid address")
)

func New(
networkID uint64,
storer Storer,
pushSyncer pushsync.PushSyncer,
validStamp postage.ValidStampFn,
batchExist postage.BatchExist,
logger log.Logger,
warmupTime time.Duration,
retryCount int,
Expand All @@ -84,7 +80,7 @@ func New(
networkID: networkID,
storer: storer,
pushSyncer: pushSyncer,
validStamp: validStamp,
batchExist: batchExist,
logger: logger.WithName(loggerName).Register(),
metrics: newMetrics(),
quit: make(chan struct{}),
Expand Down Expand Up @@ -251,14 +247,14 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (

defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())

if _, err := s.validStamp(op.Chunk); err != nil {
ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
if !ok || err != nil {
loggerV1.Warning(
"stamp with is no longer valid, skipping syncing for chunk",
"stamp is no longer valid, skipping syncing for chunk",
"batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()),
"chunk_address", op.Chunk.Address(),
"error", err,
)

return false, errors.Join(err, s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync))
}

Expand Down Expand Up @@ -311,10 +307,10 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
}
}()

_, err = s.validStamp(op.Chunk)
if err != nil {
logger.Warning(
"stamp with is no longer valid, skipping direct upload for chunk",
ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
if !ok || err != nil {
loggerV1.Warning(
"stamp is no longer valid, skipping direct upload for chunk",
"batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()),
"chunk_address", op.Chunk.Address(),
"error", err,
Expand Down
28 changes: 15 additions & 13 deletions pkg/pusher/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
batchstoremock "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
"github.com/ethersphere/bee/v2/pkg/pusher"
"github.com/ethersphere/bee/v2/pkg/pushsync"
pushsyncmock "github.com/ethersphere/bee/v2/pkg/pushsync/mock"
Expand All @@ -33,9 +34,9 @@ const spinTimeout = time.Second * 3

var (
block = common.HexToHash("0x1").Bytes()
defaultMockValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) {
return ch, nil
}
defaultMockBatchStore = batchstoremock.New(batchstoremock.WithExistsFunc(func(b []byte) (bool, error) {
return true, nil
}))
defaultRetryCount = 3
)

Expand Down Expand Up @@ -134,7 +135,7 @@ func TestChunkSyncing(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

Expand Down Expand Up @@ -181,7 +182,7 @@ func TestChunkStored(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

Expand Down Expand Up @@ -239,7 +240,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

Expand Down Expand Up @@ -283,7 +284,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

Expand Down Expand Up @@ -326,7 +327,7 @@ func TestPusherRetryShallow(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

Expand Down Expand Up @@ -364,9 +365,10 @@ func TestChunkWithInvalidStampSkipped(t *testing.T) {
})

wantErr := errors.New("dummy error")
validStamp := func(ch swarm.Chunk) (swarm.Chunk, error) {
return nil, wantErr
}

bmock := batchstoremock.New(batchstoremock.WithExistsFunc(func(b []byte) (bool, error) {
return false, wantErr
}))

storer := &mockStorer{
chunks: make(chan swarm.Chunk),
Expand All @@ -376,7 +378,7 @@ func TestChunkWithInvalidStampSkipped(t *testing.T) {
t,
storer,
pushSyncService,
validStamp,
bmock,
defaultRetryCount,
)

Expand Down Expand Up @@ -412,7 +414,7 @@ func createPusher(
t *testing.T,
storer pusher.Storer,
pushSyncService pushsync.PushSyncer,
validStamp postage.ValidStampFn,
validStamp postage.BatchExist,
retryCount int,
) *pusher.Service {
t.Helper()
Expand Down
Loading

0 comments on commit 92057ff

Please sign in to comment.