RabbitMQ keeps creating new connections even though the connection is static


Vinay Sawant

Sep 13, 2017, 8:30:31 AM
to rabbitmq-users



Here is my code


package com.mintwalk.util;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mintwalk.resource.LeadSquaredConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.RecoveryPolicies;

public class SingletonQueueConnection {
    private static final Logger logger = LoggerFactory.getLogger(SingletonQueueConnection.class);

    private static Connection connection = null;

    private Connection getConnection(LeadSquaredConfig leadSquaredConfig) {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            logger.debug(leadSquaredConfig.getUsername() + " : " + leadSquaredConfig.getPwd() + " : "
                    + leadSquaredConfig.getHostAddress() + " : " + leadSquaredConfig.getPortNo());

            factory.setUsername(leadSquaredConfig.getUsername());
            factory.setPassword(leadSquaredConfig.getPwd());
            factory.setHost(leadSquaredConfig.getHostAddress());
            factory.setPort(Integer.parseInt(leadSquaredConfig.getPortNo()));
            factory.setConnectionTimeout(300000);
            factory.setAutomaticRecoveryEnabled(false);
            factory.useBlockingIo();

            // getting a connection
            if (connection == null || !connection.isOpen()) {
                logger.debug("Connection is null or closed");
                if (connection != null) {
                    logger.debug("Existing Connection status : " + connection.isOpen());
                }
                ConnectionOptions options = new ConnectionOptions().withHost(leadSquaredConfig.getHostAddress())
                        .withUsername(leadSquaredConfig.getUsername()).withPassword(leadSquaredConfig.getPwd())
                        .withPort(Integer.parseInt(leadSquaredConfig.getPortNo()));
                final Config config = new Config().withRecoveryPolicy(RecoveryPolicies.recoverNever());
                connection = Connections.create(options, config);
                connection.close(0);
                // connection = factory.newConnection("MQ");
                logger.debug("New Connection status : " + connection.isOpen());
            }
        } catch (Exception ex) {
            logger.error("Create Connection Error : " + ex.toString());
            conClose();
        }
        return connection;
    }

    private void conClose() {
        // closing connection,
        // closes all the open channels
        logger.debug("Closing Queue");
        try {
            if (connection.isOpen()) {
                logger.debug("Connection Open, So closing");
                connection.close();
                connection = null;
            }
        } catch (IOException e) {
            logger.debug("Error closing connection " + e.toString());
            connection = null;
        }
    }

    public void publishMessage(LeadSquaredConfig leadSquaredConfig, Object jsonObj, String routingkey) {
        try {

            // creating a channel
            if (connection == null || !connection.isOpen()) {
                Connection con = getConnection(leadSquaredConfig);
                if (con != null) {
                    Channel channel = con.createChannel();
                    if (channel != null) {
                        channel.queueBind(leadSquaredConfig.getQueueName(), "amq.direct", routingkey);
                        channel.basicPublish(routingkey, leadSquaredConfig.getQueueName(), null,
                                jsonObj.toString().getBytes("utf-8"));
                        channel.close();
                    }
                }
            }
        } catch (Exception e) {
            logger.error("Error connecting to Queue : " + e.toString());
            logger.error(e.getMessage());
            conClose();
        }
    }
}




As soon as I run a load test, it creates a new connection on a new port.


I have tried many different approaches, but new connections are still being created. Please help as soon as possible.

Arnaud Cogoluègnes

Sep 13, 2017, 9:32:44 AM
to rabbitm...@googlegroups.com
Can you provide the Java client, Lyra, and RabbitMQ versions you're using, along with a snippet that uses the code above and reproduces the issue?

Interestingly, we saw a similar problem a few days ago [1], because automatic recovery was enabled for both RabbitMQ Java Client and Lyra. Your settings look OK though, so a snippet to reproduce is welcome.


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Vinay Sawant

Sep 13, 2017, 9:42:44 AM
to rabbitmq-users
java client 4.2.1

lyra 0.5.4

rabbitmq version 3.6.1

Here is the snippet:

        SingletonQueueConnection mqCon = new SingletonQueueConnection();
        if (mqCon != null) {
            mqCon.publishMessage(leadSquaredConfig, parentObj, "");
        }

Michael Klishin

Sep 13, 2017, 11:18:16 AM
to rabbitm...@googlegroups.com
Java client 4.0+ has connection recovery enabled by default. Lyra also performs connection
recovery. One of them has to be disabled (and if it's in Lyra, then you don't need Lyra at all).





--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Vinay Sawant

Sep 14, 2017, 2:40:05 AM
to rabbitmq-users
package com.mintwalk.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;

import com.mintwalk.resource.LeadSquaredConfig;
import com.rabbitmq.client.Channel;


public class SingletonQueueConnection {
    private static final Logger logger = LoggerFactory.getLogger(SingletonQueueConnection.class);

    private static Connection connection = null;
    private static CachingConnectionFactory factory = null;


    private Connection getConnection(LeadSquaredConfig leadSquaredConfig) {
        try {
            factory = new CachingConnectionFactory(leadSquaredConfig.getHostAddress());

            // ConnectionFactory factory = new ConnectionFactory();


            logger.debug(leadSquaredConfig.getUsername() + " : " + leadSquaredConfig.getPwd() + " : "
                    + leadSquaredConfig.getHostAddress() + " : " + leadSquaredConfig.getPortNo());

            factory.setUsername(leadSquaredConfig.getUsername());
            factory.setPassword(leadSquaredConfig.getPwd());
            factory.setPort(Integer.parseInt(leadSquaredConfig.getPortNo()));
            factory.setConnectionTimeout(300000);
            factory.setConnectionLimit(3);


            // getting a connection
            if (connection == null || !connection.isOpen()) {
                logger.debug("Connection is null or closed");
                if (connection != null) {
                    logger.debug("Existing Connection status : " + connection.isOpen());
                }
                connection = factory.createConnection();


                logger.debug("New Connection status : " + connection.isOpen());
            }
        } catch (Exception ex) {
            logger.error("Create Connection Error : " + ex.toString());
            conClose();
        }
        if (connection != null) {
            logger.debug("Connection established");

        }
        return connection;
    }

    private void conClose() {
        // closing connection,
        // closes all the open channels
        logger.debug("Closing Queue");
        try {
            if (connection.isOpen()) {
                logger.debug("Connection Open, So closing");
                connection.close();
                connection = null;
            }
        } catch (Exception e) {

            logger.debug("Error closing connection " + e.toString());
            connection = null;
        }
    }

    public void publishMessage(LeadSquaredConfig leadSquaredConfig, Object jsonObj, String routingkey) {
        try {

            // creating a channel

            connection = getConnection(leadSquaredConfig);
            if (connection != null) {
                Channel channel = connection.createChannel(false);
                // con.createChannel(false);

                if (channel != null) {
                    channel.queueBind(leadSquaredConfig.getQueueName(), "amq.direct", routingkey);
                    channel.basicPublish(routingkey, leadSquaredConfig.getQueueName(), null,
                            jsonObj.toString().getBytes("utf-8"));
                    channel.close();
                }

            }
        } catch (Exception e) {
            logger.error("Error connecting to Queue : " + e.toString());
            logger.error(e.getMessage());
            conClose();
        }
    }

}


This is the code I tried. I switched to CachingConnectionFactory and removed the Lyra code, but it is still making too many connections.






Arnaud Cogoluègnes

Sep 14, 2017, 5:44:59 AM
to rabbitm...@googlegroups.com
Sorry, I can't reproduce. You should remove Lyra, as the code sets a 'recoverNever' recovery policy.

You can also get rid of the CachingConnectionFactory (from Spring AMQP), I guess.

If you want to implement the singleton pattern, you should use a dependency injection container like Spring. It's already on your classpath, as you use Spring AMQP.

--

Arnaud Cogoluègnes

Sep 14, 2017, 8:13:11 AM
to rabbitm...@googlegroups.com
Are you sending messages from different threads? If so, checking the connection against null may not behave as you expect: without synchronization, a thread can still see null even after another thread has set a value. You can declare the connection field as volatile or use an AtomicReference, or, even better, use a container that creates your resources safely (e.g. Spring).
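
To illustrate the threading concern above, here is a minimal sketch of a shared connection that is created once and replaced only when it is missing or closed. It assumes a volatile field combined with a re-check inside a synchronized block, which is one common way to do this and not necessarily what the posters had in mind. FakeConnection and SharedConnectionHolder are hypothetical names; FakeConnection stands in for the real client connection so the sketch runs without a broker.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a real client connection, so the sketch
// can run without a broker.
final class FakeConnection {
    private volatile boolean open = true;

    boolean isOpen() { return open; }

    void close() { open = false; }
}

// Holds one shared connection and replaces it only when it is
// missing or closed.
final class SharedConnectionHolder {
    // volatile: a write by one thread is visible to later reads
    // by other threads.
    private static volatile FakeConnection connection;

    // Counts how many connections were opened (for demonstration only).
    static final AtomicInteger created = new AtomicInteger();

    private SharedConnectionHolder() {}

    static FakeConnection get() {
        FakeConnection current = connection;        // single volatile read
        if (current != null && current.isOpen()) {
            return current;                         // fast path, no locking
        }
        synchronized (SharedConnectionHolder.class) {
            current = connection;                   // re-check under the lock
            if (current == null || !current.isOpen()) {
                // Assumption: real code would open a broker connection here.
                current = new FakeConnection();
                created.incrementAndGet();
                connection = current;
            }
            return current;
        }
    }
}
```

With this shape, many threads calling SharedConnectionHolder.get() under load share one connection, and a new one is opened only after the current one reports closed. In real code, the FakeConnection construction would be replaced by something like newConnection() on a com.rabbitmq.client.ConnectionFactory that is itself created only once.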
