rabbitmq-c - How to build an event loop?


Elias Mårtenson

Jun 28, 2015, 10:44:14 PM
to rabbitm...@googlegroups.com
I would like to implement an asynchronous API for cl-rabbit (which is built on top of rabbitmq-c).

I've been experimenting a bit and it seems to me that it should be possible to do this, assuming that there is a single main loop that polls for AMQP messages and then dispatches them to the various waiting threads.

Alan, do you think that you could provide some insight into what I should do in order to do this right? As far as I can tell, I should be polling/selecting on the socket returned from amqp_get_sockfd() and when input is available I should be calling amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() until the buffer is drained.

(In my loop I guess I also will have to open a pipe and select on it as well so that I can tell the main loop that there are outstanding requests from a different thread)
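A minimal sketch of the wait step described above, assuming POSIX poll() and a plain pipe for wake-ups. The names wait_for_work() and wake_loop() are made up for illustration; sock_fd stands for whatever amqp_get_sockfd() returns, and the sketch only ever polls it, never reads from it or changes its state.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Bit flags describing why wait_for_work() returned. */
enum { WORK_NONE = 0, WORK_SOCKET = 1, WORK_WAKEUP = 2 };

/*
 * Block until the broker socket is readable, another thread has written to
 * the wake-up pipe, or timeout_ms elapses (-1 waits forever).
 * Returns a WORK_* mask, or -1 on error (including EINTR; callers may retry).
 */
int wait_for_work(int sock_fd, int wake_fd, int timeout_ms)
{
    struct pollfd fds[2];
    int mask = WORK_NONE;

    assert(sock_fd >= 0 && wake_fd >= 0);
    fds[0].fd = sock_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;
    fds[1].fd = wake_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    if (poll(fds, 2, timeout_ms) < 0)
        return -1;

    /* Treat hang-up and error as "go look at the socket" too. */
    if (fds[0].revents & (POLLIN | POLLHUP | POLLERR))
        mask |= WORK_SOCKET;

    if (fds[1].revents & POLLIN) {
        char buf[64];
        /* Discard the wake-up bytes; they carry no payload. */
        if (read(wake_fd, buf, sizeof buf) < 0)
            return -1;
        mask |= WORK_WAKEUP;
    }
    return mask;
}

/* Called from any other thread to tell the loop that requests are queued. */
int wake_loop(int wake_write_fd)
{
    char byte = 1;
    return write(wake_write_fd, &byte, 1) == 1 ? 0 : -1;
}
```

The loop thread would call wait_for_work() repeatedly; on WORK_SOCKET it reads frames through the library, and on WORK_WAKEUP it services the request queue filled by the other threads.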

Assuming this is correct, what types of frames can I receive, other than actual messages? In other words, what am I supposed to do with the value that is returned from amqp_simple_wait_frame()?

Regards,
Elias

Alan Antonuk

Jun 30, 2015, 1:52:54 AM
to Elias Mårtenson, rabbitm...@googlegroups.com
tl;dr: this is a use-case (async) that I'd like to see rabbitmq-c support well; unfortunately, it doesn't do this well today.

On Sun, Jun 28, 2015 at 7:44 PM, Elias Mårtenson <lok...@gmail.com> wrote:
I would like to implement an asynchronous API for cl-rabbit (which is built on top of rabbitmq-c).

I've been experimenting a bit and it seems to me that it should be possible to do this, assuming that there is a single main loop that polls for AMQP messages and then dispatches them to the various waiting threads.

Alan, do you think that you could provide some insight into what I should do in order to do this right? As far as I can tell, I should be polling/selecting on the socket returned from amqp_get_sockfd() and when input is available I should be calling amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() until the buffer is drained.

First: please read this warning about using amqp_get_sockfd(). It doesn't sound like you want to manipulate the state of the socket, so you're probably safe on that part of the warning. The other part of the warning is that rabbitmq-c doesn't present a stable interface in terms of how it uses the socket internally, and it will change version to version (e.g., there are likely some differences between v0.6 and v0.7).  

Second: while I believe you'll be able to do most of the async stuff by select()/poll() on the socket, you're likely to run into issues when it comes to dealing with heartbeats. There are two issues: the library currently swallows heartbeat frames, so it's hard to tell whether the library received one, and the library doesn't expose what the current heartbeat expiration times are (and for now, I'd like to keep this an implementation detail).

If you're willing to deal with the above issues read on...


(In my loop I guess I also will have to open a pipe and select on it as well so that I can tell the main loop that there are outstanding requests from a different thread)

Assuming this is correct, what types of frames can I receive, other than actual messages? In other words, what am I supposed to do with the value that is returned from amqp_simple_wait_frame()?

You'll receive async methods. Depending on how you use rabbitmq-c, you'll most likely get connection.close, channel.close, basic.return, basic.ack, basic.cancel, connection.blocked, and connection.unblocked. Take a look at https://www.rabbitmq.com/amqp-0-9-1-reference.html for a description of the methods you can get from the broker.

Regards,
Elias


Elias Mårtenson

Jun 30, 2015, 3:36:41 AM
to rabbitm...@googlegroups.com, lok...@gmail.com
On Tuesday, 30 June 2015 13:52:54 UTC+8, Alan Antonuk wrote:
tl;dr: this is a use-case (async) that I'd like to see rabbitmq-c support well; unfortunately, it doesn't do this well today.

Hopefully well enough that I'll be able to make it work. :-) Thanks a lot for your reply.
 
First: please read this warning about using amqp_get_sockfd(). It doesn't sound like you want to manipulate the state of the socket, so you're probably safe on that part of the warning.

I did, and as you correctly assumed, I will only use it to poll/select on.

Speaking of that: I noticed there are two similar, but distinctly different (in terms of when they return true) calls to check whether there are any packets available: amqp_data_in_buffer() and amqp_frames_enqueued(). As far as I can tell, the former simply indicates that some data has been read off the socket but not parsed yet, while the latter is a list of unprocessed packets (is it a list of packets that were read while waiting for a specific reply to come in?). I'm assuming I should call amqp_simple_wait_frame() while (amqp_data_in_buffer() && amqp_frames_enqueued()) is true?
 
The other part of the warning is that rabbitmq-c doesn't present a stable interface in terms of how it uses the socket internally, and it will change version to version (e.g., there are likely some differences between v0.6 and v0.7).

I don't intend to support async with rabbitmq-c versions below 0.7.
 
Second: while I believe you'll be able to do most of the async stuff by select()/poll() on the socket, you're likely to run into issues when it comes to dealing with heartbeats. There are two issues: the library currently swallows heartbeat frames, so it's hard to tell whether the library received one, and the library doesn't expose what the current heartbeat expiration times are (and for now, I'd like to keep this an implementation detail).

What happens if I call amqp_simple_wait_frame() after the poll/select told me there is data on the socket and that data happens to be a heartbeat?

Also, by default heartbeats are disabled, right?
 
If you're willing to deal with the above issues read on...

I sure am. :-)
 
You'll receive async methods. Depending on how you use rabbitmq-c, you'll most likely get connection.close, channel.close, basic.return, basic.ack, basic.cancel, connection.blocked, and connection.unblocked. Take a look at https://www.rabbitmq.com/amqp-0-9-1-reference.html for a description of the methods you can get from the broker.

In my limited testing, the only thing I seem to get is basic.deliver. I have noticed that these always come in triplets. In other words, I keep getting three packets for every message: AMQP_FRAME_METHOD, AMQP_FRAME_HEADER and finally AMQP_FRAME_BODY. Can I assume this is always the case and that I can simply call amqp_simple_wait_frame() three times after receiving indication that there is data to read?

Finally, what am I supposed to do when receiving these other messages (ack, blocked, etc…)?

As far as I can tell, it seems slightly easier to have the main loop do amqp_consume_message() instead of amqp_simple_wait_frame(), although that still doesn't explain what I'm supposed to do when the other messages appear (and do these also consist of three frames each?).

Thanks again for the information you've provided. It's very valuable to me.

Regards,
Elias

Alan Antonuk

Jun 30, 2015, 4:32:55 PM
to Elias Mårtenson, rabbitm...@googlegroups.com
On Tue, Jun 30, 2015 at 12:36 AM, Elias Mårtenson <lok...@gmail.com> wrote:


Speaking of that: I noticed there are two similar, but distinctly different (in terms of when they return true) calls to check whether there are any packets available: amqp_data_in_buffer() and amqp_frames_enqueued(). As far as I can tell, the former simply indicates that some data has been read off the socket but not parsed yet, while the latter is a list of unprocessed packets (is it a list of packets that were read while waiting for a specific reply to come in?). I'm assuming I should call amqp_simple_wait_frame() while (amqp_data_in_buffer() && amqp_frames_enqueued()) is true?
 
Your analysis is correct. If amqp_frames_enqueued() returns true, amqp_simple_wait_frame will return a frame without attempting to read from the network.  amqp_data_in_buffer() means that there is un-parsed data in the socket read buffer.  There may or may not be a complete frame available in that data, so amqp_simple_wait_frame may attempt to read the socket and block if there is a partial frame in the buffer.  Practically speaking: RabbitMQ (the broker) tends to send whole frames and the intended use case is in a LAN environment (though I'm seeing more users on cell phones...), so this happens rarely.  When it does happen it usually means that the connection got interrupted. 
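A rough sketch of a drain loop built on that description. To keep it runnable without a broker, the three library calls are hidden behind a hypothetical frame_ops table: in real code frames_enqueued and data_in_buffer would wrap amqp_frames_enqueued() and amqp_data_in_buffer(), and next_frame would wrap a zero-timeout amqp_simple_wait_frame_noblock() plus whatever handles the frame. The fake_conn type and the NEXT_* result codes exist only for this example.

```c
#include <assert.h>
#include <stddef.h>

/* Result codes for next_frame() in this sketch (not rabbitmq-c constants). */
enum { NEXT_ERROR = -1, NEXT_TIMEOUT = 0, NEXT_FRAME = 1 };

/* Hypothetical indirection over the library calls named in the lead-in. */
struct frame_ops {
    int (*frames_enqueued)(void *conn); /* non-zero: a parsed frame is queued */
    int (*data_in_buffer)(void *conn);  /* non-zero: unparsed bytes are buffered */
    int (*next_frame)(void *conn);      /* non-blocking read of one frame */
};

/*
 * Call after poll() reports the socket readable. Keeps pulling frames while
 * the library holds queued frames or buffered bytes, and stops as soon as a
 * non-blocking read times out (only a partial frame has arrived so far).
 * Returns the number of frames handled, or -1 on error.
 */
int drain_frames(const struct frame_ops *ops, void *conn)
{
    int handled = 0;

    assert(ops != NULL);
    do {
        int rc = ops->next_frame(conn);
        if (rc == NEXT_ERROR)
            return -1;
        if (rc == NEXT_TIMEOUT)
            break;
        handled++;
    } while (ops->frames_enqueued(conn) || ops->data_in_buffer(conn));

    return handled;
}

/* Fake connection used only to exercise the loop. */
struct fake_conn {
    int enqueued; /* frames already parsed and queued */
    int buffered; /* complete frames still sitting in the read buffer */
    int partial;  /* non-zero: an incomplete frame is buffered as well */
};

static int fake_enqueued(void *p)
{
    return ((struct fake_conn *)p)->enqueued > 0;
}

static int fake_buffered(void *p)
{
    struct fake_conn *c = p;
    return c->buffered > 0 || c->partial;
}

static int fake_next(void *p)
{
    struct fake_conn *c = p;
    if (c->enqueued > 0) { c->enqueued--; return NEXT_FRAME; }
    if (c->buffered > 0) { c->buffered--; return NEXT_FRAME; }
    return NEXT_TIMEOUT; /* nothing complete is available right now */
}

const struct frame_ops fake_ops = { fake_enqueued, fake_buffered, fake_next };
```

Using the non-blocking read throughout means a partial frame ends the drain pass instead of stalling the loop; the remaining bytes are picked up on the next poll() wake-up.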
 


What happens if I call amqp_simple_wait_frame() after the poll/select told me there is data on the socket and that data happens to be a heartbeat?
 
It may block; you could use amqp_simple_wait_frame_noblock() with a 0 timeout instead.
 

Also, by default heartbeats are disabled, right?

It depends on the heartbeat parameter specified in amqp_login. Specifying 0 will disable heartbeats.
 
If you're willing to deal with the above issues read on...

I sure am. :-)
 
You'll receive async methods. Depending on how you use rabbitmq-c, you'll most likely get connection.close, channel.close, basic.return, basic.ack, basic.cancel, connection.blocked, and connection.unblocked. Take a look at https://www.rabbitmq.com/amqp-0-9-1-reference.html for a description of the methods you can get from the broker.

In my limited testing, the only thing I seem to get is basic.deliver. I have noticed that these always come in triplets. In other words, I keep getting three packets for every message: AMQP_FRAME_METHOD, AMQP_FRAME_HEADER and finally AMQP_FRAME_BODY. Can I assume this is always the case and that I can simply call amqp_simple_wait_frame() three times after receiving indication that there is data to read?

No.  This is where it's important to understand how the AMQP protocol works (because this is more or less the level you're working at with amqp_simple_wait_frame). A message delivery is: an AMQP_FRAME_METHOD with the basic.deliver method, an AMQP_FRAME_HEADER, then one or more AMQP_FRAME_BODY frames (more than one for large messages, when message_size > frame_size). Take a look at how amqp_consume_message is implemented.
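The frame sequence can be tracked with a small state machine. The sketch below is an illustration only, not how amqp_consume_message is written: the frame struct and enum names are stand-ins for the library's frame type (in rabbitmq-c the corresponding fields would be frame_type, payload.properties.body_size and payload.body_fragment.len). It assumes the caller has already checked that the method frame is a content-bearing method such as basic.deliver, and it treats an announced body size of zero as a message with no body frames.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for AMQP_FRAME_METHOD / AMQP_FRAME_HEADER / AMQP_FRAME_BODY. */
enum frame_type { FRAME_METHOD, FRAME_HEADER, FRAME_BODY };

struct frame {
    enum frame_type type;
    uint64_t body_size;    /* FRAME_HEADER: total body size announced */
    uint64_t fragment_len; /* FRAME_BODY: number of bytes in this fragment */
};

enum assembly_state { WANT_METHOD, WANT_HEADER, WANT_BODY };

struct assembler {
    enum assembly_state state;
    uint64_t remaining; /* body bytes still expected */
};

enum feed_result { FEED_MORE, FEED_COMPLETE, FEED_PROTOCOL_ERROR };

/* Feed one frame; reports when a whole message has been seen. */
enum feed_result assembler_feed(struct assembler *a, const struct frame *f)
{
    assert(a != NULL && f != NULL);

    switch (a->state) {
    case WANT_METHOD:
        if (f->type != FRAME_METHOD)
            return FEED_PROTOCOL_ERROR;
        a->state = WANT_HEADER;
        return FEED_MORE;

    case WANT_HEADER:
        if (f->type != FRAME_HEADER)
            return FEED_PROTOCOL_ERROR;
        a->remaining = f->body_size;
        if (a->remaining == 0) {
            a->state = WANT_METHOD; /* empty message: no body frames follow */
            return FEED_COMPLETE;
        }
        a->state = WANT_BODY;
        return FEED_MORE;

    case WANT_BODY:
        if (f->type != FRAME_BODY || f->fragment_len > a->remaining)
            return FEED_PROTOCOL_ERROR;
        a->remaining -= f->fragment_len;
        if (a->remaining == 0) {
            a->state = WANT_METHOD; /* last fragment received */
            return FEED_COMPLETE;
        }
        return FEED_MORE;
    }
    return FEED_PROTOCOL_ERROR;
}
```

Counting bytes against the size from the header frame, rather than counting frames, is what makes a fixed "read three frames" loop unnecessary.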

Finally, what am I supposed to do when receiving these other messages (ack, blocked, etc…)?

Depends on the method, and what sort of API you're offering to your users.  The messages you MUST handle are channel.close, which indicates that a channel exception has occurred and that you must stop using the channel, and connection.close, which indicates that a connection exception has occurred and that the connection must be terminated.  Others depend on what sort of API you want to expose to your users. connection.blocked/.unblocked might be surfaced as a callback that informs your API users they should stop (or may resume) publishing messages. basic.ack can be surfaced as a callback indicating that a published message has been successfully received by the broker, if publisher confirmation is enabled. A basic.return message is followed by content (a header, plus one or more body frames), which happens when an error occurs when publishing a message (like publishing to an exchange that does not exist).  If you have consumer cancellation notification enabled, basic.cancel indicates that a consumer you're listening to messages from has stopped.
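One way to organise this is a single classification step keyed on the method id, sketched below. The constants are built from the class and method numbers in the AMQP 0-9-1 reference, packed as (class << 16) | method; rabbitmq-c ships its own constants for these in amqp_framing.h, which real code should use instead. The action names and classify_method() are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Pack an AMQP class id and method id into one 32-bit method id. */
#define METHOD_ID(class_id, method_id) \
    (((uint32_t)(class_id) << 16) | (uint32_t)(method_id))

#define CONNECTION_CLOSE     METHOD_ID(10, 50)
#define CONNECTION_BLOCKED   METHOD_ID(10, 60)
#define CONNECTION_UNBLOCKED METHOD_ID(10, 61)
#define CHANNEL_CLOSE        METHOD_ID(20, 40)
#define BASIC_CANCEL         METHOD_ID(60, 30)
#define BASIC_RETURN         METHOD_ID(60, 50)
#define BASIC_DELIVER        METHOD_ID(60, 60)
#define BASIC_ACK            METHOD_ID(60, 80)

/* What the event loop should do next (names invented for this sketch). */
enum action {
    ACT_TEARDOWN_CONNECTION, /* connection exception: stop everything */
    ACT_CLOSE_CHANNEL,       /* channel exception: stop using this channel */
    ACT_EXPECT_CONTENT,      /* a header and body frames follow */
    ACT_NOTIFY_USER,         /* surface as a callback, no content follows */
    ACT_UNKNOWN
};

enum action classify_method(uint32_t method_id)
{
    switch (method_id) {
    case CONNECTION_CLOSE:
        return ACT_TEARDOWN_CONNECTION;
    case CHANNEL_CLOSE:
        return ACT_CLOSE_CHANNEL;
    case BASIC_DELIVER:
    case BASIC_RETURN:
        return ACT_EXPECT_CONTENT;
    case BASIC_ACK:
    case BASIC_CANCEL:
    case CONNECTION_BLOCKED:
    case CONNECTION_UNBLOCKED:
        return ACT_NOTIFY_USER;
    default:
        return ACT_UNKNOWN;
    }
}
```

Only basic.deliver and basic.return put the loop into "expect content" mode; the other methods are complete in a single frame, which answers the question of whether they also arrive as three frames.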

You'll also need to consider that each connection can have multiple channels, and that messages delivered on different channels may be interleaved (the RabbitMQ broker may not do this, but that behavior cannot be relied upon).
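In practice this means keeping the "bytes still expected" bookkeeping per channel rather than per connection. A small sketch, assuming a fixed-size table and invented names (channel_table, channel_begin_body, channel_add_fragment, MAX_CHANNELS); a real binding would more likely key this on its own channel objects.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_CHANNELS 8 /* arbitrary limit for the sketch */

struct channel_slot {
    int in_use;
    uint16_t channel;   /* AMQP channel number from the frame */
    uint64_t remaining; /* body bytes still expected on this channel */
};

struct channel_table {
    struct channel_slot slots[MAX_CHANNELS];
};

static struct channel_slot *find_slot(struct channel_table *t, uint16_t channel)
{
    for (int i = 0; i < MAX_CHANNELS; i++)
        if (t->slots[i].in_use && t->slots[i].channel == channel)
            return &t->slots[i];
    return NULL;
}

/*
 * Record the body size announced by a header frame on `channel`.
 * Returns 1 if the message is already complete (empty body), 0 if body
 * frames are expected, -1 if the channel is mid-message or the table is full.
 */
int channel_begin_body(struct channel_table *t, uint16_t channel,
                       uint64_t body_size)
{
    assert(t != NULL);
    if (find_slot(t, channel) != NULL)
        return -1;
    if (body_size == 0)
        return 1;
    for (int i = 0; i < MAX_CHANNELS; i++) {
        if (!t->slots[i].in_use) {
            t->slots[i].in_use = 1;
            t->slots[i].channel = channel;
            t->slots[i].remaining = body_size;
            return 0;
        }
    }
    return -1;
}

/*
 * Account for one body frame on `channel`.
 * Returns 1 when that channel's message is complete, 0 when more body
 * frames are expected, -1 if the frame was not expected.
 */
int channel_add_fragment(struct channel_table *t, uint16_t channel,
                         uint64_t len)
{
    struct channel_slot *s;

    assert(t != NULL);
    s = find_slot(t, channel);
    if (s == NULL || len > s->remaining)
        return -1;
    s->remaining -= len;
    if (s->remaining > 0)
        return 0;
    s->in_use = 0; /* message complete: free the slot */
    return 1;
}
```

With this in place, a body frame for channel 2 arriving between two body frames for channel 1 completes the right message instead of corrupting the other one.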


As far as I can tell, it seems slightly easier to have the main loop do amqp_consume_message() instead of amqp_simple_wait_frame(), although that still doesn't explain what I'm supposed to do when the other messages appear (and do these also consist of three frames each?).

Thanks again for the information you've provided. It's very valuable to me.

Regards,
Elias


Elias Mårtenson

Jul 1, 2015, 12:09:08 PM
to rabbitm...@googlegroups.com, lok...@gmail.com
Thanks a lot for your assistance. I have now reached a point where I can actually manage multiple channel objects from multiple threads and it all works seamlessly by multiplexing requests to a single thread that handles the communication (pretty neat, actually).

For the record, the code I'm working on is here (it's not ready for production use at this time though): https://github.com/lokedhs/cl-rabbit-async

On Wednesday, 1 July 2015 04:32:55 UTC+8, Alan Antonuk wrote:
 
What happens if I call amqp_simple_wait_frame() after the poll/select told me there is data on the socket and that data happens to be a heartbeat?
 
It may block; you could use amqp_simple_wait_frame_noblock() with a 0 timeout instead.

I'll try this. I'm not dealing with heartbeats yet.
 
Finally, what am I supposed to do when receiving these other messages (ack, blocked, etc…)?

Depends on the method, and what sort of API you're offering to your users.  The messages you MUST handle are channel.close, which indicates that a channel exception has occurred and that you must stop using the channel, and connection.close, which indicates that a connection exception has occurred and that the connection must be terminated.  Others depend on what sort of API you want to expose to your users. connection.blocked/.unblocked might be surfaced as a callback that informs your API users they should stop (or may resume) publishing messages. basic.ack can be surfaced as a callback indicating that a published message has been successfully received by the broker, if publisher confirmation is enabled. A basic.return message is followed by content (a header, plus one or more body frames), which happens when an error occurs when publishing a message (like publishing to an exchange that does not exist).  If you have consumer cancellation notification enabled, basic.cancel indicates that a consumer you're listening to messages from has stopped.

What are channel exceptions (leading to a channel.close), and under what circumstances can they be sent? If at all possible I'd like to force this to occur so that I can test my code. As of now, I've never seen one. The same question applies to the others, in particular connection.close.
 
You'll also need to consider that each connection can have multiple channels, and that messages delivered on different channels may be interleaved (the RabbitMQ broker may not do this, but that behavior cannot be relied upon).

When you say "may not do this", does that mean that RabbitMQ never does this?

Regards,
Elias

Alan Antonuk

Jul 6, 2015, 1:28:47 PM
to Elias Mårtenson, rabbitm...@googlegroups.com
On Wed, Jul 1, 2015 at 9:09 AM, Elias Mårtenson <lok...@gmail.com> wrote:


What are channel exceptions (leading to a channel.close), and under what circumstances can they be sent? If at all possible I'd like to force this to occur so that I can test my code. As of now, I've never seen one. The same question applies to the others, in particular connection.close.

Exceptions are how errors are reported in AMQP. Channel exceptions are usually reported for run-time error conditions such as publishing to a non-existent exchange. Connection exceptions are usually reported when there are bugs in the client implementation (such as trying to use a channel that isn't open). Reading the AMQP spec can give more information on this topic.
 
You'll also need to consider that each connection can have multiple channels, and that messages delivered on different channels may be interleaved (the RabbitMQ broker may not do this, but that behavior cannot be relied upon).

When you say "may not do this", does that mean that RabbitMQ never does this?

I've observed that RabbitMQ behaves this way currently.  The AMQP spec doesn't prevent this behavior from changing. IOW: don't rely on this behavior. 
