Rabbitmq-C : How to recover from AMQP_STATUS_CONNECTION_CLOSED situation programmatically?

Rodrigo Pimenta Carvalho

Sep 9, 2014, 9:47:04 AM
to rabbitm...@googlegroups.com

Hi.

Let's suppose the RabbitMQ broker crashes while it is being used to enqueue messages.
In this case, the client application gets the AMQP_STATUS_CONNECTION_CLOSED status.

In addition, let's suppose there is a program that monitors the broker and restarts it whenever necessary.
In this scenario, the crash condition is only temporary. So the client application should try to reconnect to the broker after a while and recover from the AMQP_STATUS_CONNECTION_CLOSED situation.

How can I recover from the AMQP_STATUS_CONNECTION_CLOSED situation programmatically, using the rabbitmq-c library and C code?

Any hint will be very helpful.
Thanks a lot.




RODRIGO PIMENTA CARVALHO
Inatel Competence Center
Software
Ph: +55 35 3471 9979

Alan Antonuk

Sep 10, 2014, 11:03:45 AM
to Rodrigo Pimenta Carvalho, rabbitm...@googlegroups.com
You need to call amqp_destroy_connection() to destroy the existing connection, then reconnect to the broker as you did initially. rabbitmq-c does not provide any connection state recovery, though it isn't impossible to write some code to accomplish this depending on your use case.
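
A minimal sketch of that destroy-then-reconnect loop, assuming the application wraps its original setup sequence in a callback. The names reconnect_with_backoff, connect_fn, teardown_fn, sleep_fn and fake_connect are hypothetical and not part of rabbitmq-c. In a real program the teardown callback would call amqp_destroy_connection() and the connect callback would repeat whatever the program did initially. The capped exponential backoff is only an illustration, not something the library prescribes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback types (not part of rabbitmq-c).
 * connect_fn:  performs the full initial setup; returns 0 on success and
 *              must release anything it allocated if it fails part-way.
 * teardown_fn: releases the dead connection, e.g. amqp_destroy_connection().
 * sleep_fn:    waits the given number of seconds, e.g. a wrapper for sleep(). */
typedef int (*connect_fn)(void *ctx);
typedef void (*teardown_fn)(void *ctx);
typedef void (*sleep_fn)(unsigned seconds);

/* Destroy the old connection once, then retry the setup with capped
 * exponential backoff. Returns the attempt number that succeeded,
 * or -1 if every attempt failed. */
int reconnect_with_backoff(connect_fn do_connect, teardown_fn do_teardown,
                           sleep_fn do_sleep, void *ctx,
                           int max_attempts, unsigned max_delay_s)
{
    unsigned delay_s = 1;

    assert(do_connect != NULL && max_attempts > 0 && max_delay_s > 0);

    if (do_teardown != NULL)
        do_teardown(ctx);

    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (do_connect(ctx) == 0)
            return attempt;
        if (attempt == max_attempts)
            break;
        if (do_sleep != NULL)
            do_sleep(delay_s);
        /* Double the delay without exceeding the cap. */
        if (delay_s > max_delay_s / 2)
            delay_s = max_delay_s;
        else
            delay_s *= 2;
    }
    return -1;
}

/* Stand-in for a real connect callback: fails until the counter behind
 * ctx reaches zero, imitating a broker that is still restarting. */
int fake_connect(void *ctx)
{
    int *failures_left = ctx;

    if (*failures_left > 0) {
        --*failures_left;
        return -1;
    }
    return 0;
}
```

With a counter of 2 passed as ctx, fake_connect fails twice, so reconnect_with_backoff(fake_connect, NULL, NULL, &failures, 5, 30) returns 3.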

-Alan

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send an email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Rodrigo Pimenta Carvalho

Sep 10, 2014, 2:13:15 PM
to Alan Antonuk, rabbitm...@googlegroups.com
Hi Alan Antonuk.

Thank you very much for the hint!

By the way, the library lets us read messages from a broker with the C call amqp_consume_message(Conn, &Envelope, TimeVal, 0);.

In my project I use 10 seconds for TimeVal. So, after 10 seconds without reading messages (the queue is empty), the code is no longer blocked in amqp_consume_message. Since amqp_consume_message is inside a while(true) loop, it is executed again and again. That is OK for me.

However, there are some situations I don't know how to recover from.
While my system is running, if the broker is stopped and restarted (rabbitmqctl stop_app; rabbitmqctl start_app), my system ends up with AMQP_STATUS_UNEXPECTED_STATE!
How can I recover from that state and get my system reading messages again, via C code? That is, how can I recover from AMQP_STATUS_UNEXPECTED_STATE without restarting my system?

I would also like my system to recover from another abnormal situation: AMQP_STATUS_HEARTBEAT_TIMEOUT. So, how can I simulate and recover from these situations?

Just one more question: is there any situation that could leave my system blocked in amqp_consume_message forever? I guess so, which is why I use the TimeVal.

Any hint/example will be very helpful.

However, if I'm doing/thinking something wrong, please tell me!
Best Regards!


RODRIGO PIMENTA CARVALHO
Inatel Competence Center
Software
Ph: +55 35 3471 9979 (Brasil)

Alan Antonuk

Sep 11, 2014, 2:08:34 AM
to Rodrigo Pimenta Carvalho, rabbitm...@googlegroups.com
On Wed, Sep 10, 2014 at 11:13 AM, Rodrigo Pimenta Carvalho <pim...@inatel.br> wrote:

> However, there are some situations I don't know how to recover from.
> While my system is running, if the broker is stopped and restarted (rabbitmqctl stop_app; rabbitmqctl start_app), my system ends up with AMQP_STATUS_UNEXPECTED_STATE!
> How can I recover from that state and get my system reading messages again, via C code? That is, how can I recover from AMQP_STATUS_UNEXPECTED_STATE without restarting my system?

If you get an AMQP_STATUS_UNEXPECTED_STATE from amqp_consume_message, you need to call amqp_simple_wait_frame() and examine the output. That result just means amqp_consume_message received something that wasn't a message delivery. If amqp_simple_wait_frame() returns an error indicating the socket or connection was closed, you will need to destroy the connection and open a new one.

> I would also like my system to recover from another abnormal situation: AMQP_STATUS_HEARTBEAT_TIMEOUT. So, how can I simulate and recover from these situations?

rabbitmq-c doesn't provide a way to do this directly. There are ways to fake it by fiddling with a firewall rule, or by injecting/filtering packets before the OS deals with them; it's very OS-dependent though.

> Just one more question: is there any situation that could leave my system blocked in amqp_consume_message forever? I guess so, which is why I use the TimeVal.

If you call amqp_consume_message() without a timeout, and the queue that the consumer is attached to doesn't have any messages delivered to it - it'll block forever. 



l rus

Oct 16, 2014, 11:02:47 AM
to rabbitm...@googlegroups.com, pim...@inatel.br


One of the sample programs has the following error handling framework for AMQP_STATUS_UNEXPECTED_STATE:

- If amqp_simple_wait_frame() returns something other than AMQP_STATUS_OK,
  how should this be handled?

- What are the other possible values of frame.payload.method.id besides:
         AMQP_BASIC_ACK_METHOD
         AMQP_BASIC_RETURN_METHOD
         AMQP_CHANNEL_CLOSE_METHOD
         AMQP_CONNECTION_CLOSE_METHOD

- I ran into a scenario where amqp_consume_message() failed with AMQP_RESPONSE_LIBRARY_EXCEPTION
  (but with an error OTHER than AMQP_STATUS_UNEXPECTED_STATE):

        first it got AMQP_STATUS_HEARTBEAT_TIMEOUT, then AMQP_STATUS_CONNECTION_CLOSED

Does this require all of the following:
              amqp_new_connection
              amqp_ssl_socket_new
              amqp_ssl_socket_set_cacert
              amqp_ssl_socket_set_key
              amqp_socket_open
              amqp_login
              amqp_channel_open
              amqp_queue_declare
              amqp_basic_consume
              and sending a TCP login request through amqp_basic_publish?



> If you call amqp_consume_message() without a timeout, and the queue that the consumer is attached to doesn't have any messages delivered to it - it'll block forever.

   I understand that the amqp framework may not fully support threads.
   However, I have an independent thread to send / receive amqp messages.
   I am using amqp_consume_message() in blocking mode. Would there be any issues / concerns with doing this?

Michael Klishin

Oct 16, 2014, 11:04:51 AM
to rabbitm...@googlegroups.com, l rus, pim...@inatel.br


On 16 October 2014 at 19:02:54, l rus (zxto...@gmail.com) wrote:
>  - what are the other possible conditions for frame.payload.method.id
> besides:
> AMQP_BASIC_ACK_METHOD
> AMQP_BASIC_RETURN_METHOD
> AMQP_CHANNEL_CLOSE_METHOD
> AMQP_CONNECTION_CLOSE_METHOD

There are many methods in the protocol, I suggest getting familiar with them:
http://www.rabbitmq.com/amqp-0-9-1-quickref.html
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

l rus

Oct 16, 2014, 4:27:41 PM
to rabbitm...@googlegroups.com, pim...@inatel.br



  > if amqp_simple_wait_frame() returns an error indicating the socket or connection was closed, you will need to destroy the connection and open a new one.

      How do I destroy an existing connection?
      Do I need to re-establish the entire connection / queue etc. [as listed in my previous message] when creating a new connection?

      thanks,

l rus

Oct 16, 2014, 5:50:16 PM
to rabbitm...@googlegroups.com, pim...@inatel.br


  Just to clarify, do I need to clean up any other resources associated with

   - amqp_channel,
   - amqp_queue, basic_consumer, etc.

   started during the previous connection session, in addition to calling amqp_destroy_connection()?

   Does amqp provide / require any steps to sync up two connection sessions
   (old and new)?

Michael Klishin

Oct 16, 2014, 5:54:40 PM
to l rus, pim...@inatel.br, rabbitm...@googlegroups.com
On 17 October 2014 at 01:50:23, l rus (zxto...@gmail.com) wrote:
> does amqp provide / require any steps to sync up two connection
> sessions
> (old and new)?

Not in the protocol and most clients.

Alan Antonuk

Oct 17, 2014, 12:56:23 PM
to l rus, rabbitm...@googlegroups.com, Rodrigo Pimenta Carvalho
On Thu, Oct 16, 2014 at 8:02 AM, l rus <zxto...@gmail.com> wrote:


> One of the sample programs has the following error handling framework for AMQP_STATUS_UNEXPECTED_STATE:
>
> - If amqp_simple_wait_frame() returns something other than AMQP_STATUS_OK,
>   how should this be handled?

You should compare against amqp_status_enum values (see: https://github.com/alanxz/rabbitmq-c/blob/master/librabbitmq/amqp.h#L659). Usually this means that the connection is in a bad state, and you need to tear down the connection and create a new one. 

> - What are the other possible values of frame.payload.method.id besides:
>          AMQP_BASIC_ACK_METHOD
>          AMQP_BASIC_RETURN_METHOD
>          AMQP_CHANNEL_CLOSE_METHOD
>          AMQP_CONNECTION_CLOSE_METHOD

https://github.com/alanxz/rabbitmq-c/blob/master/librabbitmq/amqp_framing.h#L120 (anything ending in _METHOD). Really though you will likely see only a fraction of these. Your best bet is as Michael said to get familiar with the various amqp methods. 
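
A sketch of how the four methods listed above might be mapped to recovery steps. The numeric values are the ones amqp_framing.h defines for the corresponding AMQP_*_METHOD constants; the local enum names, recovery_action and action_for_method are hypothetical, chosen so the sketch compiles without the library. In real code the switch would run on frame.payload.method.id after amqp_simple_wait_frame() returned AMQP_STATUS_OK and frame.frame_type was AMQP_FRAME_METHOD.

```c
#include <assert.h>
#include <stdint.h>

/* Values as defined in rabbitmq-c's amqp_framing.h; the local names are
 * hypothetical stand-ins for the library constants named in the comments. */
enum {
    METHOD_CONNECTION_CLOSE = 0x000A0032, /* AMQP_CONNECTION_CLOSE_METHOD */
    METHOD_CHANNEL_CLOSE    = 0x00140028, /* AMQP_CHANNEL_CLOSE_METHOD */
    METHOD_BASIC_RETURN     = 0x003C0032, /* AMQP_BASIC_RETURN_METHOD */
    METHOD_BASIC_ACK        = 0x003C0050  /* AMQP_BASIC_ACK_METHOD */
};

/* Sanity check on the copied values: the class id sits in the upper
 * 16 bits (10 = connection, 20 = channel, 60 = basic). */
static_assert((METHOD_CONNECTION_CLOSE >> 16) == 10, "connection class is 10");
static_assert((METHOD_CHANNEL_CLOSE >> 16) == 20, "channel class is 20");
static_assert((METHOD_BASIC_ACK >> 16) == 60, "basic class is 60");

typedef enum {
    ACTION_CONTINUE,       /* nothing to repair, keep consuming */
    ACTION_READ_RETURNED,  /* a returned message follows: read it, then free it */
    ACTION_REOPEN_CHANNEL, /* channel exception: open a channel, consume again */
    ACTION_RECONNECT,      /* connection is gone: destroy it and build a new one */
    ACTION_LOG_UNEXPECTED  /* some other method: log its id and carry on */
} recovery_action;

/* Decide what the consumer loop should do for a method frame that
 * interrupted message delivery. */
recovery_action action_for_method(uint32_t method_number)
{
    switch (method_number) {
    case METHOD_BASIC_ACK:        return ACTION_CONTINUE;
    case METHOD_BASIC_RETURN:     return ACTION_READ_RETURNED;
    case METHOD_CHANNEL_CLOSE:    return ACTION_REOPEN_CHANNEL;
    case METHOD_CONNECTION_CLOSE: return ACTION_RECONNECT;
    default:                      return ACTION_LOG_UNEXPECTED;
    }
}
```

Keeping the decision separate from the library calls makes it easy to test the mapping without a broker; the caller then performs the matching rabbitmq-c calls for each action.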
 

> - I ran into a scenario where amqp_consume_message() failed with AMQP_RESPONSE_LIBRARY_EXCEPTION
>   (but with an error OTHER than AMQP_STATUS_UNEXPECTED_STATE):
>
>         first it got AMQP_STATUS_HEARTBEAT_TIMEOUT, then AMQP_STATUS_CONNECTION_CLOSED
>
> Does this require all of the following:
>               amqp_new_connection
>               amqp_ssl_socket_new
>               amqp_ssl_socket_set_cacert
>               amqp_ssl_socket_set_key
>               amqp_socket_open
>               amqp_login
>               amqp_channel_open
>               amqp_queue_declare
>               amqp_basic_consume
>               and sending a TCP login request through amqp_basic_publish?

Without knowing how you've created the various exchanges, queues, bindings and consumers in the original connection I can't give you a definitive answer to that. That said, yes you would need to create a new connection (amqp_new_connection), setup the socket (amqp_ssl_socket_new, set_cacert, set_key, socket_open), amqp_login, and create a new channel. 



> > If you call amqp_consume_message() without a timeout, and the queue that the consumer is attached to doesn't have any messages delivered to it - it'll block forever.
>
> I understand that the amqp framework may not fully support threads.
> However, I have an independent thread to send / receive amqp messages.
> I am using amqp_consume_message() in blocking mode. Would there be any issues / concerns with doing this?

If you're sharing an amqp_connection_state_t object between two threads without synchronizing access to the object with something like a mutex you will run into problems.  If you're using two independent amqp_connection_state_t objects on different threads that will work without issue.
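
A rough sketch of the mutex approach, assuming POSIX threads. The names guarded_conn, guarded_conn_init, guarded_conn_with, guarded_conn_swap and count_call are hypothetical, and the void *conn member stands in for amqp_connection_state_t. One consequence worth noting: any call made while holding the lock should use a timeout, because a consume call that blocks forever would keep the cleanup thread waiting for the lock.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* A connection handle shared between threads, guarded by one mutex.
 * conn is a stand-in for amqp_connection_state_t. */
typedef struct {
    pthread_mutex_t lock;
    void *conn;
    unsigned generation; /* bumped every time the connection is replaced */
} guarded_conn;

/* An operation to run on the connection while the lock is held. */
typedef int (*conn_op)(void *conn, void *arg);

int guarded_conn_init(guarded_conn *g, void *conn)
{
    assert(g != NULL);
    g->conn = conn;
    g->generation = 0;
    return pthread_mutex_init(&g->lock, NULL);
}

/* Run op under the lock. Returns op's result, or -1 without calling op
 * when there is currently no connection. */
int guarded_conn_with(guarded_conn *g, conn_op op, void *arg)
{
    int rc = -1;

    assert(g != NULL && op != NULL);
    pthread_mutex_lock(&g->lock);
    if (g->conn != NULL)
        rc = op(g->conn, arg);
    pthread_mutex_unlock(&g->lock);
    return rc;
}

/* Swap in a fresh connection (or NULL) and hand back the old one. Once
 * this returns, no other thread can reach the old handle through g, so
 * the caller may destroy it without holding the lock. */
void *guarded_conn_swap(guarded_conn *g, void *fresh)
{
    void *old;

    assert(g != NULL);
    pthread_mutex_lock(&g->lock);
    old = g->conn;
    g->conn = fresh;
    g->generation++;
    pthread_mutex_unlock(&g->lock);
    return old;
}

/* Stand-in operation: counts how often it ran. A real one would call
 * amqp_consume_message() with a timeout. */
int count_call(void *conn, void *arg)
{
    (void)conn;
    ++*(int *)arg;
    return 0;
}
```

The reader thread would wrap each consume call in guarded_conn_with(), while the cleanup thread would call guarded_conn_swap() and then destroy the handle it gets back.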






Alan Antonuk

Oct 17, 2014, 1:00:22 PM
to l rus, rabbitm...@googlegroups.com, Rodrigo Pimenta Carvalho
On Thu, Oct 16, 2014 at 2:50 PM, l rus <zxto...@gmail.com> wrote:


> Just to clarify, do I need to clean up any other resources associated with
>
>    - amqp_channel,
>    - amqp_queue, basic_consumer, etc.
>
> started during the previous connection session, in addition to calling amqp_destroy_connection()?

amqp_destroy_connection() will free all resources that are returned by the library for a connection, with the exception of amqp_message_t objects returned by amqp_read_message  (use amqp_destroy_message), and amqp_envelope_t objects returned by amqp_consume_message (use amqp_destroy_envelope).

> Does amqp provide / require any steps to sync up two connection sessions
> (old and new)?

What do you mean by sync? rabbitmq-c does not keep track of the various objects you create on the broker, if you want to recreate the state of a connection to the broker, you need to keep track of this state yourself. 
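
Since rabbitmq-c does not remember what was created on the broker, one option is to record each consumer as it is set up and replay the list after reconnecting. The sketch below assumes only consumers need restoring; topology, consumer_record, topology_remember, topology_replay and print_restore are hypothetical names, and the fixed table sizes are arbitrary. A real restore callback would call amqp_channel_open(), amqp_queue_declare() and amqp_basic_consume() on the new connection.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MAX_CONSUMERS  8
#define MAX_QUEUE_NAME 64

/* One entry per consumer the application started. */
typedef struct {
    int  channel;
    char queue[MAX_QUEUE_NAME];
} consumer_record;

typedef struct {
    consumer_record items[MAX_CONSUMERS];
    size_t count;
} topology;

/* Record a consumer right after it was set up successfully.
 * Returns 0 on success, -1 if the table is full or the name is too long. */
int topology_remember(topology *t, int channel, const char *queue)
{
    assert(t != NULL && queue != NULL);
    if (t->count == MAX_CONSUMERS || strlen(queue) >= MAX_QUEUE_NAME)
        return -1;
    t->items[t->count].channel = channel;
    strcpy(t->items[t->count].queue, queue);
    t->count++;
    return 0;
}

/* Called once per record on the fresh connection; returns 0 on success. */
typedef int (*restore_fn)(const consumer_record *rec, void *ctx);

/* Replay the records in order, stopping at the first failure.
 * Returns how many records were restored. */
size_t topology_replay(const topology *t, restore_fn restore, void *ctx)
{
    size_t restored = 0;

    assert(t != NULL && restore != NULL);
    while (restored < t->count && restore(&t->items[restored], ctx) == 0)
        restored++;
    return restored;
}

/* Stand-in callback that only reports what it would restore. */
int print_restore(const consumer_record *rec, void *ctx)
{
    (void)ctx;
    printf("restore consumer: channel %d, queue %s\n", rec->channel, rec->queue);
    return 0;
}
```

If topology_replay() returns fewer records than t->count, the new connection is only partly set up, and the simplest response is probably to destroy it and start over.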


l rus

Oct 17, 2014, 5:38:54 PM
to rabbitm...@googlegroups.com, zxto...@gmail.com, pim...@inatel.br

I have a mutex to protect the reader thread (taken before amqp_consume_message() and while processing data)
and the cleanup thread (taken before amqp_destroy_connection()).

However, occasionally the NEW connection seems to get into some unknown state
while doing amqp_consume_message()

[e.g.
- AMQP_STATUS_UNEXPECTED_STATE and  frame.payload.method.id  = 3932181 
- amqp_basic_consume() fails with AMQP_RESPONSE_LIBRARY_EXCEPTION
and
- segfault in amqp_handle_input() invoked thru amqp_consume_message()]

Seems like some cleanup is missing...
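
The method id reported above can be decoded by hand. rabbitmq-c's amqp_framing.h defines each *_METHOD constant as the AMQP class id in the upper 16 bits and the method id in the lower 16 bits, so 3932181 is 0x003C0015: class 60 (basic), method 21 (consume-ok), which matches AMQP_BASIC_CONSUME_OK_METHOD. The macro names below are hypothetical helpers, not library API.

```c
#include <assert.h>
#include <stdint.h>

/* Split a method number into its AMQP class id (upper 16 bits)
 * and method id (lower 16 bits). */
#define METHOD_CLASS_ID(n)  ((uint16_t)((uint32_t)(n) >> 16))
#define METHOD_METHOD_ID(n) ((uint16_t)((uint32_t)(n) & 0xFFFFu))

/* 3932181 == 0x003C0015: class 60 is "basic", method 21 is "consume-ok". */
static_assert(METHOD_CLASS_ID(3932181u) == 60, "class id should be 60 (basic)");
static_assert(METHOD_METHOD_ID(3932181u) == 21, "method id should be 21 (consume-ok)");
```

A consume-ok arriving inside amqp_consume_message() is a reply to a basic.consume request, so one thing to check is whether a reply from the setup phase is being read by the consumer loop.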



l rus

Nov 6, 2014, 11:52:25 PM
to rabbitm...@googlegroups.com, zxto...@gmail.com, pim...@inatel.br


I had opened two sets of channel (1, 2) + queue with the initial connection,
and consumed messages through amqp_consume_message.

Subsequently, to recover from an error (or just for testing), I used
amqp_destroy_connection() to clean up the previous setup
before opening two sets of channel (reusing 1, 2) + queue with a new connection.

However, one of the channels failed to open with "already open".

Shouldn't amqp_destroy_connection() have destroyed all resources on the client
and severed the previous channels / queues from the server?


If not, what else is needed?


thanks,

Alan Antonuk

Nov 10, 2014, 10:36:07 PM
to l rus, rabbitm...@googlegroups.com, Rodrigo Pimenta Carvalho
I can't tell from your description what you're doing.  Could you please post (as a http://gist.github.com/) a pared down example that demonstrates your issue?

-Alan

l rus

Nov 19, 2014, 1:13:40 PM
to rabbitm...@googlegroups.com, zxto...@gmail.com, pim...@inatel.br

I will create a snippet and try to reproduce the problem with it
using a 3rd party broker/server.

thanks,