ConsumerMixin thread-safety

606 views
Skip to first unread message

Matt Bennett

unread,
Dec 18, 2014, 10:05:47 AM12/18/14
to carrot...@googlegroups.com
Hi,

I'm using kombu, py-amqp and eventlet together, and trying to establish the safety of accessing channels/messages from multiple threads. 

The ConsumerMixin docs mention that it can be used with or without threads, green or otherwise. But I understood that py-amqp connections (presumably their underlying sockets) are not thread-safe, which seems contradictory.

I made an example that consumes messages in one thread and acks then in another. This seems to work but I'm not convinced it isn't by accident: https://gist.github.com/mattbennett/30cc95d42346df62a60e

I found the "experimental" async consumer example [1] and did some digging into the hub implementation. My (perhaps incorrect) understanding is that the hub looks after the file descriptors and mediates access to them. The consumer in the async example registers itself as a reader on the connection socket, but I don't understand how writes are controlled.

Am I correct in thinking that connections cannot be safely accessed from multiple threads, and that my example above does only work by accident? Is this the sort of thing that the async hub is intended for, and if so are there any more examples of how to use it?

Thanks,
Matt.

[1] https://github.com/celery/kombu/blob/master/examples/experimental/async_consume.py

Robert Myers

unread,
Dec 18, 2014, 6:59:18 PM12/18/14
to carrot...@googlegroups.com
Matt,

     I don't know if I'm doing it right myself or not, but the way I'm handling it is spinning off the thread then establishing a new connection to MQ inside the function that I threaded off.  Any message passing for purposes of thread continuity I'm handling in other ways.

     It seems to be working for us this way.  

-Bob


--
You received this message because you are subscribed to the Google Groups "carrot-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to carrot-users...@googlegroups.com.
To post to this group, send email to carrot...@googlegroups.com.
Visit this group at http://groups.google.com/group/carrot-users.
For more options, visit https://groups.google.com/d/optout.

Matt Bennett

unread,
Dec 22, 2014, 10:40:41 AM12/22/14
to carrot...@googlegroups.com
Hi Bob,

Thanks for replying. Do you ack the message as soon as it's received, or after it's been processed by your new thread?

As far as I'm aware you can't ack a message in a connection other than the one it was received from. So if you want to wait until after a message has been processed before you ack or requeue, you either have to do it from the "handler" thread (as my example does, which I think is unsafe) or orchestrate some inter-thread communication so the "consumer" thread knows when to do it.

I guess I'm looking for some confirmation one way or the other, and maybe some pointers on the async hub if this is what it's intended to be used for.

Thanks,
Matt.

Robert Myers

unread,
Dec 22, 2014, 11:32:29 AM12/22/14
to carrot...@googlegroups.com
Matt,

    For the purposes of this project, we ACK it as soon as we get it.  I was having a problem with disconnects due to some network issues outside of my control, so instead of writing a complicated reconnect strategy and trying to work with that, we simply decided that due to our workload, ACKing the message straight away was the 'best' way to handle it.  As far as the other queues we consume, heartbeat etc.,... we don't ACK those, as we're getting them once a second, and it seemed that requiring an ACK on those was a little much.  And for those queues that don't require ACK, I setup another connection in the thread, the same way I do for the thread that's doing the work...

   I think in general I struggled with some of the same questions you did in terms of handling it all in one thread, or somehow passing this all back and forth.  I took the lazy way out and re-architected the process so that some of this wouldn't matter.  But I think that's all going to depend on workload, ours isn't necessarily sensitive to messages that get ACK'd then don't do the work, if that makes sense.

-Bob
Message has been deleted

Matt Bennett

unread,
Dec 30, 2014, 3:05:58 PM12/30/14
to carrot...@googlegroups.com
Thanks again Bob. Yes, ack’ing immediately neatly avoids the whole thread-safety issue by finishing the communication with the broker before spawning another thread. Unfortunately that is not a solution available to me…

I’m starting to answer my own questions now, so I’ll record them here for posterity. My new understanding of the world is:

* Kombu, via PyAMQP, is normally synchronous; calls that expect a result will block until it arrives. There’s no concept of threads or concurrency built in.

* There is an experimental asynchronous hub that lets you consume without blocking, giving you concurrent reads. It is single-threaded and implemented as callbacks. So again there is no consideration for threaded usage or concurrency of anything other than consuming (all other operations remain blocking).

* You can use a library like eventlet to make synchronous operations like those in PyAMQP appear to be concurrent, but you have to be careful not to break any assumptions based on blocking operation. For example, in eventlet writing to a socket is not an atomic operation - your thread can be swapped out for another one mid-write. If that other thread also writes to the socket, the messages become interleaved (and therefore garbage). This is a general eventlet problem and nothing to do with PyAMQP, except that it doesn’t allow for itself to be safely used by threads (e.g. by locking around operations on shared objects, like a socket). As far as I’m aware none of the Python AMQP libraries do except http://amqpy.readthedocs.org.

Incidentally the thing that made me wonder about thread-safety in kombu at all was the following sentence in the docs for the ConsumerMixin: “It can be used outside of threads, with threads, or greenthreads (eventlet/gevent) too.” I guess that means you can put a consumer inside a thread if you want to, but nothing more.
Reply all
Reply to author
Forward
0 new messages