Hello,
I’m trying to do EXACTLY THIS:
http://stackoverflow.com/questions/2799731/wait-for-a-single-rabbitmq-message-with-a-timeout
However I’m using Pika for all of my RabbitMQ communications, and unfortunately the “Answer” is just “Fixed” which isn’t really applicable to use in my situation.
For a quick summary, I’m trying to send a message to RabbitMQ, then wait on a reply from the workers on the other side of the RabbitMQ queue on an exclusive queue. I would like to have a timeout setting on the monitor to wait for the message, just in case the server is down, or taking an absurd amount of time.
For details on the connections, I’ve coded a synchronous publisher that uses one SelectConnection to connect to RabbitMQ, publishes the message on one channel, and consumes on a UUID channel.
From the documentation it appears that I’d need to be using a BlockingConnection in order to get the functionality that I need, but I currently reuse almost all of the code for the publisher/monitor, and would rather not have to code an all new synchronous publisher for this functionality.
Thank you for your time,
-Christopher Lefevre
> Hello,
> I’m trying to do EXACTLY THIS:
> http://stackoverflow.com/questions/2799731/wait-for-a-single-rabbitmq-message-with-a-timeout
> However I’m using Pika for all of my RabbitMQ communications, and unfortunately the “Answer” is just “Fixed” which isn’t really applicable to use in my situation.
The 'fixed' refers to amqplib; it doesn't mention pika.
>
> For a quick summary, I’m trying to send a message to RabbitMQ, then wait on a reply from the workers on the other side of the RabbitMQ queue on an exclusive queue. I would like to have a timeout setting on the monitor to wait for the message, just in case the server is down, or taking an absurd amount of time.
>
I don't have much experience using pika, but I'll try
to provide a general answer.
You shouldn't be blocking waiting for a single message
if your program is async. Instead you should use basic_consume
for every reply you are interested in, with a callback for each.
Something like:
    import uuid

    reply_exchange = "replies"

    def handle_reply(message):
        # Use the message properties to work out which request this answers.
        reply_id = message.properties

    def add_reply_callback(channel, publisher_id, reply_id, callback):
        # Bind this publisher's queue to the reply exchange under reply_id.
        channel.exchange_declare(reply_exchange)
        channel.queue_declare(publisher_id)
        channel.queue_bind(publisher_id, reply_exchange, reply_id)
        channel.basic_consume(callback, queue=publisher_id)

    def send_rpc(message, callback, producer_channel,
                 consumer_channel, publisher_id):
        # A fresh id per request; the worker routes its reply back with it.
        reply_id = str(uuid.uuid4())
        add_reply_callback(consumer_channel, publisher_id, reply_id, callback)
        producer_channel.basic_publish(message, properties={
            "reply_to": reply_id})
This assumes you have a 'thread' draining events from the consumer_channel.
If you need to add a timeout on top of this, then the best route
depends on the async framework you are using.
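As a rough, framework-neutral sketch of that timeout idea: keep a table of outstanding reply ids, arm a timer for each one, and let whichever happens first (the reply or the timer) win. Everything here is hypothetical naming of mine (PendingReplies, expect, deliver), and it uses the standard library's threading.Timer only so the example runs on its own. In an event-loop client you would most likely swap that for the loop's own timer facility, since a Timer thread should not touch a connection that is not thread-safe.

```python
import threading
import uuid


class PendingReplies:
    """Hypothetical helper: each request ends in exactly one of
    on_reply(body) or on_timeout(reply_id), never both."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}  # reply_id -> (on_reply, timer)

    def expect(self, on_reply, on_timeout, timeout_seconds):
        """Register a request and return the reply_id to send with it."""
        reply_id = str(uuid.uuid4())
        timer = threading.Timer(
            timeout_seconds, self._expire, args=(reply_id, on_timeout))
        timer.daemon = True  # do not keep the process alive for a timer
        with self._lock:
            self._pending[reply_id] = (on_reply, timer)
        timer.start()
        return reply_id

    def deliver(self, reply_id, body):
        """Call from the consume callback. Returns False for late or
        unknown replies, which are dropped."""
        with self._lock:
            entry = self._pending.pop(reply_id, None)
        if entry is None:
            return False
        on_reply, timer = entry
        timer.cancel()
        on_reply(body)
        return True

    def _expire(self, reply_id, on_timeout):
        # Runs on the timer thread; only fires if no reply got there first.
        with self._lock:
            entry = self._pending.pop(reply_id, None)
        if entry is not None:
            on_timeout(reply_id)
```

The intended use would be to call expect() just before publishing, put the returned id in reply_to, and have the basic_consume callback hand each incoming reply to deliver(). A reply that shows up after its timeout is simply ignored.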