Andre Sprenger
Dec 4, 2009, 5:31:38 PM
to AMQP
Hi all,
I'm trying to run a queue subscriber with message acknowledgment and
error recovery. The setup is RabbitMQ 1.7 and the amqp gem 0.6.5.
The subscriber simulates an error on every 100th message by not sending
an acknowledgment. A periodic timer sends a recover to the broker every
5 seconds. This is the code:
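AMQP.start do
  MQ.prefetch(1)
  # Counter must start at 0; incrementing an unset (nil) ivar raises NoMethodError.
  @counter = 0

  queue = MQ.queue('job.queue')
  queue.bind(MQ.fanout('job.exchange'))

  # Every 5 seconds, ask the broker to redeliver unacknowledged messages.
  EM.add_periodic_timer(5) {
    MQ.default.callback {
      puts "#{Time.new} RECOVER " + '*' * 30
      MQ.default.send AMQP::Protocol::Basic::Recover.new({ :requeue => false })
    }
  }

  queue.subscribe(:ack => true) do |header, msg|
    @counter += 1
    id = Marshal.load(msg)
    # Simulate an error on every 100th message by skipping the ack.
    if (@counter % 100 != 0)
      header.ack
    end
    puts "#{Time.new} received #{id}"
  end
end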
The error recovery basically works: the unacknowledged messages are
redelivered to the subscriber. But I'm facing two problems here:
1. The subscriber does not receive messages continuously. Instead, it
receives 100 messages and then waits until the timer sends the recover
(after ~5 s). After that it gets another 100 messages, and so on. Is this
the correct behavior, or am I missing something? I had expected the
subscriber to receive messages continuously.
2. When I set the recover :requeue parameter to true, the subscriber
receives only the first 100 messages and after that no more messages at
all. This looks strange to me.
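The pattern in problem 1 can be reproduced without a broker in a small toy model, which may help when reasoning about it. This is only a sketch under stated assumptions: ToyChannel, pump, recover and run_toy_consumer are made-up names, not part of the amqp gem or RabbitMQ. It assumes that delivery pauses once the number of unacknowledged messages reaches the prefetch count, and that a recover with :requeue => false redelivers those messages to the same subscriber.

```ruby
# Toy model of a prefetch window. Hypothetical names throughout;
# this is not how RabbitMQ or the amqp gem is implemented.
class ToyChannel
  attr_reader :delivered

  def initialize(prefetch, messages)
    @prefetch  = prefetch
    @ready     = messages.dup # messages still waiting in the queue
    @unacked   = []           # delivered but not yet acknowledged
    @delivered = []           # log of every delivery to the consumer
  end

  # Deliver while the unacked window has room.
  # The block plays the consumer and returns true to acknowledge.
  def pump
    while @unacked.size < @prefetch && !@ready.empty?
      msg = @ready.shift
      @unacked << msg
      @delivered << msg
      @unacked.delete(msg) if yield(msg)
    end
  end

  # True when messages are waiting but the window is full.
  def stalled?
    @unacked.size >= @prefetch && !@ready.empty?
  end

  # Assumed model of recover without requeue:
  # redeliver every unacked message to the same consumer.
  def recover
    pending = @unacked
    @unacked = []
    pending.each do |msg|
      @unacked << msg
      @delivered << msg
      @unacked.delete(msg) if yield(msg)
    end
  end
end

def run_toy_consumer
  counter = 0
  # Same rule as the subscriber above: skip the ack on every 100th delivery.
  consumer = lambda do |_msg|
    counter += 1
    counter % 100 != 0
  end

  channel = ToyChannel.new(1, (1..250).to_a)
  channel.pump(&consumer)
  first_burst = channel.delivered.size
  stalled = channel.stalled?

  channel.recover(&consumer) # stands in for the 5-second timer firing
  channel.pump(&consumer)
  [first_burst, stalled, channel.delivered.size]
end

p run_toy_consumer
```

In this model the first burst stops after 100 deliveries with the window full, and the second burst only starts once recover is called, which matches the stop-and-go pattern described in problem 1. It says nothing about problem 2, since requeueing is not modeled.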
Any help is greatly appreciated!
Cheers
Andre