Each consumer thread owns a channel and receives messages for processing.
Consumer.java:
import java.util.HashMap;
import java.util.Map;
import org.json.JSONObject;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer implements Runnable {
    private static final String RPC_QUEUE_NAME = "XXX";
    private static final String EXCHANGE_NAME = "YYYY";
    @Override
    public void run() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            Connection con = factory.newConnection();
            Channel channel = con.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            channel.queueDeclare(RPC_QUEUE_NAME, true, false, false, null);
            channel.queueBind(RPC_QUEUE_NAME, EXCHANGE_NAME, "XXX");
            channel.basicQos(1); // at most one unacknowledged message per consumer
            boolean autoAck = false; // acknowledge manually after processing
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            while (delivery != null) {
                try {
                    String message = new String(delivery.getBody());
                    String routingKey = delivery.getEnvelope().getRoutingKey();
                    // Processing logic goes here
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // Acknowledge only this delivery (multiple = false)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                delivery = consumer.nextDelivery();
            }
            channel.close();
            con.close();
        } catch (Exception e) {
            // TODO: Produce RPC based error message if exception occurred
            e.printStackTrace();
        }
    }
}
MainThread.java:
import java.io.IOException;
import java.util.Date;
public class MainThread {
    public static void main(String[] args) throws IOException, InterruptedException {
        int workerThreads = 4;
        for (int i = 1; i <= workerThreads; i++) {
            (new Thread(new Consumer())).start();
        }
    }
}
My Java application starts in the main method, where I start 4 worker threads. Each thread acts as a consumer: it receives and processes one message at a time, and the while loop keeps it running.
My requirement is to stop the Java application without disturbing a consumer that is in the middle of processing a message: the application should terminate only after all processing threads have finished their current work. Can anyone help me achieve this?
I am trying to use https://dzone.com/articles/gracefully-shutting-down-java-in-containers as a reference for stopping the application, but I cannot work out how to stop the consumers with it.
I am using Java 7 with amqp-client version 3.0.4.
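A minimal sketch of the behaviour described above, under stated assumptions rather than a tested solution: each worker checks a stop flag between messages, and a JVM shutdown hook sets the flag and waits for all workers to exit. To keep the sketch runnable without a broker, an in-memory BlockingQueue stands in for the RabbitMQ queue. The names GracefulShutdownSketch, Worker, shutdown and runDemo are hypothetical. In the real Consumer, the timed poll would presumably be replaced by QueueingConsumer.nextDelivery(long timeout), which returns null when the timeout expires, so the loop gets a chance to re-check the flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownSketch {

    // Hypothetical worker: polls with a timeout so it can re-check the stop flag.
    static class Worker implements Runnable {
        private final BlockingQueue<String> source; // in-memory stand-in for the RabbitMQ queue
        private final CountDownLatch done;
        private final AtomicInteger processed;
        private volatile boolean running = true;

        Worker(BlockingQueue<String> source, CountDownLatch done, AtomicInteger processed) {
            this.source = source;
            this.done = done;
            this.processed = processed;
        }

        void stop() {
            running = false;
        }

        @Override
        public void run() {
            try {
                while (running) {
                    // Stand-in for consumer.nextDelivery(timeoutMillis) in the real consumer.
                    String message = source.poll(100, TimeUnit.MILLISECONDS);
                    if (message == null) {
                        continue; // nothing arrived; loop back and re-check the flag
                    }
                    // Processing logic goes here. The flag is only checked between
                    // messages, so a message in progress is always finished.
                    processed.incrementAndGet();
                    // Real consumer: channel.basicAck(deliveryTag, false) would go here.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Real consumer: close the channel and connection here.
                done.countDown();
            }
        }
    }

    // Asks every worker to stop and waits up to timeoutMillis for all of them to exit.
    static boolean shutdown(List<Worker> workers, CountDownLatch done, long timeoutMillis) {
        for (Worker w : workers) {
            w.stop();
        }
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts the workers, feeds them messages, then stops them; returns the processed count.
    static int runDemo(int workerThreads, int messages) {
        final BlockingQueue<String> source = new LinkedBlockingQueue<String>();
        final CountDownLatch done = new CountDownLatch(workerThreads);
        final AtomicInteger processed = new AtomicInteger();
        final List<Worker> workers = new ArrayList<Worker>();
        for (int i = 0; i < workerThreads; i++) {
            Worker w = new Worker(source, done, processed);
            workers.add(w);
            new Thread(w).start();
        }
        // The hook runs on SIGTERM or Ctrl+C, before the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutdown(workers, done, 30000);
            }
        });
        for (int i = 0; i < messages; i++) {
            source.add("message-" + i);
        }
        // Demo only: wait for the backlog to drain, then stop directly so the demo terminates.
        while (processed.get() < messages) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        shutdown(workers, done, 5000);
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + runDemo(4, 8) + " messages");
    }
}
```

In the real application only the shutdown hook would call shutdown; the direct call in runDemo exists so the demo exits on its own. The 30-second wait in the hook is an arbitrary choice and would need to exceed the longest expected processing time for a single message.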