Kombu: Async Consumer


Michael Nelson

Jan 24, 2014, 8:20:21 AM
to carrot...@googlegroups.com
Hello,

First, I apologize if this is the wrong place for this topic -- it seems as though carrot-users is the recommended place to talk about Kombu, but if not, it would be great if you could point me to the right place.

We are looking to use Kombu + AMQP as our message-queuing interface in a new project. We would like some of our consumers to work asynchronously, since their work will be heavily IO-bound, but the documentation wasn't entirely clear about the recommended way to accomplish this. I saw https://groups.google.com/forum/#!searchin/carrot-users/async/carrot-users/o5IoS9hZH3c/ypJ3o6km3ogJ , but I thought that thread might be too outdated to rely on for the current state of Kombu (v3).

We noticed celery.async.hub as something with potential, but it isn't obvious how to use it, especially since we don't have a strong background in asynchronous programming frameworks in general. Is the hub a good option for IO-bound consumers, and if so, is there a good example or walkthrough of how to use it? If nothing else, I can walk through the Celery code to see what it does, but I was hoping someone might have some expert advice on the best way to dig into this topic.

Also, if you know of any expositions on async programming (general or specific) that would make the kombu and/or celery code easier to follow, e.g. on whatever patterns kombu/celery use, those would be much appreciated too.

Thanks,

Mike

Robert Myers

Jan 28, 2014, 11:48:24 AM
to carrot...@googlegroups.com
Mike,

    I'll take a stab at this, and let you know how we're doing it.  I can't say if this is right per se, but it's how I've decided to tackle it.
  
    Basically, we have some processes that do IO-bound operations, and we instruct those processes what to do via MQ.

    So, from the async standpoint: we poll the queue for messages, and when we get one, we spin off a thread running the function that handles it, all the while maintaining the heartbeat to the MQ server. Then rinse and repeat. We've eschewed the examples that involve the event loop because they don't seem to meet our needs. The code we actually use is far more complex, but this should give you the gist of what we're trying to accomplish.

    YMMV


    Sample Code:


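    import threading
    import time

    from kombu import Connection, Exchange, Queue

    # mq_connection_string, mq_exchange_name, mq_exchange_type, mq_queue_name,
    # mq_routing_key and handle_message are defined elsewhere in our code.
    with Connection(mq_connection_string, heartbeat=10) as connection:
        exchange = Exchange(mq_exchange_name, type=mq_exchange_type)(connection)
        exchange.declare()
        channel = connection.channel()
        queue = Queue(name=mq_queue_name, routing_key=mq_routing_key, exchange=exchange)
        bound_queue = queue(channel)  # bind the queue to our channel
        bound_queue.declare()
        # perhaps something here to stop processing, like a flag/counter/timer
        while True:
            msg = bound_queue.get()  # polls once; returns None if the queue is empty
            if msg is not None:
                # Ack on this thread (channels aren't thread-safe) and hand the worker
                # only the decoded payload. Acking first means a failed handler loses it.
                msg.ack()
                my_thread = threading.Thread(target=handle_message, args=(msg.payload,))
                my_thread.daemon = True
                my_thread.start()
            connection.heartbeat_check(1)  # keep the broker heartbeat alive
            time.sleep(1)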
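    The loop above starts one new thread per message. A variation, sketched here under our own assumptions rather than taken from the post, hands each message to a `concurrent.futures` executor (which caps the number of worker threads) and checks the heartbeat on every iteration, so heartbeats keep flowing even when messages arrive back to back. `poll_and_dispatch` is a hypothetical helper; you would pass it `bound_queue.get` and `connection.heartbeat_check` from the code above.

```python
import threading
from concurrent.futures import Executor
from typing import Any, Callable, Optional


def poll_and_dispatch(
    get_message: Callable[[], Optional[Any]],
    handle_message: Callable[[Any], None],
    heartbeat_check: Callable[[], None],
    stop_event: threading.Event,
    pool: Executor,
    idle_sleep: float = 1.0,
) -> None:
    """Poll for messages and run each handler on a worker pool (hypothetical helper).

    get_message: non-blocking poll returning a message or None
        (for example, a bound kombu Queue's ``get``).
    handle_message: the IO-bound work, run on a pool thread.
    heartbeat_check: called on every iteration, busy or idle.
    """
    while not stop_event.is_set():
        msg = get_message()
        heartbeat_check()
        if msg is None:
            # Idle: back off, but wake up immediately if asked to stop.
            stop_event.wait(idle_sleep)
        else:
            # The pool caps the thread count; extra work waits in its internal queue.
            pool.submit(handle_message, msg)
```

    Acknowledging messages is left to the caller here; since channels are generally not thread-safe, a small wrapper around `bound_queue.get` that acks before returning keeps acks on the polling thread.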


-Bob


--
You received this message because you are subscribed to the Google Groups "carrot-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to carrot-users...@googlegroups.com.
To post to this group, send email to carrot...@googlegroups.com.
Visit this group at http://groups.google.com/group/carrot-users.
For more options, visit https://groups.google.com/groups/opt_out.

Ask Solem

Feb 3, 2014, 10:43:10 AM
to carrot...@googlegroups.com
On Jan 24, 2014, at 1:20 PM, Michael Nelson <mjn...@gmail.com> wrote:

> We noticed celery.async.hub as something with potential, but it isn't immediately obvious how to use, especially since we don't have a strong background in asynchronous programming frameworks in general.  Is the hub a good option for IO bound consumers, and if so, might there be a good example or walkthrough of how to use it?  If nothing else, I figure I can walk through the celery code and see what is done in there, but I was hoping someone might have some more expert advice on the best way to dig into this topic.

The hub is currently only used by Celery, and its API is unstable: it will probably change.

I’m currently working on full async support; the API will eventually be stabilized and made compatible with the tulip/asyncio efforts in Python 3.4.

The current code lets you consume asynchronously, but the socket is still blocking, so when a message is available to be read, it will block until it has received all of the message data. Fixing this is my first goal; then I will work on async publish, and then exchange.declare, queue.declare, queue.bind, basic.ack and basic.qos. Async connect and disconnect are not a priority for Celery, but they will also happen eventually.

Here’s an example using the kombu.async event loop to consume:



But I’m not sure why that would be useful to you, as any code you want to execute at the same time must use the same event loop, or you will have to move the existing code over to another event loop (e.g. Twisted/Tornado).
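The point about sharing one event loop can be illustrated with the standard-library `selectors` module, used here as a stand-in for kombu's hub (which likewise maps file descriptors to callbacks, e.g. via `add_reader`). In this sketch a single loop waits on two sockets; `broker` and `other_io` are hypothetical labels for a broker connection and some other IO-bound work, and `run_single_loop` is a hypothetical helper.

```python
import selectors
import socket


def run_single_loop(payloads):
    """Serve two sockets from one selector loop; return what each callback received."""
    names = ("broker", "other_io")
    sel = selectors.DefaultSelector()
    received = {}
    sockets = []
    try:
        for name, payload in zip(names, payloads):
            reader, writer = socket.socketpair()
            sockets.extend((reader, writer))
            reader.setblocking(False)

            def on_readable(sock, name=name):
                # Callback registered for this file descriptor.
                received[name] = sock.recv(1024)

            sel.register(reader, selectors.EVENT_READ, on_readable)
            writer.sendall(payload)  # makes the reader side readable

        # One loop serves every registered source; a blocking callback stalls them all.
        while len(received) < len(sockets) // 2:
            for key, _mask in sel.select(timeout=1.0):
                key.data(key.fileobj)
    finally:
        sel.close()
        for sock in sockets:
            sock.close()
    return received
```

Any other work that has to run concurrently with the consumer would need to be registered with the same loop in the same way, which is the cost of adopting a particular event loop.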

For existing code, using connection.drain_events() is almost the same, unless you want to consume from multiple connections.



