Skip to content

Commit

Permalink
Merge pull request #4 from gregwebs/dependencies
Browse files Browse the repository at this point in the history
move some code to go-concurrent
  • Loading branch information
gregwebs authored Oct 29, 2024
2 parents 7b7812c + 97bc56e commit eff0dff
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 198 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: setup
uses: actions/setup-go@v4
with:
go-version: '>=1.21'
go-version: '>=1.23.2'
cache: false
check-latest: true

Expand Down Expand Up @@ -63,6 +63,6 @@ jobs:
- id: govulncheck
uses: golang/govulncheck-action@v1
with:
go-version-input: 1.21
go-version-input: 1.23.2
go-package: ./...
cache: false
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
module github.com/gregwebs/go-parallel

go 1.19
go 1.23.1

toolchain go1.23.2

require (
github.com/gregwebs/go-recovery v0.2.1
github.com/stretchr/testify v1.8.1
github.com/gregwebs/go-concurrent v0.1.0
github.com/gregwebs/go-recovery v0.3.2
github.com/shoenig/test v1.11.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gregwebs/errors v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/gregwebs/errors v1.14.0 // indirect
)
23 changes: 10 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gregwebs/errors v1.1.0 h1:I0CqB+CPseFcYyoR3t/+mOVPgdukwTUcZKAN8nm7xRY=
github.com/gregwebs/errors v1.1.0/go.mod h1:oGma9emsMfJUvDNUK5O72GU4o6WBKiQWK7dl1md9Plo=
github.com/gregwebs/go-recovery v0.2.1 h1:ei1tYhPM0AEB9cDVwNYYOI/y6OwvoEKulCCWYlawEvQ=
github.com/gregwebs/go-recovery v0.2.1/go.mod h1:uZLbMT7fdZLQ+PoBqUwN1cceDlhYj0fPk8WUN74zyKc=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gregwebs/errors v1.14.0 h1:bkCLXmqiH+nNXUZi4+CNmDEUq04y9HQg0kNOtNPMS8E=
github.com/gregwebs/errors v1.14.0/go.mod h1:1NkCObP7+scylHlC69lwHl2ACOHwktWYrZV4EJDEl6g=
github.com/gregwebs/go-concurrent v0.1.0 h1:khACF0e3zdgm2/rIxXzbl8ZvNn4xHf+JJs9HxWDgde0=
github.com/gregwebs/go-concurrent v0.1.0/go.mod h1:7TJ6cEgQT2xq6vl3ba9/gGyNC6/rX1042wi8e1TOk3U=
github.com/gregwebs/go-recovery v0.3.2 h1:vGjQtFXS5lRhDlMlMWCpbm4kQSu2FnZ6ozmxCtCGbJE=
github.com/gregwebs/go-recovery v0.3.2/go.mod h1:q1/QQy0LZN6bIBT90Bc07Xo2ndJEoTc0k8e0l6BN/xY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/shoenig/test v1.11.0 h1:NoPa5GIoBwuqzIviCrnUJa+t5Xb4xi5Z+zODJnIDsEQ=
github.com/shoenig/test v1.11.0/go.mod h1:UxJ6u/x2v/TNs/LoLxBNJRV9DiwBBKYxXSyczsBHFoI=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
109 changes: 10 additions & 99 deletions parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,11 @@ package parallel

import (
"reflect"
"sync"

"github.com/gregwebs/go-concurrent"
"github.com/gregwebs/go-recovery"
)

// Concurrent spawns n go routines each of which runs the given function.
// a panic in the given function is recovered and converted to an error
// errors are returned as []error, a slice of errors
// If there are no errors, the slice will be nil
// To combine the errors as a single error, use errors.Join
func Concurrent(n int, fn func(int) error) []error {
errs := make([]error, n)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
i := i
wg.Add(1)
go recovery.GoHandler(func(err error) { errs[i] = err }, func() error {
defer wg.Done()
errs[i] = fn(i)
return nil
})
}
wg.Wait()
return removeNilErrors(errs...)
}

func sendErrorRecover(c chan<- error, err error) error {
if err == nil {
return nil
Expand All @@ -43,27 +22,6 @@ func sendWithRecover[T any](c chan<- T, obj T) error {
})
}

// try to send to a channel, return true if sent, false if not
func TrySend[T any](c chan<- T, obj T) bool {
select {
case c <- obj:
return true
default:
return false
}
}

// try to receive from a channel, return false if nothing received
func TryRecv[T any](c <-chan T) (receivedObject T, received bool) {
select {
case receivedObject = <-c:
received = true
default:
received = false
}
return
}

// Wait for all errors from a channel of errors.
// errors are returned as []error, a slice of errors
// If there are no errors, the slice will be nil
Expand Down Expand Up @@ -105,7 +63,7 @@ func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error {

distributeWork := func(work T) {
for _, c := range chans {
if worked := TrySend(c, work); worked {
if worked := concurrent.TrySend(c, work); worked {
return
}
}
Expand Down Expand Up @@ -154,23 +112,23 @@ func QueueWorkers[T any](n int, queue <-chan T, fn func(T) error) <-chan error {
return sendErrorRecover(errChannel, recovery.Call(func() error {
unsentErrorsAll := make([][]error, n)
// run a concurrent worker for each channel
recoveredErrors := Concurrent(n, func(i int) error {
recoveredErrors := concurrent.GoN(n, func(i int) error {
// Attempt to send an error or recovered panic to the error channel.
// But if the error channel is blocked, don't wait:
// go ahead and process the work
// try sending the error again when done processing that work
var unsentErrors []error
for work := range chans[i] {
if err := recovery.Call(func() error { return fn(work) }); err != nil {
if sent := TrySend(errChannel, err); !sent {
if sent := concurrent.TrySend(errChannel, err); !sent {
unsentErrors = append(unsentErrors, err)
}
}
}
// Try sending unsent errors again
// Don't block though so the Go routine can be shutdown
for unsentI, err := range unsentErrors {
if sent := TrySend(errChannel, err); sent {
if sent := concurrent.TrySend(errChannel, err); sent {
unsentErrors[unsentI] = nil
}
}
Expand Down Expand Up @@ -258,7 +216,7 @@ func ArrayWorkers1[T any](nParallel int, objects []T, cancel <-chan struct{}, fn
defer close(queue)

for i, object := range objects {
if _, canceled := TryRecv(cancel); canceled {
if _, canceled := concurrent.TryRecv(cancel); canceled {
break
}
queue <- withIndex[T]{Index: i, val: object}
Expand All @@ -273,7 +231,7 @@ func ArrayWorkers1[T any](nParallel int, objects []T, cancel <-chan struct{}, fn

// queue and errChannel are not closed even though errQueueWorkers is?
errQueueWorkers := QueueWorkers(nParallel, queue, withIndexFn)
return ChannelMerge(errQueueWorkers, errChannel)
return concurrent.ChannelMerge(errQueueWorkers, errChannel)
}

// If the length is too small, decrease the batch size
Expand Down Expand Up @@ -301,7 +259,7 @@ func BatchWorkers[T any](bw BatchWork, objects []T, worker func([]T) error) []er
bw.Cancel = make(chan struct{})
}
queue, errBatched := BatchedChannel(bw, objects)
errors := ChannelMerge(QueueWorkers(bw.Parallelism, queue, worker), errBatched)
errors := concurrent.ChannelMerge(QueueWorkers(bw.Parallelism, queue, worker), errBatched)
return CancelAfterFirstError(bw.Cancel, errors)
}

Expand Down Expand Up @@ -330,7 +288,7 @@ func BatchedChannel[T any](bw BatchWork, objects []T) (<-chan []T, <-chan error)
tryCancel := func() bool { return false }
if bw.Cancel != nil {
tryCancel = func() bool {
_, canceled := TryRecv(bw.Cancel)
_, canceled := concurrent.TryRecv(bw.Cancel)
return canceled
}
}
Expand Down Expand Up @@ -360,7 +318,7 @@ func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error))
total := len(objects)
out := make([]U, total)
batchSize := total / nParallel
err := Concurrent(nParallel, func(n int) (err error) {
err := concurrent.GoN(nParallel, func(n int) (err error) {
max := (n + 1) * batchSize
// Because we do integer division there is an extra remainder
// Add it onto the end
Expand All @@ -386,50 +344,3 @@ func MapBatches[T any, U any](nParallel int, objects []T, fn func(T) (U, error))
})
return out, err
}

// This is the same as errors.Join but does not wrap the array
func removeNilErrors(errs ...error) []error {
n := 0
for _, err := range errs {
if err != nil {
n++
}
}
if n == 0 {
return nil
}
newErrs := make([]error, 0, n)
for _, err := range errs {
if err != nil {
newErrs = append(newErrs, err)
}
}
return newErrs
}

// From this article: https://go.dev/blog/pipelines
func ChannelMerge[T any](cs ...<-chan T) <-chan T {
var wg sync.WaitGroup
out := make(chan T)

// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan T) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}

// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
Loading

0 comments on commit eff0dff

Please sign in to comment.