Publishing messages in parallel causes Not Connected errors


Jonathan Lavon

Jan 25, 2015, 6:03:58 AM
to nsq-...@googlegroups.com
Hey,
We have an NSQ cluster with a lot of throughput - several daemons that consume messages from a queue, process them, and publish them to a different queue.
We've got a lot of goroutines that process and publish messages concurrently.

It usually works OK, but when the servers are under heavy load we get a lot of Not Connected errors from the NSQ client when we try to publish the messages back to the queue.

We think we know where this is coming from:
It seems like when there's a temporary error, like an I/O timeout, the connection to NSQ is closed - the client first marks the connection state as disconnected, closes the connection, then waits - I guess it waits for all concurrent users of the connection to finish.
Only after they have all finished does it change the connection state back to init.

Now, in the meantime, if a message is published, the client will try to reconnect - I can't seem to find any lock or anything that would make it wait for the closure to complete - and if that happens between the time the state is set to disconnected and the time it is set back to init, a Not Connected error is returned.

So, what is the correct way to handle such a situation?
Is it the user's responsibility to retry a message that failed this way? Is there some functionality we're missing?

Thanks.

Matt Reiferson

Jan 25, 2015, 10:03:20 PM
to Jonathan Lavon, nsq-...@googlegroups.com
Hi Jonathan,

Basically, the Producer is trying to make sure that all of its outstanding transactions have been "finished".

Can you share some code?  I'm particularly curious if you're sharing Producers between goroutines, etc.  Relatedly, we need to figure out why you're getting IO timeouts.

Jonathan Lavon

Jan 26, 2015, 2:47:43 AM
to nsq-...@googlegroups.com, jona...@similargroup.com
Yeah, we have a slice of producers that are shared between all goroutines - they're initialized in init():

cfg := nsq.NewConfig()
cfg.UserAgent = fmt.Sprintf("go-nsq/%s", nsq.VERSION)

.....

producers := make([]*nsq.Producer, 0, len(addresses))

for _, addr := range addresses {
    producer, err := nsq.NewProducer(addr, cfg)
    if err != nil {
        kslog.Error(logModule, err.Error())
    } else {
        producers = append(producers, producer)
    }
}

PublishClient = &PublishHandler{
    addresses:        addresses,
    producers:        producers,
    perAddressStatus: perAddressStatus,
    timermetrics:     timermetrics.NewTimerMetrics(250, "[aggregate]:"),
}

When we send a message, we select a producer randomly out of the slice:
func (ph *PublishHandler) Publish(header *pb.MessageEnvelope, message []byte, topic string) error {
    if len(message) == 0 {
        return errors.New("No message")
    }

    // Choose a random node to publish to
    pos := rand.Intn(len(ph.producers))
    err := ph.producers[pos].Publish(fmt.Sprintf("%v_%v", topic, header.GetPriority()), message)
    if err != nil {
        kslog.Warningf(logModule, "Error while sending message: %v", err)
        return err
    }

    return nil
}

PublishClient.Publish() is called from all the goroutines.

We're still investigating the I/O timeout errors we've received - is there some log that can help us see why they're happening? Looking at the servers' CPU and network load hasn't revealed anything suspicious.
We're getting these errors:
(10.0.10.111:4150) IO error - write tcp 10.0.10.111:4150: i/o timeout
(10.0.10.111:4150) sending command - write tcp 10.0.10.111:4150: i/o timeout

Matt Reiferson

Jan 27, 2015, 1:39:58 PM
to Jonathan Lavon, nsq-...@googlegroups.com
You can check the corresponding nsqd logs to see if there's any more information there.

On EC2 there's a virtualization bug that you might be experiencing (that is known to cause unexpected timeouts):


My first priority is to figure out why you're getting timeouts; that's the underlying issue.  But, to answer your original question, yes, go-nsq's Producer pushes the responsibility of retries onto the user, so you should implement a sane strategy for that.
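Something along these lines is usually enough - just a sketch, not battle-tested: the helper name, attempt count, backoff, and which errors you treat as retryable are all placeholders to tune for your setup (assumes go-nsq and a shared *nsq.Producer):

import (
    "time"

    "github.com/bitly/go-nsq"
)

// publishWithRetry is a hypothetical helper, not part of go-nsq.
func publishWithRetry(p *nsq.Producer, topic string, body []byte) error {
    var err error
    for attempt := 1; attempt <= 3; attempt++ {
        if err = p.Publish(topic, body); err == nil {
            return nil
        }
        // nsq.ErrNotConnected (and transient i/o errors) are worth retrying
        // after a short backoff; a persistent failure falls through below.
        time.Sleep(time.Duration(attempt) * 50 * time.Millisecond)
    }
    return err
}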

The approach you're taking should work fine, but it does mean that a single failing destination nsqd could randomly wreak havoc.  If you had a pool of publisher goroutines, a single bad actor wouldn't do as much damage.
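Roughly like this (again, just a sketch - the type and function names here are made up, and you'd still want the retry logic above on top of it):

import (
    "log"

    "github.com/bitly/go-nsq"
)

// pubReq and startPublishers are illustrative names, not go-nsq API.
type pubReq struct {
    topic string
    body  []byte
}

// One worker per destination nsqd, all fed from a single channel, so a slow
// or disconnected node only stalls its own worker instead of failing a
// random caller's Publish.
func startPublishers(producers []*nsq.Producer, reqs <-chan pubReq) {
    for _, p := range producers {
        go func(p *nsq.Producer) {
            for r := range reqs {
                if err := p.Publish(r.topic, r.body); err != nil {
                    // re-enqueue, count, or log the failure - your call
                    log.Printf("publish failed: %v", err)
                }
            }
        }(p)
    }
}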

Jonathan Lavon

Jan 28, 2015, 6:44:25 AM
to nsq-...@googlegroups.com, jona...@similargroup.com
Thanks for your help.

We are not using virtual machines - everything is on physical servers without virtualization.
I took a look at the logs from the server, and I'm seeing these messages that correspond roughly to the times we get those timeout errors:

[nsqd] 2015/01/26 03:52:21.555500 TCP: new client(...)
[nsqd] 2015/01/26 03:52:22.653956 CLIENT(...): desired protocol magic '  V2'
[nsqd] 2015/01/26 03:52:22.654216 [...] IDENTIFY: {ShortId:... LongId:... ClientID:... Hostname:... HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/%s1.0.2-alpha MsgTimeout:0}
[nsqd] 2015/01/26 03:52:25.127870 ERROR: [...] - E_BAD_MESSAGE PUB failed to read message body - unexpected EOF
[nsqd] 2015/01/26 03:52:25.127924 PROTOCOL(V2): [...] exiting ioloop
[nsqd] 2015/01/26 03:52:25.128001 PROTOCOL(V2): [...] exiting messagePump
[nsqd] 2015/01/26 03:52:25.243042 TCP: new client(...)
[nsqd] 2015/01/26 03:52:25.507237 ERROR: [...] - E_BAD_MESSAGE PUB failed to read message body - unexpected EOF
[nsqd] 2015/01/26 03:52:25.507296 PROTOCOL(V2): [...] exiting ioloop
[nsqd] 2015/01/26 03:52:25.507388 PROTOCOL(V2): [...] exiting messagePump
[nsqd] 2015/01/26 03:52:25.563955 TCP: new client(...)
[nsqd] 2015/01/26 03:52:25.574751 CLIENT(...): desired protocol magic '  V2'


[nsqd] 2015/01/26 06:55:49.326789 [...] IDENTIFY: {ShortId:... LongId:... ClientID:... Hostname:... HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/1.0.2-alpha MsgTimeout:0}
[nsqd] 2015/01/26 06:55:49.326895 PROTOCOL(V2): [...] exiting ioloop
[nsqd] 2015/01/26 06:55:49.326933 ERROR: client(...) - failed to read command - EOF
[nsqd] 2015/01/26 06:55:49.326947 PROTOCOL(V2): [...] exiting messagePump

Might these errors cause the timeouts? Perhaps the server doesn't return an answer when failing in such a way, and the client is left waiting on a dead socket?
Is there some way that the messages we are sending could be causing these errors? They shouldn't be corrupted in any way - sending the same messages when we're not under such load seems to work.
Can you perhaps point me to another log we can investigate?
Thanks.

Jonathan Lavon

Feb 2, 2015, 4:18:34 AM
to nsq-...@googlegroups.com, jona...@similargroup.com
Hey Matt,
We still haven't found anything that indicates what's causing the errors.
Do you have a suggestion/idea?

Tangentially, is there some sort of recommended hardware for deploying NSQ servers in a production environment?
Thanks.
...

Matt Reiferson

Feb 9, 2015, 12:08:43 PM
to Jonathan Lavon, nsq-...@googlegroups.com
Hi Jonathan,

Sorry for the delay.  Those logs don't really reveal anything particularly interesting, just that the clients publishing messages either aren't able to send the full body or the nsqd cannot read it.

You mentioned high throughput and load on these hosts - is there any resource starvation?  Is the client or nsqd maxing out a core?  Do you have GOMAXPROCS set to greater than 1?  Is CPU or network on the host saturated?
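For reference, current Go releases default GOMAXPROCS to 1 unless you set it yourself, so it's worth checking for something like this at startup - just a sketch:

import "runtime"

func init() {
    // the default is 1, so a heavily concurrent publisher can peg a
    // single core while the rest of the box sits idle
    runtime.GOMAXPROCS(runtime.NumCPU())
}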

Do the interfaces on the host report any errors?

Jonathan Lavon

Feb 10, 2015, 5:19:01 AM
to nsq-...@googlegroups.com, jona...@similargroup.com
Hi,
We're still investigating - CPU and network are not saturated, and there are no errors on any interface. The server has a 10G network interface and runs CentOS 6.
GOMAXPROCS is not set - so it's on the default.

What we've seen that might cause this is that disk I/O is at 100% under load, which causes I/O wait. We're setting up better disk hardware to see if we can remedy the situation.
I'll update after we test that out. Do you have anything specific you think we should test?

Thanks.

Matt Reiferson

Feb 10, 2015, 10:16:56 AM
to Jonathan Lavon, nsq-...@googlegroups.com
The saturated disk I/O could certainly explain the issues that are causing the disconnections - are these topics/channels exceeding the high-water mark and reading/writing to disk?
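You can eyeball this in nsqadmin, or hit nsqd's /stats endpoint and look at each topic/channel's backend_depth - anything non-zero means messages have spilled past --mem-queue-size into the disk-backed queue. A rough sketch (the host is just taken from your logs plus nsqd's default HTTP port, and the exact JSON layout varies a bit between nsqd versions):

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // nsqd's HTTP port defaults to 4151; adjust the host for your servers.
    resp, err := http.Get("http://10.0.10.111:4151/stats?format=json")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := ioutil.ReadAll(resp.Body)
    // look for "backend_depth" on each topic/channel - non-zero means the
    // in-memory queue (--mem-queue-size) overflowed and nsqd is writing to disk
    fmt.Println(string(body))
}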

Jonathan Lavon

Feb 12, 2015, 9:17:38 AM
to nsq-...@googlegroups.com, jona...@similargroup.com
Hey Matt,
Thanks for your help.
The topics were indeed exceeding the limit and writing most of their messages to disk. After raising the limit and installing faster disks, we get considerably fewer errors.
Now the bottleneck seems to be the CPU, which is maxing out. Admittedly, we're stress testing now with unrealistic loads, but we're going to install another server and see if the two of them can handle this load together.
Either way, it seems like the errors are indeed caused by hardware load.

If we still encounter these errors after balancing the load with the available resources, I'll post again.