ACKs not delivered immediately

147 views
Skip to first unread message

Larry Edelstein

unread,
Nov 8, 2012, 8:49:29 PM11/8/12
to ruby...@googlegroups.com
Why aren't my ACKs being sent to the server as soon as my code performs them? They're sitting there until I process enough messages or unsub from the queue. I don't think I'm doing anything unusual, but I'm new to RabbitMQ, so I can't say for sure.

I'm using amqp 0.9.8, Ruby 1.9.3, rabbitmq-server 2.8.7, and Ubuntu 12.04 LTS, on EC2. However, I get the same behavior on Mac OS 10.7.4. 

I've got a consumer that subscribes to a queue with :ack => true. It send acks for each message:

        queue.subscribe(:ack => true) do | metadata, payload |
          sku = payload
          # do some work, then
          metadata.ack
        end

...but after debugging with Wireshark, it's evident that the acks aren't getting sent to the server until it makes a subsequent fetch from the server. That is: if the consumer executes

channel.prefetch(5)

and then process five messages in the above loop, no acks will be sent. If it then processes a sixth message, those five acks will then be sent to the server. (Actually, less frequently it appears to send the acks after the fifth message is processed.) Also, if I unsubscribe from the queue at any point, the acks are sent.

Please advise me as to whether I've found a bug (seems unlikely) or made some other error. I'll take a look at the Ruby code in time but I could use some advice now.

Below the cut is code for two scripts that should allow one to reproduce the problem. Steps:

- Run a broker on your localhost, listening on the default port of 5672.
- Run the producer.rb script, which queues up 5000 messages whose contents are "1", "2", "3", etc. 
- Run the consumer.rb script. Verify that it receives "1", "2", "3", etc., but kill it with Ctrl-C before it prints "5".
- Run the consumer again. You'll see the first message it receives is "1", followed by "2", "3", etc. This time, let it run to "6" before you Ctrl-C it.
- Run the consumer one more time. You'll see the first message is now "6".

Thanks,

Larry Edelstein
Now Answers


___________________

producer.rb:

# encoding: utf-8                                                                                                                                            
require "rubygems"
require 'amqp'
require 'aws-sdk'

QUEUE_NAME = "test.ack-testing"
BROKER_HOST = "localhost"
BROKER_URL = "amqp://#{BROKER_HOST}:5672"
begin

  AMQP.start ( BROKER_URL ) do | connection |

    channel  = AMQP::Channel.new(connection)
    queue    = channel.queue(QUEUE_NAME, :durable => true)

    (1..5000).each do | x |
      channel.default_exchange.publish x, :routing_key => QUEUE_NAME, :persistent => true
    end

    EM.add_timer(0.5) do
      connection.close do
        EM.stop { exit }
      end
    end


  end

end



consumer.rb:
# encoding: utf-8                                                                                                                                            
require "rubygems"
require 'amqp'
require 'aws-sdk'

QUEUE_NAME = "test.ack-testing"
BROKER_HOST= "localhost"
BROKER_URL = "amqp://#{BROKER_HOST}:5672"
N_PREFETCH = 5

begin

  AMQP.start ( BROKER_URL ) do | connection |

    channel  = AMQP::Channel.new(connection)
    channel.prefetch(N_PREFETCH)
    queue = channel.queue(QUEUE_NAME, :durable => true)

    i = 0
    queue.subscribe(:ack => true) do | metadata, payload |
      puts "Received payload: #{payload}"
      metadata.ack
      sleep 1
    end


  end

end



 

Michael Klishin

unread,
Nov 8, 2012, 8:53:13 PM11/8/12
to ruby...@googlegroups.com
2012/11/9 Larry Edelstein <ladle...@gmail.com>

I've got a consumer that subscribes to a queue with :ack => true. It send acks for each message:

        queue.subscribe(:ack => true) do | metadata, payload |
          sku = payload
          # do some work, then
          metadata.ack
        end

...but after debugging with Wireshark, it's evident that the acks aren't getting sent to the server


Something is blocking the event loop. Queue#subscribe blocks run in the same thread as EventMachine event loop
so if that thread is blocked waiting for something, data cannot be sent or received. This is a well known limitation of
[completely or predominantly] single-threaded event-loop based libraries.
--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin

Michael Klishin

unread,
Nov 8, 2012, 8:58:03 PM11/8/12
to ruby...@googlegroups.com
You can try to wrap your code in EventMachine.defer do … end to validate
that hypothesis. This will run your code in a thread pool EventMachine maintains,
not on the event loop thread. However, this also means your messages will be processed
concurrently so if you need them to be processed in strict order, I will have to put together
an example that shows how to do it.

MK

Richard Heycock

unread,
Nov 8, 2012, 10:06:58 PM11/8/12
to ruby...@googlegroups.com
Quoting Larry Edelstein (2012-11-09 12:49:29)
As Michael said something is blocking the reacter:


<====
sleep 1
====>

Remove the sleep.

rgh


> end
>
>
> end
>
> end
>
>
>
>
>
>
> --
> Documentation guides: http://bit.ly/rubyamqp
> Code examples: http://bit.ly/amq-gem-examples
> API reference: http://bit.ly/mDm1JE
>
> Drop by #rabbitmq on irc.freenode.net
> Bug tracker: https://github.com/ruby-amqp/amqp/issues
>
> Post to the group: ruby...@googlegroups.com | unsubscribe:
> ruby-amqp+...@googlegroups.com

Richard Heycock
Technical Director
DIGIVIZER Pty Ltd
Suite 104
37-39 The Corso
MANLY NSW 2095

m: 0410 646 369
w: www.digivizer.com
e: richard...@digivizer.com

DIGIVIZER - delivering you the digital footprint of the people you know and
the people you really should know.

Larry Edelstein

unread,
Nov 20, 2012, 8:11:38 PM11/20/12
to ruby...@googlegroups.com
I put this issue down because I had a workaround - setting prefetch to 1 - but am picking it up now because I'd like to be able to have a prefetch window, and also because it still puzzles me.

I've tried what I think you're suggesting. First, I tried wrapping the queue.subscribe call and block in EventMachine.defer. Then I tried just wrapping the metadata.ack call in EventMachine.defer. In both cases I got the same result as before. The call to Kernel.sleep makes no difference; if I remove it, things happen a lot faster, but acks still don't get delivered until I exceed the prefetch window.

And so still I'm puzzled. What could be more simple than what I'm doing? I'm subscribing to a channel, receiving messages, and ack'ing them. EM is delivering me messages the whole time. Doesn't that indicate that the event loop isn't blocked?

-Larry

Larry Edelstein

unread,
Nov 20, 2012, 9:31:44 PM11/20/12
to ruby...@googlegroups.com
My implicit question here: why should this "prefetch" value set the batch size of outgoing acks? I would expect it to set the batch size of incoming messages, and it does; but why does it affect outgoing acks??

Michael Klishin

unread,
Nov 21, 2012, 12:15:53 AM11/21/12
to ruby...@googlegroups.com


2012/11/21 Larry Edelstein <ladle...@gmail.com>

My implicit question here: why should this "prefetch" value set the batch size of outgoing acks? I would expect it to set the batch size of incoming messages, and it does; but why does it affect outgoing acks??

It does not affect acks. It is driven by acks. Acks control when it is OK for RabbitMQ to remove messages
from the queue.

Michael Klishin

unread,
Nov 21, 2012, 12:23:04 AM11/21/12
to ruby...@googlegroups.com


2012/11/21 Larry Edelstein <ladle...@gmail.com>

EM is delivering me messages the whole time. Doesn't that indicate that the event loop isn't blocked?

No it does not.

Here is roughly how event loops work (in EventMachine, Node.js and so on):

while true
  send some data if there's something to send
  data = read some data if it is available

  run "next tick" callbacks
  run receive_data callbacks
end

The block you are passing to AMQP::Queue#subscribe is executed on the 2nd step. However, it is executed on the same thread
so if something takes time there, *it will block all network activity*. So, the event loop was not blocked by the time you got the
data but it is perfectly possible for your own code to block it.

EventMachine.defer executes the block you pass to it in a thread pool (20 threads by default) that EventMachine maintains.
All that happens in that thread pool, however, still depends on the event loop for delivery of acks and any other data.
This is a well known limitation of event loops.

If you want to never deal with this, use Hot Bunnies (JRuby only), all network activity there happens in a thread
that works a lot like what I've demonstrated before but your code is never executed on it.

Michael Klishin

unread,
Nov 21, 2012, 12:24:01 AM11/21/12
to ruby...@googlegroups.com
2012/11/21 Michael Klishin <michael....@gmail.com>

The block you are passing to AMQP::Queue#subscribe is executed on the 2nd step


This should read "in the 4th".

Larry Edelstein

unread,
Nov 21, 2012, 4:35:35 AM11/21/12
to ruby...@googlegroups.com
Michael, surely you must be right, but I'm still at sea trying to figure out what to do differently. You saw my code. When I receive a message, I ack it. That's all. That's not a long-running process. It's not blocking the thread any longer than it takes to send the ack.

Also, why does setting prefetch to 1 make my acks work? I haven't altered my event loop at all.

As I said, I put the ack in a defer block, and the results were the same.

I note that some of the examples in the "patterns and use cases" section at rubyamqp.info include a call to channel.prefetch(1). So it seems to be important.

Please continue to try to enlighten me...

-Larry

Sent from my iPad

Michael Klishin

unread,
Nov 21, 2012, 5:07:52 AM11/21/12
to ruby...@googlegroups.com


2012/11/21 Larry Edelstein <ladle...@gmail.com>

When I receive a message, I ack it. That's all. That's not a long-running process. It's not blocking the thread any longer than it takes to send the ack.

Would it be possible for you to record what happens on the network when you run your code (or a couple of scripts that
reproduce your case)?

You can either use Wireshark GUI or run

tshark -i 5 -n -q -O "amqp"

after installing Wireshark where -i is the number of loopback interface on your machine, as displayed by tshark -D. For example, for me
the output is

1. en0
2. fw0
3. en1
4. p2p0
5. lo0

so I use the number 5. If you run OS X, wireshark w/o GUI can be installed via Homebrew. On Linux, it should
be available via your package manager. For Windows, there are downloads at http://www.wireshark.org/download.html.

Having a recorded wireshark session will greatly help.

Thanks.

Larry Edelstein

unread,
Nov 21, 2012, 12:23:17 PM11/21/12
to ruby...@googlegroups.com
I had done just as you suggest, but forgot to mention it here, as I've been posting this question in a couple places. The gist of it was that the acks aren't sent over the interface until I've received n messages, where n is the prefetch value. I could run the trace again if more detail would be edifying.

Sent from my iPad

Michael Klishin

unread,
Nov 21, 2012, 12:34:40 PM11/21/12
to ruby...@googlegroups.com


2012/11/21 Larry Edelstein <ladle...@gmail.com>

I could run the trace again if more detail would be edifying.

I can do that on my end if you can provide me a couple of sample scripts (sorry if I haven't noticed
them before). The reason for this is that the only definitive source of information about what has been
sent is a tool like Wireshark. There are many layers below amqp gem that may be
delaying delivery.

Given example scripts, I will be happy to dive into the issue.

Larry Edelstein

unread,
Nov 21, 2012, 1:19:55 PM11/21/12
to ruby...@googlegroups.com
Thanks, Michael. All the information, including the scripts, is in my first message.

Sent from my iPad

Larry Edelstein

unread,
Nov 23, 2012, 6:36:04 PM11/23/12
to ruby...@googlegroups.com
Just in case, here are the scripts, again. To use them:

- Run a broker on your localhost, listening on the default port of 5672.
- Run the producer.rb script, which queues up 5000 messages whose contents are "1", "2", "3", etc. 
- Run the consumer.rb script. Verify that it receives "1", "2", "3", etc., but kill it with Ctrl-C before it prints "5".
- Run the consumer again. You'll see the first message it receives is "1", followed by "2", "3", etc. This time, let it run to "6" before you Ctrl-C it.
- Run the consumer one more time. You'll see the first message is now "6".

And now, the scripts...
    queue.subscribe(:ack => true) do | metadata, payload |

Larry Edelstein

unread,
Nov 30, 2012, 6:09:57 PM11/30/12
to ruby...@googlegroups.com
I'm still hoping for some clarity. The question: why are outgoing acks not send until the prefetch window has been exceeded?

To reproduce the issue, you can follow the procedure below, using the scripts pasted there.

-larry

Michael Klishin

unread,
Dec 3, 2012, 4:11:53 AM12/3/12
to ruby...@googlegroups.com
Apologies for not helping to investigate this so far.

I am a little swamped IRL and work on Bunny 0.9.
WIll try to reproduce this later today.
Reply all
Reply to author
Forward
0 new messages