(defn pipe-xform [in-ch out-ch xform]
  (let [step (xform (fn
                      ([result] result)
                      ([result input] (conj! result input))))
        tr   (fn
               ([] (locking step (persistent! (step (transient [])))))
               ([input] (locking step (persistent! (step (transient []) input)))))]
    (go-loop []
      (if-some [value (<! in-ch)]
        (do (doseq [v (tr value)]
              (>! out-ch v))
            (recur))
        (do (doseq [v (tr)]
              (>! out-ch v))
            (close! out-ch))))))
(let [xf-ch (chan 1 xform)]
  (pipe in-ch xf-ch)
  (pipe xf-ch out-ch))
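A minimal sketch of the channel-transducer approach above, assuming org.clojure/core.async is on the classpath (the channel names and the `(map inc)` transducer are illustrative, not from the thread). The transducer runs inside the channel's lock, which is also what provides the memory barrier discussed below:

```clojure
(require '[clojure.core.async :refer [chan pipe >!! <!! close!]])

(let [in  (chan)
      out (chan 1 (map inc))] ; transducer runs under the channel lock
  (pipe in out)
  (>!! in 41)
  (close! in)
  (<!! out))
;; => 42
```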
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
(let [f (fn [i x]
          (println (str "i " i " " (Thread/currentThread)))
          (flush)
          x)
      r-map-indexed #(r/folder %2 (map-indexed %1))]
  (->> [6 7 8 9 10]
       (r-map-indexed f)
       (r/fold 1
               (fn ([] (vector))
                   ([x] x)
                   ([a b] (into a b)))
               conj)))
i 0 Thread[ForkJoinPool-1-worker-2,5,main]
i 2 Thread[ForkJoinPool-1-worker-1,5,main]
i 3 Thread[ForkJoinPool-1-worker-1,5,main]
i 4 Thread[ForkJoinPool-1-worker-1,5,main]
i 1 Thread[ForkJoinPool-1-worker-3,5,main]
... acquir[ing a] monitor ... has the effect of invalidating the local processor cache so that variables will be reloaded from main memory. We will then be able to see all of the writes made visible by the previous release.
(defn map-indexed-transducer-base [f box-mutable inc-mutable]
  (fn [rf]
    (let [i (box-mutable -1)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (rf result (f (inc-mutable i) input)))))))

(defn map-indexed-transducer-single-threaded [f]
  ;; `unsynchronized-mutable-long!` / `unsynchronized-mutable-swap!` are
  ;; hypothetical unsynchronized mutable-box helpers
  (map-indexed-transducer-base f unsynchronized-mutable-long! #(unsynchronized-mutable-swap! % inc)))

(defn map-indexed-transducer-sequentially-multi-threaded [f]
  (map-indexed-transducer-base f volatile! #(vswap! % inc)))

(defn map-indexed-transducer-concurrently-multi-threaded [f]
  (map-indexed-transducer-base f atom #(swap! % inc))) ; or an AtomicLong variant
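As a concrete, runnable instance of the pattern above, here is the `volatile!`-backed variant inlined (the function name is mine; `volatile!`/`vswap!` are real core functions, so this works as-is in a sequential context):

```clojure
(defn map-indexed-volatile [f]
  (fn [rf]
    (let [i (volatile! -1)] ; volatile write/read at each step ensures visibility
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (rf result (f (vswap! i inc) input)))))))

(into [] (map-indexed-volatile vector) [:a :b :c])
;; => [[0 :a] [1 :b] [2 :c]]
```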
(defn broken-reducers-map-indexed [f coll]
(r/folder coll (map-indexed-transducer-concurrently-multi-threaded f)))
(->> (range 10 20) vec (broken-reducers-map-indexed vector) (r/fold ...))
(defn non-foldable-reducers-map-indexed [f coll]
(r/reducer coll (core/map-indexed f)))
(->> (range 10 20) vec (non-foldable-reducers-map-indexed vector) (fold ...)) ; won't employ parallelism
(defn reducers-map-implemented-with-transducer [f coll]
(r/folder coll (core/map f)))
(->> (range 10 20)
(r/map ...)
(reducers-map-indexed vector)
...
(fold ...))
(defn reduce-indexed [f init xs]
(reduce (let [i (mutable-long -1)]
(fn [ret x] (f ret (mutable-swap! i inc) x)))
init xs))
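Since `mutable-long`/`mutable-swap!` above are hypothetical, here is the same `reduce-indexed` idea expressed with the real `volatile!` API (name is mine):

```clojure
(defn reduce-indexed-volatile
  "Like reduce, but f receives [acc index x]; the counter lives in a volatile."
  [f init xs]
  (let [i (volatile! -1)]
    (reduce (fn [ret x] (f ret (vswap! i inc) x)) init xs)))

(reduce-indexed-volatile (fn [acc i x] (conj acc [i x])) [] [:a :b])
;; => [[0 :a] [1 :b]]
```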
(go (let [x (unsynchronized-mutable 0)]
(println "thread 1" (Thread/currentThread))
(reset! x 1)
(<! c) ; defined elsewhere
(println "possibly not thread 1" (Thread/currentThread))
(println "x =" @x))) ; possibly cached in memory and may thus still read as x=0, so maybe x should be `volatile!`
While a transducing process may provide locking that covers the visibility of state updates in a stateful transducer, transducers should still use stateful constructs that guarantee visibility on their own (volatiles, atoms, etc.).
As an aside about the stateful `take` transducer: Tesser uses the equivalent of one but skirts the issue by not guaranteeing that the first n items of the collection will be returned; rather, it returns n items of the collection in no particular order, starting at no particular index. This is achievable without Tesser by simply replacing the `volatile` in the `core/take` transducer with an `atom` and using it with `fold`. But yes, this breaks `take`'s contract, so it still follows the rule of thumb you established that `fold` can't use stateful transducers (at least, not without weird things like reordering of the indices in `map-indexed`, and so on).
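A sketch of that atom-based `take` variant (the name `take-any` is mine, and this is not Tesser's actual implementation): each step atomically claims one slot of the budget via `swap!`, so under `fold` concurrent branches collectively return some n items rather than the first n.

```clojure
(defn take-any [n]
  ;; like core's take transducer, but with an atom instead of a volatile,
  ;; so concurrent steps each claim a slot atomically; which n items you
  ;; get, and in what order, is unspecified
  (fn [rf]
    (let [remaining (atom n)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if (neg? (swap! remaining dec))
           (reduced result) ; budget exhausted on this branch
           (rf result input)))))))

;; in a sequential context it behaves like take:
(into [] (take-any 2) (range 5))
;; => [0 1]
```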
That's interesting that `fold` can use transducers directly! I haven't tried that yet — I've just been wrapping them in an `r/folder`.
On Monday, April 10, 2017 at 12:39:37 PM UTC-4, Alex Miller wrote:
Oh, you still need r/folder, sorry! Something like: `(r/fold + (r/folder v (map inc)))`
Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts.
What you said holds for reduction but not necessarily a parallel fold (see clojure.core.reducers/fold).
These kinds of failures are inherently difficult to reproduce unless the code is in production and you're on vacation. ;)
Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts.
The problem is at a lower level. The memory model of the JVM doesn't guarantee that changes to an unsynchronized non-volatile reference are visible to other threads.
A transducing process could apply each step of the transduce using a thread from a pool and also not use a memory barrier
Transducers included in core cannot make the assumption that they will only be used that way.
Yes, that makes sense that you can't make that assumption.
> What you said holds for reduction but not necessarily a parallel fold (see clojure.core.reducers/fold).

Exactly, and that's why stateful transducers are explicitly forbidden in fold and in the core.async pipeline functions. This is not related to memory visibility; it is due to the fact that stateful transducers force the reducing process to be sequential. Does it make any sense to parallelize map-indexed? partition-all? dedupe?
> Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts.
> The problem is at a lower level. The memory model of the JVM doesn't guarantee that changes to an unsynchronized non-volatile reference are visible to other threads.

The Java Memory Model allows using unsynchronized variables to share data across threads as long as a memory barrier is set between the writer and the reader. For example, in the case of core.async, the channel lock sets a barrier, and there are also (redundant) barriers for each volatile inside transducers.
> A transducing process could apply each step of the transduce using a thread from a pool and also not use a memory barrier
> Transducers included in core cannot make the assumption that they will only be used that way.
> Yes, that makes sense that you can't make that assumption.

This is the key point: what assumptions can a transducer make?
In my opinion, it is reasonable for a stateful transducer to assume that the transducing context will fulfill the contract of "always passing the result of step n to the first argument of step n+1".
This assumption is powerful because it guarantees that there will always be a memory barrier between two successive steps. Proof (reductio ad absurdum): without a memory barrier, the result of step n wouldn't be visible to the (potentially different) thread performing step n+1.
So here is my question to the language designers: is it reasonable to assume that?
If yes, that means it's ok to use unsynchronized variables in stateful transducers as long as they stay local.
If no, that means we'll use synchronization in all stateful transducers, with an obvious performance penalty and a benefit that remains unclear.
I think you present a key question: what assumptions can a transducer make? We know the standard ones, but what of memory barriers?
Based on the current implementation, in terms of concurrency, it seems to guarantee (inconsistently; see also `partition-by`) that sequential writes and reads will be consistent, no matter which thread does the reads or writes. Concurrent writes are not supported. But should sequential multi-threaded reads/writes be supported?
This is a question best left to Alex, but I think I already know the answer based on his conversation with Rich: it's part of the contract.

I think another key question is: is the channel-lock memory barrier part of the contract of a core.async channel implementation?
The JVM is pretty good at minimizing this stuff - so while you are stating these barriers are redundant and are implying that's an issue, it would not surprise me if the JVM is able to reduce or eliminate the impacts of that. At the very least, it's too difficult to reason about without a real perf test and numbers.
Fast wrong results are still wrong. I do not think it's at all obvious how this affects performance without running some benchmarks. Volatiles do not require flushing values to all cores or anything like that. They just define constraints - the JVM is very good at optimizing these kinds of things. It would not surprise me if an uncontended thread-contained volatile could be very fast (for the single-threaded transducer case) or that a volatile under a lock would be no worse than the lock by itself.
A transducer can assume it will be invoked by no more than one thread at a time
Transducers should ensure stateful changes guarantee visibility. That is: you should not make assumptions about external memory barriers.
You're conflating the stateful values inside the transducer with the state returned by and passed into a transducer. That's a linkage that does not necessarily exist.
volatile! is what ensures that there's a memory barrier.
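To make that concrete, here is a sketch in the spirit of core's `dedupe` transducer (not the core implementation itself; the name is mine). The `vreset!` is a volatile write, so whatever thread performs step n+1 is guaranteed to see the `prev` written at step n:

```clojure
(defn dedupe-xf []
  (fn [rf]
    (let [prev (volatile! ::none)] ; volatile box: writes are published to later readers
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if (= input @prev)
           result                      ; drop consecutive duplicate
           (do (vreset! prev input)    ; volatile write = memory barrier
               (rf result input))))))))

(into [] (dedupe-xf) [1 1 2 2 3 1])
;; => [1 2 3 1]
```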
> Transducers should ensure stateful changes guarantee visibility. That is: you should not make assumptions about external memory barriers.

How do you enforce no more than one thread at a time without setting a memory barrier?
For the JMM, no more than one thread at a time means exactly that the return of step n will *happen-before* the call to step n+1.
This implies that what was visible to the thread performing step n will be visible to the thread performing the step n+1, including all memory writes performed during step n inside stateful transducers.
Still no need for extra synchronization.
> You're conflating the stateful values inside the transducer with the state returned by and passed into a transducer. That's a linkage that does not necessarily exist.

What do you mean? How could a function return a value without having executed its body?
I could have one thread that invokes a transduce step on odd seconds and another that invokes on even seconds. Or some external api call that tells me to take the next step, which I do on a thread pulled from a pool.
While this is a good reference, it's also 13 years old and the JMM has been updated since then. A much better reference explaining the semantics and constraints is:
https://shipilev.net/blog/2014/jmm-pragmatics/
In particular, even if there is a memory barrier, there are some reorderings allowed if the transducer state is not volatile that may be surprising. Making it volatile adds a critical edge in the total program order.
I'm saying that the logical ordering of steps is irrelevant wrt how a multi-threaded program can be optimized/reordered under the JMM.
happens-before across threads requires a volatile or lock, but I don't see how the use of one is guaranteed by this logical ordering.
;; stress test: a chain of agents, each running its own stateful
;; (partition-all 64)/cat transducer, so successive steps of each
;; transducer execute on different agent pool threads
(def xf (comp (partition-all 64) cat))

;; calls f on args for its side effect and returns f itself, so the
;; accumulator threaded through the transducer is the downstream fn
(defn ! [f & args] (apply f args) f)

(defn run [m n]
  (let [p (promise)
        f (reduce (fn [f _] (partial send (agent f) (xf !)))
                  #(when (zero? %) (deliver p nil))
                  (range m))]
    (doseq [i (reverse (range n))] (f i))
    (f) ; completion arity flushes each partition-all buffer down the chain
    @p))

(run 1000 1000)
Seems risky to depend on that. eduction creates an Iterable, for example; it has no way of preventing somebody from creating the iterator on one thread and consuming it on another.
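A sketch of that eduction hazard: the iterator (and the transducer state inside it) is created on one thread but consumed on a future's pool thread. This particular run happens to work, since `future` submission and `deref` set barriers of their own, but a transducer cannot rely on every context doing that.

```clojure
;; iterator created on the current thread...
(let [it (.iterator ^Iterable (eduction (map inc) (range 3)))]
  ;; ...consumed on a future's pool thread
  @(future
     (loop [acc []]
       (if (.hasNext it)
         (recur (conj acc (.next it)))
         acc))))
;; => [1 2 3]
```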