Channel.basicPublish doesn't throw exception if HAProxy is crashed


Nitesh Soni

Dec 2, 2019, 11:05:34 AM
to rabbitmq-users
Hello,
I have a simple RabbitMQ publisher application in Java that connects through HAProxy to send messages to a RabbitMQ cluster. The code is as follows:
===========================================================
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory cf = new ConnectionFactory();
    Address haAddr = new Address("192.168.11.113", 5671);
    cf.setUsername("test25");
    cf.setPassword("test25");
    Connection con = cf.newConnection(new Address[]{haAddr});
    // con.addShutdownListener(new ConnectionShutdownListener());
    Channel channel = con.createChannel();
    for (int index = 0; index < 500; index++) {
        sendMessage(channel, "m" + index);
    }
    System.out.println("done");
}

private static void sendMessage(Channel channel, String msg) {
    try {
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    } catch (IOException e) {
        e.printStackTrace();
    }
}

===========================================================
This app creates a connection to HAProxy and then keeps sending messages to the RabbitMQ queue. I tested a case where the application had created a channel from the connection (the connection to HAProxy) and started sending messages. While it was sending, I stopped the HAProxy service. I observed that, although HAProxy was no longer running, channel.basicPublish completed without error for all subsequent messages, but the messages never reached the queue. I expected channel.basicPublish to throw an exception when HAProxy is down, so that the app can handle this scenario and avoid message loss. Is there a way to achieve this?


Luke Bakken

Dec 2, 2019, 11:34:52 AM
to rabbitmq-users
Hi Nitesh,

At some point you will get an exception related to the TCP connection being closed. Most likely in your case you didn't publish enough messages to see this happen.
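
One way to see why a publish can appear to succeed after the proxy is gone: a write to a TCP socket returns as soon as the local OS accepts the bytes, so a failure tends to surface only on a later write. The sketch below is plain java.net code, not RabbitMQ client code. The class and method names are made up for illustration, and how many writes get through before the error depends on the OS and on timing.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class BufferedWriteDemo {

    /**
     * Connects to a local server that closes the connection immediately,
     * then keeps writing. Returns how many writes were accepted before an
     * IOException appeared, or -1 if none of the writes failed.
     */
    static int writesAcceptedAfterPeerClose(int maxWrites)
            throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            // The peer goes away, standing in for the stopped proxy.
            server.accept().close();

            OutputStream out = client.getOutputStream();
            byte[] frame = "m".getBytes();
            for (int sent = 0; sent < maxWrites; sent++) {
                try {
                    out.write(frame);
                    out.flush();
                } catch (IOException e) {
                    // The local stack has finally learned the peer is gone.
                    return sent;
                }
                Thread.sleep(10); // give the peer's reset time to arrive
            }
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        int accepted = writesAcceptedAfterPeerClose(200);
        System.out.println("writes accepted after the peer closed: " + accepted);
    }
}
```

Read this way, a normal return from a publish call only says the bytes were handed to the local socket, not that the broker received them.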

You must use publisher confirms if messages can't be lost - https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
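
As a rough illustration of the bookkeeping that asynchronous publisher confirms need, the sketch below tracks messages that have been published but not yet confirmed. It is broker-free on purpose: `OutstandingConfirms` and its methods are hypothetical names, not part of the RabbitMQ client. With the real client, the assumption is that the sequence number would come from `Channel.getNextPublishSeqNo()` after `Channel.confirmSelect()`, and that `acked` and `nacked` would be called from callbacks registered with `Channel.addConfirmListener`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class OutstandingConfirms {

    // Sequence number -> message body, for everything published but not yet confirmed.
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a message right before it is published. */
    void published(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Broker confirmed seqNo; with multiple=true, everything up to and including seqNo. */
    void acked(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Broker rejected seqNo; returns the bodies the caller may want to publish again. */
    List<String> nacked(long seqNo, boolean multiple) {
        List<String> toRetry = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            toRetry.addAll(head.values());
            head.clear();
        } else {
            String body = outstanding.remove(seqNo);
            if (body != null) {
                toRetry.add(body);
            }
        }
        return toRetry;
    }

    /** Number of messages still waiting for a confirm. */
    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        OutstandingConfirms tracker = new OutstandingConfirms();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "m" + seq);
        }
        tracker.acked(3, true); // confirms 1, 2 and 3 in one go
        System.out.println("pending after ack: " + tracker.pending());
        System.out.println("to retry after nack: " + tracker.nacked(4, false));
    }
}
```

Anything still in the map when the connection drops is a candidate for republishing, which is how an application can avoid silent loss.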

Thanks,
Luke

Arnaud Cogoluègnes

Dec 2, 2019, 11:42:01 AM
to rabbitm...@googlegroups.com
I gave it a try with a basic HAProxy configuration and
Channel#basicPublish does throw an exception because it detects the
connection has died. You should see the exception bubbling up in your
code.

Please share more about your test (RabbitMQ and Java client version,
HAProxy configuration, server logs).
> --
> You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
> To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/678811ae-1628-4a57-957e-143aa064d81e%40googlegroups.com.

Nitesh Soni

Dec 3, 2019, 5:45:14 AM
to rabbitm...@googlegroups.com
Hello Luke, 
Thanks for guiding me. Following your suggestion, I used confirmSelect() and tested it under heavy load, with 100 threads sending messages continuously for 2 minutes over a single shared channel. In this case, although my RabbitMQ node is running, all 100 threads die with 'AlreadyClosedException' or 'ShutdownSignalException' after sending only a few messages. I had also registered a 'ShutdownListener', but it was never notified. My concern is how to handle a heavy messaging environment with this approach. The following sample code simulates the scenario:
============================Sample Code===============================================
public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory cf = new ConnectionFactory();
    Address haAddr = new Address("192.168.X.X", 5672);

    cf.setUsername("test25");
    cf.setPassword("test25");
    Connection con = cf.newConnection(new Address[]{haAddr});
    con.addShutdownListener(new ConnectionShutdownListener());
    Channel channel = con.createChannel();
    channel.confirmSelect();
    CPublisherConfirm pub = new CPublisherConfirm();
    for (int index = 0; index < 100; index++) {
        MessageSender mSender = pub.new MessageSender(channel);
        mSender.start();
    }

    System.out.println("done");
}

private class MessageSender extends Thread {

    Channel channel = null;

    public MessageSender(Channel ch) {
        channel = ch;
    }

    @Override
    public void run() {
        long curTime = System.currentTimeMillis();
        long twoMin = 1000 * 60 * 2 + curTime;
        int index = 0;
        try {
            while (System.currentTimeMillis() < twoMin) {
                sendMessage(channel, "msg" + index++);
            }
        } catch (Exception e) {
            System.out.println(e.getClass().getName());
        }
        System.out.println("Total: " + index);
    }
}

private static void sendMessage(Channel channel, String msg) {
    try {
        channel.basicPublish("", QUEUE_NAME, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
        try {
            channel.waitForConfirmsOrDie(10000);
        } catch (InterruptedException | TimeoutException e) {
            System.out.println("Exception:");
        }
    } catch (IOException e) {
        e.printStackTrace();
    } catch (Exception e) {
        System.out.println("Throwe");
        throw e;
    }
}

private static class ConnectionShutdownListener implements ShutdownListener {

    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        System.out.println("ShutDown#############################################");
    }
}
====================================================================


Luke Bakken

Dec 3, 2019, 8:35:33 AM
to rabbitmq-users
Hi Nitesh,

I see that you opened an issue on GitHub in the rabbitmq-java-client repository with this exact question, which my colleague transferred here and then closed as a double-post: https://github.com/rabbitmq/discussions/issues/22

First of all, there is no evidence of a bug in the rabbitmq-java-client library, so opening an issue on GitHub is not appropriate. Please be patient as the people who assist you on this mailing list do so when time allows (for free!).

Any time your application has an issue, you should check RabbitMQ's log file for log entries at the same time. My colleague has answered your question in another message, please check that out.

Thanks,
Luke

Arnaud Cogoluègnes

Dec 3, 2019, 8:47:32 AM
to rabbitm...@googlegroups.com
Do not share channels between threads. For more information, please see [1].

There should be some synchronization after the start of all the
threads to wait for them to finish their work. If you don't do that,
the JVM process will stop, closing the connection likely while the
threads are still running. This may be the reason for the errors you
see. Remember the threads will start and do their work in the
background, it's very unlikely they're all finished once the program
exits from the for loop.
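
A minimal sketch of both points (one channel per thread, and waiting for the workers before the program exits), using only java.util.concurrent. `Publisher` and `PublisherFactory` are hypothetical stand-ins so the sketch runs without a broker; with the real client, the assumption is that the factory would wrap `Connection.createChannel()` and `publish` would wrap `Channel.basicPublish`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PerThreadPublishers {

    /** Hypothetical stand-in for a channel owned by one thread; not a RabbitMQ type. */
    interface Publisher extends AutoCloseable {
        void publish(String body) throws Exception;
    }

    /** Hypothetical factory; each call is expected to hand out a fresh publisher. */
    interface PublisherFactory {
        Publisher open() throws Exception;
    }

    /** Starts the workers, waits for all of them, and returns the number of messages sent. */
    static int runWorkers(PublisherFactory factory, int workers, int perWorker) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        List<Future<Integer>> results = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            final int id = w;
            results.add(pool.submit(() -> {
                // Each worker opens and closes its own publisher; nothing is shared.
                try (Publisher own = factory.open()) {
                    for (int i = 0; i < perWorker; i++) {
                        own.publish("w" + id + "-msg" + i);
                    }
                }
                return perWorker;
            }));
        }
        // Stop accepting new tasks, then block until the workers are done.
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow();
            throw new IllegalStateException("workers did not finish in time");
        }
        int total = 0;
        for (Future<Integer> f : results) {
            total += f.get(); // rethrows a worker failure instead of hiding it
        }
        return total;
    }

    /** Runs the workers against an in-memory fake; returns {reported, published, opened}. */
    static int[] demo(int workers, int perWorker) throws Exception {
        AtomicInteger opened = new AtomicInteger();
        AtomicInteger published = new AtomicInteger();
        PublisherFactory fake = () -> {
            opened.incrementAndGet();
            return new Publisher() {
                @Override public void publish(String body) { published.incrementAndGet(); }
                @Override public void close() { }
            };
        };
        int total = runWorkers(fake, workers, perWorker);
        return new int[] { total, published.get(), opened.get() };
    }

    public static void main(String[] args) throws Exception {
        int[] r = demo(4, 25);
        System.out.println("reported=" + r[0] + " published=" + r[1] + " opened=" + r[2]);
    }
}
```

Only after `runWorkers` returns would it be safe to close the connection and let the JVM exit.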

A single channel can publish a decent number of messages (a few
thousand per second), so you can start with a single thread and see if
it's enough before going multi-threaded.

You can learn more about publisher confirms handling in the tutorial [2].

[1] https://www.rabbitmq.com/api-guide.html#concurrency
[2] https://www.rabbitmq.com/tutorials/tutorial-seven-java.html