Skip to content

Commit

Permalink
Merge pull request #6 from gregwebs/docs-export-Group
Browse files Browse the repository at this point in the history
improve GoDoc. Export Group
  • Loading branch information
gregwebs authored Dec 16, 2024
2 parents 005dd04 + 045e7c7 commit 0bf3281
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 36 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

Go library to run code concurrently

For full documentation see the [GoDoc](https://pkg.go.dev/github.com/gregwebs/go-concurrent).

## Running multiple Go routines concurrency

* GoN - run N go routines concurrently
* GoEach - run a go routine for each array element
* NewGroupContext - Similar to x/sync/errgroup but catches panics and returns all errors
* Group - Similar to x/sync/errgroup but catches panics and returns all errors

It is possible to instrument how the go routines are launched or launch them in serial for debugging.
See:
Expand All @@ -15,11 +17,11 @@ See:
* GoRoutine - create your own go routine launcher
* GoRoutine.GoN(...)
* GoEachRoutine(...)(GoRoutine)
* group.SetGoRoutine(GoRoutine)
* Group.SetGoRoutine(GoRoutine)

## Concurrency helpers
## General concurrency helpers exposed

* UnboundedChan
* ChannelMerge
* TrySend
* TryRecv
* TryRecv
49 changes: 31 additions & 18 deletions concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,47 @@ import (
"github.com/gregwebs/go-recovery"
)

// Concurrent spawns n go routines each of which runs the given function.
// A panic in the given function is recovered and converted to an error.
// errors are returned as []error, a slice of errors
// If there are no errors, the slice will be nil.
// To combine the errors as a single error, use errors.Join.
// GoN runs a function in parallel multiple times using n goroutines.
//
// It recovers any panics that occur during the execution of the function
// and returns them as a slice of errors. If no errors occurred, nil will be returned.
//
// Use [errors.Join] to combine the individual errors into a single error.
func GoN(n int, fn func(int) error) []error {
return GoConcurrent().GoN(n, fn)
}

// GoEach runs a go routine for each item in an Array.
// It is a convenient generic wrapper around GoN.
// A panic in the given function is recovered and converted to an error.
// errors are returned as []error, a slice of errors.
// If there are no errors, the slice will be nil.
// To combine the errors as a single error, use errors.Join.
// It is a convenient generic wrapper around [GoN].
//
// It recovers any panics that occur during the execution of the function
// and returns them as a slice of errors. If no errors occurred, nil will be returned.
//
// Use [errors.Join] to combine the individual errors into a single error.
func GoEach[T any](all []T, fn func(T) error) []error {
return GoN(len(all), func(n int) error {
item := all[n]
return fn(item)
})
}

// [GoConcurrent] is the default implementation for launching a routine.
// It just uses the `go` keyword.
func GoConcurrent() GoRoutine {
return GoRoutine(func(work func()) { go work() })
}

// [GoSerial] allows for running in serial for debugging
func GoSerial() GoRoutine {
return GoRoutine(func(work func()) { work() })
}

// GoRoutine allows for inserting hooks before launching Go routines
// GoSerial() allows for running in serial for debugging
// [GoConcurrent] is the default implementation.
// [GoSerial] allows for running in serial for debugging
type GoRoutine func(func())

// The same as [GoN] but with go routine launching configured by a GoRoutine.
func (gr GoRoutine) GoN(n int, fn func(int) error) []error {
errs := make([]error, n)
var wg sync.WaitGroup
Expand All @@ -59,9 +66,10 @@ func (gr GoRoutine) GoN(n int, fn func(int) error) []error {
return errors.Joins(errs...)
}

// GoEach but with a configurable GoRoutine.
// GoEach uses generics, so it cannot be called directly as a method.
// Instead, apply the GoEach arguments first, than apply the GoRoutine to the resulting function.
// The same as [GoEach] but with go routine launching configured by a GoRoutine.
//
// [GoEach] uses generics, so it cannot be called directly as a method.
// Instead, apply the [GoEach] arguments first, than apply the [GoRoutine] to the resulting function.
func GoEachRoutine[T any](all []T, work func(T) error) func(gr GoRoutine) []error {
return func(gr GoRoutine) []error {
return gr.GoN(len(all), func(n int) error {
Expand All @@ -71,8 +79,10 @@ func GoEachRoutine[T any](all []T, work func(T) error) func(gr GoRoutine) []erro
}
}

// Merge multiple channels together.
// From this article: https://go.dev/blog/pipelines
// ChannelMerge merges multiple channels together.
// See the article [Go Concurrency Patterns].
//
// [Go Concurrency Patterns]: https://go.dev/blog/pipelines
func ChannelMerge[T any](cs ...<-chan T) <-chan T {
var wg sync.WaitGroup
out := make(chan T)
Expand All @@ -99,7 +109,8 @@ func ChannelMerge[T any](cs ...<-chan T) <-chan T {
return out
}

// try to receive from a channel, return false if nothing received
// TryRecv preforms a non-blocking receive from a channel.
// It returns false if nothing received.
func TryRecv[T any](c <-chan T) (receivedObject T, received bool) {
select {
case receivedObject = <-c:
Expand All @@ -110,7 +121,8 @@ func TryRecv[T any](c <-chan T) (receivedObject T, received bool) {
return
}

// try to send to a channel, return true if sent, false if not
// TrySend performs a non-blocking send to a channel.
// It returns true if sent, false if not
func TrySend[T any](c chan<- T, obj T) bool {
select {
case c <- obj:
Expand Down Expand Up @@ -138,6 +150,7 @@ func (uc UnboundedChan[T]) Drain() []T {
return uc.sliceT
}

// NewUnboundedChan create an UnboundedChan that transfers its contents into an unbounded slice
func NewUnboundedChan[T any]() UnboundedChan[T] {
chanSize := 10
uc := UnboundedChan[T]{
Expand Down
34 changes: 20 additions & 14 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,22 @@ import (

type token struct{}

// Similar to x/sync/errgroup. Differences:
// * Wait() will return a slice of all errors encountered.
// * panics in the functions that are ran are recovered and converted to errors.
// Must be constructed with NewGroupContext
type group struct {
// Group is similar to [x/sync/errgroup].
// Improvements:
// - Wait() will return a slice of all errors encountered.
// - panics in the functions that are ran are recovered and converted to errors.
// - Go routine launching can be configured with [*Group.SetGoRoutine]
//
// Must be constructed with [NewGroupContext]
type Group struct {
errChan UnboundedChan[error]
wg sync.WaitGroup
cancel func(error)
sem chan token
goRoutine GoRoutine
}

func (g *group) do(fn func() error) {
func (g *Group) do(fn func() error) {
g.wg.Add(1)
go recovery.GoHandler(func(err error) { g.errChan.Send(err) }, func() error {
defer g.done()
Expand All @@ -71,7 +74,7 @@ func (g *group) do(fn func() error) {
})
}

func (g *group) done() {
func (g *Group) done() {
if g.sem != nil {
<-g.sem
}
Expand All @@ -81,7 +84,7 @@ func (g *group) done() {
// Wait waits for any outstanding go routines and returns their errors
// If go routines are started during this Wait,
// their errors might not show up until the next Wait
func (g *group) Wait() []error {
func (g *Group) Wait() []error {
g.wg.Wait()
prevErrChan := g.errChan
g.errChan = NewUnboundedChan[error]()
Expand All @@ -92,27 +95,30 @@ func (g *group) Wait() []error {
return errors.Joins(errs...)
}

func NewGroupContext(ctx context.Context) (*group, context.Context) {
// NewGroupContext constructs a [Group] similar to [x/sync/errgroup] but with aenhancements.
// See [Group].
func NewGroupContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &group{
return &Group{
cancel: cancel,
errChan: NewUnboundedChan[error](),
goRoutine: GoConcurrent(),
}, ctx
}

func (g *group) SetGoRoutine(gr GoRoutine) {
// SetGoRoutine allows configuring how go routines are launched
func (g *Group) SetGoRoutine(gr GoRoutine) {
g.goRoutine = gr
}

func (g *group) Go(fn func() error) {
func (g *Group) Go(fn func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.do(fn)
}

func (g *group) TryGo(fn func() error) bool {
func (g *Group) TryGo(fn func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}:
Expand All @@ -125,7 +131,7 @@ func (g *group) TryGo(fn func() error) bool {
return true
}

func (g *group) SetLimit(n int) {
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
Expand Down

0 comments on commit 0bf3281

Please sign in to comment.