RabbitMQ RPC problem with asyncio

Yuriy Homyakov

Jan 18, 2016, 2:33:20 AM
to python-tulip
Hi!

I am trying to implement RPC over RabbitMQ, as described here: http://www.rabbitmq.com/tutorials/tutorial-six-python.html

With pika and Twisted I get 1000 RPC calls per second; a similar implementation on asyncio yields only 25 RPC calls per second.
I implemented the asyncio adapter for pika myself; it is just a port of the Twisted adapter: https://github.com/appetito/pika/blob/master/pika/adapters/asyncio_connection.py

RPC server implementation:


import asyncio
import pika

from pika.adapters import asyncio_connection


@asyncio.coroutine
def make_connection(loop, host="localhost", port=5672):

    def connection_factory():
        params = pika.ConnectionParameters()
        return asyncio_connection.AsyncioProtocolConnection(params, loop=loop)

    transport, connection = yield from loop.create_connection(connection_factory, host, port)
    yield from connection.ready
    return connection


@asyncio.coroutine
def server(loop):
    conn = yield from make_connection(loop)
    chan = yield from conn.channel()

    conn2 = yield from make_connection(loop)
    chan2 = yield from conn2.channel()
    print('Channel', chan)
    yield from chan.queue_declare(queue='rpc_queue')
    queue, ctag = yield from chan.basic_consume(queue='rpc_queue', no_ack=True)

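    while True:
        # Wait for the next request, then publish the reversed body
        # to the caller's reply queue over the second connection.
        ch, method, props, body = yield from queue.get()
        # print('Get', body, props)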
        yield from chan2.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            body=body[::-1])


loop = asyncio.get_event_loop()

try:
    task = asyncio.ensure_future(server(loop))
    loop.run_until_complete(task)
except KeyboardInterrupt:
    print('Done')

RPC client implementation:



import time
import asyncio
import pika

from pika.adapters import asyncio_connection


class Counter:

    def __init__(self, name=''):
        self.start = time.time()
        self.cnt = 0
        self.name = name

    def reset(self):
        self.start = time.time()
        self.cnt = 0

    def inc(self, value=1):
        self.cnt += value

    def summary(self):
        now = time.time()
        return {
            'time': now - self.start,
            'count': self.cnt,
            'rate': self.cnt / (now - self.start)
        }

    def __str__(self):
        return '{name}: time: {time}, count: {count}, rate: {rate}'.format(
            name=self.name, **self.summary())


c = Counter('Counter')


@asyncio.coroutine
def make_connection(loop, host="localhost", port=5672):

    def connection_factory():
        params = pika.ConnectionParameters()
        return asyncio_connection.AsyncioProtocolConnection(params, loop=loop)

    transport, connection = yield from loop.create_connection(connection_factory, host, port)
    yield from connection.ready
    return connection


@asyncio.coroutine
def call(loop):
    global c
    conn = yield from make_connection(loop)
    chan = yield from conn.channel()

    print('Channel', chan)
    result = yield from chan.queue_declare(exclusive=True)
    cb_queue = result.method.queue
    print('CBQ', cb_queue)
    queue, ctag = yield from chan.basic_consume(queue=cb_queue, no_ack=True)
    c.reset()

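    while True:
        # One RPC round trip per iteration: publish the request,
        # then wait for the reply before sending the next one.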
        yield from chan.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(reply_to=cb_queue),
            body='Hello World!')

        ch, method, props, body = yield from queue.get()
        c.inc()

loop = asyncio.get_event_loop()

try:
    task = asyncio.ensure_future(call(loop))
    loop.run_until_complete(task)
except KeyboardInterrupt:
    print('Done\n', c)


All sources (including the Twisted-based implementation) are here: https://github.com/appetito/pika_tests

I can speed up the asyncio RPC implementation by adding a second connection to RabbitMQ (one connection for reading and another for writing), but I don't like this solution.

What am I doing wrong?

P.S.
I also tried https://github.com/polyconseil/aioamqp, with the same result.

 

Charles-François Natali

Jan 18, 2016, 4:06:19 AM
to Yuriy Homyakov, python-tulip
It's just a random guess, but does asyncio set TCP_NODELAY on the sockets?
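
One way to test this guess is to open a TCP connection through asyncio and read the option back from the socket. The sketch below is only an illustration: it uses a throwaway local server instead of RabbitMQ, modern `async def` syntax instead of the `yield from` style used above, and a hypothetical helper name, `asyncio_sets_nodelay`. For context, 25 calls per second works out to 40 ms per round trip, which is in the range commonly cited for delayed-ACK timers, so Nagle's algorithm is at least a plausible suspect.

```python
import asyncio
import socket


async def asyncio_sets_nodelay():
    """Return True if a fresh asyncio TCP connection has TCP_NODELAY set."""

    async def handle(reader, writer):
        # Hold the connection open until the client disconnects.
        await reader.read()
        writer.close()

    # Throwaway local server, so no RabbitMQ broker is needed.
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    loop = asyncio.get_running_loop()
    transport, _ = await loop.create_connection(
        asyncio.Protocol, '127.0.0.1', port)
    try:
        sock = transport.get_extra_info('socket')
        # A non-zero value means Nagle's algorithm is disabled.
        return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
    finally:
        transport.close()
        server.close()


if __name__ == '__main__':
    print('TCP_NODELAY set by asyncio:', asyncio.run(asyncio_sets_nodelay()))
```

The result depends on the Python version in use, so it is worth running this on the interpreter that actually hosts the RPC code.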

Yuriy Homyakov

Jan 18, 2016, 7:11:33 AM
to python-tulip, yuriy.h...@gmail.com
It works! Thank you very much!

I added
transport._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)

Are there any caveats or possible problems?
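
One small caveat with the line above is that `transport._sock` is a private attribute. A minimal sketch of the same idea through the public `get_extra_info('socket')` call follows; the helper name `enable_nodelay` is hypothetical and not part of pika or asyncio.

```python
import socket


def enable_nodelay(transport):
    """Disable Nagle's algorithm on the socket behind an asyncio transport.

    Returns True if the option was set, False if the transport exposes
    no socket (for example, a non-socket transport).
    """
    sock = transport.get_extra_info('socket')
    if sock is None:
        return False
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    return True
```

In `make_connection` from the first post, this would be called as `enable_nodelay(transport)` right after `loop.create_connection(...)` returns.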

Charles-François Natali

Jan 18, 2016, 2:50:12 PM
to Yuriy Homyakov, python-tulip
No caveats, actually I think this should be the default (that's the
case for example for all TCP sockets in Go, zmq also sets it, and all
web browsers).

Andrew Svetlov

Jan 18, 2016, 3:01:50 PM
to python-tulip
See also https://github.com/python/asyncio/issues/311

Switching TCP_NODELAY on for aiohttp helps a lot, along with TCP_CORK/TCP_NOPUSH if available.

As far as I can tell, nginx uses TCP_NODELAY by default as well.

Anton VV

Nov 23, 2016, 5:30:25 AM
to python-tulip, yuriy.h...@gmail.com
What performance do you get after setting TCP_NODELAY?

Yury Selivanov

Nov 23, 2016, 9:12:18 PM
to python...@googlegroups.com


On 2016-01-18 2:50 PM, Charles-François Natali wrote:
> No caveats, actually I think this should be the default (that's the

asyncio will set it by default in 3.6.

Yury

Rémi Cardona

Jan 3, 2017, 10:04:06 AM
to python-tulip
(disclaimer: I'm the co-maintainer of aioamqp)

Hi Yuriy,

We already set TCP_NODELAY on sockets: https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/__init__.py#L68

So far, there has been rather little performance work on aioamqp, as it's been "good enough" for us. I'd be very interested in improving that, and any help would be most welcome.

Cheers,

Rémi