Tornado stream interface

A. Jesse Jiryu Davis

Sep 28, 2012, 12:35:36 PM
to python-...@googlegroups.com
Hi, I've been reading about Node's stream interface, a standard set of methods on asynchronously readable or writable streams:

https://github.com/joyent/node/wiki/Streams

The benefit of standardizing how async streams work is, obviously, that you can mix and match them. E.g., a request is a readable stream, a response is a writable stream, and any filter is both. If you want to transform a stream of bytes into a stream of objects and write that to a database there could be a standard means of doing so.
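
A minimal sketch of the bytes-to-objects case, in plain Python with no Tornado dependency. The class names (LineSplitter, JsonDecoder, Collector) and the write()/close() convention are made up for illustration; they are not part of Tornado or Node, and the Collector only stands in for a database writer.

```python
import json


class Collector:
    """Writable end of the chain: stores whatever is written to it."""

    def __init__(self):
        self.items = []
        self.closed = False

    def write(self, item):
        self.items.append(item)

    def close(self):
        self.closed = True


class LineSplitter:
    """Filter: accepts byte chunks, writes complete lines downstream."""

    def __init__(self, downstream):
        self.downstream = downstream
        self._buffer = b""

    def write(self, chunk):
        self._buffer += chunk
        # Everything before the last newline is complete; keep the tail.
        *lines, self._buffer = self._buffer.split(b"\n")
        for line in lines:
            self.downstream.write(line)

    def close(self):
        if self._buffer:
            self.downstream.write(self._buffer)
            self._buffer = b""
        self.downstream.close()


class JsonDecoder:
    """Filter: accepts one line of bytes, writes a decoded object downstream."""

    def __init__(self, downstream):
        self.downstream = downstream

    def write(self, line):
        self.downstream.write(json.loads(line.decode("utf-8")))

    def close(self):
        self.downstream.close()


sink = Collector()
pipeline = LineSplitter(JsonDecoder(sink))
# Chunk boundaries do not line up with record boundaries.
for chunk in (b'{"a": 1}\n{"b"', b': 2}\n'):
    pipeline.write(chunk)
pipeline.close()
```

Because every stage exposes the same two methods, stages can be reordered or swapped without the neighbors knowing what they are connected to.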

Has anyone proposed or implemented something like this for Tornado? I think the obvious thing would be to abstract what IOStream already provides, but with some additional capabilities borrowed from Node's streams.

Ben Darnell

Sep 28, 2012, 1:43:57 PM
to python-...@googlegroups.com
I think IOStream basically is that interface for Tornado (especially
in 3.0 where I've done some refactoring to make it less
socket-specific and add support for pipes). What sort of capabilities
are you thinking of borrowing from Node's streams? The Node stream
interface looks lower-level than the Tornado one.

A Jesse Jiryu Davis

Sep 28, 2012, 6:00:18 PM
to python-...@googlegroups.com
Ah, I hadn't been following your BaseIOStream development. This looks *closer* to what I'm thinking about, but it's still limited to reading and writing bytes from something with a file-descriptor. I'm hoping for a more general interface for streaming *things*.

A small answer to your question, "What sort of capabilities are you thinking of borrowing from Node's streams?": Perhaps the readable and writable attributes and the pause() and resume() methods would be nice for Tornado streams.

A big answer: What I'm attracted to in Node is how its streams are a standard API for shipping data around, whether that data is bytes or objects or something else. For example the latest Node driver for MongoDB has a CursorStream that reads documents.

This standard makes streams composable and pluggable. There's no similar agreement among Tornado-related libraries. For example, OutputTransform is a different API from IOStream. The Tornado database drivers don't provide an IOStream-like interface for getting rows from cursors.

I could get the show started by making Motor's GridFS interface more IOStream-like, but returning Python dictionaries instead of bytes.

Ben Darnell

Sep 28, 2012, 10:21:09 PM
to python-...@googlegroups.com
On Fri, Sep 28, 2012 at 3:00 PM, A Jesse Jiryu Davis
<ajd...@cs.oberlin.edu> wrote:
> Ah, I hadn't been following your BaseIOStream development. This looks
> *closer* to what I'm thinking about, but it's still limited to reading and
> writing bytes from something with a file-descriptor. I'm hoping for a more
> general interface for streaming *things*.

If you want streaming of general objects then I think BaseIOStream may
actually be moving in the wrong direction. The methods defined on
IOStream and PipeIOStream could be mapped to something like Node's
streams, but at the BaseIOStream level everything is distinctly
bytes-oriented. If we had an interface like what you're proposing
then instead of BaseIOStream as a superclass, we might have a
BufferedStream class that wraps a SocketStream or PipeStream.

>
> A small answer to your question, "What sort of capabilities are you thinking
> of borrowing from Node's streams?": Perhaps the readable and writable
> attributes and the pause() and resume() methods would be nice for Tornado
> streams.

I can see the need for pause/resume in an asynchronous object queue,
but I don't think it makes sense in conjunction with the other
IOStream methods. An IOStream is paused if there are no pending read
operations, but if there is an outstanding read why would you want to
pause? I suppose the one case where it would be useful would be if
you're forgoing IOStream's buffering and just using read_until_close
with a streaming_callback, although in that case maybe it's better to
introduce a read_some() method that would let this be structured as a
series of independent reads instead of one large one.
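
A rough sketch of the "series of independent reads" structure, in plain Python. read_some() is only a suggestion in this thread, so the ChunkSource class and its read_some(callback) signature are hypothetical, and the callback runs synchronously here instead of on an IOLoop. The point is that the consumer throttles the stream simply by not issuing the next read, with no pause()/resume() needed.

```python
from collections import deque


class ChunkSource:
    """Stand-in for a byte stream. Not a Tornado class."""

    def __init__(self, chunks):
        self._chunks = deque(chunks)
        self.reads_issued = 0

    def unread_chunks(self):
        return len(self._chunks)

    def read_some(self, callback):
        """Deliver the next available chunk, or b"" at end of stream."""
        self.reads_issued += 1
        callback(self._chunks.popleft() if self._chunks else b"")


def consume(source, budget):
    """Read until at least `budget` bytes have arrived, then stop asking."""
    received = []

    def on_chunk(chunk):
        if not chunk:
            return  # end of stream
        received.append(chunk)
        if sum(len(c) for c in received) < budget:
            source.read_some(on_chunk)  # ask again only when ready for more

    source.read_some(on_chunk)
    return received


source = ChunkSource([b"abc", b"def", b"ghi"])
got = consume(source, budget=5)
```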

>
> A big answer: What I'm attracted to in Node is how its streams are a
> standard API for shipping data around, whether that data is bytes or objects
> or something else. For example the latest Node driver for MongoDB has a
> CursorStream that reads documents.
>
> This standard makes streams composable and pluggable. There's no similar
> agreement among Tornado-related libraries. For example, OutputTransform is a
> different API from IOStream. The Tornado database drivers don't provide an
> IOStream-like interface for getting rows from cursors.
>
> I could get the show started by making Motor's GridFS interface more
> IOStream-like, but returning Python dictionaries instead of bytes.

I think it probably makes sense to keep the byte and object interfaces
distinct. With objects you usually want to work with one object at a
time, while with bytes you nearly always want multiple bytes (and
there are well-established patterns for specifying this). I'd look at
Queue (or Node's streams) rather than IOStream as a role model for
this interface.

-Ben

A. Jesse Jiryu Davis

Sep 29, 2012, 12:02:16 PM
to python-...@googlegroups.com, b...@bendarnell.com
Hi Ben. Yeah, I think you're right. =)

Regarding byte streams, do you think it'd be useful to include something like Node's Stream.pipe(other_stream) in BaseIOStream?:

http://docs.nodejitsu.com/articles/advanced/streams/how-to-use-stream-pipe
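
For readers who haven't used it, here is a minimal sketch of what pipe() does, in plain Python. Source and Sink are hypothetical classes, not Tornado or Node APIs, and flush() merely simulates the underlying device draining. The flow control follows the Node convention: write() returns False when the destination's buffer is full, the source pauses, and a drain notification resumes it.

```python
class Sink:
    """Writable with a small buffer; write() returns False once it is full."""

    def __init__(self, high_water=2):
        self.high_water = high_water
        self.pending = []
        self.flushed = []
        self.closed = False
        self.on_drain = None

    def write(self, chunk):
        self.pending.append(chunk)
        return len(self.pending) < self.high_water

    def flush(self):
        """Simulate the device accepting the buffered data, then signal drain."""
        self.flushed.extend(self.pending)
        self.pending = []
        if self.on_drain is not None:
            self.on_drain()

    def close(self):
        self.flushed.extend(self.pending)
        self.pending = []
        self.closed = True


class Source:
    """Readable that pushes chunks to a callback while it is not paused."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self.paused = True
        self.pause_count = 0
        self._ended = False
        self._on_data = None
        self._on_end = None

    def pause(self):
        self.paused = True
        self.pause_count += 1

    def resume(self):
        self.paused = False
        while self._chunks and not self.paused:
            self._on_data(self._chunks.pop(0))
        if not self._chunks and not self.paused and not self._ended:
            self._ended = True
            self._on_end()

    def pipe(self, dest):
        def on_data(chunk):
            if not dest.write(chunk):
                self.pause()  # destination is full: stop producing

        self._on_data = on_data
        self._on_end = dest.close
        dest.on_drain = self.resume  # destination has room again
        self.resume()
        return dest


src = Source([b"a", b"b", b"c", b"d", b"e"])
sink = Sink(high_water=2)
src.pipe(sink)   # stalls after two chunks
sink.flush()     # drains, which resumes the source for two more
sink.flush()     # drains again; the source finishes and closes the sink
```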

Additionally, what if we moved read_until, read_until_regex, and read_until_close into a mixin? Then they could be used with non-fd-based stream-like things. Currently, it's hard to provide read_until_regex on a byte-stream from Mongo's GridFS, or a byte-stream that's being parsed from some other stream, like a streaming decompressor or decrypter. It seems like we could decouple the methods that use file descriptors from these higher-level methods. Does that make sense?
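
To make the mixin idea concrete, here is a minimal sketch in plain Python. ReadUntilMixin, feed() and DecompressingStream are hypothetical names, not existing Tornado classes; the mixin only assumes that the concrete stream calls feed() whenever new bytes arrive. In this sketch the returned data includes the delimiter, and read_until_close is left out for brevity.

```python
import re
import zlib


class ReadUntilMixin:
    """Delimiter-based reads over any byte source, with no file descriptor."""

    def _init_reader(self):
        self._read_buffer = b""
        self._pending = None  # (compiled pattern, callback) or None

    def feed(self, data):
        """Called by the concrete stream whenever new bytes arrive."""
        self._read_buffer += data
        self._try_read()

    def read_until(self, delimiter, callback):
        self._start_read(re.compile(re.escape(delimiter)), callback)

    def read_until_regex(self, regex, callback):
        self._start_read(re.compile(regex), callback)

    def _start_read(self, pattern, callback):
        assert self._pending is None, "already reading"
        self._pending = (pattern, callback)
        self._try_read()

    def _try_read(self):
        if self._pending is None:
            return
        pattern, callback = self._pending
        match = pattern.search(self._read_buffer)
        if match is None:
            return  # wait for more data
        self._pending = None
        end = match.end()
        data, self._read_buffer = self._read_buffer[:end], self._read_buffer[end:]
        callback(data)


class DecompressingStream(ReadUntilMixin):
    """Byte stream parsed out of another stream: zlib data in, plain bytes out."""

    def __init__(self):
        self._init_reader()
        self._decompressor = zlib.decompressobj()

    def write_compressed(self, data):
        self.feed(self._decompressor.decompress(data))


results = []
stream = DecompressingStream()
stream.read_until(b"\r\n", results.append)

payload = zlib.compress(b"HELLO world\r\nid=42;rest")
stream.write_compressed(payload[:5])   # arrives in two pieces
stream.write_compressed(payload[5:])
stream.read_until_regex(rb"id=\d+;", results.append)
```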

And regarding streams of objects, I think you're right about using Queue as the model. As a side project I'm playing with a Queue for Tornado. I'm curious what you think. The API is like:

class TornadoQueue(object):
    def __init__(self, max_size=None): pass
    def put(self, item, callback=None, timeout=None): pass
    def get(self, callback=None, timeout=None): pass


put() executes callback after item is enqueued; if the queue stays full for timeout seconds, callback is passed an error instead. put() without a callback is non-blocking and raises an error if the queue is full. get() behaves analogously. So gen.engine-decorated functions could use TornadoQueue much like Python's standard Queue:

from tornado import gen
from tornado.gen import Task

q = TornadoQueue(10)

@gen.engine
def producer(q):
    while True:
        item = yield Task(make_item) # Do some I/O
        yield Task(q.put, item)

@gen.engine
def consumer(q):
    while True:
        item = yield Task(q.get)
        yield Task(do_something_with, item) # Do some I/O

producer(q)
consumer(q)


Thoughts? Seem useful?
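
For concreteness, a minimal sketch of the put()/get() semantics described above, in plain Python. This is not the actual TornadoQueue: the class name SketchQueue and the Full/Empty exceptions are made up here, the timeout argument is omitted because it would need an IOLoop, and callbacks run synchronously instead of being scheduled.

```python
from collections import deque


class Full(Exception):
    pass


class Empty(Exception):
    pass


class SketchQueue:
    def __init__(self, max_size=None):
        self.max_size = max_size
        self._items = deque()
        self._getters = deque()  # callbacks waiting for an item
        self._putters = deque()  # (item, callback) pairs waiting for room

    def _full(self):
        return self.max_size is not None and len(self._items) >= self.max_size

    def put(self, item, callback=None):
        if self._getters:
            self._getters.popleft()(item)  # hand straight to a waiting consumer
        elif not self._full():
            self._items.append(item)
        elif callback is None:
            raise Full()  # non-blocking put on a full queue
        else:
            self._putters.append((item, callback))
            return
        if callback is not None:
            callback()

    def get(self, callback=None):
        if self._items:
            item = self._items.popleft()
            if self._putters:
                # Room just opened up: admit one waiting producer.
                waiting_item, put_callback = self._putters.popleft()
                self._items.append(waiting_item)
                put_callback()
        elif callback is None:
            raise Empty()  # non-blocking get on an empty queue
        else:
            self._getters.append(callback)
            return None
        if callback is None:
            return item
        callback(item)


log = []
q = SketchQueue(max_size=1)
q.get(callback=lambda item: log.append(("got", item)))  # waits for an item
q.put("a")                                              # satisfies the waiting get
q.put("b")                                              # fills the queue
q.put("c", callback=lambda: log.append("c enqueued"))   # waits for room
first = q.get()                                         # frees room; "c" is admitted
second = q.get()
```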