Consumer Cancellation Notification support

99 views
Skip to first unread message

Simon Lundström

unread,
Oct 1, 2012, 11:03:07 AM10/1/12
to ruby...@googlegroups.com
Hello!

I'm trying to get consumer cancellation notification to work with Ruby
AMQP 0.9.7 and RabbitMQ 2.8.7 using HA-queues.

I thought I had found an RabbitMQ bug but the developers thinks that it
is my consumer or Ruby AMQP which isn't handling basic.cancel correctly
(see
<http://thread.gmane.org/gmane.comp.networking.rabbitmq.general/18679>).
Obviously it is my consumer but I can't find how to deal with it. The
closest I've gotten is
<https://groups.google.com/d/topic/ruby-amqp/TA4n5Eq4IgU>, I checked out
the 0.9.x-stable branch (since the links won't work) to read the code
that Michael suggests but I thought CCN was supported since 0.9.6 (see
<https://groups.google.com/d/topic/ruby-amqp/KKxv4DFVJVk>)?

Is the support rudimentary? Do I still have to do all the things Michael
suggests in the thread above?

Thanks!
- Simon

---

Simon Lundstr�m
IT Services
Stockholm University

Michael Klishin

unread,
Oct 2, 2012, 6:02:47 AM10/2/12
to ruby...@googlegroups.com


2012/10/1 Simon Lundström <si...@su.se>

Is the support rudimentary? Do I still have to do all the things Michael
suggests in the thread above?

It was available only in master since May, cherry-picked to 0.9.x-stable now.
--
MK


Simon Lundström

unread,
Oct 2, 2012, 9:59:30 AM10/2/12
to ruby...@googlegroups.com
On Tue, 2012-10-02 at 10:02:47 +0000, Michael Klishin wrote:
> 2012/10/1 Simon Lundstr�m <si...@su.se>
>
> > Is the support rudimentary? Do I still have to do all the things Michael
> > suggests in the thread above?
> >
>
> It was available only in master since May, cherry-picked to 0.9.x-stable

I pulled amq-client, checked out 0.9.x-stable, build the gem, installed
it (and uninstalled the old gem).

If I understand the code correctly it implements all the steps that you
wrote in the original mail to the mailinglist.

I:
# Added: require "amqp/extensions/rabbitmq"
# Inside my `queue.subscribe(:ack => true) do |header, body|` block I added:
queue.on_cancel do |a|
puts a.inspect
end

and I get:
#<AMQ::Protocol::Basic::Cancel:0x10bc4f228 @nowait=true, @consumer_tag="simlu.queue-1349184117000-260638395643">
when ram01 disconnects.

The correct procedure here is to reconnect apparently, but I have not found how
to do this in the queue context (if that makes any sense). The only place to
schedule a reconnect I have found is on an connection error.

My code: http://pastie.org/private/sealckbcknnn2zvrzzihg

Thanks!
- Simon

P.S.
I don't think the `puts "Updating client properties"` is supposed to be in lib/amq/client/extensions/rabbitmq/cancel.rb
Too small to make a pull request (or I'm too lazy ; )

Michael Klishin

unread,
Oct 2, 2012, 10:27:49 AM10/2/12
to ruby...@googlegroups.com
2012/10/2 Simon Lundström <si...@su.se>
The correct procedure here is to reconnect apparently,

Not necessarily, it always depends on a particular system.
 
but I have not found how
to do this in the queue context (if that makes any sense). The only place to
schedule a reconnect I have found is on an connection error.

In this specific example, you can access the queue object in the block, which has a reference
to its channel and that has the connection ref. But there is no hard and fast rule about how to recover in
cases like this.

Also, usually there is no need to build the gem manually, just use Bundler with a git dependency on the 0.9.x-stable branch.
--
MK

Simon Lundström

unread,
Oct 2, 2012, 10:40:39 AM10/2/12
to ruby...@googlegroups.com
On Tue, 2012-10-02 at 18:27:49 +0400, Michael Klishin wrote:
> 2012/10/2 Simon Lundstr�m <si...@su.se>
>
> > The correct procedure here is to reconnect apparently,
> >
>
> Not necessarily, it always depends on a particular system.
>
>
> > but I have not found how
> > to do this in the queue context (if that makes any sense). The only place
> > to
> > schedule a reconnect I have found is on an connection error.
> >
>
> In this specific example, you can access the queue object in the block,
> which has a reference
> to its channel and that has the connection ref. But there is no hard and
> fast rule about how to recover in
> cases like this.

Like this?:
queue.subscribe(:ack => true) do |header, body|
queue.on_cancel do |a|
queue.channel.connection.reconnect(true, 2)
end
puts " [x] Received #{body}"
header.ack
end

Nothing happends = /

Thanks,
- Simon

Michael Klishin

unread,
Oct 3, 2012, 1:20:37 PM10/3/12
to ruby...@googlegroups.com


2012/10/2 Simon Lundström <si...@su.se>

    queue.subscribe(:ack => true) do |header, body|
      queue.on_cancel do |a|
        queue.channel.connection.reconnect(true, 2)
      end
      puts " [x] Received #{body}"
      header.ack
    end

Nothing happends = /

I don't know what "should happen" but consumer cancellation notifications are delivered when a queue is either removed or the consumer is forcefully
cancelled via the management UI. In both cases, reconnecting is not what you need. What you need is probably to use a different queue
and/or register another consumer (in the case of forced cancellation, this may lead to apps constantly adding consumers operations staff
removes, keep this in mind).

To take care of such state transitions, use a separate class similarly to what is demonstrated here:

Simon Lundström

unread,
Oct 9, 2012, 1:55:44 AM10/9/12
to ruby...@googlegroups.com
Sorry for the late response, it's been a busy week.

On Wed, 2012-10-03 at 21:20:37 +0400, Michael Klishin wrote:
> 2012/10/2 Simon Lundstr�m <si...@su.se>
>
> > queue.subscribe(:ack => true) do |header, body|
> > queue.on_cancel do |a|
> > queue.channel.connection.reconnect(true, 2)
> > end
> > puts " [x] Received #{body}"
> > header.ack
> > end
> >
> > Nothing happends = /
>
>
> I don't know what "should happen" but consumer cancellation notifications
> are delivered when a queue is either removed or the consumer is forcefully
> cancelled via the management UI.

I would expect that the consumer would reconnect.

> In both cases, reconnecting is not what you need. What you need is
> probably to use a different queue and/or register another consumer

Yes, now that I read https://www.rabbitmq.com/ha.html a few more times I
can understand and see this.

> (in the case of forced cancellation, this may lead to apps constantly
> adding consumers operations staff removes, keep this in mind).

Yes, this is another problem (but a luxury one since HA is first prio ; ).

> To take care of such state transitions, use a separate class similarly to
> what is demonstrated here:
> http://rubyamqp.info/articles/getting_started/#integration_with_objects

I have adapted
<https://github.com/ruby-amqp/amqp/blob/master/examples/error_handling/connection_level_exception_with_objects.rb>
and added even more error handling and code for CCN:
<http://pastie.org/private/cdgczwx4bedhego0p3g1w>

The code runs and starts but it won't ever reach handle_cancel when the
CCNs are sent out.

I've tried to put on_cancel in multiple ways, but never succeded
(because @default_consumer is nil).
on_cancel gets mixed into the Queue class and when on_cancel is called
it calls itself on the @default_consumer instance variable
<https://github.com/ruby-amqp/amq-client/blob/ffb5c52dc046626decfe82e77d696b35f9b64600/lib/amq/client/async/extensions/rabbitmq/cancel.rb#L40-42>

The only place where @default_consumer is set is
<https://github.com/ruby-amqp/amq-client/blob/0.9.x-stable/lib/amq/client/async/queue.rb#L264>
on the consume method. So AFAIK I either needs to set it in that block
(which I did in the code I pastied in a previous mail) or after the
consume call which I did in the code in this mail.

How can I proceed?

Thanks!
- Simon

Michael Klishin

unread,
Oct 9, 2012, 8:46:05 AM10/9/12
to ruby...@googlegroups.com


2012/10/9 Simon Lundström <si...@su.se>

I've tried to put on_cancel in multiple ways, but never succeded
(because @default_consumer is nil).
on_cancel gets mixed into the Queue class and when on_cancel is called
it calls itself on the @default_consumer instance variable

default consumer is a relic from the pre-0.8 days or so when there was (an artificial) limitation that
one AMQP::Queue instance can only have 1 consumer (added via AMQP::Queue#subscribe). That's not really how other clients work and
there may be good reasons to have multiple consumers on a shared queue in different threads even on MRI.

So sounds like we need to extend cancellation notifications to other consumers (find the consumer by
consumer tag and invoke the callback on it). I will look into this this evening.
--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin

Simon Lundström

unread,
Oct 19, 2012, 6:26:45 AM10/19/12
to ruby...@googlegroups.com
On Tue, 2012-10-09 at 16:46:05 +0400, Michael Klishin wrote:
> 2012/10/9 Simon Lundstr�m <si...@su.se>
>
> > I've tried to put on_cancel in multiple ways, but never succeded
> > (because @default_consumer is nil).
> > on_cancel gets mixed into the Queue class and when on_cancel is called
> > it calls itself on the @default_consumer instance variable
> >
>
> default consumer is a relic from the pre-0.8 days or so when there was (an
> artificial) limitation that
> one AMQP::Queue instance can only have 1 consumer (added via
> AMQP::Queue#subscribe). That's not really how other clients work and
> there may be good reasons to have multiple consumers on a shared queue in
> different threads even on MRI.
>
> So sounds like we need to extend cancellation notifications to other
> consumers (find the consumer by
> consumer tag and invoke the callback on it). I will look into this this
> evening.

Just for future reference and to close this thread:

I talked to Michael on IRC and CCN works since 0.9.5.

I was writing my consumer wrong and Michael kindly added an integration
test[1] which can be used as inspiration on how to use CCN.

To auto-recovery a queue on CCN, we used this code:

queue = AMQ::Client::Queue.new(connection, channel, "example.ha.queue")
queue.declare(false, true, false, false, false, { "x-ha-policy" => "all" }) do
queue.consume {
queue.on_cancel {
queue.auto_recover
}
}
end

Note that this *only* recovers the queue when switching HA masters. This
does *not* recover the channel if you are, e.g., disconnected.
So channel recovery is needed too!

And remember, because I forgot and it took *quite* some time for me to
understand why it wouldn't work (it was a friday and I was tired = D), to:
require "amq/client/extensions/rabbitmq"

Otherwise AMQP never will announce to RabbitMQ that it supports CCN so
RabbitMQ will never send any basic.cancel's!

Have a nice weekend!
- Simon

1) <https://github.com/ruby-amqp/amq-client/blob/master/spec/integration/eventmachine/consumer_cancellation_notification_spec.rb>

Michael Klishin

unread,
Oct 19, 2012, 8:35:27 AM10/19/12
to ruby...@googlegroups.com
2012/10/19 Simon Lundström <si...@su.se>

And remember, because I forgot and it took *quite* some time for me to
understand why it wouldn't work (it was a friday and I was tired = D), to:
require "amq/client/extensions/rabbitmq"

Otherwise AMQP never will announce to RabbitMQ that it supports CCN so
RabbitMQ will never send any basic.cancel's!

amqp gem will load these extensions if it sees that broker is RabbitMQ but it's true
that client capabilities won't be announced. This is something that needs fixing.
Reply all
Reply to author
Forward
0 new messages