on-recovery callbacks

Joe Freeman

Aug 23, 2013, 5:57:27 AM
to clojure-...@googlegroups.com
I've just started using RabbitMQ and Langohr, and I've noticed that my server process occasionally crashes with the exception:

  com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel

I was hoping this could be fixed with the recent addition of the `on-recovery` callbacks, but I'm not sure how these should be used. So far I have something like this:

  (defn setup-channel
    []
    (let [conn (rmq/connect)
          ch (atom (lch/open conn))]
      (letfn [(handle-conn-recovery [conn']
                (println "RabbitMQ connection recovered."))
              (handle-ch-recovery [ch']
                (println "RabbitMQ channel recovered.")
                (rmq/on-recovery ch' handle-ch-recovery)
                (reset! ch ch'))]
        (rmq/on-recovery conn handle-conn-recovery)
        (rmq/on-recovery @ch handle-ch-recovery)
        ch)))

I was hoping to be able to test this by restarting the RabbitMQ server and have the channel recover automatically, but in that case I get the same exception as above without any apparent recovery. Am I missing something? Can anyone share a working example of this feature?

Thanks.

Michael Klishin

Aug 23, 2013, 6:10:24 AM
to clojure-...@googlegroups.com

2013/8/23 Joe Freeman <bit...@gmail.com>

> Am I missing something? Can anyone share a working example of this feature?

Your code does not demonstrate it, but the handle-*-recovery functions end up using channels that were closed earlier.

Channels can be closed for a valid reason, e.g. trying to consume from a queue that does not exist,
 redeclaring a queue with different properties or ack-ing using an unknown delivery tag.

I recommend investigating *why* you have closed channels before trying to perform manual recovery.
In addition, these callbacks are for recovering from *network failures*, not arbitrary protocol exceptions.
Check the RabbitMQ log or handle shutdown exceptions.

The RabbitMQ Java client does not currently have any automatic recovery features, unlike, say, the Ruby clients.
Langohr currently only does two things:

 * Reconnects
 * Reopens existing channels

It does not keep track of declared entities, so they are not redeclared and consumers are not recovered.

Joe Freeman

Aug 23, 2013, 8:49:44 AM
to clojure-...@googlegroups.com
Thanks for your reply, Michael.

I assumed these exceptions were the result of network failures (e.g., from my laptop going to sleep). I'll keep an eye on the RabbitMQ logs in case not. But to clarify, should my `handle-ch-recovery` function be called if, for testing purposes, I restart the RabbitMQ server?

Michael Klishin

Aug 23, 2013, 9:00:01 AM
to clojure-...@googlegroups.com

2013/8/23 Joe Freeman <bit...@gmail.com>

> But to clarify, should my `handle-ch-recovery` function be called if, for testing purposes, I restart the RabbitMQ server?

If you enable automatic recovery with :automatically-recover true when connecting, it should.

Michael Klishin

Aug 23, 2013, 9:00:34 AM
to clojure-...@googlegroups.com

2013/8/23 Joe Freeman <bit...@gmail.com>

> com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel

By the way, since we provide our own Channel wrapper, we could in theory accumulate operations until the channel is open
(that's what the amqp gem does, for example), but that may also lead to confusion, so it's not an easy decision
to make.

Right now Langohr only recovers connections and channels, the absolute minimum we can do.
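
To illustrate the "accumulate operations until the channel is open" idea, here is a minimal sketch in plain Clojure. It is not Langohr code and not how the amqp gem is implemented; `make-buffered-channel`, `submit!` and `mark-open!` are hypothetical names invented for this example, and the "channel" is just a pair of atoms. It assumes Clojure 1.9 or later (for `reset-vals!`) and a single thread.

```clojure
;; Hypothetical sketch: none of these names exist in Langohr or the amqp gem.

(defn make-buffered-channel
  "Returns a map with an open? flag and a FIFO queue of pending operations."
  []
  {:open?   (atom false)
   :pending (atom clojure.lang.PersistentQueue/EMPTY)})

(defn submit!
  "Runs op (a zero-argument function) immediately if the channel is open,
  otherwise queues it and returns :queued."
  [{:keys [open? pending]} op]
  (if @open?
    (op)
    (do (swap! pending conj op)
        :queued)))

(defn mark-open!
  "Marks the channel as open and runs every queued operation in the
  order it was submitted."
  [{:keys [open? pending]}]
  (reset! open? true)
  ;; reset-vals! returns [old-value new-value]; we want the old queue.
  (let [[ops _] (reset-vals! pending clojure.lang.PersistentQueue/EMPTY)]
    (doseq [op ops]
      (op))))

;; Usage: operations submitted while closed run only after recovery.
(def log (atom []))
(def bch (make-buffered-channel))
(submit! bch #(swap! log conj :first))
(submit! bch #(swap! log conj :second))
(mark-open! bch)
@log ; => [:first :second]
```

The possible confusion is visible even here: `submit!` returns before the operation has actually run, so the caller cannot tell from the return value whether anything was published, and a real version would also have to decide how long to keep buffered operations and how to make the open check and the enqueue atomic across threads.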

Joe Freeman

Aug 23, 2013, 11:04:55 AM
to clojure-...@googlegroups.com

On Friday, 23 August 2013 14:00:01 UTC+1, Michael Klishin wrote:
> 2013/8/23 Joe Freeman <bit...@gmail.com>
>> But to clarify, should my `handle-ch-recovery` function be called if, for testing purposes, I restart the RabbitMQ server?
>
> If you enable automatic recovery with :automatically-recover true when connecting, it should.

Ah, I was missing this. But I'm still confused about how to handle network failures (and the RabbitMQ server being restarted). Perhaps it would help if I explain my expectation with a simple example:

If I set up the channel and create a queue like this:

=> (def ch (lch/open (rmq/connect {:automatically-recover true})))
...
=> (lq/declare ch "test" :durable true :auto-delete false)
...

I can check that the channel is open and get the status of the queue:

=> (lch/open? ch)
true
=> (lq/status ch "test")
{:message-count 0, :consumer-count 0}

If I stop the RabbitMQ server, I can see that the channel is now closed:

=> (lch/open? ch)
false

Then, if I start the RabbitMQ server, I can see that the channel is still closed:

=> (lch/open? ch)
false

And if I try to, say, get the status of a queue, I get an exception back:

=> (lq/status ch "test")
AlreadyClosedException clean connection shutdown; reason: Attempt to use closed channel  com.rabbitmq.client.impl.AMQChannel.ensureIsOpen (AMQChannel.java:190)

I was expecting the channel to automatically recover, and hence allow me to get the queue status. Shouldn't it?

Incidentally, I think I don't actually need to use the on-recovery callbacks, but if I do set these up, I notice two things:

 - The Connection version of on-recovery (`handle-conn-recovery` in my initial example) does get called, but the parameter is an AMQConnection and not, as I was expecting, a com.novemberain.langohr.Channel.

 - The Channel version of on-recovery (`handle-ch-recovery` in my initial example) doesn't get called, although I was expecting it to be.

Thanks.

Michael Klishin

Aug 23, 2013, 11:06:25 AM
to clojure-...@googlegroups.com
Joe Freeman:

> If I stop the RabbitMQ server, I can see that the channel is now closed:
>
> => (lch/open? ch)
> false
>
> Then, if I start the RabbitMQ server, I can see that the channel is still closed:
>
> => (lch/open? ch)
> false
>
> And if I try to, say, get the status of a queue, I get an exception back:
>
> => (lq/status ch "test")
> AlreadyClosedException clean connection shutdown; reason: Attempt to use closed channel com.rabbitmq.client.impl.AMQChannel.ensureIsOpen (AMQChannel.java:190)
>



> - The Channel version of on-recovery (`handle-ch-recovery` in my initial example) doesn't get called, as I was expecting.

This sounds like a bug.
--
MK

Joe Freeman

Aug 23, 2013, 11:43:21 AM
to clojure-...@googlegroups.com

On Friday, 23 August 2013 16:06:25 UTC+1, Michael Klishin wrote:
> This sounds like a bug.

Looks like the problem is that the channel isn't registered here:


As it is here:


I'll submit a pull request for this.

A work-around that causes `handle-ch-recovery` to be called is to give the channel an index. For example:

(def ch (lch/open (rmq/connect {:automatically-recover true}) 1))

This also seems to cause the channels to re-open as I was expecting.

Thanks for your help.

Michael Klishin

Aug 23, 2013, 11:47:36 AM
to clojure-...@googlegroups.com
Joe Freeman:

> I'll submit a pull request for this.
>
> A work-around that causes `handle-ch-recovery` to be called is to give the channel an index. For example:
>
> (def ch (lch/open (rmq/connect {:automatically-recover true}) 1))

Excellent, thank you!
--
MK

Aaron Z

Nov 25, 2013, 4:42:51 PM
to clojure-...@googlegroups.com


Michael Klishin <mklishin@...> writes:

> Excellent, thank you!

Hate to beat a dead horse here, but I'm not succeeding at this even if I
connect with {:automatically-recover true}, give the channel an index of 1,
and upgrade to langohr 1.6.0 -- neither handle-conn-recovery nor
handle-ch-recovery get called; I continue to get

AlreadyClosedException clean connection shutdown; reason: Attempt to use
closed channel com.rabbitmq.client.impl.AMQChannel.ensureIsOpen
(AMQChannel.java:190)

...
Code is:
=> (defn setup-channel
     []
     (let [conn (rmq/connect {:automatically-recover true}) ; changed
           ch (atom (lch/open conn 1))]                      ; changed
       (letfn [(handle-conn-recovery [conn']
                 (println "RabbitMQ connection recovered."))
               (handle-ch-recovery [ch']
                 (println "RabbitMQ channel recovered.")
                 (rmq/on-recovery ch' handle-ch-recovery)
                 (reset! ch ch'))]
         (rmq/on-recovery conn handle-conn-recovery)
         (rmq/on-recovery @ch handle-ch-recovery)
         ch)))

#'setup-channel

=> (def cch (setup-channel))

#'cch

=> (lb/publish @cch "" "" "message")

nil

; ...shutdown, restart rabbit...

=> (lb/publish @cch "" "" "message")

AlreadyClosedException clean connection shutdown; reason: Attempt to use
closed channel com.rabbitmq.client.impl.AMQChannel.ensureIsOpen
(AMQChannel.java:190)


... Neither recovery function is called. What am I missing?


Michael Klishin

Nov 28, 2013, 5:49:36 AM
to clojure-...@googlegroups.com
On Nov 26, 2013, at 1:42 AM, Aaron Z <at...@sigsegv.com> wrote:

> Hate to beat a dead horse here, but I'm not succeeding at this even if I
> connect with {:automatically-recover true}, give the channel an index of 1,
> and upgrade to langohr 1.6.0 -- neither handle-conn-recovery nor
> handle-ch-recovery get called; I continue to get
>
> AlreadyClosedException clean connection shutdown; reason: Attempt to use
> closed channel com.rabbitmq.client.impl.AMQChannel.ensureIsOpen
> (AMQChannel.java:190)

Chances are, I will be working on connection recovery in the Java client next week
and adding some real recovery tests to Langohr is a good way to start with that.
--
MK

Software Engineer, Pivotal/RabbitMQ


Michael Klishin

Nov 28, 2013, 9:58:05 AM
to clojure-...@googlegroups.com
2013/11/28 Michael Klishin <mkli...@gopivotal.com>

> Chances are, I will be working on connection recovery in the Java client next week
> and adding some real recovery tests to Langohr is a good way to start with that.

I got to it earlier than expected.

The issue turned out to be that if Langohr failed to reconnect on the first try, it
would simply stop trying.

1.7.0 will reconnect forever and I have the following example that recovers
just fine:


You can test it by shutting RabbitMQ down or simply closing the connection using the Management UI.

Recovery of entities (queues, exchanges, bindings) and test automation for recovery will likely land
in 1.8.0.
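
For readers who want to see the difference between "give up after the first failed attempt" and "reconnect forever" in isolation, here is a minimal sketch in plain Clojure. It is not Langohr's implementation and not the example referred to above; `reconnect-forever` and `flaky-connect` are hypothetical names, and `connect-fn` stands in for whatever actually opens a connection (for instance a function wrapping `rmq/connect`).

```clojure
;; Hypothetical sketch: these functions are not part of Langohr.

(defn reconnect-forever
  "Calls connect-fn until it returns without throwing, sleeping delay-ms
  milliseconds between attempts. Returns whatever connect-fn returns."
  [connect-fn delay-ms]
  (loop []
    ;; recur is not allowed inside try, so the attempt is wrapped and
    ;; the decision to retry is made outside of it.
    (let [result (try
                   {:conn (connect-fn)}
                   (catch Exception _ nil))]
      (if result
        (:conn result)
        (do (Thread/sleep (long delay-ms))
            (recur))))))

;; Usage: a stand-in connect function that fails twice, as if the broker
;; were still starting up, and then succeeds on the third attempt.
(def attempts (atom 0))

(defn flaky-connect []
  (if (< (swap! attempts inc) 3)
    (throw (java.io.IOException. "broker not reachable yet"))
    :connected))

(reconnect-forever flaky-connect 10) ; => :connected
```

A loop that called `connect-fn` once and stopped on the first exception would never get past the first failed attempt here, which matches the behaviour described above for versions before 1.7.0. A production version would probably also want a way to cancel the loop and to log each failed attempt.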