I am considering a switch from RabbitMQ to NSQ but have been having issues with some of my testing. The first queue I am testing with is a job queue where each message should be delivered once and the jobs range from a couple minutes to close to an hour to run. I want my consumers running 2 jobs at once. Here's my simplified consumer:
func main() {
Subscribe("work", "spawn", func(msg *nsq.Message) error {
defer msg.Finish()
println("Msg received:", string(msg.Body))
//...
return nil
})
}
func Subscribe(topic, channel string, handler nsq.HandlerFunc) {
cnf := nsq.NewConfig()
cnf.MsgTimeout = 1 * time.Minute
cnf.BackoffStrategy = &nsq.FullJitterStrategy{}
cnf.MaxInFlight = 2
cnf.ClientID = "someid"
consumer, err := nsq.NewConsumer(topic, channel, cnf)
if err != nil {
panic(err)
}
consumer.AddHandler(handler)
consumer.SetLogger(nil, nsq.LogLevelWarning)
if err := consumer.ConnectToNSQLookupd("
127.0.0.1:4161"); err != nil {
panic(err)
}
<-consumer.StopChan
}
My producer isn't doing anything special. It is using the same *nsq.Config as above and looks something like this:
func Publish(topic string, msg []byte) error {
producer, err := nsq.NewProducer("
127.0.0.1:4150", cnf)
if err != nil {
return err
}
producer.SetLogger(nil, nsq.LogLevelWarning)
return producer.Publish(topic, msg)
}
I am having several issues with this setup and I can't figure out why. I don't feel like I'm doing anything too crazy or out of the ordinary here. For one, these consumers are only accepting one message at a time even though the max in flight is set to 2. Also, in the nsqadmin ui it shows 16k+ messages total and 22k+ timed out & requeued. In watching the ui as the queue is being processed, the number of connections fluctuates between 0 and the actual 40 consumers that are processing the queue. Every minute I update the page it shows a different number of consumers connected; all the while, the timed out/requeued numbers keep going up. In addition, it is taking almost twice as long to process this queue in NSQ as it did in RabbitMQ which doesn't make any sense to me either.
Can anyone see anything glaringly obvious that I'm doing wrong with this? I haven't spent a whole lot of time researching NSQ I just wanted to set up something simple and see if I could get it working but I have had a ton of issues. Any help is greatly appreciated!