Why aren't my ACKs being sent to the server as soon as my code performs them? They're sitting there until I process enough messages or unsub from the queue. I don't think I'm doing anything unusual, but I'm new to RabbitMQ, so I can't say for sure.
I'm using amqp 0.9.8, Ruby 1.9.3, rabbitmq-server 2.8.7, and Ubuntu 12.04 LTS, on EC2. However, I get the same behavior on Mac OS 10.7.4.
I've got a consumer that subscribes to a queue with :ack => true. It sends an ack for each message:
...but after debugging with Wireshark, it's evident that the acks aren't sent to the server until the consumer makes a subsequent fetch from the server. That is, if the consumer executes
channel.prefetch(5)
and then processes five messages in the above loop, no acks are sent. If it then processes a sixth message, those five acks are sent to the server. (Less frequently, the acks appear to be sent right after the fifth message is processed.) Also, if I unsubscribe from the queue at any point, the pending acks are sent.
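To make the observed timing concrete, here is a toy model of it in plain Ruby. It is only a sketch of what the Wireshark trace suggests, not the amqp gem's implementation: BufferedAckClient and its methods are hypothetical names, and the assumption that acks are flushed only on the next fetch (or on unsubscribe) is taken from the observation above, not from the library source.

```ruby
# Toy model of the observed ack timing. NOT the amqp gem's code:
# BufferedAckClient and all of its methods are hypothetical.
class BufferedAckClient
  attr_reader :sent_acks, :pending_acks

  def initialize(prefetch)
    @prefetch = prefetch
    @pending_acks = [] # acks issued by consumer code, not yet on the wire
    @sent_acks = []    # acks that have actually reached the server
    @window = []       # delivery tags fetched but not yet processed
    @next_tag = 1
  end

  # Process one message: fetch a new window if needed, then ack locally.
  def process_one
    fetch_window if @window.empty?
    tag = @window.shift
    @pending_acks << tag
    tag
  end

  # Unsubscribing also pushes out whatever acks are still buffered.
  def unsubscribe
    flush
  end

  private

  # Assumption: fetching the next batch is where buffered acks get flushed.
  def fetch_window
    flush
    @window = (@next_tag...(@next_tag + @prefetch)).to_a
    @next_tag += @prefetch
  end

  def flush
    @sent_acks.concat(@pending_acks)
    @pending_acks.clear
  end
end

client = BufferedAckClient.new(5)
5.times { client.process_one }
puts "after 5: sent=#{client.sent_acks.inspect} pending=#{client.pending_acks.inspect}"
client.process_one
puts "after 6: sent=#{client.sent_acks.inspect} pending=#{client.pending_acks.inspect}"
```

In this model the five acks stay pending after the fifth message and only show up as sent once the sixth message triggers another fetch, which matches the common case described above (it does not model the less frequent case where the acks go out right after the fifth message).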
Please advise me as to whether I've found a bug (seems unlikely) or made some other error. I'll take a look at the Ruby code in time but I could use some advice now.
Below the cut is code for two scripts that should allow one to reproduce the problem. Steps:
- Run a broker on your localhost, listening on the default port of 5672.
- Run the producer.rb script, which queues up 5000 messages whose contents are "1", "2", "3", etc.
- Run the consumer.rb script. Verify that it receives "1", "2", "3", etc., but kill it with Ctrl-C before it prints "5".
- Run the consumer again. You'll see the first message it receives is "1", followed by "2", "3", etc. This time, let it run to "6" before you Ctrl-C it.
- Run the consumer one more time. You'll see the first message is now "6".
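The outcome of these steps can be sketched without a broker. The model below assumes the standard AMQP behavior that a message delivered but never acknowledged is requeued when the consumer's connection goes away. ToyBroker and run_consumer are hypothetical names used only for illustration, and the number of acks that reach the broker in each run is supplied by hand to mirror what the steps report.

```ruby
# Toy model of broker-side redelivery. ToyBroker is a hypothetical class,
# not part of RabbitMQ or the amqp gem.
class ToyBroker
  def initialize(count)
    @ready = (1..count).map(&:to_s) # payloads "1".."count", as queued by producer.rb
  end

  # Simulates one consumer run that handles `processed` messages before
  # Ctrl-C, while only `acks_on_wire` acks actually reached the broker.
  # Returns the payloads the consumer would have printed.
  def run_consumer(processed, acks_on_wire)
    printed = @ready.first(processed)
    # Only acknowledged messages leave the queue; the rest are requeued
    # and delivered again on the next run.
    @ready = @ready.drop(acks_on_wire)
    printed
  end
end

broker = ToyBroker.new(5000)
run1 = broker.run_consumer(4, 0) # killed before "5": no acks reached the broker
run2 = broker.run_consumer(6, 5) # ran to "6": acks for "1".."5" were flushed
run3 = broker.run_consumer(3, 0)

puts "run 1 starts at #{run1.first}"
puts "run 2 starts at #{run2.first}"
puts "run 3 starts at #{run3.first}"
```

Under those assumptions the second run starts again at "1" and the third at "6", which is the sequence the steps describe.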
Thanks,
Larry Edelstein
___________________
producer.rb:

# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'

QUEUE_NAME = "test.ack-testing"
BROKER_HOST = "localhost"
BROKER_URL = "amqp://#{BROKER_HOST}:5672"

begin
  AMQP.start(BROKER_URL) do |connection|
    channel = AMQP::Channel.new(connection)
    queue = channel.queue(QUEUE_NAME, :durable => true)
    (1..5000).each do |x|
      channel.default_exchange.publish x, :routing_key => QUEUE_NAME, :persistent => true
    end
    EM.add_timer(0.5) do
      connection.close do
        EM.stop { exit }
      end
    end
  end
end

consumer.rb:

# encoding: utf-8
require "rubygems"
require 'amqp'
require 'aws-sdk'

QUEUE_NAME = "test.ack-testing"
BROKER_HOST = "localhost"
BROKER_URL = "amqp://#{BROKER_HOST}:5672"
N_PREFETCH = 5

begin
  AMQP.start(BROKER_URL) do |connection|
    channel = AMQP::Channel.new(connection)
    channel.prefetch(N_PREFETCH)
    queue = channel.queue(QUEUE_NAME, :durable => true)
    i = 0
    queue.subscribe(:ack => true) do |metadata, payload|
      puts "Received payload: #{payload}"
      metadata.ack
      sleep 1
    end
  end
end