Error handling and recovery features for Ruby amqp gem 0.8.0.RC14


Michael Klishin

Jun 23, 2011, 2:47:06 PM
to ruby...@googlegroups.com
Given the number of questions/feature requests about application recovery,
I get the feeling that the documentation guide improvements I am working on
are less important than having some basic solutions and/or guidelines in
place for app recovery.
So I sat down and put together a list of things I will be working on next.
I can't guarantee that they will work great for everyone, so if something
doesn't go well, it will be taken out. Recovery is a hard enough problem
on its own without having to forever support/fight broken features.

Let's take a look at what I think is a good list of features to have:

:recovery event
-----------------------------------------------------------------
Right now there are a number of events on connections you
can register callbacks (handlers) for:

* successful connection
* authentication failure
* TCP connection loss

and so on. Channels, exchanges and queues have only a limited ability
to respond to the TCP connection failure event, for example: they are
automatically reset, but that's about it.

What is badly needed is a new event:

* recovery

that is fired when the network connection is back up *and* the AMQP
connection is reopened.
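To make the idea concrete, here is a pure-Ruby sketch of the kind of event registry such a :recovery event would plug into. EventRegistry, #on and #fire are illustrative names, not the amqp gem's real internals:

```ruby
# A tiny callback registry: handlers are stored per event name and
# invoked when the event fires.
class EventRegistry
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  def on(event, &block)
    @handlers[event] << block
  end

  def fire(event, *args)
    @handlers[event].each { |h| h.call(*args) }
  end
end

conn = EventRegistry.new
log  = []
conn.on(:recovery) { log << "redeclare entities" }

# Simulate: TCP connection restored and AMQP connection reopened.
conn.fire(:recovery)
log # => ["redeclare entities"]
```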


Methods to re-open/re-declare
-----------------------------------------------------------------
The next piece of the puzzle is a bunch of methods that all serve a similar
purpose:

AMQP::Channel#reopen
AMQP::Exchange#redeclare
AMQP::Queue#redeclare

These methods will simply use existing object state/attributes to
redeclare themselves (presumably once the connection comes back up).
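The mechanism can be sketched in plain Ruby: an object remembers the attributes it was declared with, so #redeclare just replays the declaration. RecoverableQueue and its methods below are illustrative, not the gem's code:

```ruby
# A queue-like object that stores its declaration attributes so that
# #redeclare can replay them after recovery.
class RecoverableQueue
  attr_reader :name, :opts, :declared_times

  def initialize(name, opts = {})
    @name, @opts = name, opts
    @declared_times = 0
    declare
  end

  def declare
    # In the real gem this would send queue.declare to the broker,
    # using @name and @opts.
    @declared_times += 1
  end
  alias redeclare declare
end

q = RecoverableQueue.new("tasks", :durable => true)
q.redeclare  # e.g. called from a :recovery handler
q.declared_times # => 2
```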
Which leads to the Crown Jewel of recovery features...


Automagical recovery done right
-----------------------------------------------------------------
By combining the recovery event, the re-declaration methods and the existing
"failure shutdown propagation" (when the connection resets channels,
they reset exchanges and queues and so on; the RabbitMQ Java client
calls this the "shutdown protocol"), we can implement reasonably
good automatic recovery.
Automagical recovery will only apply to entities that you
specify as auto-recovering (using the :auto_recovery => true option).
It will set up a handler for the :recovery event that will call/schedule
#reopen/#redeclare, so it will be easy to implement your own
"auto recovery" with those basic tools.
Recovery is hard and I am sure the automagical recovery mode will still suck
a great deal, but everyone wants to see it with their own eyes ;)
There are a number of cases where automagical recovery will probably need
some kind of default behavior. For example, server-named queues pretty
much must be re-declared upon recovery (otherwise we cannot know whether
their names are unique).
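The opt-in dispatch described above can be sketched as follows. RecoverableEntity is an illustrative stand-in for channels/exchanges/queues; only entities flagged as auto-recovering get redeclared on recovery:

```ruby
# Entities carry an auto_recovery flag; a :recovery handler redeclares
# only the ones that opted in.
RecoverableEntity = Struct.new(:name, :auto_recovery, :redeclared) do
  def redeclare
    self.redeclared = true
  end
end

entities = [
  RecoverableEntity.new("logs",  true,  false),  # :auto_recovery => true
  RecoverableEntity.new("stats", false, false)   # opted out
]

# What the :recovery handler would do:
entities.select(&:auto_recovery).each(&:redeclare)
entities.map(&:redeclared) # => [true, false]
```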


I also want to make some events a little more fine-grained. Right now we have

* X has happened

but for some events, it makes sense to split them in two:

* before X
* after X

I haven't made up my mind about which events those are, but :recovery
is one of them.
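The before/after split amounts to firing two hooks around the operation. A minimal sketch (the :before_recovery/:after_recovery names are hypothetical):

```ruby
# Run registered handlers before and after the recovery work itself.
fired = []
handlers = {
  :before_recovery => [lambda { fired << :before }],
  :after_recovery  => [lambda { fired << :after  }]
}

run_recovery = lambda do
  handlers[:before_recovery].each(&:call)
  # ... reopen channels, redeclare exchanges and queues here ...
  handlers[:after_recovery].each(&:call)
end

run_recovery.call
fired # => [:before, :after]
```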

In the best traditions of Hammock-driven development [1], I will sleep on
this idea for at least one day. Other than that, I think this "opt-in" behavior,
plus the ability to implement your own recovery strategy using the same
basic tools as the automagical behavior, is worth having and won't cause
people grief (if you don't like these features, don't use them; they are not
shoveled down developers' throats).

Let me know if you have better ideas, or if you just think this particular
idea sucks.

1. http://blip.tv/clojure/hammock-driven-development-4475586


MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin

Vlad Tepes

Jun 23, 2011, 5:20:26 PM
to ruby...@googlegroups.com
Sounds possibly great? I just had the idea of some sort of callback... something like :on_connection_established, maybe, & we could put startup code for channels, queues, exchanges, bindings & whatever in that, so it'd be called regardless of whether it was a connection or reconnection. In my case, anyway, I don't see any reason for the code to be different between the scenarios.

I've already had to convert some of my consumers that get assigned moderately long-running tasks to a polling mechanism because of the limitations of heartbeats, which makes it easier to get around the recovery problems (because I can periodically redeclare everything anyway). But this means I have to "ack" each message right away so RabbitMQ doesn't think it's lost, & so my app has to periodically re-publish messages after a timeout interval, which introduces a previously-unnecessary delay when something goes wrong. It'd be nice to have some long-running workers that can reliably respond more quickly to published messages. Of course, the problem is pseudo-solved in my case by patching the gem to raise an error & thus shut down, anyway, but with a lot of workers & email notification when they get restarted I'm thinking something else would be nice.

-David


Michael Klishin

Jun 23, 2011, 5:31:33 PM
to ruby...@googlegroups.com
Vlad Tepes wrote:

> Sounds possibly great? I just had the idea of some sort of callback...something like :on_connection_established, maybe, & we could put startup code for channels, queues, exchanges, binding & whatever in that, so it'd be called regardless of whether it was a connection or reconnection. In my case, anyway, I don't see any reason for the code to be different between the scenarios.
We already have such an event. But I have seen cases where it is a good idea to tell the difference between recovery and initial setup. OK, I hope "possibly great" has a positive connotation :)

> I've already had to convert some of my consumers that get assigned moderately long-running tasks to a polling mechanism because of the limitations of heartbeats, which makes it easier to get around the recovery problems (because I can periodically redeclare everything anyway) (but this means I have to "ack" each message right away so RabbitMQ doesn't think it's lost, & so my app has to periodically re-publish messages after a timeout interval, which introduces a previously-unnecessary delay when something goes wrong)...but it'd be nice to have some long-running workers that can reliably respond more quickly to published messages. Of course, the problem is pseudo-solved in my case by patching the gem to raise an error & thus shut down, anyway, but with a lot of workers & email notification when they get restarted I'm thinking something else would be nice.

Eek. So, specifically, what do you want with respect to heartbeat handling? Because this is all part of reconnection/recovery, at least in my eyes. Let's make heartbeat handling more useful while we are at it.

Michael Klishin

Jun 24, 2011, 1:00:10 AM
to ruby...@googlegroups.com
2011/6/23 Michael Klishin <michael....@gmail.com>

Automagical recovery done right
-----------------------------------------------------------------
By combining recovery event, re-declaration methods and existing
"failure shutdown propagation" (when connection resets channels,
they reset exchanges and queues and so on, the RabbitMQ Java client
calls this the "shutdown protocol"), we can implement a reasonably
good "automatical recovery".
Automagical recovery will only apply to entities that you
specify as auto-recovering (using :auto_recovery => true option).
It will set up a handler for :recovery even that will call/schedule
#reopen/#redeclare, so it will be easy to implement your own
"auto recovery" with those basic tools.

I have a "70% there" prototype running. Looks good so far, and code complexity didn't
really go up much. This is encouraging.
--

Paul Strong

Jun 24, 2011, 1:12:26 PM
to Ruby AMQP ecosystem
Sounds fantastic, especially if there's a way to customize it as you
mentioned.

I've been able to hack up a temporary recovery solution for our
project, but I consistently lose a couple of messages when the
connection is closed. There's a small period in between the connection
loss and its detection in which any messages that are published are
lost forever. Once the connection/channel is marked as closed then the
messages start to pile up on the `@channel_is_open_deferrable` queue.
On reconnect I'm able to call succeed on the queue to get it to finish
sending out the messages, but I still have no way of recovering the
messages sent out before that. I also had to monkey patch your gem to
prevent the deferrable queue from getting cleared on connection loss
and losing even more messages.

I think in my case the only way around this is to keep a copy of each
message I send out until I get an ack for each of those messages. On
reconnect if some of my messages never were ack`ed then I'd resend
them, preferably before any newer messages to keep the proper order.
On reconnect, I need a way to send out a couple of messages(the "lost"
messages) immediately before any other pending messages on the queue.
Of course if the gem also had an option to do this automagically, then
it would be even more awesome. ;o)

What are your plans regarding re/delivery of lost/pending messages
after a recovery? As you mentioned previously, currently everything is
just reset and all messages are lost. I realize most apps care more
about speed than messages getting lost, but given our requirements
ideally we would not want lose a single message under any
circumstances except for catastrophic failure.

Thanks for working on this.

On Jun 24, 12:00 am, Michael Klishin <michael.s.klis...@gmail.com>
wrote:
> 2011/6/23 Michael Klishin <michael.s.klis...@gmail.com>

Michael Klishin

Jun 24, 2011, 1:27:43 PM
to ruby...@googlegroups.com
Paul Strong wrote:

> Sounds fantastic, especially if there's a way to customize it as you
> mentioned.
After working on the implementation of those features for a few hours, I learned that it is a good idea to
make this behavior per-channel. It is per-connection in my amq-client right now, but per-channel
seems optimal to me.
>
> I've been able to hack up a temporary recovery solution for our
> project, but I consistently lose a couple of messages when the
> connection is closed. There's a small period in between the connection
> loss and its detection in which any messages that are published are
> lost forever. Once the connection/channel is marked as closed then the
> messages start to pile up on the `@channel_is_open_deferrable` queue.
> On reconnect I'm able to call succeed on the queue to get it to finish
> sending out the messages, but I still have no way of recovering the
> messages sent out before that. I also had to monkey patch your gem to
> prevent the deferrable queue from getting cleared on connection loss
> and losing even more messages.
>
This is also something I have seen. I think every AMQP client struggles with this, because
network failure detection takes time, and if your app publishes messages constantly, at least one
of them is very likely to be lost.

The only solution I see is to add some kind of buffering and "connectivity state" to exchanges. The RabbitMQ Java client
has "connectivity state" for queues and exchanges, AFAIR; I need to read the source to learn more.

The problem is that it will affect message throughput. I can't say I have a solution for this in mind right now.

One more problem is that if your network goes down for hours (not completely unheard of, really), the number of accumulated
messages may simply exceed the process memory allowance, so the process will be killed by the OOM killer or by a monitoring tool like Monit or Nagios.

So maybe we should provide a way for apps to plug into network connection loss handling and stop publishing. I don't believe
the amqp gem can really provide a good enough solution here, at least not with the default AMQP::Exchange implementation.

Maybe having a module that overrides Exchange#publish to support buffering is a good idea.
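Such a module could be sketched roughly like this. BufferedPublish, FakeExchange and the method names are illustrative; only the Exchange#publish name follows the gem:

```ruby
# While the connection is down, #publish appends to a buffer instead
# of delivering; on recovery the buffer is flushed in order.
module BufferedPublish
  def publish(payload, opts = {})
    @pending ||= []
    if connection_open?
      deliver(payload, opts)
    else
      @pending << [payload, opts]   # hold until recovery
    end
  end

  def flush_pending
    (@pending || []).each { |p, o| deliver(p, o) }
    @pending = []
  end
end

# A fake exchange standing in for AMQP::Exchange, for demonstration.
class FakeExchange
  include BufferedPublish
  attr_accessor :open
  attr_reader :delivered

  def initialize
    @open = true
    @delivered = []
  end

  def connection_open?; @open; end
  def deliver(payload, opts); @delivered << payload; end
end

ex = FakeExchange.new
ex.publish("a")
ex.open = false
ex.publish("b")        # buffered while the connection is down
ex.open = true
ex.flush_pending       # replayed after recovery
ex.delivered # => ["a", "b"]
```

The obvious cost is exactly the one mentioned above: the buffer grows without bound during long outages, so a real implementation would need a cap or a back-pressure hook.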

> I think in my case the only way around this is to keep a copy of each
> message I send out until I get an ack for each of those messages. On
> reconnect if some of my messages never were ack`ed then I'd resend
> them, preferably before any newer messages to keep the proper order.
> On reconnect, I need a way to send out a couple of messages(the "lost"
> messages) immediately before any other pending messages on the queue.
> Of course if the gem also had an option to do this automagically, then
> it would be even more awesome. ;o)
>

Like I said, maybe with another opt-in option. I am still working on the consumer side of the problem.

> What are your plans regarding re/delivery of lost/pending messages
> after a recovery? As you mentioned previously, currently everything is
> just reset and all messages are lost. I realize most apps care more
> about speed than messages getting lost, but given our requirements
> ideally we would not want lose a single message under any
> circumstances except for catastrophic failure.

To support all this we need to put several more pieces of the puzzle together:

* Everything described in the original post
* Broker detection + automatic loading of RabbitMQ extensions if broker is RabbitMQ (easy to do)
* Some kind of Exchange extension mechanism that will override #publish and add buffering/will keep track of publisher confirms.

Then I can see how Exchange instances can keep track of what was delivered automagically.
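The confirm-tracking piece Paul describes can be sketched like this: keep each message keyed by its publish sequence number until it is confirmed, and resend the survivors on reconnect. ConfirmTracker is an illustrative stand-in, not gem code:

```ruby
# Track unconfirmed messages by sequence number, as publisher confirms
# would allow; on reconnect, resend whatever was never confirmed.
class ConfirmTracker
  attr_reader :resent

  def initialize
    @seq, @unconfirmed, @resent = 0, {}, []
  end

  def publish(payload)
    @seq += 1
    @unconfirmed[@seq] = payload
    @seq
  end

  # basic.ack with multiple = true confirms everything up to seq.
  def confirm(seq, multiple = false)
    if multiple
      @unconfirmed.delete_if { |k, _| k <= seq }
    else
      @unconfirmed.delete(seq)
    end
  end

  def on_reconnect
    # Resend in original order, before any newer messages.
    @unconfirmed.keys.sort.each { |k| @resent << @unconfirmed[k] }
  end
end

t = ConfirmTracker.new
t.publish("m1"); t.publish("m2"); t.publish("m3")
t.confirm(1)          # broker confirmed only the first message
t.on_reconnect
t.resent # => ["m2", "m3"]
```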

David

Jun 24, 2011, 5:36:55 PM
to Ruby AMQP ecosystem

> > I've already had to convert some of my consumers that get assigned moderately long-running tasks to a polling mechanism because of the limitations of heartbeats, which makes it easier to get around the recovery problems (because I can periodically redeclare everything anyway) (but this means I have to "ack" each message right away so RabbitMQ doesn't think it's lost, & so my app has to periodically re-publish messages after a timeout interval, which introduces a previously-unnecessary delay when something goes wrong)...but it'd be nice to have some long-running workers that can reliably respond more quickly to published messages. Of course, the problem is pseudo-solved in my case by patching the gem to raise an error & thus shut down, anyway, but with a lot of workers & email notification when they get restarted I'm thinking something else would be nice.
>
> Eek. So, specifically, what do you want with respect to heartbeats handling? because this is all part of reconnection/recovery, at least in my eyes. Lets make heartbeat handling more useful while we are at it.
>

Well, the only way to make heartbeats work better may be impossible.
My issue is that if I am using heartbeats, and I require an "ack" for
each message, the heartbeats do not appear to work while I'm
processing a message. From the discussion in https://github.com/ruby-amqp/amqp/issues/43
I gathered I'm not the only person to notice & that the problem
probably lies within RabbitMQ rather than the gem itself. I
set :heartbeat => 120, which gives me four minutes, but some of my
consumers have a 15-minute timeout in which to complete tasks (mostly
uploading/downloading potentially large files). So my only solution
for them was to set up polling & "ack" immediately upon message
receipt. It's okay as is, though the app would be somewhat more
responsive if I didn't have to go that route.

So what do I want? I want heartbeats to be handled in a way that is
not affected by delay between message delivery & "ack". At one point
with the 0.6.7 gem I had this set up by using a socks proxy for all my
consumers, and I had a separate process sending meaningless stuff over
the SSH tunnel to keep it alive, but I ran into other issues (chiefly
that I was using more RAM on virtual machines for this than I wanted
to, for app-specific reasons that don't matter here) & eventually
switched to the 0.7.1 gem after accidentally finding out that
heartbeats were supported.

Other than that, I'm looking forward to what you're already working
on.

Thanks again-

David

Michael Klishin

Jun 26, 2011, 7:30:45 PM
to Ruby AMQP ecosystem
On 24 jun, 09:00, Michael Klishin <michael.s.klis...@gmail.com> wrote:

> I have a "70% there" prototype running. Looks good so far, and code
> complexity didn't
> really go up much. This is encouraging.

OK, so the prototype is in the ruby-amqp/amqp and ruby-amqp/amq-client
master branches, so everyone who is interested can either
grab them using Bundler or just clone and build both gems. I did not
release any amq-client updates because
both libraries need to be released at the same time.

I briefly documented what we have in the "Error Handling and Recovery"
guide:
http://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/ErrorHandling.textile

To make opt-in automatic recovery possible, I had to finally introduce
AMQ::Consumer and also solve the
"one consumer per queue" limitation the amqp gem has had for almost 3 years
now.

Read the docs, check out a couple of examples and possibly even the code.
RC14 won't be released until we all give it at least
a good week of testing. I personally think this is a better solution
than any other I have imagined, but it adds certain implementation
complexity, and some of my original ideas did not work out, so I had to
come up with new ones. Let's see how well it works
in the real world.

MK

Paul Dlug

Jun 27, 2011, 3:14:26 PM
to ruby...@googlegroups.com
Hi Michael,

Thanks for adding the auto recovery features, exactly what I've been looking for. I've found a few problems in my testing. If you shut the broker down gracefully, it triggers the connection.on_error callback with the message:

[network failure] Trying to reconnect...
error_handling/automatic_recovery_of_channel_and_queues.rb:16:in `block (2 levels) in <main>': CONNECTION_FORCED - broker forced connection closure with reason 'shutdown' (RuntimeError)

Removing that error callback allows the connection loss to be properly detected and recovered from. Either CONNECTION_FORCED should be treated as a connection loss error, or on_error should detect this exception and not bail out.

The other issue I hit is that it does not appear to recover the callbacks defined on queue subscriptions. After the broker comes back up, the bindings are there, but the code in the subscribe block never gets called. Is this supposed to be handled by the automatic recovery, or is an additional step required to re-establish the callback?


--Paul

Michael Klishin

Jun 27, 2011, 7:17:04 PM
to ruby...@googlegroups.com
2011/6/27 Paul Dlug <paul...@gmail.com>
Removing that error callback allows the connection loss to be properly detected and recover, either connection forced should be treated as a connection loss error or the on_error should detect this exception and not bail out.


As far as I understand, this is the client's reaction to the connection.close it received from the broker. on_error should not "detect" this exception; the exception is raised by the default on_error handler.

Automatic reconnection is *only* for network failures. connection.close is a separate issue and should be made visible.
 
The other issue I hit is that it does not appear to recover the callbacks defined on queue subscriptions. After the broker comes back up the bindings are there but the code in the subscribe block never gets called. Is this supposed to be handled by the automatic recovery or is an additional step required to re-establish the callback? 

Consumers should be restored. Maybe there is an issue; to test deliveries, there needs to be a separate example.
--

Michael Klishin

Jun 27, 2011, 7:37:02 PM
to ruby...@googlegroups.com
2011/6/28 Michael Klishin <michael....@gmail.com>

Consumers should be restored. Maybe there is an issue, to test deliveries there needs to be a separate example.

I was able to reproduce the issue. Looking into what is going on.

Michael Klishin

Jun 27, 2011, 7:50:36 PM
to ruby...@googlegroups.com
2011/6/28 Michael Klishin <michael....@gmail.com>

I was able to reproduce the issue. Looking into what is going on.


My example used an autodeleted queue. By changing it to use a durable one, plus splitting the example into
two:

https://github.com/ruby-amqp/amqp/blob/master/examples/error_handling/automatically_recovering_hello_world_consumer.rb
https://github.com/ruby-amqp/amqp/blob/master/examples/error_handling/hello_world_producer.rb

I am able to see messages received both before & after I shut down RabbitMQ. The producer still has to be restarted manually, but let's
take it one step at a time.

Vlad Tepes

Jun 27, 2011, 8:38:45 PM
to ruby...@googlegroups.com

Interesting. Auto-deleting queues are not to be automatically re-established under the same name? Some of my consumers use autodeleting exclusive queues as a way to facilitate performance of the same task in multiple locations in a sort-of-simultaneous fashion, and it's handy to have these queues named after each location (makes it easy for me to see what's going on & which locations are active, at any rate).

If this is the desired behavior, it's time for some redesign for me, and perhaps a note in the documentation that relying on autodeleting exclusive queues with well-known names is not a good idea if network fault tolerance is part of the app? Or am I just weird to be doing this to begin with? {8'>

Seems to me it's much tidier not to leave unused queues & messages lying about when there's no active consumer to do anything with them (which in my case can happen for more than one reason, not all of which are bad). But I absolutely do need those consumers to be able to recover from connection loss, and the rest of this looks very useful to me, so I can redesign around this limitation if it's going to be permanent....

-David



Michael Klishin

Jun 27, 2011, 9:01:05 PM
to ruby...@googlegroups.com


2011/6/28 Vlad Tepes <agyar...@gmail.com>

If this is the desired behavior, it's time for some redesign for me, and perhaps a note in the documentation that relying on autodeleting exclusive queues with well-known names is not a good idea if network fault tolerance is part of the app? Or am I just weird to be doing this to begin with? {8'>


No, this is not the desired behavior; it is just what I found. It is certainly fine to use server-named autodeleting exclusive queues. I am looking into what is going on.

Paul Dlug

Jul 7, 2011, 2:11:21 PM
to ruby...@googlegroups.com
Michael,

Any update on the issues recovering autodeleting queues and callbacks?


--Paul

Michael Klishin

Jul 7, 2011, 2:40:08 PM
to ruby...@googlegroups.com
Paul Dlug wrote:

> Any update on the issues recovering autodeleting queues and callbacks?

Paul,

Last time I checked, it worked. We are in the process of migrating travis-ci.org to amqp; this will be
an excellent testing ground for automatic recovery, and one of the largest open source examples of
amqp Ruby apps in general.

If you are still having issues, let me know and I will take a look. It is a tedious thing to test, and I am not sure how to
automate testing of it, so I rely on other people to test it with me.

Paul Dlug

Jul 8, 2011, 1:17:48 PM
to ruby...@googlegroups.com
On Thursday, July 7, 2011 2:40:08 PM UTC-4, Michael Klishin wrote:
Paul Dlug escribió:
> Any update on the issues recovering autodeleting queues and callbacks?

Paul,

Last time I checked, it worked. We are in the process of migrating travis-ci.org to amqp, this will be
an excellent ground to try automatic recovery, and one of the largest open source examples of
amqp Ruby apps in general.

If you are still having issues, let me know and I will take a look. It is a tedious thing to test, I am not sure how to
automate testing of it so I rely on other people to test it with me.


Here's some code to demonstrate what I'm trying:

If I run the consumer and producer, they communicate; I then kill -9 the rabbitmq broker and restart it. As far as I can tell, the bindings are restored, but the callbacks no longer exist, and I'm not sure what the best way to re-register them is. I'm using immediate and mandatory when publishing, so I can confirm the consumer isn't restored by restarting the producer; it then starts getting messages returned with "NO_CONSUMERS".


--Paul

Michael Klishin

Jul 8, 2011, 1:20:20 PM
to ruby...@googlegroups.com


2011/7/8 Paul Dlug <paul...@gmail.com>

Here's some code to demonstrate what I'm trying:


Thanks.
 
If I run the consumer and producer they communicate, I then kill -9 the rabbitmq broker and restart it. As far as I can tell the bindings are restored but the callbacks no longer exist and I'm not sure what the best way to re-register them is. I'm using immediate and mandatory when publishing so I can confirm the consumer isn't restored by restarting the producer, it then starts getting messages returned with "NO_CONSUMERS".

Consumers should be re-subscribed, too. This was the primary motivation behind extracting AMQP::Consumer and switching Queue#subscribe to it. I will take a look.

--

Michael Klishin

Jul 8, 2011, 5:57:35 PM
to ruby...@googlegroups.com
2011/7/8 Michael Klishin <michael....@gmail.com>

Consumers should be re-subscribed, too. This was primary motivation behind extracting AMQP::Consumer and switching Queue#subscribe to it. I will take a look.

So I did, and there were a couple of issues that are now resolved on master. Indeed, the issue was with consumer re-subscription. Paul, can you check if this improves anything for you?

Michael Klishin

Jul 9, 2011, 12:09:13 PM
to ruby...@googlegroups.com
2011/7/8 Paul Dlug <paul...@gmail.com>


If I run the consumer and producer they communicate, I then kill -9 the rabbitmq broker and restart it. As far as I can tell the bindings are restored but the callbacks no longer exist and I'm not sure what the best way to re-register them is. I'm using immediate and mandatory when publishing so I can confirm the consumer isn't restored by restarting the producer, it then starts getting messages returned with "NO_CONSUMERS".

I think after a bunch of fixes from last night and today, amqp gem master no longer has these issues. The examples I tried include both explicitly named and server-named queues with 1 or 2 consumers. Publisher recovery is a harder problem, but typically publishers are also structured more simply than consumer apps. So we will see how soon I can find a good enough solution for that part of the puzzle.

In the meantime, please try what's in master (please note: it needs an unreleased amq-client version from git, too).
--

Paul Dlug

Jul 10, 2011, 1:06:29 PM
to ruby...@googlegroups.com


On Saturday, July 9, 2011 12:09:13 PM UTC-4, Michael Klishin wrote:
I think after a bunch of fixes from last night and today amqp gem master no longer has these issues. Examples I tried include both explicitly named and server-named queues with 1 or 2 consumers. Publisher recovery is a harder problem, but typically publishers are also structured simpler than consumer apps. So we will see how soon I will find a good enough solution for that part of the puzzle.

In the meantime, please try what's in the master (please note: it needs an unreleased amq-client version from git, too).

It's now working perfectly for me, both consumer (with named queue) and producer (with auto-delete temporary queue), thanks! Any recommendations on dealing with a graceful broker shutdown? In that case I get a "CONNECTION_FORCED" in the on_error handler for the connection. I'd like to treat this the same as a connection loss and just go into a reconnect loop. I'm not sure if there are other connection-related exceptions the broker can throw that should be treated the same.


--Paul 

Michael Klishin

Jul 10, 2011, 1:22:03 PM
to ruby...@googlegroups.com
2011/7/10 Paul Dlug <paul...@gmail.com>

Any recommendations on dealing with a graceful consumer shut down? In that case I get a "CONNECTION_FORCED" to the on_error handler for the connection. I'd like to treat this the same as a connection loss and just go into a reconnect loop. I'm not sure if there are other connection related exceptions the broker can through that should be treated the same. 

Paul,

Can you explain what exactly causes CONNECTION_FORCED to be raised? The AMQP reference does not clarify when it may be raised.

Paul Dlug

Jul 11, 2011, 8:40:48 AM
to ruby...@googlegroups.com
On Sunday, July 10, 2011 1:22:03 PM UTC-4, Michael Klishin wrote:
Can you explain what exactly causes CONNECTION_FORCED to be raised? AMQP reference does not clarify when it may be raised.

It's raised if you try a graceful shutdown of the broker: "q()." in terminal when running in foreground, "rabbitmqctl stop", etc.


--Paul

Michael Klishin

Jul 11, 2011, 9:11:11 AM
to ruby...@googlegroups.com
2011/7/11 Paul Dlug <paul...@gmail.com>

It's raised if you try a graceful shutdown of the broker: "q()." in terminal when running in foreground, "rabbitmqctl stop", etc.

We would need to figure out a way to provide callbacks for specific exceptions and make the default handler for CONNECTION_FORCED act as if it were a network failure. Not very difficult, but it will take a few evenings. I think it is a good idea.

Michael Klishin

Jul 17, 2011, 1:00:24 PM
to Ruby AMQP ecosystem
On 10 jul, 21:06, Paul Dlug <paul.d...@gmail.com> wrote:

> It's now working perfectly for me, both consumer (with named queue) and
> producer (with auto-delete temporary queue), thanks! Any recommendations on
> dealing with a graceful consumer shut down? In that case I get a
> "CONNECTION_FORCED" to the on_error handler for the connection. I'd like to
> treat this the same as a connection loss and just go into a reconnect loop.
> I'm not sure if there are other connection related exceptions the broker can
> through that should be treated the same.
>
> --Paul

Paul,

I decided to simply demonstrate how this can be achieved without
adding any additional API methods.
So here is a new spec example I added (it uses a rabbitmq-server shell
script that is Homebrew/Mac OS X-specific right now,
but even this is way better than testing all this by hand over and
over and over):

https://github.com/ruby-amqp/amqp/commit/9145984622caf23974d02528d51348cbc42788f4
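The gist of the linked spec example can be sketched in plain Ruby: inspect the reply_code carried by the connection.close frame in the on_error handler, and treat 320 (CONNECTION_FORCED) like a network failure. CloseFrame and handle_error below are illustrative stand-ins for the gem's close-frame object and handler:

```ruby
# Stand-in for the connection.close frame the broker sends on shutdown.
CloseFrame = Struct.new(:reply_code, :reply_text)

# Decide what to do based on the AMQP reply code: 320 means the broker
# closed the connection (e.g. graceful shutdown), so schedule a
# reconnect; anything else is surfaced as an error.
def handle_error(frame, actions)
  if frame.reply_code == 320
    actions << :schedule_reconnect
  else
    actions << :raise
  end
end

actions = []
handle_error(CloseFrame.new(320, "CONNECTION_FORCED"), actions)
handle_error(CloseFrame.new(406, "PRECONDITION_FAILED"), actions)
actions # => [:schedule_reconnect, :raise]
```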

Give it a try. I really want to release RC14 in the next few days.

MK

Paul Dlug

Jul 19, 2011, 11:37:12 AM
to ruby...@googlegroups.com
On Sunday, July 17, 2011 1:00:24 PM UTC-4, Michael Klishin wrote:

https://github.com/ruby-amqp/amqp/commit/9145984622caf23974d02528d51348cbc42788f4

Give it a try. I really want to release RC14 in the next few days.

Michael,

I tested this and it works perfectly. I do think it is a bit messy for the consumer of the amqp gem to have to know to catch an error with a reply code of 320; it would be optimal to have the amqp gem just treat this as a connection loss (since it is). Alternatively, the reply code could be pulled into a constant or a test method on the error, to hide the reply code and handle any others that may be added in the future (I am not sure if clustering introduces other connection-level errors).
 

Thanks,
Paul

Michael Klishin

Jul 19, 2011, 12:00:13 PM
to ruby...@googlegroups.com
2011/7/19 Paul Dlug <paul...@gmail.com>

I tested this and it works perfectly, I do think it is a bit messy for the consumer of the amqp gem to have to know to catch an error with a reply code of 320.
It would be optimal to have the amqp gem just treat this as a connection loss (since it is).

I am sure there will be plenty of people who disagree. Also, rabbitmqctl has a way to close down specific connections, which also uses the same reply_code. So it is not obvious that CONNECTION_FORCED should be treated the same as a network failure; and if it is not obvious, I do not include it in the gem, especially if it is an app recovery feature.

But the main reason why I decided to simply provide an example for now is time constraints. It takes a good week to carefully figure out how all this error handling machinery should work with addition of feature X and right now too many people are asking about 0.8.0 nearly every day.

One idea I have is to provide

AMQP::Session#on_connection_forced

and similar methods. However, what if you define both a connection_forced handler AND a generic on_error handler? And remember, we are still dragging around the "catch all" AMQP.error handler from the 0.6.x days, at least for now. Should they all be called? If so, in what order?

When it comes to libraries, poor decisions have to be maintained for years.
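One purely illustrative answer to the ordering question, sketched in plain Ruby (none of these class or method names exist in the gem): run the most specific registered handler, falling back to the generic one only when nothing more specific is registered.

```ruby
# Toy dispatcher illustrating one possible policy for the ordering
# question above: the most specific handler wins; the generic on_error
# fires only as a fallback. All names here are hypothetical.
class ErrorDispatcher
  def initialize
    @specific = {} # reply_code => handler
    @generic  = nil
  end

  # Register a handler just for CONNECTION_FORCED (reply code 320).
  def on_connection_forced(&block)
    @specific[320] = block
  end

  # Register a catch-all handler.
  def on_error(&block)
    @generic = block
  end

  # Dispatch an error: specific handler if present, else the generic one.
  def dispatch(reply_code)
    handler = @specific[reply_code] || @generic
    handler && handler.call(reply_code)
  end
end
```

With both handlers registered, a 320 error would reach only the specific handler, while any other reply code would fall through to the generic one.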

Paul Dlug

Jul 19, 2011, 3:26:05 PM
to ruby...@googlegroups.com
I agree with not getting carried away with error handlers for every case. What I would suggest as a good short-term solution is to provide a predicate on the error object sent to the on_error handler, like #connection_forced?. This would at least abstract away the need for the consumer of the amqp library to know what error code 320 is.
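As a sketch of that suggestion (the constant and predicate below are hypothetical, not part of the gem's actual API), hiding the magic number could look like this:

```ruby
# Hypothetical: name the AMQP reply code once, instead of scattering
# the literal 320 through application code.
CONNECTION_FORCED = 320

# `error` stands in for the object passed to the on_error handler;
# only a #reply_code reader is assumed here.
def connection_forced?(error)
  error.reply_code == CONNECTION_FORCED
end
```

Application code could then branch on `connection_forced?(error)` inside its on_error callback without knowing the protocol-level code.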


--Paul

milandobrota

Jul 28, 2011, 7:08:14 PM
to Ruby AMQP ecosystem
I can't get the publisher to recover properly. After I kill -9 the
rabbitmq process and start it back up, the channel becomes useless (I
can't register a new queue or use the default exchange). Here is the code:

require 'rubygems'
require 'amqp'

Thread.new { EM.run }
sleep(1)

def connection
  @connection ||= AMQP.connect(:host => 'localhost')
end

def channel
  @channel ||= AMQP::Channel.new(connection, :auto_recovery => true)
end

connection.on_tcp_connection_loss do |conn, settings|
  puts '[network failure] Trying to reconnect...'
  puts conn.reconnect(false, 2)
end

connection.after_recovery do
  puts 'recovered'
  # Uncommenting the following two lines would make it work. This leads
  # me to believe it is a problem with the channel and not the connection.
  # @channel.close if @channel
  # @channel = AMQP::Channel.new(connection, :auto_recovery => true)
end

while true do
  puts 'posting'
  channel.default_exchange.publish('aaa', :routing_key => 'test')
  sleep 2
end

Michael Klishin

Jul 29, 2011, 2:54:37 AM
to milandobrota, ruby...@googlegroups.com
2011/7/29 milandobrota <elite...@gmail.com>
I have a problem with publisher recovery. When I kill -9 rabbitmq
process and start it back up, channel.default_exchange.publish does
nothing (neither does channel.queue('test').publish). Here is the
code:

Publisher recovery was not in the scope of 0.8.0.RC14. It is a tougher nut to crack than consumer recovery,
so what you are seeing is expected behavior, and this is unlikely to change in the coming months.

I don't have any good solutions for dealing with messages published after the network connection went down. Accumulating them in memory
has one set of problems, dropping them has another, and when combined with features like Publisher Confirms it only gets
more complicated.

The RabbitMQ Java client (as well as the .NET client and a couple of popular Python clients) simply provides basic means of handling connection shutdown,
without even trying to provide any automatic recovery behavior, because different people have different opinions about how this should work.

This is exactly why the amqp gem's automatic recovery mode is off by default (opt-in, not opt-out) and is built on top of a number of callbacks
that are part of the public API, and thus available to library users to the same extent they are available to library internals.

Michael Klishin

Jul 29, 2011, 3:00:19 AM
to ruby...@googlegroups.com


2011/7/29 milandobrota <elite...@gmail.com>

I can't get the publisher to recover properly. After I kill -9 the
rabbitmq process and start it back up, the channel becomes useless (I
can't register a new queue or use the default exchange). Here is the code:


Did you use the RabbitMQ management UI to see that queue.declare doesn't happen, or are you judging by the fact that
publishing does not recover? The latter is expected behavior for now.


I replied to one of your emails, posting it here once again:


Publisher recovery was not in the scope of 0.8.0.RC14. It is a tougher nut to crack than consumer recovery,
so what you are seeing is expected behavior, and this is unlikely to change in the coming months.

I don't have any good solutions for dealing with messages published after the network connection went down. Accumulating them in memory
has one set of problems, dropping them has another, and when combined with features like Publisher Confirms it only gets
more complicated.

The RabbitMQ Java client (as well as the .NET client and a couple of popular Python clients) simply provides basic means of handling connection shutdown,
without even trying to provide any automatic recovery behavior, because different people have different opinions about how this should work.

This is exactly why the amqp gem's automatic recovery mode is off by default (opt-in, not opt-out) and is built on top of a number of callbacks
that are part of the public API, and thus available to library users to the same extent they are available to library internals.

Michael Klishin

Jul 29, 2011, 3:10:16 AM
to ruby...@googlegroups.com


2011/7/29 milandobrota <elite...@gmail.com>

connection.after_recovery do
  puts 'recovered'
  # Uncommenting the following two lines would make it work. This leads
  # me to believe it is a problem with the channel and not the connection.
  # @channel.close if @channel
  # @channel = AMQP::Channel.new(connection, :auto_recovery => true)
end


Automatic recovery callbacks are fired when the AMQP connection is open again. But you also need to re-open the channel (as you already figured out). Automatic recovery does that for you, and only then are queues, bindings & consumers recovered.

What you probably want to do in this example is to use AMQP::Channel#after_recovery (there are also Queue#after_recovery and Exchange#after_recovery). All the classes above also have #on_connection_interruption.

I will try to improve http://rdoc.info/github/ruby-amqp/amqp/master/file/docs/ErrorHandling.textile#Manual_recovery and related sections.

Michael Klishin

Jul 29, 2011, 3:45:52 AM
to ruby...@googlegroups.com


2011/7/29 Michael Klishin <michael....@gmail.com>

Automatic recovery callbacks are fired when the AMQP connection is open again. But you also need to re-open the channel (as you already figured out). Automatic recovery does that for you, and only then are queues, bindings & consumers recovered.

Before I expand the documentation, you can see what automatic recovery really does (and how you can use the same public API methods):

connection:
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/adapters/event_machine.rb#L258
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/adapter.rb#L289
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/adapter.rb#L433

channel:
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/channel.rb#L251
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/channel.rb#L322

exchange:
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/exchange.rb#L169
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/exchange.rb#L222

queue:
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/queue.rb#L370
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/queue.rb#L427

consumer:
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/consumer.rb#L134
https://github.com/ruby-amqp/amq-client/blob/master/lib/amq/client/async/consumer.rb#L186

So the algorithm is basically this:

1. Once the TCP connection is open again, schedule automatic recovery to begin after the AMQP connection is open.
2. For all channels in automatic recovery mode, call Channel#auto_recover.
3. When a channel is open again, run #auto_recover on all exchanges and queues on that channel.
4. When a queue is re-declared (server-named queues get new names by default), recover its consumers.
5. Recovering consumers simply means re-subscribing for messages with a different consumer tag.

Your application can hook into the automatic recovery process, but keep in mind that queues & exchanges depend on asynchronously opened channels, consumers depend on the queue being declared & possibly given a new name, and the whole process depends on the AMQP connection being reopened, not just the TCP one.
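The cascade above can be modeled in a few lines of plain Ruby (a toy model only; these are not the real amq-client classes or method signatures):

```ruby
# Toy model of the recovery cascade described above. Names mirror the
# description in this thread, not the actual amq-client internals.
class ToyChannel
  attr_reader :exchanges, :queues

  def initialize(auto_recovery)
    @auto_recovery = auto_recovery
    @exchanges = []
    @queues = []
  end

  def auto_recovering?
    @auto_recovery
  end

  # Steps 2-3: re-open the channel, then recover everything declared on it.
  def auto_recover(log)
    log << :channel_reopened
    @exchanges.each { |x| log << [:exchange_redeclared, x] }
    @queues.each do |q|
      log << [:queue_redeclared, q]       # step 4 (server-named queues would get new names)
      log << [:consumers_resubscribed, q] # step 5: new consumer tags
    end
  end
end

# Step 1: runs once the AMQP (not just the TCP) connection is open again;
# only channels that opted into automatic recovery participate.
def recover_connection(channels, log)
  log << :amqp_connection_reopened
  channels.select(&:auto_recovering?).each { |ch| ch.auto_recover(log) }
end
```

The point of the model is the ordering: nothing on a channel is recovered until the channel is reopened, and nothing at all happens until the AMQP connection itself is back.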

Hope this helps.