same consumer taking message twice

Mohamad Nachabe

Apr 7, 2017, 3:38:37 AM
to rabbitmq-users
Hello, 

I seem to be facing a problem whenever my connection to RabbitMQ is interrupted by a bad internet connection.
In brief, I have a consumer that fetches messages from a queue, processes them one at a time, and notifies RabbitMQ when it is done processing so it can move on to the next one.

While testing, I tried a worst-case scenario: multiple connection failures within a short interval. After carefully examining my results, I noticed that my consumer had processed a message twice. By the way, I'm using automatic connection/consumer recovery.

Any solution suggestions to this problem would be highly appreciated.

I've included the main parts of my code below.

Here's how I'm consuming my messages.

connection = mRabbitConnectionFactory.newConnection();
mReadChannel = connection.createChannel();
// Prefetch of 1: at most one unacknowledged delivery at a time.
mReadChannel.basicQos(1);
mReadChannel.queueDeclare(Settings.get().getSendQueue(), true, false, false, null);
consumer = new DefaultConsumer(mReadChannel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bodyBytes) throws IOException {
        super.handleDelivery(consumerTag, envelope, properties, bodyBytes);
        try {
            if (mReadChannel != null) {
                String body = new String(bodyBytes);
                Rabbit.Message message = Rabbit.MessageBuilder.fromJson(body);

                // Already busy with another message: requeue this delivery.
                if (working) {
                    mReadChannel.basicNack(envelope.getDeliveryTag(), false, true);
                    return;
                }

                working = true;
                // Ack before doing the work, so the broker does not redeliver this message.
                mReadChannel.basicAck(envelope.getDeliveryTag(), false);

                doWork(message);
            } else {
                return;
            }

            // Block until notifyRabbit() signals that processing has finished.
            try {
                synchronized (mLock) {
                    mLock.wait();
                    working = false;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (Exception ex) {
            working = false;
            ex.printStackTrace();
        }
    }
};
// autoAck = false: deliveries are acknowledged manually in handleDelivery.
mReadChannel.basicConsume(Settings.get().getSendQueue(), false, consumer);

Below is how I'm setting up my connection factory.

private void initRabbitConnectionFactory() {
    mRabbitConnectionFactory = new ConnectionFactory();
    mRabbitConnectionFactory.setHost("xxx.xxx.xxx");
    mRabbitConnectionFactory.setUsername("user");
    mRabbitConnectionFactory.setPassword("password");

    mRabbitConnectionFactory.setVirtualHost("/");
    mRabbitConnectionFactory.setPort(5672);
    mRabbitConnectionFactory.setAutomaticRecoveryEnabled(true);
    mRabbitConnectionFactory.setTopologyRecoveryEnabled(true);
    mRabbitConnectionFactory.setExceptionHandler(new ExceptionHandler() {
        @Override
        public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleReturnListenerException(Channel channel, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleFlowListenerException(Channel channel, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleConfirmListenerException(Channel channel, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleBlockedListenerException(Connection connection, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
            exception.printStackTrace();
        }

        @Override
        public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleChannelRecoveryException(Channel ch, Throwable exception) {
            exception.printStackTrace();
        }

        @Override
        public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
            exception.printStackTrace();
        }
    });
}

Finally, the function below releases the lock that handleDelivery waits on.

private synchronized void notifyRabbit(final String queue, String smsFrom, String smsBody, boolean proceed, boolean smsSent) {

    AsyncTask<Void, Void, Void> a = new AsyncTask<Void, Void, Void>() {
        @Override
        protected Void doInBackground(Void... params) {
            // Keep retrying until didWork is true.
            // (b, timeSMSSent and didWork are defined outside this snippet.)
            while (true) {
                Connection connection = null;
                Channel channel = null;
                try {
                    connection = mRabbitConnectionFactory.newConnection();
                    channel = connection.createChannel();
                    // Publisher confirms are enabled here, but the code never waits for one.
                    channel.confirmSelect();
                    AMQP.BasicProperties messageProperties = new AMQP.BasicProperties.Builder().contentType("text/plain").contentEncoding("UTF-8").build();
                    channel.basicPublish("", queue, messageProperties, b.since(timeSMSSent).secret(Settings.get().getSecret()).buildJson().getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    if (channel != null)
                        channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    if (connection != null)
                        connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if (didWork) {
                    // Wake the consumer thread blocked in handleDelivery.
                    synchronized (mLock) {
                        mLock.notifyAll();
                    }
                    return null;
                }
            }
        }
    };
    if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB) {
        a.executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
    } else {
        a.execute();
    }
}

Karl Nilsson

Apr 7, 2017, 4:00:25 AM
to rabbitm...@googlegroups.com
Hi,

When using acknowledgements, RabbitMQ provides at-least-once delivery. That means that occasionally, after failures, you may receive a redelivery of a message that was partially processed by the consumer (e.g. written to a database) but not acknowledged before the consumer crashed. As RabbitMQ didn't receive an ack and the connection was closed, it assumes the message was not successfully processed and requeues it for redelivery.


By far the best solution would be to design your application so that your consumers process messages idempotently, i.e. processing a duplicate doesn't change the end result. In many cases this is easier said than done, however. Another option may be to keep a small, persistent local window of, say, the last 100 processed messages and check that a newly received message isn't already in it. That said, if your application fails after processing but _before_ updating the local window, you can still process duplicates.
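
As a rough illustration of the "window of recent messages" idea, here is a minimal sketch. It assumes every message carries a unique id (for example in the message-id property or in the payload), which the code in this thread does not show. The class name RecentMessageWindow and its method are hypothetical, and this version lives in memory only; a real one would need to persist the window as described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers the ids of the most recently processed messages. */
final class RecentMessageWindow {
    private final Map<String, Boolean> seen;

    RecentMessageWindow(final int capacity) {
        // Insertion-ordered map that drops its oldest entry once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Records the id and returns true if it is new; returns false for a duplicate. */
    synchronized boolean markIfNew(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }
}
```

A consumer would call markIfNew(id) on each delivery and skip the work (but still ack) when it returns false. Note that the window only narrows the gap: a crash between doing the work and recording the id still allows a duplicate.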


Cheers
Karl

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Karl Nilsson

Staff Software Engineer, Pivotal/RabbitMQ

Mohamad Nachabe

Apr 7, 2017, 4:19:43 AM
to rabbitmq-users
Hi, 

But the first thing my consumer does is ack the received message. So a connection failure before the ack will probably throw an exception, which stops the consumer from processing the message and leaves it unacked in the queue. In a second scenario, a connection failure after the ack would leave the message both acked and processed (processing does not require an internet connection). So, in theory, no redelivery should happen: in the second scenario the message was acked and processed, and in the first neither the ack nor the processing occurred.
Am I missing out on something here?

Karl Nilsson

Apr 7, 2017, 5:02:30 AM
to rabbitm...@googlegroups.com
It may be that your process failed before the ack was fully written to the socket. That should be fairly rare, though.

Your approach is a bit unusual. Normally you would ack the message after the work is done, to ensure the message isn't "lost" when processing fails. Is it really more important for your service to avoid processing duplicates than to avoid not processing a message at all?
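
To make the trade-off concrete, here is a toy model of the two orderings; it is not RabbitMQ code. It assumes a single crash at the worst moment of the first attempt and exactly one redelivery afterwards. The names AckOrderingModel, CrashPoint and timesProcessed are made up for this sketch.

```java
/** Toy model: how often is one message processed under each ack ordering? */
final class AckOrderingModel {
    enum CrashPoint { NONE, AFTER_FIRST_STEP }

    /** Counts completed runs of the work for one message, allowing one redelivery. */
    static int timesProcessed(boolean ackBeforeWork, CrashPoint crash) {
        int processed = 0;
        boolean acked = false;
        for (int attempt = 0; attempt < 2 && !acked; attempt++) {
            boolean crashNow = attempt == 0 && crash == CrashPoint.AFTER_FIRST_STEP;
            if (ackBeforeWork) {
                acked = true;        // the broker forgets the message
                if (crashNow) {
                    break;           // the work never ran: the message is lost
                }
                processed++;
            } else {
                processed++;         // the work ran
                if (crashNow) {
                    continue;        // the ack was never sent: the broker redelivers
                }
                acked = true;
            }
        }
        return processed;
    }
}
```

In this model, ack-before-work with a crash gives 0 (a lost message) and ack-after-work with a crash gives 2 (a duplicate); without a crash both give 1.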


Mohamad Nachabe

Apr 7, 2017, 5:34:38 AM
to rabbitmq-users
Actually yes, acking the message before processing is essential for me, because the processing involves money transactions on my side (so I cannot process duplicates). But you make a good point: what if I fail somewhere during processing? I handle every type of failure I can think of, but there is always a small probability of failure.
My concern is the following. Let's say I have 2 consumers listening to the same queue and I publish a message. Consumer 1 takes the message and waits until processing is done to ack (processing might take up to 2 minutes in the worst case). Meanwhile, consumer 2 is waiting for a message. Since consumer 1 hasn't finished its work, wouldn't consumer 2 take that unacked message? Would I then end up with a message consumed by 2 different consumers, and lose money? Is any flag raised when a consumer takes a delivery? I don't know whether this problem actually exists, but if it can happen I cannot afford it.
I would also like to note that I previously tried autoacks, but that failed: whenever I had more than 1 message waiting in the queue, one autoack seemed to kill all the other messages in the queue. That is why I moved to manual acks.

Karl Nilsson

Apr 7, 2017, 5:44:40 AM
to rabbitm...@googlegroups.com
No, consumer 2 would not receive the message currently being processed by consumer 1 unless consumer 1 either nacks the message or fails before acking.

Each delivery carries a 'redelivered' flag (in the Java client it is on the Envelope rather than in the basic properties), which indicates that the message was previously delivered to a consumer but was returned without an ack. When the ack happens after processing, this means the message may or may not have been successfully processed before. You could inspect the flag on delivery and, rather than automatically processing the message, perhaps republish it to another queue for manual handling.
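
A minimal sketch of that decision, kept separate from the client library so it stands on its own. RedeliveryPolicy and its Action values are hypothetical names. In the Java client the flag would come from envelope.isRedeliver() inside handleDelivery; republishing to a review queue and acking the original are left to the caller.

```java
/** Hypothetical policy: never auto-process a delivery that the broker marks as redelivered. */
final class RedeliveryPolicy {
    enum Action { PROCESS, PARK_FOR_MANUAL_REVIEW }

    /**
     * @param redelivered the delivery's redelivered flag, e.g. envelope.isRedeliver()
     * @return what the consumer should do with this delivery
     */
    static Action decide(boolean redelivered) {
        // A redelivered message may or may not have been processed already,
        // so it is set aside for a human instead of being processed again.
        return redelivered ? Action.PARK_FOR_MANUAL_REVIEW : Action.PROCESS;
    }
}
```

This only helps when the ack is sent after the work: with ack-before-work, a message that was acked successfully is never redelivered at all.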
