[go-nuts] Idiomatic distribute-process-collect pattern

253 views
Skip to first unread message

Tobias Klausmann

unread,
May 20, 2023, 4:23:24 AM5/20/23
to golan...@googlegroups.com
Hi!

I find myself often writing tools that follow the pattern in the
subject, that is:

- Gather some input to process, e.g. a list of filenames from the
command line. If this can be streamed (processing starting before
knowing all inputs) even better, but not strictly necessary.
- Start N workers that process several of the inputs. They receive them
through an appropriate channel, e.g. `chan string` for filenames. The
results of the processing are put into another channel, typically some
custom struct. When the input channel is closed the worker exits.
- After starting all the workers, the main goroutine collects the
results from the results channel (and accumulates them in a slice if
they need to be sorted, or just prints them as needed etc).

As code (I've omitted a few things for brevity):

func process(filenames []string) {
results := make([]imageInfo, 0, len(filenames))
wc := make(chan string) // work
rc := make(chan imageInfo) // results
numprocs := maxParallelism()
for i := 0; i <= numprocs; i++ {
go worker(wc, rc)
}
go func(fns []string) {
for _, fn := range fns {
wc <- fn
}
close(wc)
}(filenames)
for i := 0; i < len(filenames); i++ {
results = append(results, <-rc)
}
sort.Slice(results, func(x, y int) bool { return results[x].name < results[y].name })
for _, r := range results {
fmt.Printf("%s %d %d %.2f %t %t\n", r.name, r.width, r.height)
}
}

func worker(wc chan string, rc chan imageInfo) {
for fn := range wc {
// mustLoadImage just wraps functions from image/png etc
rc <- mustLoadImage(fn)
}
}

What I wonder about is the collection stage: the assumption that there
will be exactly as many results as inputs seems brittle, and the only
real way to handle errors is to put them into the result struct and
examining them later.

I also usually prefer handling workers with a `waitgroup`, but that only
really works if there is no return channel to handle, either because
the workers do the output/final processing, or because results are
written to a global. The former is not always possible, and the latter
is ugly.

There is also the option of just making the result channel big enough
to hold all results, but that seems a bit ugly as well.

So what's the most idiomatic way of doing this?

Best,
Tobias

Andrew Harris

unread,
May 20, 2023, 6:15:20 AM5/20/23
to golang-nuts
For aggregating results, an alternative to blocking on outstanding payloads is blocking on outstanding workers. You might find this talk interesting, and it has a pretty clever solution for "outstanding workers": https://about.sourcegraph.com/blog/go/gophercon-2018-rethinking-classical-concurrency-patterns. Also, the part about eliminating idle workers I think is a motif that shows up in nice solutions to other problem or elaborations on this problem.

There's an x/ package along these lines: https://pkg.go.dev/golang.org/x/sync/semaphore
(also worth mentioning: https://pkg.go.dev/golang.org/x/sync/errgroup)

Ian Lance Taylor

unread,
May 20, 2023, 4:25:54 PM5/20/23
to golan...@googlegroups.com
On Sat, May 20, 2023 at 1:23 AM Tobias Klausmann
<klau...@schwarzvogel.de> wrote:
>
> What I wonder about is the collection stage: the assumption that there
> will be exactly as many results as inputs seems brittle, and the only
> real way to handle errors is to put them into the result struct and
> examining them later.
>
> I also usually prefer handling workers with a `waitgroup`, but that only
> really works if there is no return channel to handle, either because
> the workers do the output/final processing, or because results are
> written to a global. The former is not always possible, and the latter
> is ugly.
>
> There is also the option of just making the result channel big enough
> to hold all results, but that seems a bit ugly as well.
>
> So what's the most idiomatic way of doing this?

The pattern I tend to prefer is

rc := make(chan result, numprocs)
var wg sync.WaitGroup
for i := 0; i < numprocs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sendResults(rc)
}()
}
go func() {
wg.Wait()
close(rc)
}()

This ensures that the channel is closed when all the results are sent.
Then you can collect the results using one or more goroutines with
"for result := range rc".

Note that for this kind of pattern it's usually a good idea to use a
buffered channel to hold the results, to avoid tight synchronization
between the senders and the processors. Of course sometimes you want
that tight synchronization, but usually you don't. The exact size of
the buffer doesn't matter too much.

You may also want to watch Bryan's talk at
https://www.youtube.com/watch?v=5zXAHh5tJqQ .

Ian

Tobias Klausmann

unread,
May 21, 2023, 10:38:31 AM5/21/23
to Ian Lance Taylor, golan...@googlegroups.com
Hi!

First off, thanks both Ian and Andrew!

On Sat, 20 May 2023, Ian Lance Taylor wrote:
> On Sat, May 20, 2023 at 1:23 AM Tobias Klausmann
> <klau...@schwarzvogel.de> wrote:
> > What I wonder about is the collection stage: the assumption that there
> > will be exactly as many results as inputs seems brittle, and the only
> > real way to handle errors is to put them into the result struct and
> > examining them later.
> >
> > I also usually prefer handling workers with a `waitgroup`, but that only
> > really works if there is no return channel to handle, either because
> > the workers do the output/final processing, or because results are
> > written to a global. The former is not always possible, and the latter
> > is ugly.
> >
> > There is also the option of just making the result channel big enough
> > to hold all results, but that seems a bit ugly as well.
> >
> > So what's the most idiomatic way of doing this?
>
> The pattern I tend to prefer is
>
> rc := make(chan result, numprocs)
> var wg sync.WaitGroup
> for i := 0; i < numprocs; i++ {
> wg.Add(1)
> go func() {
> defer wg.Done()
> sendResults(rc)
> }()
> }
> go func() {
> wg.Wait()
> close(rc)
> }()
> This ensures that the channel is closed when all the results are sent.
> Then you can collect the results using one or more goroutines with
> "for result := range rc".
>
> Note that for this kind of pattern it's usually a good idea to use a
> buffered channel to hold the results, to avoid tight synchronization
> between the senders and the processors. Of course sometimes you want
> that tight synchronization, but usually you don't. The exact size of
> the buffer doesn't matter too much.

I'm now using this approach, and my mind is much more at ease about the
robustness. As for the channel size, I've tried numproc, *2, *10 and
*100, and it all makes no practical difference time-wise, so I'll just stay
with numproc

> You may also want to watch Bryan's talk at
> https://www.youtube.com/watch?v=5zXAHh5tJqQ .

Started watching and tried the recipe outlined/on screen at 10:26, but
couldn't get it to work: the result-gathering loop reading from c would
do zero iterations. Not sure what was going on ther. But the above
solution is one I grok, so I'm staying with that :)


Best,
Tobias

Andrew Harris

unread,
May 21, 2023, 4:54:39 PM5/21/23
to golang-nuts
The recipe on 10:26 should work but it's definitely contrived - using Fetch looks strange to me, and the cancellation behavior of errgroup is another moving part that adds some overhead. If it's sufficient, Ian's solution groks better :)

Brian Candler

unread,
May 22, 2023, 3:01:00 AM5/22/23
to golang-nuts
Isn't Ian's solution basically the same, but using a Waitgroup instead of an Errgroup?

I think the Fetch() is just a placeholder for "this is where the implementation detail of the task would go"

Reply all
Reply to author
Forward
0 new messages