Messages taking longer, having numerous timeouts.


Jon Stevens

Feb 13, 2017, 5:04:39 PM
to nsq-users
I am considering a switch from RabbitMQ to NSQ but have been having issues with some of my testing. The first queue I am testing with is a job queue where each message should be delivered once and the jobs range from a couple minutes to close to an hour to run. I want my consumers running 2 jobs at once. Here's my simplified consumer:

func main() {
	Subscribe("work", "spawn", func(msg *nsq.Message) error {
		defer msg.Finish()
		println("Msg received:", string(msg.Body))

		//...

		return nil
	})
}

func Subscribe(topic, channel string, handler nsq.HandlerFunc) {
	cnf := nsq.NewConfig()
	cnf.MsgTimeout = 1 * time.Minute
	cnf.BackoffStrategy = &nsq.FullJitterStrategy{}
	cnf.MaxInFlight = 2
	cnf.ClientID = "someid"

	consumer, err := nsq.NewConsumer(topic, channel, cnf)
	if err != nil {
		panic(err)
	}

	consumer.AddHandler(handler)
	consumer.SetLogger(nil, nsq.LogLevelWarning)

	if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
		panic(err)
	}

	<-consumer.StopChan
}

My producer isn't doing anything special. It is using the same *nsq.Config as above and looks something like this:

func Publish(topic string, msg []byte) error {
	producer, err := nsq.NewProducer("127.0.0.1:4150", cnf)
	if err != nil {
		return err
	}
	producer.SetLogger(nil, nsq.LogLevelWarning)

	return producer.Publish(topic, msg)
}

I am having several issues with this setup and I can't figure out why. I don't feel like I'm doing anything too crazy or out of the ordinary here. For one, these consumers are only accepting one message at a time even though the max in flight is set to 2. Also, in the nsqadmin ui it shows 16k+ messages total and 22k+ timed out & requeued. In watching the ui as the queue is being processed, the number of connections fluctuates between 0 and the actual 40 consumers that are processing the queue. Every minute I update the page it shows a different number of consumers connected; all the while, the timed out/requeued numbers keep going up. In addition, it is taking almost twice as long to process this queue in NSQ as it did in RabbitMQ which doesn't make any sense to me either.

Can anyone see anything glaringly obvious that I'm doing wrong with this? I haven't spent a whole lot of time researching NSQ; I just wanted to set up something simple and see if I could get it working, but I have had a ton of issues. Any help is greatly appreciated!

Jehiah Czebotar

Feb 13, 2017, 8:36:22 PM
to Jon Stevens, nsq-users
two things that might be helpful

1) go-nsq has different concepts for configured message concurrency (designed to achieve high throughput and mitigate connection latency) and handler concurrency. There are different ways to structure a solution, but in your code you want both. To adjust the number of goroutines running your handler, switch to AddConcurrentHandlers instead of AddHandler. (The alternative is to call msg.DisableAutoResponse and immediately pass messages to your own goroutines, which must then call touch/finish/requeue as appropriate.)


2) time limits. nsqd imposes strict limits on misbehaving clients. If you see the "timed out" count increment, that's you. Messages must be completed or touched within the `-msg-timeout=60s` configured on the server. If you exceed that, your client will be kicked and the message requeued for a new client to process. You can call `msg.Touch()` repeatedly to extend the lease on a message for another timeout period, up to the configured nsqd `-max-msg-timeout=15m`.
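
To illustrate point 1, here is a minimal, stdlib-only sketch of why handler concurrency matters independently of max-in-flight. It is a model, not go-nsq's implementation: `peakConcurrency` and its parameters are hypothetical names, the channel stands in for messages delivered by nsqd, and a sleep stands in for the job. With one handler goroutine (the AddHandler case) only one job ever runs at a time, however many messages are in flight; with two (the AddConcurrentHandlers(handler, 2) case) two jobs overlap.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency feeds numMsgs messages to `concurrency` handler goroutines
// and returns the highest number of handlers seen running at once.
// Each handler call just sleeps for `work` to stand in for a job.
// (Hypothetical helper for this sketch; not part of go-nsq.)
func peakConcurrency(numMsgs, concurrency int, work time.Duration) int {
	incoming := make(chan int) // stands in for messages delivered by nsqd
	var (
		mu            sync.Mutex
		running, peak int
		wg            sync.WaitGroup
	)
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range incoming {
				mu.Lock()
				running++
				if running > peak {
					peak = running
				}
				mu.Unlock()

				time.Sleep(work) // the "job"

				mu.Lock()
				running--
				mu.Unlock()
			}
		}()
	}
	for m := 0; m < numMsgs; m++ {
		incoming <- m
	}
	close(incoming)
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("1 handler goroutine: peak =", peakConcurrency(4, 1, 20*time.Millisecond))
	fmt.Println("2 handler goroutines: peak =", peakConcurrency(4, 2, 20*time.Millisecond))
}
```

In the consumer from the original post, the corresponding change would be replacing `consumer.AddHandler(handler)` with `consumer.AddConcurrentHandlers(handler, 2)` while keeping `MaxInFlight` at 2 or higher.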

NSQ is designed for realtime low latency message processing which is often at odds with processing messages for a long period of time, and reacting to client failures quickly. You will see that reflected in the timeouts chosen.

cheers.

--
Jehiah

Jon Stevens

Feb 14, 2017, 12:02:09 PM
to nsq-users, jonathan...@gmail.com
That makes sense. I didn't realize there was a separate handler for concurrency. Is there any purpose of using the MaxInFlight with more than 1 without using the AddConcurrentHandlers? 

I also didn't realize that about the timeouts. So, the msg timeout is the time that it takes to run, not the timeout for a consumer not accepting a message, correct?

Jehiah Czebotar

Feb 14, 2017, 12:48:18 PM
to Jon Stevens, nsq-users
Goroutine concurrency is just one need for concurrency. As I mentioned, one use is to mitigate latency. If you imagine that nsqd and the consumer are 100ms apart network-wise and you have a max-in-flight of 1, the message must go from nsqd to the consumer (+100ms) and the message of completion must travel back (+100ms) before nsqd starts sending the next message. Even if processing is instantaneous, you can't process more than 5 messages/second. If you want to let nsqd send more messages than the consumer can handle at a time in this situation, say max-in-flight=10, then nsqd can send 10 on the network wire, and as it receives confirmation of each completion it will send another one (if/when it has more messages) even though the other 9 are still outstanding. This gives a max throughput of 10 msgs every 200ms, or 50/sec. The tradeoff is that if the consumer dies there are more messages outstanding that will be requeued.
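
The arithmetic above can be checked with a few lines of Go. This is only a sketch of the upper bound under the stated assumption of instantaneous processing; `maxThroughput` is a hypothetical helper name, not an NSQ API.

```go
package main

import (
	"fmt"
	"time"
)

// maxThroughput returns the upper bound on messages per second when
// processing is instantaneous: at most maxInFlight messages can complete
// per network round trip (nsqd -> consumer -> nsqd).
func maxThroughput(oneWayLatency time.Duration, maxInFlight int) float64 {
	roundTrip := 2 * oneWayLatency
	return float64(maxInFlight) / roundTrip.Seconds()
}

func main() {
	for _, n := range []int{1, 10} {
		fmt.Printf("max-in-flight=%d: %.0f msgs/sec\n",
			n, maxThroughput(100*time.Millisecond, n))
	}
}
```

With 100ms of one-way latency this reproduces the 5 msgs/sec and 50 msgs/sec figures from the post.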

You can also see how message concurrency allows consumers to batch; nsq_to_file is a good example, where you might be writing a gzip file and do a file.write(msg + "\n") immediately, but wait for a whole bunch of messages (or a timer to trigger) before doing a file.sync() and sending the finish messages back to nsqd.
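
A rough sketch of that batching pattern, using only the standard library: each body is written immediately, but the messages are held and finished together once the batch is full. The `finisher` interface, `batcher` type, and `fakeMessage` are hypothetical names for this illustration (go-nsq's `*nsq.Message` has a `Finish()` method of this shape). The timer-based flush is omitted, the type is not safe for concurrent handlers, and in a real go-nsq handler you would also call `msg.DisableAutoResponse()` so the message is not finished when the handler returns.

```go
package main

import (
	"bytes"
	"fmt"
)

// finisher is the one method this sketch needs from a message.
type finisher interface{ Finish() }

// batcher writes bodies immediately and defers Finish until a batch is full.
type batcher struct {
	out     bytes.Buffer // stands in for the output file
	size    int          // batch size; max-in-flight must be at least this large
	pending []finisher
}

func newBatcher(size int) *batcher { return &batcher{size: size} }

// add writes body right away and holds msg until the batch is full.
func (b *batcher) add(msg finisher, body []byte) {
	b.out.Write(body)
	b.out.WriteByte('\n')
	b.pending = append(b.pending, msg)
	if len(b.pending) >= b.size {
		b.flush()
	}
}

// flush is where a real writer would sync the file; afterwards every held
// message is finished so nsqd can stop tracking it.
func (b *batcher) flush() {
	for _, m := range b.pending {
		m.Finish()
	}
	b.pending = b.pending[:0]
}

// fakeMessage records Finish calls so the sketch runs without nsqd.
type fakeMessage struct{ finished bool }

func (m *fakeMessage) Finish() { m.finished = true }

func main() {
	b := newBatcher(3)
	msgs := []*fakeMessage{{}, {}, {}}
	for i, m := range msgs {
		b.add(m, []byte(fmt.Sprintf("job-%d", i)))
		fmt.Printf("after message %d: first message finished = %v\n", i, msgs[0].finished)
	}
}
```

Because nsqd stops delivering once max-in-flight messages are outstanding, a batch larger than max-in-flight would never fill, which is one reason the original description also mentions a timer.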

The message times are from the nsqd server perspective, so it's the instant nsqd sends a message until it must have a finish/requeue/touch on that message. They include any network latency, and delay before a consumer processes that message.

hope that helps!

-- 
Jehiah

Jon Stevens

Feb 14, 2017, 12:57:12 PM
to nsq-users, jonathan...@gmail.com
That does help, thank you. So, for this particular job queue, some jobs might take a couple hours to run while others might take 30 seconds. What does unsetting the timeout do? Or setting it to 0? Does it not ever time out?

Jehiah Czebotar

Feb 14, 2017, 1:05:27 PM
to Jon Stevens, nsq-users
The tradeoff you have to think about is "what do I want to happen if this process is kill -9'd while handling a message". 

Sometimes that risk isn't a big concern and it's sufficient to simply call msg.Finish() immediately and begin the long running task**. Sometimes you want the tradeoff of msg.Finish() immediately and some local state file so if the worker restarts it can pick up where it left off. Sometimes you want nsqd to manage the retry so a new/different worker can do the retry and leverage those semantics.

NSQ just exposes options for you to make those tradeoffs that are right for you. There is no unsetting timeouts; you will need to set them sufficiently high and touch messages as appropriate in your use case.

** In some cases you want to msg.Finish() right away but NOT get any new messages. To do that, call ChangeMaxInFlight(0) before Finish(), and then call ChangeMaxInFlight again when ready for more messages.

-- 
Jehiah

Jon Stevens

Feb 14, 2017, 1:11:57 PM
to nsq-users, jonathan...@gmail.com
What is the behavior of nsqd in the event that I do call msg.Finish() immediately and then start the long running process? Does it send another one immediately but only process it if a handler is available? if so, does it just sit in limbo on the consumer waiting for a handler?

Jud White

Feb 17, 2017, 1:52:17 PM
to nsq-users, jonathan...@gmail.com
If you FIN the message immediately but keep the handler from returning you could block the readLoop (bad times). This was recently discussed in https://github.com/nsqio/go-nsq/issues/204.

Jon Stevens

Feb 17, 2017, 6:32:40 PM
to nsq-users, jonathan...@gmail.com
Ok great. That helps a lot. I like the idea of using msg.Touch() periodically rather than setting the timeout to some ridiculous number. I know in that link it mentions calling msg.Touch() every 10 seconds. Is that often really necessary? How often is sufficient to keep the message from timing out?

Pierce Lopez

Feb 21, 2017, 1:03:55 PM
to nsq-users, jonathan...@gmail.com
On Friday, February 17, 2017 at 6:32:40 PM UTC-5, Jon Stevens wrote:
Ok great. That helps a lot. I like the idea of using msg.Touch() periodically rather than setting the timeout to some ridiculous number. I know in that link it mentions calling msg.Touch() every 10 seconds. Is that often really necessary? How often is sufficient to keep the message from timing out?

You just need to be sure that the Touch command gets to the server before the server times-out the message. The default timeout is 60s, so you could send a touch every 50s if you were sure there would never be anything that would delay the Touch command enough to put it over 60s from the server's point of view. You probably also need to change the -max-msg-timeout for nsqd, since by default it won't tolerate Touch extending the message lifetime over 15min.
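
As a rough sketch of the periodic Touch described above: the helper below runs a job while touching the message on a fixed interval, and stops touching as soon as the job returns. The `toucher` interface, `runWithTouch`, `sleepJob`, and `fakeMessage` are hypothetical names for this illustration (go-nsq's `*nsq.Message` has a `Touch()` method of this shape), and the millisecond intervals are only there so the demo finishes quickly. Against a real nsqd you would pick an interval comfortably below the message timeout, for example 30s against the default 60s.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// toucher is the one method this sketch needs from a message.
type toucher interface{ Touch() }

// runWithTouch runs job while calling msg.Touch() every interval, and
// stops the touch loop before returning the job's error.
func runWithTouch(msg toucher, interval time.Duration, job func() error) error {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				msg.Touch() // extend the lease for another timeout period
			case <-done:
				return
			}
		}
	}()
	err := job()
	close(done)
	wg.Wait() // no Touch can happen after this point
	return err
}

// sleepJob returns a job that just sleeps, standing in for long-running work.
func sleepJob(d time.Duration) func() error {
	return func() error {
		time.Sleep(d)
		return nil
	}
}

// fakeMessage counts touches so the sketch runs without nsqd.
type fakeMessage struct{ touches int64 }

func (m *fakeMessage) Touch()       { atomic.AddInt64(&m.touches, 1) }
func (m *fakeMessage) count() int64 { return atomic.LoadInt64(&m.touches) }

func main() {
	msg := &fakeMessage{}
	err := runWithTouch(msg, 10*time.Millisecond, sleepJob(100*time.Millisecond))
	fmt.Println("job error:", err, "touches:", msg.count())
}
```

Inside a go-nsq handler, returning the job's error after `runWithTouch` lets the library finish or requeue the message as usual. Touching only helps up to nsqd's `-max-msg-timeout`, so jobs that run longer than that still need the flag raised on the server.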

Jon Stevens

Feb 25, 2017, 3:01:55 PM
to nsq-users, jonathan...@gmail.com
So, I can only Touch the message for 15 minutes before nsqd times it out anyways if it's past the max msg timeout duration?

Pierce Lopez

Feb 25, 2017, 4:47:57 PM
to nsq-users, jonathan...@gmail.com
On Saturday, February 25, 2017 at 3:01:55 PM UTC-5, Jon Stevens wrote:
So, I can only Touch the message for 15 minutes before nsqd times it out anyways if it's past the max msg timeout duration?

Yes. But to state it more accurately, 15 minutes is the default max-msg-timeout

nsqd --help
...
  -max-msg-timeout duration
        maximum duration before a message will timeout (default 15m0s)
...

Jon Stevens

Feb 25, 2017, 4:51:03 PM
to nsq-users, jonathan...@gmail.com
I understand that. I'm asking how long can I use the msg.Touch command past the msg timeout. If I set the msg timeout to 10 minutes but the process takes 15 min, can't I use the touch command to make sure it doesn't time it out? What happens if my timeout is set to 5 min and the process takes an hour? How long can I use the msg.Touch command past the 10 min timeout mark?

Pierce Lopez

Feb 25, 2017, 5:04:37 PM
to nsq-users, jonathan...@gmail.com
You can specify a non-default --max-msg-timeout to be the duration you prefer.
You cannot use .Touch() after the max-msg-timeout (or it just won't work).

Jon Stevens

Feb 25, 2017, 5:09:09 PM
to nsq-users, jonathan...@gmail.com
Are you talking about a command line flag to pass when running nsqd? Because there's also a MsgTimeout in the client library. How can I change the max timeout based on the queue? And what's the difference between these two timeouts?

Pierce Lopez

Feb 25, 2017, 5:24:56 PM
to nsq-users, jonathan...@gmail.com
When I wrote the following, it showed that --max-msg-timeout is a command-line flag for nsqd:


On Saturday, February 25, 2017 at 2:47:57 PM UTC-7, Pierce Lopez wrote:

nsqd --help
...
  -max-msg-timeout duration
        maximum duration before a message will timeout (default 15m0s)
...


For an explanation of the different timeouts, read the first reply in this thread again.