reply_to problem with aio_pika.

239 views
Skip to first unread message

Joel Bremson

unread,
Jun 17, 2020, 3:38:09 PM6/17/20
to rabbitm...@googlegroups.com
I'm trying to write some wrappers for consume and publish functionality for aio_pika.
I'm having trouble getting reply_to to work so that it triggers the callback on the publishing end.




async def async_consume(connection: RobustConnection, queue_name: str,
                        process_message: Coroutine, prefetch_count: int=1,
                        auto_delete: bool=True):
    """Declare ``queue_name`` on a new channel and start consuming from it.

    A fresh channel is opened on ``connection``, its prefetch count is set,
    the queue is declared and ``process_message`` is registered as the
    consumer callback.

    When ``queue_name`` is the pseudo-queue ``'amq.rabbitmq.reply-to'`` the
    consumer is registered with ``no_ack=True`` and the channel is bound as
    the first argument of ``process_message``, so the callback is invoked as
    ``process_message(channel, message)``. For every other queue it is
    invoked as ``process_message(message)``.

    Args:
        connection: Connection on which the new channel is opened.
        queue_name: Name of the queue to declare and consume from.
        process_message: Callback handed to ``queue.consume``. Although it is
            annotated as ``Coroutine``, it is used as a callable, not awaited
            here.
        prefetch_count: Value passed to ``channel.set_qos``.
        auto_delete: Passed through to ``channel.declare_queue``.

    Returns:
        The channel the consumer was registered on. Callers that use direct
        reply-to need it: RabbitMQ only accepts a request carrying
        ``reply_to='amq.rabbitmq.reply-to'`` when it is published on the same
        channel that consumes from that pseudo-queue; otherwise the broker
        answers ``PRECONDITION_FAILED - fast reply consumer does not exist``.
        It also gives the caller a handle for closing the channel.
    """
    LOG.info(f"Starting async_consume for queue: {queue_name}")
    channel = await connection.channel()

    await channel.set_qos(prefetch_count=prefetch_count)
    queue = await channel.declare_queue(queue_name, auto_delete=auto_delete)
    LOG.info(f"Queue declared for: {queue_name}")
    if queue_name == 'amq.rabbitmq.reply-to':
        # Direct reply-to deliveries must be consumed in no-ack mode.
        await queue.consume(partial(process_message, channel), no_ack=True)
    else:
        await queue.consume(process_message)
    # Previously the channel was dropped here; hand it back so the caller
    # can publish on it (required for direct reply-to) or close it.
    return channel

async def async_publish(connection: RobustConnection,
                        routing_key: str, message: PFMessage,
                        auto_delete=True, channel=None):
    """Publish ``message`` with ``routing_key`` through the "direct" exchange.

    The exchange and a queue named ``routing_key`` are declared and bound on
    every call before the message is published.

    Args:
        connection: Connection used to open a channel when ``channel`` is
            not supplied.
        routing_key: Routing key for the publish; also used as the name of
            the queue that is declared and bound.
        message: The message to publish.
        auto_delete: Passed through to both the exchange and the queue
            declaration.
        channel: Optional existing channel to publish on. When ``None``
            (the default) a new channel is opened, as before. Pass the
            channel that consumes from ``'amq.rabbitmq.reply-to'`` when
            ``message`` carries that value as ``reply_to``: RabbitMQ requires
            the request to be published on the same channel as the reply
            consumer and otherwise fails with
            ``PRECONDITION_FAILED - fast reply consumer does not exist``.
    """
    LOG.info(f"Starting async_publish for routing_key: "
             f"{routing_key}")
    if channel is None:
        # NOTE(review): a channel opened here is never closed, so each call
        # without ``channel`` leaves one more open channel on the connection.
        channel = await connection.channel()
    # NOTE(review): "direct" is passed as the first positional argument,
    # which looks like the exchange *name* rather than its type -- confirm
    # against the aio_pika ``declare_exchange`` signature.
    exchange = await channel.declare_exchange("direct", auto_delete=auto_delete)
    queue = await channel.declare_queue(routing_key, auto_delete=auto_delete)
    await queue.bind(exchange, routing_key)
    await exchange.publish(message, routing_key)

pytest test code:
async def callback(message: aio_pika.IncomingMessage):
    """Print the body of a consumed message inside its ``process()`` context."""
    # NOTE(review): in the test this handler is registered on the
    # 'amq.rabbitmq.reply-to' path, which consumes with no_ack=True and
    # prepends the channel as an extra positional argument -- confirm that
    # both the one-argument signature and ``process()`` are valid there.
    async with message.process():
        payload = message.body
        print(f"callback output: {payload}")
async def reply_callback(exchange: Exchange, message: aio_pika.IncomingMessage):
    """Print the incoming message, then publish a success reply to its ``reply_to``.

    Args:
        exchange: Object whose ``publish`` is used to send the reply.
        message: The incoming request; its ``reply_to`` is used as the
            routing key of the reply.
    """
    # NOTE(review): the test registers this handler for "test_queue", where
    # async_consume passes the callback through unchanged (message only), so
    # nothing supplies ``exchange`` -- confirm how it is meant to be bound.
    # NOTE(review): direct reply-to replies are normally published to the
    # default exchange -- verify which exchange arrives here.
    async with message.process():
        print(f"reply_callback output: {message.body}")
        reply = PFMessage.make_pfmessage({'result':'success'})
        destination = message.reply_to
        print(destination)
        await exchange.publish(reply, destination)

@pytest.mark.asyncio
async def test_async_consume_reply(base_login):
    """Publish a request with ``reply_to`` set and consume on both queues.

    Consumers are started on "test_queue" and on the
    'amq.rabbitmq.reply-to' pseudo-queue, then a message whose ``reply_to``
    names that pseudo-queue is published to "test_queue".
    """
    conn = await ConnectionInfo.get_rmc_connection(**base_login)
    try:
        await async_consume(conn, "test_queue", reply_callback)
        await async_consume(conn, "amq.rabbitmq.reply-to", callback)
        msg = PFMessage.make_pfmessage({'do_reply': 1111}, reply_to='amq.rabbitmq.reply-to')
        # NOTE(review): this publish runs on a different channel from the
        # one consuming 'amq.rabbitmq.reply-to'; RabbitMQ's direct reply-to
        # requires the same channel for both.
        await async_publish(conn, "test_queue", msg)
    finally:
        # Close the connection even when a step above raises, so a failing
        # run does not leave the broker connection open.
        await conn.close()

# ERROR output

FAILED                 [ 66%]
test/test_consume_publish.py:51 (test_async_consume_reply)
base_login = {'connection_name': 'testing', 'host': 'localhost', 'login': 'guest', 'password': 'guest', ...}

    @pytest.mark.asyncio
    async def test_async_consume_reply(base_login):
        conn = await ConnectionInfo.get_rmc_connection(**base_login)
        await async_consume(conn, "test_queue", reply_callback)
        await async_consume(conn, "amq.rabbitmq.reply-to", callback)
        msg = PFMessage.make_pfmessage({'do_reply': 1111}, reply_to='amq.rabbitmq.reply-to')
>       await async_publish(conn, "test_queue", msg)

test_consume_publish.py:58:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../pfrabbit/asynced/async_consume_publish.py:85: in async_publish
    await exchange.publish(message, routing_key)
../../venvs/rpc/lib/python3.7/site-packages/aio_pika/exchange.py:242: in publish
    timeout=timeout,
/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:388: in wait_for
    return await fut
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Channel: "3">, body = b'{"do_reply": 1111}'

    async def basic_publish(
        self,
        body: bytes,
        *,
        exchange: str = "",
        routing_key: str = "",
        properties: spec.Basic.Properties = None,
        mandatory: bool = False,
        immediate: bool = False
    ) -> typing.Optional[ConfirmationFrameType]:
   
        frame = spec.Basic.Publish(
            exchange=exchange,
            routing_key=routing_key,
            mandatory=mandatory,
            immediate=immediate,
        )
   
        content_header = ContentHeader(
            properties=properties or spec.Basic.Properties(delivery_mode=1),
            body_size=len(body),
        )
   
        if not content_header.properties.message_id:
            # UUID compatible random bytes
            rnd_id = os.urandom(16)
            content_header.properties.message_id = hexlify(rnd_id).decode()
   
        confirmation = None
   
        async with self.lock:
            self.delivery_tag += 1
   
            if self.publisher_confirms:
                message_id = content_header.properties.message_id
   
                if self.delivery_tag not in self.confirmations:
                    self.confirmations[
                        self.delivery_tag
                    ] = self.create_future()
   
                confirmation = self.confirmations[self.delivery_tag]
   
                self.message_id_delivery_tag[message_id] = self.delivery_tag
   
                confirmation.add_done_callback(
                    lambda _: self.message_id_delivery_tag.pop(
                        message_id, None,
                    ),
                )
   
            self.writer.write(pamqp.frame.marshal(frame, self.number))
   
            # noinspection PyTypeChecker
            self.writer.write(pamqp.frame.marshal(content_header, self.number))
   
            with BytesIO(body) as buf:
                read_chunk = partial(buf.read, self.max_content_size)
                reader = iter(read_chunk, b"")
   
                for chunk in reader:
                    # noinspection PyTypeChecker
                    self.writer.write(
                        pamqp.frame.marshal(ContentBody(chunk), self.number),
                    )
   
        if not self.publisher_confirms:
            return
   
>       return await confirmation
E       aiormq.exceptions.ChannelPreconditionFailed: PRECONDITION_FAILED - fast reply consumer does not exist

Luke Bakken

unread,
Jun 25, 2020, 2:23:06 PM6/25/20
to rabbitmq-users
Hi Joel,

I suggest you contact the aio-pika maintainers - https://github.com/mosquito/aio-pika

Thanks,
Luke
Reply all
Reply to author
Forward
0 new messages