Hi! I wrote a Monte stream library, and I wanted to explore the design space a bit and get feedback. Monte currently uses "tubes", an abstraction first introduced in the context of Twisted and still developed there, similar to iteratees in the ML world; recently, though, I've been feeling that tubes are heavyweight.
So, I wrote some code that pares down the idea of "stream" to what I feel is bare-bones. I started with the classic lazy I/O linked list:
[1, [2, [3, [4, [5, null]]]]]
Nothing interesting. But, of course, Monte's a strict language, so this is not lazy at all. We can force the assembly of the list to be segmented across turns by making each link in the list into a promise:
def cons(l, x):
    return [x, when () -> { l }]
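If it helps to see the shape outside Monte, here is a rough Python analogue (my own sketch; the names `cons` and `to_list` are hypothetical): the deferred tail is modelled as a zero-argument thunk, so each link is only built when the consumer forces it.

```python
# Hypothetical Python analogue of the promise-chained list: the tail of
# each cell is a zero-argument thunk, so the next link is only built
# when the consumer explicitly forces it.
def cons(tail_thunk, x):
    return [x, tail_thunk]

def to_list(cell):
    """Force the whole stream, link by link."""
    out = []
    while cell is not None:
        x, tail_thunk = cell
        out.append(x)
        cell = tail_thunk()  # build the next link on demand
    return out

# Build 1 -> 2 -> 3; no tail is constructed until to_list forces it.
s = cons(lambda: cons(lambda: cons(lambda: None, 3), 2), 1)
```

The thunk plays the role the promise plays in Monte: construction of the rest of the list is pushed into the future.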
As the Haskell adage goes, we want to be "lazy in the spine, strict in the leaves". This gives us the same effect; the building of the next link in the list will happen later. But how much later? We don't want a stream to buffer itself in memory needlessly; we want to apply backpressure. Backpressure is normally generated by having some sort of pause, but I decided to try for a resume mechanism instead:
def cons(l, x):
    def [p, r] := Ref.promise()
    def resume():
        r.resolve(l)
    return [x, resume, p]
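Again as a hedged Python sketch (the `Promise` class below is a toy stand-in for Monte's `Ref.promise()`, which would really resolve in a later vat turn): the tail promise stays unresolved until downstream invokes the resume capability.

```python
# Sketch of the resume-based cell. Promise is a minimal, synchronous
# stand-in for Monte's Ref.promise().
class Promise:
    def __init__(self):
        self.state, self.value = "pending", None
    def resolve(self, value):
        self.state, self.value = "resolved", value

def cons(tail, x):
    p = Promise()
    def resume():
        # The capability downstream invokes when it is ready for more.
        p.resolve(tail)
    return [x, resume, p]

# Two cells: the first cell's tail promise is pending until resumed.
last = cons(None, 2)
first = cons(last, 1)
```

Because `resume` is just a value, it can be handed to whichever party should control the pacing, which is the capability-passing property mentioned above.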
Looking good. Now, we'll only progress after downstream has indicated that it is ready. (And the resumption is a capability which can be passed around if necessary; I rely on this for several optimizations!) Unfortunately, this isn't quite expressive enough, as it doesn't let us express any kind of failure. We can fix that with a bit of semantics and an explicit success flag:
def cons(l, x):
    def [p, r] := Ref.promise()
    def resume(b :Bool):
        if (b) { r.resolve(l) } else { r.smash("Cancelled stream") }
    return [x, resume, p]
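Extending the earlier Python sketch (still a toy analogue, not Monte's actual `Ref`), the success flag turns `resume` into a two-way signal: `True` releases the tail, `False` breaks the promise chain.

```python
# Same sketch, extended with the explicit success flag: resume(False)
# smashes the tail promise instead of resolving it.
class Promise:
    def __init__(self):
        self.state, self.value, self.problem = "pending", None, None
    def resolve(self, value):
        self.state, self.value = "resolved", value
    def smash(self, problem):
        self.state, self.problem = "broken", problem

def cons(tail, x):
    p = Promise()
    def resume(ok):
        if ok:
            p.resolve(tail)              # downstream is ready for more
        else:
            p.smash("Cancelled stream")  # downstream aborts the stream
    return [x, resume, p]
```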
Now it's possible to indicate, from downstream, that upstream should abort instead of continuing to process stuff. We also need the reverse: to signal failure from upstream, upstream can simply smash the promise. Remaining cleanup can be handled by the GC and finalizers deeper in the runtime.
This is the total of the core, and that's what makes it "simple". The "efficient" part is that, assuming scheduling turns on the current vat is relatively fast (as it is in Monte), every operation on a stream, like a chunk of .map() or .fold(), only processes a small part of the stream and then returns quickly. I wish I could say that this looks like a standard loop to the JIT, but there's some stuff in the way that prevents that. Nonetheless, it costs about the same to dispatch from a stream as from a tube, and we get to skip all of the pause-related corner cases.
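To make the one-element-per-turn shape concrete, here is a fold driver over the toy Python cells from above (my own sketch; in Monte each step would be its own vat turn via a `when` block on the tail promise, where here a plain loop with a synchronous Promise stands in, and the cells are built eagerly):

```python
# Sketch of a fold driver: process one element, resume upstream, follow
# the now-resolved tail. In Monte, each iteration would be a separate turn.
class Promise:
    def __init__(self):
        self.state, self.value = "pending", None
    def resolve(self, value):
        self.state, self.value = "resolved", value

def cons(tail, x):
    p = Promise()
    def resume(ok):
        if ok:
            p.resolve(tail)
    return [x, resume, p]

def fold(cell, f, acc):
    while cell is not None:
        x, resume, p = cell
        acc = f(acc, x)   # process a single element...
        resume(True)      # ...then signal readiness for the next
        cell = p.value    # follow the resolved tail
    return acc

stream = cons(cons(cons(None, 3), 2), 1)
```

Each step does a small, bounded amount of work before yielding control back, which is where the backpressure comes from.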
On top of this core, I've written a small maker that produces a rich stream object with all of the useful methods, including .map/1, .fold/2, .filter/1, etc. and also a parameterizable guard based on the recent breakthrough we had with Vows. My code's available here:
https://github.com/MostAwesomeDude/mt-stream

If you want to see the example in action, then jump right to https://github.com/MostAwesomeDude/mt-stream/blob/master/stream.mt#L215-L226

(The thing about Vows, if you missed it: https://github.com/monte-language/typhon/issues/107 )
I haven't done the stuff required to make these streams replace Typhon's current tube-based I/O system, but I don't think that it would be especially hard, and it would actually make some of the state management easier since each stream cell's logic is local.
Thanks for reading this far! It got to be kind of a ramble.
Almost unrelated: I had to share this comic:
https://ro-che.info/ccc/15

~ C