rabbitmq-c. Examples of "persistence" and "Confirms (aka Publisher Acknowledgements)"

3,684 views
Skip to first unread message

victo...@tut.by

unread,
Aug 29, 2014, 10:47:17 AM8/29/14
to rabbitm...@googlegroups.com
Hi,
I have a good experience in C/C++ programming but I am new to RabbitMQ.
I must to write my application that will be as producer for rabbitmq-c library on Windows environment.
This application must be very reliable.
Because I want to use modes "persistence" and "Confirms (aka Publisher Acknowledgements)".
http://www.rabbitmq.com/tutorials/tutorial-two-python.html
https://www.rabbitmq.com/confirms.html

1. I find the example for "persistence" mode in examples of rabbitmq-c (amqp_sendstring.c).
My code:

amqp_basic_properties_t props;
props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,1,
                  amqp_cstring_bytes("mytest"),
                  amqp_cstring_bytes(queue_name),
                  0,
                  0,
                  &props,
                  message_bytes),
"Publishing");

Is it right ?


2. I did not find an example for acknowledgements mode for C (only for java).
http://hg.rabbitmq.com/rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

I very need so example on C.
Help me please.

Thanks in advance,
Victor A

Michael Klishin

unread,
Aug 29, 2014, 11:51:54 AM8/29/14
to victo...@tut.by, rabbitm...@googlegroups.com
On 29 August 2014 at 18:47:23, victo...@tut.by (victo...@tut.by) wrote:
> > 1. I find the example for "persistence" mode in examples of rabbitmq-c
> (amqp_sendstring.c).
> My code:
>
> amqp_basic_properties_t props;
> props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
> props.delivery_mode = 2; /* persistent delivery mode */
> die_on_error(amqp_basic_publish(conn,1,
> amqp_cstring_bytes("mytest"),
> amqp_cstring_bytes(queue_name),
> 0,
> 0,
> &props,
> message_bytes),
> "Publishing");
>
> Is it right ?

Seems so.

> 2. I did not find an example for acknowledgements mode for C (only
> for java).

I'm not sure librabbitmq-c supports publisher confirms. My guess would be that
it doesn't .

The maintainer is on this list, let's wait for him to clarify.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Alan Antonuk

unread,
Aug 29, 2014, 9:56:27 PM8/29/14
to Michael Klishin, victo...@tut.by, rabbitm...@googlegroups.com
On Fri, Aug 29, 2014 at 8:51 AM, Michael Klishin <mkli...@pivotal.io> wrote:
On 29 August 2014 at 18:47:23, victo...@tut.by (victo...@tut.by) wrote:
> > 1. I find the example for "persistence" mode in examples of rabbitmq-c
> (amqp_sendstring.c).
> My code:
>
> amqp_basic_properties_t props;
> props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
> props.delivery_mode = 2; /* persistent delivery mode */
> die_on_error(amqp_basic_publish(conn,1,
> amqp_cstring_bytes("mytest"),
> amqp_cstring_bytes(queue_name),
> 0,
> 0,
> &props,
> message_bytes),
> "Publishing");
>
> Is it right ?

Seems so.

> 2. I did not find an example for acknowledgements mode for C (only
> for java).

I'm not sure librabbitmq-c supports publisher confirms. My guess would be that
it doesn't .

rabbitmq-c does support publisher-confirms - though it doesn't provide a very convenient API to use it:

To use it:
Put the channel in publisher-confirm mode by calling:

At this point forward anytime you do an amqp_basic_publish() on the channel you called amqp_confirm_select(), you will eventually get an acknowledgement that the message was processed (successfully or otherwise).

To get this confirmation - you should call amqp_simple_wait_frame() and examine the frame that you get back. If the message is successfully processed you will get a basic.ack method frame. Note that if you've specified the mandatory flag and the message isn't routed to a queue you will get a basic.reject frame + header + body frames before you get the basic.ack.  In rare circumstances you may also get a basic.nack if the broker for some reason cannot process the message. If you publish several messages in quick succession - you may receive a basic.ack with multiple = 1, which means the broker is ack'ing all messages up-to and including that message id that haven't been acknowledged yet.

The rest of the notes on this page do apply: https://www.rabbitmq.com/confirms.html

-Alan

victo...@tut.by

unread,
Sep 1, 2014, 4:09:20 AM9/1/14
to rabbitm...@googlegroups.com, mkli...@pivotal.io, victo...@tut.by
Thank you for speedy and detail answer.
I will attempt to do as you advise.
Probably, I will get the problem - I will direct in this topic again for help.

victo...@tut.by

unread,
Sep 2, 2014, 4:19:08 AM9/2/14
to rabbitm...@googlegroups.com, mkli...@pivotal.io, victo...@tut.by
Hi

I get your example Producer as base.
I did few changes in this example for confirm mode:

int main(int argc, char const *const *argv)
{
..............................
  amqp_channel_open(conn, 1);
  die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");

//add confirm mode in channel
  amqp_confirm_select(conn,1);

  send_batch(conn, "test queue", rate_limit, message_count);
..............................
}


static void send_batch(amqp_connection_state_t conn,
                       char const *queue_name,
                       int rate_limit,
                       int message_count)
{
..............................

    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.delivery_mode = 2; /* persistent delivery mode */
   
    //set persistent delivery mode  and mandatory flag
    die_on_error(amqp_basic_publish(conn,
                                    1,
                                    amqp_cstring_bytes("tick"),
                                    amqp_cstring_bytes(queue_name),
                                    1,

                                    0,
                                    &props,
                                    message_bytes),
                 "Publishing");

    //wait for ack
    amqp_frame_t decoded_frame;
    int res=amqp_simple_wait_frame(conn,&decoded_frame);
..............................
}

I see (in degugger) that the function amqp_simple_wait_frame() return 0 (AMQP_STATUS_OK).

Is my changes right ?
Excuse me, I do not understand what is:
- "basic.ack method frame"
- "multiple = 1"
- "message id"

If it is possible, please give here  small code fragment as I must execute confirm message.

Victor.

l rus

unread,
Sep 3, 2014, 9:10:47 AM9/3/14
to rabbitm...@googlegroups.com, mkli...@pivotal.io, victo...@tut.by
 
 
As per rabbitmq model description (http://www.rabbitmq.com/tutorials/amqp-concepts.html):
 
> the AMQP model has a notion of message acknowledgements: when a message is delivered to a consumer the consumer notifies the broker, either automatically or as soon as the application developer chooses to do so.
 
 
please confirm that rabbitmq-c developer has to send ACK to the
publisher as per earlier note.  Otherwise, the broker may
end up holding on messages unnecessarily (until it discards thru certain
creteria).

Michael Klishin

unread,
Sep 3, 2014, 9:30:27 AM9/3/14
to rabbitm...@googlegroups.com, l rus, victo...@tut.by
On 3 September 2014 at 17:10:49, l rus (zxto...@gmail.com) wrote:
> > please confirm that rabbitmq-c developer has to send ACK to
> the
> publisher as per earlier note. Otherwise, the broker may
> end up holding on messages unnecessarily (until it discards
> thru certain
> creteria).

You decide if you use automatic or manual acknowledgements, this is
true for any client. See Tutorial 2:

http://www.rabbitmq.com/getstarted.html

Ack is *not* sent to the publisher but to the server. Publisher confirms work
the same way but between a publisher and the server. 

l rus

unread,
Sep 3, 2014, 9:55:26 AM9/3/14
to rabbitm...@googlegroups.com, zxto...@gmail.com, victo...@tut.by
 
 
 
The AMQP 0-9-1 specification proposes two choices:
  • After broker sends a message to an application (using either basic.deliver or basic.get-ok AMQP methods).
  • After the application sends back an acknowledgement (using basic.ack AMQP method).
 
Where is this attribute / selection made?
Is this an exchange or server level property?
 
 
> Publisher confirms work  the same way but between a publisher and the server. 
 
 
I may have misunderstood above statement...
Does publisher need to send an ACK? for what?

Alan Antonuk

unread,
Sep 3, 2014, 11:50:57 AM9/3/14
to victo...@tut.by, rabbitm...@googlegroups.com, Michael Klishin
You probably want to check the result of amqp_confirm_select() and error-handle appropriately.

Assuming you get an AMQP_STATUS_OK from amqp_simple_wait_frame() you need to examine decoded frame (note that this is example code that I've written in my email client - no idea if it compiles or not, also you'll need to decide what what appropriate error-handling will look like for your application).

if (decoded_frame.type == AMQP_FRAME_METHOD && decoded_frame.channel == 1) {
  if (decoded_frame.payload.method.id == AMQP_BASIC_RETURN_METHOD) {
    /* Message was published with mandatory = true and the message
     * wasn't routed to a queue, so the message is returned */
    amqp_message_t returned_message;
    die_on_amqp_error(amqp_read_message(conn, 1, &returned_message, 0));
    /* Do something with returned, free memory when done */
    amqp_destroy_message(returned_message);

    /* look for the AMQP_BASIC_ACK_METHOD from the broker */  
    die_on_error(amqp_simple_wait_frame(conn, &decoded_frame));
    if (decoded_frame.type != AMQP_FRAME_METHOD || decoded_frame.channel != 1) {
     /* something is probably wrong... handle it */
    }
  }
  if (decoded_frame.payload.method.id == AMQP_BASIC_ACK_METHOD) {
    amqp_basic_ack_t *a = (amqp_basic_ack_t*)decoded_frame.payload.method.decoded;
    /* if you've kept a count of the messages you've published on the channel, 
     * the a->delivery_tag is the message serial being acknowledged.
     * if a->multiple != 0, that means all messages up-to-and-including that message 
     * serial are being acknowledged */
  } else {
    /* You've received a different method, probably not what you want */
  }
}


HTH
-Alan 

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Michael Klishin

unread,
Sep 3, 2014, 2:17:56 PM9/3/14
to l rus, rabbitm...@googlegroups.com, victo...@tut.by
basic.get-ok or get-empty is server-returned, ack is sent using basic.ack, a separate method.

MK

Michael Klishin

unread,
Sep 3, 2014, 3:14:58 PM9/3/14
to l rus, rabbitm...@googlegroups.com, victo...@tut.by
 On 3 September 2014 at 22:17:59, Michael Klishin (mic...@rabbitmq.com) wrote:
> basic.get-ok or get-empty is server-returned, ack is sent
> using basic.ack, a separate method.

Now that I'm on a regular keyboard, I'd like to clarify.

basic.get is a client-sent method that can have one of the two responses:
basic.get-ok (there is a delivery) and basic.get-empty (nothing to respond with).

basic.get is very similar to HTTP GET, except that it uses two response methods
as opposed to status codes.

With basic.consume you set up a subscription and RabbitMQ sends basic.deliver to the consumer
as messages become available. When issuing a basic.consume, you explicitly pick which
acknowledgement mode you want: automatic or manual (client has to send basic.[n]ack).

Both basic.get and basic.deliver can use manual acknowledgement mode. To ack a message,
clients use basic.ack which uses the delivery's delivery_tag. To reject a message,
use basic.nack (which is basic.reject that can nack multiple messages at once).

Most of this is covered by the tutorials.

U MK

unread,
Oct 23, 2019, 5:18:19 AM10/23/19
to rabbitmq-users
Hi,

Is there any way to achieve async publisher acknowledgment?

Thanks,
Mohan

Wesley Peng

unread,
Oct 23, 2019, 5:22:37 AM10/23/19
to rabbitm...@googlegroups.com
Hi

Once I used ruby's EventMachine it's easy to do async programming to RMQ.
I saw there is an old blog which had code sample you could refer with.

regards. 

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.

U MK

unread,
Oct 23, 2019, 5:43:38 AM10/23/19
to rabbitmq-users
Thanks for your reply.

I am looking RabbitMQ-C library to achieve async publisher confirm.

Regards,
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitm...@googlegroups.com.

Michael Klishin

unread,
Oct 23, 2019, 5:49:27 AM10/23/19
to rabbitmq-users
A quick source code search suggests there isn't even a way to enable publisher confirms on a channel in librabbitmq-c :( [1]


To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/799273a5-26e2-43b1-ab59-570751712061%40googlegroups.com.

taju...@gmail.com

unread,
Apr 8, 2022, 8:14:27 AM4/8/22
to rabbitmq-users
Hi All,

tried to implement the above code in mod_amqp of FreeSWITCH  but as soon as confirm_select is called , the thread getrs stuck and thereafter i dont get any reply.

440         switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[] in send function , exchange is :: %s \n", profile->exchange);                                                               
 441         amqp_confirm_select(profile->conn_active->state, 1);                                             :: CODE gets blocked here                                                                                              
442         switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[] in send function , routing key is :: %s \n", msg->routing_key);                                                             
 443         status = amqp_basic_publish(                                                                                                                                                                   
444                                                                 profile->conn_active->state,                                                                                                           
445                                                                 1,                                                                                                                                     
446                                                                 amqp_cstring_bytes(profile->exchange),                                                                                                
 447                                                                 amqp_cstring_bytes(msg->routing_key),                                                                                                
 448                                                                 1,                                                                                                                                    
 449                                                                 0,                                                                                                                                     
450                                                                 &props,                                                                                                                              
 451                                                                 amqp_cstring_bytes(msg->pjson));                                                                                                       
452         status = amqp_simple_wait_frame(profile->conn_active->state, &decoded_frame);                                                                                                                 
 453         switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[] in send function , status is :: %d \n", status);
Reply all
Reply to author
Forward
0 new messages