From 2cf6934d5da004180277c72487a603f1466345db Mon Sep 17 00:00:00 2001 From: Greg Weber Date: Mon, 28 Oct 2024 15:45:15 -0500 Subject: [PATCH] return []error rather than error Concurrent functions return multiple errors. These are now returned only as either a channel or a slice. This avoids a dependency on a 3rd party multi errors library. The standard library errors.Join is awkward to use because there is not a simple way to unwrap the errors. We could use a custom multi error. However, to make this convenient to use it should be returned as a pointer to the struct rather than an opaque error. But Go has a subtle design issue where an interface can end up being non-nil when storing a pointer to a struct. Returning []error still allows the caller to use errors.Join() if they don't care about iterating the individual errors. The main issue is that someone may check != nil rather than len > 0 However, all the APIs are designed to return nil rather than a zero length slice. Additionally, don't use the default recovery handler. Write more errors to the error channels. Panic a few errors that shouldn't happen. --- README.md | 14 +-- go.mod | 2 - go.sum | 5 - parallel.go | 318 ++++++++++++++++++++++++++++++++--------------- parallel_test.go | 269 +++++++++++++++++++++------------------ 5 files changed, 371 insertions(+), 237 deletions(-) diff --git a/README.md b/README.md index 6a0c6e2..abd49a6 100644 --- a/README.md +++ b/README.md @@ -17,12 +17,10 @@ parallelism in Go using generics ## Error handling -This library relies on go-recovery to trap panics that occur in go routines. -go-recovery by default will log panics but can be configured to send them to an error monitoring service. +This library relies on go-recovery to trap panics that occur in user supplied work functions. +This library does have unhandled panics, but only in places where panics should never occur. +Errors and panics are written to an error channel for maximum flexibility. +There are helpers for common patterns for dealing with errors: -For maximum flexibility with error handling, many of the parallel functions return an error channel. -Any errors that occur in work functions will be put into the error channel. -There are helpers for common patterns for dealing with the errors: - -* CollectErrors -* CancelAfterFirstError +* CollectErrors (wait and convert to a slice) +* CancelAfterFirstError (cancel and wait and convert to a slice) diff --git a/go.mod b/go.mod index dcd5565..6daac0e 100644 --- a/go.mod +++ b/go.mod @@ -5,13 +5,11 @@ go 1.19 require ( github.com/gregwebs/go-recovery v0.2.1 github.com/stretchr/testify v1.8.1 - go.uber.org/multierr v1.9.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gregwebs/errors v1.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.uber.org/atomic v1.7.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 384021f..2ffdba4 100644 --- a/go.sum +++ b/go.sum @@ -10,15 +10,10 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/parallel.go b/parallel.go index 2ffb754..7b70ae1 100644 --- a/parallel.go +++ b/parallel.go @@ -5,13 +5,14 @@ import ( "sync" "github.com/gregwebs/go-recovery" - "go.uber.org/multierr" ) -// n is the number of go routines to spawn -// errors are returned as a combined multierr -// panics in the given function are recovered and converted to an error -func Concurrent(n int, fn func(int) error) error { +// Concurrent spawns n go routines each of which runs the given function. +// a panic in the given function is recovered and converted to an error +// errors are returned as []error, a slice of errors +// If there are no errors, the slice will be nil +// To combine the errors as a single error, use errors.Join +func Concurrent(n int, fn func(int) error) []error { errs := make([]error, n) var wg sync.WaitGroup for i := 0; i < n; i++ { @@ -24,7 +25,22 @@ func Concurrent(n int, fn func(int) error) error { }) } wg.Wait() - return multierr.Combine(errs...) + return removeNilErrors(errs...) +} + +func sendErrorRecover(c chan<- error, err error) error { + if err == nil { + return nil + } + return sendWithRecover(c, err) +} + +// return true if successful, false if recovered +func sendWithRecover[T any](c chan<- T, obj T) error { + return recovery.Call(func() error { + c <- obj + return nil + }) } // try to send to a channel, return true if sent, false if not @@ -37,7 +53,7 @@ func TrySend[T any](c chan<- T, obj T) bool { } } -// try to send to a channel, return true if sent, false if not +// try to receive from a channel, return false if nothing received func TryRecv[T any](c <-chan T) (receivedObject T, received bool) { select { case receivedObject = <-c: @@ -48,11 +64,14 @@ func TryRecv[T any](c <-chan T) (receivedObject T, received bool) { return } -// Combine all errors from a channel of errors into a single multierr error. -func CollectErrors(errors <-chan error) error { - var errResult error - for e := range errors { - errResult = multierr.Combine(errResult, e) +// Wait for all errors from a channel of errors. +// errors are returned as []error, a slice of errors +// If there are no errors, the slice will be nil +// To combine the errors as a single error, use errors.Join +func CollectErrors(errChannel <-chan error) []error { + var errResult []error + for err := range errChannel { + errResult = append(errResult, err) } return errResult } @@ -67,7 +86,10 @@ func CollectErrors(errors <-chan error) error { // Errors are sent to the returned error channel. // When the given queue is closed and the work is processed, the returned error channel will be closed. func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error { - errors := make(chan error) + if n == 0 { + n = 1 + } + errChannel := make(chan error) chans := make([]chan T, n) for i := range chans { chans[i] = make(chan T) @@ -79,6 +101,8 @@ func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error { } var chanValues []reflect.Value + var cases []reflect.SelectCase + distributeWork := func(work T) { for _, c := range chans { if worked := TrySend(c, work); worked { @@ -86,27 +110,36 @@ func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error { } } + workReflected := reflect.ValueOf(work) // No worker channels were available // Use a select to let the runtime choose the next available channel - // Our channels are dynamic, so to use reflect.Select - // Reflection has some additional overhead - // However, for most use cases of this function it shouldn't be noticeable. if chanValues == nil { chanValues = make([]reflect.Value, len(chans)) for i := range chans { chanValues[i] = reflect.ValueOf(chans[i]) } } - workReflected := reflect.ValueOf(work) - cases := make([]reflect.SelectCase, len(chans)) - for i := range chans { - cases[i] = reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanValues[i], Send: workReflected} + if cases == nil { + cases = make([]reflect.SelectCase, len(chans)) + for i := range chans { + cases[i] = reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanValues[i], Send: workReflected} + } + } else { + for i := range chans { + cases[i].Send = workReflected + } } _, _, _ = reflect.Select(cases) } + errorChannelOrPanic := func(err error) { + if errRecover := sendErrorRecover(errChannel, err); errRecover != nil { + panic(err) + } + } + // distribute the work over the worker channels - go recovery.Go(func() error { + go recovery.GoHandler(errorChannelOrPanic, func() error { defer closeChans() for work := range queue { distributeWork(work) @@ -114,54 +147,78 @@ func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error { return nil }) - // run a concurrent worker for each channel // send errors to the error channel - go func() { - defer close(errors) - - // work the channels in parallel - extraErrors := Concurrent(n, func(i int) error { - // Attempt to send an error or recovered panic to the error channel. - // But if the error channel is blocked, don't wait: - // go ahead and process the work - // try sending the error again when done processing that work - var errs error - for work := range chans[i] { - err := recovery.Call(func() error { return fn(work) }) - if err != nil { - errs = multierr.Combine(errs, err) + // An unexpected panic while writing to the error channel will panic + go recovery.GoHandler(func(err error) { panic(err) }, func() error { + defer close(errChannel) + return sendErrorRecover(errChannel, recovery.Call(func() error { + unsentErrorsAll := make([][]error, n) + // run a concurrent worker for each channel + recoveredErrors := Concurrent(n, func(i int) error { + // Attempt to send an error or recovered panic to the error channel. + // But if the error channel is blocked, don't wait: + // go ahead and process the work + // try sending the error again when done processing that work + var unsentErrors []error + for work := range chans[i] { + if err := recovery.Call(func() error { return fn(work) }); err != nil { + if sent := TrySend(errChannel, err); !sent { + unsentErrors = append(unsentErrors, err) + } + } + } + // Try sending unsent errors again + // Don't block though so the Go routine can be shutdown + for unsentI, err := range unsentErrors { + if sent := TrySend(errChannel, err); sent { + unsentErrors[unsentI] = nil + } } - if errs != nil { - if sent := TrySend(errors, err); sent { - errs = nil + unsentErrorsAll[i] = unsentErrors + return nil + }) + + // Now block while sending any remaining errors + for _, errs := range unsentErrorsAll { + for _, e := range errs { + if e != nil { + errChannel <- e } } } - return errs - }) - // after all work is processed, ensure all errors are sent before the channel is closed - if extraErrors != nil { - errors <- extraErrors - } - }() + for _, err := range recoveredErrors { + errChannel <- err + } - return errors + return nil + })) + }) + + return errChannel } // For functions that take a cancel channel and return an error channel. -// This helper will cancel on the first error. -// Waits for all processing to complete. -// Returns a multierror of any resulting errors. +// Attempt to cancel all processing but wait for it to finish. +// +// This helper will trigger the cancel channel after the first error. +// It then waits for the error channel to be closed. // -// This does not immediately stop existing processing (which requires panicing). -// This cancels future work distribution. -func CancelAfterFirstError(cancel chan struct{}, errors <-chan error) error { - if errResult := <-errors; errResult != nil { +// errors are returned as []error, a slice of errors +// If there are no errors, the slice will be nil +// To combine the errors as a single error, use errors.Join +func CancelAfterFirstError(cancel chan struct{}, errChannel <-chan error) []error { + for { + errResult, ok := <-errChannel + if !ok { + return nil + } + if errResult == nil { + continue + } cancel <- struct{}{} - return multierr.Combine(errResult, CollectErrors(errors)) + return append([]error{errResult}, CollectErrors(errChannel)...) } - return nil } type withIndex[T any] struct { @@ -178,36 +235,45 @@ func ArrayWorkers1[T any](nParallel int, objects []T, cancel <-chan struct{}, fn nParallel = len(objects) } + errChannel := make(chan error, 1) if len(objects) == 1 { - errors := make(chan error) - go recovery.Go(func() error { - errors <- recovery.Call(func() error { + go recovery.GoHandler(func(err error) { panic(err) }, func() error { + defer close(errChannel) + err := recovery.Call(func() error { return fn(0, objects[0]) }) + if err != nil { + errChannel <- err + } return nil }) - return errors + return errChannel } queue := make(chan withIndex[T]) // put the objects into the queue - go recovery.Go(func() error { - defer close(queue) - - for i, object := range objects { - if _, canceled := TryRecv(cancel); canceled { - break + go recovery.GoHandler(func(err error) { panic(err) }, func() error { + defer close(errChannel) + return sendErrorRecover(errChannel, recovery.Call(func() error { + defer close(queue) + + for i, object := range objects { + if _, canceled := TryRecv(cancel); canceled { + break + } + queue <- withIndex[T]{Index: i, val: object} } - queue <- withIndex[T]{Index: i, val: object} - } - return nil + return nil + })) }) withIndexFn := func(wi withIndex[T]) error { return fn(wi.Index, wi.val) } - return QueueWorkers(nParallel, queue, withIndexFn) + // queue and errChannel are not closed even though errQueueWorkers is? + errQueueWorkers := QueueWorkers(nParallel, queue, withIndexFn) + return ChannelMerge(errQueueWorkers, errChannel) } // If the length is too small, decrease the batch size @@ -230,12 +296,12 @@ type BatchWork struct { // BatchWorkers combines BatchedChannel, QueueWorkers, and CancelAfterFirstError // The given objects are batched up and worked in parallel -func BatchWorkers[T any](bw BatchWork, objects []T, worker func([]T) error) error { +func BatchWorkers[T any](bw BatchWork, objects []T, worker func([]T) error) []error { if bw.Cancel == nil { bw.Cancel = make(chan struct{}) } - queue := BatchedChannel(bw, objects) - errors := QueueWorkers(bw.Parallelism, queue, worker) + queue, errBatched := BatchedChannel(bw, objects) + errors := ChannelMerge(QueueWorkers(bw.Parallelism, queue, worker), errBatched) return CancelAfterFirstError(bw.Cancel, errors) } @@ -245,47 +311,52 @@ func (bw *BatchWork) AdjustForSmallLength(total int) int { return bw.Size } -// BatchedChannel sends slices of Batchwork.Size objects to the resulting channel -func BatchedChannel[T any](bw BatchWork, objects []T) <-chan []T { +// BatchedChannel sends slices of Batchwork.Size objects to the resulting channel. +// The error channel should not have an error but is there in case there is a panic in the batching go routine. +func BatchedChannel[T any](bw BatchWork, objects []T) (<-chan []T, <-chan error) { sender := make(chan []T) total := len(objects) if total == 0 { - return sender + return sender, nil } batchSize := bw.AdjustForSmallLength(total) - go recovery.Go(func() error { - defer close(sender) - - tryCancel := func() bool { return false } - if bw.Cancel != nil { - tryCancel = func() bool { - _, canceled := TryRecv(bw.Cancel) - return canceled + errChannel := make(chan error, 1) + go recovery.GoHandler(func(err error) { panic(err) }, func() error { + defer close(errChannel) + return sendErrorRecover(errChannel, recovery.Call(func() error { + defer close(sender) + + tryCancel := func() bool { return false } + if bw.Cancel != nil { + tryCancel = func() bool { + _, canceled := TryRecv(bw.Cancel) + return canceled + } } - } - for i := 0; ((i + 1) * batchSize) <= total; i += 1 { - lower := i * batchSize - if canceled := tryCancel(); canceled { - return nil + for i := 0; ((i + 1) * batchSize) <= total; i += 1 { + lower := i * batchSize + if canceled := tryCancel(); canceled { + return nil + } + sender <- objects[lower : lower+batchSize] } - sender <- objects[lower : lower+batchSize] - } - rem := total % batchSize - if rem > 0 { - if canceled := tryCancel(); canceled { - return nil + rem := total % batchSize + if rem > 0 { + if canceled := tryCancel(); canceled { + return nil + } + sender <- objects[total-rem : total] } - sender <- objects[total-rem : total] - } - return nil + return nil + })) }) - return sender + return sender, errChannel } -func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error)) ([]U, error) { +func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error)) ([]U, []error) { total := len(objects) out := make([]U, total) batchSize := total / nParallel @@ -315,3 +386,50 @@ func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error)) }) return out, err } + +// This is the same as errors.Join but does not wrap the array +func removeNilErrors(errs ...error) []error { + n := 0 + for _, err := range errs { + if err != nil { + n++ + } + } + if n == 0 { + return nil + } + newErrs := make([]error, 0, n) + for _, err := range errs { + if err != nil { + newErrs = append(newErrs, err) + } + } + return newErrs +} + +// From this article: https://go.dev/blog/pipelines +func ChannelMerge[T any](cs ...<-chan T) <-chan T { + var wg sync.WaitGroup + out := make(chan T) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan T) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out +} diff --git a/parallel_test.go b/parallel_test.go index ae5fe66..8d6a5eb 100644 --- a/parallel_test.go +++ b/parallel_test.go @@ -1,7 +1,7 @@ package parallel_test import ( - "fmt" + "errors" "sync" "testing" @@ -10,51 +10,8 @@ import ( "github.com/stretchr/testify/assert" ) -func TestRecoveredCall(t *testing.T) { - err := recovery.Call(func() error { - return nil - }) - assert.Nil(t, err) - err = recovery.Call(func() error { - return fmt.Errorf("return error") - }) - assert.NotNil(t, err) - err = recovery.Call(func() error { - panic("panic") - }) - assert.NotNil(t, err) - assert.Equal(t, "panic", err.Error()) -} - -func TestGoRecovered(t *testing.T) { - noError := func(err error) { - assert.Nil(t, err) - } - errHappened := func(err error) { - assert.NotNil(t, err) - } - recovery.GoHandler(noError, func() error { - return nil - }) - recovery.GoHandler(errHappened, func() error { - panic("panic") - }) - - wait := make(chan struct{}) - go recovery.GoHandler(noError, func() error { - wait <- struct{}{} - return nil - }) - go recovery.GoHandler(errHappened, func() error { - defer func() { wait <- struct{}{} }() - panic("panic") - }) - <-wait - <-wait -} - func TestConcurrent(t *testing.T) { - var err error + var err []error workNone := func(_ int) error { return nil } err = parallel.Concurrent(0, workNone) assert.Nil(t, err) @@ -76,71 +33,128 @@ func TestConcurrent(t *testing.T) { } func TestQueueWorkers(t *testing.T) { - var err error - var queue chan int workNone := func(_ int) error { return nil } - queue = make(chan int) - err = parallel.CollectErrors(parallel.QueueWorkers(0, queue, workNone)) - assert.Nil(t, err) - close(queue) - - queue = make(chan int) - go recovery.Go(func() error { - queue <- 1 - queue <- 1 + { + queue := make(chan int) close(queue) - return nil - }) - err = parallel.CollectErrors(parallel.QueueWorkers(2, queue, workNone)) - assert.Nil(t, err) + err := parallel.CollectErrors(parallel.QueueWorkers(0, queue, workNone)) + assert.Nil(t, err) + } - tracked := make([]bool, 10) - workTracked := func(i int) error { tracked[i] = true; return nil } - queue = make(chan int) - go recovery.Go(func() error { - queue <- 0 - queue <- 1 - close(queue) - return nil - }) - err = parallel.CollectErrors(parallel.QueueWorkers(2, queue, workTracked)) - assert.Nil(t, err) - assert.False(t, tracked[2]) - assert.True(t, tracked[1]) - assert.True(t, tracked[0]) + { + queue := make(chan int) + go recovery.Go(func() error { + queue <- 1 + queue <- 1 + close(queue) + return nil + }) + err := parallel.CollectErrors(parallel.QueueWorkers(2, queue, workNone)) + assert.Nil(t, err) + } + + { + tracked := make([]bool, 10) + workTracked := func(i int) error { tracked[i] = true; return nil } + queue := make(chan int) + go recovery.Go(func() error { + queue <- 0 + queue <- 1 + close(queue) + return nil + }) + err := parallel.CollectErrors(parallel.QueueWorkers(2, queue, workTracked)) + assert.Nil(t, err) + assert.False(t, tracked[2]) + assert.True(t, tracked[1]) + assert.True(t, tracked[0]) + } } -func arrayWorkers1[T any](nParallel int, objects []T, worker func(int, T) error) error { +func arrayWorkers1[T any](nParallel int, objects []T, worker func(int, T) error) []error { cancel := make(chan struct{}) errors := parallel.ArrayWorkers1(nParallel, objects, cancel, worker) return parallel.CancelAfterFirstError(cancel, errors) } +func TestCancelAfterFirstError(t *testing.T) { + cancel := make(chan struct{}, 10) + { + errChan := make(chan error, 10) + close(errChan) + errs := parallel.CancelAfterFirstError(cancel, errChan) + assert.Nil(t, errs) + } + + { + errChan := make(chan error, 10) + errChan <- errors.New("first error") + errChan <- errors.New("second error") + close(errChan) + errs := parallel.CancelAfterFirstError(cancel, errChan) + assert.Len(t, errs, 2) + } +} + +func TestChannelMerge(t *testing.T) { + { + c1 := make(chan error) + c2 := make(chan error) + close(c1) + close(c2) + err, ok := parallel.TryRecv(parallel.ChannelMerge(c1, c2)) + assert.False(t, ok) + assert.Nil(t, err) + } + + { + c1 := make(chan error) + c2 := make(chan error) + go func() { + c1 <- errors.New("c1") + c2 <- errors.New("c2") + close(c1) + close(c2) + }() + merged := parallel.ChannelMerge(c1, c2) + _, ok := <-merged + assert.True(t, ok) + _, ok = <-merged + assert.True(t, ok) + _, ok = <-merged + assert.False(t, ok) + } +} + func TestArrayWorkers1(t *testing.T) { - var err error workNone := func(_ int, _ bool) error { return nil } - tracked := make([]bool, 10) - err = arrayWorkers1(0, tracked, workNone) - assert.Nil(t, err) + { + tracked := make([]bool, 10) + err := arrayWorkers1(0, tracked, workNone) + assert.Nil(t, err) + } - tracked = make([]bool, 1) - err = arrayWorkers1(10, tracked, workNone) - assert.Nil(t, err) + { + tracked := make([]bool, 1) + err := arrayWorkers1(10, tracked, workNone) + assert.Nil(t, err) + } - tracked = make([]bool, 10) - workTracked := func(i int, _ bool) error { tracked[i] = true; return nil } - err = arrayWorkers1(0, tracked, workTracked) - assert.Nil(t, err) - assert.False(t, tracked[0]) + { + tracked := make([]bool, 10) + workTracked := func(i int, _ bool) error { tracked[i] = true; return nil } + err := arrayWorkers1(1, tracked, workTracked) + assert.Nil(t, err) + assert.NotContains(t, tracked, false) + } - tracked = make([]bool, 10) - workTracked = func(i int, _ bool) error { tracked[i] = true; return nil } - err = arrayWorkers1(2, tracked, workTracked) - assert.Nil(t, err) - assert.True(t, tracked[0]) - assert.True(t, tracked[1]) - assert.True(t, tracked[2]) - assert.True(t, tracked[9]) + { + tracked := make([]bool, 10) + workTracked := func(i int, _ bool) error { tracked[i] = true; return nil } + err := arrayWorkers1(2, tracked, workTracked) + assert.Nil(t, err) + assert.NotContains(t, tracked, false) + } } type SyncNumber struct { @@ -155,22 +169,21 @@ func (sn *SyncNumber) Add(x int) { } func TestBatchWorkers(t *testing.T) { - var err error workNone := func(_ []bool) error { return nil } - tracked := make([]bool, 10) - bw := parallel.BatchWork{Size: 2, Parallelism: 0} - err = parallel.BatchWorkers(bw, tracked, workNone) - assert.Nil(t, err) - - tracked = make([]bool, 10) - bw = parallel.BatchWork{Size: 2, Parallelism: 2} - err = parallel.BatchWorkers(bw, tracked, workNone) - assert.Nil(t, err) + { + tracked := make([]bool, 10) + bw := parallel.BatchWork{Size: 2, Parallelism: 0} + err := parallel.BatchWorkers(bw, tracked, workNone) + assert.Nil(t, err) + } - work := make([]int, 10) - for i := range work { - work[i] = i + 1 + { + tracked := make([]bool, 10) + bw := parallel.BatchWork{Size: 2, Parallelism: 2} + err := parallel.BatchWorkers(bw, tracked, workNone) + assert.Nil(t, err) } + output := SyncNumber{Number: 0} add := func(batch []int) error { for _, x := range batch { @@ -178,22 +191,34 @@ func TestBatchWorkers(t *testing.T) { } return nil } - bw = parallel.BatchWork{Size: 1, Parallelism: 1} - err = parallel.BatchWorkers(bw, work, add) - assert.Nil(t, err) - assert.Equal(t, output.Number, 55) + work := make([]int, 10) + for i := range work { + work[i] = i + 1 + } - output = SyncNumber{Number: 0} - bw = parallel.BatchWork{Size: 2, Parallelism: 2} - err = parallel.BatchWorkers(bw, work, add) - assert.Nil(t, err) - assert.Equal(t, output.Number, 55) + { + output = SyncNumber{Number: 0} + bw := parallel.BatchWork{Size: 1, Parallelism: 1} + err := parallel.BatchWorkers(bw, work, add) + assert.Nil(t, err) + assert.Equal(t, output.Number, 55) + } - output = SyncNumber{Number: 0} - bw = parallel.BatchWork{Size: 3, Parallelism: 3} - err = parallel.BatchWorkers(bw, work, add) - assert.Nil(t, err) - assert.Equal(t, output.Number, 55) + { + output = SyncNumber{Number: 0} + bw := parallel.BatchWork{Size: 2, Parallelism: 2} + err := parallel.BatchWorkers(bw, work, add) + assert.Nil(t, err) + assert.Equal(t, output.Number, 55) + } + + { + output = SyncNumber{Number: 0} + bw := parallel.BatchWork{Size: 3, Parallelism: 3} + err := parallel.BatchWorkers(bw, work, add) + assert.Nil(t, err) + assert.Equal(t, output.Number, 55) + } } func TestBatchWorkersSmallBatchSize(t *testing.T) {