Blocked Connection Notification Not Working (Java)


Aplesh Rana

Nov 24, 2014, 4:41:27 AM
to rabbitm...@googlegroups.com, ar...@qasource.com
Hi All,

I am implementing RabbitMQ's blocked connection notifications in my Java code. I have implemented both the handleBlocked and handleUnblocked methods and pushed some load to my RabbitMQ 3.3.5 server. In the management UI, under the Connections tab, the status changes between blocking, blocked, and running, but control never reaches the handleBlocked method. Below are the code and the memory management configuration.

{code}
Connection connection = null;
try {
    connection = factory.newConnection();
    System.out.println(connection.getClientProperties());
} catch (IOException e) {
    logger.info("Exception occurred while creating Rabbit connection");
}
connection.addBlockedListener(new BlockedListener() {
    @Override
    public void handleUnblocked() throws IOException {
        System.out.println("unblocked");
        logger.info("Rabbit Connection Unblocked");
    }

    @Override
    public void handleBlocked(String arg0) throws IOException {
        logger.info("Rabbit Server Error, Connection Blocked");
        System.out.println("Blocked");
        throw new RuntimeException("RabbitMQ Connection Blocked");
    }
});
{code}

Below is the memory configuration I am using for testing purposes.

{code}
{vm_memory_high_watermark, 0.02},
{vm_memory_high_watermark_paging_ratio, 0.02},
{disk_free_limit, 20000000}
{code}

Can someone help me get control to reach the overridden methods of the BlockedListener interface?

Regards,
Aplesh Rana

Michael Klishin

Nov 24, 2014, 5:05:57 AM
to rabbitm...@googlegroups.com, Aplesh Rana, ar...@qasource.com
On 24 November 2014 at 12:41:30, Aplesh Rana (aplesh...@gmail.com) wrote:
> @Override
> public void handleBlocked(String arg0) throws IOException
> {
> logger.info("Rabbit Server Error, Connection Blocked");
> System.out.println("Blocked");
> throw new RuntimeException("RabbitMQ Connection Blocked");
> }

You cannot throw exceptions in these handlers as they are not dispatched on the
thread you add them on.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Aplesh Rana

Nov 24, 2014, 5:14:07 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Thank you for responding, Michael. OK, let's forget about throwing runtime exceptions. Can I print something when my connections get blocked? I have added a print statement, but control never reaches the handleBlocked method. Any suggestions or thoughts?

Regards,
Aplesh Rana

Michael Klishin

Nov 24, 2014, 5:23:48 AM
to rabbitm...@googlegroups.com, Aplesh Rana, aplesh...@gmail.com
 On 24 November 2014 at 13:14:10, Aplesh Rana (aplesh...@gmail.com) wrote:
> Can i print something if my connections are getting blocked?
> I have added a print statement but control is never going to the
> handleBlocked method. Any suggestions or thoughts?

You need to publish on a connection for it to be blocked. Connections that never publish
are never blocked (we want consumers to drain messages!).

Aplesh Rana

Nov 24, 2014, 5:30:23 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Yes, I have published a flood of messages to block it, and it is blocked, but control never reached the overridden methods of the listener. For the time being I have stopped my consumers because I need to test the connection blocking notification. For the notification itself, I have added a print statement.

Below is the code that I am using to block the connection. It runs in a loop of millions of iterations.

rabbitTemplate.convertAndSend("TEST", test, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setHeader("TESTID", "test");
        return message;
    }
});

Regards,
Aplesh Rana

Michael Klishin

Nov 24, 2014, 5:32:07 AM
to rabbitm...@googlegroups.com, Aplesh Rana, aplesh...@gmail.com
On 24 November 2014 at 13:30:26, Aplesh Rana (aplesh...@gmail.com) wrote:
> below is the piece of code that i am using to block connection.
> This code is running under a loop of millions to block connection.
>
> rabbitTemplate.convertAndSend("TEST", test, new MessagePostProcessor()
> {
> @Override
> public Message postProcessMessage(Message message) throws
> AmqpException {
> message.getMessageProperties().setHeader("TESTID", "test");
> return message;
> }
> });

This is Spring AMQP. Are you sure it uses the connection you assume it does? 

Aplesh Rana

Nov 24, 2014, 5:37:30 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Michael,

I looked into the implementation of Spring AMQP's RabbitTemplate. Internally it publishes the message on a channel, i.e. channel.basicPublish(exchange, routingKey, mandatory, immediate, convertedMessageProperties, message.getBody());
and we get the channel from the Connection. Spring AMQP manages the connections itself.

Regards,
Aplesh Rana

Michael Klishin

Nov 24, 2014, 5:44:45 AM
to rabbitm...@googlegroups.com, Aplesh Rana, aplesh...@gmail.com
 On 24 November 2014 at 13:37:33, Aplesh Rana (aplesh...@gmail.com) wrote:
> I looked into the implementation of Spring AMQP RabbitTemplate's
> implementation. It is internally publishing message to channel
> i.e. channel.basicPublish(exchange, routingKey, mandatory,
> immediate, convertedMessageProperties, message.getBody());
> and we get channel from Connection. Spring AMQP itself manage
> the connections.

I am aware of that. My point is: are you sure the connection your message is published on
is exactly the same object you register the connection.[un]blocked listeners on?

When in doubt, use the Tracer tool to see what's going up and down the wire:
http://rabbitmq.com/java-tools.html

Aplesh Rana

Nov 24, 2014, 5:51:50 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Yes, I am running a standalone application. I have created three Java files to publish messages in bulk. No other publisher/sender is communicating with my RabbitMQ instance. After executing the main method in all three Java files, 6-10 connections are displayed in the console in the running state. Everything goes smoothly at first, but after some time all connections change state to blocking/blocked and none of them remain in the running state. Please find the attached screenshot for more details.

Regards,
Aplesh Rana
Connection-Blocking.jpg

Michael Klishin

Nov 24, 2014, 5:54:36 AM
to rabbitm...@googlegroups.com, Aplesh Rana, aplesh...@gmail.com
On 24 November 2014 at 13:51:53, Aplesh Rana (aplesh...@gmail.com) wrote:
> Yes, I am running a standalone application. I have created three
> java files to publish messages in bulk. No other publisher/sender
> is communicating with my rabbit instance. After executing main
> method in all three java files 6-10 connections are being displayed
> in the console in running state, everything is going smooth but
> after sometime everything is getting changes all connections
> are changing state to blocking/blocked none of them in running
> state. Please find the attached screenshot for more details.

Now if you either drain messages or bump VM memory watermark to a higher value
with rabbitmqctl, they will be unblocked and you should see a connection.unblocked
frame in the tracer, and registered listeners *on the blocked connections* should be
invoked. 

Aplesh Rana

Nov 24, 2014, 6:40:43 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Yes Michael, I have executed rabbitmqctl set_vm_memory_high_watermark 0.6. It released the connections and changed their state from blocking/blocked to running. But my concern was not how to release blocked connections. My question is why control never reaches the overridden methods of the BlockedListener. Even after executing rabbitmqctl, control never passed to any of the overridden methods, and the same happened when the connections were blocked. So my question now is: is the blocked listener feature not supported in Spring AMQP?

Regards,
Aplesh Rana

Michael Klishin

Nov 24, 2014, 6:45:35 AM
to rabbitm...@googlegroups.com, Aplesh Rana, aplesh...@gmail.com
 On 24 November 2014 at 14:40:45, Aplesh Rana (aplesh...@gmail.com) wrote:
> My question is why the control in the code is not going to the overridden
> methods of Blocked listener. Even after executing rabbitmqctl
> control never passed to any of overridden method and same case
> is happened with the connection blocking. Now my question is,
> Is this Blocked listener feature is not supported in Spring AMQP?

Spring AMQP is not concerned with this. It's a Java client feature. I have suggested
this before but perhaps worth repeating: use the Tracer to see what's going on the wire.

If you see connection.[un]blocked methods in Tracer output, by far the most likely
problem is that the connection blocked is not the same that you register the listener
on. Spring AMQP may or may not have anything to do with that.

Inspecting protocol methods sent is the only reasonable way to proceed investigating this.

Aplesh Rana

Nov 24, 2014, 7:01:02 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Thank you for your help, Michael. I finally found the issue. It is an issue with Spring AMQP: RabbitTemplate does not add any blocked listener to the connection. I created a dummy method and published messages on a Connection that has a blocked listener registered. Below is the sample code, which might be helpful for someone. Thank you again, Michael.

RabbitTemplate internally uses ConnectionFactoryUtils.java to create the connection, and it never adds a blocked listener to it.


Connection connection = null;
try {
    connection = factory.newConnection();
} catch (IOException e) {}

connection.addBlockedListener(new BlockedListener() {
    @Override
    public void handleUnblocked() throws IOException {
        System.out.println("unblocked");
    }

    @Override
    public void handleBlocked(String arg0) throws IOException {
        System.out.println("Blocked");
        throw new RuntimeException("RabbitMQ Connection Blocked");
    }
});
com.rabbitmq.client.Channel channel = connection.createChannel();

channel.basicPublish("TEST", "test", false, null, null);


Regards,
Aplesh Rana

Michael Klishin

Nov 24, 2014, 7:02:32 AM
to rabbitm...@googlegroups.com, Aplesh Rana, aplesh...@gmail.com
 On 24 November 2014 at 15:01:04, Aplesh Rana (aplesh...@gmail.com) wrote:
> Thank You for you Supportive Help Michael, Finally i came to
> know about the issue. This is an issue with Spring AMQP, Rabbit
> Template is not adding any Blocked Listener with the connection.
> I have created a dummy method and published messages with Connection
> with blocked listener. Below is the sample code, might be helpful
> for someone. Thank you again Michael.

If you use the most recent Spring AMQP version and believe this is a Spring AMQP
limitation, feel free to file an issue with them.

Aplesh Rana

Nov 24, 2014, 7:07:31 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Sure, I will definitely do that. Thank you Michael.

Regards,
Aplesh Rana

Gary Russell

Nov 24, 2014, 9:06:55 AM
to Aplesh Rana, rabbitm...@googlegroups.com
You can add the listener by using a Spring AMQP ConnectionListener.

I responded with details to your stack overflow question [1].




Aplesh Rana

Nov 25, 2014, 5:56:14 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Thank you, Gary. It really helped me implement the connection blocking notification. I am curious to know why we can't throw a RuntimeException to pass control to the caller of the function. I am throwing a RuntimeException from handleBlocked and have also added an ExceptionHandler to the RabbitMQ connection factory. Control passes to the ExceptionHandler, but I am unable to catch the exception in my code. Any thoughts?

Regards,
Aplesh Rana

Michael Klishin

Nov 25, 2014, 5:58:45 AM
to rabbitm...@googlegroups.com, Aplesh Rana
On 25 November 2014 at 13:56:16, Aplesh Rana (aplesh...@gmail.com) wrote:
> I am curious to know, why we can't throw any Runtime Exception
> to pass the control to the caller of the function. I am throwing
> Runtime Exception from handleBlocked and also added a Exceptionhandler
> to the RabbitMQ connection factory. Control is passing to the
> ExceptionHandler but i am unable to catch exception at my code.
> Any thoughts?

The listeners are invoked on the RabbitMQ client's I/O thread, not the one you use to
open up a connection. Spring AMQP cannot and should not affect this.

Aplesh Rana

Nov 25, 2014, 6:03:06 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
So is there no way to handle such a scenario?

Aplesh Rana

Nov 25, 2014, 6:07:31 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Michael, how does BlockedListener interact with the thread where I am managing connections?

Aplesh Rana

Nov 25, 2014, 6:43:05 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
I am adding the ExceptionHandler to the RabbitMQ client instead of Spring AMQP. On throwing a RuntimeException, control is redirected to the exception handler but not to the main caller class, and I am getting the error message: Exception in thread "pool-1-thread-1"

Michael Klishin

Nov 25, 2014, 6:53:12 AM
to rabbitm...@googlegroups.com, Aplesh Rana
On 25 November 2014 at 14:43:07, Aplesh Rana (aplesh...@gmail.com) wrote:
> I am adding the ExceptionHandler to the rabbitmq client instead
> of Spring Amqp. On throwing Runtime exception, it is redirecting
> to the exception handler but not to the main caller class. And
> i am getting error message Exception in thread "pool-1-thread-1"

Aplesh,

Yes, this is the thing I was talking about. Server-sent protocol exceptions and notifications
are dispatched from the I/O thread. You have to use a synchronisation technique
(e.g. an atomic boolean as a flag).
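
A minimal sketch of the atomic boolean flag idea, not a tested solution. So that it runs without the RabbitMQ client on the classpath, it assumes a hypothetical BlockedCallback interface in place of com.rabbitmq.client.BlockedListener, a plain thread standing in for the client's I/O thread, and a hypothetical tryPublish guard. In a real application the two callback bodies would go into the BlockedListener methods.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class BlockedFlagSketch {
    // Hypothetical stand-in for com.rabbitmq.client.BlockedListener,
    // declared here only so the sketch compiles on its own.
    interface BlockedCallback {
        void handleBlocked(String reason);
        void handleUnblocked();
    }

    // Written by the callback thread, read by publishing threads.
    static final AtomicBoolean blocked = new AtomicBoolean(false);

    // The callbacks only flip the flag; they never throw.
    static final BlockedCallback callback = new BlockedCallback() {
        @Override
        public void handleBlocked(String reason) {
            blocked.set(true);
        }

        @Override
        public void handleUnblocked() {
            blocked.set(false);
        }
    };

    // Hypothetical publish guard: refuses to publish while the flag is set.
    static boolean tryPublish(String payload) {
        if (blocked.get()) {
            return false; // caller decides whether to wait, retry, or fail
        }
        // A real application would call channel.basicPublish(...) here.
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // A plain thread plays the role of the client's I/O thread.
        Thread fakeIoThread = new Thread(
                () -> callback.handleBlocked("low on memory"), "fake-io-thread");
        fakeIoThread.start();
        fakeIoThread.join();

        System.out.println("published while blocked: " + tryPublish("m1"));
        callback.handleUnblocked();
        System.out.println("published after unblock: " + tryPublish("m2"));
    }
}
```

The publishing code checks the flag on its own thread and decides what to do, which is the piece a thrown exception in the listener cannot provide.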

Aplesh Rana

Nov 25, 2014, 7:54:00 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Michael, Or do I need to remove the blocked listener first?

Michael Klishin

Nov 25, 2014, 7:57:18 AM
to rabbitm...@googlegroups.com, Aplesh Rana
On 25 November 2014 at 15:54:02, Aplesh Rana (aplesh...@gmail.com) wrote:
> Michael, Or do I need to remove the blocked listener first?

You set up Spring AMQP stuff, connection factory, etc on thread 1. The listener is invoked
on thread N. You have to coordinate other code on thread 1 with N if you want thread 1
to be aware of the events. However you do that is up to you but java.util.concurrent.*
primitives likely will cover almost any conceivable scenario.
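
One possible way to coordinate "thread 1" with "thread N" using java.util.concurrent, sketched under assumptions: the class name ConnectionEventHandoff, its Event enum, and the onBlocked/onUnblocked/awaitEvent methods are all hypothetical, and a plain thread stands in for the client's listener thread. The listener side only enqueues an event; the application thread polls the queue with a timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ConnectionEventHandoff {
    enum Event { BLOCKED, UNBLOCKED }

    // Thread-safe queue carrying events from the listener thread to the application thread.
    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();

    // Would be called from the listener thread ("thread N"), e.g. inside handleBlocked.
    void onBlocked(String reason) {
        events.offer(Event.BLOCKED);
    }

    // Would be called from the listener thread, e.g. inside handleUnblocked.
    void onUnblocked() {
        events.offer(Event.UNBLOCKED);
    }

    // Called from the application thread ("thread 1").
    // Waits up to the given timeout and returns null if no event arrived.
    Event awaitEvent(long timeout, TimeUnit unit) throws InterruptedException {
        return events.poll(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionEventHandoff handoff = new ConnectionEventHandoff();

        // A plain thread plays the role of thread N.
        Thread threadN = new Thread(() -> handoff.onBlocked("low on memory"), "thread-N");
        threadN.start();

        Event event = handoff.awaitEvent(5, TimeUnit.SECONDS);
        System.out.println("thread 1 observed: " + event);
    }
}
```

A queue keeps every event in order, which may matter if the connection blocks and unblocks repeatedly; a single flag or a CountDownLatch could be enough when only the current state or the first occurrence is of interest.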

Aplesh Rana

Nov 25, 2014, 8:36:44 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Michael, Is there any tested solution for this scenario?

Michael Klishin

Nov 25, 2014, 8:38:44 AM
to rabbitm...@googlegroups.com, Aplesh Rana
On 25 November 2014 at 16:36:46, Aplesh Rana (aplesh...@gmail.com) wrote:
> Michael, Is there any tested solution for this scenario?

Aplesh,

This is a generic Java concurrency question. I cannot possibly suggest anything
without knowing what you need to do in your system.

Java Concurrency in Practice is the best book on the subject. j.u.c. docs are also
pretty good. 

Aplesh Rana

Nov 25, 2014, 8:49:30 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Code under my Test class is:
try {
    rabbitTemplate.convertAndSend("TEST1", test, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setHeader("TESTID1", "test1");
            return message;
        }
    });
} catch (Exception exception) {}

ConnectionFactory Inside RabbitTemplate:
connectionFactory.addConnectionListener(new ConnectionListener() {
    @Override
    public void onCreate(Connection connection) {
        Channel channel = connection.createChannel(false);
        channel.getConnection().addBlockedListener(new BlockedListener() {
            @Override
            public void handleUnblocked() throws IOException {
            }

            @Override
            public void handleBlocked(String reason) {
                throw new RuntimeException("Exception Occurred");
            }
        });

        try {
            channel.close();
        } catch (IOException ioException) {
        }
    }

    @Override
    public void onClose(Connection connection) {
        // TODO Auto-generated method stub
    }
});
       
I am trying to get control back in my Test class when connections get blocked. I have thrown a RuntimeException to achieve this. Am I doing something wrong?

Michael Klishin

Nov 25, 2014, 8:51:37 AM
to rabbitm...@googlegroups.com, Aplesh Rana
On 25 November 2014 at 16:49:31, Aplesh Rana (aplesh...@gmail.com) wrote:
> I am trying to bring control at my Test class when connections
> are getting blocked. I have thrown RuntimeException to achieve
> this. Am I doing something wrong?

I don't have much to add to what's been said previously. Throwing exceptions in
thread B doesn't make them get thrown in thread A.
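
The point about threads A and B can be illustrated with plain Java, with no RabbitMQ involved. This is only a sketch; the class and method names are hypothetical. The first method shows that a try/catch on the calling thread does not see an exception thrown on another thread. The second shows one standard alternative: when the work is submitted to an ExecutorService, Future.get() reports the failure to the caller as an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CrossThreadExceptionSketch {
    // Returns true only if thread A's catch block saw the exception thrown on thread B.
    static boolean caughtFromRawThread() throws InterruptedException {
        boolean caught = false;
        try {
            Thread threadB = new Thread(() -> {
                throw new RuntimeException("thrown on thread B");
            });
            // Swallow the exception on thread B instead of printing a stack trace.
            threadB.setUncaughtExceptionHandler((thread, error) -> { });
            threadB.start();
            threadB.join();
        } catch (RuntimeException e) {
            caught = true; // not reached: the exception stays on thread B
        }
        return caught;
    }

    // Returns the message of the exception thrown on the worker thread,
    // as reported to the calling thread through Future.get().
    static String messageViaFuture() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Void> task = () -> {
                throw new RuntimeException("thrown on thread B");
            };
            Future<Void> future = pool.submit(task);
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caught on thread A: " + caughtFromRawThread());
        System.out.println("reported via Future: " + messageViaFuture());
    }
}
```

The Future approach only applies to work the application submits itself. Listener callbacks are invoked by the client library, so for those the failure has to be recorded in shared state that the application thread checks.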

This is a generic Java question, this list is about RabbitMQ. 
Please put some effort into investigating what your options are and how to use them
on your own.

Aplesh Rana

Nov 28, 2014, 2:52:48 AM
to rabbitm...@googlegroups.com, aplesh...@gmail.com
Michael,

Would clustering RabbitMQ help to get rid of connection blocking?