Well, it's the combination of a seq being persistent and a stream
being one-pass - to realize the seq you'll need that pass. Also, seqs
are an acceptable escaping value from the stream mechanism.
>
> I made some tests, and if I am not mistaken, if an eos is not
> specifically specified, Object is used, is that right?
>
> user=>
> (let [iter (stream-iter (range 5))]
> (def s (stream (fn [eos]
> (let [x (next! iter eos)]
> (if (= eos x)
> (do (println eos) eos)
> x))))))
> #'user/s
> user=> (seq s)
> (0 1 2 3 #<Object java.lang.Object@25c828>
> 4)
You should never rely on or care about the value of eos; just return
it when you are done.
It is always specified by some code; after all, it is a required arg
of the generator. In this case the consumer code happens to be in
AStream.Seq, and the eos it passes is a plain Object.
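As a rough sketch of that contract in plain Clojure (not the streams branch API; make-gen and drain are hypothetical names used only for illustration, and the atom-based generator is meant for single-threaded use): the consumer picks the eos value, the generator hands it back when it has nothing left, and neither side looks inside it.

```clojure
;; Hypothetical helper, not part of the streams branch: builds a generator
;; over a collection. The generator takes the consumer's eos value and
;; returns it once the items run out.
(defn make-gen [coll]
  (let [remaining (atom (seq coll))]
    (fn [eos]
      (if-let [s @remaining]
        (do (swap! remaining next)
            (first s))
        eos))))

;; Hypothetical consumer: it picks its own private sentinel and only ever
;; compares against it with identical?, never inspecting its value.
(defn drain [gen]
  (let [eos (Object.)]
    (loop [acc []]
      (let [x (gen eos)]
        (if (identical? x eos)
          acc
          (recur (conj acc x)))))))

(drain (make-gen (range 5)))   ; => [0 1 2 3 4]
```

Because the sentinel is a fresh Object compared by identity, even nil can travel through the generator as an ordinary value.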
Rich
Now imagine two threads T1 and T2 accessing this generator at the same
time. Suppose they reach the same node at the same time, and suppose
that you've protected file deletion and link deletion each
individually with a mutex (and forbade multiple deletions silently).
T1 might delete the file and the link and get to "recur next" before
T2 can call "(next-entry link)". Now T1 goes on but T2 is stuck:
"link" is _gone_ so "(next-entry link)" goes nowhere.
> I've started documenting the streams work I have been doing, for those
> interested:
>
> http://clojure.org/streams
Nice!
I have played a bit with the stream implementation, and I came across
a behaviour that I do not understand:
First, define a random stream that calls rand, and an iter on it:
(def rand-stream (stream (fn [_] (rand))))
(def rand-iter (stream-iter rand-stream))
Calling it a few times shows that it works:
(next! rand-iter nil)
(next! rand-iter nil)
Next, try to use it as a seq:
(take 4 rand-stream)
This fails, as it should:
java.lang.IllegalStateException: Already iterating (NO_SOURCE_FILE:0)
Detach the iter and try again:
(detach! rand-iter)
(take 4 rand-stream)
Now it works - fine. But what happened to the seq that now owns the
stream? Nothing refers to it, so it should be gone. Did it perhaps
liberate the stream, so that I can create an iter again? Let's try:
(def rand-iter (stream-iter rand-stream))
(next! rand-iter nil)
(next! rand-iter nil)
It seems so. But... let's be mean:
(take 4 rand-stream)
I would expect this to throw the IllegalStateException again, but it
doesn't: it returns the same four-number sequence as the last time it
was called. Where was that one stored? In the stream itself? Or does
the stream keep a reference to the seq, so that it never disappears?
But then I shouldn't be able to create another iterator.
Let's be mean again:
(next! rand-iter nil)
(take 10 rand-stream)
(next! rand-iter nil)
(next! rand-iter nil)
(take 15 rand-stream)
All of these work - it seems I have both an iter and a seq on the
same stream, with the iter returning values that are also in the seq.
Konrad.
>> Now it works - fine. But what happened to the seq that now owns the
>> stream? Nothing refers to it, so it should be gone.
>
> No, the stream must refer to it, in order to keep its promise to
> return the same seq every time.
OK.
>> Did it perhaps
>> liberate the stream, so that I can create an iter again? Let's try:
>>
>> (def rand-iter (stream-iter rand-stream))
>> (next! rand-iter nil)
>> (next! rand-iter nil)
> What you've created an iter on the second time is the seq of the
> stream. Right now, once you've treated a stream as a seq it will
> always behave like one. So this second stream-iter call actually
> creates an iter on a stream on that seq.
Does that mean that calling seq on a stream converts the stream into
a seq for all practical purposes? That sounds a bit dangerous
considering that so many operations in Clojure call seq implicitly.
One can easily have a seq "steal" a stream and not notice it before
all memory is used up by the seq.
> I understand this may not be intuitive or clear yet from the docs. Nor
> am I set in this being the behavior. The case I am looking towards is
> this one:
>
> (def s (stream (range 10)))
> (if (seq s)
> (take 4 (map-stream inc s)))
>
> A stream is used as a seq and then passed to a stream function.
> Without this seqed-stream-behaves-as-seq capability, this will fail
> with Already iterating, and would have to be written:
>
> (if (seq s)
> (take 4 (map-stream inc (seq s))))
I think the second is in fact clearer. It seems weird in a largely
functional context to have an enormous side-effect of calling seq on
a stream.
Konrad.
>
> On 22.01.2009, at 16:27, Rich Hickey wrote:
>
>>> Now it works - fine. But what happened to the seq that now owns the
>>> stream? Nothing refers to it, so it should be gone.
>>
>> No, the stream must refer to it, in order to keep its promise to
>> return the same seq every time.
>
> OK.
>
>>> Did it perhaps
>>> liberate the stream, so that I can create an iter again? Let's try:
>>>
>>> (def rand-iter (stream-iter rand-stream))
>>> (next! rand-iter nil)
>>> (next! rand-iter nil)
>
>> What you've created an iter on the second time is the seq of the
>> stream. Right now, once you've treated a stream as a seq it will
>> always behave like one. So this second stream-iter call actually
>> creates an iter on a stream on that seq.
>
> Does that mean that calling seq on a stream converts the stream into
> a seq for all practical purposes? That sounds a bit dangerous
> considering that so many operations in Clojure call seq implicitly.
> One can easily have a seq "steal" a stream and not notice it before
> all memory is used up by the seq.
>
Calling seq on a stream yields a seq that will forever own the stream
- if you think about it a bit, you'll see why that has to be the case.
OTOH, that seq is lazy, so I'm not sure what the memory issue is.
>> I understand this may not be intuitive or clear yet from the docs. Nor
>> am I set in this being the behavior. The case I am looking towards is
>> this one:
>>
>> (def s (stream (range 10)))
>> (if (seq s)
>> (take 4 (map-stream inc s)))
>>
>> A stream is used as a seq and then passed to a stream function.
>> Without this seqed-stream-behaves-as-seq capability, this will fail
>> with Already iterating, and would have to be written:
>>
>> (if (seq s)
>> (take 4 (map-stream inc (seq s))))
>
> I think the second is in fact clearer. It seems weird in a largely
> functional context to have an enormous side-effect of calling seq on
> a stream.
>
Again, I don't see the enormous side effect. Streams form a safe,
stateful pipeline, you'll generally only call seq on the end of the
pipe. If you ask for a seq on a stream you are asking for a (lazy)
reification. That reification and ownership is what makes the pipeline
safe.
I am working on seq/stream api unification right now, and we will see
how often we end up calling seq fns on something yet subsequently using
it as a stream.
Many of those places where seq is called will now call stream instead
(e.g. sequence fn entry points), and there may be a non-generator-
capturing function for determining eos.
Rich
>
> On Jan 21, 2:33 pm, Rich Hickey <richhic...@gmail.com> wrote:
>> I've started documenting the streams work I have been doing, for those
>> interested:
>
> Cool! 3 questions:
>
> 1. Can you feed things into a stream?
Yes, you can put a generator on the end of a queue.
>
>
> 2. Could streams be used for I/O?
>
Yes, that's one of the primary use cases.
> 3. Can streams have clean-up/close code when they are emptied or go
> out of scope?
I'm addressing the resource cleanup issue more generally in a scope
mechanism, also present in the streams SVN branch. Still in progress,
but you can get the gist of it here:
http://paste.lisp.org/display/73838
Rich
>>
>> Does that mean that calling seq on a stream converts the stream into
>> a seq for all practical purposes? That sounds a bit dangerous
>> considering that so many operations in Clojure call seq implicitly.
>> One can easily have a seq "steal" a stream and not notice it before
>> all memory is used up by the seq.
>>
>
> Calling seq on a stream yields a seq that will forever own the stream
> - if you think about it a bit, you'll see why that has to be the case.
>
> OTOH, that seq is lazy, so I'm not sure what the memory issue is.
If my understanding is correct, then
(def rand-stream (stream (fn [_] (rand))))
(take 5 rand-stream)
will create a seq on the stream that is referenced by the stream. As
long as the stream is referenced by a var, the seq will remain
referenced as well. Seqs being cached, this means that the whole
random number sequence will be kept in memory.
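The caching half of that concern can be seen with ordinary lazy seqs. The sketch below uses no streams API at all; rand-seq is a hypothetical stand-in for the seq that a stream would hold on to, and calls is just a counter added for illustration.

```clojure
;; Count how many times the generating fn actually runs.
(def calls (atom 0))

;; A lazy seq of random numbers held by a var, standing in for the seq
;; that a stream keeps a reference to.
(def rand-seq (repeatedly (fn [] (swap! calls inc) (rand))))

(def first-pass (doall (take 5 rand-seq)))
(def calls-after-first @calls)

(def second-pass (doall (take 5 rand-seq)))

;; Same values both times, and no further calls to rand: the realized
;; prefix is cached and stays reachable for as long as rand-seq is.
(= first-pass second-pass)      ; => true
(= calls-after-first @calls)    ; => true
```

Only the part that has been realized is retained, so the cost grows with how far consumers walk the seq rather than with the (infinite) length of the sequence.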
The only way to avoid this seems to be not calling any sequence
function on a stream. I could use for example
(defn take-stream
[n s]
(let [iter (stream-iter s)
eos (Object.)
vs (doall (for [_ (range n)] (next! iter eos)))]
(do (detach! iter) vs)))
(take-stream 5 rand-stream)
Writing take-stream made me discover another pitfall: the stream
seems to keep a reference to its iter object as well, meaning that it is
never released without an explicit call to detach!. I had expected to
be able to create a "local" iter in a let and have it disappear and
release the stream when it goes out of scope. I guess that would
require the stream not to keep a reference to the iter, but just a
flag that an iter exists. Which in turn requires that the iter resets
the flag when it goes out of scope. I don't even know if that is
doable in the JVM.
> Again, I don't see the enormous side effect. Streams form a safe,
> stateful pipeline, you'll generally only call seq on the end of the
> pipe. If you ask for a seq on a stream you are asking for a (lazy)
> reification. That reification and ownership is what makes the pipeline
> safe.
Then why not make a pipeline using lazy sequences right from the
start? I don't see anything that I could do better with streams than
with lazy sequences.
Konrad.
(defn touch [s] (seq s) s)
(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (touch s1)
#<AStream clojure.lang.AStream@19ee8a>
user=> (take1 s1)
3
user=> (take1 s1)
3
user=> (take1 s1)
3
; s1 is stuck on 3 because stream-iter returns a new iter on a new
; stream on the canonical seq for s1
With the attached patch, you get:
user=> (defn take1 [s]
(let [i (stream-iter s)
n (next! i nil)]
(detach! i)
n))
(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (seq s1)
(3 4 5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (take1 s1)
3
user=> (take1 s1)
4
user=> (first s1)
5
;; seq lookup or realization don't consume the stream:
user=> (seq s1)
(5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (first s1)
5
user=> (take1 s1)
5
user=> (take1 s1)
6
I relaxed the constraint saying that "a stream ensures that every call
to seq on a stream will return the same seq" to be "a stream ensures
that every call to seq on a stream will return the same seq as long as
the stream state doesn't change".
What did I lose?
Christophe
user=> (def s1 (stream (range 10)))
#'user/s1
user=> (take1 s1)(take1 s1)(take1 s1)
0
1
2
user=> (def seq1 (seq s1))
#'user/seq1
user=> seq1
(3 4 5 6 7 8 9)
user=> (take1 s1)(take1 s1)(take1 s1)
3
4
5
user=> (identical? (seq s1) (drop 3 seq1))
true
> With what you are proposing:
>
> (if (seq astream)
> (do-something-with (first astream)))
>
> is broken.
>
Indeed you're right: astream can change between the two calls to (seq
astream).
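A minimal sketch of that check-then-use hazard, with an atom standing in for a source whose state can change between the two calls (check-then-use and consume-one! are hypothetical names, and no streams API is involved):

```clojure
(defn check-then-use
  "Checks src for emptiness, lets `between` run, then reads the first item."
  [src between]
  (if (seq @src)
    (do (between src)
        (first @src))
    :empty))

;; Something else consuming an item between the check and the read.
(defn consume-one! [src]
  (swap! src #(vec (rest %))))

(check-then-use (atom [1]) (fn [_] nil))   ; => 1
(check-then-use (atom [1]) consume-one!)   ; => nil, though the check passed
```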
> More generally, I guess I simply don't understand these use cases for
> treating the stream as a seq and subsequently mutating it.
It's not a use case, it's the mere angst of nasty bugs basically due to:
(seq astream)
...
(stream-iter astream)
not raising an exception when someone inadvertently mixes seq fns and
stream fns.
Now (rev 1228) I get an "Already iterating" exception so I'm happy.
Christophe
>> Then why not make a pipeline using lazy sequences right from the
>> start? I don't see anything that I could do better with streams than
>> with lazy sequences.
>>
>
> There are a couple of advantages. First, streams are faster, at least
> 2x faster. Since a lazy sequence must allocate per stage, a
> multi-stage pipeline would incur multiple allocations per step. A stream
> could be built that has no allocation other than the results. If your
> calculations per step are significant, they'll dominate the time, but
> when they are not, this allocation time matters.
>
> Second, streams are fully lazy. Seqs could be made fully lazy, but
> currently are not.
>
> Third, stream iters currently provide transparent MT access. Doing the
> same for a seq means wrapping it in a ref.
Thanks for those explanations, that makes a lot of sense.
I just wonder about the performance aspect. If I have a pipeline
stage with very little computational cost, say adding 1 to every
element, then I would expect the overhead of the iter layer and the
thread-safeness to dominate CPU time anyway. Does an allocation
really add that much on top of that that it makes a difference?
Konrad.