Hi Gavin,
On 02/12/14 10:33, Gavin M. Roy wrote:
> Starting with your last statement, rabbitpy is thread safe, and more
> Pythonic, but as Michael pointed out, if you're looking for the highest
> performance, go with an async adapter in Pika (tornado benches the
> fastest for me).
Well, super duper performance isn't the biggest goal; it's
maintainability and reliability. I feel I can write more reliable
synchronous code, and much of my existing code base is synchronous already.
Worst case right now is 1-second samples from a handful of meters. The
samples are being collected and timestamped at the source, and the
system is not a hard real-time system, so delays are tolerable.
What I was getting, though, was a complete hang.
> The reason why threads are important in pika (or any other client
> adapter) is that AMQP is a bi-directional RPC protocol. This means the server
> can send the client commands as well as the client sending the server
> commands. Examples of these commands include Connection.Blocked,
> Channel.Close, Connection.Close, and not directly a command, but Heartbeat.
>
> Async pika expects proper client behavior with regard to these things by
> registering callbacks. BlockingConnection in pika tries to make pika
> more pythonic and remove callback passing style programming but at a
> cost. Instead of being purely event driven, it has to interrupt the
> client code (your code) every so often to make sure nothing is pending
> from the server. We have to do this, otherwise a publisher could flood
> a channel and never see a RPC request or Heartbeat from the server. This
> is why BlockingConnection is slower than the other adapters. With
> regards to threading, it also does not use any locking primitives or
> queues for communication across threads, which would be required for
> proper operation.
Yeah, I can see that being a problem.
What I guess I was hoping for was that I could poll my client library
regularly whilst waiting on transactions to complete, rather than
splitting my code amongst numerous callback functions.
The idea I had was that in my code I'd call a method that returns an
object which will hold the response. Something along the lines of:
import threading
import time

class Response(object):
    def __init__(self, client):
        self._client = client
        self._done = threading.Event()
        self._res = None
        self._state = 0                    # state code
    def _set_result(self, result, state):
        # Called by the event loop when the reply arrives.
        self._res = result
        self._state = state
        self._done.set()
    def done(self):
        return self._done.is_set()
    def wait(self, timeout=None):
        # Poll the client until the reply arrives or the timeout expires.
        if timeout is not None:
            deadline = time.time() + timeout
        else:
            deadline = None
        while (not self._done.wait(0.1)) and \
                ((deadline is None) or (deadline > time.time())):
            self._client.poll()
    result = property(lambda s: s._res)
    state = property(lambda s: s._state)
When the response comes in, the event loop (whether in the main thread
or a separate thread) would call _set_result. Whatever requested the
data can either call wait() or done() to see if the transaction is finished.
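To give a rough idea of the intended usage (request_reading() is a
made-up method on my client here, one that would send the request and
return one of these Response objects):

response = client.request_reading(meter_id=3)   # hypothetical call
response.wait(timeout=5.0)   # polls the client until done or timed out
if response.done():
    print("result: %r (state %d)" % (response.result, response.state))
else:
    print("timed out waiting for a reply")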
I find this a little easier to understand than callback functions. I
have otherwise considered re-writing my API to use callbacks instead,
but the complexities of the tasks involved favour a more synchronous
approach.
I imagine that somewhere deep inside pika there is a function, called
more or less in a loop, that checks the state of the socket, fires off
callbacks and performs other tasks; that is what the fictional
"client.poll()" method above represents.
> Also, what happens when you have two threads adding multiple frames to
> the output buffer with pika? There is the distinct possibility that they
> can be interwoven sending mismatched protocol frames for what you're
> publishing or responding to.
This I'm not sure about; I didn't (intentionally) try sending multiple
frames from separate threads, as the docs said not to. Such resource
contention can really make a mess of things, so I try to avoid it.
> rabbitpy gets around this by having a dedicated IO thread, ensuring that
> all frame writing is atomic to a thread-safe queue, and handling server
> issued RPC calls appropriately. There is a performance penalty to this,
> but not as big as the penalty for pika's BlockingConnection. It doesn't
> help you now, but pika 0.10's BlockingConnection will be based upon this
> model (and will also share rabbitpy's underlying AMQP serialization
> library pamqp).
This looks like more or less what I've been trying to achieve.
As I say, super performance isn't the goal here; in many cases we're
talking about RS-485 serial links running at 19200 baud and below.
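If I do end up rolling something myself, the dedicated IO thread
arrangement I picture looks roughly like this (a minimal sketch only;
connection.send() and connection.process_incoming() are placeholders,
not rabbitpy's API):

import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

class IOThread(threading.Thread):
    def __init__(self, connection):
        super(IOThread, self).__init__()
        self.daemon = True
        self._connection = connection
        self._outbound = queue.Queue()   # frames queued by any thread

    def write_frame(self, frame):
        # Safe to call from any thread; the queue does the locking.
        self._outbound.put(frame)

    def run(self):
        while True:
            try:
                self._connection.send(self._outbound.get(timeout=0.1))
            except queue.Empty:
                pass
            self._connection.process_incoming()  # handle server-issued RPCs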
> Threading is hard to get correct, even in Python. Writing async, event
> driven code is much easier to get correct. I'd seriously consider the
> async model over threading. I've rarely come across a use case that
> requires threads for normal application flow, such as with a consumer
> application. It's much closer to how RabbitMQ operates conceptually and
> doesn't have to do any special "tricks" for proper client behavior.
>
> For simplicity and scaling throughput, I find that using multiple
> processes with async handlers (Python's multiprocessing.Process) is
> much easier to deal with than threads, and you'll get more reliable behavior.
>
> I hope this helps with some context,
It does, and it's given me a bit to think about. I've got rabbitpy
installed now on one box (going to roll a Debian package of it for
further testing) and will see how it goes.
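If I do end up needing to scale out, I suppose the multiple-process
arrangement you describe would take a shape something like this (queue
names and worker count invented for illustration; each process would
run its own async consumer):

import multiprocessing

def run_consumer(queue_name):
    # Placeholder: each process would open its own connection here
    # and run its own async consumer / event loop.
    pass

if __name__ == '__main__':
    workers = [multiprocessing.Process(target=run_consumer,
                                       args=('meters.%d' % i,))
               for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()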
Regards,