How to discover if a message was published to anexchange with no queues, or non existing routing key

445 views
Skip to first unread message

Marco Galassi

unread,
Sep 12, 2018, 4:34:00 AM9/12/18
to rabbitmq-users
Hi everyone,
I am using Publisher Confirms, but I have noticed that if you send a message to an exchange and a queue is not associated no errors is thrown
and the publisher will then assume that the message has been correctly sent and enqueued to one (or multiple) queue(s).
Now, as far as I have understood, this is an expected behavior.

In my scenario, though, this is a problem: I am receving data from sensors. When these sensors receive a 200 OK response, they assume the data has correctly been sent to the server, so they will 
go on with their data and delete the sent data from their memory to save space.

If a message is sent from a sensor, the publisher publishes to an exchange which, for some reason, has no queue binded to it, this message will be lost (or also if no correct Routing Key is
specified). I need a way to understand when a message could not be enqueued to any queue (either because no queues are there, or only queues with specific Binding Keys not used as Routing Keys).

I have read about the mandatory flag, but I haven't understood how to use it.

I am using NodeJs and the amqp.node package.

Can anyone help me?

This is the simple code I am using:

...
for (let i = 0; i < nChunks; i++) {
  ch.publish('myExchange', 'myRoutingKey', Buffer.from({foo: 'bar'}), {persistent: true});
}
await ch.waitForConfirms();
...

If the exchange named myExchange has no queue bound to it, I get no errors, so the message will silently be lost.

Thank you

Karim Ahmed

unread,
Sep 12, 2018, 5:16:04 AM9/12/18
to rabbitm...@googlegroups.com
I believe that there is no direct solution to it. I think you can define a deadletter on your exchange that push messages to an unrouted-queue. Then you programmatically implement your business to failed routed messages like:
iterater over this messages in your code to see if any message exist match the published message.
Or 
Make another seperate consumer on unrouted-queue to save deleted data from memory.

If rabbit-mq professionals have another solution, please share it with us.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Michael Klishin

unread,
Sep 12, 2018, 6:07:11 AM9/12/18
to rabbitm...@googlegroups.com
Publish messages with the mandatory flag set to true and register a return handler (before publishing).
Unroutable messages will be returned. The rate of returns is collected as a metric that can be seen in the management UI
or collected by monitoring systems.


Then there are alternate exchanges [1] which can be used to collect unroutable messages, e.g. use an alternate exchange of type fanout
and bound a queue to it.


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Michael Klishin

unread,
Sep 12, 2018, 6:07:53 AM9/12/18
to rabbitm...@googlegroups.com
How can a DLX (dead-letter exchange) help here? By definition the message won't be routed anywhere.


To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Marco Galassi

unread,
Sep 12, 2018, 6:12:39 AM9/12/18
to rabbitmq-users
Thank you for your answer.
Is it possible to have an example of how to use the mandatory flag, or at least a link to a complete example?

To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Karim Ahmed

unread,
Sep 12, 2018, 6:20:55 AM9/12/18
to rabbitm...@googlegroups.com
Sorry, I meant alternate exchange, not deadletter-exchange.

To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.


--
-------------------------------------------------------------------------------------------------------------------------
Karim Ahmed Abd El-Kareem

Junior Web Developer

Description: 11

Youxel Technology, Egypt
14/1, District 9 New Maadi,
Egypt – Cairo

+201 205 4919 87
+201 119 5383 78
kka...@youxel.com

www.youxel.com

Karim Ahmed

unread,
Sep 12, 2018, 7:32:50 AM9/12/18
to rabbitm...@googlegroups.com
Based on Michael suggestion and this thread. the following code is what I got.
                       
try {
AtomicBoolean isResponseOffered = new AtomicBoolean(false);
BlockingQueue<Boolean> response = new ArrayBlockingQueue<Boolean>(1);
channel.addReturnListener(
/**
* Implement this interface in order to be notified of failed
* deliveries when basicPublish is called with "mandatory" or
* "immediate" flags set.
*/
new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body)
throws IOException {
if(isResponseOffered.compareAndSet(false, true))
response.offer(false);
}
}); 
channel.addConfirmListener(
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
*/
new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if(isResponseOffered.compareAndSet(false, true))
response.offer(true);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if(isResponseOffered.compareAndSet(false, true))
response.offer(false);
}
});
channel.basicPublish(exchange, routingKey, true, props, message);
// Wait for response 
Boolean responseObj = response.poll(10, TimeUnit.SECONDS);
if(responseObj == null || !responseObj ) {
throw new  Exception ("Problem during publish the message, note message may be published but acknowledge not received");
}
} catch (IOException | InterruptedException e) {
throw new Exception("Problem during publish the message", e);
}

Please if share with me if you have any enhancements to the previous code.
 
Reply all
Reply to author
Forward
0 new messages