subscribe gobbles messages


amos

Feb 11, 2009, 10:20:24 PM
to AMQP
i have a problem using this library. just using 2 simple ruby
processes:

producer:

require 'rubygems'
require 'mq'

EM.run do
  q = MQ.new
  1000.times do
    q.queue('test').publish('foo')
  end
end


consumer:

require 'rubygems'
require 'mq'

EM.run do
  q = MQ.new
  q.queue('test').subscribe do |msg|
    puts msg
    sleep 1
  end
end

i see that as soon as the consumer starts, it takes all 1000 messages
off the queue. if i ctrl-c out of that process, the messages are gone
from rabbit. if i try to use an ack version:

require 'rubygems'
require 'mq'

EM.run do
  q = MQ.new
  q.queue('test').subscribe(:ack => true) do |header, msg|
    puts msg
    header.ack
    sleep 1
  end
end


the messages never get acked (after a ctrl-c) and stay on the queue
between restarts. i'm using amqp-0.6 and rabbit 1.5.0.

Aman Gupta

Feb 11, 2009, 10:32:02 PM
to ruby...@googlegroups.com
The ack version works as expected, correct?

When you subscribe, rabbit starts sending you messages off of the queue
until the TCP buffer fills up. Since this buffer is pretty big
(especially compared to the size of each message), most of the
messages end up getting dequeued and sent to the consumer. When you
ctrl+c and aren't using ack mode, these messages are effectively lost.

The solution is to use ack mode, or to use MQ.pop so you explicitly
ask for a message to be dequeued.
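
A minimal sketch of the difference between the two modes, using a toy in-memory broker rather than the amqp gem. `ToyBroker`, `deliver_all`, `consumer_died` and `messages_left_after_crash` are hypothetical names used only for illustration, and the model assumes that every ack the consumer issues actually reaches the broker before the consumer dies:

```ruby
# Toy model of a broker queue. This is NOT the amqp gem API; all names
# here are made up for illustration.
class ToyBroker
  attr_reader :ready, :unacked

  def initialize(messages)
    @ready = messages.dup   # messages still sitting on the queue
    @unacked = {}           # delivery tag => message, awaiting an ack
    @next_tag = 0
  end

  # Push every ready message to the consumer, the way subscribe does.
  # Returns [tag, message] pairs.
  def deliver_all(ack:)
    deliveries = []
    until @ready.empty?
      msg = @ready.shift
      tag = (@next_tag += 1)
      @unacked[tag] = msg if ack   # in no-ack mode the broker forgets at once
      deliveries << [tag, msg]
    end
    deliveries
  end

  def ack(tag)
    @unacked.delete(tag)
  end

  # Consumer connection dropped: unacked messages go back on the queue.
  def consumer_died
    @ready = @unacked.values + @ready
    @unacked.clear
  end

  def depth
    @ready.size
  end
end

# Deliver 1000 messages, let the consumer finish `processed` of them,
# kill the consumer, and report how many messages the queue still holds.
def messages_left_after_crash(ack:, processed:)
  broker = ToyBroker.new(Array.new(1000) { 'foo' })
  deliveries = broker.deliver_all(ack: ack)
  deliveries.first(processed).each { |tag, _| broker.ack(tag) } if ack
  broker.consumer_died
  broker.depth
end

puts messages_left_after_crash(ack: false, processed: 3)
puts messages_left_after_crash(ack: true, processed: 3)
```

Under these assumptions the no-ack run leaves 0 messages on the queue, while the ack run leaves 997: only the three acknowledged messages are gone for good.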

1.5 also contains some new flow control features specifically for this
problem. They should be trivial to add to the library as soon as I
figure out how to use them.

Aman

Ben Hood

Feb 12, 2009, 3:31:33 AM
to ruby...@googlegroups.com
Aman,

On Thu, Feb 12, 2009 at 3:32 AM, Aman Gupta <themast...@gmail.com> wrote:
> 1.5 also contains some new flow control features specifically for this
> problem. They should be trivial to add to the library as soon as I
> figure out how to use them.

You might be interested in the basic.qos command that allows a
consumer to set its own prefetch window size.
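
As a rough illustration of what a prefetch window does, here is a toy model in plain Ruby. It is only a sketch: `PrefetchQueue`, `deliver` and `ack_oldest` are hypothetical names, not part of the amqp gem or of RabbitMQ, and the model assumes the consumer is in ack mode, since the window is counted in unacknowledged messages:

```ruby
# Toy model of a prefetch window. NOT the amqp gem API; all names are
# made up for illustration.
class PrefetchQueue
  attr_reader :ready, :in_flight

  def initialize(messages, prefetch_count)
    @ready = messages.dup
    @in_flight = []                  # delivered but not yet acked
    @prefetch_count = prefetch_count
  end

  # Deliver until the window is full or the queue is empty.
  # Returns the messages delivered by this call.
  def deliver
    sent = []
    while @in_flight.size < @prefetch_count && !@ready.empty?
      msg = @ready.shift
      @in_flight << msg
      sent << msg
    end
    sent
  end

  # Acking the oldest in-flight message frees one slot in the window.
  def ack_oldest
    @in_flight.shift
  end
end

q = PrefetchQueue.new((1..1000).to_a, 10)
first_batch = q.deliver    # only 10 messages leave the queue
q.ack_oldest               # the consumer finishes one message
second_batch = q.deliver   # exactly one more message is delivered
puts first_batch.size, second_batch.size, q.ready.size
```

With a window of 10, a consumer that dies can have at most 10 messages outstanding, instead of the whole queue.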

Ben

Aman Gupta

Feb 12, 2009, 3:35:28 AM
to ruby...@googlegroups.com
How about this api:

# open channel
mq = MQ.new

# prefetch up to 10 messages
mq.prefetch(10)

# subscribe to a queue
mq.queue('name').subscribe{ |msg|
  puts msg
}

Aman

amos

Feb 12, 2009, 2:07:07 PM
to AMQP
try running the ack mode. if you ctrl+c after a couple of lines of
'foo', you'll still see 1000 messages on the queue. is that because
the ack isn't synchronous? if you let the program run all the way to
the end, then the queue will be down to 0.

i was having trouble with pop(). how would you rewrite the consumer to
pop all 1000 messages?

Aman Gupta

Feb 12, 2009, 2:38:25 PM
to ruby...@googlegroups.com
You're using sleep(1) in your ack example, which blocks the reactor
and doesn't give the acks a chance to be sent over the network. Here's
an example using pop:

queue = MQ.new.queue('name')
queue.pop{ |msg|
  # if msg is nil, the queue was empty
  if msg
    process(msg)
  end

  # wait two seconds and pop another message off
  EM.add_timer(2){ queue.pop }
}
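
On the point about sleep blocking the reactor, a toy single-threaded loop can show why acks issued inside a blocking callback never reach the network if the process is killed. This is only a sketch, not EventMachine's API: `ToyReactor`, `send_frame`, `tick` and `SimulatedKill` are hypothetical names, and it assumes that callbacks for everything already buffered run back to back before the loop gets a chance to write queued frames:

```ruby
# Toy single-threaded reactor. NOT EventMachine's API; all names are
# made up for illustration.
class SimulatedKill < StandardError; end

class ToyReactor
  attr_reader :wire

  def initialize(inbound)
    @inbound = inbound.dup  # messages already read off the socket
    @outbound = []          # frames queued by callbacks, not yet written
    @wire = []              # frames actually written to the network
  end

  # Queue a frame; nothing is written until the current tick finishes.
  def send_frame(frame)
    @outbound << frame
  end

  # One loop turn: run the callback for every buffered message,
  # then flush the queued frames to the wire.
  def tick
    yield @inbound.shift until @inbound.empty?
    @wire.concat(@outbound)
    @outbound.clear
  end
end

# Returns [messages processed, acks that reached the wire].
def acks_on_wire_after_interrupt(total, killed_after)
  reactor = ToyReactor.new((1..total).to_a)
  processed = 0
  begin
    reactor.tick do |msg|
      raise SimulatedKill if processed == killed_after  # simulated ctrl-c
      reactor.send_frame("ack #{msg}")  # like header.ack: only queues a frame
      processed += 1                    # stands in for sleep 1 / real work
    end
  rescue SimulatedKill
    # the process dies mid-tick, so the queued acks are never flushed
  end
  [processed, reactor.wire.size]
end

p acks_on_wire_after_interrupt(1000, 3)
p acks_on_wire_after_interrupt(5, 5)
```

In the first call three messages are processed but no ack is written, which matches the 1000 messages still on the queue after a ctrl-c. In the second call the tick runs to completion and all five acks are flushed.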

Aman

amos

Feb 12, 2009, 2:52:18 PM
to AMQP
thanks, i think pop will work better.

the sleep(1) in my example was just representing processing on our
production machines. so if instead of sleep, i fetch messages from
memcached, fetch records from db, and send email, the result will be
the same: no ack sent. is there a way to have ack block?

Aman Gupta

Feb 12, 2009, 3:31:54 PM
to ruby...@googlegroups.com
there are two solutions to this problem:

one is to use asynchronous libraries for everything such that there
are never any blocking actions. you can use em-mysql
(http://github.com/tmm1/em-mysql) to access a mysql database,
eventedcache (http://github.com/cliffmoon/eventedcache) for memcache
access and EM::P::SmtpClient to send emails.

the other is to use ruby threads to run your blocking code, such that
the main reactor can still continue to run alongside instead of
getting blocked as well. you can do this using EM.defer:

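# :ack => true is needed so the broker waits for info.ack
MQ.queue('name').subscribe(:ack => true){ |info, msg|
  EM.defer(proc{
    # runs on a background thread, so the reactor is not blocked
    process(msg)
  }, proc{
    # runs back on the reactor thread once process(msg) has returned
    info.ack
  })
}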
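
The shape of this pattern can be sketched with nothing but the standard library. The following is a toy stand-in for the deferral idea, not EventMachine itself: `ToyLoop`, `defer`, `run` and `process_all` are hypothetical names, the `sleep` stands in for blocking work, and appending to `acked` stands in for `info.ack`:

```ruby
require 'thread'

# Toy stand-in for the EM.defer pattern. NOT EventMachine's API; all
# names are made up for illustration.
class ToyLoop
  def initialize
    @done = Queue.new   # thread-safe hand-off from workers to the loop
    @pending = 0
  end

  # Run `work` on a background thread; its result is handed back so
  # that `callback` can run on the loop thread.
  def defer(work, callback)
    @pending += 1
    Thread.new { @done << [callback, work.call] }
  end

  # Run callbacks on the calling thread until all deferred work is done.
  def run
    while @pending > 0
      callback, result = @done.pop   # waits only while workers are busy
      callback.call(result)
      @pending -= 1
    end
  end
end

# Returns [sorted results, whether every callback ran on the main thread].
def process_all(messages)
  acked = []
  main_thread = Thread.current
  callbacks_on_main = true
  lp = ToyLoop.new
  messages.each do |msg|
    lp.defer(
      proc { sleep 0.01; msg.upcase },   # blocking work, off the loop thread
      proc { |result|
        callbacks_on_main &&= (Thread.current == main_thread)
        acked << result                  # stands in for info.ack
      }
    )
  end
  lp.run
  [acked.sort, callbacks_on_main]
end

p process_all(%w[a b c])
```

The design point is that the blocking work never runs on the loop thread, so the loop stays free to write each ack as soon as its message is done.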

Aman

amos

Feb 13, 2009, 3:57:24 PM
to AMQP
i guess what i'm saying is that acks caught in tcp buffer create a
dangerous situation when your client dies (or is manually stopped).
the work is done but never acked. when the buffer consists of a 1000
acks, that's a lot of lost but completed work. i imagine that moving
to non-blocking libraries would minimize the delay, but it still seems
like you'll lose work. maybe i'm wrong. it's a difficult theory to
test without converting my entire application to use em.

i'm surprised more people don't run into this issue. i'm not doing
anything special with the client.

but thanks again for the advice. very helpful.

Aman Gupta

Feb 13, 2009, 4:04:19 PM
to ruby...@googlegroups.com
the ack is only lost if you cause the reactor to block. another way to
mitigate this is by doing your blocking action inside an EM.next_tick,
which will happen after the EM buffers have been written to the
network:

MQ.queue('name').subscribe(:ack => true){ |info, msg|
  info.ack
  EM.next_tick{
    process(msg)
  }
}

amos

Mar 4, 2009, 2:01:34 PM
to AMQP
pop seems funky (rabbit crashed a couple of times) so i'm going back
to this method. unfortunately, i still haven't figured out how to
flush the ack buffer. i end up processing more messages than the
server recognizes. is there some stop command that will ensure that
all acks are sent to the server before the client quits? EM.stop and
AMQP.stop still leave messages un-acked.

Aman Gupta

Mar 4, 2009, 2:09:35 PM
to ruby...@googlegroups.com
AMQP.stop should not leave the buffer unacked. Try setting a prefetch window:

MQ.send(AMQP::Protocol::Basic::Qos.new(:prefetch_count=>1, :global => true))

Aman

Ben Hood

Mar 5, 2009, 4:40:18 AM
to ruby...@googlegroups.com, AMQP
Amos,

On 4 Mar 2009, at 19:01, amos <famos...@gmail.com> wrote:

>
> pop seems funky (rabbit crashed a couple of times)

If Rabbit crashed for some reason, it would be interesting (from a
server perspective) to be able to reproduce this, because the client
should ordinarily not be able to crash the broker. Do you have any
server logs at all? Is it possible to provide a stripped down version
of your program that can reproduce a crash?

Ben

amos

Mar 5, 2009, 11:27:14 AM
to AMQP
it happened 2x when the client machines were under heavy load (2x
number of cores). i'm speaking with alexis richardson about analyzing
the log files. i'll let you know what we discover. simple client code
would be something like:

queue = mq.queue(queue_name, :durable => true)
queue.pop(:ack => true) do |info, task|
  if task
    task = Marshal.load(task)
    process(task)
    info.ack
  end
  EM.add_timer(0.001){ queue.pop }
end
