Consumer message ACK does not pass (JAVA)

Vadim Kimlaychuk

Sep 24, 2014, 2:59:37 AM
to rabbitm...@googlegroups.com
Hello all,

       I have implemented a client in Java according to https://www.rabbitmq.com/api-guide.html. I have a subclass that extends DefaultConsumer and overrides the handleDelivery method, which ends with

channel.basicAck(deliveryTag, false);  
      I am receiving the messages without problems, but they never get ACKed on the server side unless I release the connection (shut down the client). I am monitoring the queue during runtime and messages are never live the queue. While being read they move from "Ready" to "Unacked", and when I release the connection they move from "Unacked" back to "Ready". I cannot remove them from the queue at all! I have tried different combinations of queue consuming, including:

    ch.basicConsume(exchange.getQueue(), consumer);
    ch.basicConsume(exchange.getQueue(), false, consumer);
    ch.basicConsume(exchange.getQueue(), true, consumer);

     I have also tried to ACK the message at the very beginning of handleDelivery(), with the same result: messages are not removed from the queue.
     The RabbitMQ server version is 3.0.1, the client is 3.3.5. I have no idea where to look for the error.

Please help.

Vadim.


Michael Klishin

Sep 24, 2014, 3:12:16 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk
On 24 September 2014 at 10:59:42, Vadim Kimlaychuk (vad...@gmail.com) wrote:
> I am receiving the messages without the problem, but never get
> them ACK-ed on the server side unless I release connection (do
> shutdown for the client). I am monitoring the queue during runtime
> and messages are never live the queue.

I suspect this is meant to be "leave".

> During reading they are
> moved from "Ready" to "Unacked" and when I release the connection
> from "Unacked" back to "Ready". I could not remove them from queue
> at all ! Have tried different combination of queue consuming
> including:
>
> ch.basicConsume(exchange.getQueue(), consumer);
> ch.basicConsume(exchange.getQueue(), false, consumer);
> ch.basicConsume(exchange.getQueue(), true, consumer);

Well, the 2nd argument of Channel#basicConsume definitely matters here.
Lines 1 and 2 are equivalent (both use manual acknowledgements) but 3 is not.
If you use automatic acknowledgements (the 3rd line), you must not use basicAck,
and if you do, you will get a channel exception.

See Tutorial 2:
http://www.rabbitmq.com/tutorials/tutorial-two-java.html

> Also have tried to ACK message just at the beginning of handleDelivery
> () -- all with the same result. Messages are not removed from the
> queue.
> RabbitMQ server version is 3.0.1, client - 3.3.5. Have no idea
> where to search the error.

Take a look at what's in RabbitMQ log: are there any channel errors?
Use Tracer [1] to see what's being sent on the wire.
Posting some code would certainly make it easier to tell what's going on.

1. http://www.rabbitmq.com/java-tools.html
--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Vadim Kimlaychuk

Sep 24, 2014, 3:37:24 AM
to rabbitm...@googlegroups.com, vad...@gmail.com
Thank you for the tips, Michael. I have made progress by simplifying my handleDelivery() and have localized the issue. Still, messages can't leave (you are right about that as well) the queue when I want them to.

1. I send the message to another queue (an error queue) if an error is detected. If I comment out this piece of code, messages are ACKed after reading and removed from the queue successfully.
2. For sending error messages I create another channel and call: ch.basicPublish(destination.getExchangeName(), destination.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());

I can't understand why sending to another queue over a newly created channel affects the existing delivery ???

Vadim.

Michael Klishin

Sep 24, 2014, 3:40:30 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk
 On 24 September 2014 at 11:37:29, Vadim Kimlaychuk (vad...@gmail.com) wrote:
> 1. I have message sending to another queue (error queue) if error
> is detected. If I comment out this piece of code - messages are
> ACKed after reading and removed from the queue successfully.
> 2. For sending error messages I create another channel and do
> : ch.basicPublish(destination.getExchangeName(), destination.getRoutingKey(),
> MessageProperties.PERSISTENT_BASIC, message.getBytes());
>
> Can't understand -- why sending to another queue over newly created
> channel affects existing delivery ???

I can't suggest much unless you post more code or at least a Tracer log.

Acks are scoped per channel, not queue. Publishes cannot affect acknowledgements
performed by consumers.
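
To make "scoped per channel" concrete, here is a minimal sketch in plain Java. It is a toy in-memory model and not the RabbitMQ client API: ModelChannel, deliver() and ack() are hypothetical names invented for illustration, and the exception merely stands in for the channel error a broker would raise. It only shows that a delivery tag means something solely on the channel that handed it out.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of one channel's delivery-tag bookkeeping (hypothetical, not the client API). */
final class ModelChannel {
    private final int number;
    private long nextTag = 1;                          // every channel counts its own tags from 1
    private final Set<Long> unacked = new HashSet<>(); // tags delivered here and not yet acked

    ModelChannel(int number) {
        this.number = number;
    }

    /** Models the broker handing one message to a consumer on this channel. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** An ack is accepted only for a tag this channel handed out and has not acked yet. */
    void ack(long tag) {
        if (!unacked.remove(tag)) {
            throw new IllegalStateException(
                "channel " + number + ": unknown delivery tag " + tag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ModelChannel consuming = new ModelChannel(1);
        ModelChannel publishing = new ModelChannel(5);

        long tag = consuming.deliver();
        try {
            publishing.ack(tag);   // wrong channel: this tag was never delivered there
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        consuming.ack(tag);        // right channel: the message is settled
        System.out.println("unacked on consuming channel: " + consuming.unackedCount());
    }
}
```

With the real client the same rule means calling basicAck on the Channel the consumer was registered on, passing envelope.getDeliveryTag().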

Vadim Kimlaychuk

Sep 24, 2014, 4:15:17 AM
to rabbitm...@googlegroups.com, vad...@gmail.com
This piece of code inside handleDelivery() affects the ACK. If I comment it out -- everything works.
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
       ....
         RabbitMqSender sender = new RabbitMqSender(new Destination(exchange.getName(), exchange.getRetrunRoutingKey()));  //sending to another queue
         sender.send(response);
       ....
         getChannel().basicAck(deliveryTag, false);
   }

--------------------------------   RabbitMqSender.java --------------------------------------------------
public class RabbitMqSender {
    private static Logger logger = Logger.getLogger(RabbitMqSender.class);
    private Destination destination;
    private RabbitMqConnection connection;
    private Channel ch;

    public RabbitMqSender(Destination destination) throws IOException {
        this.destination = destination;
        this.connection = RabbitMqConnection.getInstance();   // Global connection to RabbitMQ
        this.ch = connection.getConnection().createChannel();
    }

    public void send(String message) throws IOException {
        ch.basicPublish(destination.getExchangeName(), destination.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());
        logger.trace("Message: " + message + "\n is sent to: " + destination.getExchangeName() + " returning key: " + destination.getRoutingKey() + " channel:" + ch.getChannelNumber());
        logger.debug("Message with routing key: " + destination.getRoutingKey() + " sent over channel: " + ch.getChannelNumber());
        ch.close();
    }
}

---------------------- Destination.java --------------------
public class Destination {
    private String exchangeName;
    private String routingKey;

    public Destination(String exchangeName, String routingKey) {
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public String getRoutingKey() {
        return routingKey;
    }
}
-------------------------------------------------------

Vadim Kimlaychuk

Sep 24, 2014, 4:19:42 AM
to rabbitm...@googlegroups.com
This piece of code affects ACK :

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        ...
        RabbitMqSender sender = new RabbitMqSender(new Destination(exchange.getName(), exchange.getRetrunRoutingKey()));
        sender.send(response);
        ...
        getChannel().basicAck(deliveryTag, false);
    }

-------------------------    RabbitMqSender.java ------------------------------------

public class RabbitMqSender {
    private static Logger logger = Logger.getLogger(RabbitMqSender.class);
    private Destination destination;
    private RabbitMqConnection connection;
    private Channel ch;

    public RabbitMqSender(Destination destination) throws IOException {
        this.destination = destination;
        this.connection = RabbitMqConnection.getInstance();  // Global connection
        this.ch = connection.getConnection().createChannel();
    }

    public void send(String message) throws IOException {
        ch.basicPublish(destination.getExchangeName(), destination.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());
        logger.trace("Message: " + message + "\n is sent to: " + destination.getExchangeName() + " returning key: " + destination.getRoutingKey() + " channel:" + ch.getChannelNumber());
        logger.debug("Message with routing key: " + destination.getRoutingKey() + " sent over channel: " + ch.getChannelNumber());
        ch.close();
    }
}

----------------------------  Destination.java ---------------------------------------

public class Destination {
    private String exchangeName;
    private String routingKey;

    public Destination(String exchangeName, String routingKey) {
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public String getRoutingKey() {
        return routingKey;
    }
}

----------------------------------------------------------------------

Michael Klishin

Sep 24, 2014, 5:09:35 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk
You can only ack a delivery on the same channel it was delivered on.

I am pretty sure that getChannel() in your consumer returns a different
channel instance from connection.getConnection().createChannel(), which is
what RabbitMqSender uses. Acking on a different channel will result in
a channel exception, which should be visible in RabbitMQ log (which I've asked
about before and got no response). 

As a side note, if you use your own abstractions on top of the client, make them
accept a channel (and possibly other pieces of context they need, e.g. well-known
queue names). Or even consider spring-amqp, which does encapsulate several
common patterns.
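
One possible reading of "make them accept a channel" is sketched below. The sender no longer opens and closes a channel per message; the caller hands it the publishing capability. To keep the sketch self-contained, PublishOperation is a hypothetical stand-in for the single Channel operation the sender needs, and ChannelAwareSender is an invented name, so treat this as an illustration under those assumptions rather than the one correct design.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the one channel operation the sender needs. */
interface PublishOperation {
    void publish(String exchange, String routingKey, byte[] body) throws IOException;
}

/** Sender that borrows its publishing channel from the caller instead of creating one. */
final class ChannelAwareSender {
    private final PublishOperation publisher;
    private final String exchangeName;
    private final String routingKey;

    ChannelAwareSender(PublishOperation publisher, String exchangeName, String routingKey) {
        this.publisher = publisher;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }

    void send(String message) throws IOException {
        publisher.publish(exchangeName, routingKey, message.getBytes(StandardCharsets.UTF_8));
        // Deliberately no close() here: whoever created the channel owns its lifecycle.
    }

    public static void main(String[] args) throws IOException {
        // Stand-in publisher that only prints. With the real client it could delegate to
        // publishChannel.basicPublish(exchange, key, MessageProperties.PERSISTENT_BASIC, body),
        // where publishChannel is a long-lived Channel opened once by the application.
        PublishOperation printing = (exchange, key, body) ->
            System.out.println(exchange + " / " + key + " / " + body.length + " bytes");

        new ChannelAwareSender(printing, "anto_topic_exchange", "elion.portconf.kmdb.anto.delay")
            .send("<ORDER_ANSWER/>");
    }
}
```

Passing the dependency in also makes it obvious, at the call site, which channel publishes and which channel acks.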

Vadim Kimlaychuk

Sep 24, 2014, 6:17:49 AM
to rabbitm...@googlegroups.com, vad...@gmail.com
I guess RabbitMQ.log is on the server side, isn't it? I don't have access to the server, which is why I didn't post it.
What do you mean by "accepting a channel"? Who should do it, the receiver or the sender?

Michael Klishin

Sep 24, 2014, 7:28:56 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk
On 24 September 2014 at 14:17:55, Vadim Kimlaychuk (vad...@gmail.com) wrote:
> I guess RabbitMQ.log is on the server side, isn't it? I don't
> have access there, that is why I didn't post it.

That's too bad. Channel exceptions can be spotted by adding ShutdownListeners and using Tracer.

> What do you mean under "accepting channel" ? Who should do it --
> receiver or sender ?

You must ack on the same channel a delivery was received on (the same is true
for publisher confirms, actually). If you attempt to ack on a different channel,
then all you get is a channel exception. The message will not be acked, if
that wasn't clear.

I cannot suggest much else without seeing what's going on on the wire
(or RabbitMQ log).

Vadim Kimlaychuk

Sep 24, 2014, 7:56:43 AM
to rabbitm...@googlegroups.com, vad...@gmail.com
I have experimented with logging the channels on the client side while my access to the server is being arranged. I am sure I confirm the delivery on the same channel it came from; at least the ID numbers are the same. Receiving goes over channel number 1 and sending over channel number 5. I have 3 more queue consumers that run in parallel on channels 2, 3 and 4.
But after a message is sent over the other channel, the original one changes somehow. It seems that only the RabbitMQ server log can show the error. When I get access to it I will continue this thread. Thank you for the help!

Michael Klishin

Sep 24, 2014, 8:17:27 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk
 On 24 September 2014 at 15:56:49, Vadim Kimlaychuk (vad...@gmail.com) wrote:
> I see that only RabbitMQ server log can show the error. When I
> got access to it I will continue this thread.

And a Tracer log (on-the-wire capture):
http://www.rabbitmq.com/java-tools.html

Vadim Kimlaychuk

Sep 25, 2014, 3:33:50 AM
to rabbitm...@googlegroups.com, vad...@gmail.com
The trace log is really helpful. I used it for the first time and now have more questions than before. :)

Testing scenario: 1 primary queue with 1 XML message inside, ready for delivery. The consumer should read the message and send it to a delay queue with timeout = 20 sec. Sending happens on a separate channel. The trace log follows below.
Result: sending happens, the original message is not deleted, and 1 clone has been added to the original queue. In particular, it is not clear to me where the message with delivery-tag=2 came from !!???
Question number 2: When a successful delivery happens, I get #method<basic.ack> BEFORE #method<basic.deliver>. That is not logical: I should first get the message and then send the ACK, shouldn't I?
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1411629100306: <Tracer-0> ch#0 <- {#method<connection.start>(version-major=0, version-minor=9, server-properties={product=RabbitMQ, information=Licensed under the MPL.  See http://www.rabbitmq.com/, platform=Erlang/OTP,capabilities={exchange_exchange_bindings=true, consumer_cancel_notify=true, basic.nack=true, publisher_confirms=true}, copyright=Copyright (C) 2007-2012 VMware, Inc., version=3.0.1}, mechanisms=PLAIN AMQPLAIN, locales=en_US), null, ""}
1411629100318: <Tracer-0> ch#0 -> {#method<connection.start-ok>(client-properties={product=RabbitMQ, information=Licensed under the MPL. See http://www.rabbitmq.com/, platform=Java,capabilities={exchange_exchange_bindings=true, authentication_failure_close=true, publisher_confirms=true, basic.nack=true, consumer_cancel_notify=true, connection.blocked=true}, copyright=Copyright (C) 2007-2014 GoPivotal, Inc., version=3.3.5}, mechanism=PLAIN, response= guest guest, locale=en_US), null, ""}
1411629100361: <Tracer-0> ch#0 <- {#method<connection.tune>(channel-max=0, frame-max=131072, heartbeat=600), null, ""}
1411629100371: <Tracer-0> ch#0 -> {#method<connection.tune-ok>(channel-max=0, frame-max=131072, heartbeat=30), null, ""}
1411629100372: <Tracer-0> ch#0 -> {#method<connection.open>(virtual-host=/, capabilities=, insist=false), null, ""}
1411629100414: <Tracer-0> ch#0 <- {#method<connection.open-ok>(known-hosts=), null, ""}
1411629100433: <Tracer-0> ch#1 -> {#method<channel.open>(out-of-band=), null, ""}
1411629100474: <Tracer-0> ch#1 <- {#method<channel.open-ok>(channel-id=), null,""}
1411629100484: <Tracer-0> ch#1 -> {#method<exchange.declare>(ticket=0, exchange=anto_topic_exchange, type=topic, passive=false, durable=false, auto-delete=false, internal=false, nowait=false, arguments={}), null, ""}
1411629100526: <Tracer-0> ch#1 <- {#method<exchange.declare-ok>(), null, ""}
1411629100533: <Tracer-0> ch#1 -> {#method<queue.declare>(ticket=0, queue=anto_kmdb, passive=false, durable=true, exclusive=false, auto-delete=false, nowait=false, arguments={}), null, ""}
1411629100574: <Tracer-0> ch#1 <- {#method<queue.declare-ok>(queue=anto_kmdb, message-count=1, consumer-count=0), null, ""}
1411629100580: <Tracer-0> ch#1 -> {#method<queue.bind>(ticket=0, queue=anto_kmdb, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto, nowait=false, arguments={}), null, ""}
1411629100623: <Tracer-0> ch#1 <- {#method<queue.bind-ok>(), null, ""}
1411629100646: <Tracer-0> ch#1 -> {#method<basic.consume>(ticket=0, queue=anto_kmdb, consumer-tag=, no-local=false, no-ack=false, exclusive=false, nowait=false, arguments={}), null, ""}
1411629100691: <Tracer-0> ch#1 <- {#method<basic.consume-ok>(consumer-tag=amq.ctag-5PHZ3IFIXx7vEpqqVo2whA), null, ""}
1411629100700: <Tracer-0> ch#1 <- {#method<basic.deliver>(consumer-tag=amq.ctag-5PHZ3IFIXx7vEpqqVo2whA, delivery-tag=1, redelivered=false, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto), #contentHeader<basic>(content-type=application/octet-stream, content-encoding=null, headers=null, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), "<?xml version="1.0"?>
<ORDER_ANSWER>
    <ID>22059</ID>
    <ANSWER_STATUS>FAIL</ANSWER_STATUS>
    <ANSWER_MSG>Timeout</ANSWER_MSG>
    <ACTION>CONFIGURE</ACTION>
    <P_NR>P01133291</P_NR>
    <ANSWER_DATE>14.07.2014 12:22</ANSWER_DATE>
</ORDER_ANSWER>
"}
1411629100902: <Tracer-0> ch#2 -> {#method<channel.open>(out-of-band=), null, ""}
1411629100945: <Tracer-0> ch#2 <- {#method<channel.open-ok>(channel-id=), null,""}
1411629100946: <Tracer-0> ch#2 -> {#method<queue.declare>(ticket=0, queue=anto_kmdb_delayed, passive=false, durable=true, exclusive=false, auto-delete=false, nowait=false, arguments={x-message-ttl=20000, x-dead-letter-exchange=anto_topic_exchange, x-dead-letter-routing-key=elion.portconf.kmdb.anto}), null, ""}
1411629100990: <Tracer-0> ch#2 <- {#method<queue.declare-ok>(queue=anto_kmdb_delayed, message-count=0, consumer-count=0), null, ""}
1411629100998: <Tracer-0> ch#2 -> {#method<basic.publish>(ticket=0, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto.delay, mandatory=false, immediate=false),#contentHeader<basic>(content-type=application/octet-stream, content-encoding=null, headers=null, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), "<?xml version="1.0"?>
<ORDER_ANSWER>
    <ID>22059</ID>
    <ANSWER_STATUS>FAIL</ANSWER_STATUS>
    <ANSWER_MSG>Timeout</ANSWER_MSG>
    <ACTION>CONFIGURE</ACTION>
    <P_NR>P01133291</P_NR>
    <ANSWER_DATE>14.07.2014 12:22</ANSWER_DATE>
</ORDER_ANSWER>
"}
1411629101001: <Tracer-0> ch#2 -> {#method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0), null, ""}
1411629101040: <Tracer-0> ch#1 <- {#method<basic.deliver>(consumer-tag=amq.ctag-5PHZ3IFIXx7vEpqqVo2whA, delivery-tag=2, redelivered=false, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto.delay), #contentHeader<basic>(content-type=application/octet-stream, content-encoding=null, headers=null, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), "<?xml version="1.0"?>
<ORDER_ANSWER>
    <ID>22059</ID>
    <ANSWER_STATUS>FAIL</ANSWER_STATUS>
    <ANSWER_MSG>Timeout</ANSWER_MSG>
    <ACTION>CONFIGURE</ACTION>
    <P_NR>P01133291</P_NR>
    <ANSWER_DATE>14.07.2014 12:22</ANSWER_DATE>
</ORDER_ANSWER>
"}
1411629101041: <Tracer-0> ch#2 <- {#method<channel.close-ok>(), null, ""}
1411629101045: <Tracer-0> ch#1 -> {#method<basic.ack>(delivery-tag=1, multiple=false), null, ""} 

Michael Klishin

Sep 25, 2014, 3:50:47 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk


On 25 September 2014 at 11:33:56, Vadim Kimlaychuk (vad...@gmail.com) wrote:
> > Result: Sending happens, original message is not deleted,
> 1 clone to original queue has being added. Particularly it is
> not clear for me -- where did message with delivery-tag=2 come
> from !!???

ch#1 -> {#method<queue.declare>(queue=anto_kmdb}
ch#1 <- {#method<queue.declare-ok>(queue=anto_kmdb, message-count=1, consumer-count=0)}

^^^ one clue

ch#1 -> {#method<queue.bind>(queue=anto_kmdb, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto}
ch#1 -> {#method<basic.consume>(queue=anto_kmdb, consumer-tag=}
ch#1 <- {#method<basic.consume-ok>(consumer-tag=amq.ctag-5PHZ3IFIXx7vEpqqVo2whA)}
ch#1 <- {#method<basic.deliver>(consumer-tag=amq.ctag-5PHZ3IFIXx7vEpqqVo2whA, delivery-tag=1, redelivered=false, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto}
ch#2 -> {#method<queue.declare>(queue=anto_kmdb_delayed}
ch#2 -> {#method<basic.publish>(exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto.delay}
ch#2 -> {#method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0), null, ""}
ch#1 <- {#method<basic.deliver>(consumer-tag=amq.ctag-5PHZ3IFIXx7vEpqqVo2whA, delivery-tag=2, redelivered=false, exchange=anto_topic_exchange, routing-key=elion.portconf.kmdb.anto.delay}
ch#2 <- {#method<channel.close-ok>(), null, ""}
ch#1 -> {#method<basic.ack>(delivery-tag=1, multiple=false), null, ""} 

So both deliveries are for the same consumer, and neither is a redelivery.
By the time you added a consumer, the queue already had 1 message in it. After you
publish a message, you get another one with

routing-key=elion.portconf.kmdb.anto.delay

which is what your "delayer" just used. That's pretty suspicious, isn't it?

You use a single topic exchange for publishing to 2 completely unrelated queues.
My guess is that your republished ("delayed") message ends up being routed to both of the queues without you realizing it.

Since you seem to be using the topic largely as a direct exchange, start with
publishing "directly to queues" via default exchange. This cannot produce unexpected
copies and may be conceptually easier to reason about for now. As you get more
familiar with RabbitMQ, you can go back to using a topic exchange if needed.
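
For reference, topic matching can be sketched in plain Java as below. This is an illustrative reimplementation of the usual topic rules (words separated by dots, * matches exactly one word, # matches zero or more words), not the broker's code, and TopicMatcher is an invented name. The wildcard binding keys in main are hypothetical; the thread does not show which binding actually caused the extra copy.

```java
/** Illustrative topic-exchange matcher; a sketch of the matching rules, not RabbitMQ's code. */
final class TopicMatcher {

    /** Returns true if a message with routingKey would be routed through bindingKey. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;          // pattern used up: all words must be consumed
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;                      // pattern still expects a word, none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String published = "elion.portconf.kmdb.anto.delay";
        // Exact binding key as seen in the trace for queue anto_kmdb.
        System.out.println(matches("elion.portconf.kmdb.anto", published));
        // Hypothetical wildcard bindings, for comparison only.
        System.out.println(matches("elion.portconf.kmdb.anto.#", published));
        System.out.println(matches("elion.portconf.kmdb.*", published));
    }
}
```

If the exact key alone does not match, a second copy in the original queue points at some other binding on that queue, which the management UI or rabbitmqctl list_bindings can reveal.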


> Question number 2: When successful delivery happens I got #method<basic.ack>
> BEFORE #method<basic.deliver> . It is not logical -- I should
> first get the message and then send ACK isn't it ?

This is just an artifact of Tracer's output: since it talks to both your client and RabbitMQ
concurrently (or even in parallel), it can sometimes output
subsequent frames out of order.

Michael Klishin

Sep 25, 2014, 3:54:15 AM
to rabbitm...@googlegroups.com, Vadim Kimlaychuk
 On 25 September 2014 at 11:50:46, Michael Klishin (mic...@rabbitmq.com) wrote:
> it can sometimes output
> subsequent frames out of order

Err. By "subsequent" I mean "received by Tracer nearly at the same time".

Vadim Kimlaychuk

Sep 25, 2014, 4:55:57 AM
to rabbitm...@googlegroups.com, vad...@gmail.com
I have tried sending the message from a separate application with routing key = "elion.portconf.kmdb.anto.delay" and yes, it got published into 2 queues: elion.portconf.kmdb.anto and elion.portconf.kmdb.anto.delay
So the problem was not in ACKing messages at all, but in wrong publishing logic. I did not expect such behavior; I thought that the key is "matched" exactly.
We are probably using topics the wrong way and I just followed the existing logic. Basically we have a single topic exchange to publish to, with different queues that have different routing keys. So I just created another queue with another key.

By using a "delay" queue I would like to solve the problem of "retries". If I just send a NACK back to the original queue, I get the same message ready for re-delivery immediately. Imagine I need to send this message to a web service and it is down right now. I would like to retry after a while, not in the next millisecond. That is why I have created a "delayed" queue that holds messages for some time and then puts them back into the original queue for re-delivery.
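
The delay-queue setup described above corresponds to the queue arguments visible in the trace (x-message-ttl, x-dead-letter-exchange, x-dead-letter-routing-key). A minimal sketch of building that argument map in plain Java follows; DelayQueueArguments is an invented helper name, and the resulting map is what would be passed as the last parameter of Channel#queueDeclare(queue, durable, exclusive, autoDelete, arguments).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that assembles the arguments of a TTL plus dead-letter "delay" queue. */
final class DelayQueueArguments {

    static Map<String, Object> build(int ttlMillis, String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", ttlMillis);                         // how long a message waits
        arguments.put("x-dead-letter-exchange", deadLetterExchange);       // where expired messages go
        arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);  // key used when re-publishing
        return arguments;
    }

    public static void main(String[] args) {
        // Values as they appear in the trace for queue anto_kmdb_delayed.
        System.out.println(build(20000, "anto_topic_exchange", "elion.portconf.kmdb.anto"));

        // Possible variant (an assumption, not tested in this thread): dead-letter through the
        // default exchange, whose routing key is simply the target queue name.
        System.out.println(build(20000, "", "anto_kmdb"));
    }
}
```

The second variant follows the earlier suggestion to publish "directly to queues" via the default exchange, which keeps topic matching out of the retry path.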

If you know a better way to solve this problem (the rabbit way), I would appreciate it.

Thank you for help. You really have opened "another" rabbit for me :)

Vadim.