Non-blocking safe message brokers in Go

Alexey Borzenkov

May 17, 2012, 5:07:53 PM
to golang-nuts
Hello,

Recently I've been trying to port my small notification server from
Python+gevent to Go, just to get a proper feel for building that kind of
service in Go. The system replaces polling where possible: while a process
is disconnected from the notifier it polls for changes; once connected, it
polls for a known state one last time and then just waits for messages,
switching back to polling if it gets disconnected for whatever reason.
Nothing fancy, it's an extremely simple piece of code, probably less than
100 lines.

What I found, however, is that because Go channels are either unbuffered
(they always block) or buffered to a fixed size (they block when full),
trivial implementations of such brokers put the whole server at risk.
Suppose you have a list of subscriber channels, and you iterate over them,
sending a message to each one. No matter how big a buffer size you choose
(the bigger the size, the bigger the waste), it's still theoretically
possible for slow clients to grind the whole system to a halt once a
single channel fills up (e.g. the network got disconnected in a very bad
way, connections were left wide open, timeouts are relatively big).
Dropping messages is a violation of the protocol, and disconnecting most
of the clients just because there's a sudden spike is equally bad.
Basically, with trivial channels of messages, slow clients don't harm
themselves, they harm everyone else.

After some thought I managed to come up with something like this:

https://gist.github.com/2714090

Basically, each client's inbox is a buffered channel holding just one
item, but instead of an individual message, that item is a whole batch of
messages. When the broker wants to deliver a message, it tries to take
the current batch out of the channel: if that succeeds, it appends the
message to the batch; if not, it creates a new batch with the one
message. Either way it then puts the batch back into the channel, which
is guaranteed to be empty at that point, since the broker is the only one
writing to it. The result is that the broker never blocks on sends, only
on receives, almost like Queues in gevent. It can even disconnect clients
that are too slow after a certain time (instead of after some abstract
number of messages), saving the memory that larger-than-necessary buffers
would waste.
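
For the impatient, the core of the trick reads roughly like this (a
sketch of the idea rather than the gist verbatim; Message and deliver are
illustrative names):

type Message struct{ Data string } // illustrative payload

// deliver adds msg to a client's pending batch and never blocks.
// inbox must have capacity 1 and the broker must be its only writer.
func deliver(inbox chan []Message, msg Message) {
    var batch []Message
    select {
    case batch = <-inbox: // a batch is pending: take it out to extend it
    default: // the slot is empty: we'll start a fresh batch
    }
    inbox <- append(batch, msg) // can't block: the slot is free and we're the only writer
}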

The question, though, is how do others solve these kinds of problems in
Go? (Obviously, if there were channels with a dynamic buffer size, this
wouldn't be a problem.) My solution works, but it feels kind of shaky,
with restrictions like only the broker ever being allowed to write to
inbox channels, etc.

P.S. The other obvious solution, having clients send messages to each
other's inboxes themselves, wouldn't really work. Yes, slow receivers
would no longer penalize the whole system, but they would still penalize
some innocent senders, who wouldn't be able to send until the buffer on
the other end had drained enough.

Kyle Lemons

May 17, 2012, 5:24:31 PM
to Alexey Borzenkov, golang-nuts
I have one place where I use an "infinite queue" because I need both a guaranteed non-blocking send and guaranteed ordering (with no dropped events) but can sacrifice some latency.  If you can drop events, the solution is simple: when a send would block, drop the event instead.  If ordering is not a hard requirement, the solution is to spawn a goroutine to complete the send if it would block.
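
(In sketch form, assuming some Event type and placeholder names:)

// If dropping is acceptable: never block, discard when the buffer is full.
func sendOrDrop(events chan<- Event, ev Event) {
    select {
    case events <- ev:
    default: // receiver is behind; ev is dropped
    }
}

// If ordering is not required: let a goroutine finish a blocked send.
func sendEventually(events chan<- Event, ev Event) {
    select {
    case events <- ev:
    default:
        go func() { events <- ev }() // delivery order no longer guaranteed
    }
}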

If I were to make a guess, I would guess that you either don't care about ordering or can drop old events.  If one of these is the case, you can construct a much simpler solution than the one you presented (or the one I demonstrate in iq).

Kevin Ballard

May 17, 2012, 5:43:48 PM
to Alexey Borzenkov, golang-nuts
One possibility is to take your solution and cram it into a goroutine. You have a channel with a buffer of 1 and you just write indiscriminately to it. You spawn off a goroutine that reads from this channel and uses your trick with a second buffered channel. So you have an internal channel that only has one writer (your goroutine), which lets it perform the trick, but you have a second public channel which accepts writes indiscriminately.

func makeChannels() (chan<- Message, <-chan []Message) {
    primaryChan := make(chan Message, 1)
    secondaryChan := make(chan []Message, 1)
    go func(input <-chan Message, output chan []Message) {
        for {
            msg := <-input
            var buffer []Message
            select {
            case buffer = <-output: // a batch is pending: pull it back and extend it
                buffer = append(buffer, msg)
            default: // nothing pending: start a new batch
                buffer = []Message{msg}
            }
            output <- buffer // never blocks: we're the only writer and the slot is free
        }
    }(primaryChan, secondaryChan)
    return primaryChan, secondaryChan
}
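
(A caller would then use it along these lines, as a sketch:

in, out := makeChannels()
in <- msg      // effectively non-blocking: the goroutine drains it promptly
batch := <-out // one receive yields everything accumulated so far, in order

The consumer receives whole batches at a time, which also amortizes
channel operations when it falls behind.)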

-Kevin

Alexey Borzenkov

May 17, 2012, 5:47:05 PM
to Kyle Lemons, golang-nuts
On Fri, May 18, 2012 at 1:24 AM, Kyle Lemons <kev...@google.com> wrote:
> If I were to make a guess, I would guess that you either don't care about
> ordering or can drop old events.  If one of these is the case, you can
> construct a much simpler solution than the one you presented (or the one I
> demonstrate in iq).

Actually, it's the other way around: my messages cannot be dropped, and
their order (from a single sending client) must be preserved. Isn't that
what I did in my solution?

Alexey Borzenkov

May 17, 2012, 5:56:00 PM
to Kevin Ballard, golang-nuts
On Fri, May 18, 2012 at 1:43 AM, Kevin Ballard <kbal...@gmail.com> wrote:
> One possibility is to take your solution and cram it into a goroutine. You
> have a channel with a buffer of 1 and you just write indiscriminately to it.
> You spawn off a goroutine that reads from this channel and uses your trick
> with a second buffered channel. So you have an internal channel that only
> has one writer (your goroutine), which lets it perform the trick, but you
> have a second public channel which accepts writes indiscriminately.
[...]

This is actually good, thanks a lot.

P.S. It's also possible to add another goroutine that "flattens" chan
[]Message back into chan Message, although I think there lies a danger of
a memory leak: we take a batch and start writing it to the flat channel,
but if the receiver is already gone by that time, I don't think that
goroutine would ever be garbage collected...
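
(The flattener I mean would be something like this sketch; the marked
send is exactly where the goroutine can get stuck forever:)

// flatten re-serializes batches into individual messages.
func flatten(batches <-chan []Message) <-chan Message {
    flat := make(chan Message)
    go func() {
        for batch := range batches {
            for _, msg := range batch {
                // If the reader has gone away, this send blocks forever,
                // pinning the goroutine, the batch, and both channels.
                flat <- msg
            }
        }
        close(flat)
    }()
    return flat
}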

Kyle Lemons

May 17, 2012, 6:09:27 PM
to Alexey Borzenkov, golang-nuts
Just because it's what you did in your solution does not mean it was a requirement for your particular use case :).  The iq code I linked is, in fact, both order-preserving (per sender) and "guaranteed" delivery.  The tradeoff is potentially high memory consumption when the queue gets backed up, and a small amount of latency even when the receiver is keeping up.
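
(The core of such an order-preserving, non-dropping queue generally has
this shape; a sketch of the pattern, not the actual iq code:)

// infiniteQueue forwards in to out, buffering without bound and
// preserving order; it flushes whatever is pending when in is closed.
func infiniteQueue(in <-chan Message, out chan<- Message) {
    var pending []Message
    for in != nil || len(pending) > 0 {
        var sendCh chan<- Message // nil unless we have something to send
        var next Message
        if len(pending) > 0 {
            sendCh = out
            next = pending[0]
        }
        select {
        case msg, ok := <-in:
            if !ok {
                in = nil // input closed: drain pending, then exit
                continue
            }
            pending = append(pending, msg)
        case sendCh <- next: // disabled while sendCh is nil
            pending = pending[1:]
        }
    }
    close(out)
}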

Alexey Borzenkov

May 17, 2012, 6:09:54 PM
to Kevin Ballard, golang-nuts
On Fri, May 18, 2012 at 1:43 AM, Kevin Ballard <kbal...@gmail.com> wrote:
> One possibility is to take your solution and cram it into a goroutine. You
> have a channel with a buffer of 1 and you just write indiscriminately to it.
> You spawn off a goroutine that reads from this channel and uses your trick
> with a second buffered channel. So you have an internal channel that only
> has one writer (your goroutine), which lets it perform the trick, but you
> have a second public channel which accepts writes indiscriminately.

Something like this:

https://gist.github.com/2721869

But I still fear the second goroutine might leak by getting stuck on a send.

Alexey Borzenkov

May 17, 2012, 6:15:15 PM
to Kyle Lemons, golang-nuts
On Fri, May 18, 2012 at 2:09 AM, Kyle Lemons <kev...@google.com> wrote:
> Just because it's what you did in your solution does not mean that it was a
> requirement for your particular use case :).  The iq code I linked is, in
> fact, both order-preserving (per sender) and "guaranteed" delivery.  The
> tradeoff for those is potentially high memory consumption when backed up and
> a small amount of latency even when the receiver is keeping up.

Ah, sorry, I missed the link among the other text. Your solution is
actually quite interesting, much better than mine (you multiplex both
send and receive in one go), thank you.

Kevin Ballard

May 17, 2012, 6:29:03 PM
to Alexey Borzenkov, golang-nuts
Why the large buffers?

Anyway, at this point you may as well get rid of the intermediate channel and just do all the buffering inside of one goroutine.


-Kevin

Kevin Ballard

May 17, 2012, 6:33:04 PM
to Alexey Borzenkov, golang-nuts
And now, after finally reading that iq link of Kyle's: this is exactly what he does, albeit with a subtly different implementation (and mine has a bug where I don't flush the pending buffer after the input is closed, which Kyle handles correctly).

-Kevin 

Dmitry Vyukov

May 18, 2012, 2:25:38 AM
to golan...@googlegroups.com
No, it won't solve the problem.
If you have slow clients you can't really solve it on the server. Think about what happens if you use infinite queues: slow clients will destroy the server in worse ways. First the whole machine gets stuck due to swapping, then the server crashes with an uninformative OOM. Go channels are good in the sense that they don't give you the illusion that you can solve this just by plugging in infinite queues. Think of a client that is actually deadlocked internally and is never going to read anything at all. You can't fix that on the server, but you must deal with it gracefully on the server: queue up to N messages per client, and if the queue is full, drop the connection. Think of it as if the client has crashed; you need to deal with that anyway, and a crashed client won't get all the messages (I assume you store messages only in memory).
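
(In sketch form, with maxQueue and the other names below as illustrations,
where each client's queue is make(chan Message, maxQueue):)

const maxQueue = 1000 // N messages per client

// trySend enqueues msg on one client's bounded queue without ever
// blocking the broker; false means the queue is full and the caller
// should drop the client's connection.
func trySend(queue chan<- Message, msg Message) bool {
    select {
    case queue <- msg:
        return true
    default:
        return false // client can't keep up: treat it like a crashed client
    }
}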

In 1-to-1 proxies, one usually stops reading from the server when the client gets stuck, so that the overload propagates gracefully through the system. In a 1-to-N proxy you don't want to do that, because a slow client would penalize the fast clients, so you need to drop the slow client. If most of the clients can't keep up with the stream, then you may as well stop reading from the server. Think of a bogus server that sends to you at an infinite rate.

Alexey Borzenkov

May 20, 2012, 5:27:59 AM
to Dmitry Vyukov, golan...@googlegroups.com
On Fri, May 18, 2012 at 10:25 AM, Dmitry Vyukov <dvy...@google.com> wrote:
> You can't fix that on the server, but you must deal with it gracefully
> on the server: queue up to N messages per client, and if the queue is
> full, drop the connection.
[...]

The problem with "queue up to N messages per client" is that N must be
big enough to handle peak throughput. For example, I have a very small
number of messages most of the time, but when processes are starting or
stopping there may be a burst of e.g. 10k+ messages. This burst is
occasional and very short, but does that mean I should just drop every
single client during these bursts? Of course not, so I would have to
increase the buffers. But the bigger the buffer, the more waste per
client; it's always better to start small (just like goroutines use
segmented stacks so they can start small). If I chose 1 million messages
(just to be veeeery safe), then every client would waste 4-12MB on the
server (depending on the message struct size), just by the act of
connecting.

This doesn't mean that queues must be infinite, just elastic enough to be
able to grow towards infinity. You'd want to control the actual maximum
size of the buffers without allocating them upfront, and you'd also want
to use time as a factor in disconnection. Unfortunately, channels cannot
be resized (last I checked, their buffer is not behind a pointer, it's
laid out inline after the channel structure itself), and hence they have
to allocate all their space upfront, which makes them less useful when
one wants a very large buffer size. Of course, if we could write
something like make(chan Message, 10, 1000000) to mean a channel of size
10 that grows up to 1,000,000, then the question of infinite queues would
be (almost) moot. :)
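
Something along these lines is what I mean; a sketch using time and
errors from the standard library, with queued and pump as made-up names:

type queued struct {
    msg  Message
    when time.Time
}

// pump grows its buffer as needed, but gives up on a client whose oldest
// undelivered message has been waiting longer than maxLag.
func pump(in <-chan Message, out chan<- Message, maxLag time.Duration) error {
    var pending []queued
    tick := time.NewTicker(maxLag / 4) // check lag a few times per window
    defer tick.Stop()
    for {
        var sendCh chan<- Message // nil unless something is pending
        var next Message
        if len(pending) > 0 {
            sendCh = out
            next = pending[0].msg
        }
        select {
        case msg := <-in:
            pending = append(pending, queued{msg, time.Now()})
        case sendCh <- next:
            pending = pending[1:]
        case <-tick.C:
            if len(pending) > 0 && time.Since(pending[0].when) > maxLag {
                return errors.New("client too slow, disconnecting")
            }
        }
    }
}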

Dmitry Vyukov

May 20, 2012, 7:40:51 AM
to Alexey Borzenkov, golan...@googlegroups.com
I understand the problem, but I am still against anything infinite in asynchronous in-memory systems.

make(chan Message, 10, 1000000)
The 10 looks excessive here; just make(chan Message, 1e6), which does not
commit all of that memory upfront (the OS only backs the pages that are
actually touched).

vel...@gmail.com

Oct 4, 2013, 12:59:10 PM
to golan...@googlegroups.com
Good explanation and nice code snippets...

I assume you have a working application that handles the required task. If so, can we see the final code?

And as a suggestion: have you considered or looked at Bitly's NSQ? It seems to do the trick for the kind of problem you are facing.


On Thursday, May 17, 2012 at 11:07:53 PM UTC+2, Alexey Borzenkov wrote:

Aaron Blohowiak

Oct 4, 2013, 5:20:03 PM
to golan...@googlegroups.com
The existing implementations create mailboxes that grow to MAX(buffer_length) but never shrink :(  Using a simple linked list for your buffer is nicer, and has semantics that are easy to reason about.
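
Sketched with container/list; listBuffer and Message are illustrative:

// listBuffer forwards in to out through a linked list, so memory is
// handed back as elements are delivered instead of sticking around in a
// slice's high-water-mark backing array.
func listBuffer(in <-chan Message, out chan<- Message) {
    buf := list.New() // container/list
    for in != nil || buf.Len() > 0 {
        var sendCh chan<- Message // nil unless the list is non-empty
        var next Message
        if e := buf.Front(); e != nil {
            sendCh = out
            next = e.Value.(Message)
        }
        select {
        case msg, ok := <-in:
            if !ok {
                in = nil // input closed: drain the list, then stop
                continue
            }
            buf.PushBack(msg)
        case sendCh <- next:
            buf.Remove(buf.Front())
        }
    }
    close(out)
}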

> nothing fancy, it's an extremely simple piece of code, probably less than 100 lines

If you could post the Python, we could discuss rebuilding the same semantics in Go (for better or worse ;)