Skip to content

Commit

Permalink
sweepbatcher: fix race condition when stopping
Browse files Browse the repository at this point in the history
The race was detected in CI and locally when running with -race.
It happened between the following calls:

WARNING: DATA RACE
Write at 0x00c0003e6638 by goroutine 1374:
  runtime.racewrite()
      <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Wait()
      sweepbatcher/sweep_batch.go:463 +0x6e
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).Run.func1()
      sweepbatcher/sweep_batcher.go:272 +0x10e

Previous read at 0x00c0003e6638 by goroutine 1388:
  runtime.raceread()
      <autogenerated>:1 +0x1e
  github.com/lightninglabs/loop/sweepbatcher.(*batch).monitorConfirmations()
      sweepbatcher/sweep_batch.go:1144 +0x285
  github.com/lightninglabs/loop/sweepbatcher.(*batch).handleSpend()
      sweepbatcher/sweep_batch.go:1309 +0x10e4
  github.com/lightninglabs/loop/sweepbatcher.(*batch).Run()
      sweepbatcher/sweep_batch.go:526 +0xb04
  github.com/lightninglabs/loop/sweepbatcher.(*Batcher).spinUpBatch.func1()
      sweepbatcher/sweep_batcher.go:455 +0xbd

The race was caused because wg.Add(1) and wg.Wait() were running from different
goroutines (one goroutine was running batch.Run() and another - batcher.Run()).

To avoid this scenario, wg.Wait() call was moved into batch.Run() call, so it
waits itself for its children goroutines, after which the channel b.finished
is closed, and it serves a signal for external waiters (the batcher, calling
batch.Wait()).

Also the channel batch.stopped was renamed to batch.stopping to better reflect
its nature.
  • Loading branch information
starius committed Jun 15, 2024
1 parent 06ae48c commit 58a971b
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,12 @@ type batch struct {
// main event loop.
callLeave chan struct{}

// stopped signals that the batch has stopped.
stopped chan struct{}
// stopping signals that the batch is stopping.
stopping chan struct{}

// finished signals that the batch has stopped and all child goroutines
// have finished.
finished chan struct{}

// quit is owned by the parent batcher and signals that the batch must
// stop.
Expand Down Expand Up @@ -273,7 +277,7 @@ func (b *batch) scheduleNextCall() (func(), error) {
case <-b.quit:
return func() {}, ErrBatcherShuttingDown

case <-b.stopped:
case <-b.stopping:
return func() {}, ErrBatchShuttingDown
}

Expand All @@ -297,7 +301,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopped: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
Expand Down Expand Up @@ -340,7 +345,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopped: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
Expand Down Expand Up @@ -460,16 +466,20 @@ func (b *batch) sweepExists(hash lntypes.Hash) bool {
// Wait waits for the batch to gracefully stop.
func (b *batch) Wait() {
b.log.Infof("Stopping")
b.wg.Wait()
<-b.finished
}

// Run is the batch's main event loop.
func (b *batch) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.stopped)
close(b.stopping)

// Make sure not to call b.wg.Wait from any other place to avoid
// race condition between b.wg.Add(1) and b.wg.Wait().
b.wg.Wait()
close(b.finished)
}()

if b.muSig2SignSweep == nil {
Expand Down

0 comments on commit 58a971b

Please sign in to comment.