Streams work

Rich Hickey

Jan 21, 2009, 2:33:23 PM
to Clojure
I've started documenting the streams work I have been doing, for those
interested:

http://clojure.org/streams

Feedback welcome,

Rich

Timothy Pratley

Jan 21, 2009, 4:13:47 PM
to Clojure
> Feedback welcome

Brilliant! :)

Frantisek Sodomka

Jan 21, 2009, 4:52:54 PM
to Clojure
Hello Rich,
Looking forward to using them! It is a pleasure to see such nice
development!

Frantisek

Vincent Foley

Jan 21, 2009, 7:40:55 PM
to Clojure
I have a question regarding the examples, specifically map* and
filter*

(defn map* [f coll]
  (let [iter (stream-iter coll)]
    (stream
      (fn [eos]
        (let [x (next! iter eos)]
          (if (= eos x) x (f x)))))))

(take 4 (map* inc (filter* even? (range 1000000))))
-> (1 3 5 7)

How is eos passed to the fn inside stream and what is it?

Vince

Jeremy Bondeson

Jan 21, 2009, 5:53:46 PM
to Clojure
Excellent!

In case anyone else has the same problem I did: if the latest
swank-clojure blows up on you, merging in the changes from the trunk will
solve it. This is fairly painless as there are only a few conflicts.

Rich Hickey

Jan 21, 2009, 7:59:22 PM
to Clojure


On Jan 21, 7:40 pm, Vincent Foley <vfo...@gmail.com> wrote:
> I have a question regarding the examples, specifically map* and
> filter*
>
> (defn map* [f coll]
> (let [iter (stream-iter coll)]
> (stream
> (fn [eos]
> (let [x (next! iter eos)]
> (if (= eos x) x (f x)))))))
>
> (take 4 (map* inc (filter* even? (range 1000000))))
> -> (1 3 5 7)
>
> How is eos passed to the fn inside stream and what is it?
>

The caller provides eos in calls to (next! iter eos), which in
turn passes it to the generator. The caller should choose a value for
eos that will not be present in the set of items; such a value can easily be
created by calling (Object.), as is done by reduce*.
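
As a rough illustration of the sentinel idea, here is a sketch in plain Clojure. It does not use the streams branch API; make-gen and drain are made-up names, and the atom-based generator is an assumption for the example only (it is not thread-safe).

```clojure
;; Sketch only: plain Clojure, NOT the experimental streams API.
;; make-gen and drain are hypothetical names for illustration.

(defn make-gen
  "Returns a generator fn of one arg (eos). Each call yields the next
  item of coll, or the caller-supplied eos once coll is exhausted.
  Not thread-safe: the read and the reset! are separate steps."
  [coll]
  (let [remaining (atom (seq coll))]
    (fn [eos]
      (if-let [s @remaining]
        (do (reset! remaining (next s))
            (first s))
        eos))))

(defn drain
  "Consumes gen to the end. A fresh (Object.) is used as eos, so it
  cannot collide with any real item, including nil or false."
  [gen]
  (let [eos (Object.)]
    (loop [acc []]
      (let [x (gen eos)]
        (if (identical? eos x)
          acc
          (recur (conj acc x)))))))

(drain (make-gen [1 nil false :done]))
;; => [1 nil false :done]
```

Because the consumer compares against its own private object, nil and false can travel through the generator as ordinary items.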

I've enhanced the docs to explain this, and commented the example.

Rich

e

Jan 21, 2009, 8:05:38 PM
to clo...@googlegroups.com
I'm stopping to write this after the seq definition (bullet) to give you my exact impression before moving on. You may find that this is OK and that I will get it by the end of the page:

What I assume at this point to be true is that if I first use an iter to consume some of the stream, that part is used up forever. If I make a new iter, I start consuming from that point. And if I make a seq at this point, first will be the next piece of data that I would have seen in the iter. That last statement isn't explicitly made, but I assume it to be true.

The other thought I have at this point is, "I wonder if there's a technical reason I can't detach a seq, too, and then start consuming again when I make an iter." I'm guessing that any seq reference hanging around would then have to eagerly copy the whole stream, which would be dangerous because the unsuspecting user wouldn't know they were suffering a major performance hit.

OK, back to reading. This looks really neat (from this newbie's vantage point, at least).

e

Jan 21, 2009, 8:21:49 PM
to clo...@googlegroups.com
I would think it would be useful to have something exactly like a stream, but one that allowed as many iterators as you like while a mutex prevented any two from consuming the same piece of information. So if a lot of agents had a handle to a stream, would they have to attach and detach iterators to use the stream in synchronized blocks? Keep in mind, I've never made an agent. I'm just speculating based on the reading and videos.

Rich Hickey

Jan 21, 2009, 8:23:27 PM
to Clojure


On Jan 21, 8:05 pm, e <evier...@gmail.com> wrote:
> I'm stopping to write this after the seq definition (bullet) to give you my
> exact feeling moving on. You may find that that is ok and that I will get
> it by the end of the page:
>
> What I assume at this point to be true is that if I first use an iter to
> *consume* some of the stream, that part is used up forever. If I make a new
> iter, I start consuming from that point. AND if I make a seq at this point,
> *first* will be the next piece of data that I would have seen in the iter.
> That last statement isn't explicitly made, but I assume it to be true.

Yes, that's it.

>
> The other thought I have at this point is, "I wonder if it's a technical
> reason I can't detach a seq, too . . . .and then when I do iter, I start
> consuming again." I'm guessing that any seq reference hanging around would
> then have to make a copy of the whole stream unlazilly, which would be
> dangerous because the unsuspecting user wouldn't know they were suffering a
> major performance hit.
>

Well, it's the combination of a seq being persistent and a stream
being one-pass - to realize the seq you'll need that pass. Also, seqs
are an acceptable escaping value from the stream mechanism.

Rich

e

Jan 21, 2009, 8:39:03 PM
to clo...@googlegroups.com

> Well, it's the combination of a seq being persistent and a stream
> being one-pass - to realize the seq you'll need that pass.

It could still be implemented doing that pass. This may be where you are headed in the To Be Continued, but it ought to be possible to wrap a sequence with a generator that then makes it work like a stream again.

> Also, seqs
> are an acceptable escaping value from the stream mechanism.

I'm puzzling over this. Are you saying the problem is that someone could then hand in the very sequence they made from a given stream when they are getting an iterator? Hmmmmm.

Vincent Foley

Jan 21, 2009, 8:58:31 PM
to Clojure
I made some tests, and if I am not mistaken, if an eos is not
explicitly specified, an Object is used. Is that right?

user=>
(let [iter (stream-iter (range 5))]
  (def s (stream (fn [eos]
                   (let [x (next! iter eos)]
                     (if (= eos x)
                       (do (println eos) eos)
                       x))))))
#'user/s
user=> (seq s)
(0 1 2 3 #<Object java.lang.Object@25c828>
4)



Vincent

Rich Hickey

Jan 21, 2009, 9:10:25 PM
to clo...@googlegroups.com

On Jan 21, 2009, at 8:58 PM, Vincent Foley wrote:

>
> I made some tests, and if I am not mistaken, if an eos is not
> specifically specified, Object is used, is that right?
>
> user=>
> (let [iter (stream-iter (range 5))]
> (def s (stream (fn [eos]
> (let [x (next! iter eos)]
> (if (= eos x)
> (do (println eos) eos)
> x))))))
> #'user/s
> user=> (seq s)
> (0 1 2 3 #<Object java.lang.Object@25c828>
> 4)

You should never rely on nor care about the value of eos, just return
it if you are done.

It is always specified by some code; after all, it is a required arg
of the generator. It happens that in this case the consumer code is in
AStream.Seq, and the eos it supplies is an Object.

Rich

Mark H.

Jan 22, 2009, 12:53:31 AM
to Clojure
On Jan 21, 5:21 pm, e <evier...@gmail.com> wrote:
> I would think it would be useful to have something exactly like a stream but
> that allowed as many iterators as you like but that a mutex prevented any
> two from consuming the same piece of information.  

That might be useful for something, but it's hard to make correct in
all cases. The reason is that both stream elements and connections
between stream elements are mutable and shared. Here follows an
example.

Imagine iterating over a singly-linked list, where you delete each
node (explicitly, not by losing the reference) _and_ the link to the
next node right after you get the next node. (For example, a
directory of files might be implemented as a singly-linked list, and
you want to delete all the files _and_ the directory itself and print
all the files deleted, by only making one pass over the directory.)
You could implement this as a generator which is a closure over a
let-bound Java object (I've made up some utility function names below):

(defn make-dir-gen [dirname]
  (let [dirstart (directory-list dirname)]
    (fn [eos]
      (loop [dirent dirstart]
        (if (dir-empty? dirent)
          eos
          (let [link (next-link dirent)
                next (next-entry link)]
            (delete-file! dirent)
            (delete-link! link)
            (recur next)))))))

Now imagine two threads T1 and T2 accessing this generator at the same
time. Suppose they reach the same node at the same time, and suppose
that you've protected file deletion and link deletion each
individually with a mutex (and forbade multiple deletions silently).
T1 might delete the file and the link and get to "recur next" before
T2 can call "(next-entry link)". Now T1 goes on but T2 is stuck:
"link" is _gone_ so "(next-entry link)" goes nowhere.

Protecting the data for access by multiple threads has to be the
responsibility of the person implementing the generator function,
because a generator function can have arbitrary side effects and
iterate over arbitrary mutable data structures with constantly
changing topology. (Imagine a generator that goes five times around a
circular list, then breaks the circular link and splices the list into
another list, leaving a list fragment floating.) It's too hard to try
to imagine a streams implementation that could "automatically make
everything thread-safe," or even to imagine what "thread-safe" means
in the context of general mutable data structures.

mfh

Rich Hickey

Jan 22, 2009, 8:18:01 AM
to Clojure
Those are important things to think about.

There are, in fact, thread semantics for the streams mechanism, as for
the rest of Clojure. Currently, I've made it such that the stream/iter/
seq combination ensures serialized access to the generator, allowing
use of generators over otherwise unsynchronized multi-step activities.

Obviously, stream-backed seqs are safe, being persistent and
immutable, but it is also currently the case that the stream iter is
MT safe as well. All calls to next! on an iter are atomic re: its
generator, i.e. no two threads can be in the generator code at once.
That means that, within a private consumer, you could fire up multiple
threads that share the same iter and coordinate to produce a result,
and return the result or a stream upon it.
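
A sketch of what "serialized access to the generator" can look like, written in plain Clojure rather than with the streams branch itself. The names serialized-gen and consume-in-parallel are hypothetical, and using the locking macro here is an assumption for illustration, not a description of how AStream is implemented.

```clojure
;; Sketch only: plain Clojure, NOT the experimental streams API.
;; serialized-gen and consume-in-parallel are hypothetical names.

(defn serialized-gen
  "Wraps coll in a generator fn of one arg (eos). The lock ensures that
  only one thread at a time runs the read-and-advance step."
  [coll]
  (let [lock (Object.)
        remaining (atom (seq coll))]
    (fn [eos]
      (locking lock
        (if-let [s @remaining]
          (do (reset! remaining (next s))
              (first s))
          eos)))))

(defn consume-in-parallel
  "Starts n-threads futures that all pull from the same generator and
  returns every item they collected, in no particular order."
  [gen n-threads]
  (let [eos (Object.)
        worker (fn []
                 (loop [acc []]
                   (let [x (gen eos)]
                     (if (identical? eos x)
                       acc
                       (recur (conj acc x))))))
        futures (doall (repeatedly n-threads #(future (worker))))]
    (vec (mapcat deref futures))))

;; Every item is handed to exactly one worker.
(count (consume-in-parallel (serialized-gen (range 1000)) 4))
;; => 1000
```

When run as a script, call (shutdown-agents) at the end so the future thread pool does not delay JVM exit.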

Used correctly, this is a powerful feature. However, I'm reluctant to
expose/guarantee it, lest people start using iters as connection
points and invalidating the whole stream protection model. A
well-written API should take/return streams/seqs, never iters, so MT use of
an iter should always be an implementation detail of a single stream
step. But I can't enforce that.

For those looking to experiment with such things, feel free to try it
now and provide feedback.

Thanks,

Rich

e

Jan 22, 2009, 8:29:23 AM
to clo...@googlegroups.com

> Now imagine two threads T1 and T2 accessing this generator at the same
> time.  Suppose they reach the same node at the same time, and suppose
> that you've protected file deletion and link deletion each
> individually with a mutex (and forbade multiple deletions silently).
> T1 might delete the file and the link and get to "recur next" before
> T2 can call "(next-entry link)".  Now T1 goes on but T2 is stuck:
> "link" is _gone_ so "(next-entry link)" goes nowhere.

Hmmmmmm. Good point. I'm sure there's a lot to think about, but at first that sounds like a challenge. Now I'm wondering if you could have the generator automatically update the list of handed-out iterators. Then I realize you'd have to update eos, too. But the client may already have checked eos. So I see that Rich's idea is more obvious: rather than making magic streams, he shows how one might do the same thing by sharing a single iterator.

Konrad Hinsen

Jan 22, 2009, 9:08:38 AM
to clo...@googlegroups.com
On 21.01.2009, at 20:33, Rich Hickey wrote:

> I've started documenting the streams work I have been doing, for those
> interested:
>
> http://clojure.org/streams

Nice!

I have played a bit with the stream implementation, and I came across
a behaviour that I do not understand:

First, define a random stream that calls rand, and an iter on it:

(def rand-stream (stream (fn [_] (rand))))
(def rand-iter (stream-iter rand-stream))

Calling it a few times shows that it works:

(next! rand-iter nil)
(next! rand-iter nil)

Next, try to use it as a seq:

(take 4 rand-stream)

This fails, as it should:

java.lang.IllegalStateException: Already iterating (NO_SOURCE_FILE:0)

Detach the iter and try again:

(detach! rand-iter)
(take 4 rand-stream)

Now it works - fine. But what happened to the seq that now owns the
stream? Nothing refers to it, so it should be gone. Did it perhaps
liberate the stream, so that I can create an iter again? Let's try:

(def rand-iter (stream-iter rand-stream))
(next! rand-iter nil)
(next! rand-iter nil)

It seems so. But... let's be mean:

(take 4 rand-stream)

I would expect this to throw the IllegalStateException again, but it
doesn't: it returns the same four-number sequence as the last time it
was called. Where was that one stored? In the stream itself? Or does
the stream keep a reference to the seq, so that it never disappears?
But then I shouldn't be able to create another iterator.

Let's be mean again:

(next! rand-iter nil)
(take 10 rand-stream)
(next! rand-iter nil)
(next! rand-iter nil)
(take 15 rand-stream)

All of these work - it seems I have both an iter and a seq on the
same stream, with the iter returning values that are also in the seq.

Konrad.

Christian Vest Hansen

Jan 22, 2009, 9:27:46 AM
to clo...@googlegroups.com
On Thu, Jan 22, 2009 at 2:18 PM, Rich Hickey <richh...@gmail.com> wrote:
>
> Those are important things to think about.
>
> There are, in fact, thread semantics for the streams mechanism, as for
> the rest of Clojure. Currently, I've made it such that the stream/iter/
> seq combination ensures serialized access to the generator, allowing
> use of generators over otherwise unsynchronized multi-step activities.
>
> Obviously, stream-backed seqs are safe, being persistent and
> immutable. but it is also currently the case that the stream iter is
> MT safe as well. All calls to next! on an iter are atomic re: its
> generator, i.e. no 2 threads can be in the generator code at once.
> That means that, within a private consumer, you could fire up multiple
> threads that share the same iter and coordinate to produce a result,
> and return the result or a stream upon it.
>
> Used correctly, this is a powerful feature. However, I'm reluctant to
> expose/guarantee it, lest people start using iters as connection
> points and invalidating the whole stream protection model. A well-
> written API should take/return streams/seqs, never iters, so MT use of
> an iter should always be an implementation detail of a single stream
> step. But I can't enforce that.

Hmm... could we imagine a (synced-iter an-iter) that returned a
wrapped iter that _did_ guarantee MT safety?

Of course this only makes sense if you end up not guaranteeing MT safety
of ordinary iters, and I don't know enough to prefer one approach over
the other.

>
> For those looking to experiment with such things, feel free to try it
> now and provide feedback.
>
> Thanks,
>
> Rich



--
Venlig hilsen / Kind regards,
Christian Vest Hansen.

Rich Hickey

Jan 22, 2009, 10:27:45 AM
to Clojure


On Jan 22, 9:08 am, Konrad Hinsen <konrad.hin...@laposte.net> wrote:
> On 21.01.2009, at 20:33, Rich Hickey wrote:
>
> > I've started documenting the streams work I have been doing, for those
> > interested:
>
> > http://clojure.org/streams
>
> Nice!
>
> I have played a bit with the stream implementation, and I came across
> a behaviour that I do not understand:
>
> First, define a random stream that calls rand, and an iter on it:
>
> (def rand-stream (stream (fn [_] (rand))))
> (def rand-iter (stream-iter rand-stream))
>
> Calling it a few times shows that it works:
>
> (next! rand-iter nil)
> (next! rand-iter nil)
>
> Next, try to use it as a seq:
>
> (take 4 rand-stream)
>
> This fails, as it should:
>
> java.lang.IllegalStateException: Already iterating (NO_SOURCE_FILE:0)
>
> Detach the iter and try again:
>
> (detach! rand-iter)
> (take 4 rand-stream)
>
> Now it works - fine. But what happened to the seq that now owns the
> stream? Nothing refers to it, so it should be gone.

No, the stream must refer to it, in order to keep its promise to
return the same seq every time.

> Did it perhaps
> liberate the stream, so that I can create an iter again? Let's try:
>
> (def rand-iter (stream-iter rand-stream))
> (next! rand-iter nil)
> (next! rand-iter nil)
>
> It seems so. But... let's be mean:
>
> (take 4 rand-stream)
>
> I would expect this to throw the IllegalStateException again, but it
> doesn't: it returns the same four-number sequence as the last time it
> was called. Where was that one stored? In the stream itself? Or does
> the stream keep a reference to the seq, so that it never disappears?
> But then I shouldn't be able to create another iterator.

What you've created an iter on the second time is the seq of the
stream. Right now, once you've treated a stream as a seq it will
always behave like one. So this second stream-iter call actually
creates an iter on a stream on that seq.

>
> Let's be mean again:
>
> (next! rand-iter nil)
> (take 10 rand-stream)
> (next! rand-iter nil)
> (next! rand-iter nil)
> (take 15 rand-stream)
>
> All of these work - it seems I have both an iter and a seq on the
> same stream, with the iter returning values that are also in the seq.
>

No, you've got a generator, a stream on that, a seq on that, a stream
on that seq and an iter on that stream - that's why you are getting
the same values. You'll never have a seq and an iter on the same
generator, i.e. interleaved values.
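
The "same seq every time" promise can be modelled in plain Clojure with a delay that caches one lazy seq over the generator. This is only a sketch under that assumption; make-stream, stream-seq and gen->seq are made-up names, and it says nothing about how the streams branch actually stores its seq.

```clojure
;; Sketch only: plain Clojure, NOT the experimental streams API.
;; gen->seq, make-stream and stream-seq are hypothetical names.

(defn gen->seq
  "Builds a lazy seq that pulls from gen until gen returns the eos
  sentinel that this function supplies."
  [gen]
  (let [eos (Object.)]
    (letfn [(step []
              (lazy-seq
                (let [x (gen eos)]
                  (when-not (identical? eos x)
                    (cons x (step))))))]
      (step))))

(defn make-stream
  "Models a stream as a map holding the generator plus a delay that
  creates, and then caches, the one canonical seq over it."
  [gen]
  {:gen gen
   :canonical-seq (delay (gen->seq gen))})

(defn stream-seq
  "Returns the canonical seq; every call yields the identical object."
  [s]
  @(:canonical-seq s))

(def rs (make-stream (fn [_] (rand))))

(identical? (stream-seq rs) (stream-seq rs))
;; => true

;; Realized items are cached, so repeated takes see the same numbers.
(= (take 4 (stream-seq rs)) (take 4 (stream-seq rs)))
;; => true
```

In this model, anything that later reads from the stream reads through the cached seq, which is why the same values come back instead of fresh ones from the generator.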

I understand this may not be intuitive or clear yet from the docs. Nor
am I set in this being the behavior. The case I am looking towards is
this one:

(def s (stream (range 10)))
(if (seq s)
  (take 4 (map-stream inc s)))

A stream is used as a seq and then passed to a stream function.
Without this seqed-stream-behaves-as-seq capability, this will fail
with Already iterating, and would have to be written:

(if (seq s)
(take 4 (map-stream inc (seq s))))

Rich

Konrad Hinsen

Jan 22, 2009, 11:17:40 AM
to clo...@googlegroups.com
On 22.01.2009, at 16:27, Rich Hickey wrote:

>> Now it works - fine. But what happened to the seq that now owns the
>> stream? Nothing refers to it, so it should be gone.
>
> No, the stream must refer to it, in order to keep its promise to
> return the same seq every time.

OK.

>> Did it perhaps
>> liberate the stream, so that I can create an iter again? Let's try:
>>
>> (def rand-iter (stream-iter rand-stream))
>> (next! rand-iter nil)
>> (next! rand-iter nil)

> What you've created an iter on the second time is the seq of the
> stream. Right now, once you've treated a stream as a seq it will
> always behave like one. So this second stream-iter call actually
> creates an iter on a stream on that seq.

Does that mean that calling seq on a stream converts the stream into
a seq for all practical purposes? That sounds a bit dangerous
considering that so many operations in Clojure call seq implicitly.
One can easily have a seq "steal" a stream and not notice it before
all memory is used up by the seq.

> I understand this may not be intuitive or clear yet from the docs. Nor
> am I set in this being the behavior. The case I am looking towards is
> this one:
>
> (def s (stream (range 10)))
> (if (seq s)
>   (take 4 (map-stream inc s)))
>
> A stream is used as a seq and then passed to a stream function.
> Without this seqed-stream-behaves-as-seq capability, this will fail
> with Already iterating, and would have to be written:
>
> (if (seq s)
> (take 4 (map-stream inc (seq s))))

I think the second is in fact clearer. It seems weird in a largely
functional context to have an enormous side-effect of calling seq on
a stream.

Konrad.

Stuart Sierra

Jan 22, 2009, 12:36:01 PM
to Clojure
On Jan 21, 2:33 pm, Rich Hickey <richhic...@gmail.com> wrote:
> I've started documenting the streams work I have been doing, for those
> interested:

Cool! 3 questions:

1. Can you feed things into a stream?

2. Could streams be used for I/O?

3. Can streams have clean-up/close code when they are emptied or go
out of scope?

-Stuart Sierra

Rich Hickey

Jan 22, 2009, 1:50:27 PM
to clo...@googlegroups.com

On Jan 22, 2009, at 11:17 AM, Konrad Hinsen wrote:

>
> On 22.01.2009, at 16:27, Rich Hickey wrote:
>
>>> Now it works - fine. But what happened to the seq that now owns the
>>> stream? Nothing refers to it, so it should be gone.
>>
>> No, the stream must refer to it, in order to keep its promise to
>> return the same seq every time.
>
> OK.
>
>>> Did it perhaps
>>> liberate the stream, so that I can create an iter again? Let's try:
>>>
>>> (def rand-iter (stream-iter rand-stream))
>>> (next! rand-iter nil)
>>> (next! rand-iter nil)
>
>> What you've created an iter on the second time is the seq of the
>> stream. Right now, once you've treated a stream as a seq it will
>> always behave like one. So this second stream-iter call actually
>> creates an iter on a stream on that seq.
>
> Does that mean that calling seq on a stream converts the stream into
> a seq for all practical purposes? That sounds a bit dangerous
> considering that so many operations in Clojure call seq implicitly.
> One can easily have a seq "steal" a stream and not notice it before
> all memory is used up by the seq.
>

Calling seq on a stream yields a seq that will forever own the stream
- if you think about it a bit, you'll see why that has to be the case.

OTOH, that seq is lazy, so I'm not sure what the memory issue is.

>> I understand this may not be intuitive or clear yet from the docs.
>> Nor
>> am I set in this being the behavior. The case I am looking towards is
>> this one:
>>
>> (def s (stream (range 10)))
>> (if (seq s)
>>   (take 4 (map-stream inc s)))
>>
>> A stream is used as a seq and then passed to a stream function.
>> Without this seqed-stream-behaves-as-seq capability, this will fail
>> with Already iterating, and would have to be written:
>>
>> (if (seq s)
>> (take 4 (map-stream inc (seq s))))
>
> I think the second is in fact clearer. It seems weird in a largely
> functional context to have an enormous side-effect of calling seq on
> a stream.
>

Again, I don't see the enormous side effect. Streams form a safe,
stateful pipeline, you'll generally only call seq on the end of the
pipe. If you ask for a seq on a stream you are asking for a (lazy)
reification. That reification and ownership is what makes the pipeline
safe.

I am working on seq/stream API unification right now, and we will see
how often we'll be calling seq fns on something yet subsequently using
it as a stream. Many of those places where seq is called will now call
stream instead (e.g. sequence fn entry points), and there may be a
non-generator-capturing function for determining eos.

Rich

Rich Hickey

Jan 22, 2009, 1:55:50 PM
to clo...@googlegroups.com

On Jan 22, 2009, at 12:36 PM, Stuart Sierra wrote:

>
> On Jan 21, 2:33 pm, Rich Hickey <richhic...@gmail.com> wrote:
>> I've started documenting the streams work I have been doing, for
>> those
>> interested:
>
> Cool! 3 questions:
>
> 1. Can you feed things into a stream?

Yes, you can put a generator on the end of a queue.

>
>
> 2. Could streams be used for I/O?
>

Yes, that's one of the primary use cases.

> 3. Can streams have clean-up/close code when they are emptied or go
> out of scope?


I'm addressing the resource cleanup issue more generally in a scope
mechanism, also present in the streams SVN branch. Still in progress,
but you can get the gist of it here:

http://paste.lisp.org/display/73838

Rich

evins...@gmail.com

Jan 22, 2009, 10:46:34 PM
to Clojure
This work reminds me in a general way of the old Dylan iteration
protocol. They're not the same, and the Dylan iteration protocol does
not provide the safety for concurrency that you're working on. Still,
just in case any of the old ideas happen to be useful, I thought I'd
provide a link:

http://amigos.rdsathene.org/other/prefix-dylan/book.annotated/ch12.html#iteration%20protocol0

(This link is to a webbed version of the 1992 Lisp-syntax version of
Dylan. The manual is getting increasingly hard to find online.)

Konrad Hinsen

Jan 23, 2009, 3:31:27 AM
to clo...@googlegroups.com
On 22.01.2009, at 19:50, Rich Hickey wrote:

>>
>> Does that mean that calling seq on a stream converts the stream into
>> a seq for all practical purposes? That sounds a bit dangerous
>> considering that so many operations in Clojure call seq implicitly.
>> One can easily have a seq "steal" a stream and not notice it before
>> all memory is used up by the seq.
>>
>
> Calling seq on a stream yields a seq that will forever own the stream
> - if you think about it a bit, you'll see why that has to be the case.
>
> OTOH, that seq is lazy, so I'm not sure what the memory issue is.

If my understanding is correct, then

(def rand-stream (stream (fn [_] (rand))))

(take 5 rand-stream)

will create a seq on the stream that is referenced by the stream. As
long as the stream is referenced by a var, the seq will remain
referenced as well. Seqs being cached, this means that the whole
random number sequence will be kept in memory.

The only way to avoid this seems to be not calling any sequence
function on a stream. I could use for example

(defn take-stream
  [n s]
  (let [iter (stream-iter s)
        eos (Object.)
        vs (doall (for [_ (range n)] (next! iter eos)))]
    (do (detach! iter) vs)))

(take-stream 5 rand-stream)

Writing take-stream made me discover another pitfall: the stream
seems to keep a reference to its iter object as well, meaning that it is
never released without an explicit call to detach!. I had expected to
be able to create a "local" iter in a let and have it disappear and
release the stream when it goes out of scope. I guess that would
require the stream not to keep a reference to the iter, but just a
flag that an iter exists. Which in turn requires that the iter resets
the flag when it goes out of scope. I don't even know if that is
doable in the JVM.

> Again, I don't see the enormous side effect. Streams form a safe,
> stateful pipeline, you'll generally only call seq on the end of the
> pipe. If you ask for a seq on a stream you are asking for a (lazy)
> reification. That reification and ownership is what makes the pipeline
> safe.

Then why not make a pipeline using lazy sequences right from the
start? I don't see anything that I could do better with streams than
with lazy sequences.

Konrad.

Christophe Grand

Jan 23, 2009, 4:50:05 AM
to clo...@googlegroups.com
Rich Hickey a écrit :

> Again, I don't see the enormous side effect. Streams form a safe,
> stateful pipeline, you'll generally only call seq on the end of the
> pipe. If you ask for a seq on a stream you are asking for a (lazy)
> reification. That reification and ownership is what makes the pipeline
> safe.
>
> I am working on seq/stream api unification right now, and we will see
> how often we'll be calling seq fns yet subsequently using as a stream.
> Many of those places where seq is called will now call stream instead
> (e.g. sequence fn entry points), and there may be a non-generator-
> capturing function for determining eos.
I understand, but I found this behaviour surprising:
user=> (defn take1 [s]
         (let [i (stream-iter s)
               n (next! i nil)]
           (detach! i)
           n))

(defn touch [s] (seq s) s)

(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (touch s1)
#<AStream clojure.lang.AStream@19ee8a>
user=> (take1 s1)
3
user=> (take1 s1)
3
user=> (take1 s1)
3
; s1 is stuck on 3 because stream-iter returns a new iter on a new
; stream on the canonical seq for s1


With the attached patch, you get:
user=> (defn take1 [s]
         (let [i (stream-iter s)
               n (next! i nil)]
           (detach! i)
           n))

(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (seq s1)
(3 4 5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (take1 s1)
3
user=> (take1 s1)
4
user=> (first s1)
5
;; seq lookup or realization don't consume the stream:
user=> (seq s1)
(5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (first s1)
5
user=> (take1 s1)
5
user=> (take1 s1)
6

I relaxed the constraint saying that "a stream ensures that /*every call
to seq on a stream will return the same seq" to be */"a stream ensures
that /*every call to seq on a stream will return the same seq as long as
the stream state doesn't change".*/
/*What did I lose?

Christophe
*/

streams+seq.patch

Christophe Grand

Jan 23, 2009, 4:53:11 AM
to clo...@googlegroups.com
Christophe Grand a écrit :

> I relaxed the constraint saying that "a stream ensures that /*every call
> to seq on a stream will return the same seq" to be */"a stream ensures
> that /*every call to seq on a stream will return the same seq as long as
> the stream state doesn't change".*/
> /*What did I lose?
>
> Christophe
> */
>
/* and */ are by courtesy of Thunderbird :-(

I relaxed the constraint saying that "a stream ensures that every call
to seq on a stream will return the same seq" to be "a stream ensures
that every call to seq on a stream will return the same seq as long as
the stream state doesn't change".

What did I lose?

Christophe


Christophe Grand

Jan 23, 2009, 5:06:51 AM
to clo...@googlegroups.com
Christophe Grand a écrit :
> I relaxed the constraint saying that "a stream ensures that every call
> to seq on a stream will return the same seq" to be "a stream ensures
> that every call to seq on a stream will return the same seq as long as
> the stream state doesn't change".
Well currently it's stronger than that:
If you call seq on a stream, consume one item, call seq again then the
second seq is the rest of the first one.
This property being transitive you get:

user=> (def s1 (stream (range 10)))
#'user/s1
user=> (take1 s1)(take1 s1)(take1 s1)
0
1
2
user=> (def seq1 (seq s1))
#'user/seq1
user=> seq1
(3 4 5 6 7 8 9)

user=> (take1 s1)(take1 s1)(take1 s1)
3
4
5
user=> (identical? (seq s1) (drop 3 seq1))
true

Rich Hickey

Jan 23, 2009, 8:04:10 AM
to Clojure


On Jan 23, 3:31 am, Konrad Hinsen <konrad.hin...@laposte.net> wrote:
> On 22.01.2009, at 19:50, Rich Hickey wrote:
>
>
>
> >> Does that mean that calling seq on a stream converts the stream into
> >> a seq for all practical purposes? That sounds a bit dangerous
> >> considering that so many operations in Clojure call seq implicitly.
> >> One can easily have a seq "steal" a stream and not notice it before
> >> all memory is used up by the seq.
>
> > Calling seq on a stream yields a seq that will forever own the stream
> > - if you think about it a bit, you'll see why that has to be the case.
>
> > OTOH, that seq is lazy, so I'm not sure what the memory issue is.
>
> If my understanding is correct, then
>
> (def rand-stream (stream (fn [_] (rand))))
> (take 5 rand-stream)
>
> will create a seq on the stream that is referenced by the stream. As
> long as the stream is referenced by a var, the seq will remain
> referenced as well. Seqs being cached, this means that the whole
> random number sequence will be kept in memory.
>

Creating stateful streams and leaving them lying around in named
globals is not the intended use case. They are for immediate use in
computational pipelines. They are even less collections than are seqs,
i.e. not at all.

> The only way to avoid this seems to be not calling any sequence
> function on a stream. I could use for example
>
> (defn take-stream
> [n s]
> (let [iter (stream-iter s)
> eos (Object.)
> vs (doall (for [_ (range n)] (next! iter eos)))]
> (do (detach! iter) vs)))
>
> (take-stream 5 rand-stream)
>
> Writing take-stream made me discover another pitfall: the stream
> seems to keep a reference to its iter object as well, meaning that it is
> never released without an explicit call to detach!. I had expected to
> be able to create a "local" iter in a let and have it disappear and
> release the stream when it goes out of scope.

Were that the case, the map* and filter* examples wouldn't work.
The most common idiom is to obtain an iter on the incoming stream,
create a computational stage with a generator that wraps that iter,
and return a stream that owns that generator. So the iter certainly
can't go out of scope at the end of the let.

> I guess that would
> require the stream not to keep a reference to the iter, but just a
> flag that an iter exists. Which in turn requires that the iter resets
> the flag when it goes out of scope. I don't even know if that is
> doable in the JVM.
>

Nope. You can't tie things like this to the lifetime of GC-able
entities, nor would you want to try to understand a system that did.

> > Again, I don't see the enormous side effect. Streams form a safe,
> > stateful pipeline, you'll generally only call seq on the end of the
> > pipe. If you ask for a seq on a stream you are asking for a (lazy)
> > reification. That reification and ownership is what makes the pipeline
> > safe.
>
> Then why not make a pipeline using lazy sequences right from the
> start? I don't see anything that I could do better with streams than
> with lazy sequences.
>

There are a couple of advantages. First, streams are faster, at least
2x faster. Since a lazy sequence must allocate per stage, a multi-
stage pipeline would incur multiple allocations per step. A stream
could be built that has no allocation other than the results. If your
calculations per step are significant, they'll dominate the time, but
when they are not, this allocation time matters.

Second, streams are fully lazy. Seqs could be made fully lazy, but
currently are not.

Third, stream iters currently provide transparent MT access. Doing the
same for a seq means wrapping it in a ref.
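
The allocation point can be sketched with plain closures. This is not the stream API; range-gen, map-gen, filter-gen and drain are hypothetical names that only mimic the generator-takes-eos shape of the map* example earlier in the thread. Each stage pulls from the one below it and hands the value on, so no per-stage seq cells are created. The counter here is not thread-safe, unlike what is described for stream iters.

```clojure
;; Hypothetical names throughout; plain closures, not the stream API.

;; A generator takes an end-of-stream sentinel and returns the next item,
;; or the sentinel itself once it is exhausted.
(defn range-gen [n]
  (let [i (atom 0)]                 ; mutable state hidden inside the stage
    (fn [eos]
      (let [v @i]
        (if (< v n)
          (do (swap! i inc) v)
          eos)))))

(defn map-gen [f gen]
  (fn [eos]
    (let [x (gen eos)]
      (if (identical? eos x) x (f x)))))

(defn filter-gen [pred gen]
  (fn [eos]
    (loop [x (gen eos)]
      (if (or (identical? eos x) (pred x))
        x
        (recur (gen eos))))))

;; Pull everything through the pipeline into a vector.
(defn drain [gen]
  (let [eos (Object.)]              ; fresh sentinel that cannot appear in the data
    (loop [acc []]
      (let [x (gen eos)]
        (if (identical? eos x)
          acc
          (recur (conj acc x)))))))

(def result (drain (map-gen inc (filter-gen even? (range-gen 10)))))
;; result => [1 3 5 7 9]
```

The equivalent (map inc (filter even? (range 10))) builds a lazy seq at each stage, which is the per-step allocation in question.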

Rich

Rich Hickey

Jan 23, 2009, 8:50:08 AM
to Clojure
I think you lose the game overall. The entire point of this mechanism
is to prevent the dissemination of stateful things. seq being a true
(referentially transparent) function of a stream is a critical part of
that. It ensures that when you've got a seq on a stream, you have
exclusive access to a pipe with no leaks. Inside the pipe, there may
be mutation and state, but no one can see them, since the end, seq,
view is persistent and immutable.

With what you are proposing:

(if (seq astream)
  (do-something-with (first astream)))

is broken.

More generally, I guess I simply don't understand these use cases for
treating the stream as a seq and subsequently mutating it. Maybe I
should rename them pipes and it would be less likely people would
expect them to divide and run all over the mountain :)

Seriously, is there a real use case for (first astream) ... (take1
astream) ?

You should presume there will be stream versions of all the sequence
fns.

Rich

Rich Hickey

Jan 23, 2009, 9:01:10 AM
to Clojure
I've made (stream astream) identity in SVN 1228 - that's easy to
understand, and I'm not sure my case will be all that common.

Rich

Christophe Grand

Jan 23, 2009, 9:35:54 AM
to clo...@googlegroups.com
Rich Hickey wrote:

> I think you lose the game overall.
I'm sorry if I sounded provocative, I was trying to better understand
the model you propose with streams. Thanks for your answer: it made
things clearer to me.

> With what you are proposing:
>
> (if (seq astream)
>   (do-something-with (first astream)))
>
> is broken.
>

Indeed you're right: astream can change between the two calls to (seq
astream).

> More generally, I guess I simply don't understand these use cases for
> treating the stream as a seq and subsequently mutating it.

It's not a use case, it's the mere angst of nasty bugs basically due to:
(seq astream)
...
(stream-iter astream)
not raising an exception when someone inadvertently mixes seq fns and
stream fns.

Now (rev 1228) I get an "Already iterating" exception so I'm happy.

Christophe

Rich Hickey

Jan 23, 2009, 9:40:29 AM
to Clojure
Good. Let's proceed with that and see how it feels.

Thanks to all for the good feedback - keep it coming!

Rich

Konrad Hinsen

Jan 23, 2009, 12:09:30 PM
to clo...@googlegroups.com
On Jan 23, 2009, at 14:04, Rich Hickey wrote:

>> Then why not make a pipeline using lazy sequences right from the
>> start? I don't see anything that I could do better with streams than
>> with lazy sequences.
>>
>
> There are a couple of advantages. First, streams are faster, at least
> 2x faster. Since a lazy sequence must allocate per stage, a multi-
> stage pipeline would incur multiple allocations per step. A stream
> could be built that has no allocation other than the results. If your
> calculations per step are significant, they'll dominate the time, but
> when they are not, this allocation time matters.
>
> Second, streams are fully lazy. Seqs could be made fully lazy, but
> currently are not.
>
> Third, stream iters currently provide transparent MT access. Doing the
> same for a seq means wrapping it in a ref.

Thanks for those explanations, that makes a lot of sense.

I just wonder about the performance aspect. If I have a pipeline
stage with very little computational cost, say adding 1 to every
element, then I would expect the overhead of the iter layer and the
thread safety to dominate CPU time anyway. Does an allocation
really add that much on top of that that it makes a difference?

Konrad.

e

Jan 23, 2009, 12:34:18 PM
to clo...@googlegroups.com
people who love doing stream processing would attack an extra allocation.

chris

Jan 23, 2009, 2:28:36 PM
to Clojure
I work for NVIDIA doing 3d graphics engines and editor platforms on
both PC and embedded platforms.

Konrad, the extra memory allocation is often the difference between
something fitting inside a cache line on a CPU and hitting main ram.
Last time I looked, I believe the difference is a factor of 100. This
is exacerbated on embedded platforms such as ARM where your on-chip
cache is anemic and your CPU is still quite bandwidth starved in terms
of main memory.

The inability to exactly position datasets in memory is the key
performance difference between C++ and Java in my book.

Creating some piece of information lazily that sits outside my current
dataset for no reason is absolutely not acceptable.

I have pretty full confidence that I could produce a java-based
graphics runtime that would work fine on most cell-phones, android or
otherwise and produce graphics on par with anything currently done in
C or any other language *if* I can pack objects into distinct
contiguous blocks of memory that I will then run through and
sequentially access each render step.

Currently Java doesn't give me this ability (for no obvious reason)
and thus I can't easily produce very performant code. Lazy sequences
hurt my limited ability (due to their extra memory allocation) to
control memory layout and access semantics of what I am doing and thus
have an extremely serious impact on what I am doing. C# and Mono,
btw, do allow this, as their generics implementation allows
struct datatypes in lists. Should Clojure provide a wrapper on the
trusty old int array that allows me to use it like an array of
structures then we would be at least halfway there, assuming I could
force the JVM to put the array into contiguous memory.

Closures and runtime-generated code in general, btw, represent a
possible way for me to beat a C based runtime as I can create
specialized functions instead of having a bunch of branches and if-
statements. So there is a tradeoff going the other way.

Anyway, any extra memory allocation I don't specifically require is
going to have severe impacts if I can't disable it. So streams fit
the bill *much* better than lazy seqs.

I am apparently someone who loves doing stream processing.

Chris

On Jan 23, 10:34 am, e <evier...@gmail.com> wrote:
> people who love doing stream processing would attack an extra allocation.
>
> On Fri, Jan 23, 2009 at 12:09 PM, Konrad Hinsen
> <konrad.hin...@laposte.net> wrote:

Frantisek Sodomka

Jan 24, 2009, 5:25:19 PM
to Clojure
The word "streams" evokes an association with data-flow languages. For a
while, I was following Project V:

Simple Example of the Difference Between Imperative, Functional and
Data Flow.
http://my.opera.com/Vorlath/blog/2008/01/06/simple-example-of-the-difference-between-imperative-functional-and-data-flow

Project V Redefined
http://my.opera.com/Vorlath/blog/2008/04/04/project-v-redefined

Project V: Dealing With Components
http://my.opera.com/Vorlath/blog/2007/10/12/project-v-dealing-with-components

Is there any connection whatsoever? Could we reuse something in
Clojure? (I am still a newbie, so sorry for noise otherwise.)

Greetings, Frantisek

Mark H.

Jan 26, 2009, 2:57:53 PM
to Clojure
On Jan 24, 2:25 pm, Frantisek Sodomka <fsodo...@gmail.com> wrote:
> The word "streams" evokes an association with data-flow languages. For a
> while, I was following Project V:
>
> Simple Example of the Difference Between Imperative, Functional and
> Data Flow.
> http://my.opera.com/Vorlath/blog/2008/01/06/simple-example-of-the-dif...

Some complaints about the article:

1. The reference to the "repeated squaring" article is kind of silly.
Computing x^y iteratively is just unrolling a sequential recursion.
It's something you do either as an optimization or if your chosen
programming language doesn't support recursion (e.g., Fortran 77).

2. Furthermore, unlike what the article says, it's easy to parallelize
the iterative squaring procedure, using a scan operation. For P
processors, this requires creating P log P storage locations for
intermediate values, but you need at least P storage locations for the
pipelining approach anyway.

3. The "turning the call graph 90 degrees" diagram doesn't illustrate
the point of data flow for parallelism: it's about pipelining
operations on a stream of data, not just one datum ("x", in the
diagram). The diagram should show different values flowing through
the pipes.

The main complaint I have is that there's no need to see dataflow as a
radical new thing; it's just pipelining. There's task parallelism and
there's pipeline parallelism. Both are useful and of course it's nice
to have a language and runtime that can support both efficiently. The
problem in both cases is making sure that the optimization actually
improves performance. Both optimizations can fail if it's not
possible to find a good schedule that keeps all the processors busy.
For example, if you're mapping f(g(h(x))) over a collection, the
functions f, g, h had better take about the same time to complete. If
you're computing a(x) = b(x) + c(x) + d(x) with task parallelism,
you'll only get a 3x speedup if b, c, and d all take about the same
time to complete.
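
As a rough worked example of that last point, assume one processor per task and no scheduling overhead (both are simplifying assumptions, and task-parallel-speedup is a made-up helper). The best possible speedup is then the total sequential time divided by the time of the slowest task:

```clojure
;; Idealized model: one processor per task, zero overhead.
;; Speedup = sequential time / time of the slowest task.
(defn task-parallel-speedup [times]
  (/ (reduce + times) (apply max times)))

(task-parallel-speedup [10 10 10]) ; => 3    balanced b, c, d
(task-parallel-speedup [10 1 1])   ; => 6/5  one slow task dominates
```

With unbalanced tasks the two extra processors sit idle most of the time, so the speedup is barely above 1.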

Pipelining is also a good sequential optimization. When you're
mapping multiple functions over the same collection, it avoids the
creation of temporary collections, and also avoids iterating over the
collection multiple times (thus saving things like memory bandwidth
and the cost of hash table lookups). SERIES is my favorite example of
a package (in ANSI Common Lisp) that performs this optimization; it's
a source-to-source translator that converts maps over collections into
iterative loops (recall that ANSI Common Lisp doesn't guarantee
tail-call elimination).
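
A minimal Clojure sketch of the same idea, done by hand rather than by a source-to-source translator like SERIES; f, g, h and xs are placeholder names chosen for illustration:

```clojure
(def xs (range 10))

(defn h [x] (* x x))
(defn g [x] (+ x 1))
(defn f [x] (* 2 x))

;; Three passes: each map produces its own intermediate lazy seq.
(def staged (map f (map g (map h xs))))

;; One pass: comp applies h first, then g, then f, with no intermediate seqs.
(def fused (map (comp f g h) xs))

(= staged fused) ; true
```

Both forms compute f(g(h(x))) for every element; the fused one walks the collection once.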

mfh
