From ba776bfd3b4779b15e7c39ee7798cd9fca58e082 Mon Sep 17 00:00:00 2001 From: Jaroslavs Samcuks Date: Wed, 9 Sep 2020 21:45:47 +0200 Subject: [PATCH] Back to using semaphores removed by #126. This solution is a bit more idiomatic and resonates with "Don't communicate by sharing memory, share memory by communicating." Go proverb[^1]. Similar approach to use buffered channel as a semaphore to limit throughput is discussed in Effective Go[^2]. [^1]: https://go-proverbs.github.io/ [^2]: https://golang.org/doc/effective_go.html#channels --- goprocess/gp.go | 54 +++++++++++++++++-------------------------------- 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/goprocess/gp.go b/goprocess/gp.go index 8a53d945..60c68737 100644 --- a/goprocess/gp.go +++ b/goprocess/gp.go @@ -7,7 +7,6 @@ package goprocess import ( "os" - "sync" goversion "rsc.io/goversion/version" @@ -39,51 +38,36 @@ func FindAll() []P { type isGoFunc func(ps.Process) (path, version string, agent, ok bool, err error) func findAll(pss []ps.Process, isGo isGoFunc, concurrencyLimit int) []P { - input := make(chan ps.Process, len(pss)) - output := make(chan P, len(pss)) - - for _, ps := range pss { - input <- ps - } - close(input) - - var wg sync.WaitGroup - wg.Add(concurrencyLimit) // used to wait for workers to be finished - - // Run concurrencyLimit of workers until there - // is no more processes to be checked in the input channel. - for i := 0; i < concurrencyLimit; i++ { + output := make(chan []P, 1) + output <- nil + // Using buffered channel as a semaphore to limit throughput. + // See https://golang.org/doc/effective_go.html#channels + type token struct{} + sem := make(chan token, concurrencyLimit) + for _, pr := range pss { + sem <- token{} + pr := pr go func() { - defer wg.Done() - - for pr := range input { - path, version, agent, ok, err := isGo(pr) - if err != nil { - // TODO(jbd): Return a list of errors. - continue - } - if !ok { - continue - } - output <- P{ + defer func() { <-sem }() + if path, version, agent, ok, err := isGo(pr); err != nil { + // TODO(jbd): Return a list of errors. + } else if ok { + output <- append(<-output, P{ PID: pr.Pid(), PPID: pr.PPid(), Exec: pr.Executable(), Path: path, BuildVersion: version, Agent: agent, - } + }) } }() } - wg.Wait() // wait until all workers are finished - close(output) // no more results to be waited for - - var results []P - for p := range output { - results = append(results, p) + // Acquire all semaphore slots to wait for work to complete. + for n := cap(sem); n > 0; n-- { + sem <- token{} } - return results + return <-output } // Find finds info about the process identified with the given PID.