I'm trying to write some wrappers around the consume and publish functionality of aio_pika.
I'm having trouble getting reply_to to work so that the reply triggers the callback on the publishing side.
async def async_consume(connection: RobustConnection, queue_name: str,
                        process_message: Coroutine, prefetch_count: int=1,
                        auto_delete: bool=True):
    """Declare ``queue_name`` on a new channel and start consuming from it.

    A new channel is opened on ``connection``, its QoS is set to
    ``prefetch_count`` and the queue is declared with ``auto_delete``.

    Callback contract:
        * For the ``amq.rabbitmq.reply-to`` pseudo-queue the consumer is
          registered with ``no_ack=True`` and the callback is invoked as
          ``process_message(channel, message)``.
        * For every other queue the callback is invoked as
          ``process_message(message)``.

    Returns:
        The channel the consumer was registered on. RabbitMQ's direct
        reply-to feature only accepts a request carrying
        ``reply_to='amq.rabbitmq.reply-to'`` when it is published on the
        same channel that consumes from the pseudo-queue, so callers need
        this channel to publish such requests.
    """
    LOG.info(f"Starting async_consume for queue: {queue_name}")
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=prefetch_count)
    queue = await channel.declare_queue(queue_name, auto_delete=auto_delete)
    LOG.info(f"Queue declared for: {queue_name}")
    if queue_name == 'amq.rabbitmq.reply-to':
        # Direct reply-to consumers must run in no-ack mode; the channel is
        # bound as the callback's first positional argument.
        await queue.consume(partial(process_message, channel), no_ack=True)
    else:
        await queue.consume(process_message)
    # Previously nothing was returned, so the channel was unreachable.
    return channel
async def async_publish(connection: RobustConnection,
                        routing_key: str, message: PFMessage,
                        auto_delete=True, channel=None):
    """Publish ``message`` to ``routing_key`` through the "direct" exchange.

    The exchange named ``"direct"`` and a queue named ``routing_key`` are
    declared (both with ``auto_delete``), the queue is bound to the exchange
    with ``routing_key``, and the message is published.

    Args:
        connection: Connection used to open a channel when ``channel`` is
            not supplied.
        routing_key: Name of the target queue and the binding/routing key.
        message: The message to publish.
        auto_delete: Passed to both the exchange and queue declarations.
        channel: Optional already-open channel to publish on. Pass the
            channel that consumes from ``amq.rabbitmq.reply-to`` when
            ``message.reply_to`` is ``'amq.rabbitmq.reply-to'``: RabbitMQ
            rejects such a publish from any other channel with
            ``PRECONDITION_FAILED - fast reply consumer does not exist``.
            When omitted, a new channel is opened as before.
    """
    LOG.info(f"Starting async_publish for routing_key: "
             f"{routing_key}")
    if channel is None:
        # Legacy behaviour: publish on a dedicated, freshly opened channel.
        channel = await connection.channel()
    exchange = await channel.declare_exchange("direct", auto_delete=auto_delete)
    queue = await channel.declare_queue(routing_key, auto_delete=auto_delete)
    await queue.bind(exchange, routing_key)
    await exchange.publish(message, routing_key)
pytest test code:
async def callback(message: aio_pika.IncomingMessage):
    """Print the body of ``message`` from inside its ``process()`` context.

    NOTE(review): the test registers this on ``amq.rabbitmq.reply-to``,
    where ``async_consume`` invokes callbacks as
    ``process_message(channel, message)`` with ``no_ack=True``. This
    one-argument signature does not match that call shape -- confirm which
    side of the contract is intended.
    """
    async with message.process():
        output = f"callback output: {message.body}"
        print(output)
async def reply_callback(exchange: Exchange, message: aio_pika.IncomingMessage):
    """Print the request body, then publish a success reply to its sender.

    The reply is published on ``exchange`` using ``message.reply_to`` as the
    routing key, all inside the request's ``process()`` context.

    NOTE(review): the test registers this on ``"test_queue"``, where
    ``async_consume`` invokes callbacks with the message only, so nothing
    supplies ``exchange`` -- confirm how it is meant to be bound.
    """
    async with message.process():
        print(f"reply_callback output: {message.body}")
        response = PFMessage.make_pfmessage({'result':'success'})
        reply_key = message.reply_to
        print(reply_key)
        await exchange.publish(response, reply_key)
@pytest.mark.asyncio
async def test_async_consume_reply(base_login):
    """Publish a request with a direct reply-to address and consume replies.

    Starts a consumer on ``test_queue`` and one on the
    ``amq.rabbitmq.reply-to`` pseudo-queue, then publishes a request to
    ``test_queue`` whose ``reply_to`` points at the pseudo-queue.
    """
    conn = await ConnectionInfo.get_rmc_connection(**base_login)
    try:
        await async_consume(conn, "test_queue", reply_callback)
        await async_consume(conn, "amq.rabbitmq.reply-to", callback)
        msg = PFMessage.make_pfmessage({'do_reply': 1111}, reply_to='amq.rabbitmq.reply-to')
        await async_publish(conn, "test_queue", msg)
    finally:
        # Close the connection even when a step above raises, so a failing
        # run does not leave the broker connection open.
        await conn.close()
ERROR output
FAILED [ 66%]
test/test_consume_publish.py:51 (test_async_consume_reply)
base_login = {'connection_name': 'testing', 'host': 'localhost', 'login': 'guest', 'password': 'guest', ...}
  @pytest.mark.asyncio
  async def test_async_consume_reply(base_login):
    conn = await ConnectionInfo.get_rmc_connection(**base_login)
    await async_consume(conn, "test_queue", reply_callback)
    await async_consume(conn, "amq.rabbitmq.reply-to", callback)
    msg = PFMessage.make_pfmessage({'do_reply': 1111}, reply_to='amq.rabbitmq.reply-to')
>   await async_publish(conn, "test_queue", msg)
test_consume_publish.py:58:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../pfrabbit/asynced/async_consume_publish.py:85: in async_publish
  await exchange.publish(message, routing_key)
../../venvs/rpc/lib/python3.7/site-packages/aio_pika/exchange.py:242: in publish
  timeout=timeout,
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:388: in wait_for
  return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Channel: "3">, body = b'{"do_reply": 1111}'
  async def basic_publish(
    self,
    body: bytes,
    *,
    exchange: str = "",
    routing_key: str = "",
    properties: spec.Basic.Properties = None,
    mandatory: bool = False,
    immediate: bool = False
  ) -> typing.Optional[ConfirmationFrameType]:

    frame = spec.Basic.Publish(
      exchange=exchange,
      routing_key=routing_key,
      mandatory=mandatory,
      immediate=immediate,
    )

    content_header = ContentHeader(
      properties=properties or spec.Basic.Properties(delivery_mode=1),
      body_size=len(body),
    )

    if not content_header.properties.message_id:
      # UUID compatible random bytes
      rnd_id = os.urandom(16)
      content_header.properties.message_id = hexlify(rnd_id).decode()

    confirmation = None

    async with self.lock:
      self.delivery_tag += 1

      if self.publisher_confirms:
        message_id = content_header.properties.message_id

        if self.delivery_tag not in self.confirmations:
          self.confirmations[
            self.delivery_tag
          ] = self.create_future()

        confirmation = self.confirmations[self.delivery_tag]

        self.message_id_delivery_tag[message_id] = self.delivery_tag

        confirmation.add_done_callback(
          lambda _: self.message_id_delivery_tag.pop(
            message_id, None,
          ),
        )

      self.writer.write(pamqp.frame.marshal(frame, self.number))

      # noinspection PyTypeChecker
      self.writer.write(pamqp.frame.marshal(content_header, self.number))

      with BytesIO(body) as buf:
        read_chunk = partial(buf.read, self.max_content_size)
        reader = iter(read_chunk, b"")

        for chunk in reader:
          # noinspection PyTypeChecker
          self.writer.write(
            pamqp.frame.marshal(ContentBody(chunk), self.number),
          )

    if not self.publisher_confirms:
      return

>   return await confirmation
E   aiormq.exceptions.ChannelPreconditionFailed: PRECONDITION_FAILED - fast reply consumer does not exist