Simple Efficient Streams?

Corbin Simpson

Jun 16, 2016, 8:27:05 PM
to e-lang
Hi! I wrote a Monte stream library, and I wanted to explore the design space a bit and get feedback. Monte currently uses "tubes", an abstraction first introduced in the context of Twisted and still developed there, similar to iteratees in the ML world, but I recently felt like tubes were heavy.

So, I wrote some code that pares down the idea of "stream" to what I feel is bare-bones. I started with the classic lazy I/O linked list:

[1, [2, [3, [4, [5, null]]]]]

Nothing interesting. But, of course, Monte's a strict language, so this is not lazy at all. We can forcefully segment the assembly of the list across turns by making each link in the list into a promise:

def cons(l, x):
    return [x, when () -> { l }]
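A rough Python analogue of the promise-linked list may help readers who don't know Monte. It assumes an asyncio event loop stands in for the vat and an `asyncio.Future` for a Monte promise; `cons` and `to_list` are illustrative names, not part of mt-stream.

```python
import asyncio

# Python analogue (assumption): asyncio.Future stands in for a Monte promise.


def cons(rest, x):
    """Return [head, promise-of-rest]; the rest is delivered on a later turn."""
    loop = asyncio.get_running_loop()
    tail = loop.create_future()
    # Resolve the tail on a later turn of the event loop, like `when () -> { l }`.
    loop.call_soon(tail.set_result, rest)
    return [x, tail]


async def to_list(stream):
    """Walk the spine, waiting on each link's promise; None is the empty stream."""
    out = []
    while stream is not None:
        x, tail = stream
        out.append(x)
        stream = await tail
    return out


async def main():
    stream = None
    for x in reversed([1, 2, 3, 4, 5]):
        stream = cons(stream, x)
    return await to_list(stream)


print(asyncio.run(main()))  # [1, 2, 3, 4, 5]
```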

As the Haskell adage goes, we want to be "lazy in the spine, strict in the leaves". This gives us the same effect; the building of the next link in the list will happen later. But how much later? We don't want a stream to buffer itself in memory needlessly; we want to apply backpressure. Backpressure is normally generated by having some sort of pause, but I decided to try for a resume mechanism instead:

def cons(l, x):
    def [p, r] := Ref.promise()
    def resume():
        r.resolve(l)
    return [x, resume, p]
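The same kind of Python analogue for the resumable cell, again with `asyncio.Future` standing in for `Ref.promise()` (names are illustrative, not mt-stream's API). The demo checks that the rest of the stream is not released until downstream calls `resume()`:

```python
import asyncio

# Python analogue (assumption): asyncio.Future stands in for Ref.promise().


def cons(rest, x):
    """Return [head, resume, promise-of-rest]; nothing flows until resume()."""
    tail = asyncio.get_running_loop().create_future()

    def resume():
        # Downstream calls this when it is ready for the next link.
        if not tail.done():
            tail.set_result(rest)

    return [x, resume, tail]


async def demo():
    x, resume, tail = cons(None, 1)
    released_early = tail.done()  # False: downstream has not asked yet
    resume()
    rest = await tail             # None, i.e. the empty stream
    return x, released_early, rest


print(asyncio.run(demo()))  # (1, False, None)
```

Because `resume` is an ordinary closure, it can be handed to another object, which is the capability-passing property mentioned in the next paragraph.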

Looking good. Now, we'll only progress after downstream has indicated that they are prepared. (And the resumption is a capability which can be passed around if necessary; I rely on this for several optimizations!) Unfortunately, this isn't quite expressive enough, as it doesn't let us express any kind of failure. We can fix that with a bit of semantics and an explicit success flag:

def cons(l, x):
    def [p, r] := Ref.promise()
    def resume(b :Bool):
        if (b) { r.resolve(l) } else { r.smash("Cancelled stream") }
    return [x, resume, p]
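Continuing the hypothetical Python analogue, the success flag maps onto either resolving the future or setting an exception on it; `StreamCancelled` is an invented stand-in for the problem passed to `smash`:

```python
import asyncio

# Python analogue (assumption): set_exception() plays the role of r.smash().


class StreamCancelled(Exception):
    """Hypothetical stand-in for the problem passed to r.smash()."""


def cons(rest, x):
    tail = asyncio.get_running_loop().create_future()

    def resume(ok: bool):
        if tail.done():
            return  # only the first decision counts
        if ok:
            tail.set_result(rest)
        else:
            # Analogue of r.smash("Cancelled stream"): the promise breaks.
            tail.set_exception(StreamCancelled("Cancelled stream"))

    return [x, resume, tail]


async def demo():
    _, resume, tail = cons(None, 1)
    resume(False)  # downstream asks upstream to abort
    try:
        await tail
    except StreamCancelled as problem:
        return str(problem)
    return "not cancelled"


print(asyncio.run(demo()))  # Cancelled stream
```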

Now it's possible to indicate, from downstream, that upstream should abort instead of continuing to process stuff. We also need the opposite direction, so that upstream can report a failure to downstream; for that, upstream can simply smash the promise. Remaining cleanup can be handled by the GC and finalizers deeper in the runtime.
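Here is a sketch of the upstream direction under the same asyncio assumptions. The hypothetical producer `from_iter` pulls an item from a Python iterator only when downstream resumes, and if pulling raises, it smashes (sets an exception on) the tail so the consumer sees the failure:

```python
import asyncio

# Python analogue (assumption); from_iter, flaky_source and drain are hypothetical.


def from_iter(it):
    """Pull the next item only when downstream resumes; smash on failure."""
    try:
        x = next(it)
    except StopIteration:
        return None  # end of stream
    tail = asyncio.get_running_loop().create_future()

    def resume(ok: bool):
        if tail.done():
            return
        if not ok:
            tail.set_exception(RuntimeError("Cancelled stream"))
            return
        try:
            tail.set_result(from_iter(it))
        except Exception as problem:
            tail.set_exception(problem)  # upstream failure: smash the promise

    return [x, resume, tail]


def flaky_source():
    yield 1
    yield 2
    raise OSError("read failed")


async def drain(stream):
    seen = []
    try:
        while stream is not None:
            x, resume, tail = stream
            seen.append(x)
            resume(True)
            stream = await tail
    except OSError as problem:
        return seen, str(problem)
    return seen, None


async def main():
    return await drain(from_iter(flaky_source()))


print(asyncio.run(main()))  # ([1, 2], 'read failed')
```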

That is the entire core, and it's what makes the design "simple". The "efficient" part is that, assuming scheduling turns on the current vat is relatively fast (as it is in Monte), every operation on a stream, such as a chunk of .map() or .fold(), processes only a small part of the stream and then returns quickly. I wish I could say that this looks like a standard loop to the JIT, but some stuff gets in the way. Nonetheless, dispatching from a stream costs about the same as dispatching from a tube, and we get to skip all of the pause-related corner cases.
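To illustrate the "small chunks per turn" point, here is a Python sketch assuming asyncio turns stand in for vat turns. The hypothetical `from_list` producer delivers each link on a later turn, so a fold over it handles one cell, yields, and lets unrelated work run in between. It shows only the scheduling shape, not anything about Typhon's JIT.

```python
import asyncio

# Python analogue (assumption): event-loop turns stand in for vat turns.


def from_list(items):
    """Hypothetical producer; each link is delivered on a later turn."""
    if not items:
        return None
    loop = asyncio.get_running_loop()
    tail = loop.create_future()

    def resume(ok: bool):
        def settle():
            if tail.done():
                return
            if ok:
                tail.set_result(from_list(items[1:]))
            else:
                tail.set_exception(RuntimeError("Cancelled stream"))
        loop.call_soon(settle)

    return [items[0], resume, tail]


async def fold(stream, step, acc, log):
    """Left fold that handles one cell per turn, then yields to the event loop."""
    while stream is not None:
        x, resume, tail = stream
        acc = step(acc, x)
        log.append(f"fold {x}")
        resume(True)
        stream = await tail  # the next cell arrives on a later turn
    return acc


async def other_work(log, n):
    """Unrelated work sharing the same event loop."""
    for _ in range(n):
        log.append("other")
        await asyncio.sleep(0)  # give up the rest of this turn


async def main():
    log = []
    total, _ = await asyncio.gather(
        fold(from_list([1, 2, 3]), lambda a, x: a + x, 0, log),
        other_work(log, 3),
    )
    return total, log


total, log = asyncio.run(main())
print(total, log)
```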

On top of this core, I've written a small maker that produces a rich stream object with all of the useful methods, including .map/1, .fold/2, .filter/1, etc., and also a parameterizable guard based on the recent breakthrough we had with Vows. My code's available at https://github.com/MostAwesomeDude/mt-stream and if you want to see the example in action, jump right to https://github.com/MostAwesomeDude/mt-stream/blob/master/stream.mt#L215-L226
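A map over this cell structure can be sketched the same way (Python, hypothetical names; mt-stream's real `.map/1` lives in the linked repository and may differ). The mapped stream passes its resume decision to the upstream cell and maps the rest once upstream's promise settles, so a smash from upstream passes straight through:

```python
import asyncio

# Python analogue (assumption); from_list, map_stream and collect are hypothetical.


def from_list(items):
    """Producer over a list; resolves synchronously for brevity."""
    if not items:
        return None
    tail = asyncio.get_running_loop().create_future()

    def resume(ok: bool):
        if not tail.done():
            if ok:
                tail.set_result(from_list(items[1:]))
            else:
                tail.set_exception(RuntimeError("Cancelled stream"))

    return [items[0], resume, tail]


def map_stream(f, stream):
    """Apply f to each cell; resuming a mapped cell resumes its upstream cell."""
    if stream is None:
        return None
    x, up_resume, up_tail = stream
    tail = asyncio.get_running_loop().create_future()
    resumed = False

    def forward(up):
        # Runs once upstream's promise settles: map the rest, or pass the smash on.
        if up.exception() is not None:
            tail.set_exception(up.exception())
        else:
            tail.set_result(map_stream(f, up.result()))

    def resume(ok: bool):
        nonlocal resumed
        if resumed:
            return
        resumed = True
        up_tail.add_done_callback(forward)
        up_resume(ok)  # the decision (continue or cancel) travels upstream

    return [f(x), resume, tail]


async def collect(stream):
    out = []
    while stream is not None:
        x, resume, tail = stream
        out.append(x)
        resume(True)
        stream = await tail
    return out


async def main():
    return await collect(map_stream(lambda x: x * 10, from_list([1, 2, 3])))


print(asyncio.run(main()))  # [10, 20, 30]
```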

(The thing about Vows, if you missed it: https://github.com/monte-language/typhon/issues/107 )

I haven't done the stuff required to make these streams replace Typhon's current tube-based I/O system, but I don't think that it would be especially hard, and it would actually make some of the state management easier since each stream cell's logic is local.

Thanks for reading this far! This turned into kind of a ramble.

Almost unrelated: I had to share this comic: https://ro-che.info/ccc/15

~ C.

Kevin Reid

Jun 21, 2016, 8:56:53 PM
to e-l...@googlegroups.com
On Jun 16, 2016, at 17:27, Corbin Simpson <mostawe...@gmail.com> wrote:
> Hi! I wrote a Monte stream library, and I wanted to explore the design space a bit and get feedback. Monte currently uses "tubes", an abstraction first introduced in the context of Twisted and still developed there, similar to iteratees in the ML world, but I recently felt like tubes were heavy.

I want to comment on your design but I haven't gotten around to sitting down and thinking about it. A preliminary question:

It sounds like you are thinking about _input_ streams exclusively, no output streams. Is that the case?

(Output streams are of course simpler in a sense because the only things they need to do are error reporting and backpressure, but there are ways in which they ought to be — compatible.)

Corbin Simpson

Jun 30, 2016, 4:47:55 PM
to e-lang

The way I view it is that the stream itself is the actual streaming data and the minimal structure around it that makes it work. Inputs and outputs are turned into producers and consumers which act on the stream. Producers put stuff into the stream, extending it, and consumers process values from the stream, advancing incrementally. This is in analogy to Monte iteration, which uses explicit iterator objects instead of implicit internal iteration. The iterator stands alone, even though there is code behind it producing new values and code calling it and consuming those values.

A prospective stream consumer would just be a runnable object which holds onto references into the stream for as long as necessary to do the processing. Since fold is universal, there are only a few different patterns that a consumer might want to have, and then it's all about writing the higher-order functions.
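The "fold is universal" point can be sketched in the same hypothetical Python model (asyncio futures standing in for Monte promises): once there is one fold that walks the cells, each consumer is just a step function and a seed. All names here are invented for illustration.

```python
import asyncio

# Python analogue (assumption): asyncio.Future stands in for a Monte promise.


def from_list(items):
    """Hypothetical producer over a list; resume(False) just leaves it paused."""
    if not items:
        return None
    tail = asyncio.get_running_loop().create_future()

    def resume(ok: bool):
        if ok and not tail.done():
            tail.set_result(from_list(items[1:]))

    return [items[0], resume, tail]


async def fold(stream, step, acc):
    """The single traversal that every consumer below is built from."""
    while stream is not None:
        x, resume, tail = stream
        acc = step(acc, x)
        resume(True)
        stream = await tail
    return acc


# Hypothetical consumers: each one is only a step function and a seed.
async def collect(stream):
    return await fold(stream, lambda acc, x: acc + [x], [])


async def count(stream):
    return await fold(stream, lambda acc, _: acc + 1, 0)


async def write_lines(stream, lines):
    # Even a side-effecting consumer is a fold; its accumulator is unused.
    return await fold(stream, lambda acc, line: lines.append(line), None)


async def main():
    written = []
    await write_lines(from_list(["a", "b"]), written)
    return (await collect(from_list([1, 2, 3])),
            await count(from_list(list("xyz"))),
            written)


print(asyncio.run(main()))  # ([1, 2, 3], 3, ['a', 'b'])
```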

In practice, we've discussed objects like stdout, which should probably consume streams in order, by exhausting each stream, one by one. It's tempting to ask for interleaving, as in most other uses of stdio, but we're civilized folks who would prefer that stdout not be trashed by interleaved vat turns.
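A minimal sketch of such an in-order stdout, assuming the same Python model: an `asyncio.Lock` (which the asyncio docs describe as fair, so waiters proceed in the order they started waiting) acts as a turnstile, and each stream is drained completely before the next writer gets in. `OrderedStdout` is a hypothetical name, and the producer delivers links on later turns so that, without the lock, the two writers really would interleave.

```python
import asyncio

# Python analogue (assumption): asyncio stands in for Monte's vat and promises.


def from_list(items):
    """Hypothetical producer; each link arrives on a later turn."""
    if not items:
        return None
    loop = asyncio.get_running_loop()
    tail = loop.create_future()

    def resume(ok: bool):
        if not ok:
            return  # cancellation omitted for brevity: the stream stays paused

        def settle():
            if not tail.done():
                tail.set_result(from_list(items[1:]))
        loop.call_soon(settle)

    return [items[0], resume, tail]


class OrderedStdout:
    """Hypothetical stdout-like consumer: each stream is exhausted before the
    next one starts, so writers never interleave."""

    def __init__(self):
        self.lines = []
        self._turnstile = asyncio.Lock()  # fair: streams drain in arrival order

    async def run(self, stream):
        async with self._turnstile:
            while stream is not None:
                line, resume, tail = stream
                self.lines.append(line)
                resume(True)
                stream = await tail


async def main():
    out = OrderedStdout()
    await asyncio.gather(out.run(from_list(["a1", "a2", "a3"])),
                         out.run(from_list(["b1", "b2"])))
    return out.lines


print(asyncio.run(main()))  # ['a1', 'a2', 'a3', 'b1', 'b2']
```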

Kevin Reid

Jul 2, 2016, 10:21:47 AM
to e-l...@googlegroups.com

On Jun 30, 2016, at 13:47, Corbin Simpson <mostawe...@gmail.com> wrote:
>
> On Tuesday, June 21, 2016 at 5:56:53 PM UTC-7, Kevin Reid wrote:
>> It sounds like you are thinking about _input_ streams exclusively, no output streams. Is that the case?
>>
>> (Output streams are of course simpler in a sense because the only things they need to do are error reporting and backpressure, but there are ways in which they ought to be — compatible.)
>
> The way I view it is that the stream itself is the actual streaming data and the minimal structure around it that makes it work. Inputs and outputs are turned into producers and consumers which act on the stream.

You can talk about the role of a _process_ (which I mean generally, including sequence-of-turns-which-call-each-other) that way, but what I claim is missing is that there is no _capability_ for output.

That is, I can have a reference to a source of stream-elements (a stream) but I can't have a reference to a sink of stream-elements (within this vocabulary).

(I'm using the word 'sink' to avoid saying 'output stream'.)

> In practice, we've discussed objects like stdout, which should probably consume streams in order, by exhausting each stream, one by one. It's tempting to ask for interleaving, as in most other uses of stdio, but we're civilized folks who would prefer that stdout not be trashed by interleaved vat turns.

I agree that it is useful to be able to write a non-interleaved sequence to a sink even if that sequence is not immediately available. It does mean that any writer is able to permanently prohibit output of any sort (by inserting a stream which does not resolve), which is undesirable for -- well, stderr in particular, when doing debugging/experimental development. (This could be fixed by having multiple stderr caps, where the 'stronger' ones (like the REPL/debugger/logging output) are allowed to output out-of-order -- perhaps trying to aim for "after a newline".)
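The hazard and the proposed fix can both be sketched in the same hypothetical Python model. An ordinary writer whose stream never resolves holds the turnstile forever, while a stronger capability (`write_urgently`, an invented name) bypasses it; the "after a newline" refinement is left out.

```python
import asyncio

# Python analogue (assumption): asyncio stands in for Monte's vat and promises.


class OrderedStderr:
    """Hypothetical stderr with two capabilities: ordinary writers queue up
    behind an in-order turnstile; a stronger one may write out of order."""

    def __init__(self):
        self.lines = []
        self._turnstile = asyncio.Lock()

    async def run(self, stream):
        async with self._turnstile:
            while stream is not None:
                line, resume, tail = stream
                self.lines.append(line)
                resume(True)
                stream = await tail

    def write_urgently(self, line):
        # The stronger capability (e.g. for a REPL or debugger): no turnstile.
        self.lines.append(line)


def stuck(line):
    """A stream whose rest never resolves, so its writer never finishes."""
    tail = asyncio.get_running_loop().create_future()
    return [line, lambda ok: None, tail]


def one(line):
    """A single-cell stream that ends immediately."""
    tail = asyncio.get_running_loop().create_future()
    tail.set_result(None)
    return [line, lambda ok: None, tail]


async def main():
    err = OrderedStderr()
    writers = [asyncio.ensure_future(err.run(stuck("partial..."))),
               asyncio.ensure_future(err.run(one("queued behind the stuck writer")))]
    for _ in range(3):
        await asyncio.sleep(0)  # let both writers run for a few turns
    err.write_urgently("debug: still alive")
    for writer in writers:
        writer.cancel()  # tidy up the blocked writers
    await asyncio.gather(*writers, return_exceptions=True)
    return err.lines


print(asyncio.run(main()))  # ['partial...', 'debug: still alive']
```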

However, this model express backpressure on the overall series of streams. You could insert a short stream as a probe, but given the interface for streams you specified the stream can't be zero-length. You can of course say "do backpressure within a stream", but then you're back to the issue of having no generic capability-to-a-sink.

Corbin Simpson

Jul 4, 2016, 3:54:23 PM
to e-lang


On Saturday, July 2, 2016 at 7:21:47 AM UTC-7, Kevin Reid wrote:

> You can talk about the role of a _process_ (which I mean generally, including sequence-of-turns-which-call-each-other) that way, but what I claim is missing is that there is no _capability_ for output.
>
> That is, I can have a reference to a source of stream-elements (a stream) but I can't have a reference to a sink of stream-elements (within this vocabulary).
>
> (I'm using the word 'sink' to avoid saying 'output stream'.)

I like "sink". We can say "source" and "sink"; the main emphasis I want to have is on the streaming nature of the data flowing between source and sink (and thus the possibility of lightweight traversals on that stream.)
 
> In practice, we've discussed objects like stdout, which should probably consume streams in order, by exhausting each stream, one by one. It's tempting to ask for interleaving, as in most other uses of stdio, but we're civilized folks who would prefer that stdout not be trashed by interleaved vat turns.
>
> I agree that it is useful to be able to write a non-interleaved sequence to a sink even if that sequence is not immediately available. It does mean that any writer is able to permanently prohibit output of any sort (by inserting a stream which does not resolve), which is undesirable for -- well, stderr in particular, when doing debugging/experimental development. (This could be fixed by having multiple stderr caps, where the 'stronger' ones (like the REPL/debugger/logging output) are allowed to output out-of-order -- perhaps trying to aim for "after a newline".)

Yeah. We could also view the "interleaved" nature of a sink as being the more unsafe version, and then put an in-order multiplexer on top of that. More on that idea in a moment.
 
> However, this model express backpressure on the overall series of streams. You could insert a short stream as a probe, but given the interface for streams you specified the stream can't be zero-length. You can of course say "do backpressure within a stream", but then you're back to the issue of having no generic capability-to-a-sink.

Yes! (Minor nit: The stream `null` is of zero length.) So, perhaps more generally, we should think of sinks as objects like...

interface Sink:
    to run(s :Stream) :Vow
    to closeLater() :Vow
    to abortNow() :Vow
    to ordered() :Sink

The .closeLater() and .abortNow() methods would indicate that the sink should stop consuming and clean itself up, either after it finishes consuming all provided streams, or immediately, depending on which method one calls. Nothing new.

By default, .run() would consume a stream by interleaving it with other streams. The .ordered() method would return a new sink which feeds into this sink, but which doesn't interleave. I'm sure there's a more interesting way to express that sort of composition, but I can't think of it right now.
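One possible Python rendering of this Sink sketch, under the same asyncio assumptions (method names are snake_case versions of the Monte ones; `InterleavingSink` and `OrderedSink` are hypothetical). `run()` drains streams concurrently, and `ordered()` wraps the sink so each stream is handed over only after the previous one finishes:

```python
import asyncio

# Python analogue (assumption): a returned Task plays the role of the Vow.


def from_list(items):
    """Hypothetical producer; links arrive on later turns (cancellation omitted)."""
    if not items:
        return None
    loop = asyncio.get_running_loop()
    tail = loop.create_future()

    def resume(ok: bool):
        def settle():
            if not tail.done():
                tail.set_result(from_list(items[1:]))
        if ok:
            loop.call_soon(settle)

    return [items[0], resume, tail]


class InterleavingSink:
    """Default sink: run() drains each stream concurrently with the others."""

    def __init__(self):
        self.out = []
        self._tasks = []

    async def _drain(self, stream):
        while stream is not None:
            x, resume, tail = stream
            self.out.append(x)
            resume(True)
            stream = await tail

    def run(self, stream):
        task = asyncio.ensure_future(self._drain(stream))
        self._tasks.append(task)
        return task

    async def close_later(self):
        # Finish every stream already handed over, then stop.
        await asyncio.gather(*self._tasks)

    def abort_now(self):
        # Stop immediately; unresumed upstreams are simply left paused.
        for task in self._tasks:
            task.cancel()

    def ordered(self):
        return OrderedSink(self)


class OrderedSink:
    """Feeds into another sink, but hands over one stream at a time."""

    def __init__(self, inner):
        self._inner = inner
        self._last = None

    def run(self, stream):
        previous = self._last

        async def after_previous():
            if previous is not None:
                await asyncio.gather(previous, return_exceptions=True)
            await self._inner.run(stream)

        self._last = asyncio.ensure_future(after_previous())
        return self._last


async def main():
    mixed = InterleavingSink()
    mixed.run(from_list(["a1", "a2"]))
    mixed.run(from_list(["b1", "b2"]))
    await mixed.close_later()

    base = InterleavingSink()
    tidy = base.ordered()
    tidy.run(from_list(["a1", "a2"]))
    await tidy.run(from_list(["b1", "b2"]))
    return mixed.out, base.out


print(asyncio.run(main()))  # (['a1', 'b1', 'a2', 'b2'], ['a1', 'a2', 'b1', 'b2'])
```

Only `run()` is shown on the ordered wrapper; `close_later()` and `abort_now()` there would need the same chaining.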

Kevin Reid

Jul 4, 2016, 4:52:24 PM
to e-l...@googlegroups.com
On Jul 4, 2016, at 12:54, Corbin Simpson <mostawe...@gmail.com> wrote:
> On Saturday, July 2, 2016 at 7:21:47 AM UTC-7, Kevin Reid wrote:
>> However, this model express backpressure on the overall series of streams. You could insert a short stream as a probe, but given the interface for streams you specified the stream can't be zero-length. You can of course say "do backpressure within a stream", but then you're back to the issue of having no generic capability-to-a-sink.

Correction: In the above, I meant to write "this model _cannot_ express backpressure on the overall series of streams."

> Yes! (Minor nit: The stream `null` is of zero length.)

It is, but the producer cannot observe backpressure because null has no ask-for-more operation.

(Tangent: In my revised stream API for E, which unfortunately I never wrote down a design document for -- bother me to fix that! -- an input stream produced "chunks" (~sequences) of elements and was allowed to produce a zero-length chunk, so this was possible.)
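That revised E API isn't written down, so this is only a guess at the idea, in the same hypothetical Python model: a cell carrying a zero-length chunk still has a resume operation, so a producer can insert it as a probe and learn when downstream reaches that point and asks for more. `probe` and `consume` are invented names.

```python
import asyncio

# Hypothetical sketch: cells carry chunks (tuples), and a chunk may be empty.


def probe():
    """Return a one-cell stream with an empty chunk, plus a future that
    resolves when downstream resumes past it."""
    loop = asyncio.get_running_loop()
    tail = loop.create_future()
    acked = loop.create_future()

    def resume(ok: bool):
        if not tail.done():
            tail.set_result(None)  # the probe stream ends after its empty chunk
            acked.set_result(ok)   # but the producer still hears "ready"

    return [(), resume, tail], acked


async def consume(stream, out):
    while stream is not None:
        chunk, resume, tail = stream
        out.extend(chunk)  # an empty chunk contributes no data
        resume(True)       # but it is still acknowledged
        stream = await tail


async def main():
    out = []
    cell, acked = probe()
    signalled_early = acked.done()  # False: no consumer has reached it yet
    await consume(cell, out)
    return signalled_early, await acked, out


print(asyncio.run(main()))  # (False, True, [])
```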

> So, perhaps more generally, we should think of sinks as objects like...
>
> interface Sink:
> to run(s :Stream) :Vow
> to closeLater() :Vow
> to abortNow() :Vow
> to ordered() :Sink
>
> The .closeLater() and .abortNow() methods would indicate that the sink should stop consuming and clean itself up, either after it finishes consuming all provided streams, or immediately, depending on which method one calls. Nothing new.
>
> By default, .run() would consume a stream by interleaving it with other streams. The .ordered() method would return a new sink which feeds into this sink, but which doesn't interleave. I'm sure there's a more interesting way to express that sort of composition, but I can't think of it right now.

This seems like an overly complex design. It addresses my concerns about stderr, but as I mentioned briefly, I think those can and perhaps should be handled by having outside-the-sink-object operations allowing for interleaving and aborting, specific to stderr/stdout processing rather than arbitrary sinks.

Even ignoring that, it doesn't feel clean and orthogonal to me, but I don't have a good idea of how to improve it.

Corbin Simpson

Jul 4, 2016, 5:11:37 PM
to e-lang
On Monday, July 4, 2016 at 1:52:24 PM UTC-7, Kevin Reid wrote:
> Even ignoring that, it doesn't feel clean and orthogonal to me, but I don't have a good idea of how to improve it.

Yeah. I guess we're sticking with tubes for now. Back to the drawing board.