Adapter merges tornado and pika

Richard Clark

Mar 20, 2010, 9:57:39 AM
to Tornado Web Server
Ok, I'm half asleep so I'll make this quick. After fighting with
numerous strategies for melding tornado and rabbitmq in a way that
doesn't involve spawning extra processes, mangling my interpreter,
going anywhere near twisted, mangling my ability to dispatch properly
off multiple queues on a single channel etc, I finally managed
something that, at least for first appearances, works.

The changes are fairly simple. Tornado needs one two-line change to
ioloop to allow it to run a loop once, rather than spinning - sadly
attempts to do it without this change caused nightmarish bugs to
emerge.
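
For readers unfamiliar with the idea, here is a minimal sketch of what "run a loop once" means. This is not tornado's ioloop code and not the actual two-line patch; MiniLoop and run_once are hypothetical names built on the standard selectors module. The point is only that a single poll-and-dispatch pass is exposed separately from the spinning loop, so another library can drive it.

```python
import selectors
import socket


class MiniLoop:
    """Hypothetical stand-in for an event loop (not tornado's IOLoop)."""

    def __init__(self):
        self._sel = selectors.DefaultSelector()
        self._running = False

    def add_handler(self, sock, callback):
        # callback(sock, events) is invoked when sock becomes readable.
        self._sel.register(sock, selectors.EVENT_READ, callback)

    def run_once(self, timeout=0):
        """Do exactly one poll-and-dispatch pass, then return."""
        ready = self._sel.select(timeout)
        for key, events in ready:
            key.data(key.fileobj, events)
        return len(ready)

    def start(self):
        """The usual spinning form: repeat run_once until stopped."""
        self._running = True
        while self._running:
            self.run_once(timeout=0.2)

    def stop(self):
        self._running = False
```

With this split, code that owns its own main loop can call run_once() whenever it wants pending IO handled, instead of handing control to start().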

In addition to that, two files (attached, no warranty etc etc) are
required:

asyncloop.py, an ioloop handler that mostly pretends to be an asyncore
dispatcher
ioloop_adapter.py, an adapter almost exactly like the
asyncore_adapter.py in pika, except it uses the asyncloop and gets rid
of some crap (and, unfortunately, timers, but they're easy to put back
using ioloop stuff)
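
To illustrate the shape of the first file, here is a rough sketch of a handler that exposes asyncore-style handle_* hooks but is driven by an ioloop-style callback of the form handler(fd, events). It is not the code in the gist: LoopDispatcher, interest and the flag values are assumptions made for illustration, and a real ioloop defines its own event constants.

```python
import socket

# Stand-in event flags for illustration; use the real loop's constants.
READ, WRITE, ERROR = 0x001, 0x004, 0x018


class LoopDispatcher:
    """Hypothetical: looks like an asyncore dispatcher to its user,
    but is fed events by an external loop via handle_events()."""

    def __init__(self, sock):
        self.sock = sock
        self.out_buffer = b""
        self.received = []

    def interest(self):
        # Ask for WRITE only while there is buffered data to send.
        return READ | ERROR | (WRITE if self.out_buffer else 0)

    def handle_events(self, fd, events):
        # This is the callback the loop would invoke for our fd.
        if events & READ:
            self.handle_read()
        if events & WRITE:
            self.handle_write()
        if events & ERROR:
            self.handle_close()

    def handle_read(self):
        data = self.sock.recv(4096)
        if data:
            self.received.append(data)
        else:
            self.handle_close()

    def handle_write(self):
        sent = self.sock.send(self.out_buffer)
        self.out_buffer = self.out_buffer[sent:]

    def handle_close(self):
        self.sock.close()
```

The loop would register handle_events for the socket's file descriptor and update the registered event mask from interest() after each write.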

In no way is this production code, it's barely even tested. I intend
to try and get it further in that direction tomorrow, but in case
anyone else has been banging their head on the same problem I figured
I'd throw the strategy out there.

Files:
http://gist.github.com/338680

Example:
http://gist.github.com/338683 (requires an amqp echo service on echo/
echo)

cross-posted to rabbitmq.

Andrew Badr

Mar 22, 2010, 8:56:15 PM
to Tornado Web Server
For my own edification, what's wrong with starting a new process? I've
been using http://code.google.com/p/tornado-amqp/


can xiang

Mar 23, 2010, 10:30:25 PM
to Tornado Web Server
It looks great. Tornado and pika are a good match.


majek

Mar 26, 2010, 7:24:01 AM
to Tornado Web Server, in...@rabbitmq.com
On Mar 23, 12:56 am, Andrew Badr <andrewb...@gmail.com> wrote:
> For my own edification, what's wrong with starting a new process? I've
> been using http://code.google.com/p/tornado-amqp/

It's great that it works for you. The main disadvantage of tornado-amqp
is that it needs to spawn a new process for every AMQP connection.
Fork command is here:
http://code.google.com/p/tornado-amqp/source/browse/tamqp/slave.py#22

The main loop of a spawned process:
http://code.google.com/p/tornado-amqp/source/browse/tamqp/__init__.py#77

All the communication between tornado and the amqp worker process
needs to be pickled and go through a socket.
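
As a rough illustration of that cost, here is a minimal sketch of passing pickled messages over a socket with a length prefix. It is not tornado-amqp's code; send_msg, recv_msg and the 4-byte length framing are assumptions chosen for the example.

```python
import pickle
import socket
import struct


def send_msg(sock, obj):
    """Pickle obj and send it with a 4-byte big-endian length prefix."""
    payload = pickle.dumps(obj)
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def _recv_exact(sock, n):
    """Read exactly n bytes, since recv() may return fewer."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed the socket")
        buf += chunk
    return buf


def recv_msg(sock):
    """Read one length-prefixed frame and unpickle it."""
    (length,) = struct.unpack("!I", _recv_exact(sock, 4))
    return pickle.loads(_recv_exact(sock, length))
```

Every message crossing the process boundary pays for a pickle, a copy through the kernel and an unpickle, and unpickling is only safe when both ends are trusted.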

Although that might work, it is a bit hackish. On the other hand I
can't think of a better way of integrating py-amqplib.

Marek Majkowski

Wil Tan

Mar 27, 2010, 1:56:51 AM
to python-...@googlegroups.com, in...@rabbitmq.com
If you need to integrate a library that uses synchronous sockets, such as py-amqp or xmpppy, gtornado was created for that. It patches the Tornado IOLoop to use gevent's greenlet hub, so that you can optionally patch other libraries using gevent.


If you have any questions about using gtornado, please feel free to contact me.

=wil



Richard Clark

Mar 28, 2010, 3:45:42 AM
to Tornado Web Server
A number of people have suggested this, or the other alternative using
a spawned process pool to deal with the issue.

Neither is a solution equivalent to having an asynchronous
implementation that can run off the same io loop as tornado. The
reason is simple: only an integrated event loop can multiplex queues
on channels on connections. Gevent cannot utilise the multiplexed
channel functionality; as a result it must open one connection per
queue wait.

This isn't a big deal for a number of use cases: those operating on a
simple pub/sub model for a few topics can live with this limitation
without it causing any serious problems with their architecture.

If, on the other hand, you've got a full RPC model as part of your
messaging architecture (response from remote consumer is delivered to
private queue), then the number of queues you can be waiting on can
run into the hundreds or thousands. Forcing the use of a full TCP
connection for each one of these is significantly less efficient than
using the built-in channel support.

Worse, a wait on a connection does not correspond to a wait on a
queue. Assuming you have a number of channels on a given TCP
connection, each of which may contain a number of queue subscriptions
with callbacks, patched greenlets become useless - we're not waiting on the
IO on a given socket, we're waiting on a particular set of messages to
that socket, which may be surrounded by hundreds of other messages
destined for other callbacks.
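
To make the distinction concrete, here is a small sketch of the dispatch an integrated client performs: frames arriving on one connection are routed to callbacks by channel and consumer tag, so each waiter is woken only by its own messages. This is not pika's implementation; Demux, subscribe and feed are hypothetical names, and frames are modelled as plain (channel, consumer_tag, body) tuples rather than real AMQP frames.

```python
class Demux:
    """Hypothetical demultiplexer for one connection's incoming frames."""

    def __init__(self):
        self._callbacks = {}

    def subscribe(self, channel, consumer_tag, callback):
        # One callback per (channel, consumer tag) subscription.
        self._callbacks[(channel, consumer_tag)] = callback

    def feed(self, frames):
        """Route each (channel, consumer_tag, body) frame to its callback.

        Frames with no matching subscription are counted and skipped.
        """
        unrouted = 0
        for channel, tag, body in frames:
            callback = self._callbacks.get((channel, tag))
            if callback is None:
                unrouted += 1
            else:
                callback(body)
        return unrouted
```

A greenlet blocked on the socket itself sees readiness for every frame in the stream, whereas a subscriber here only runs when a frame carrying its own key arrives.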

Obviously a native greenlet AMQP client would resolve this issue by
allowing you to wait on the queue itself, but simply changing the io
loop to gevent is not sufficient to do this - which is why I modified
pika instead. Not that that was a complete solution - pika itself
isn't always async - but it was a start and allowed me to provide a
reply-to on AMQP and only have the callback woken up when a message to
that queue was received - without having to create my own queue
matching and callback handler, duplicating what the AMQP libs already
provide.

I appreciate the neat tricks gevent achieves, but it doesn't solve
these kinds of issues.

Regards,
Richard.
