RabbitMQ RPC Multiple Threads Question

1,348 views
Skip to first unread message

Dave H

unread,
Jun 24, 2016, 12:54:25 PM6/24/16
to rabbitmq-users
In the RPC example for RabbitMQ, here:  https://www.rabbitmq.com/tutorials/tutorial-six-java.html

it recommends (for performance reasons) re-using the same reply-to queue and adding a correlation-id header to figure out which message is the reply.  The code that runs on the client caller is this:

public String call(String message) throws Exception {     
   
String response = null;
   
String corrId = java.util.UUID.randomUUID().toString();

   
BasicProperties props = new BasicProperties
                               
.Builder()
                               
.correlationId(corrId)
                               
.replyTo(replyQueueName)
                               
.build();

   
channel.basicPublish("", requestQueueName, props, message.getBytes());

   
while (true) {
       
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
           
response = new String(delivery.getBody());
           
break;
       
}
   
}

   
return response;
}

Note that if it doesn't match the correlationId it just keeps pulling messages out of the queue.  But what about a multi-threaded client where different threads are calling the "call" method above.  Wouldn't their be a race condition?  Which-ever thread discards non-matching messages would be throwing away messages other threads are waiting for, yes?  

What do I not understand about consumer.nextDelivery()?  Or is the magic in the setup of the channel (some non-exclusivity of some kind)?

Thanks,
Dave

Dave H

unread,
Jun 24, 2016, 1:11:47 PM6/24/16
to rabbitmq-users
I've also read this page about 5 times, but am just not getting it:


Michael Klishin

unread,
Jun 24, 2016, 3:07:13 PM6/24/16
to rabbitm...@googlegroups.com
That tutorial is just an example. We don't recommend using QueueingBasicConsumer in general but it is
fairly convenient to explain the rest of what's covered in the tutorial.

With multiple callers that run concurrently you cannot use a shared queue. You should not share a channel for publishing
either. This is just an example that demonstrates how publishers and consumers can respond to each other
by passing a correlation ID and reply queue name in "request" messages.

I'm not sure how to respond to "not getting it." The above tutorial does not use direct reply-to and it doesn't address
concurrent callers, only helps you avoid declaring a response queue.

On Fri, Jun 24, 2016 at 8:11 PM, Dave H <david....@gmail.com> wrote:
I've also read this page about 5 times, but am just not getting it:


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Dave H

unread,
Jun 24, 2016, 10:18:18 PM6/24/16
to rabbitmq-users
Thanks for your quick reply. It helps me to understand the context of that tutorial.  I was simply reading too much into it.

I am concerned about concurrent callers because I need to make an RPC-style request/response coming from a servlet container thread. Reading the direct-reply feature documentation, it appears that creating a reply-to queue for each caller is inefficient...which I can believe.

Here is the part I don't understand for the documentation...it says an RPC client should "Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode."

Not sure what that would look like in Java code.  So far, my dealings with RabbitMQ have been with Spring Integration, which provides a nice layer of abstraction, but has kept me from having to learn much about the underying implementation.

The the documentation says: "Set the reply-to property in their request message to amq.rabbitmq.reply-to"

Does this mean I literally set the "reply-to" header to "amq.rabbitmq.reply-to"?  If so, is RabbitMQ generating a unique name and replacing the message's reply-to header?  How does rabbit later correlate a message from my rpc-style server with the waiting thread?

I am sorry for asking questions about these details, but I have a 2nd related issue with RPC-style communication I am trying to solve, which, when I understand things better, I'll create another post for....

Dave

Michael Klishin

unread,
Jun 25, 2016, 2:04:18 AM6/25/16
to rabbitm...@googlegroups.com
"Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode." means exactly
what that line says. See Channel#basicConsume and tutorial 2 on rabbitmq.com.

Yes, you need to literally set the reply-to property to "amq.rabbitmq.reply-to". It is not a real queue
but a piece of state that is associated with a channel.

Now that I think about it, if you want request/response with things like channel pooling/caching,
direct reply-to and no need to know how the protocol works, you probably want to take a look at Spring AMQP.
Certainly if you already use Spring.

Laxmi Narayan

unread,
Jun 11, 2017, 9:10:12 AM6/11/17
to rabbitmq-users
Hi,

Is there any example of rpc usages in threaded mode ?

I am having similar issue :

I have one connection for complete application.
One channel per thread.
Then one reply queue per message which I delete as soon as I consume. This much seems fine but


 public static String fetchDataFromRpc(String requestQueueName, byte[] message,Channel channel) throws IOException, InterruptedException {
             private static final Map<String,String> RESPONSE_MAP  = new ConcurrentHashMap<>();
            String  replyQueueName               =  channel.queueDeclare().getQueue();
            String corrId                        =  UUID.randomUUID().toString();
            AMQP.BasicProperties props           =  new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
            channel.basicPublish("", requestQueueName, props, message);
            Consumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    if (properties.getCorrelationId().equals(corrId)) {
                        RESPONSE_MAP.put(String.valueOf(channel.getChannelNumber()), new String(body, "UTF-8"));
                        channel.queueDelete(replyQueueName);
                    }
                }
            };
 
           channel.basicConsume(replyQueueName, true, defaultConsumer);
            String responseMsg = RESPONSE_MAP.get(String.valueOf(channel.getChannelNumber()));

            //update response map to avoid reading same data over and over.
            RESPONSE_MAP.put(String.valueOf(channel.getChannelNumber()),"");
            return responseMsg;
    }
}

The piece of code which is in bold state is consuming hell alot of heap size and code is going memory overflow.
I send message to some queue and listen reply on some tmp-queue , but all of my ACKs go on one particular queue.
Because I am running in threaded mode so I kept a hashmap associated with each channel-number.
what is issue in my code ?


Reply all
Reply to author
Forward
0 new messages