Trouble with Direct reply-to (Python/Pika)


Matt

Feb 19, 2016, 9:38:01 PM
to rabbitmq-users
Hi all, I'm having some trouble understanding how to implement the direct reply-to feature, as detailed here:  https://www.rabbitmq.com/direct-reply-to.html  

I'm using Python 3.4 with the pika package (https://pika.readthedocs.org/en/0.10.0/).

I'm just trying for a proof-of-concept, so my code is very simple.  The goal is for the client to send "Marco" to the server, and for the server to reply with "Polo" using direct reply-to.

Here is my server code:

#
# server.py
#   - a client will send us "Marco" and we will send back "Polo"
#     the goal is to use reply-to
 
import pika
 
def callback(ch, method, properties, body):
    print(body)
    ch.basic_publish(exchange='', routing_key='', body='Polo')
 
if __name__=='__main__':
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
    ch = conn.channel()
   
    # start listening for messages from clients
    ch.basic_consume(callback, queue='amq.rabbitmq.reply-to', no_ack=True)
    ch.start_consuming()
 
And the client code:

#
# client.py
#   - send "Marco" to the direct reply-to channel
 
import pika
 
def callback(ch, method, properties, body):
    print("here")
    print(body)
 
 
if __name__ == '__main__':
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
    ch = conn.channel()
 
    ch.basic_publish(exchange='', routing_key='', body='Marco', properties=pika.BasicProperties(reply_to='amq.rabbitmq.reply-to'))
    print("\t[x] Sent message")
   
    # now that we've sent Marco, listen for responses on the reply-to queue
    ch.basic_consume(callback, queue='amq.rabbitmq.reply-to', no_ack=True)
    ch.start_consuming()


The server starts fine and makes a connection to the broker.  But when I start the client I get the following error:

 
 
(rabbit)user@hostname:~/code/rabbitmq$ python client.py
    [x] Sent message
Traceback (most recent call last):
  File "client.py", line 20, in <module>
    ch.basic_consume(callback, queue='amq.rabbitmq.reply-to', no_ack=True)
  File "/home/user/code/rabbitmq/rabbit/lib/python3.4/site-packages/pika/adapters/blocking_connection.py", line 1459, in basic_consume
    consumer_callback=consumer_callback)
  File "/home/user/code/rabbitmq/rabbit/lib/python3.4/site-packages/pika/adapters/blocking_connection.py", line 1522, in _basic_consume_impl
    self._flush_output(ok_result.is_ready)
  File "/home/user/code/rabbitmq/rabbit/lib/python3.4/site-packages/pika/adapters/blocking_connection.py", line 1181, in _flush_output
    raise exceptions.ChannelClosed(method.reply_code, method.reply_text)
pika.exceptions.ChannelClosed(406, 'PRECONDITION_FAILED - fast reply consumer does not exist')

I have searched but cannot find much information on this error.  Can someone please point me in the right direction?

Regards,
-MS 

vitaly numenta

Feb 21, 2016, 8:07:06 AM
to rabbitmq-users
Hi Matt, a few things aren't quite right with your code sample.

1. The RPC server needs to consume from a normal queue (not amq.rabbitmq.reply-to).
2. The RPC server needs to publish its response with routing_key=properties.reply_to (RabbitMQ replaces the reply_to value with a generated virtual queue name before delivering the request).
3. The client needs to create its consumer on amq.rabbitmq.reply-to before publishing the request, in no-ack mode and on the same channel (otherwise the broker can't substitute reply_to, and you get that ChannelClosed exception as a result).
4. The client needs to publish to an exchange and routing_key that will reach the server's queue (not exchange='', routing_key='').

See https://github.com/pika/pika/pull/711/files for an example of how to do this with pika.BlockingConnection.

Matt

Feb 22, 2016, 10:29:15 AM
to rabbitmq-users
Thank you for taking the time to prepare such a thorough response.  It helped greatly.

For anyone interested, here is the server code:

#
# server.py
#    - a client will send us "Marco" and we will send back "Polo"
#      the goal is to use reply-to

import pika

SERVER_QUEUE = 'rpc.server.queue'

def on_recv_req(ch, method, properties, body):
    print(body)
    ch.basic_publish(exchange='', routing_key=properties.reply_to, body='Polo')


if __name__=='__main__':
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
    ch = conn.channel()

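    # declare the normal queue that clients publish their requests to
    ch.queue_declare(queue=SERVER_QUEUE, exclusive=True, auto_delete=True)
    # no_ack=True because on_recv_req never acknowledges deliveries explicitly
    ch.basic_consume(on_recv_req, queue=SERVER_QUEUE, no_ack=True)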
    ch.start_consuming()


And the client code:

#
# client.py
#    - send "Marco" to the server's queue and receive "Polo" via direct reply-to

import pika

SERVER_QUEUE='rpc.server.queue'

def on_recv_resp(ch, method, properties, body):

    print(body)


if __name__ == '__main__':
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
    ch = conn.channel()
   
    ch.basic_consume(on_recv_resp, queue='amq.rabbitmq.reply-to', no_ack=True)
    ch.basic_publish(exchange='', routing_key=SERVER_QUEUE, body='Marco', properties=pika.BasicProperties(reply_to='amq.rabbitmq.reply-to'))
    ch.start_consuming()

This works perfectly, and again, I am very grateful for your help.

Best,
-MS

Shital Jadhav

Jul 23, 2019, 4:11:24 AM
to rabbitmq-users
Hi Matt, you have done a great job. This covers only a single cycle of send_message -> receive_message -> send_reply_message, though. What if I want to send many messages continuously and have the receiver reply to each of them? How would I implement continuous sending of messages and receiving of the reply messages?

Your help will be appreciated.

Matt Selph

Jul 23, 2019, 7:43:33 AM
to rabbitmq-users
Hi there,

Just theorizing here - I haven't tried it.  Maybe wrap the producer in a for loop?  That might be an idea.
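
Here is a rough sketch of how the for-loop idea might look with the pika 0.10 API used earlier in this thread. It has not been run against a live broker. It assumes the server is changed to copy correlation_id into its reply (the server code above does not do that), and ReplyTracker, send_many, and the "Marco N" bodies are hypothetical names made up for illustration, not part of pika. The consumer on amq.rabbitmq.reply-to is still created once, before any publish, on the same channel.

```python
import uuid

SERVER_QUEUE = 'rpc.server.queue'       # same queue name as the server code above
REPLY_QUEUE = 'amq.rabbitmq.reply-to'   # direct reply-to pseudo-queue


class ReplyTracker(object):
    """Hypothetical helper: matches replies to requests by correlation_id."""

    def __init__(self):
        self.pending = {}   # correlation_id -> request body still awaiting a reply
        self.replies = {}   # correlation_id -> reply body

    def register(self, body):
        """Remember a request and return the correlation_id to send with it."""
        corr_id = str(uuid.uuid4())
        self.pending[corr_id] = body
        return corr_id

    def on_reply(self, ch, method, properties, body):
        """Consumer callback: record the reply, stop once nothing is pending."""
        corr_id = properties.correlation_id
        if corr_id not in self.pending:
            return  # not a reply to one of our requests; ignore it
        del self.pending[corr_id]
        self.replies[corr_id] = body
        if not self.pending:
            ch.stop_consuming()


def send_many(host='172.17.0.2', count=5):
    # pika is imported here so ReplyTracker can be exercised without it.
    import pika

    conn = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    ch = conn.channel()
    tracker = ReplyTracker()

    # Register every request up front so the tracker cannot look "finished"
    # while requests are still being published.
    requests = []
    for i in range(count):
        body = 'Marco %d' % i
        requests.append((tracker.register(body), body))

    # Consumer first (no-ack mode), then publish, all on the same channel.
    ch.basic_consume(tracker.on_reply, queue=REPLY_QUEUE, no_ack=True)
    for corr_id, body in requests:
        ch.basic_publish(
            exchange='', routing_key=SERVER_QUEUE, body=body,
            properties=pika.BasicProperties(reply_to=REPLY_QUEUE,
                                            correlation_id=corr_id))

    # ASSUMPTION: the server echoes the id back, for example with
    #   properties=pika.BasicProperties(correlation_id=properties.correlation_id)
    # in its basic_publish call. Without that, no reply matches and this blocks.
    ch.start_consuming()
    conn.close()
    return tracker.replies
```

Calling send_many() would publish five requests and return once a reply has arrived for each one. The correlation_id is not something direct reply-to requires; it is only there so that replies arriving in any order can be paired with the request that caused them.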
