Interacting with AMQP using multithreadded Python

1,909 views
Skip to first unread message

Stuart Longland

unread,
Nov 30, 2014, 6:29:26 PM11/30/14
to rabbitm...@googlegroups.com
Hi all,

I'm in the process of writing an energy meter data collection package in
Python and wanted to incorporate some real-time data reporting.

The software in question has been in development for some years now,
focussing on the historical aspect of metering; so collecting load
surveys/data logs out of energy meters (Modbus and proprietary, or via
some EMS/SCADA system) dumping this out to CSV files, performing
whatever processing was necessary then pushing it elsewhere.

As such, it uses a largely synchronous interface, made asynchronous by
spawning threads to perform work then setting an Event object to signal
completion.

For real-time, I am looking at AMQP as the underlying messaging
framework. Others I've considered was using ZeroMQ or homebrewing
something using socket.io. I'm new to this whole field, so in all
probability, I've overlooked things.

I've done some tests with Pika using its blocking consumer which worked
okay when used in simple cases, but quickly fell over when I tried any
serious polling.

Since the Pika documentation explicitly states that it is not
threadsafe, I ran the Pika blocking client in its own thread and used
Python Queue objects to pass the messages in and out. I relied on
timeout hooks to poll the outgoing message/command queue. I suspect
this architecture is to blame rather than Pika.

So having gotten my feet wet, I'm looking around at available Python
APIs for AMQP, in particular, looking at ones that are truly threadsafe.
How well do the various libraries cope in a multithreadded environment?
--
Stuart Longland (aka Redhatter, VK4MSL)

I haven't lost my mind...
...it's backed up on a tape somewhere.

Laing, Michael

unread,
Dec 1, 2014, 5:00:00 AM12/1/14
to Stuart Longland, rabbitm...@googlegroups.com
You are better off using the async adapters in pika if you want performance.

That's what we do.

We may have a few threads in our apps, but the heavy lifting is around the event loops.

If we need more schlitz we use the python subprocess module to coordinate and run multiple children, each running async.

Our RabbitMQ/Cassandra interface runs this way, for example, taking advantage of both the pika and cassandra driver async capabilities to keep hundreds of reads/writes in flight and efficiently achieving very high throughput.

ml 

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send an email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Stuart Longland

unread,
Dec 1, 2014, 6:59:46 PM12/1/14
to michae...@nytimes.com, rabbitm...@googlegroups.com
Hi Michael,
On 01/12/14 19:59, Laing, Michael wrote:
> You are better off using the async adapters in pika if you want performance.
>
> That's what we do.
>
> We may have a few threads in our apps, but the heavy lifting is around
> the event loops.
>
> If we need more schlitz we use the python subprocess module to
> coordinate and run multiple children, each running async.
>
> Our RabbitMQ/Cassandra interface runs this way, for example, taking
> advantage of both the pika and cassandra driver async capabilities to
> keep hundreds of reads/writes in flight and efficiently achieving very
> high throughput.

I figured as much regarding using the asynchronous interface, I'll admit
asynchronous programming does my head in at times.

Basically in my system I've got a number of daemon processes which are
the sources (e.g. one might connect to a Modbus network and poll energy
meters and PLCs, another might read and write historical data, etc.),
then there's the clients.

The Pika documentation states that a connection should be used within
*one* thread only.

This bit is what particularly does my head in. Take the server for
example. Suppose a request comes in to read the current energy reading.
So the registered message callback gets called, I spawn a thread and
tell it to go fetch the data.

All good, message received and understood. Then the reading comes in
off the serial port. I cannot do a send of the message right then and
there, because I'm running in a different thread.

So far my solution has been to stow the message in a Python Queue.Queue
object and have a callback registered with 'add_timeout' look through
that queue for outgoing traffic.

At the client end, it's a similar deal. The client is essentially a
library, and I'd like it to be a thread-safe one. I want to be able to
call methods in my client class, which in turn fire off requests over
AMQP and either synchronously or asynchronously retrieve the result
irrespective of what thread I'm calling from.

So I have to do the same tricks. I suspect it's these tricks that are
giving me grief.

I've done some research and stumbled across this:
https://github.com/0x6e6562/hopper/blob/master/Multithreading%20AMQP%20Clients.markdown

which seems to suggest multithreading *might* be possible, but using a
channel per thread, which contradicts what the Pika docs say.

I'm also looking at Kombu at the moment, I started with Pika because
that's what the RabbitMQ tutorials recommend. It seems to hint at some
multiprocessing smarts, but doesn't say so explicitly. Another that I'm
looking at is 'rabbitpy' (same author as pika), not sure what its status
is but it does advertise being thread-safe.

https://rabbitpy.readthedocs.org/en/latest/

I'll keep digging and see what I come up with I guess. :-)

Gavin M. Roy

unread,
Dec 1, 2014, 7:33:20 PM12/1/14
to Stuart Longland, Michael Laing, rabbitm...@googlegroups.com
Starting with your last statement, rabbitpy is thread safe, and more Pythonic, but as Michael pointed out, if you're looking for the highest performance, go with an async adapter in Pika (tornado benches the fastest for me).

The reason why threads are important in pika (or any other client adapter) is AMQP is a bi-directional RPC protocol. This means the server can send the client commands as well as the client sending the server commands. Examples of these commands include Connection.Blocked, Channel.Close, Connection.Close, and not directly a command, but Heartbeat.

Async pika expects proper client behavior with regard to these things by registering callbacks. BlockingConnection in pika tries to make pika more pythonic and remove callback passing style programming but at a cost. Instead of being purely event driven, it has to interrupt the client code (your code) every so often to make sure nothing is pending from the server.  We have to do this, otherwise a publisher could flood a channel and never see a RPC request or Heartbeat from the server. This is why BlockingConnection is slower than the other adapters. With regards to threading, it also does not use any locking primitives or queues for communication across threads, which would be required for proper operation.

Also, what happens when you have two threads adding multiple frames to the output buffer with pika? There is the distinct possibility that they can be interwoven sending mismatched protocol frames for what you're publishing or responding to.

rabbitpy gets around this by having a dedicated IO thread, ensuring that all frame writing is atomic to a thread-safe queue, and handling server issued RPC calls appropriately. There is a performance penalty to this, but not as big as the penalty for pika's BlockingConnection. It doesn't help you now, but pika 0.10's BlockingConnection will be based upon this model (and will also share rabbitpy's underlying AMQP serialization library pamqp).

Threading is hard to get correct, even in Python. Writing async, event driven code is much easier to get correct. I'd serious consider the async model over threading. I've rarely come across a use case that requires threads for normal application flow, such as with a consumer application. It's much closer to how RabbitMQ operates conceptually and doesn't have to do any special "tricks" for proper client behavior.

For simplicity and scaling throughput, I find that using multiple processes with async handlers (Python's multiprocessing.Process) are much easier to deal with than threads and you'll get more reliable behavior.

I hope this helps with some context,

Gavin

Stuart Longland

unread,
Dec 1, 2014, 8:25:30 PM12/1/14
to Gavin M. Roy, Michael Laing, rabbitm...@googlegroups.com
Hi Gavin,
On 02/12/14 10:33, Gavin M. Roy wrote:
> Starting with your last statement, rabbitpy is thread safe, and more
> Pythonic, but as Michael pointed out, if you're looking for the highest
> performance, go with an async adapter in Pika (tornado benches the
> fastest for me).

Well, super duper performance isn't the biggest goal, it's
maintainability and reliability. I feel I can generate more reliable
synchronous code and much of my existing code base is synchronous already.

Worst case right now is 1-second samples from a handful of meters. The
samples are being collected and timestamped at the source, and the
system is not a hard real-time system, so delays are tolerable.

What I was getting though was a complete hang.

> The reason why threads are important in pika (or any other client
> adapter) is AMQP is a bi-directional RPC protocol. This means the server
> can send the client commands as well as the client sending the server
> commands. Examples of these commands include Connection.Blocked,
> Channel.Close, Connection.Close, and not directly a command, but Heartbeat.
>
> Async pika expects proper client behavior with regard to these things by
> registering callbacks. BlockingConnection in pika tries to make pika
> more pythonic and remove callback passing style programming but at a
> cost. Instead of being purely event driven, it has to interrupt the
> client code (your code) every so often to make sure nothing is pending
> from the server. We have to do this, otherwise a publisher could flood
> a channel and never see a RPC request or Heartbeat from the server. This
> is why BlockingConnection is slower than the other adapters. With
> regards to threading, it also does not use any locking primitives or
> queues for communication across threads, which would be required for
> proper operation.

Yeah, I can see that being a problem.

What I guess I was hoping for was that I could poll my client library
regularly whilst waiting on transactions to complete, rather than
splitting my code amongst numerous callback functions.

The idea I had was that in my code I'd call a method that returns an
object which will hold the response. Something along the lines of:

class Response(object):
def __init__(self, client):
self._client = client
self._done = threading.Event()
self._res = None
self._state= 0 # state code
def _set_result(self, result, state):
self._res = result
self._state = state
self._done.set()
def done(self):
return self._done.is_set()
def wait(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout
else:
deadline = None
while (not self._done.wait(0.1)) and \
((deadline is None) or (deadline > time.time()):
client.poll()

result = property(lambda s : s._res)
state = property(lambda s : s._state)

When the response comes in, the event loop (whether in the main thread
or a separate thread) would call _set_result. Whatever requested the
data can either call wait() or done() to see if the transaction is finished.

I find this a little easier to understand than callback functions. I
have otherwise considered re-writing my API to use callbacks instead,
but the complexities of the tasks involved favour a more synchronous
approach.

I imagine somewhere deep inside pika, is a function that more or less is
called in a loop that checks the state of the socket, fires off
callbacks and performs other tasks which is what the fictional
"client.poll()" method above represents.

> Also, what happens when you have two threads adding multiple frames to
> the output buffer with pika? There is the distinct possibility that they
> can be interwoven sending mismatched protocol frames for what you're
> publishing or responding to.

This I'm not sure about, I didn't (intentionally) try sending multiple
frames from separate threads as the docs said not to. Such resource
contention can really make a mess of things so I try to avoid it.

> rabbitpy gets around this by having a dedicated IO thread, ensuring that
> all frame writing is atomic to a thread-safe queue, and handling server
> issued RPC calls appropriately. There is a performance penalty to this,
> but not as big as the penalty for pika's BlockingConnection. It doesn't
> help you now, but pika 0.10's BlockingConnection will be based upon this
> model (and will also share rabbitpy's underlying AMQP serialization
> library pamqp).

This looks like more or less what I've been trying to achieve.

As I say, super performance isn't the goal here as in many cases, we're
talking on RS-485 serial links that are running at 19200 baud and below.

> Threading is hard to get correct, even in Python. Writing async, event
> driven code is much easier to get correct. I'd serious consider the
> async model over threading. I've rarely come across a use case that
> requires threads for normal application flow, such as with a consumer
> application. It's much closer to how RabbitMQ operates conceptually and
> doesn't have to do any special "tricks" for proper client behavior.
>
> For simplicity and scaling throughput, I find that using multiple
> processes with async handlers (Python's multiprocessing.Process) are
> much easier to deal with than threads and you'll get more reliable behavior.
>
> I hope this helps with some context,

It does, and it's given me a bit to think about. I've got rabbitpy
installed now on one box (going to roll a Debian package of it for
further testing) and see how it goes.

Regards,
Reply all
Reply to author
Forward
0 new messages