I would like to implement an asynchronous API for cl-rabbit (which is built on top of rabbitmq-c).

I've been experimenting a bit and it seems to me that it should be possible to do this, assuming that there is a single main loop that polls for AMQP messages and then dispatches them to the various waiting threads.

Alan, do you think that you could provide some insight into what I should do in order to do this right? As far as I can tell, I should be polling/selecting on the socket returned from amqp_get_sockfd() and when input is available I should be calling amqp_simple_wait_frame()/amqp_simple_wait_frame_noblock() until the buffer is drained.

(In my loop I guess I will also have to open a pipe and select on it as well, so that I can tell the main loop that there are outstanding requests from a different thread.)

Assuming this is correct, what types of frames can I receive, other than actual messages? In other words, what am I supposed to do with the value that is returned from amqp_simple_wait_frame()?
--
Regards,
Elias
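The main loop described in the question above (poll the broker socket, plus a pipe that other threads write to when they have outstanding requests) could be sketched roughly as follows. This uses only POSIX poll() and pipe(); the names wait_for_event, wake_main_loop and the EV_* flags are hypothetical, and broker_fd is assumed to be whatever amqp_get_sockfd() returned. It does not call into rabbitmq-c at all, so reading frames once EV_BROKER is reported is left to the caller.

```c
#include <poll.h>
#include <unistd.h>

/* Bit flags reported by wait_for_event() (hypothetical names). */
enum { EV_NONE = 0, EV_BROKER = 1, EV_WAKEUP = 2 };

/*
 * Block until the broker socket is readable, another thread has written to
 * the wake-up pipe, or timeout_ms elapses (-1 waits forever).
 * Returns a bitmask of EV_* flags, or -1 if poll() failed (including EINTR).
 */
int wait_for_event(int broker_fd, int wake_read_fd, int timeout_ms) {
  struct pollfd fds[2];
  fds[0].fd = broker_fd;
  fds[0].events = POLLIN;
  fds[0].revents = 0;
  fds[1].fd = wake_read_fd;
  fds[1].events = POLLIN;
  fds[1].revents = 0;

  if (poll(fds, 2, timeout_ms) < 0) {
    return -1;
  }

  int events = EV_NONE;
  /* Treat hang-up and error as "go look at the socket" as well. */
  if (fds[0].revents & (POLLIN | POLLHUP | POLLERR)) {
    events |= EV_BROKER;
  }
  if (fds[1].revents & POLLIN) {
    /* Drain pending wake-up bytes so the next poll() does not fire for them
     * again. POLLIN guarantees at least one byte, so this does not block. */
    char buf[64];
    ssize_t n = read(wake_read_fd, buf, sizeof buf);
    (void)n;
    events |= EV_WAKEUP;
  }
  return events;
}

/* Called from any other thread to interrupt the main loop's poll(). */
int wake_main_loop(int wake_write_fd) {
  char byte = 1;
  return write(wake_write_fd, &byte, 1) == 1 ? 0 : -1;
}
```

A worker thread would queue its request somewhere thread-safe and then call wake_main_loop(); the main loop handles the queue when EV_WAKEUP is set and reads frames when EV_BROKER is set.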
tl;dr: this is a use case (async) that I'd like to see rabbitmq-c support well; unfortunately, it doesn't do this well today.
First: please read this warning about using amqp_get_sockfd(). It doesn't sound like you want to manipulate the state of the socket, so you're probably safe on that part of the warning.
The other part of the warning is that rabbitmq-c doesn't present a stable interface in terms of how it uses the socket internally, and it will change version to version (e.g., there are likely some differences between v0.6 and v0.7).
Second: while I believe you'll be able to do most of the async stuff by select()/poll() on the socket, you're likely to run into issues when it comes to dealing with heartbeats. There are two issues: the library currently swallows heartbeat frames, so it's hard to tell whether the library received a heartbeat frame, and the library doesn't expose what the current heartbeat expiration times are (and for now, I'd like to keep this an implementation detail).
If you're willing to deal with the above issues read on...
You'll receive async methods. Depending on how you use rabbitmq-c you'll most likely get connection.close, channel.close, basic.return, basic.ack, basic.cancel, connection.blocked, connection.unblocked. Take a look at https://www.rabbitmq.com/amqp-0-9-1-reference.html for a description of what kind of methods you can get from the broker.
Speaking of that: I noticed there are two similar, but distinctly different (in terms of when they return true) calls to check whether there are any packets available: amqp_data_in_buffer() and amqp_frames_enqueued(). As far as I can tell, the former simply indicates that some data has been read off the socket but not parsed yet, while the latter indicates that there is a list of unprocessed packets (is it a list of packets that were read while waiting for a specific reply to come in?). I'm assuming I should call amqp_simple_wait_frame() while (amqp_data_in_buffer() && amqp_frames_enqueued()) is true?
What happens if I call amqp_simple_wait_frame() after the poll/select told me there is data on the socket and that data happens to be a heartbeat?
Also, by default heartbeats are disabled, right?
> If you're willing to deal with the above issues read on...

I sure am. :-)

> You'll receive async methods. Depending on how you use rabbitmq-c you'll most likely get connection.close, channel.close, basic.return, basic.ack, basic.cancel, connection.blocked, connection.unblocked. Take a look at https://www.rabbitmq.com/amqp-0-9-1-reference.html for a description of what kind of methods you can get from the broker.

In my limited testing, the only thing I seem to get is basic.deliver. I have noticed that these always come in triplets. In other words, I keep getting three packets for every message: AMQP_FRAME_METHOD, AMQP_FRAME_HEADER and finally AMQP_FRAME_BODY. Can I assume this is always the case and that I can simply call amqp_simple_wait_frame() three times after receiving indication that there is data to read?
Finally, what am I supposed to do when receiving these other messages (ack, blocked, etc…)?
As far as I can tell, it seems slightly easier to have the main loop do amqp_consume_message() instead of amqp_simple_wait_frame(), although that still doesn't explain what I'm supposed to do when the other messages appear (and do these also consist of three frames each?).

Thanks again for the information you've provided. It's very valuable to me.

Regards,
Elias
--
> What happens if I call amqp_simple_wait_frame() after the poll/select told me there is data on the socket and that data happens to be a heartbeat?

It may block. Maybe use amqp_simple_wait_frame_noblock() with a 0 timeout instead.

> Finally, what am I supposed to do when receiving these other messages (ack, blocked, etc…)?

Depends on the method, and what sort of API you're offering to your users. The messages you MUST handle are channel.close, which indicates that a channel exception has occurred and that you must stop using the channel, and connection.close, which indicates that a connection exception has occurred and that the connection must be terminated.

Others depend on what sort of API you want to expose to your users. connection.blocked/.unblocked might be surfaced as a callback that informs your API users they should stop publishing messages. basic.ack can be surfaced as a callback indicating that a published message has been successfully received by the broker, if publisher confirmation is enabled. A basic.return message is followed by content (header, plus 1+n body frames), which happens when an error occurs while publishing a message (like publishing to an exchange that does not exist). If you have the consumer cancellation notification enabled, basic.cancel indicates that a consumer you're listening to messages from has stopped.
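To make the handling rules above concrete, here is a minimal, self-contained sketch of a dispatcher that maps an incoming method number to the kind of reaction described. It assumes the AMQP 0-9-1 convention that a method number is (class id << 16) | method id, which as far as I know is also how rabbitmq-c's AMQP_*_METHOD constants in amqp_framing.h are built; the M_* names, the action enum and classify_method are hypothetical and only for illustration.

```c
#include <stdint.h>

/* AMQP 0-9-1 method number: class id in the high 16 bits, method id in the low. */
#define METHOD_NUMBER(class_id, method_id) \
  (((uint32_t)(class_id) << 16) | (uint32_t)(method_id))

/* Class/method ids taken from the AMQP 0-9-1 reference (plus RabbitMQ's
 * connection.blocked/unblocked extension). Names are hypothetical. */
enum {
  M_CONNECTION_CLOSE     = METHOD_NUMBER(10, 50),
  M_CONNECTION_BLOCKED   = METHOD_NUMBER(10, 60),
  M_CONNECTION_UNBLOCKED = METHOD_NUMBER(10, 61),
  M_CHANNEL_CLOSE        = METHOD_NUMBER(20, 40),
  M_BASIC_CANCEL         = METHOD_NUMBER(60, 30),
  M_BASIC_RETURN         = METHOD_NUMBER(60, 50),
  M_BASIC_DELIVER        = METHOD_NUMBER(60, 60),
  M_BASIC_ACK            = METHOD_NUMBER(60, 80)
};

typedef enum {
  ACTION_UNKNOWN,          /* not covered by this sketch */
  ACTION_CLOSE_CONNECTION, /* connection exception: tear the connection down */
  ACTION_CLOSE_CHANNEL,    /* channel exception: stop using this channel */
  ACTION_READ_CONTENT,     /* a header frame and body frames follow */
  ACTION_NOTIFY_USER       /* surface to the API user, e.g. via a callback */
} action;

action classify_method(uint32_t method_number) {
  switch (method_number) {
    case M_CONNECTION_CLOSE:
      return ACTION_CLOSE_CONNECTION;
    case M_CHANNEL_CLOSE:
      return ACTION_CLOSE_CHANNEL;
    case M_BASIC_DELIVER:
    case M_BASIC_RETURN:
      return ACTION_READ_CONTENT;
    case M_BASIC_ACK:
    case M_BASIC_CANCEL:
    case M_CONNECTION_BLOCKED:
    case M_CONNECTION_UNBLOCKED:
      return ACTION_NOTIFY_USER;
    default:
      return ACTION_UNKNOWN;
  }
}
```

In a real main loop the argument would come from the method id of a decoded method frame; what ACTION_NOTIFY_USER turns into depends entirely on the API you want to expose.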
You'll also need to consider that each connection can have multiple channels, and that messages delivered on different channels may be interleaved (the RabbitMQ broker may not do this, but that behavior cannot be relied upon).
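Because frames for different channels may be interleaved, and because content is a header frame followed by a variable number of body frames, one way to handle this is to keep a small reassembly state per channel rather than assuming three consecutive frames. The sketch below is an illustration under stated assumptions: it tracks only sizes (no payload copying), it assumes the caller feeds it only content-carrying methods such as basic.deliver and basic.return, and it assumes (per my reading of AMQP 0-9-1) that a header with body size 0 is followed by no body frames. All type and function names are hypothetical, and MAX_CHANNELS is an arbitrary limit.

```c
#include <stdint.h>
#include <string.h>

/* AMQP 0-9-1 frame type codes. */
enum { FRAME_METHOD = 1, FRAME_HEADER = 2, FRAME_BODY = 3 };
enum { MAX_CHANNELS = 16 }; /* arbitrary limit for this sketch */

typedef enum { WAIT_METHOD = 0, WAIT_HEADER, WAIT_BODY } assembly_phase;

typedef struct {
  assembly_phase phase;
  uint64_t body_remaining; /* bytes of body still expected */
} channel_assembly;

typedef struct {
  channel_assembly channels[MAX_CHANNELS];
} assembler;

/* Zeroing puts every channel in WAIT_METHOD. */
void assembler_init(assembler *a) { memset(a, 0, sizeof *a); }

/*
 * Feed one frame for `channel`. `size` is the declared body size for a header
 * frame and the payload length for a body frame; it is ignored for method
 * frames. Returns 1 when a message is complete, 0 when more frames are
 * needed, and -1 on an unexpected frame or an out-of-range channel.
 */
int assembler_feed(assembler *a, uint16_t channel, int frame_type,
                   uint64_t size) {
  if (channel >= MAX_CHANNELS) {
    return -1;
  }
  channel_assembly *c = &a->channels[channel];

  if (c->phase == WAIT_METHOD && frame_type == FRAME_METHOD) {
    c->phase = WAIT_HEADER;
    return 0;
  }
  if (c->phase == WAIT_HEADER && frame_type == FRAME_HEADER) {
    if (size == 0) { /* empty body: message is already complete */
      c->phase = WAIT_METHOD;
      return 1;
    }
    c->body_remaining = size;
    c->phase = WAIT_BODY;
    return 0;
  }
  if (c->phase == WAIT_BODY && frame_type == FRAME_BODY &&
      size <= c->body_remaining) {
    c->body_remaining -= size;
    if (c->body_remaining == 0) {
      c->phase = WAIT_METHOD;
      return 1;
    }
    return 0;
  }

  /* Anything else is out of sequence: reset this channel and report it. */
  c->phase = WAIT_METHOD;
  c->body_remaining = 0;
  return -1;
}
```

With this shape, a body split across two frames on channel 1 can be interleaved with a complete message on channel 2 without either channel's state being disturbed.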
What are channel exceptions (leading to a channel.close), and under what circumstances can they be sent? If at all possible I'd like to force this to occur so that I can test my code. As of now, I've never seen one. The same question applies to the others, in particular connection.close.
> You'll also need to consider that each connection can have multiple channels, and that messages delivered on different channels may be interleaved (the RabbitMQ broker may not do this, but that behavior cannot be relied upon).

When you say "may not do this", does that mean that RabbitMQ never does this?