I'm trying to write some wrappers for consume and publish functionality for aio_pika.
I'm having trouble getting reply_to working to trigger the publish end callback.
async def async_consume(connection: RobustConnection, queue_name: str,
process_message: Coroutine, prefetch_count: int=1,
auto_delete: bool=True):
"""If a """
LOG.info(f"Starting async_consume for queue: {queue_name}")
channel = await connection.channel()
await channel.set_qos(prefetch_count=prefetch_count)
queue = await channel.declare_queue(queue_name, auto_delete=auto_delete)
LOG.info(f"Queue declared for: {queue_name}")
if queue_name == 'amq.rabbitmq.reply-to':
await queue.consume(partial(process_message, channel), no_ack=True)
else:
await queue.consume(process_message)
async def async_publish(connection: RobustConnection,
routing_key: str, message: PFMessage,
auto_delete=True):
LOG.info(f"Starting async_publish for routing_key: "
f"{routing_key}")
channel = await connection.channel()
exchange = await channel.declare_exchange("direct", auto_delete=auto_delete)
queue = await channel.declare_queue(routing_key, auto_delete=auto_delete)
await queue.bind(exchange, routing_key)
await exchange.publish(message, routing_key)
pytest test code:
async def callback(message: aio_pika.IncomingMessage):
async with message.process():
print(f"callback output: {message.body}")
async def reply_callback(exchange: Exchange, message: aio_pika.IncomingMessage):
async with message.process():
print(f"reply_callback output: {message.body}")
msg = PFMessage.make_pfmessage({'result':'success'})
print(message.reply_to)
await exchange.publish(msg, message.reply_to)
@pytest.mark.asyncio
async def test_async_consume_reply(base_login):
conn = await ConnectionInfo.get_rmc_connection(**base_login)
await async_consume(conn, "test_queue", reply_callback)
await async_consume(conn, "amq.rabbitmq.reply-to", callback)
msg = PFMessage.make_pfmessage({'do_reply': 1111}, reply_to='amq.rabbitmq.reply-to')
await async_publish(conn, "test_queue", msg)
await conn.close()
ERROR output
AILED [ 66%]
test/test_consume_publish.py:51 (test_async_consume_reply)
base_login = {'connection_name': 'testing', 'host': 'localhost', 'login': 'guest', 'password': 'guest', ...}
@pytest.mark.asyncio
async def test_async_consume_reply(base_login):
conn = await ConnectionInfo.get_rmc_connection(**base_login)
await async_consume(conn, "test_queue", reply_callback)
await async_consume(conn, "amq.rabbitmq.reply-to", callback)
msg = PFMessage.make_pfmessage({'do_reply': 1111}, reply_to='amq.rabbitmq.reply-to')
> await async_publish(conn, "test_queue", msg)
test_consume_publish.py:58:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../pfrabbit/asynced/async_consume_publish.py:85: in async_publish
await exchange.publish(message, routing_key)
../../venvs/rpc/lib/python3.7/site-packages/aio_pika/exchange.py:242: in publish
timeout=timeout,
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:388: in wait_for
return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Channel: "3">, body = b'{"do_reply": 1111}'
async def basic_publish(
self,
body: bytes,
*,
exchange: str = "",
routing_key: str = "",
properties: spec.Basic.Properties = None,
mandatory: bool = False,
immediate: bool = False
) -> typing.Optional[ConfirmationFrameType]:
frame = spec.Basic.Publish(
exchange=exchange,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate,
)
content_header = ContentHeader(
properties=properties or spec.Basic.Properties(delivery_mode=1),
body_size=len(body),
)
if not content_header.properties.message_id:
# UUID compatible random bytes
rnd_id = os.urandom(16)
content_header.properties.message_id = hexlify(rnd_id).decode()
confirmation = None
async with self.lock:
self.delivery_tag += 1
if self.publisher_confirms:
message_id = content_header.properties.message_id
if self.delivery_tag not in self.confirmations:
self.confirmations[
self.delivery_tag
] = self.create_future()
confirmation = self.confirmations[self.delivery_tag]
self.message_id_delivery_tag[message_id] = self.delivery_tag
confirmation.add_done_callback(
lambda _: self.message_id_delivery_tag.pop(
message_id, None,
),
)
self.writer.write(pamqp.frame.marshal(frame, self.number))
# noinspection PyTypeChecker
self.writer.write(pamqp.frame.marshal(content_header, self.number))
with BytesIO(body) as buf:
read_chunk = partial(buf.read, self.max_content_size)
reader = iter(read_chunk, b"")
for chunk in reader:
# noinspection PyTypeChecker
self.writer.write(
pamqp.frame.marshal(ContentBody(chunk), self.number),
)
if not self.publisher_confirms:
return
> return await confirmation
E aiormq.exceptions.ChannelPreconditionFailed: PRECONDITION_FAILED - fast reply consumer does not exist