I seem to be facing a problem whenever my connection to rabbitmq is interrupted due to a bad internet connection.
In brief, I have a consumer that fetches messages from a queue, process these messages one at a time and finally notifies rabbit when done processing to move to the next one.
So while testing, I tried a scenario that involves multiple connection failures in a small interval of time or let's say I tried a worst case scenario. After carefully examining my results, I noticed that my consumer processed a message twice. And by the way, I'm using automatic connection/consumer recovery.
Any solution suggestions to this problem would be highly appreciated.
Also, please note that I added below all major parts of my code.
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bodyBytes) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, bodyBytes);
try {
if(mReadChannel != null) {
String body = new String(bodyBytes);
Rabbit.Message message = Rabbit.MessageBuilder.fromJson(body);
if (working) {
mReadChannel.basicNack(envelope.getDeliveryTag(), false, true);
return;
}
working = true;
mReadChannel.basicAck(envelope.getDeliveryTag(), false);
doWork(message);
} else {
return;
}
try {
synchronized (mLock) {
mLock.wait();
working = false;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch(Exception ex) {
working = false;
ex.printStackTrace();
}
};
mReadChannel.basicConsume(Settings.get().getSendQueue(), false, consumer);
Also below is how I'm setting up my connection factory.
private void initRabbitConnectionFactory() {
mRabbitConnectionFactory = new ConnectionFactory();
mRabbitConnectionFactory.setHost("xxx.xxx.xxx");
mRabbitConnectionFactory.setUsername("user");
mRabbitConnectionFactory.setPassword("password");
mRabbitConnectionFactory.setVirtualHost("/");
mRabbitConnectionFactory.setPort(5672);
mRabbitConnectionFactory.setAutomaticRecoveryEnabled(true);
mRabbitConnectionFactory.setTopologyRecoveryEnabled(true);
mRabbitConnectionFactory.setExceptionHandler(new ExceptionHandler() {
@Override
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleReturnListenerException(Channel channel, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleFlowListenerException(Channel channel, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleConfirmListenerException(Channel channel, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleBlockedListenerException(Connection connection, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
exception.printStackTrace();
}
@Override
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleChannelRecoveryException(Channel ch, Throwable exception) {
exception.printStackTrace();
}
@Override
public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
exception.printStackTrace();
}
});
}
Finally the below function releases the wait lock for handleDelivery.
private synchronized void notifyRabbit(final String queue, String smsFrom, String smsBody, boolean proceed, boolean smsSent) {
AsyncTask<Void, Void, Void> a = new AsyncTask<Void, Void, Void>() {
@Override
protected Void doInBackground(Void... params) {
while (true) {
Connection connection = null;
Channel channel = null;
try {
connection = mRabbitConnectionFactory.newConnection();
channel = connection.createChannel();
channel.confirmSelect();
AMQP.BasicProperties messageProperties = new AMQP.BasicProperties.Builder().contentType("text/plain").contentEncoding("UTF-8").build();
channel.basicPublish("", queue, messageProperties, b.since(timeSMSSent).secret(Settings.get().getSecret()).buildJson().getBytes());
} catch (Exception e) {
e.printStackTrace();
}
try {
if (channel != null)
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
try {
if (connection != null)
connection.close();
} catch (Exception e) { e.printStackTrace();
}
if (didWork) {
synchronized (mLock) {
mLock.notifyAll();
}
return null;
}
}
}
};
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB) {
a.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
} else {
a.execute();
}
}