Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve GoDoc. Export Group #6

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

Go library to run code concurrently

For full documentation see the [GoDoc](https://pkg.go.dev/github.com/gregwebs/go-concurrent).

## Running multiple Go routines concurrency

* GoN - run N go routines concurrently
* GoEach - run a go routine for each array element
* NewGroupContext - Similar to x/sync/errgroup but catches panics and returns all errors
* Group - Similar to x/sync/errgroup but catches panics and returns all errors

It is possible to instrument how the go routines are launched or launch them in serial for debugging.
See:
Expand All @@ -15,11 +17,11 @@ See:
* GoRoutine - create your own go routine launcher
* GoRoutine.GoN(...)
* GoEachRoutine(...)(GoRoutine)
* group.SetGoRoutine(GoRoutine)
* Group.SetGoRoutine(GoRoutine)

## Concurrency helpers
## General concurrency helpers exposed

* UnboundedChan
* ChannelMerge
* TrySend
* TryRecv
* TryRecv
49 changes: 31 additions & 18 deletions concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,47 @@ import (
"github.com/gregwebs/go-recovery"
)

// Concurrent spawns n go routines each of which runs the given function.
// A panic in the given function is recovered and converted to an error.
// errors are returned as []error, a slice of errors
// If there are no errors, the slice will be nil.
// To combine the errors as a single error, use errors.Join.
// GoN runs a function in parallel multiple times using n goroutines.
//
// It recovers any panics that occur during the execution of the function
// and returns them as a slice of errors. If no errors occurred, nil will be returned.
//
// Use [errors.Join] to combine the individual errors into a single error.
func GoN(n int, fn func(int) error) []error {
return GoConcurrent().GoN(n, fn)
}

// GoEach runs a go routine for each item in an Array.
// It is a convenient generic wrapper around GoN.
// A panic in the given function is recovered and converted to an error.
// errors are returned as []error, a slice of errors.
// If there are no errors, the slice will be nil.
// To combine the errors as a single error, use errors.Join.
// It is a convenient generic wrapper around [GoN].
//
// It recovers any panics that occur during the execution of the function
// and returns them as a slice of errors. If no errors occurred, nil will be returned.
//
// Use [errors.Join] to combine the individual errors into a single error.
func GoEach[T any](all []T, fn func(T) error) []error {
return GoN(len(all), func(n int) error {
item := all[n]
return fn(item)
})
}

// [GoConcurrent] is the default implementation for launching a routine.
// It just uses the `go` keyword.
func GoConcurrent() GoRoutine {
return GoRoutine(func(work func()) { go work() })
}

// [GoSerial] allows for running in serial for debugging
func GoSerial() GoRoutine {
return GoRoutine(func(work func()) { work() })
}

// GoRoutine allows for inserting hooks before launching Go routines
// GoSerial() allows for running in serial for debugging
// [GoConcurrent] is the default implementation.
// [GoSerial] allows for running in serial for debugging
type GoRoutine func(func())

// The same as [GoN] but with go routine launching configured by a GoRoutine.
func (gr GoRoutine) GoN(n int, fn func(int) error) []error {
errs := make([]error, n)
var wg sync.WaitGroup
Expand All @@ -59,9 +66,10 @@ func (gr GoRoutine) GoN(n int, fn func(int) error) []error {
return errors.Joins(errs...)
}

// GoEach but with a configurable GoRoutine.
// GoEach uses generics, so it cannot be called directly as a method.
// Instead, apply the GoEach arguments first, than apply the GoRoutine to the resulting function.
// The same as [GoEach] but with go routine launching configured by a GoRoutine.
//
// [GoEach] uses generics, so it cannot be called directly as a method.
// Instead, apply the [GoEach] arguments first, than apply the [GoRoutine] to the resulting function.
func GoEachRoutine[T any](all []T, work func(T) error) func(gr GoRoutine) []error {
return func(gr GoRoutine) []error {
return gr.GoN(len(all), func(n int) error {
Expand All @@ -71,8 +79,10 @@ func GoEachRoutine[T any](all []T, work func(T) error) func(gr GoRoutine) []erro
}
}

// Merge multiple channels together.
// From this article: https://go.dev/blog/pipelines
// ChannelMerge merges multiple channels together.
// See the article [Go Concurrency Patterns].
//
// [Go Concurrency Patterns]: https://go.dev/blog/pipelines
func ChannelMerge[T any](cs ...<-chan T) <-chan T {
var wg sync.WaitGroup
out := make(chan T)
Expand All @@ -99,7 +109,8 @@ func ChannelMerge[T any](cs ...<-chan T) <-chan T {
return out
}

// try to receive from a channel, return false if nothing received
// TryRecv preforms a non-blocking receive from a channel.
// It returns false if nothing received.
func TryRecv[T any](c <-chan T) (receivedObject T, received bool) {
select {
case receivedObject = <-c:
Expand All @@ -110,7 +121,8 @@ func TryRecv[T any](c <-chan T) (receivedObject T, received bool) {
return
}

// try to send to a channel, return true if sent, false if not
// TrySend performs a non-blocking send to a channel.
// It returns true if sent, false if not
func TrySend[T any](c chan<- T, obj T) bool {
select {
case c <- obj:
Expand Down Expand Up @@ -138,6 +150,7 @@ func (uc UnboundedChan[T]) Drain() []T {
return uc.sliceT
}

// NewUnboundedChan create an UnboundedChan that transfers its contents into an unbounded slice
func NewUnboundedChan[T any]() UnboundedChan[T] {
chanSize := 10
uc := UnboundedChan[T]{
Expand Down
34 changes: 20 additions & 14 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ import (

type token struct{}

// Similar to x/sync/errgroup. Differences:
// * Wait() will return a slice of all errors encountered.
// * panics in the functions that are ran are recovered and converted to errors.
// Must be constructed with NewGroupContext
type group struct {
// Group is similar to [x/sync/errgroup].
// Improvements:
// - Wait() will return a slice of all errors encountered.
// - panics in the functions that are ran are recovered and converted to errors.
// - Go routine launching can be configured with [*Group.SetGoRoutine]
//
// Must be constructed with [NewGroupContext]
type Group struct {
errChan UnboundedChan[error]
wg sync.WaitGroup
cancel func(error)
sem chan token
goRoutine GoRoutine
}

func (g *group) do(fn func() error) {
func (g *Group) do(fn func() error) {
g.wg.Add(1)
go recovery.GoHandler(func(err error) { g.errChan.Send(err) }, func() error {
defer g.done()
Expand All @@ -71,7 +74,7 @@ func (g *group) do(fn func() error) {
})
}

func (g *group) done() {
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
Expand All @@ -81,7 +84,7 @@ func (g *group) done() {
// Wait waits for any outstanding go routines and returns their errors
// If go routines are started during this Wait,
// their errors might not show up until the next Wait
func (g *group) Wait() []error {
func (g *Group) Wait() []error {
g.wg.Wait()
prevErrChan := g.errChan
g.errChan = NewUnboundedChan[error]()
Expand All @@ -92,27 +95,30 @@ func (g *group) Wait() []error {
return errors.Joins(errs...)
}

func NewGroupContext(ctx context.Context) (*group, context.Context) {
// NewGroupContext constructs a [Group] similar to [x/sync/errgroup] but with aenhancements.
// See [Group].
func NewGroupContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &group{
return &Group{
cancel: cancel,
errChan: NewUnboundedChan[error](),
goRoutine: GoConcurrent(),
}, ctx
}

func (g *group) SetGoRoutine(gr GoRoutine) {
// SetGoRoutine allows configuring how go routines are launched
func (g *Group) SetGoRoutine(gr GoRoutine) {
g.goRoutine = gr
}

func (g *group) Go(fn func() error) {
func (g *Group) Go(fn func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.do(fn)
}

func (g *group) TryGo(fn func() error) bool {
func (g *Group) TryGo(fn func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
Expand All @@ -125,7 +131,7 @@ func (g *group) TryGo(fn func() error) bool {
return true
}

func (g *group) SetLimit(n int) {
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
Expand Down
Loading