
Here is my code
package com.mintwalk.util;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mintwalk.resource.LeadSquaredConfig;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.RecoveryPolicies;
public class SingletonQueueConnection {
private static final Logger logger = LoggerFactory.getLogger(SingletonQueueConnection.class);
private static Connection connection = null;
private Connection getConnection(LeadSquaredConfig leadSquaredConfig) {
try {
ConnectionFactory factory = new ConnectionFactory();
logger.debug(leadSquaredConfig.getUsername() + " : " + leadSquaredConfig.getPwd() + " : "
+ leadSquaredConfig.getHostAddress() + " : " + leadSquaredConfig.getPortNo());
factory.setUsername(leadSquaredConfig.getUsername());
factory.setPassword(leadSquaredConfig.getPwd());
factory.setHost(leadSquaredConfig.getHostAddress());
factory.setPort(Integer.parseInt(leadSquaredConfig.getPortNo()));
factory.setConnectionTimeout(300000);
factory.setAutomaticRecoveryEnabled(false);
factory.useBlockingIo();
// getting a connection
if (connection == null || !connection.isOpen()) {
logger.debug("Connection is null or closed");
if (connection != null) {
logger.debug("Existing Connection status : " + connection.isOpen());
}
ConnectionOptions options = new ConnectionOptions().withHost(leadSquaredConfig.getHostAddress())
.withUsername(leadSquaredConfig.getUsername()).withPassword(leadSquaredConfig.getPwd())
.withPort(Integer.parseInt(leadSquaredConfig.getPortNo()));
final Config config = new Config().withRecoveryPolicy(RecoveryPolicies.recoverNever());
connection = Connections.create(options, config);
connection.close(0);
// connection = factory.newConnection("MQ");
logger.debug("New Connection status : " + connection.isOpen());
}
} catch (Exception ex) {
logger.error("Create Connection Error : " + ex.toString());
conClose();
}
return connection;
}
private void conClose() {
// closing connection,
// closes all the open channels
logger.debug("Closing Queue");
try {
if (connection.isOpen()) {
logger.debug("Connection Open, So closing");
connection.close();
connection = null;
}
} catch (IOException e) {
logger.debug("Error closing connection " + e.toString());
connection = null;
}
}
public void publishMessage(LeadSquaredConfig leadSquaredConfig, Object jsonObj, String routingkey) {
try {
// creating a channel
if (connection == null || !connection.isOpen()) {
Connection con = getConnection(leadSquaredConfig);
if (con != null) {
Channel channel = con.createChannel();
if (channel != null) {
channel.queueBind(leadSquaredConfig.getQueueName(), "amq.direct", routingkey);
channel.basicPublish(routingkey, leadSquaredConfig.getQueueName(), null,
jsonObj.toString().getBytes("utf-8"));
channel.close();
}
}
}
} catch (Exception e) {
logger.error("Error connecting to Queue : " + e.toString());
logger.error(e.getMessage());
conClose();
}
}
}
The moment i load test,it creates new connection on new port
Tried many different ways but still new connection is created. Please help asap.