Christian,
You are always using an exchange, it's just that if the exchange
name is empty, it will use the default exchange.
I'm a little unclear on how this works exactly, and so are many
others, it seems :)
Kombu doesn't support this yet, but it shouldn't be too hard to add.
It may be harder for the virtual transports though, so rather than
relying on this behaviour I create unique queue/exchange names
with UUIDs:
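
    # client
    # (channel is an already open channel, e.g. connection.channel())
    from kombu import Exchange, Producer, Queue
    from kombu.utils import gen_unique_id

    reply_id = gen_unique_id()
    reply_exchange = Exchange("reply")
    callback_queue = Queue(reply_id, reply_exchange,
                           routing_key=reply_id)
    # must declare in advance so the reply message isn't
    # published before the queue exists.
    callback_queue(channel).declare()

    producer = Producer(channel)
    producer.publish(body, exchange=some_exchange_name, routing_key=some_key,
                     reply_to=reply_id)

    # worker
    # (uses the same Exchange("reply") definition as the client)
    producer = Producer(channel)

    def reply_callback(body, message):
        # message.properties is a plain dict; correlation_id is only
        # present if the client set one when publishing the request.
        producer.publish(response, exchange=reply_exchange,
                         routing_key=message.properties["reply_to"],
                         correlation_id=message.properties.get("correlation_id"))
        message.ack()
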
The only real difference here is that you use a named reply exchange
instead of the default exchange. For one thing, I find the example a little
confusing in that the same exchange is used for both requests and replies.
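
To make the difference concrete, here is a toy in-memory model of direct
routing. It is not Kombu and not a real broker: `ToyBroker` and its methods
are made up for illustration. The one AMQP rule it relies on is that every
queue is implicitly bound to the default exchange (the one with the empty
name) with the queue name as routing key.

```python
from collections import defaultdict, deque
from uuid import uuid4


class ToyBroker:
    """Hypothetical stand-in for an AMQP broker; direct routing only."""

    def __init__(self):
        self.queues = {}                  # queue name -> deque of messages
        self.bindings = defaultdict(set)  # (exchange, routing key) -> queue names

    def declare_queue(self, name, exchange="", routing_key=None):
        self.queues.setdefault(name, deque())
        # Implicit binding to the default exchange under the queue's own name.
        self.bindings[("", name)].add(name)
        if exchange:
            # Explicit binding to a named exchange.
            self.bindings[(exchange, routing_key or name)].add(name)
        return name

    def publish(self, body, exchange="", routing_key=""):
        # Copy the message to every queue bound with a matching key.
        targets = self.bindings.get((exchange, routing_key), ())
        for name in targets:
            self.queues[name].append(body)
        return len(targets)


broker = ToyBroker()
reply_id = str(uuid4())  # unique per client, like the UUID-based name above
broker.declare_queue(reply_id, exchange="reply", routing_key=reply_id)

# Reply through the named exchange, as in the example above.
delivered_named = broker.publish("via named", exchange="reply",
                                 routing_key=reply_id)
# Reply through the default exchange: the routing key is just the queue name.
delivered_default = broker.publish("via default", exchange="",
                                   routing_key=reply_id)

replies = list(broker.queues[reply_id])
print(replies)
```

Both publishes end up in the same callback queue, so for the worker the
choice only changes which exchange name it passes to publish().
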
In Kombu master I added support for "anonymous" queues, but this
will only work for amqp:
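
    # client
    # declare() returns the queue name generated by the broker
    callback_queue_name = Queue()(channel).declare()
    producer.publish(body, exchange="", reply_to=callback_queue_name)

    # worker
    def reply_callback(body, message):
        # message.properties is a plain dict; correlation_id is only
        # present if the client set one when publishing the request.
        producer.publish(response, exchange="",
                         routing_key=message.properties["reply_to"],
                         correlation_id=message.properties.get("correlation_id"))
        message.ack()
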
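
The round trip with a broker-named queue can be sketched without a broker,
using a made-up in-memory stand-in rather than Kombu. Everything here is
illustrative: the `amq.gen-` prefix only mimics what RabbitMQ generates, the
"requests" queue name is hypothetical, and setting a `correlation_id` on the
request is an assumption, since the client snippet above does not set one.

```python
from collections import deque
from uuid import uuid4

queues = {}  # toy default exchange: routing key == queue name


def declare_anonymous():
    """Create a queue with a generated name and return that name."""
    name = "amq.gen-" + uuid4().hex  # prefix mimics RabbitMQ; illustrative only
    queues[name] = deque()
    return name


def publish(body, routing_key, **properties):
    """Publish to the toy default exchange; unroutable messages are dropped."""
    if routing_key in queues:
        queues[routing_key].append((body, properties))


def worker_handle(body, properties):
    """Worker side: send the reply to whatever queue reply_to names."""
    publish(body.upper(),
            routing_key=properties["reply_to"],
            correlation_id=properties.get("correlation_id"))


# client: declare the callback queue first, then send the request
callback_queue_name = declare_anonymous()
request_id = str(uuid4())
queues["requests"] = deque()  # hypothetical request queue
publish("ping", routing_key="requests",
        reply_to=callback_queue_name, correlation_id=request_id)

# worker: pick up the request and reply
worker_handle(*queues["requests"].popleft())

# client: match the reply to the request it belongs to
reply_body, reply_props = queues[callback_queue_name].popleft()
matched = reply_props["correlation_id"] == request_id
print(reply_body, matched)
```

The client never chooses the queue name itself; it only passes on whatever
name the declare step returned, which is why the worker code stays the same.
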
--
Ask Solem
twitter.com/asksol | +44 (0)7713357179
Ask,
Thank you for the quick response. I have also been thinking about how to
design this RPC paradigm with message queues in general, and I will stick
to your named exchange example. It defines all components explicitly and
is easy to follow.
Regards,
Christian