Is it possible to use RabbitMQ direct reply-to feature with a Pika generator consumer in Python?

232 views
Skip to first unread message

Maggyero

unread,
Jul 2, 2019, 4:56:20 AM7/2/19
to Pika
Also published on Stackoverflow: https://stackoverflow.com/q/56842059/2326961

I would like to use the [direct reply-to](https://www.rabbitmq.com/direct-reply-to.html) feature of RabbitMQ with the [Pika](https://pika.readthedocs.io/en/latest/index.html) client library in Python. It works with a *basic* consumer. But it raises the following exception with a *generator* consumer:

> pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - reply consumer already set')

**Is there a way to use the direct reply-to feature with a generator consumer?**

Sample client code using a basic consumer (it works):

    import pika
   
   
    def handle(channel, method, properties, body):
        message = body.decode()
        print("received:", message)
   
   
    connection = pika.BlockingConnection()
    channel = connection.channel()
   
    with connection, channel:
        message = "hello"
        channel.basic_consume(queue="amq.rabbitmq.reply-to",
                              on_message_callback=handle, auto_ack=True)
        channel.basic_publish(
            exchange="", routing_key="test", body=message.encode(),
            properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
        print("sent:", message)
        channel.start_consuming()

Sample client code using a generator consumer (it raises the exception):

    import pika
   
   
    def handle(channel, method, properties, body):
        message = body.decode()
        print("received:", message)
   
   
    connection = pika.BlockingConnection()
    channel = connection.channel()
   
    with connection, channel:
        message = "hello"
        channel.basic_publish(
            exchange="", routing_key="test", body=message.encode(),
            properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
        print("sent:", message)
   
        for (method, properties, body) in channel.consume(
                queue="amq.rabbitmq.reply-to", auto_ack=True):
            handle(channel, method, properties, body)

Sample server code for both consumers:

    import pika
   
   
    def handle(channel, method, properties, body):
        message = body.decode()
        print("received:", message)
        message = "world"
        channel.basic_publish(
            exchange="", routing_key=properties.reply_to, body=message.encode())
        print("sent:", message)
        channel.basic_ack(method.delivery_tag)
   
   
    connection = pika.BlockingConnection()
    channel = connection.channel()
   
    with connection, channel:
        for (method, properties, body) in channel.consume(queue="test"):
            handle(channel, method, properties, body)

Environment: Windows 10, RabbitMQ 3.7.13, CPython 3.7.3, Pika 1.0.1.

Luke Bakken

unread,
Jul 2, 2019, 10:37:56 AM7/2/19
to Pika
Hello!

I won't have time to look at this this week. What I recommend is to run your two test programs using DEBUG level logging, and then compare the output to see what is different.

Some of the examples here show how to enable DEBUG: https://github.com/pika/pika/blob/master/examples/basic_consumer_threaded.py

Thanks -
Luke

Maggyero

unread,
Jul 3, 2019, 3:29:07 AM7/3/19
to Pika
Hello Luke,

Sorry I copied the wrong exception, here is the correct one:

> pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - fast reply consumer does not exist')

*Note.* — Calling the `basic_consume` method *after* the `basic_publish` method in the sample client code using a basic consumer raises the same exception as when using a generator consumer:

    import pika
    
    
    def handle(channel, method, properties, body):
        message = body.decode()
        print("received:", message)
    
    
    connection = pika.BlockingConnection()
    channel = connection.channel()
    
    with connection, channel:
        message = "hello"
        channel.basic_publish(
            exchange="", routing_key="test", body=message.encode(),
            properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
        print("sent:", message)
        channel.basic_consume(queue="amq.rabbitmq.reply-to",
                              on_message_callback=handle, auto_ack=True)
        channel.start_consuming()

So it looks that the `basic_consume` method should be called *before* the `basic_publish` method. However we cannot do that with the `consume` method (generator consumer), can we?

Géry

Luke Bakken

unread,
Jul 3, 2019, 9:15:46 AM7/3/19
to Pika
Hello,

OK that makes more sense. Generally it's better for publishers and consumers to be using different connections as RabbitMQ will block a connection if load gets too high or an alarm is hit.

For the "consumer generator" method, you would have to add a short timeout on the first iteration, then publish on the first iteration, then re-iterate in the consumer loop.

At some point (like now) the limitations of BlockingConnection become apparent and you should use SelectConnection to accomplish what you need.

Thanks -
Luke

Maggyero

unread,
Jul 3, 2019, 2:19:15 PM7/3/19
to Pika
Hello Luke,

Thank you, this does the trick:

    import pika
   
   
    def handle(channel, method, properties, body):
        message = body.decode()
        print("received:", message)
   
   
    connection = pika.BlockingConnection()
    channel = connection.channel()
   
    with connection, channel:
        message = "hello"
        next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
                             inactivity_timeout=1))

        channel.basic_publish(
            exchange="", routing_key="test", body=message.encode(),
            properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
        print("sent:", message)
   
        for (method, properties, body) in channel.consume(
                queue="amq.rabbitmq.reply-to", auto_ack=True):
            handle(channel, method, properties, body)

 Géry
Reply all
Reply to author
Forward
0 new messages