Re: [rabbitmq-users] how to confirm message was received by the dela


Michael Klishin

May 21, 2019, 7:51:46 PM
to rabbitmq-users
Using publisher confirms. The exchange does not support mandatory publishing [1] but
3.8 will introduce a new metric for unroutable messages not published as mandatory.
Charting the metric is the only practical solution I see due to, well, the delayed nature of publishing.

The plugin also has documented limitations [1] that might be addressed in a future version (no promises)
or can be contributed by the users of said plugin. We have fairly specific redesign ideas for those interested.


On Tue, May 21, 2019 at 5:24 PM cupidcoffee <cupid...@gmail.com> wrote:
Hi

I keep publishing messages to a delayed exchange, and occasionally a message gets dropped. I found that this happens because the delayed exchange never received the message: the plugin's Mnesia table holds 0 entries at that point.
I have enabled delivery confirmation on the channel, and every publish returns True, which I thought meant the exchange got the message. That does not seem to be the case.

So I tried the mandatory flag when publishing, but I always get a NO_ROUTE return even though the message was received by the delayed exchange. I think that is because the message is held in the delayed exchange instead of being routed immediately.

So my question is: how can I confirm that the delayed exchange received the message, so that no message gets dropped silently?

Thanks a lot

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/20daaa39-7250-4d97-8112-7912c556377b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


--
MK

Staff Software Engineer, Pivotal/RabbitMQ

cupidcoffee

May 21, 2019, 8:40:18 PM
to rabbitmq-users
Thanks for the reply Michael

I do use publisher confirms: I enable delivery confirmation on the channel and check the return value of basic_publish. If it is True, I assume the message was delivered to the broker. In my case basic_publish did return True, but when I checked the number of delayed messages in the delayed exchange's Mnesia table, there was no message there. Does that mean the RabbitMQ broker received the message but failed to hand it to the delayed exchange?


Michael Klishin

May 22, 2019, 10:21:12 AM
to rabbitmq-users
basic.publish has no response in the protocol. What client are you using? What you should consider
to be a successful publish is a publish for which you have received a positive acknowledgment in a
reasonable amount of time [1].


cupidcoffee

May 22, 2019, 10:28:21 AM
to rabbitmq-users
I thought that if I enabled delivery confirmation on the channel, basic_publish would at least return an ack or a nack?

So my code looks something like this:

self.__channel.confirm_delivery()
if self.__channel.basic_publish(exchange=self.__exchange, routing_key=self.__routingKey, body=request, properties=properties):
    pass  # assume it is published successfully
else:
    pass  # resend message

In my case it enters the if branch, but the delayed exchange's Mnesia table has 0 delayed messages in it. So I assume the delayed exchange did not get the message?

Michael Klishin

May 23, 2019, 5:52:03 AM
to rabbitmq-users
That is not correct. Acks or nacks arrive asynchronously from the publish operation.
How your application handles negative acknowledgements or acks not arriving within a reasonable amount of time
is up to you.

The code seems to be Python, so I'll assume that you are using Pika. There is an extensive publisher confirms example [1]
for that client.

There are basically three options:

 * "Streaming confirms", when you handle acks and nacks as they arrive or don't arrive without waiting
 * Publish a group of messages, await an acknowledgement or timeout for all of them. In case of trouble, retry by republishing the entire batch.
 * Publish a single message, await an acknowledgement or throw an exception/fail immediately. This is NOT recommended.

The last option is tempting but it will be *very* (orders of magnitude) slow compared to the 1st and even the 2nd option. Do not do it.
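
Editor's sketch of the bookkeeping behind the first option ("streaming confirms"), independent of any client library. It assumes only what the protocol provides: confirm delivery tags count up from 1 per channel, and an ack or nack may carry a `multiple` flag that covers every tag up to and including the given one. The class name `ConfirmTracker`, its method names, and the timeout value are hypothetical and come from neither Pika nor RabbitMQ.

```python
import time


class ConfirmTracker:
    """Tracks publishes that are still waiting for a broker confirm."""

    def __init__(self, timeout_seconds=5.0, clock=time.monotonic):
        self._timeout = timeout_seconds
        self._clock = clock
        self._pending = {}  # delivery tag -> (message, publish time)
        self._next_tag = 1  # confirm sequence numbers start at 1 per channel

    def published(self, message):
        """Record a message right after it was handed to basic.publish."""
        tag = self._next_tag
        self._next_tag += 1
        self._pending[tag] = (message, self._clock())
        return tag

    def _settle(self, delivery_tag, multiple):
        """Remove and return the messages covered by one ack or nack."""
        if multiple:
            tags = sorted(t for t in self._pending if t <= delivery_tag)
        else:
            tags = [delivery_tag] if delivery_tag in self._pending else []
        return [self._pending.pop(t)[0] for t in tags]

    def on_ack(self, delivery_tag, multiple=False):
        """Broker took responsibility for these messages."""
        return self._settle(delivery_tag, multiple)

    def on_nack(self, delivery_tag, multiple=False):
        """Broker refused these messages; the caller should republish them."""
        return self._settle(delivery_tag, multiple)

    def overdue(self):
        """Messages with neither ack nor nack within the timeout."""
        now = self._clock()
        return [msg for msg, sent in self._pending.values()
                if now - sent > self._timeout]
```

An application would call `published()` after every publish, feed broker acks and nacks into `on_ack()` and `on_nack()` from the client's confirm callback, and poll `overdue()` periodically to decide what to republish.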


cupidcoffee

May 24, 2019, 11:45:30 AM
to rabbitmq-users
Hi Michael

Thanks for the reply.
I am using Pika's BlockingConnection, and it seems the semantics of confirms have changed since 0.9.6. From what I can see, the callback parameter was removed from BlockingChannel (blocking_connection.py) in pika/pika@f2f1d95.
So it seems the right way to do publisher confirms is the way I am using here; see https://pika.readthedocs.io/en/latest/examples/blocking_delivery_confirmations.html

This link I found also confirms it: https://github.com/rabbitinaction/sourcecode/issues/9

What I do not understand is this: with publisher confirms on a BlockingConnection, what does a return value of True mean? Does it mean the broker received the message I sent?
And if that is the case, why did the delayed exchange not get the message from the broker?

Thanks

cupidcoffee

May 28, 2019, 12:01:22 PM
to rabbitmq-users
Also, this is the Pika library code we are using:

def basic_publish(self, exchange, routing_key, body,   # pylint: disable=R0913
                      properties=None, mandatory=False, immediate=False):
        """Publish to the channel with the given exchange, routing key and body.
        Returns a boolean value indicating the success of the operation.

        This is the legacy BlockingChannel method for publishing. See also
        `BasicChannel.publish` that provides more information about failures.

        For more information on basic_publish and what the parameters do, see:


        NOTE: mandatory and immediate may be enabled even without delivery
          confirmation, but in the absence of delivery confirmation the
          synchronous implementation has no way to know how long to wait for
          the Basic.Return or lack thereof.

        :param exchange: The exchange to publish to
        :type exchange: str or unicode
        :param routing_key: The routing key to bind on
        :type routing_key: str or unicode
        :param body: The message body; empty string if no body
        :type body: str or unicode
        :param pika.spec.BasicProperties properties: message properties
        :param bool mandatory: The mandatory flag
        :param bool immediate: The immediate flag

        :returns: True if delivery confirmation is not enabled (NEW in pika
            0.10.0); otherwise returns False if the message could not be
            deliveved (Basic.nack and/or Basic.Return) and True if the message
            was delivered (Basic.ack and no Basic.Return)
        """
        try:
            self.publish(exchange, routing_key, body, properties,
                         mandatory, immediate)
        except (exceptions.NackError, exceptions.UnroutableError):
            return False
        else:
            return True

cupidcoffee

May 28, 2019, 12:08:08 PM
to rabbitmq-users
def publish(self, exchange, routing_key, body,  # pylint: disable=R0913
                properties=None, mandatory=False, immediate=False):
        """Publish to the channel with the given exchange, routing key, and
        body. Unlike the legacy `BlockingChannel.basic_publish`, this method
        provides more information about failures via exceptions.

        For more information on basic_publish and what the parameters do, see:


        NOTE: mandatory and immediate may be enabled even without delivery
          confirmation, but in the absence of delivery confirmation the
          synchronous implementation has no way to know how long to wait for
          the Basic.Return.

        :param exchange: The exchange to publish to
        :type exchange: str or unicode
        :param routing_key: The routing key to bind on
        :type routing_key: str or unicode
        :param body: The message body; empty string if no body
        :type body: str or unicode
        :param pika.spec.BasicProperties properties: message properties
        :param bool mandatory: The mandatory flag
        :param bool immediate: The immediate flag

        :raises UnroutableError: raised when a message published in
            publisher-acknowledgments mode (see
            `BlockingChannel.confirm_delivery`) is returned via `Basic.Return`
            followed by `Basic.Ack`.
        :raises NackError: raised when a message published in
            publisher-acknowledgements mode is Nack'ed by the broker. See
            `BlockingChannel.confirm_delivery`.

        """
        if self._delivery_confirmation:
            # In publisher-acknowledgments mode
            with self._message_confirmation_result:
                self._impl.basic_publish(exchange=exchange,
                                         routing_key=routing_key,
                                         body=body,
                                         properties=properties,
                                         mandatory=mandatory,
                                         immediate=immediate)

                self._flush_output(self._message_confirmation_result.is_ready)
                conf_method = (self._message_confirmation_result.value
                               .method_frame
                               .method)

                if isinstance(conf_method, pika.spec.Basic.Nack):
                    # Broker was unable to process the message due to internal
                    # error
                    LOGGER.warn(
                        "Message was Nack'ed by broker: nack=%r; channel=%s; "
                        "exchange=%s; routing_key=%s; mandatory=%r; "
                        "immediate=%r", conf_method, self.channel_number,
                        exchange, routing_key, mandatory, immediate)
                    if self._puback_return is not None:
                        returned_messages = [self._puback_return]
                        self._puback_return = None
                    else:
                        returned_messages = []
                    raise exceptions.NackError(returned_messages)

                else:
                    assert isinstance(conf_method, pika.spec.Basic.Ack), (
                        conf_method)

                    if self._puback_return is not None:
                        # Unroutable message was returned
                        messages = [self._puback_return]
                        self._puback_return = None
                        raise exceptions.UnroutableError(messages)
        else:
            # In non-publisher-acknowledgments mode
            self._impl.basic_publish(exchange=exchange,
                                     routing_key=routing_key,
                                     body=body,
                                     properties=properties,
                                     mandatory=mandatory,
                                     immediate=immediate)
            self._flush_output()


Since this is a blocking connection and the _flush_output call waits for the response, I think this is a synchronous call. That means that after we sent the message to a delayed exchange, the broker returned neither a nack nor an unroutable error, which I assume means the broker got the message. How come the delayed exchange did not get it, then?

Thanks

Luke Bakken

May 28, 2019, 2:11:14 PM
to rabbitmq-users
Hello,

Please don't paste large amounts of code in a message. You can use GitHub to provide a link to specific lines:

https://github.com/pika/pika/blob/master/pika/adapters/blocking_connection.py#L2170-L2249

You appear to be using the library correctly so there may be a subtle issue with your code or in your environment. The best way to proceed would be to provide a means for me to reproduce what you are reporting.

Thanks,
Luke


cupidcoffee

May 28, 2019, 2:42:36 PM
to rabbitmq-users
Thanks for the reply and the tip Luke.

The problem is that I do not even know how to reproduce it on dev. When I test and develop in our dev environment everything is fine; the issue only shows up once in a while after we deploy to beta and run our beta regression tests there.
This is for our company, and a dedicated team manages all the RabbitMQ brokers and clusters. Based on the code, I assume the issue happens on the RabbitMQ broker side?

Is that conclusion correct? And if so, how can they debug this issue on the broker side?

As far as I can see, I used publisher confirms correctly and the broker did not return any exception to me, so the broker should have received the message. Delivering the message to the delayed exchange I specified is then the broker's job, so I do not think it has anything to do with my publisher code. Am I right?

Thanks

Luke Bakken

May 28, 2019, 3:14:08 PM
to rabbitmq-users
Hello -

What version of RabbitMQ and Erlang are you using?

It is extremely unlikely that RabbitMQ is the cause of this issue. The delayed message exchange plugin might be the cause but I'm just speculating.

First of all, Pika 1.0.1 uses exceptions to indicate non-delivered messages:


I have opened the following issue to fix the outdated example code in the docs - https://github.com/pika/pika/issues/1217

Your code will still work fine and if messages aren't confirmed you would be seeing NackError
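
Editor's sketch of what exception-based handling can look like with Pika 1.x in blocking mode. It assumes `channel` is a `BlockingChannel` on which `confirm_delivery()` has already been called, so that `basic_publish` raises `pika.exceptions.NackError` when the broker nacks the message. The helper name `publish_with_retry` and the `attempts` parameter are hypothetical, and the fallback exception class exists only so the sketch runs without Pika installed.

```python
try:
    from pika.exceptions import NackError
except ImportError:
    # Stand-in so the sketch can be read and run without Pika installed.
    class NackError(Exception):
        pass


def publish_with_retry(channel, exchange, routing_key, body,
                       properties=None, attempts=3):
    """Return True once the broker acks the publish, False after giving up.

    Assumes `channel` is a Pika 1.x BlockingChannel in confirm mode, where
    basic_publish blocks until the ack or nack for this message arrives.
    """
    for _ in range(attempts):
        try:
            channel.basic_publish(exchange=exchange,
                                  routing_key=routing_key,
                                  body=body,
                                  properties=properties)
            return True  # no exception: the broker acked the message
        except NackError:
            continue  # broker refused the message; try again
    return False
```

Note that this is the publish-one-then-wait pattern, which is simple but slow; it is shown only because it matches the blocking code discussed in this thread.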

At this point I would investigate the differences between your dev and "beta" environments. Is RabbitMQ set up exactly the same way? Does one environment use a cluster and the other not? Are you using the same code and data?

Have you tried your code using a regular exchange and not a delayed message exchange?

Thanks,
Luke

cupidcoffee

May 28, 2019, 4:47:09 PM
to rabbitmq-users
Thanks Luke

We are using RabbitMQ 3.7.12 and Erlang 21. Both environments should be the same, and the delayed exchange plugin should be installed the same way in both.

Are you aware of any broker-side log we can check to see what happened when the broker tried to deliver the message to the delayed exchange, or why the delayed exchange did not get the message?

Thanks

Luke Bakken

May 28, 2019, 4:57:44 PM
to rabbitmq-users
You may be able to get useful information using the tracing plugin: https://www.rabbitmq.com/firehose.html
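
Editor's sketch of capturing publishes with the firehose from a Pika client. It assumes tracing has been switched on for the vhost (for example with `rabbitmqctl trace_on`) and relies on the firehose republishing every message that enters the broker to the `amq.rabbitmq.trace` topic exchange with a routing key of the form `publish.<exchange name>`. The helper name `bind_trace_queue` and the default queue name are hypothetical.

```python
def bind_trace_queue(channel, traced_exchange, trace_queue="publish-trace"):
    """Bind a queue that receives a copy of every publish to `traced_exchange`.

    Assumes `channel` is an open Pika channel on a vhost with tracing enabled.
    """
    # Throwaway queue: not durable, removed once its last consumer goes away.
    channel.queue_declare(queue=trace_queue, durable=False, auto_delete=True)
    # The firehose uses "publish.<exchange>" for messages entering the broker.
    channel.queue_bind(queue=trace_queue,
                       exchange="amq.rabbitmq.trace",
                       routing_key="publish." + traced_exchange)
    return trace_queue
```

Comparing what lands in this queue with what the delayed exchange holds shows whether a message reached the broker at all.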

I strongly suggest testing using a regular exchange to see if the issue is in how you are using the delayed message exchange or in the exchange itself.

Luke Bakken

May 28, 2019, 6:05:32 PM
to rabbitmq-users
One explanation for what you are seeing is that you are publishing a delayed message that can't be routed to a queue. When the delay timer expires and RabbitMQ attempts to route the message, the message is dropped. If your delays are short enough, you may never have the chance to confirm that it is in the exchange or not.

You can use this technique to ensure that unroutable messages end up in a queue:

cupidcoffee

Jun 3, 2019, 2:32:08 PM
to rabbitmq-users
Thanks Luke

I used the firehose as you suggested and found that the lost message can be consumed through the trace exchange, but somehow never reaches the delayed exchange (the number of delayed messages at that time was 0 when it should have been 1).

Given this result, can we be sure that the message was lost on the RabbitMQ broker side, i.e. that the broker somehow failed to deliver this message to the delayed exchange?

Thanks

Luke Bakken

Jun 3, 2019, 5:02:40 PM
to rabbitmq-users
Hello,

That is an interesting finding but we still don't have enough information to know what is going on.

* Is this issue still random and only happening in your "beta" environment?

* How long is a typical delay value? What was the x-delay value for the message(s) that didn't appear to make it to the delayed exchange?

* As I have suggested several times, please run your tests using a regular (non-delayed) exchange and ensure that messages are not dropped. As an alternative, you can set up an alternate exchange for your delayed exchange as described here so that messages aren't dropped: https://jack-vanlightly.com/blog/2017/3/12/how-to-deal-with-unroutable-messages-rabbitmq-publishing-part-3
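
Editor's sketch of the alternate-exchange setup mentioned in the last point, written against Pika's channel methods. It assumes the delayed exchange is declared by the application with type `x-delayed-message` and an `x-delayed-type` argument, as the plugin expects, and that the plugin honors the standard `alternate-exchange` argument as suggested above. The helper name and all exchange and queue names are hypothetical.

```python
def declare_delayed_with_fallback(channel, delayed_exchange,
                                  fallback_exchange, fallback_queue):
    """Declare a delayed exchange whose unroutable messages go to a queue."""
    # Fanout exchange plus queue that collect whatever cannot be routed.
    channel.exchange_declare(exchange=fallback_exchange,
                             exchange_type="fanout",
                             durable=True)
    channel.queue_declare(queue=fallback_queue, durable=True)
    channel.queue_bind(queue=fallback_queue, exchange=fallback_exchange)
    # The delayed exchange names the fallback as its alternate exchange.
    channel.exchange_declare(
        exchange=delayed_exchange,
        exchange_type="x-delayed-message",
        durable=True,
        arguments={
            "x-delayed-type": "direct",
            "alternate-exchange": fallback_exchange,
        },
    )
```

If the delayed exchange already exists with different arguments, the declaration fails with a precondition error, so an existing exchange has to be deleted and re-declared (or given its alternate exchange through a policy instead).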

Like I said before, if you can't reproduce this in your "dev" environment but you can in your "beta" one we need to figure out what is different, either in the environment or the messages themselves. If you can provide a way to reliably reproduce this issue, I'm sure we can find the root cause very quickly.

Thanks -
Luke


cupidcoffee

Jun 3, 2019, 5:16:18 PM
to rabbitmq-users
Thanks for the reply

Yes, it still happens randomly in our beta environment. I ran the same regression test on dev (which uses a different vhost) several times and it is totally fine: it keeps refreshing one message until the time limit I set (30 minutes in this case). On beta, most of the time, it only keeps refreshing for 2-3 minutes.

Our delay value is 25000, which is 25 seconds.

I did try adding an alternate exchange in case the delayed exchange fails to route the message, since that is a known limitation. But since monitoring shows that the lost message never even reached the delayed exchange (the Mnesia table has size 0), I really doubt an alternate exchange would help here.

For now I am thinking it may be a bad install of the delayed exchange plugin on this beta vhost. But the programmer from our internal RMQ support team said that if the plugin were bad in some way, the brokers would not come up during turnaround.

I will try adding the alternate exchange anyway and will let you know if I find anything.

Thanks

cupidcoffee

Jun 5, 2019, 10:17:35 AM
to rabbitmq-users
Hi Luke

This might seem weird, but I just convinced our RMQ support team to remove and re-enable the delayed exchange plugin on our beta vhost, and it solved the issue, even though our RMQ support team claims the plugin's settings remain the same.

I am still not sure why a bad setup would only lose messages occasionally, i.e. why it could receive and delay the first several messages.

I know this may be out of the scope of my original question, and I cannot provide a more detailed trace. Just let me know if you have any thoughts; otherwise this is good enough for me, although I am urging our RMQ support team to take a deeper look at the issue.

Thanks

Luke Bakken

Jun 5, 2019, 11:23:38 AM
to rabbitmq-users
Hello,

I can't think of why that would have fixed your issue. It would be interesting to know what commands were run, exactly, to "remove and re-enable" the plugin.

Thanks,
Luke

cupidcoffee

Jun 5, 2019, 11:38:10 AM
to rabbitmq-users
Hello
Yeah, agreed, it is so weird. The command we used is just the standard "rabbitmq-plugins enable/disable plugin-name".
I had suspected the delayed exchange all along, but I am still shocked that simply re-enabling the plugin solved the issue.

Luke Bakken

Jun 5, 2019, 11:43:23 AM
to rabbitmq-users
Thanks for that information. If you see this again, or, ideally, have a way to reproduce it, please follow up on this list.

cupidcoffee

Jun 5, 2019, 11:44:33 AM
to rabbitmq-users
Absolutely. Thanks for all the help. The root cause of this issue is still not clear, but I did learn a lot of new things here. Thanks again.