RabbitMQ – Cancel consumer.nextDelivery while it's waiting for a response and also no affecting exis

89 views
Skip to first unread message

Soorya Prakash

unread,
Feb 15, 2018, 2:09:01 AM2/15/18
to rabbitmq-users

Consumer thread contains channel and receive message for processing.

Consumer.java:

import java.util.HashMap;
    import java.util.Map;
    import org.json.JSONObject;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;

    public class Consumer implements Runnable {

        private static final String RPC_QUEUE_NAME = "XXX";
        private static final String EXCHANGE_NAME = "YYYY";

        @Override
        public void run() {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                Connection con = factory.newConnection();
                Channel channel = con.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

                channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);
                channel.queueBind(RPC_QUEUE_NAME, EXCHANGE_NAME, "XXX");

                channel.basicQos(1);

                boolean ack = false;
                QueueingConsumer consumer = new QueueingConsumer(channel);

                channel.basicConsume(RPC_QUEUE_NAME, ack, consumer);

                QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                while (delivery != null) {
                    try {
                        String message = new String(delivery.getBody());
                        String routingKey = delivery.getEnvelope().getRoutingKey();

                       //Processing logic goes here
                    } catch(Exception e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                    delivery = consumer.nextDelivery();

                }
                channel.close();
                con.close();
            } catch (Exception e) {
                // TODO: Produce RPC based error message if exception occurred
                e.printStackTrace();
            }
        }
    }

Main.java:

 import java.io.IOException;
    import java.util.Date;

    public class MainThread {

        public static void main(String[] args) throws IOException, InterruptedException {
            int workerThreads = 4;
            for (int i = 1; i <= workerThreads; i++) {
                (new Thread(new Consumer())).start();
            }

        }
    }

My java application starts with the main method where I am running the thread pool with count of 4. The 4 new threads started and each thread work as a consumer and receive and process message one at a time and made active with the while loop.

My requirement is, I have to stop the java application such that, it should not affect the message processing consumer and should terminate after completing after all processing threads completes. Can anyone help me to achieve this.

I am trying to using https://dzone.com/articles/gracefully-shutting-down-java-in-containers as refence to stop the application. But i am not able find how to stop the consumers using this.

I am using Java 7 with amqp-client 3.0.4 version.

Michael Klishin

unread,
Feb 15, 2018, 6:28:56 AM2/15/18
to rabbitm...@googlegroups.com
Please don't use QueueingConsumer. It has been deprecated for years and was removed
in Java client 5.0 (IIRC).

Cancelling a consumer from a delivery handler with a DefaultConsumer subclass, for example, is quite trivial.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ
Reply all
Reply to author
Forward
0 new messages