retrieving just one message

Jason Aten

Sep 30, 2015, 1:52:05 AM
to nsq-users
How do I set RDY=1 from a Consumer so that I only receive 1 message from a given channel?

I've tried setting ChangeMaxInFlight to 1, but I still get 2 messages retrieved from the channel. e.g.

    config := nsq.NewConfig()
    q, _ := nsq.NewConsumer("write", "ch", config)

    q.ChangeMaxInFlight(1)
    q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        log.Printf("Got a message: %#v", message)
        message.Finish()
        return nil
    }))


Jehiah Czebotar

Sep 30, 2015, 8:02:30 AM
to Jason Aten, nsq-users
NSQ consumers manage their concurrency, not total messages.

That means that `MaxInFlight` is not controlling that you will only
get one message, it's controlling that you only get one message **at a
time**. As soon as you do `message.Finish()` nsq will send you another
message (if it has one to send).

RDY signals the number of messages a specific connection between a
consumer and a single nsqd is ready for. This is typically
(MaxInFlight/#connections).
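
As a rough illustration of that arithmetic only (this is not go-nsq's actual code, and `perConnRDY` is a hypothetical helper named here for the example), the per-connection share could be sketched like this:

```go
package main

import "fmt"

// perConnRDY is a hypothetical helper, not part of go-nsq. It returns the
// rough per-connection RDY value described above: MaxInFlight divided by
// the number of nsqd connections, using integer division.
// A result of 0 means there are more connections than MaxInFlight; a real
// client has to handle that case separately, which this sketch does not.
func perConnRDY(maxInFlight, conns int) int {
	if conns <= 0 || maxInFlight <= 0 {
		return 0
	}
	return maxInFlight / conns
}

func main() {
	fmt.Println(perConnRDY(1, 1))   // one connection, MaxInFlight=1
	fmt.Println(perConnRDY(100, 4)) // four connections sharing MaxInFlight=100
}
```

With one connection and `MaxInFlight=1` the share is 1, which is the case discussed in this thread.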

--
Jehiah

Jason Aten

Sep 30, 2015, 10:40:08 AM
to nsq-users, j.e....@gmail.com
Hi Jehiah, thanks for the explanation of the parameters.

It sounds like it is impossible, then, to guarantee that one client receives only a single message? I assumed that one connection from one Consumer with RDY=1 would do this; however:
a) I can't find any way to set RDY from Consumer; Consumer.conns() is unexported (it is not Consumer.Conns()), so users of the Consumer cannot reach it.
b) even when I change the go-nsq library to export conns() as Conns() and then call conn.WriteCommand(nsq.Ready(1)), I still get two messages sent to my Consumer.

My use case is dispatching a long running job. I'd like each Consumer to get only one message at a time, and to not get any more until they've finished, minutes later, the first one they received.

Jehiah Czebotar

Sep 30, 2015, 11:20:30 AM
to Jason Aten, nsq-users
Using `MaxInFlight=1` means you will only get a single message until
you call message.Finish(). That should be exactly what you want, you
just need to ensure that all your work happens before you call Finish.
(Under the covers go-nsq will manage RDY if you have more than one
connection). If you have long running tasks you'll just need to make
sure to account for that in your message timeout settings so that NSQ
isn't giving up on the consumer and requeueing the message before you
expect it to.

If you think you are seeing multiple messages can you provide complete
steps to reproduce? You should probably look at nsqd `/stats` output
before and after publishing your one message to make sure you are
seeing the one message you add and that there are not other messages
queued.

--
Jehiah

Jason Aten

Sep 30, 2015, 11:45:47 AM
to nsq-users, j.e....@gmail.com
Hi Jehiah

On Wednesday, September 30, 2015 at 8:20:30 AM UTC-7, Jehiah wrote:
> If you think you are seeing multiple messages can you provide complete
> steps to reproduce? You should probably look at nsqd `/stats` output
> before and after publishing your one message to make sure you are
> seeing the one message you add and that there are not other messages
> queued.

Are we discussing the same scenario? I will frequently have 2 messages waiting in the channel, already published. I would like to consume exactly one of them.

(For my job management scenario: there will always be lots of jobs sitting waiting, but I'd like to process them in parallel, and so have the Consumer doing the work only grab one at a time).

Jehiah Czebotar

Sep 30, 2015, 12:28:35 PM
to Jason Aten, nsq-users
Yes, I believe we are discussing the same scenario.

When you use max-in-flight=1, nsqd will only send one message
(regardless of the number queued) until

a) you call .Finish(), or
b) consumer processing passes the msg timeout (configured via nsqd
`--msg-timeout=60s` or `Config.MsgTimeout`[1] on your client), or
c) the client connection is uncleanly closed, in which case the message
is requeued by `b`.

The reason I asked you to check nsqd `/stats` output before and after
is so you could identify if server side timeouts were happening in
your case.

[1]: https://godoc.org/github.com/nsqio/go-nsq#Config.MsgTimeout
--
Jehiah

Jason Aten

Sep 30, 2015, 12:58:48 PM
to nsq-users, j.e....@gmail.com
Here are the full steps, with transcript, to reproduce what sounds like a bug in the implementation:

Jehiah Czebotar

Sep 30, 2015, 1:42:51 PM
to Jason Aten, nsq-users
A few comments

As I mentioned before, you are handling one message at a time and will
not get a second message until after your code calls `.Finish()` or
the message times out.

Your code, however, is structured such that the `callee` calls
`message.Finish()` immediately before you do a publish. At that point
in time a second message can be sent from nsqd to that `callee`
(because you signaled you are done with that message, and can again
receive up to max-in-flight=1). The handler will consume that message, call
`.Finish()` and *then* block on writing to `ch`
https://github.com/glycerine/double-messages-received-bug/blob/master/callee.go#L84
which `consumeRequest()` is no longer reading from.

You should structure your code so that all your work happens before
you finish a message, and so that consumers are long-lived. If as you
mentioned your processing took a long time (say a sleep 15s) you would
see that during that 15s messages would queue if there were no
additional available consumers (`callee` in your case).
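
The ordering problem described above can be reproduced with a toy in-memory model. This is only a sketch: `toyQueue`, `push`, `finish`, `setMaxInFlight` and `run` are hypothetical names invented for the example and are not part of nsqd or go-nsq. It assumes two jobs are already queued and shows what a consumer receives when it finishes its message while still advertising capacity, compared with dropping its capacity to zero first.

```go
package main

import "fmt"

// toyQueue is a hypothetical stand-in for one nsqd channel feeding a
// single consumer. It only models capacity, not the real wire protocol.
type toyQueue struct {
	pending     []string // messages waiting in the channel
	inFlight    int      // delivered but not yet finished
	maxInFlight int      // how many messages the consumer will accept
	delivered   []string // everything pushed to the consumer so far
}

// push delivers pending messages while the consumer has capacity,
// mirroring how messages are sent as soon as the consumer is ready.
func (q *toyQueue) push() {
	for q.inFlight < q.maxInFlight && len(q.pending) > 0 {
		q.delivered = append(q.delivered, q.pending[0])
		q.pending = q.pending[1:]
		q.inFlight++
	}
}

// finish marks one in-flight message as done, which frees capacity.
func (q *toyQueue) finish() {
	q.inFlight--
	q.push()
}

// setMaxInFlight changes the consumer's advertised capacity.
func (q *toyQueue) setMaxInFlight(n int) {
	q.maxInFlight = n
	q.push()
}

// run simulates a consumer that wants exactly one job. When pauseFirst
// is true it drops its capacity to zero before finishing the job.
func run(pauseFirst bool) []string {
	q := &toyQueue{pending: []string{"job-1", "job-2"}}
	q.setMaxInFlight(1) // job-1 is delivered
	if pauseFirst {
		q.setMaxInFlight(0)
	}
	q.finish()
	return q.delivered
}

func main() {
	fmt.Println(run(false)) // finish with capacity still at 1: [job-1 job-2]
	fmt.Println(run(true))  // capacity set to 0 before finishing: [job-1]
}
```

In the first run the second job is handed over the moment the first is finished, which matches the behavior seen in `callee`.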
--
Jehiah

josh rotenberg

Sep 30, 2015, 1:58:27 PM
to Jehiah Czebotar, Jason Aten, nsq-users
Sorry, I needed a diversion on the train. Jason, does this gist cover
what you are trying to do?

https://gist.github.com/joshrotenberg/5a3acb44d3dbad884397

Jehiah Czebotar

Sep 30, 2015, 2:00:32 PM
to Jason E. Aten, nsq-...@googlegroups.com
Typically you do all processing before calling Finish, and at that
time you are ready to process the next message. (It's generally
expected that consumers are long-lived, but that isn't a hard
requirement)

If you want to gracefully shut down, you have two options:

1) ChangeMaxInFlight(0) before calling .Finish()
2) call consumer.Stop() and have your main thread wait on the stop
channel - https://godoc.org/github.com/nsqio/go-nsq#Consumer.Stop

Typically you have a term signal handler that will call
consumer.Stop() and your main goroutine will block on
https://godoc.org/github.com/nsqio/go-nsq#Consumer.StopChan
immediately after connecting to nsqd.
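
A minimal sketch of option 2 for a worker that should take exactly one long-running job and then exit. The topic and channel names come from the first post; the nsqd address `127.0.0.1:4150`, the 10 minute timeout and the `runJob` function are assumptions made up for the example, and the timeout has to fit within whatever maximum your nsqd allows.

```go
package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

// runJob is a hypothetical placeholder for the long-running work.
func runJob(body []byte) error {
	log.Printf("processing %d bytes", len(body))
	time.Sleep(2 * time.Second) // stand-in for minutes of real work
	return nil
}

func main() {
	config := nsq.NewConfig()
	config.MaxInFlight = 1               // one message at a time
	config.MsgTimeout = 10 * time.Minute // assumed value; must exceed job duration

	consumer, err := nsq.NewConsumer("write", "ch", config)
	if err != nil {
		log.Fatal(err)
	}

	consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		err := runJob(m.Body) // all work happens before the message is finished
		// Start the close cycle before the response goes out, so that
		// nsqd does not push a second message to this worker.
		consumer.Stop()
		// Returning nil lets go-nsq finish the message; returning an
		// error makes it requeue the message instead.
		return err
	}))

	// Assumed local nsqd TCP address.
	if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}

	// A term signal also triggers a graceful stop.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		consumer.Stop()
	}()

	// Block until the consumer has shut down.
	<-consumer.StopChan
}
```

For a long-lived worker, drop the `consumer.Stop()` call inside the handler and keep only the signal handler.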


On Wed, Sep 30, 2015 at 1:57 PM, Jason E. Aten <j.e....@gmail.com> wrote:
> Or maybe I should have the callee set MaxInFlight to 0 before calling Finish
> on the message?
>
> On Wed, Sep 30, 2015 at 10:54 AM, Jason E. Aten <j.e....@gmail.com> wrote:
>>
>> It sounds like callee needs a way to atomically Finish a message and then
>> disconnect, all at once.
>>
>> Callee cannot disconnect before it has finished the message, or else there
>> is no way to tell nsqd about the finish. And without a disconnect combined
>> with a Finish atomically, there is a race (no?) where we'll likely get the
>> second message before we can disconnect.
>>
>> Is such an operation available?
--
Jehiah

Jason Aten

Sep 30, 2015, 2:57:24 PM
to nsq-users, j.e....@gmail.com
// to finish the discussion that inadvertently became private

Big thanks to Josh and Jehiah for suggestions on fixing my race. In summary, for future reference: to get a clean shutdown without losing a message to the shutting-down Consumer, either:

a) call Consumer.Stop() before doing message.Finish(); this seems to be the designed-in way to solve the shutdown race; or
b) call ChangeMaxInFlight(0) before doing message.Finish().

Thanks again guys!

On Wed, Sep 30, 2015 at 11:33 AM, josh wrote:
> Well, just for fun, I updated the gist. Now the handler gets a
> reference to its consumer, and calls Stop on it right before it calls
> m.Finish(). It seems to, uh, work, I think, maybe.

Hi Josh, Thanks for pointing out Consumer.Stop(); it looks like it sends StartClose(), which according to the doc comment, is designed to handle my shutdown scenario.  I tested it as an alternative to setting MaxInFlight to 0 before Finish(), and it does appear to work just fine to solve my race; my callee only gets one message.

// StartClose creates a new Command to indicate that the
// client would like to start a close cycle.  nsqd will no longer
// send messages to a client in this state and the client is expected
// finish pending messages and close the connection
func StartClose() *Command {
    return &Command{[]byte("CLS"), nil, nil}
}

 