Simple worker pool in golang


Amarjeet Anand

unread,
Dec 26, 2019, 11:12:29 PM12/26/19
to golan...@googlegroups.com

Hi

I have to produce some tasks to Kafka in parallel, so I want to implement a simple worker pool pattern in Go.

Is the below code decent enough to take to production?


var workerCount = runtime.NumCPU()*7 + 1

func WorkerPattern() {
    taskWg := &sync.WaitGroup{}
    taskWg.Add(2)

    autoCancelChan := make(chan string, workerCount*3) // *3, just to make enough room. workers will be slower anyways
    go produceTaskOfType1ToChan(taskWg, autoCancelChan)
    go produceTaskOfType2ToChan(taskWg, autoCancelChan)

    // start workers to push autoCancel to kafka
    workerWg := &sync.WaitGroup{}
    go kafkaProducerWorkers(autoCancelChan, workerWg)

    // wait to close autoCancelChan until all tasks have been written
    taskWg.Wait()
    close(autoCancelChan)

    // wait till all workers finish their task
    workerWg.Wait()

    fmt.Println("Done!!!")
}

func produceTaskOfType1ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
    defer wg.Done()
    // can produce random number of task on autoCancelChan
    autoCancelChan <- "task of type of 1"
}

func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
    defer wg.Done()
    // can produce random number of task on autoCancelChan
    autoCancelChan <- "task of type of 2"
}

func kafkaProducerWorkers(autoCancelChan chan string, workerWg *sync.WaitGroup) {
    workerWg.Add(workerCount)
    for i := 0; i < workerCount; i++ {
        go produceToKafka(autoCancelChan, workerWg)
    }
}

func produceToKafka(autoCancelChan chan string, workerWg *sync.WaitGroup) {
    defer workerWg.Done()

    // for loop will terminate once autoCancelChan is closed
    for autoCancel := range autoCancelChan {
        KafkaClient.PublishToKafkaTopic(autoCancel)
    }
}

Any improvement you can suggest to this code?



robert engels

unread,
Dec 26, 2019, 11:41:41 PM12/26/19
to Amarjeet Anand, golan...@googlegroups.com
Yes, the code doesn’t work :) - it will only ever produce 2 items - unless that was expected - even so, you want the N workers doing work, and probably a constant number sending to Kafka - but a lot depends on your “serial needs”. In your case you only have 2 workers producing work, and N senders - which is backwards to me.

I would also say that your variable names could be improved - as “autoCancelChan” isn’t really meaningful here, it is simply a chan used to send items to the Kafka senders (at least I think).

--
You received this message because you are subscribed to the Google Groups "golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/CANFuhy8qBjooo_tB_gT0f%3DTE4DaOFqWL5SWwNghy%2BL-eV82KdA%40mail.gmail.com.

Ian Lance Taylor

unread,
Dec 27, 2019, 12:27:47 AM12/27/19
to Amarjeet Anand, golang-nuts
On Thu, Dec 26, 2019 at 8:12 PM Amarjeet Anand
<amarjeeta...@gmail.com> wrote:
>
> I have to produce some tasks to Kafka in parallel. So I want to implement a simple worker pool pattern in Go.

Bryan has a good talk on this general topic:

https://www.youtube.com/watch?v=5zXAHh5tJqQ&list=PL2ntRZ1ySWBdatAqf-2_125H4sGzaWngM&index=23&t=0s

https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view

Ian

Amarjeet Anand

unread,
Dec 27, 2019, 12:52:21 AM12/27/19
to robert engels, golan...@googlegroups.com
Hi Robert

Actually the code above is simplified to make it easy to understand.

Thanks for the suggestion on variable namings... Will improve that.

The scenario is that the producer functions (produceTaskOfType1ToChan() and produceTaskOfType2ToChan()) will each produce a list of strings to the channel... like...

func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
    defer wg.Done()

    autoCancelIds := getAutoCancelIdsFromSource2()
    for _, autoCancelId := range autoCancelIds { // range over the values, not the indexes
        autoCancelChan <- autoCancelId
    }
}

Does this code make more sense now?

Amarjeet Anand

unread,
Dec 27, 2019, 12:53:03 AM12/27/19
to Ian Lance Taylor, golang-nuts
Thanks Ian for the resources.

Appreciate it a lot.

robert engels

unread,
Dec 27, 2019, 1:16:00 AM12/27/19
to Amarjeet Anand, golan...@googlegroups.com
I think it is more complex - or simpler :) - than that. A lot depends on the Kafka client - for example the sarama client recommends one client per producer/consumer, other clients may multiplex on the same client so having more than one consumer (sender) may not be beneficial if the IO is fully async acknowledged.

In general, you want to parallelize (add goroutines) for the portions that can be parallelized (either because they benefit from additional cpu, scatter/gather IO (network requests or disk), or have independent/multiple destination output stages) - but you have to pay special attention to any “ordering” of events that may be required on the consumer side, and the acknowledgements required on the Kafka side.

In your example, you are still creating 7xNCPU senders, and only 2 producers - which would mean that each send is completely independent and is a minimum 25x slower than the producing (given 8 cores x 7 / 2 producers). This could be the case, but seems unlikely.

Bruno Albuquerque

unread,
Dec 27, 2019, 11:30:48 AM12/27/19
to Amarjeet Anand, golang-nuts
This might be useful to you, in any case:

https://git.bug-br.org.br/bga/workerpool


Brian Candler

unread,
Dec 28, 2019, 5:09:18 AM12/28/19
to golang-nuts
On Friday, 27 December 2019 16:30:48 UTC, Bruno Albuquerque wrote:
This might be useful to you, in any case:



I think the point from Bryan Mills' video is, "worker pool" is something of an anti-pattern in Go. Goroutines are so cheap that you might as well start a goroutine for each piece of work you have to do, and let it terminate when that piece of work is done.

Apart from the startup cost, the other reason for having a "worker pool" is to limit the number of concurrent tasks being executed, and there are better ways of doing that in Go (also shown in the video).

Chris Burkert

unread,
Dec 28, 2019, 7:32:08 AM12/28/19
to Brian Candler, golang-nuts
There are pros and cons for everything in life. Some time ago I wrote a database tool which does something per table, where the runtime largely depends on the table size. I started with one goroutine per table because it was easy. But that put a high load on the database (the task was to limit the whole thing to about 10% of cores), with out-of-memory situations. So I switched to a worker pool, which solved that. However, now the overall runtime was unpredictable: when the large tables were handled at the end, their runtime defined the overall runtime. So I had to feed the pool with large tables first. This again led to out-of-memory situations, so I reordered the tables such that large tables are mixed with smaller tables.


Brian Candler

unread,
Dec 28, 2019, 7:55:20 AM12/28/19
to golang-nuts
Certainly it's important to limit the concurrency, and in your case you need to process the larger tasks first.

However, to me, the defining characteristics of a "worker pool" are:
1. workers are created before any tasks need to be done
2. the same worker handles multiple tasks sequentially
3. workers remain idle after tasks have been done

What you actually needed was just to limit the number of concurrent tasks, and you don't need a worker pool to do that.  You just need to ensure that when you launch a task, you block until the number of running concurrent tasks is below your limit.  For example, you can do that using a buffered channel of appropriate size.

The video is definitely worth watching.

Agniva De Sarker

unread,
Dec 28, 2019, 8:01:52 AM12/28/19
to golang-nuts
> (the task was to limit the whole thing to about 10% of cores)

I still don't think you needed a worker pool here. Like mentioned above, you could just limit the number of goroutines executing to 10% of total cores.


Robert Engels

unread,
Dec 28, 2019, 9:11:43 AM12/28/19
to Agniva De Sarker, golang-nuts
Spinning up a goroutine for each piece of work may be idiomatic, but it is far from the best practice for many situations - mainly because of cpu cache invalidation. Most HPC systems create worker pools by type and then isolate them to cpus/cores - something you can’t do in Go.

I believe there are outstanding proposals for grouping routines and core affinity. 


Ian Lance Taylor

unread,
Dec 28, 2019, 11:17:27 AM12/28/19
to Robert Engels, Agniva De Sarker, golang-nuts
On Sat, Dec 28, 2019 at 6:11 AM Robert Engels <ren...@ix.netcom.com> wrote:
>
> Spinning up a goroutine for each piece of work may be idiomatic but it is far from the best practice for many situations - mainly because of cpu cache invalidation. Most HPC systems create worker pools by type and then isolate them to cpus/cores - something you can’t do in Go.
>
> I believe there are outstanding proposals for grouping routines and core affinity.

I think that today CPU cache invalidation is more or less independent
of spinning up separate goroutines. CPU cache invalidation is
something to consider for threads, but, as you say, goroutines have no
affinity to threads. Whether you use a worker pool or not, Go doesn't
currently give you any way to control how goroutines are assigned to
threads. So in general I don't see any reason to think that a worker
pool would be better or worse with regard to CPU cache invalidation.

If Go introduces some way to associate goroutines with threads and
some way to associate threads with CPUs, that might well have an
effect on whether it is better to use a worker pool.

Ian



Michael Jones

unread,
Dec 28, 2019, 1:13:46 PM12/28/19
to Ian Lance Taylor, Robert Engels, Agniva De Sarker, golang-nuts
I often choose worker pools when there is working storage to be allocated as part of the work to be done. This way means that such data and processing structures are naturally built and torn down right there before and after the work-channel loop in the worker, with no repeated allocations during runtime, and no need for locked central data or data pools. It feels clean to me and stays clean even with work-stealing or worker-subdivision schemes.

Other ways make sense too, so not arguing for only one approach. Just saying that it works for me very well.

Michael



--
Michael T. Jones
michae...@gmail.com

Chris Burkert

unread,
Dec 28, 2019, 2:01:51 PM12/28/19
to Agniva De Sarker, golang-nuts
I needed a way to let the database complete a few tables (max 25) at a time without the need for the database to handle other tables in between. A worker pool which reads one table after the other does that perfectly. That also means that if all workers wait for the database then my whole program waits - which is intended. I didn't test it, but I guess that one goroutine per table would burden the database with handling all tables at once. I assume that a goroutine which triggered the database and waits for it to complete is put aside by the runtime, letting other goroutines also trigger the database. And I think then I will have the same out of memory (on the database side, not on the Go side). However, I will try to test the "one goroutine per table" approach ... maybe it improves my code and I learn something new.

Anyway, all I wanted to say was that there are situations where one approach fits more while it may be opposite in another situation. And I don't like the term "anti-pattern" as it lets you put ideas aside which may be worth thinking about.


Jake Montgomery

unread,
Dec 28, 2019, 4:46:53 PM12/28/19
to golang-nuts
The high level design discussions are great. But I want to point out that the code, as written, has a race related bug.

When this line is called, the workerWg has a 0 count.
go kafkaProducerWorkers(autoCancelChan, workerWg)
It is entirely possible for the workerWg.Wait() line to be called before the kafkaProducerWorkers() goroutine has started, in which case the WorkerPattern function will return without any work having been done. See https://play.golang.org/p/p2_A3lBW3Dc for a simulation of this case. The Sleep() on line 52 simulates the slow-starting goroutine.

Agniva De Sarker

unread,
Dec 28, 2019, 11:26:05 PM12/28/19
to Chris Burkert, golang-nuts
Perhaps I wasn't clear, but I am not suggesting to use one goroutine per table. You can limit the no. of goroutines running (say 25 at a time) by just spawning them and wait for the entire task to complete, without creating a worker pool of 25 goroutines. It is basically a counting semaphore with a buffered channel.

Robert Engels

unread,
Dec 29, 2019, 5:18:51 PM12/29/19
to Ian Lance Taylor, Agniva De Sarker, golang-nuts
I agree. I meant that worker pools are especially useful when you can do cpu affinity - doesn’t apply to Go.

I think Go probably needs some idea of “capping” for cpu based workloads. You can cap it at the local N CPUs, but in a larger app that has multiple parallel processing points you can easily create too many routines, and then you are paying overhead switching costs for no reason.


Brian Candler

unread,
Dec 30, 2019, 4:46:29 AM12/30/19
to golang-nuts
On Sunday, 29 December 2019 22:18:51 UTC, Robert Engels wrote:
I agree. I meant that worker pools are especially useful when you can do cpu affinity - doesn’t apply to Go.

I think Go probably needs some idea of “capping” for cpu based workloads. You can cap it at the local N CPUs

By default, Go only runs N threads, where N is the number of CPUs.
 
, but in a larger app that has multiple parallel processing points you can easily create too many routines, and then you are paying overhead switching costs for no reason.



Which switching cost are you referring to?  The switching cost between goroutines? This is minimal, as it takes place within a single thread.  Or are you referring to cache invalidation issues?  Or something else?

Jesper Louis Andersen

unread,
Dec 30, 2019, 10:12:06 AM12/30/19
to Brian Candler, golang-nuts
On Mon, Dec 30, 2019 at 10:46 AM Brian Candler <b.ca...@pobox.com> wrote:
Which switching cost are you referring to?  The switching cost between goroutines? This is minimal, as it takes places within a single thread.  Or are you referring to cache invalidation issues?  Or something else?


It is the usual dichotomy between concurrency and parallelism. If your problem is concurrent, then it is usually the case that the switching cost is minimal and it is often far easier just to run thousands of goroutines on a much smaller set of cores. However, if your problem is parallel, you often have one task which is your goal, and you want it to finish as soon as possible[0]. Here, explicit control over the cores tend to be more efficient due to caches, memory bandwidth, latency hiding, etc. Processor affinity and pinning threads to processors is often important in this game. But unless you can gain a significant speedup by doing this explicitly, I wouldn't bother.

[0] There is another, sometimes better, way to look at the problem. Rather than being able to solve a problem N times faster, you can also see at as being able to solve a problem N times larger in the same time. This has the advantage that communication is less of a problem. When the problem size gets small enough, the overhead of multiple processing cores gets too large.


--
J.

Robert Engels

unread,
Dec 30, 2019, 10:44:11 AM12/30/19
to Jesper Louis Andersen, Brian Candler, golang-nuts
Right, but the overhead is not constant nor free. So if you parallelize a CPU-bound task into 100 segments and you only have 10 cores, the contention on the internal locking structures (scheduler, locks in channels) will be significant and the entire process will probably take far longer - working on a simple test case to demonstrate - so blindly spinning up goroutines will not be the best solution for some workloads.


Robert Engels

unread,
Dec 30, 2019, 4:15:01 PM12/30/19
to Robert Engels, Jesper Louis Andersen, Brian Candler, golang-nuts
Here is a simple test that demonstrates the dynamics https://play.golang.org/p/6SZcxCEAfFp (cannot run in playground)

Notice that the call that uses an over-allocated number of routines takes 5x longer wall time than the properly sized one - this is due to scheduling and contention on the underlying structures. So blindly creating goroutines does not achieve optimum performance for many workloads (even when the number of OS threads is capped).

rengels@rengels:~/gotest$ go run main_bench.go 
2.261805812s
1.311269725s
6.341378965s


Robert Engels

unread,
Dec 30, 2019, 4:21:48 PM12/30/19
to Robert Engels, Jesper Louis Andersen, Brian Candler, golang-nuts
Also, if running on a machine with a low cpu count (gomaxprocs) you probably need to increase the 'total' multiplier (mine was 12).

Jesper Louis Andersen

unread,
Dec 30, 2019, 4:42:48 PM12/30/19
to Robert Engels, Brian Candler, golang-nuts
On Mon, Dec 30, 2019 at 10:14 PM Robert Engels <ren...@ix.netcom.com> wrote:
Here is a simple test that demonstrates the dynamics https://play.golang.org/p/6SZcxCEAfFp (cannot run in playground)

Notice that the call that uses an over allocated number of routines takes 5x longer wall time than the properly sized one - this is due to scheduling and contention on the underlying structures. So blindly creating go routines does not achieve optimum performance for many workloads (even when the number of OS threads is capped).

rengels@rengels:~/gotest$ go run main_bench.go 
2.261805812s
1.311269725s
6.341378965s


Yes, this is unsurprising since the problem in the program is parallel.

In a typical concurrent problem, most work is sitting and waiting for some event to happen and those events are usually measured in several milliseconds. For those problems, the switching cost is very low given the other benefits you gain. The key property there is rarely about speed, but about description of the problem to the machine.

Go definitely leans toward concurrent problems. If you want to target parallel problems, you have other options which are better, depending on factors of distribution, GPU availability, etc.


--
J.

Robert Engels

unread,
Dec 30, 2019, 4:47:34 PM12/30/19
to Robert Engels, Jesper Louis Andersen, Brian Candler, golang-nuts
One thing this exercise also reminded me of is that using Go for any sort of "real time signal processing" is going to be very difficult - maybe if you lock the event handling routine to a thread, then use native code to change the thread priority - not sure how that would interact with the Go scheduler/GC. Otherwise the "handler" is the same priority as all other goroutines, which makes it critical to not over-allocate the total number of routines running (vs available cores).

Probably to do this properly you need buffered hardware and kernel support / native code (but signal handling is a catch all, could be reading from a high-priority socket, etc.)


Robert Engels

unread,
Dec 30, 2019, 4:51:41 PM12/30/19
to Jesper Louis Andersen, Brian Candler, golang-nuts
I think that distinction is splitting hairs a bit in the case of Go - usually when you speak of concurrency you are talking about completely disparate processes (in the context of an OS). A typical Go server might be handling many types of client requests, but it can easily be viewed as parallelizing "all the clients' work" - the Go program runs concurrently with other OS programs, but is parallelizing its own work.



