@coroutine does not replace @asynchronous?


Morgan Tørvolt

Mar 24, 2015, 4:14:41 PM
to python-...@googlegroups.com
I have a problem using @coroutine and @asynchronous together. The code below connects to a rabbitmq message broker. It will make sure the queue is declared, set up a consumer in the queue, and should wait until some data arrives.
I used to use @web.asynchronous, and that worked fine. Now I need to use yield for some functions, which seems to complicate matters. The documentation says that @coroutine lets you run asynchronous code and that @asynchronous is no longer needed; see http://tornado.readthedocs.org/en/latest/web.html#decorators

The complication for me is that basic_consume calls the callback function multiple times, once for each message it receives. In order to buffer up a few messages before returning, I issue a basic_cancel after the first message is received, so that the consumer can consume what is already in the queue (calling the callback for each message) before self.finish() is called from the basic_cancel callback.

I cannot use gen.Task on basic_consume for three reasons: it invokes its callback multiple times, the callback argument is named "consumer_callback" rather than "callback", and I need the return value for the later basic_cancel call.

Is there a way to combine @coroutine and @asynchronous that will actually work?


from tornado import gen, web


class MessageGet(web.RequestHandler):
    @gen.coroutine
    def get(self, id):
        self.pika_client = self.application.settings.get('pika_client')
        self.mq_ch = self.pika_client.channel
        self.canceled = False
        # Ensure we have a queue to consume
        yield gen.Task(self.mq_ch.queue_declare, exclusive=False, queue=id)
        # Consume messages from queue
        self.consumer_tag = self.mq_ch.basic_consume(consumer_callback=self.receive_message,
                                                     queue=self.queue_name)

    def receive_message(self, channel, method, header, body):
        self.mq_ch.basic_ack(method.delivery_tag)
        # Handle message content here.
        self.write("stuff")
        if not self.canceled:
            # Cancel the consumer only once
            self.canceled = True
            self.mq_ch.basic_cancel(self.receive_cancel, self.consumer_tag)

    def receive_cancel(self, info):
        self.write("stuff")
        self.finish()

Ben Darnell

Mar 24, 2015, 8:48:42 PM
to Tornado Mailing List
When you use @gen.coroutine on a top-level method like get(), finish() will be called for you automatically. Therefore, you have to keep the coroutine alive until you want to finish the request; you can no longer call finish() explicitly. To combine coroutines and callbacks it helps to use classes from toro (http://toro.readthedocs.org), which will be included in Tornado in the upcoming 4.2 release:

    @gen.coroutine
    def get(self, id):
        ...
        self.canceled = toro.Event()
        self.consumer_tag = self.mq_ch.basic_consume(consumer_callback=self.receive_message, queue=self.queue_name)
        yield self.canceled.wait()
        self.write('stuff from receive_cancel')

    def receive_message(self, channel, method, header, body):
        self.mq_ch.basic_ack(method.delivery_tag)
        if not self.canceled.is_set():
            self.mq_ch.basic_cancel(self.canceled.set, self.consumer_tag)

-Ben
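
For readers without toro or a broker at hand, here is a minimal sketch of the same idea using only the standard library's asyncio.Event as an analogue of toro.Event. It is not Tornado or pika code: FakeChannel, handle_request and run_demo are hypothetical names invented for the illustration, and the fake channel only imitates a consumer that fires its callback once per message. The point it tries to show is that the handler stays suspended on the event until the cancel callback fires, while the message callback keeps collecting data.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a broker channel; this is NOT the pika API."""

    def __init__(self, messages):
        self._messages = list(messages)

    def basic_consume(self, consumer_callback):
        # Schedule one callback invocation per queued message.
        loop = asyncio.get_running_loop()
        for body in self._messages:
            loop.call_soon(consumer_callback, body)
        return "ctag-1"

    def basic_cancel(self, callback, consumer_tag):
        # Confirm the cancel after the deliveries that are already scheduled.
        asyncio.get_running_loop().call_soon(callback, consumer_tag)


async def handle_request(channel):
    received = []
    canceled = asyncio.Event()
    cancel_sent = False
    consumer_tag = None

    def on_cancel(_info):
        # The cancel confirmation wakes up the suspended handler.
        canceled.set()

    def on_message(body):
        nonlocal cancel_sent
        received.append(body)
        if not cancel_sent:
            # Request the cancel only once, on the first message.
            cancel_sent = True
            channel.basic_cancel(on_cancel, consumer_tag)

    consumer_tag = channel.basic_consume(consumer_callback=on_message)
    # Keep the coroutine alive until the cancel has been confirmed.
    await canceled.wait()
    return received


def run_demo():
    return asyncio.run(handle_request(FakeChannel(["a", "b", "c"])))
```

Calling run_demo() returns the buffered messages once the cancel callback has run. The sketch uses a separate cancel_sent flag so that the cancel is requested only once, and a small wrapper (on_cancel) so that the argument passed to the cancel callback is not forwarded to the event's set() method.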


Morgan Tørvolt

Mar 25, 2015, 4:48:39 AM
to python-...@googlegroups.com, b...@bendarnell.com
> When you use @gen.coroutine on a top-level method like get(), finish() will be called for you automatically. Therefore, you have to keep the coroutine alive until you want to finish the request; you can no longer call finish() explicitly. To combine coroutines and callbacks it helps to use classes from toro (http://toro.readthedocs.org), which will be included in Tornado in the upcoming 4.2 release:

That is contrary to the documentation I linked to, is it not?
 
>     @gen.coroutine
>     def get(self, id):
>         ...
>         self.canceled = toro.Event()
>         self.consumer_tag = self.mq_ch.basic_consume(consumer_callback=self.receive_message, queue=self.queue_name)
>         yield self.canceled.wait()
>         self.write('stuff from receive_cancel')
>
>     def receive_message(self, channel, method, header, body):
>         self.mq_ch.basic_ack(method.delivery_tag)
>         if not self.canceled.is_set():
>             self.mq_ch.basic_cancel(self.canceled.set, self.consumer_tag)

That looks quite nice. How does it work, though: with threading? I will have quite a lot of connections, and I would like to avoid threading if I can.
For now, I am using @gen.engine (which in the code looks quite similar to coroutine to me, except for the automatic finish) together with @asynchronous, and that seems to solve my problem. It is not very well documented, though.

-Morgan-

Ben Darnell

Mar 25, 2015, 10:16:05 AM
to Morgan Tørvolt, Tornado Mailing List
On Wed, Mar 25, 2015 at 4:48 AM, Morgan Tørvolt <mor...@torvolt.com> wrote:
>> When you use @gen.coroutine on a top-level method like get(), finish() will be called for you automatically. Therefore, you have to keep the coroutine alive until you want to finish the request; you can no longer call finish() explicitly. To combine coroutines and callbacks it helps to use classes from toro (http://toro.readthedocs.org), which will be included in Tornado in the upcoming 4.2 release:
>
> That is contrary to the documentation I linked to, is it not?

I hadn't thought of it this way but yes, the documentation is unclear. When you use both @coroutine and @asynchronous, the @asynchronous decorator is effectively ignored and everything works as if you used @coroutine alone. There's not actually any reason to do this except habits formed in the Tornado 2.x days, so we should probably just deprecate the use of both decorators together. Furthermore, once we get rid of the current unhelpful combination, we can consider making it work the way you thought it did: finish() is not called automatically at the end of an asynchronous coroutine, to make it easier to start with a coroutine and finish with callbacks.
 
 
>>     @gen.coroutine
>>     def get(self, id):
>>         ...
>>         self.canceled = toro.Event()
>>         self.consumer_tag = self.mq_ch.basic_consume(consumer_callback=self.receive_message, queue=self.queue_name)
>>         yield self.canceled.wait()
>>         self.write('stuff from receive_cancel')
>>
>>     def receive_message(self, channel, method, header, body):
>>         self.mq_ch.basic_ack(method.delivery_tag)
>>         if not self.canceled.is_set():
>>             self.mq_ch.basic_cancel(self.canceled.set, self.consumer_tag)
>
> That looks quite nice. How does it work, though: with threading? I will have quite a lot of connections, and I would like to avoid threading if I can.

No threading; it's all coroutines (that's why you have to use 'yield' when calling wait()).
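
As a rough illustration of the "no threading" point, the sketch below parks many waiters on a single event and records which thread each one resumes on. It uses the standard library's asyncio.Event as an analogue of toro.Event, not Tornado itself; waiter, main and all_on_one_thread are hypothetical names chosen for the example.

```python
import asyncio
import threading


async def waiter(event, seen_threads):
    # Suspends here without blocking a thread; resumes after event.set().
    await event.wait()
    seen_threads.add(threading.get_ident())


async def main(n_waiters=1000):
    event = asyncio.Event()
    seen_threads = set()
    tasks = [asyncio.ensure_future(waiter(event, seen_threads))
             for _ in range(n_waiters)]
    # Yield once so that every waiter starts and suspends on the event.
    await asyncio.sleep(0)
    event.set()
    await asyncio.gather(*tasks)
    # Every waiter should have resumed on the thread running the loop.
    return seen_threads == {threading.get_ident()}


def all_on_one_thread():
    return asyncio.run(main())
```

Each waiter is only a suspended coroutine held by the event loop, so a large number of pending connections does not translate into a large number of threads.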
 
> For now, I am using @gen.engine (which in the code looks quite similar to coroutine to me, except for the automatic finish) together with @asynchronous, and that seems to solve my problem. It is not very well documented, though.

Yes, this works. It's poorly documented because @gen.coroutine is superior to @gen.engine in nearly all cases (mostly for better exception handling), but the combination of @gen.engine and @asynchronous will do what you want here.

-Ben
 


Ben Darnell

Mar 26, 2015, 11:13:40 PM
to Morgan Tørvolt, Tornado Mailing List
I just realized that I think it works the way you expected if you reverse the decorators and use @coroutine before @asynchronous. This is subtle and I'm hesitant to actually endorse it though.

-Ben

Morgan Tørvolt

Mar 27, 2015, 4:08:14 AM
to Ben Darnell, Tornado Mailing List
On Fri, Mar 27, 2015 at 4:13 AM, Ben Darnell <b...@bendarnell.com> wrote:
> I just realized that I think it works the way you expected if you reverse the decorators and use @coroutine before @asynchronous. This is subtle and I'm hesitant to actually endorse it though.

Thanks for the tip, Ben, but I did try that, and unfortunately it was a no-go.

-Morgan- 