go-nsq only processes a single message at a time.


Athiwat Chunlakhan

Sep 26, 2013, 4:47:33 AM
to nsq-...@googlegroups.com
This is the code that I use to subscribe.

import (
	"log"

	"github.com/bitly/go-nsq"
)

// Called from main:
//	signalRawNSQ := setUpNSQ("signal_insert", "raw", new(SignalInsertRaw))

func setUpNSQ(topic, channel string, handle nsq.Handler) *nsq.Reader {
	// Set up the NSQ reader
	reader, err := nsq.NewReader(topic, channel)
	if err != nil {
		log.Fatal(err) // reader is nil here, so don't continue
	}

	reader.SetMaxInFlight(100)
	reader.AddHandler(handle)

	if err := reader.ConnectToLookupd("127.0.0.1:4161"); err != nil {
		log.Fatal(err)
	}

	return reader
}

But the handler only processes one message at a time instead of several. What am I doing wrong here? Or is this by design?

Note that I process up to 1,000,000 messages per second, so most of the time the queue is full. Even then, a single Go program only processes one job at a time.

Henrik Johansson

Sep 26, 2013, 7:49:27 AM
to Athiwat Chunlakhan, nsq-...@googlegroups.com
Funny! I was just considering the concurrency behavior of the Handlers. :)

I came to the conclusion that each handler is called sequentially per "add".
So it would be by design, but someone from the NSQ team could perhaps correct me if I am wrong.

Jehiah Czebotar

Sep 26, 2013, 7:53:06 AM
to nsq-...@googlegroups.com
There are currently two separate signatures for handling messages in
the reader: a synchronous one (`reader.AddHandler`) that processes one
message at a time and uses the function's return value as the message
handling status (i.e. OK or not), and an asynchronous one
(`reader.AddAsyncHandler`) that provides a channel so you can defer
completion of the message to some event after the handling function
has returned. (See the docs:
http://godoc.org/github.com/bitly/go-nsq#Reader.AddAsyncHandler)
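To make the two contracts concrete, here is a small, self-contained Go sketch. It does not import go-nsq: `Message`, `Finished`, `SyncHandler`, `AsyncHandler` and the two handler types are hypothetical stand-ins modeled on the description above, not the library's actual types. The synchronous handler's return value is the status; the asynchronous one reports completion later on a channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for an NSQ message.
type Message struct {
	ID   int
	Body []byte
}

// Finished reports the outcome of one message (hypothetical type).
type Finished struct {
	ID      int
	Success bool
}

// SyncHandler: the return value *is* the handling status.
type SyncHandler interface {
	HandleMessage(m *Message) error
}

// AsyncHandler: the handler may return before the message is done and
// report completion later on the responses channel.
type AsyncHandler interface {
	HandleMessage(m *Message, responses chan<- Finished)
}

type bodyCheckHandler struct{}

func (bodyCheckHandler) HandleMessage(m *Message) error {
	if len(m.Body) == 0 {
		return errors.New("empty body")
	}
	return nil
}

// deferredHandler finishes each message from a separate goroutine,
// after HandleMessage itself has already returned.
type deferredHandler struct{}

func (deferredHandler) HandleMessage(m *Message, responses chan<- Finished) {
	go func() {
		responses <- Finished{ID: m.ID, Success: len(m.Body) > 0}
	}()
}

// Compile-time checks that the sketch types satisfy the interfaces.
var _ SyncHandler = bodyCheckHandler{}
var _ AsyncHandler = deferredHandler{}

// runSync turns a SyncHandler's returned error into a Finished status.
func runSync(h SyncHandler, m *Message) Finished {
	return Finished{ID: m.ID, Success: h.HandleMessage(m) == nil}
}

func main() {
	msg := &Message{ID: 1, Body: []byte("hello")}
	fmt.Println(runSync(bodyCheckHandler{}, msg)) // {1 true}

	responses := make(chan Finished, 1)
	deferredHandler{}.HandleMessage(msg, responses)
	fmt.Println(<-responses) // {1 true}
}
```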

In both cases, the number of concurrent goroutines that are running
the message handling function is dictated by the number of times you
call `reader.AddHandler` or `reader.AddAsyncHandler`. You can see an
example of that here:
https://github.com/bitly/nsq/blob/master/apps/nsq_to_http/nsq_to_http.go#L288-L298
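A toy model of that rule, assuming a reader that starts one goroutine per added handler and has all of them read from one shared channel (`toyReader` and `peakConcurrency` are hypothetical names, not go-nsq code). With a single handler, at most one message is ever being processed, no matter how many are waiting.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// toyReader is a hypothetical stand-in for a reader: every added handler
// gets its own goroutine, and all of them share one incoming channel.
type toyReader struct {
	incoming chan int
	wg       sync.WaitGroup
}

func (r *toyReader) addHandler(handle func(int)) {
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for msg := range r.incoming {
			handle(msg) // synchronous: this goroutine waits here
		}
	}()
}

// peakConcurrency feeds n messages through `handlers` handler goroutines
// and returns the largest number of messages seen in progress at once.
func peakConcurrency(handlers, n int) int64 {
	r := &toyReader{incoming: make(chan int)}
	var active, peak int64
	work := func(int) {
		cur := atomic.AddInt64(&active, 1)
		for { // record cur as the new peak if it is larger
			old := atomic.LoadInt64(&peak)
			if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
				break
			}
		}
		time.Sleep(10 * time.Millisecond) // simulated work
		atomic.AddInt64(&active, -1)
	}
	for i := 0; i < handlers; i++ {
		r.addHandler(work)
	}
	for i := 0; i < n; i++ {
		r.incoming <- i
	}
	close(r.incoming)
	r.wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(1, 8)) // 1
	fmt.Println(peakConcurrency(4, 8))
}
```

With four handlers the measured peak depends on goroutine scheduling, so the second call has no fixed expected value, but it can never exceed four.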

Does that help?

--
Jehiah

Athiwat Chunlakhan

Sep 26, 2013, 8:24:47 AM
to nsq-...@googlegroups.com
I see that now in https://github.com/bitly/go-nsq/blob/master/reader.go#L1141

So there's no way to run multiple messages concurrently with error checking?

So my solution here is to ignore the error and just "return nil" after starting a new goroutine.

Like this:

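func (h *SignalInsertRaw) HandleMessage(message *nsq.Message) error {
	body := message.Body // captured for the goroutine
	go func() {
		process(body) // hypothetical function doing the actual work
	}()
	return nil // always reports success, even if process fails later
}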

But with the above, you have to implement rate limiting yourself.

Could there be a better solution, with built-in rate limiting? Something like:
func (q *Reader) AddHandler(handler Handler, parallel int)

Thanks for a great product! It really helps with my problem here.

Henrik Johansson

Sep 26, 2013, 8:35:47 AM
to Athiwat Chunlakhan, nsq-...@googlegroups.com
I think I do not grok the Async part of the AsyncHandler.

Looking at https://github.com/bitly/go-nsq/blob/master/reader.go#L1215, for example, I would expect it to run this in another goroutine, as you suggested.

It seems like both the handler interfaces read from the incomingMessages channel in the same way.
How and where does it become async given that handle.HandleMessage is a synchronous call?

What am I missing?

Athiwat Chunlakhan

Sep 26, 2013, 8:39:48 AM
to nsq-...@googlegroups.com, Athiwat Chunlakhan
Thanks, I got it working. I got confused by the code.

Thanks again!

Matt Reiferson

Sep 26, 2013, 9:57:12 AM
to Athiwat Chunlakhan, Henrik Johansson, nsq-...@googlegroups.com
Hi,

There are two parts that control go-nsq worker concurrency. One is the handler side: as Jehiah said, you need to add more than one handler to operate concurrently. The other part is MaxInFlight, which also needs to be set above 1, or your worker will not ask for more than one message at a time.
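Each knob caps concurrency independently, which a toy simulation can show. This is not go-nsq's implementation: the hypothetical `simulate` function assumes the server pushes a new message only while fewer than `maxInFlight` are unfinished, and that `handlers` goroutines consume them. The effective parallelism is then at most the smaller of the two values.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// simulate is a toy model (not go-nsq code) of the two knobs: a message
// is only pushed while fewer than maxInFlight are unfinished, and
// `handlers` goroutines process whatever arrives. It returns the peak
// number of messages being processed at the same time.
func simulate(maxInFlight, handlers, n int) int64 {
	credits := make(chan struct{}, maxInFlight) // one slot per in-flight message
	incoming := make(chan int, maxInFlight)
	var active, peak int64
	var mu sync.Mutex
	var wg sync.WaitGroup

	for h := 0; h < handlers; h++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range incoming {
				cur := atomic.AddInt64(&active, 1)
				mu.Lock()
				if cur > peak {
					peak = cur
				}
				mu.Unlock()
				time.Sleep(5 * time.Millisecond) // simulated work
				atomic.AddInt64(&active, -1)
				<-credits // "finish": frees one in-flight slot
			}
		}()
	}

	for i := 0; i < n; i++ {
		credits <- struct{}{} // blocks while maxInFlight messages are unfinished
		incoming <- i
	}
	close(incoming)
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(simulate(1, 4, 8))   // 1: only one message is ever in flight
	fmt.Println(simulate(100, 1, 8)) // 1: only one handler goroutine
	fmt.Println(simulate(100, 4, 8)) // at most 4
}
```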

Both async and non-async can perform work concurrently because they are both launched in their own goroutines when added, see:

and

The "async" mode of operation is useful for a few specific situations, primarily when you want to batch the work you're doing.  The canonical example is nsq_to_file, see: https://github.com/bitly/nsq/blob/master/apps/nsq_to_file/nsq_to_file.go
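A minimal sketch of why batching suits the async style: the handler only buffers each message and reports completion for the whole batch once it has been written, which a synchronous return value cannot express. `Msg`, `Done` and `batcher` are hypothetical types, and this is a simplified illustration of the idea, not how nsq_to_file is actually written.

```go
package main

import "fmt"

// Msg and Done are hypothetical stand-ins, not go-nsq types.
type Msg struct {
	ID   int
	Body string
}

type Done struct {
	ID      int
	Success bool
}

// batcher buffers messages in HandleMessage and only reports completion
// after the whole batch has been "written".
type batcher struct {
	size    int
	pending []Msg
	written [][]string // stands in for a file or downstream store
}

func (b *batcher) HandleMessage(m Msg, done chan<- Done) {
	b.pending = append(b.pending, m)
	if len(b.pending) >= b.size {
		b.flush(done)
	}
}

// flush writes the pending batch, then finishes every message in it.
func (b *batcher) flush(done chan<- Done) {
	if len(b.pending) == 0 {
		return
	}
	batch := make([]string, 0, len(b.pending))
	for _, m := range b.pending {
		batch = append(batch, m.Body)
	}
	b.written = append(b.written, batch)
	for _, m := range b.pending {
		done <- Done{ID: m.ID, Success: true}
	}
	b.pending = b.pending[:0]
}

func main() {
	done := make(chan Done, 16)
	b := &batcher{size: 3}
	for i := 1; i <= 7; i++ {
		b.HandleMessage(Msg{ID: i, Body: fmt.Sprint("line ", i)}, done)
	}
	b.flush(done) // e.g. on a timer or at shutdown
	close(done)
	fmt.Println(len(b.written), len(done)) // 3 7
}
```

`batcher` is not safe for concurrent use; the sketch assumes a single handler goroutine calls it, plus a periodic `flush` so that a partial batch is not held indefinitely.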

In most cases, because Go has goroutines and every handler runs in its own, it's probably easier (and produces more readable code) to use non-async handlers.

Hope this helps.

-Matt

Henrik Johansson

Sep 26, 2013, 10:30:57 AM
to Matt Reiferson, Athiwat Chunlakhan, nsq-...@googlegroups.com
Hi Matt,

I saw the goroutine spawned by AddHandler and its async counterpart; they are basically doing the same thing.
I'll try out MaxInFlight, but what does in-flight really mean? Is a message that is in flight transient in such a way that it can be lost in the event of a severe crash?

I am for sure interested in the concurrency aspects but I really need to grok how this ties in to delivery guarantees as well.

Thanks,
Henrik
 

Matt Reiferson

Sep 26, 2013, 10:38:46 AM
to Henrik Johansson, Athiwat Chunlakhan, nsq-...@googlegroups.com
The "Building Client Libraries" document is actually very useful for understanding the philosophy and implementation of client libraries, particularly these sections:


To summarize, in-flight simply means nsqd has sent the message to a consumer. MaxInFlight is a consumer-side concept that determines how many messages the Reader will indicate it is capable of receiving *before responding*.

In-flight messages are all maintained server-side (in nsqd), so unstable consumers do not impact availability.  If a consumer does not respond to an in-flight message (for whatever reason), it times out server side and is re-queued automatically.

Henrik Johansson

Sep 26, 2013, 10:43:59 AM
to Matt Reiferson, Athiwat Chunlakhan, nsq-...@googlegroups.com
A bit embarrassed, I had read it... :S

It clearly answers it, thanks!