package rmq.test.pool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * A small pool of RabbitMQ {@link Channel}s that share a single {@link Connection}.
 *
 * <p>On start-up (and whenever the connection is found closed) the pool opens a
 * connection to {@code SERVER_ADR} and pre-creates {@code MAX_CHANNEL} channels, each of
 * which declares the queue {@code QUEUE_NAME}. Callers borrow a channel with
 * {@link #getChannel()} and hand it back with {@link #releaseChannel(Channel)}.
 *
 * <p>All access to the idle-channel list and to the connection field is guarded by the
 * monitor of the {@code channels} list, which is also the monitor used for
 * {@code wait}/{@code notifyAll} between borrowers and releasers.
 *
 * <p>NOTE(review): the broker credentials are hard-coded constants; consider loading
 * them from configuration instead.
 */
public class RMQPool {

    private static final String QUEUE_NAME = "myqueue";
    private static final String USERNAME = "username";
    private static final String PASSWORD = "123456";
    private static final String SERVER_ADR = "localhost";

    /** Number of channels created up front, and the refill cap used on release. */
    private static final int MAX_CHANNEL = 20;

    /** Maximum time, in milliseconds, a borrower waits for a released channel. */
    private static final long TIME_OUT = 5000;

    /** Current broker connection; read and replaced only while holding the {@code channels} monitor. */
    private Connection connection;

    /** Idle channels ready to be borrowed; doubles as the pool's lock and wait/notify monitor. */
    private final List<Channel> channels = new ArrayList<Channel>();

    private static RMQPool instance;

    /**
     * Creates a pool, opens the connection and pre-creates the channels.
     *
     * @throws IOException if the connection or a channel cannot be created
     * @throws TimeoutException if connecting to the broker times out
     */
    public RMQPool() throws IOException, TimeoutException {
        awakeConnection();
    }

    /**
     * Returns the shared pool instance, creating it on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot create two
     * pools (and therefore two connections).
     *
     * @return the shared {@code RMQPool}
     * @throws IOException if the pool has to be created and the connection or a channel fails
     * @throws TimeoutException if the pool has to be created and connecting times out
     */
    public static synchronized RMQPool getInstance() throws IOException, TimeoutException {
        if (instance == null) {
            instance = new RMQPool();
        }
        return instance;
    }

    /**
     * Opens a new connection and refills the pool with {@code MAX_CHANNEL} fresh channels.
     *
     * <p>Any channels still held in the idle list are dropped, since they belong to the
     * previous connection.
     *
     * @return always {@code true}
     * @throws IOException if the connection or a channel cannot be created
     * @throws TimeoutException if connecting to the broker times out
     */
    private boolean awakeConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setHost(SERVER_ADR);

        synchronized (channels) {
            connection = factory.newConnection();
            channels.clear();
            for (int i = 0; i < MAX_CHANNEL; i++) {
                spawnChannel();
            }
            // Borrowers blocked in getChannel() can proceed now that the pool is refilled.
            channels.notifyAll();
        }
        return true;
    }

    /**
     * Creates one channel on the current connection, declares the queue on it and adds
     * it to the idle list.
     *
     * @throws IOException if the channel cannot be created or the queue cannot be declared
     */
    private void spawnChannel() throws IOException {
        synchronized (channels) {
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channels.add(channel);
        }
    }

    /**
     * Borrows a channel from the pool.
     *
     * <p>If the connection is closed it is re-established first. If no channel is idle,
     * the caller waits up to {@code TIME_OUT} milliseconds for one to be released; if
     * none arrives in that time (or the thread is interrupted) a new channel is created.
     * The thread's interrupt status is preserved.
     *
     * @return a channel removed from the idle list; return it with {@link #releaseChannel(Channel)}
     * @throws IOException if reconnecting or creating a channel fails
     * @throws TimeoutException if reconnecting to the broker times out
     */
    public Channel getChannel() throws IOException, TimeoutException {
        boolean interrupted = false;
        try {
            synchronized (channels) {
                if (!connection.isOpen()) {
                    awakeConnection();
                }
                if (channels.isEmpty()) {
                    try {
                        // wait() requires the monitor, which the synchronized block provides.
                        channels.wait(TIME_OUT);
                    } catch (InterruptedException e) {
                        // Stop waiting and fall through to creating a channel; the flag is
                        // restored once the channel has been obtained.
                        interrupted = true;
                    }
                    // The monitor was released while waiting, so re-check the connection.
                    if (!connection.isOpen()) {
                        awakeConnection();
                    }
                    if (channels.isEmpty()) {
                        spawnChannel();
                    }
                }
                return channels.remove(0);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns a borrowed channel to the pool.
     *
     * <p>An open channel is put back on the idle list. A closed channel is discarded; if
     * the connection is still open and fewer than {@code MAX_CHANNEL} channels are idle,
     * a replacement channel is created. Waiting borrowers are woken whenever a channel
     * has been added.
     *
     * @param channel the channel previously obtained from {@link #getChannel()}
     * @throws IOException if a replacement channel cannot be created
     */
    public void releaseChannel(Channel channel) throws IOException {
        synchronized (channels) {
            if (channel.isOpen()) {
                channels.add(channel);
            } else if (connection.isOpen() && channels.size() < MAX_CHANNEL) {
                // Only replace the dead channel if the connection can still create one;
                // a closed connection is re-established by the next getChannel() call.
                spawnChannel();
            } else {
                return;
            }
            channels.notifyAll();
        }
    }
}