Tornado as a streaming server


Dmitry Popov

Dec 17, 2014, 9:34:59 AM
to python-...@googlegroups.com
Hi all,
I'm working on a project that needs a streaming proxy server with a business-logic layer.
In other words, the proxy must receive the request headers, consult the business-logic layer, then read the request body in chunks and stream them to other servers.
From the documentation, I gather that I need @stream_request_body, AsyncHTTPClient, and body_producer. How can I pass the streamed data from the incoming HTTP server request to AsyncHTTPClient?

Dmitry Popov

Dec 19, 2014, 7:00:07 AM
to python-...@googlegroups.com

I wrote the following code:

import tornado.httpclient
import tornado.web
from tornado.concurrent import Future
from tornado.gen import coroutine

@tornado.web.stream_request_body
class MyHandler(tornado.web.RequestHandler):
    @coroutine
    def prepare(self):
        self.client = tornado.httpclient.AsyncHTTPClient()
        # url, method and headers are not shown here
        self.req = tornado.httpclient.HTTPRequest(
            url, method, headers=headers,
            body_producer=self.body_producer)
        self.write_func = None
        self.producer_future = Future()
        try:
            # Waiting here keeps prepare() from returning
            response = yield self.client.fetch(self.req)
            print(response)
        except Exception as ex:
            print('Exception:', ex)

    def body_producer(self, write):
        # Remember the client's write function for data_received()
        if self.write_func is None:
            self.write_func = write
        return self.producer_future

    @coroutine
    def data_received(self, chunk):
        if self.write_func is not None:
            yield self.write_func(chunk)

    @coroutine
    def put(self):
        # The request body is complete: let the producer finish
        self.producer_future.set_result(None)
        self.finish()
But it does not work, because data_received is not invoked until prepare completes.
Any ideas?

Ben Darnell

Dec 19, 2014, 5:59:01 PM
to Tornado Mailing List
In prepare(), start the fetch but don't wait for it to finish. Instead, save the Future and yield it in put().

  def prepare(self):
    ...
    self.fetch_future = self.client.fetch(self.req)

  @coroutine
  def put(self):
    response = yield self.fetch_future
    print(response)

-Ben


Dmitry Popov

Dec 22, 2014, 1:53:52 AM
to python-...@googlegroups.com, b...@bendarnell.com
Thank you, Ben. I tried this version on Friday and it does not work either, because body_producer is only called after the yield.


Ben Darnell

Jan 11, 2015, 4:25:20 PM
to Tornado Mailing List
I've put together a complete example that shows how to forward from data_received to an HTTP request with body_producer:

"""Demo of streaming requests with Tornado.

This script features a client using AsyncHTTPClient's body_producer
feature to slowly produce a large request body, and two server
handlers to receive this body (one is a proxy that forwards to the
other, also using body_producer).

It also demonstrates flow control: if --client_delay is smaller than
--server_delay, the client will eventually be suspended to allow the
server to catch up. You can see this in the logs, as the "client
writing" lines are initially once a second but eventually become less
frequent (the details are platform-dependent and adjustable with
setsockopt, but with the defaults on my Mac the delays start to show
up around chunk 18).

Tested with Python 3.4, Tornado 4.1.dev1 (master branch from 11 Jan 2015),
and Toro 0.7.

Runs on Tornado 4.0 and higher, but 4.0 has a bug with flow control on
kqueue platforms (Mac and BSD) so to see the flow control effects on
those platforms you'll need the latest source from GitHub (until 4.1 is released).
"""
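
Only the docstring of the demo appears in this thread. What follows is a minimal sketch of the proxy idea it describes, not the original script. It assumes a recent Tornado (native coroutines, and tornado.queues in place of Toro). The names UpstreamHandler, ProxyHandler, the upstream_url application setting and the received list are hypothetical and exist only for illustration.

```python
import asyncio

import tornado.httpclient
import tornado.httpserver
import tornado.queues
import tornado.testing
import tornado.web

received = []  # bodies seen by the upstream handler (illustration only)


@tornado.web.stream_request_body
class UpstreamHandler(tornado.web.RequestHandler):
    """Final destination: collects the streamed body."""

    def prepare(self):
        self.chunks = []

    def data_received(self, chunk):
        self.chunks.append(chunk)

    def put(self):
        received.append(b"".join(self.chunks))
        self.write("ok")


@tornado.web.stream_request_body
class ProxyHandler(tornado.web.RequestHandler):
    """Forwards each incoming chunk to the upstream server."""

    def prepare(self):
        # A bounded queue gives flow control between the two connections.
        self.chunks = tornado.queues.Queue(maxsize=1)
        client = tornado.httpclient.AsyncHTTPClient()
        # Start the outgoing request but do not wait for it here.
        self.fetch_future = client.fetch(
            self.settings["upstream_url"],  # hypothetical setting name
            method="PUT",
            body_producer=self.body_producer,
        )

    async def body_producer(self, write):
        # Runs inside the outgoing request; None marks the end of the body.
        while True:
            chunk = await self.chunks.get()
            if chunk is None:
                return
            await write(chunk)

    async def data_received(self, chunk):
        # Blocks while the queue is full, which pauses reads from the client.
        await self.chunks.put(chunk)

    async def put(self):
        await self.chunks.put(None)
        response = await self.fetch_future
        self.set_status(response.code)
        self.write(response.body)


async def main():
    sock, port = tornado.testing.bind_unused_port()
    app = tornado.web.Application(
        [(r"/upstream", UpstreamHandler), (r"/proxy", ProxyHandler)],
        upstream_url="http://127.0.0.1:%d/upstream" % port,
    )
    server = tornado.httpserver.HTTPServer(app)
    server.add_sockets([sock])

    async def client_body(write):
        for i in range(3):
            await write(b"chunk %d\n" % i)

    client = tornado.httpclient.AsyncHTTPClient()
    response = await client.fetch(
        "http://127.0.0.1:%d/proxy" % port,
        method="PUT",
        body_producer=client_body,
    )
    server.stop()
    return response.body
```

Running asyncio.run(main()) sends three chunks through the proxy to the upstream handler. The sketch leaves out error handling: if the outgoing fetch fails before the body is consumed, data_received would wait on the full queue, so a real proxy would need to watch fetch_future as well.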

Dmitry Popov

Jan 22, 2015, 10:46:07 AM
to python-...@googlegroups.com, b...@bendarnell.com
Ben, it works! Thanks a lot!
But I don't understand: how is body_producer called before "yield self.fetch_future"?


Ben Darnell

Jan 22, 2015, 9:53:32 PM
to Tornado Mailing List
On Thu, Jan 22, 2015 at 10:46 AM, Dmitry Popov <ip.di...@gmail.com> wrote:
> Ben, it works! Thanks a lot!
> But I don't understand: how is body_producer called before "yield self.fetch_future"?

The fetch starts as soon as you call `AsyncHTTPClient.fetch()`; it runs in the background on the IOLoop. `yield self.fetch_future` just waits for the operation that has already begun to complete.

This is different from the old gen.Task model: Tasks did not start until they were yielded.
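
The same behavior can be seen in a small sketch with a decorated coroutine, assuming Tornado 5 or later (where Tornado Futures can be awaited from asyncio). The names background_work, main, run and events are made up for this illustration. Calling the coroutine runs it up to its first yield right away, and yielding the returned Future later only waits for the result.

```python
import asyncio

from tornado import gen

events = []  # records the order in which things happen


@gen.coroutine
def background_work():
    events.append("work started")
    yield gen.sleep(0.01)
    events.append("work finished")
    return 42


@gen.coroutine
def main():
    # The call itself starts the work; nothing has been yielded yet.
    future = background_work()
    events.append("caller continues")
    # Yielding only waits for work that is already in progress.
    result = yield future
    events.append("caller resumed")
    return result


async def run():
    # Thin wrapper so that asyncio.run() receives a native coroutine.
    return await main()
```

After asyncio.run(run()), events shows "work started" recorded before "caller continues", which is the ordering described above.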

-Ben