scaling list of channels

342 views
Skip to first unread message

omarsharif...@gmail.com

unread,
Sep 18, 2014, 9:34:53 AM9/18/14
to golan...@googlegroups.com
I've implemented a scalable list of workers. The idea is that I want to be able to scale up and down (depending on the load across the rest of a system as a whole) the number of goroutine workers. The current version I have uses a slice of channels. Each channel is a kill channel, which is passed to the worker go routine. With that I can gracefully terminate the unneeded workers when scaling down, as well as add additional workers when more are needed.

I do this by creating a slice of kill channels, firing off the worker goroutines, then bulk append the kill channel slice to my master slice. When I want to scale down, I simply cut a slice off the end of the master slice, then iterate over the cut slice to send the kill signal to my workers.


My question is this: is this a good design? Is there anything I've missed? Could this be simpler? Any other observations?

Cheers!
Ben

Kevin Malachowski

unread,
Sep 18, 2014, 10:43:47 AM9/18/14
to golan...@googlegroups.com, omarsharif...@gmail.com
Instead of using `chan struct{}` everywhere, you could `type KillChan chan struct{}` so that you can just say `KillChan` (or whatever you want). This has the positive of still being able to use variables of type KillChan in selects/recvs/sends/etc. You could even give KillChan the methods "Check" and "Kill" to do the select or the sending of the kill signal if you didn't want to leak your implementation into code that uses it. Also, when doing kill signals it is customary to close the chan rather than send the `struct{}{}` if you don't plan on reusing the channel anyway.

I also believe that you may have made your "cut" method a little to generic: you wrote code that supports cutting the slice in any arbitrary location. This is good, but then you never actually use it to do anything but slice off the right side of the slice and keep everything on the left intact. I would personally remove the cut method entirely and just do the task of copying the right side of the slice into a new slice to return and shortening the slice saved in the List. It simplifies your code to what it is (a stack).

Also, how do you plan on passing work for these workers? I suppose you code receive on a `chan func()` that is shared among all workers in the select with the kill signal.

omarsharif...@gmail.com

unread,
Sep 18, 2014, 12:02:35 PM9/18/14
to golan...@googlegroups.com, omarsharif...@gmail.com


On Thursday, September 18, 2014 3:43:47 PM UTC+1, Kevin Malachowski wrote:
Instead of using `chan struct{}` everywhere, you could `type KillChan chan struct{}` so that you can just say `KillChan` (or whatever you want). This has the positive of still being able to use variables of type KillChan in selects/recvs/sends/etc.
Gah! I actually had that but I though I'd simplify things for the message board :)
 
You could even give KillChan the methods "Check" and "Kill" to do the select or the sending of the kill signal if you didn't want to leak your implementation into code that uses it.
Thats a great idea, I'll investigate that this evening!
 
Also, when doing kill signals it is customary to close the chan rather than send the `struct{}{}` if you don't plan on reusing the channel anyway.
Nice one, much cleaner too. I'll implement that too! 

 
I also believe that you may have made your "cut" method a little to generic: you wrote code that supports cutting the slice in any arbitrary location. This is good, but then you never actually use it to do anything but slice off the right side of the slice and keep everything on the left intact. I would personally remove the cut method entirely and just do the task of copying the right side of the slice into a new slice to return and shortening the slice saved in the List. It simplifies your code to what it is (a stack).
Great insight. It was a direct port of the cut example from the "slices tips and tricks" wiki page. Now I look at it you are quite right, there is no need for it to be so generic; it would greatly simplify things to be specific to the needs of this. Also, STACK! Thank you! I've been trying use the right name for this for days and finally settled on list, even though I knew it wasn't the right name for it!
 

Also, how do you plan on passing work for these workers? I suppose you code receive on a `chan func()` that is shared among all workers in the select with the kill signal.
For this particular project each worker will have its own connection to a beanstalkd server and will be pulling jobs from that (as well as a shared memcached connection to ensure I don't do work for duplicate jobs). Eventually this will serve as the core of a tool that will pull work from beanstlkd but in order to control costs I want to be able to scale up and down the workersbased on other metrics. But yes, for other similar structured projects using this approach, a shared work channel would be passed to each worker. 

Thanks a lot Kevin, lots of great insight there!

Cheers,
Ben
 

Dmitry Vyukov

unread,
Sep 18, 2014, 12:21:58 PM9/18/14
to omarsharif...@gmail.com, golang-nuts
How many goroutines do you have at max? Goroutines are not that
expensive. So if you have a hundred of idle goroutines, you can just
preserve them till next spike.
> --
> You received this message because you are subscribed to the Google Groups
> "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to golang-nuts...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

omarsharif...@gmail.com

unread,
Sep 18, 2014, 12:24:06 PM9/18/14
to golan...@googlegroups.com, omarsharif...@gmail.com

On Thursday, September 18, 2014 5:21:58 PM UTC+1, Dmitry Vyukov wrote:
How many goroutines do you have at max? Goroutines are not that
expensive. So if you have a hundred of idle goroutines, you can just
preserve them till next spike.
Thats true, but each goroutine will have an open beanstalkd connection. I suppose I could "prefork" them like you say and just modify the killChan so that it disconnects from the beanstalkd server rather than kill the goroutine... interesting.

Konstantin Khomoutov

unread,
Sep 18, 2014, 12:46:42 PM9/18/14
to omarsharif...@gmail.com, golan...@googlegroups.com
On Thu, 18 Sep 2014 09:24:06 -0700 (PDT)
omarsharif...@gmail.com wrote:

> > How many goroutines do you have at max? Goroutines are not that
> > expensive. So if you have a hundred of idle goroutines, you can
> > just preserve them till next spike.
> >
> Thats true, but each goroutine will have an open beanstalkd
> connection. I suppose I could "prefork" them like you say and just
> modify the killChan so that it disconnects from the beanstalkd server
> rather than kill the goroutine... interesting.

May be you'd better implement an approach used by
database/sql.DB [1] which is actually a pool of connections so that
when a new one is requested, either a pooled one is returned or a new
created? You could then separately manage a number of goroutines to
keep around and a number of connections to cache.

http://golang.org/pkg/database/sql/#DB

Kyle Lemons

unread,
Sep 18, 2014, 12:59:18 PM9/18/14
to Ben Davies, golang-nuts
I think I'd take a somewhat more lightweight approach.

One option: Give every goroutine an ID, and before you check for more work from your central work channel, (atomically) check that your ID isn't above the "target goroutines" count.  If it is, exit, (atomically) decrementing a counter of how many are actually still running.  Probably racy, so:
another option: have a parallel (probably synchronous) channel to your (probably buffered) work channel into which you can push an appropriate number of contains "kill tokens."  Workers who pull a token from this channel ack the kill and exit.

On Thu, Sep 18, 2014 at 6:34 AM, <omarsharif...@gmail.com> wrote:

--

Dmitry Vyukov

unread,
Sep 18, 2014, 1:09:45 PM9/18/14
to omarshariffdontlikeit, golang-nuts
If the problem is only in the connection, then you can do something along the lines of:

func worker(c chan req) {
var c *conn
for {
select {
case r, ok := <-c:
if !ok {
return
}
if c == nil {
c = getConn()
}
process(r, c)
case <-time.After(X):
if c != nil {
closeConn(c)
c = nil
}
}
}
}



--

roger peppe

unread,
Sep 18, 2014, 1:34:30 PM9/18/14
to Ben Davies, golang-nuts
I'd be tempted to do something like this, assuming all
the workers are homogeneous.

http://play.golang.org/p/0IwMf1Gw2l

Dmitry Vyukov

unread,
Sep 18, 2014, 1:41:18 PM9/18/14
to roger peppe, Ben Davies, golang-nuts
I don't think you need separate stop/done chans for workers. You can
have a single buffered stop chan, just send N values to it to shutdown
N workers.

Bakul Shah

unread,
Sep 18, 2014, 3:53:01 PM9/18/14
to omarsharif...@gmail.com, golan...@googlegroups.com
Isn't beanstalkd communication far more expensive than
creating a goroutine or channel comm. within a Go program? Why
not have one set of goroutines to pull off work and then fork
off worker goroutines each with the work request and a chan
back to the puller (which communicates on its behalf over the
beankstalkd conn). You can only have one process safely
read/write on a tcp connection (no guarantee that writes from
two proceces on the same fd will be atomically done) hence the
indirection. But you can do the actual work in prallel. The
puller goroutines can self-adjust their number based on load.

roger peppe

unread,
Sep 18, 2014, 4:20:45 PM9/18/14
to Dmitry Vyukov, golang-nuts, OmarShariffDontLikeIt

Of course! Much simpler indeed.
http://play.golang.org/p/pBPYsDQ8VT
But more vulnerable to misbehaving workers too.

omarsharif...@gmail.com

unread,
Sep 19, 2014, 4:14:17 AM9/19/14
to golan...@googlegroups.com, omarsharif...@gmail.com


On Thursday, September 18, 2014 5:46:42 PM UTC+1, Konstantin Khomoutov wrote:
May be you'd better implement an approach used by
database/sql.DB [1] which is actually a pool of connections so that
when a new one is requested, either a pooled one is returned or a new
created?  You could then separately manage a number of goroutines to
keep around and a number of connections to cache.

http://golang.org/pkg/database/sql/#DB

That's true, but I tempted to not open say 100+ connections to beanstalk and manage the workers separately as I'd like to be able to view from the beanstalkd side the number of connections. Its much more transparent when 1 connection = 1 worker, though I do appreciate where your coming from. Dunno, I may have to explore this further, because as you mention, pools of database connections is a pattern used in loads of software so I suppose it isn't too alien a concept. This definitely demands further investigation, thanks! 

omarsharif...@gmail.com

unread,
Sep 19, 2014, 4:22:53 AM9/19/14
to golan...@googlegroups.com, omarsharif...@gmail.com


On Thursday, September 18, 2014 8:53:01 PM UTC+1, Bakul Shah wrote:

Isn't beanstalkd communication far more expensive than
creating a goroutine or channel comm. within a Go program? Why
not have one set of goroutines to pull off work and then fork
off worker goroutines each with the work request and a chan
back to the puller (which communicates on its behalf over the
beankstalkd conn).  You can only have one process safely
read/write on a tcp connection (no guarantee that writes from
two proceces on the same fd will be atomically done) hence the
indirection.  But you can do the actual work in prallel.  The
puller goroutines can self-adjust their number based on load.

This version is actually a rewrite of my second attempt which used this approach. It worked in exactly the method you describe, but I came up against one of the features of beanstalkd which is that reserving a job from beanstalkd will block until a job is available. This meant that if I pulled a load of jobs off beanstalkd and got my workers going processing the jobs if there were no further jobs in beanstalkd to reserve, the connection would block and I couldn't "put" or "delete" jobs off of beanstalkd when the workers completed. There is a feature where you can reserve a job with a timeout but I found that it was still problematic (though the actual structure of the program was very nice, with channels for put, delete, reserve and buries, all in one nice select). In the end I found that the program was essentially just polling beanstalkd constantly for jobs so I wanted to try a different approach that played more to beanstalkds strengths. It's obvious that beanstalkd was written with single process, non-threaded clients in mind as the blocking features is actually very useful in that context. We use beanstalkd for all sorts of tools at work and we work with it primarily in PHP. The two work very well together in that context.

I'll have a look again at the previous version but I'm pretty sure that it was a dead end (the other problem was that I couldn't use a pool of beanstalkd connections to alleviate this, as you cant interact with a job reserved on one connection and manipulate it in another connection; jobs are reserved to a connection, that's how its able to implement a part of its reserve with blocking feature) 

omarsharif...@gmail.com

unread,
Sep 19, 2014, 4:24:07 AM9/19/14
to golan...@googlegroups.com, omarsharif...@gmail.com
Thanks to everyone who has provided input so far, this has been a very helpful discussion with lots of alternative approaches provided and lot and lots of food for thought!

Cheers,
Ben

omarsharif...@gmail.com

unread,
Sep 19, 2014, 7:35:03 AM9/19/14
to golan...@googlegroups.com, omarsharif...@gmail.com
I've now implemented a lot of the changes recommended by everyone and the result is a much cleaner and simpler. I actually surprised at how elegant it is now; the whole thing is very obvious in its function. This primarily comes down to Kevins suggestion of making the worker channel a type with the methods hanging off that.

Anyway, here is the revised design:


Thanks again to everyone who contributed!

Cheers,
Ben

Kevin Malachowski

unread,
Sep 19, 2014, 10:29:14 AM9/19/14
to golan...@googlegroups.com, omarsharif...@gmail.com
Thanks for the recognition!

I have some more comments with your new code if you're interested:

-You need to lock the mutex in the Len method (it's a good general rule that if you guard data with a mutex you have to lock the mutex every time you access that data).

-In Inc you should preallocate the "temp" slice and then fill it using a range:

temp := make([]Worker, count)
for i := range temp {
   temp
[i] = make(Worker, 1)
}


This has the advantage of only having a single allocation as well as removes the need for the compiler to put bounds checks in for the slice (I don't know if gc does that already, but it definitely could and will one day).

-Your Dec method could probably use a cleanup:
     -you use `var [variable name] [variable type] = [expression]` where you could have just used `[variable name] := [expression]`
     -you use `len(s.stack)-count` three times. Why not local variable so it's easier for humans to parse?
     -you give the return param the name `r` and then never use it
     -you don't need to make a new slice `o` to save in s.stack. Instead, just set the indexes you are returning to nil and then use a slicing operation to "chop" off the right side of the slice. That is, after nil'ing you could do `s.stack = s.stack[:len(s.stack)-count]`. This prevents an unnecessary allocation.

-Is the use case of your Stack to always start the workers after calling Inc and always killing them after calling Dec? If so, why not put the starting/killing of goroutines inside the Inc/Dec methods? You can safe a bit of allocation in this case (because you won't have to `make` slices to return to the caller of Inc/Dec.

-You made your Stack so that it is safe to use on multiple goroutines but you didn't actually test to see if it works on multiple goroutines concurrently. If you tried it and then compiled your program with the "-race" flag then you would see the race condition in the Len method I mentioned earlier. When writing goroutine-safe you should always take care to test it this way, it will catch you when you make small (or big!) mistakes.

Bakul Shah

unread,
Sep 19, 2014, 3:14:38 PM9/19/14
to omarsharif...@gmail.com, golan...@googlegroups.com
Looking at the protocol spec I see what you mean....

But here's an idea: Since every request/response is prefixed
with a unique keyword, client/server communication can be
easily pipelined!

As an example, the following should be workable:

client: reserve
server: RESERVED <id1> <n1>\r\n<n1 bytes of data>\r\n
client: reserve-with-timeout 10\n\n
server: RESERVED <id2> <n2>\r\n<n2 bytes of data>\r\n
client: reserve-with-timeout 10\r\n
time passes
client: delete <id2>
server: DELETED <id2>
client: delete <id1>
server: DELETED <id1>
server: RESERVED <id3> <n3>\r\n<n3 bytes of data>\r\n
...

I mention this possible extension since it will require far
fewer client-server connections and achieve better performance
as now you can send followup communication after reserve on
the same connection without having to wait for a reserve to
complete. At present you have to have N connections if you
want to handle N jobs in parallel. Even just one connection
should be enough!

This does add a slight complication on the client/server side
libraries. But if each side is implemented with a little state
machine it should be pretty straightforward.

Ideally every protocol should exchange a version number
precisely to allow such evolution. This is needed here as well
because just from the sequence of messages the server can't
figure out if a client wants this new behaviror.

Chris Holland

unread,
Sep 24, 2014, 3:29:21 AM9/24/14
to golan...@googlegroups.com, omarsharif...@gmail.com
https://play.golang.org/p/f-44AQZBsT

Had a similar issue. I wanted scalable workers per some load factor so I did something like the above. You can push in various load factors and it scales up/down works without explicitly keeping them in a list. It just uses 1 negotiation channel to shutdown extra workers and a counter.

In the real solution I also had a smarter "feeder" that would cache arrays of things by ids, then it would trigger the workers which would pull out messages by random IDs. I needed the ID lists to maintain sequence order.
Reply all
Reply to author
Forward
0 new messages