IOLoop wait on callback or detect if on IOLoop thread?


Jeffrey Bush

Aug 27, 2014, 7:21:20 PM
to python-...@googlegroups.com
Hi,

There seem to be two things lacking in IOLoop, for me at least. One I can implement outside of IOLoop myself; the other is difficult.

First, can you wait for a callback to finish? The cleanest way would be to have IOLoop.add_callback return a future. You can do it yourself (mostly):

from concurrent.futures import Future   # a "real" future: result() blocks until set

def run_on_ioloop_and_wait(ioloop, cb, *args, **kwargs):
    f = Future()
    def run():
        try:
            f.set_result(cb(*args, **kwargs))
        except Exception as e:
            f.set_exception(e)
    ioloop.add_callback(run)
    return f.result()   # blocks until run() has executed on the IOLoop thread

It would be nice if this were in Tornado itself; maybe not in `add_callback`, but in a new function `run_callback`.

Of course, that could cause huge problems if we are currently running on the ioloop we add things to, which brings me to my second point: I would like to be able to detect whether we are on the ioloop or not. `IOLoop.current() == ioloop` does not work, because if there is no ioloop for the current thread, `IOLoop.current()` falls back to `IOLoop.instance()`, so the comparison is true in many cases where it shouldn't be. The current() function needs a flag like `or_global_instance=True` which, when manually set to False, returns None instead of IOLoop.instance(). Another solution would be to add a function `def is_on_ioloop(): return getattr(IOLoop._current, "instance", None) is not None`.
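A third option that avoids Tornado internals entirely (a sketch only; LoopThreadTracker is a made-up helper, not Tornado API) would be to record the loop's thread ident from a callback and compare against it later:

import threading

class LoopThreadTracker(object):
    """Hypothetical helper: remembers which thread a given IOLoop runs on."""
    def __init__(self, ioloop):
        self.thread_ident = None
        # The callback runs on the loop's own thread, letting us capture its ident.
        ioloop.add_callback(self._record)

    def _record(self):
        self.thread_ident = threading.current_thread().ident

    def is_on_ioloop(self):
        return threading.current_thread().ident == self.thread_ident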

A side note: I have developed a writable file-like class that wraps a request handler. This way you can give the file-like object to something like TarFile and it will stream the output (since my class auto-flushes by default). I also have the reverse: it turns a streaming request handler into a readable file-like class so that the body can be read directly. Is there any interest in having this shared?

Jeff

Ben Darnell

Aug 27, 2014, 10:23:45 PM
to Tornado Mailing List
On Wed, Aug 27, 2014 at 7:21 PM, Jeffrey Bush <je...@coderforlife.com> wrote:
Hi,

There seem to be two things lacking in IOLoop, for me at least. One I can implement outside of IOLoop myself; the other is difficult.

First, can you wait for a callback to finish? The cleanest way would be to have IOLoop.add_callback return a future. You can do it yourself (mostly):

from concurrent.futures import Future   # a "real" future: result() blocks until set

def run_on_ioloop_and_wait(ioloop, cb, *args, **kwargs):
    f = Future()
    def run():
        try:
            f.set_result(cb(*args, **kwargs))
        except Exception as e:
            f.set_exception(e)
    ioloop.add_callback(run)
    return f.result()   # blocks until run() has executed on the IOLoop thread

It would be nice if this were in Tornado itself; maybe not in `add_callback`, but in a new function `run_callback`.

This only works if cb is synchronous.  Why not just wrap the function in gen.coroutine so it returns the Future itself?

The final f.result() call doesn't work - Tornado Futures don't support blocking for results.
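
A minimal sketch of the coroutine approach (do_work and caller are placeholder names):

from tornado import gen

@gen.coroutine
def do_work():
    # ... asynchronous work here ...
    raise gen.Return(42)   # coroutine "return value", Python 2 style

@gen.coroutine
def caller():
    result = yield do_work()   # do_work() returns a Future immediately;
                               # yield it instead of blocking on .result()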
 

Of course, that could cause huge problems if we are currently running on the ioloop we add things to, which brings me to my second point: I would like to be able to detect whether we are on the ioloop or not. `IOLoop.current() == ioloop` does not work, because if there is no ioloop for the current thread, `IOLoop.current()` falls back to `IOLoop.instance()`, so the comparison is true in many cases where it shouldn't be. The current() function needs a flag like `or_global_instance=True` which, when manually set to False, returns None instead of IOLoop.instance(). Another solution would be to add a function `def is_on_ioloop(): return getattr(IOLoop._current, "instance", None) is not None`.


I find that wanting to ask whether you are on an IOLoop thread suggests a poorly-defined threading model.  Among other things, if there are multiple IOLoop threads you generally want to be on a specific one (and on a clean stack, etc).  Inquiring whether the current thread has an IOLoop introduces similar problems to reentrant mutexes.  Whenever a piece of code wants to guarantee something about which thread it is running on, it should make that so by transferring control with IOLoop.add_callback.  
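
In sketch form (placeholder names), the pattern is simply:

def run_on_loop(ioloop, fn, *args):
    # No "am I on the loop?" check: IOLoop.add_callback is documented as
    # safe to call from any thread and runs fn on the loop's own thread.
    ioloop.add_callback(fn, *args)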

-Ben
 
A side note: I have developed a writable file-like class that wraps a request handler. This way you can give the file-like object to something like TarFile and it will stream the output (since my class auto-flushes by default). I also have the reverse: it turns a streaming request handler into a readable file-like class so that the body can be read directly. Is there any interest in having this shared?

Jeff


Jeffrey Bush

Aug 28, 2014, 1:22:55 AM
to python-...@googlegroups.com, b...@bendarnell.com
You are right. First off, I was using real Futures (concurrent.futures), so result() does block, but yes, a different approach is better. For the other part, I also agree that needing to know which thread you are on is usually a sign of bad design.

After all that, I found the solution to the problem I was trying to fix with these: I had to add @web.asynchronous to a function that already had @gen.coroutine. The documentation made it seem like this would be redundant, but without it @gen.coroutine still causes finish() to be called automatically.
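
For reference, the combination that worked (a sketch; decorator order matters, with @web.asynchronous outermost):

from tornado import gen, web

class DownloadHandler(web.RequestHandler):
    @web.asynchronous   # outermost: suppresses the automatic finish()
    @gen.coroutine
    def get(self):
        yield self.do_async_work()   # placeholder for the real work
        # finish() must now be called explicitly, possibly later from a callback.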

The last thing I am having a "problem" with is that when using @web.stream_request_body, the get/post/... functions are not called until after the body is done being read in. However, what I am trying to make is a self.request.body that is a file-like object which can be read from during the get/post/... functions. Right now it's all working except for that delay.
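
To illustrate the ordering (RequestReader is my class; the hooks are Tornado 4.0's):

from tornado import web

@web.stream_request_body
class UploadHandler(web.RequestHandler):
    def prepare(self):
        self.reader = RequestReader()        # my file-like wrapper

    def data_received(self, chunk):
        self.reader.data_received(chunk)     # chunks arrive as they are read

    def post(self):
        # Not called until the entire body has been received;
        # this is the delay I would like to avoid.
        data = self.reader.read(1024)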

Jeff

Ben Darnell

Aug 29, 2014, 1:18:08 PM
to Jeffrey Bush, Tornado Mailing List
On Thu, Aug 28, 2014 at 1:22 AM, Jeffrey Bush <je...@coderforlife.com> wrote:
You are right. First off, I was using real Futures (concurrent.futures), so result() does block, but yes, a different approach is better. For the other part, I also agree that needing to know which thread you are on is usually a sign of bad design.

Knowing your execution context is crucial and there's nothing wrong with wanting to control it.  The problem is that properties of the thread (does it have an IOLoop? is it the main thread?) are only part of the picture.  It generally ends up being better to queue things up on the correct executor than to ask what state you're in.
 

After all that, I found the solution to the problem I was trying to fix with these: I had to add @web.asynchronous to a function that already had @gen.coroutine. The documentation made it seem like this would be redundant, but without it @gen.coroutine still causes finish() to be called automatically.

Be careful with this. If you mix coroutines and callbacks (which it sounds like you're doing, since something must be calling finish() after the main method returns) it's easy to create situations where error handling doesn't work as you expect. I would recommend making sure that the coroutine yields until it's ready to finish the request.
 

The last thing I am having a "problem" with is that when using @web.stream_request_body, the get/post/... functions are not called until after the body is done being read in. However, what I am trying to make is a self.request.body that is a file-like object which can be read from during the get/post/... functions. Right now it's all working except for that delay.

The easiest solution here is probably to just do everything from prepare() instead of from get/post.  It's a little ugly but it would have the ordering you want.

How do you deal with the fact that the file-like interface is synchronous? Is that where the other threads come in?

-Ben

Jeffrey Bush

Aug 29, 2014, 2:48:00 PM
to python-...@googlegroups.com, je...@coderforlife.com, b...@bendarnell.com
I was considering putting everything in prepare() and then having prepare call stream_get(), stream_post(), etc. However, I noticed that there is a prepare-future callback into the application ("_prepared_future"), and I didn't know how not having that called until later would mess things up.

The reason mixing coroutines and callbacks is necessary is the following:

@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self, filename):
    with RequestResponder(self) as out:
        yield self.application.executor.submit(self.application.read, filename, out)

Where RequestResponder is the file-like object that writes to the request handler (as you can see, this is done with an executor, so yes, on another thread). The RequestResponder does everything with callbacks (write(data), flush(), and finish()). Without @asynchronous, when out is closed it posts a callback to finish; then get completes, so finish is called in the current callback, and then the posted callback executes, calling finish() a second time. Sometimes a write() will also get called after get() finishes. Without the yield statement, out would close immediately (at the end of the with statement). One solution is to do this instead, which isn't as "pretty" but is equivalent:

@tornado.web.asynchronous
def get(self, filename):
    out = RequestResponder(self)
    f = self.application.executor.submit(self.application.read, filename, out)
    f.add_done_callback(lambda _: out.close())   # done callbacks receive the future
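
The core of RequestResponder is just marshalling every file operation onto the IOLoop thread (a rough sketch of the approach, not the exact class):

from tornado.ioloop import IOLoop

class RequestResponder(object):
    """Writable file-like wrapper around a RequestHandler; every operation
    is posted to the IOLoop with add_callback so worker threads can use it."""
    def __init__(self, handler, ioloop=None):
        self._handler = handler
        self._ioloop = ioloop or IOLoop.current()

    def write(self, data):
        self._ioloop.add_callback(self._handler.write, data)
        self._ioloop.add_callback(self._handler.flush)   # auto-flush by default

    def close(self):
        self._ioloop.add_callback(self._handler.finish)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()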

As you point out, files typically use blocking. The RequestResponder does not block (it posts callbacks to the IOLoop), and my first thought was to make at least close() blocking, but that would require what I initially asked about; I don't think it is necessary, though. The reading version, RequestReader, by default blocks on all read functions, waiting for enough data to come in (or end-of-stream). However, the functions also take a 'block' argument which, when False, always returns immediately with as much data as is available (up to the amount requested). Of course, to use that you would have to know that it is a RequestReader and not just any file-like object, so things like TarFile and GzipFile can't use that special feature.
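
The blocking behavior in RequestReader boils down to a condition variable over a chunk buffer (again a rough sketch, not the full class):

import collections
import threading

class RequestReader(object):
    """Readable file-like object fed by data_received() on the IOLoop thread;
    read() blocks a worker thread until enough data (or end-of-stream) arrives."""
    def __init__(self):
        self._chunks = collections.deque()
        self._cond = threading.Condition()
        self._eof = False

    def data_received(self, chunk):      # called on the IOLoop thread
        with self._cond:
            self._chunks.append(chunk)
            self._cond.notify()

    def finish(self):                    # called at end of request body
        with self._cond:
            self._eof = True
            self._cond.notify()

    def read(self, n, block=True):       # called from a worker thread
        with self._cond:
            while block and not self._eof and sum(map(len, self._chunks)) < n:
                self._cond.wait()
            data = b''.join(self._chunks)
            self._chunks.clear()
            if len(data) > n:
                self._chunks.append(data[n:])   # keep the unread remainder
                data = data[:n]
            return data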

The main reason I am doing all of this is to make the handlers as minimal as possible, basically just a call to an executor, and have a separately threaded function actually do everything. That means the separately threaded function needs to be able to read the request-body stream in (if it's big; otherwise I would just send the data as a param) and write back to the response (once again, if it is big; otherwise just return it).

Jeff

Ben Darnell

Aug 30, 2014, 1:01:57 PM
to Jeffrey Bush, Tornado Mailing List
On Fri, Aug 29, 2014 at 2:47 PM, Jeffrey Bush <je...@coderforlife.com> wrote:
I was considering putting everything in prepare() and then having prepare call stream_get(), stream_post(), etc. However, I noticed that there is a prepare-future callback into the application ("_prepared_future"), and I didn't know how not having that called until later would mess things up.

According to the docs, it should work because data_received may overlap with an asynchronous prepare (and that's what I was basing my recommendation on).  However, upon reviewing the code I think that comment is out of date, so simply making prepare a coroutine won't help you.  
 

The reason mixing coroutines and callbacks is necessary is the following:

@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self, filename):
    with RequestResponder(self) as out:
        yield self.application.executor.submit(self.application.read, filename, out)

Where RequestResponder is the file-like object that writes to the request handler (as you can see, this is done with an executor, so yes, on another thread). The RequestResponder does everything with callbacks (write(data), flush(), and finish()). Without @asynchronous, when out is closed it posts a callback to finish; then get completes, so finish is called in the current callback, and then the posted callback executes, calling finish() a second time. Sometimes a write() will also get called after get() finishes. Without the yield statement, out would close immediately (at the end of the with statement). One solution is to do this instead, which isn't as "pretty" but is equivalent:

@tornado.web.asynchronous
def get(self, filename):
    out = RequestResponder(self)
    f = self.application.executor.submit(self.application.read, filename, out)
    f.add_done_callback(lambda _: out.close())   # done callbacks receive the future

As you point out, files typically use blocking. The RequestResponder does not block (it posts callbacks to the IOLoop), and my first thought was to make at least close() blocking, but that would require what I initially asked about; I don't think it is necessary, though. The reading version, RequestReader, by default blocks on all read functions, waiting for enough data to come in (or end-of-stream). However, the functions also take a 'block' argument which, when False, always returns immediately with as much data as is available (up to the amount requested). Of course, to use that you would have to know that it is a RequestReader and not just any file-like object, so things like TarFile and GzipFile can't use that special feature.

That's normal behavior for a file-like object; file.read may return less than the requested amount (the blocking parameter is an out-of-band flag on the file-like object, instead of an argument to read()).  However, modules expecting blocking files won't understand the exception you need to raise when there are zero bytes available.
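
For comparison, the stdlib's own convention here (a sketch; NonBlockingBody is a made-up adapter) is for a non-blocking raw stream to return None when no bytes are available, which blocking-oriented consumers like GzipFile similarly won't handle:

import io

class NonBlockingBody(io.RawIOBase):
    """Sketch: exposes a buffered body with stdlib non-blocking semantics."""
    def __init__(self, reader):
        self._reader = reader          # e.g. a RequestReader

    def readable(self):
        return True

    def readinto(self, b):
        data = self._reader.read(len(b), block=False)
        if not data:
            return None                # io convention: None means "would block"
                                       # (a real class would return 0 at EOF)
        b[:len(data)] = data
        return len(data)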
 

The main reason I am doing all of this is to make the handlers as minimal as possible, basically just a call to an executor, and have a separately threaded function actually do everything. That means the separately threaded function needs to be able to read the request-body stream in (if it's big; otherwise I would just send the data as a param) and write back to the response (once again, if it is big; otherwise just return it).


I think you can do this with a combination of prepare and post/put:

  def prepare(self):
    self.out = RequestResponder(self)
    self.future = self.application.executor.submit(...)

  @gen.coroutine
  def post(self):
    try:
      yield self.future
    finally:
      self.out.close()

  put = post

Now whatever you submit to the executor can do blocking I/O on the RequestResponder and when it's done control comes back to post() to clean up.

-Ben