// Build the client configuration used when constructing the producers.
cfg := nsq.NewConfig()

// Use Sprintf, not Sprint: Sprint does not interpret format verbs, so the
// literal "%s" would be concatenated into the user agent and reported to
// nsqd as "go-nsq/%s<version>" instead of "go-nsq/<version>".
cfg.UserAgent = fmt.Sprintf("go-nsq/%s", nsq.VERSION)
.....
// Create one producer per address. An address whose producer cannot be
// constructed is logged and skipped, so producers may end up shorter than
// addresses (possibly empty).
producers := make([]*nsq.Producer, 0, len(addresses))
for _, address := range addresses {
	p, err := nsq.NewProducer(address, cfg)
	if err != nil {
		kslog.Error(logModule, err.Error())
		continue
	}
	producers = append(producers, p)
}

// Store the handler in PublishClient (declared outside this snippet).
PublishClient = &PublishHandler{
	addresses:        addresses,
	producers:        producers,
	perAddressStatus: perAddressStatus,
	timermetrics:     timermetrics.NewTimerMetrics(250, "[aggregate]:"),
}
// Publish sends message through one randomly selected producer.
//
// The destination topic is built from topic and the envelope priority as
// "<topic>_<priority>". Publish returns an error when message is empty, when
// the handler holds no producers, or when the selected producer fails to
// publish; the publish failure is also logged as a warning.
func (ph *PublishHandler) Publish(header *pb.MessageEnvelope, message []byte, topic string) error {
	if len(message) == 0 {
		return errors.New("No message")
	}

	// The producer list can be empty (for example when every producer failed
	// to construct at setup time). rand.Intn panics for n <= 0, so report an
	// error instead of crashing the caller.
	if len(ph.producers) == 0 {
		return errors.New("no producers available")
	}

	// Choose a random node to publish to.
	pos := rand.Intn(len(ph.producers))
	err := ph.producers[pos].Publish(fmt.Sprintf("%v_%v", topic, header.GetPriority()), message)
	if err != nil {
		kslog.Warningf(logModule, "Error while sending message: %v", err)
		return err
	}

	return nil
}
[nsqd] 2015/01/26 03:52:21.555500 TCP: new client(...)
[nsqd] 2015/01/26 03:52:22.653956 CLIENT(...): desired protocol magic ' V2'
[nsqd] 2015/01/26 03:52:22.654216 [...] IDENTIFY: {ShortId:... LongId:... ClientID:... Hostname:... HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/%s1.0.2-alpha MsgTimeout:0}
[nsqd] 2015/01/26 03:52:25.127870 ERROR: [...] - E_BAD_MESSAGE PUB failed to read message body - unexpected EOF
[nsqd] 2015/01/26 03:52:25.127924 PROTOCOL(V2): [...] exiting ioloop
[nsqd] 2015/01/26 03:52:25.128001 PROTOCOL(V2): [...] exiting messagePump
[nsqd] 2015/01/26 03:52:25.243042 TCP: new client(...)
[nsqd] 2015/01/26 03:52:25.507237 ERROR: [...] - E_BAD_MESSAGE PUB failed to read message body - unexpected EOF
[nsqd] 2015/01/26 03:52:25.507296 PROTOCOL(V2): [...] exiting ioloop
[nsqd] 2015/01/26 03:52:25.507388 PROTOCOL(V2): [...] exiting messagePump
[nsqd] 2015/01/26 03:52:25.563955 TCP: new client(...)
[nsqd] 2015/01/26 03:52:25.574751 CLIENT(...): desired protocol magic ' V2'
[nsqd] 2015/01/26 06:55:49.326789 ...] IDENTIFY: {ShortId:... LongId:... ClientID:... Hostname:... HeartbeatInterval:30000 OutputBufferSize:16384 OutputBufferTimeout:250 FeatureNegotiation:true TLSv1:false Deflate:false DeflateLevel:6 Snappy:false SampleRate:0 UserAgent:go-nsq/1.0.2-alpha MsgTimeout:0}
[nsqd] 2015/01/26 06:55:49.326895 PROTOCOL(V2): [...] exiting ioloop
[nsqd] 2015/01/26 06:55:49.326933 ERROR: client(...) - failed to read command - EOF
[nsqd] 2015/01/26 06:55:49.326947 PROTOCOL(V2): [...] exiting messagePump
...