Using transducers in a new transducing context


shintotomoe
Jan 1, 2015, 8:55:55 PM
to clo...@googlegroups.com
I was wondering how to apply a transducer inside a go process. What I have so far is the following:

(defn pipe-xform [in-ch out-ch xform]
  (let [tr (let [tr (xform (fn
                             ([result] result)
                             ([result input] (conj! result input))))]
             (fn
               ([] (locking tr (persistent! (tr (transient [])))))
               ([input] (locking tr (persistent! (tr (transient []) input))))))]
    (go-loop []
      (if-some [value (<! in-ch)]
        (do (doseq [v (tr value)]
              (>! out-ch v))
            (recur))
        (do (doseq [v (tr)]
              (>! out-ch v))
            (close! out-ch))))))

Now, I could just do

(let [xf-ch (chan 1 xform)]
  (pipe in-ch xf-ch)
  (pipe xf-ch out-ch))


Or just redesign my code so that I can create in-ch or out-ch with the transducer directly, but I was wondering whether there are any obvious flaws with the pipe-xform implementation.

In particular, I was wondering about the locking. At first I was under the impression that transducers are thread-safe due to the use of volatiles, but looking at the partition-all transducer, which uses an ArrayList for its state, it appears that's not the case.

Any feedback greatly appreciated.

Timothy Baldridge
Jan 1, 2015, 8:58:51 PM
to clo...@googlegroups.com
Core.async already has pipeline, pipeline-blocking and pipeline-async. In addition you can use a transducer inside a channel. Use those instead. 
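A minimal sketch of both approaches (the buffer sizes, sample data, and the use of `onto-chan`/`a/into` to feed and drain the channels are incidental choices, not part of the recommendation):

```clojure
(require '[clojure.core.async :as a])

;; Option 1: attach the transducer to the channel itself. The channel's
;; internal lock serializes the transducer's steps, so stateful
;; transducers are fine here.
(let [ch (a/chan 8 (comp (filter odd?) (map inc)))]
  (a/onto-chan ch [1 2 3 4 5])      ; feed and close the channel
  (a/<!! (a/into [] ch)))           ; => [2 4 6]

;; Option 2: pipeline moves values from one channel to another through
;; the transducer on n worker threads. The xform is applied to each
;; value independently, so only stateless transducers are appropriate.
(let [in  (a/chan)
      out (a/chan)]
  (a/pipeline 4 out (map inc) in)
  (a/onto-chan in [1 2 3])
  (a/<!! (a/into [] out)))          ; => [2 3 4] (pipeline preserves order)
```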

Timothy

--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
“One of the main causes of the fall of the Roman Empire was that–lacking zero–they had no way to indicate successful termination of their C programs.”
(Robert Firth)

shintotomoe
Jan 1, 2015, 10:36:13 PM
to clo...@googlegroups.com
Thank you for the superfast response. I take it implementing your own transducing process is not something you would usually do unless you have a unique use case (my own use case being already implemented by chan taking a transducer).

Still, I was wondering about the use of ArrayList in partition-all, and the recommendation to use volatiles inside transducers, which seem at odds. It seems we don't need to implement transducers in a thread-safe way. Is that correct?

Alexander Gunnarson
Apr 9, 2017, 2:10:06 AM
to Clojure
I was wondering the same thing, shintotomoe. This thread talks about it as well. Since `ArrayList` uses unsynchronized mutability internally (a quick review of the GrepCode entry for `ArrayList` confirms this), I think it's safe to assume that a `volatile` box, as opposed to a totally unsynchronized mutable variable, is unnecessary, even in the context of `fold`. After all, `reduce` (and by extension, `transduce`) is only ever going to be single-threaded unless the data structure in question unexpectedly implements a multithreaded reduce, which should never happen (and if it does, you likely have bigger problems). To be honest, I'm not sure why `volatile` is used in transducers instead of e.g. an `unsynchronized-mutable` box. There may be a good reason, but I'm not seeing it immediately. I'd love to learn.

Alexander Gunnarson
Apr 9, 2017, 2:49:40 AM
to Clojure
EDIT: Transducers are actually not safe in `fold` contexts as I thought:

(let [f (fn [i x] (println (str "i " i " " (Thread/currentThread))) (flush) x)
      r-map-indexed #(r/folder %2 (map-indexed %1))]
  (->> [6 7 8 9 10]
       (r-map-indexed f)
       (r/fold 1 (fn ([] (vector)) ([x] x) ([a b] (into a b))) conj)))

Produces:

0 Thread[ForkJoinPool-1-worker-2,5,main]
2 Thread[ForkJoinPool-1-worker-1,5,main]
3 Thread[ForkJoinPool-1-worker-1,5,main]
4 Thread[ForkJoinPool-1-worker-1,5,main]
1 Thread[ForkJoinPool-1-worker-3,5,main]

So you would have to be careful to e.g. create different `map-indexed` transducers for single-threaded (e.g. `unsynchronized-mutable` box) and multi-threaded (e.g. `atom` box) contexts.

Timothy Baldridge
Apr 9, 2017, 9:47:25 AM
to clo...@googlegroups.com
The volatile! is needed for the case where a transducer is only used by one thread at a time, but the thread executing the transducer may change from one call to the next. This happens fairly often with core.async. If you used a non-atomic, non-volatile mutable field, the JVM would be free to perform several optimizations (like keeping the local in a CPU register) that would cause the value to not properly propagate to other threads in the case of a context switch. Using volatile! tells the JVM to flush all writes to this field by the time the next memory barrier rolls around. It also tells the JVM to make sure it doesn't cache the reads to this field across memory barriers. 

It's a tricky subject, and one that's really hard to test, and frankly I probably got some of the specifics wrong in that last paragraph, but that's the general idea of why transducers use volatile!. 
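For reference, the distinction maps directly onto the two mutable-field annotations `deftype` offers. These box types are hypothetical illustrations, not anything in core (`volatile!` behaves essentially like the second one):

```clojure
(defprotocol IBox
  (value  [b])
  (store! [b x]))

;; Writes may be cached per-thread (e.g. kept in a register): only safe when
;; a single thread touches the box, or an external lock supplies the barrier.
(deftype UnsyncBox [^:unsynchronized-mutable v]
  IBox
  (value  [_] v)
  (store! [_ x] (set! v x)))

;; Writes are flushed and reads are not cached across memory barriers, so a
;; value written on one thread is seen by whichever thread runs next.
(deftype VolatileBox [^:volatile-mutable v]
  IBox
  (value  [_] v)
  (store! [_ x] (set! v x)))
```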

Timothy



Seth Verrinder
Apr 9, 2017, 11:13:35 AM
to Clojure
My guess is that partition-all and partition use non-volatile references because none of the built-in stuff will return control back to the caller at a finer resolution than output value (AFAIK). That's why take needs volatile but partition-all doesn't (because for take the state persists between output values).
It does mean that new transducing contexts would need to synchronize though - which core.async does through mutexes.

Alexander Gunnarson
Apr 9, 2017, 12:58:00 PM
to Clojure
Thanks so much for your well-considered reply, Timothy! That makes sense about volatiles being used in e.g. core.async or core.reducers contexts where the reducing function that closes over the mutable value of the stateful transducer is called in different threads. Why, then, are unsynchronized ArrayLists used e.g. in 'partition-by'? It's also closed over by the reducing function in just the same way as the volatile long value internal to e.g. 'map-indexed'. I'm not yet clear on how one (the ArrayList) is acceptable being non-volatile and the other (the volatile long) is unacceptable. When .add is called, an unsynchronized mutable counter is updated so the ArrayList can insert the next value at the correct index. Do you have any insight into this? Meanwhile I'll go do some digging myself on the Clojure JIRA etc. so I'm more informed on the subject.

Seth Verrinder
Apr 9, 2017, 1:56:38 PM
to Clojure
I'll defer to Timothy on the particulars of core.async, but it looks like [1] the transducer in a channel is protected by a lock. If that's the case, volatile isn't adding anything in terms of memory barriers.

Alexander Gunnarson
Apr 9, 2017, 6:19:51 PM
to Clojure
It looks that way to me too, Seth, though I'd have to comb over the details of the locks implemented there to give a reasoned opinion of my own. But yes, if that's the case, the volatile isn't adding anything.

Anyway, I'm not trying to poke holes in the current implementation of transducers — on the contrary, I'm very appreciative of and impressed by the efforts the clojure.core (and core.async) contributors have made on that and other fronts. Transducers are an extremely powerful and elegant way to express code that would otherwise be a lot more complex and difficult to reason about. I'm just trying to figure out where I can get away with having unsynchronized mutable versions of stateful transducers that currently use volatiles, and where I need even stronger measures of thread safety than volatiles.

To take these thoughts further, I did a simple test to compare the three types of mutability we've been talking about (unsynchronized, volatile, and atomic — I can reproduce the code here if you'd like) and the takeaway is that `map-indexed` really does rely on atomic operations in a multithreaded context, as each index depends on the previous index value. When doing a `volatile`-based `map-indexed` in parallel on a small collection (8 elements), the `volatile` value stays consistent — that is, all the correct indices are passed to the mapping function. However, over a sufficiently large collection (100 elements, though it could happen on smaller scales too), the `volatile` value starts to break down: duplicate index values are passed to the mapping function and the highest index value only ever reaches 97 at the maximum. The same phenomenon happens, of course, with the unsynchronized-mutable-box-based `map-indexed`, though it happens at a small scale too (calling the unsynchronized `map-indexed` on 8 elements operated on by 2 threads produces only 7 unique indices).

My preliminary conclusions are:
- Unsynchronized mutability is fine in contexts known to be only single-threaded, in which I could replace the `volatile` in `map-indexed` and other transducers with unsynchronized mutable boxes.
- Volatiles are good when all you want to do is set the value and have multiple threads always read the most up-to-date value, without having to depend on a previous value via e.g. `inc`.
- Atomic boxes (`atom`, `AtomicLong`, etc.) are necessary when the mutable value relies on the previous value via e.g. `inc`, as is the case with `map-indexed`.
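A sketch of the kind of test described above (not the original code; the thread and iteration counts are arbitrary), contrasting `volatile!` with an `atom` under genuinely concurrent increments:

```clojure
(let [v    (volatile! 0)
      a    (atom 0)
      n    100000
      futs (doall
             (repeatedly 4                ; 4 threads, n increments each
               #(future
                  (dotimes [_ n]
                    (vswap! v inc)        ; read-inc-write: not atomic
                    (swap! a inc)))))]    ; compare-and-swap loop: atomic
  (run! deref futs)                       ; wait for all threads to finish
  {:volatile @v :atom @a})
;; :atom is always 400000; :volatile typically falls short, because
;; concurrent read-inc-write cycles overwrite one another's updates.
```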

My guess is that all this applies to e.g. the unsynchronized `ArrayList` in `partition-by` as well, which might need to be a synchronized collection or an immutable one boxed in an atom, but I haven't tested this.

Would you agree with these conclusions, Seth and Timothy?

Alexander Gunnarson

unread,
Apr 9, 2017, 6:39:13 PM4/9/17
to Clojure
I should add that, as Timothy pointed out, if multiple threads mutate and read the value but only one ever does so at a time, as is the case in `core.async`, then a volatile is sufficient. My preliminary conclusions above about volatiles apply only to concurrent mutation via e.g. `fold` or the like.

Also, regarding the locks you mentioned, Seth, I read up a little on the Java memory model here and I can confirm that a lock is sufficient to provide *both* write *and* read thread-safety guarantees: 

... acquir[ing a] monitor ... has the effect of invalidating the local processor cache so that variables will be reloaded from main memory. We will then be able to see all of the writes made visible by the previous release.

`Volatile` only provides a subset of these read-safety guarantees, so a `volatile` in addition to a lock is indeed overkill, if that's what is happening.

Timothy Baldridge
Apr 9, 2017, 6:47:46 PM
to clo...@googlegroups.com
Transducers were never designed to work in parallel context. So I'd define any behavior that arises from using the same transducers in multiple threads *at the same time*, as undefined behavior. 


Alexander Gunnarson
Apr 9, 2017, 6:57:40 PM
to Clojure
That makes sense about them not being designed for that use case. I would add, though, that transducers could certainly be used in a parallel context *if* the current transducer implementations were abstracted such that you could pass internal state generator and modifier functions and use the correct ones in whichever context is appropriate (single-threaded read/write, sequentially multi-threaded read/write à la core.async, concurrently multi-threaded read/write à la core.reducers). In the case of `map-indexed`, the fact that its transducer uses a volatile as currently implemented is not part of the `map-indexed` "contract", if you will, and seems to me to be an implementation detail. One could just as easily write the transducer for `map-indexed` as below:

(defn map-indexed-transducer-base [f box-mutable inc-mutable]
  (fn [rf]
    (let [i (box-mutable -1)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (rf result (f (inc-mutable i) input)))))))

(defn map-indexed-transducer-single-threaded [f]
  (map-indexed-transducer-base f unsynchronized-mutable-long! #(unsynchronized-mutable-swap! % inc)))

(defn map-indexed-transducer-sequentially-multi-threaded [f]
  (map-indexed-transducer-base f volatile! #(vswap! % inc)))

(defn map-indexed-transducer-concurrently-multi-threaded [f]
  (map-indexed-transducer-base f atom #(swap! % inc))) ; or an AtomicLong variant

For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Timothy Baldridge
Apr 9, 2017, 7:16:18 PM
to clo...@googlegroups.com
In your example transducer, the problem is with the `result` parameter. The specification of transducers is that the result of `(rf result x)` should be fed into the next call to `rf`. In other words: `(-> result (rf x1) (rf x2) (rf x3))`. Trying to do that in a parallel context is next to impossible. Not saying there isn't a way to code a transducer-like thing to work with multiple threads, but the result of that would look a lot more like core.async or Reactive Extensions than the transducers we have today.



Alexander Gunnarson
Apr 9, 2017, 9:39:35 PM
to Clojure
You make a very good point. I had been under the misimpression that you could make an `r/folder` out of any thread-safe transducer like so, and it would work out of the box:

(defn broken-reducers-map-indexed [f coll]
  (r/folder coll (map-indexed-transducer-concurrently-multi-threaded f)))

and then you could use it like so:

(->> (range 10 20) vec (broken-reducers-map-indexed vector) (r/fold ...))

However, while the indices *do* all appear (unlike in the case of the `volatile`-using transducer), they are out of order, unlike the (indexed) elements of the original range which do not rely on a stateful transducer to keep track of the current index. So you're right — a subtler implementation is required here that sadly isn't as simple as I had thought (just reusing transducers).

You could wrap stateful transducers in an `r/reducer` for use with the threading macro, but they wouldn't be foldable:

(defn non-foldable-reducers-map-indexed [f coll]
  (r/reducer coll (core/map-indexed f)))

(->> (range 10 20) vec (non-foldable-reducers-map-indexed vector) (fold ...)) ; won't employ parallelism

That said, it seems to me that you *can* use stateless transducers like `map` in any context (single-threaded, sequentially multi-threaded, or concurrent) and get consistent results:

(defn reducers-map-implemented-with-transducer [f coll]
  (r/folder coll (core/map f)))

I guess the pipe dream of writing any transducer, stateful or not, and getting a parallel-ready transformation out of it by wrapping it in an `r/folder` is gone. If I remember correctly, the `tesser` library won't help here. I might end up coding up something to ameliorate the situation because I was planning on being able to just do e.g.

(->> (range 10 20)
     (r/map ...)
     (reducers-map-indexed vector)
     ...
     (fold ...))

Anyway, thanks so much for your insights! I appreciate you taking the time to share them!

Alexander Gunnarson
Apr 9, 2017, 10:08:43 PM
to Clojure
I do have one last question for you on the topic. I'm thinking of using the below function within a core.async context — specifically within a `go` block:

(defn reduce-indexed [f init xs]
  (reduce (let [i (mutable-long -1)]
            (fn [ret x] (f ret (mutable-swap! i inc) x)))
          init xs))

Are you saying that *any* unsynchronized, non-volatile mutable operation is unsafe within a core.async context, or specifically within transducers when applied to e.g. `chan`? That is, when are thread context switches possible — specifically within `go` blocks where the blocking takes and puts are transformed into asynchronous takes and puts participating in a state machine, like so?:

(go (let [x (unsynchronized-mutable 0)]
      (println "thread 1" (Thread/currentThread))
      (reset! x 1)
      (<! c) ; defined elsewhere
      (println "possibly not thread 1" (Thread/currentThread))
      (println "x =" @x))) ; possibly cached in memory and may thus still read as x=0, so maybe x should be `volatile!`

Thanks!

Alex Miller
Apr 9, 2017, 10:22:13 PM
to Clojure
Hey all, just catching up on this thread after the weekend. Rich and I discussed the thread safety aspects of transducers last fall and the intention is that transducers are expected to only be used in a single thread at a time, but that thread can change throughout the life of the transducing process (for example when a go block is passed over threads in a pool in core.async). While transducing processes may provide locking to cover the visibility of state updates in a stateful transducer, transducers should still use stateful constructs that ensure visibility (by using volatile, atoms, etc).

The major transducing processes provided in core are transduce, into, sequence, eduction, and core.async. All but core.async are single-threaded. core.async channel transducers may occur on many threads due to interaction with the go processing threads, but never happen on more than one thread at a time. These operations are covered by the channel lock which should guarantee visibility. Transducers used within a go block (via something like transduce or into) occur eagerly and don't incur any switch in threads so just fall back to the same old expectations of single-threaded use and visibility.
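Concretely, the single-threaded transducing contexts listed here all run a stateful transducer such as partition-all safely:

```clojure
(def xf (partition-all 2))

(transduce xf conj [] (range 5)) ; => [[0 1] [2 3] [4]]
(into [] xf (range 5))           ; => [[0 1] [2 3] [4]]
(sequence xf (range 5))          ; => ([0 1] [2 3] [4])
(vec (eduction xf (range 5)))    ; => [[0 1] [2 3] [4]]
```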

Note that there are a couple of stateful transducers that use ArrayList (partition-by and partition-all). From my last conversation with Rich, he said those should really be changed to protect themselves better with volatile or something else. I thought I wrote up a ticket for this but looks like maybe I didn't, so I will take care of that. 

Reducer fold is interesting in that each "bucket" is reduced via its reduce function, which can actually use a transducer (since that produces a reduce function), however, it can't be a stateful transducer (something like take, etc).

Hope that helps with respect to intent.

Alexander Gunnarson
Apr 9, 2017, 10:44:00 PM
to Clojure
Thanks so much for your input Alex! It was a very helpful confirmation of the key conclusions arrived at in this thread, and I appreciate the additional elaborations you gave, especially the insight you passed on about the stateful transducers using `ArrayList`. I'm glad that I wasn't the only one wondering about the apparent lack of parity between its unsynchronized mutability and the volatile boxes used for e.g. `map-indexed` and others.

As an aside about the stateful `take` transducer, Tesser uses the equivalent of one but skirts the issue by not guaranteeing that the first n items of the collection will be returned, but rather, n items of the collection in no particular order and starting at no particular index. This is achievable without Tesser by simply replacing the `volatile` in the `core/take` transducer with an `atom` and using it with `fold`. But yes, `take`'s contract is broken with this and so still follows the rule of thumb you established that `fold` can't use stateful transducers (at least, not without weird things like reordering of the indices in `map-indexed` and so on).

That's interesting that `fold` can use transducers directly! I haven't tried that yet — I've just been wrapping them in an `r/folder`.

Léo Noel
Apr 10, 2017, 9:37:29 AM
to Clojure
This topic is of high interest to me as it is at the core of my current work. I had a similar question a while ago, and I have to say I'm even more confused by this:

While transducing processes may provide locking to cover the visibility of state updates in a stateful transducer, transducers should still use stateful constructs that ensure visibility (by using volatile, atoms, etc).

I actually tried pretty hard to find a use case that would make partition-all fail because of its unsynchronized local state, and did not manage to find one that did not break any contract. I arrived at the conclusion that it is always safe to use unsynchronized constructs in stateful transducers. The reason is that you need to ensure that the result of each step is given to the next, and doing so you will necessarily set a memory barrier of some sort between the steps. Each step happens-before the next, and therefore mutations performed by the thread at step n are always visible to the thread performing step n+1. This is really brilliant: when designing a transducer, you can be confident that calls to your reducing function will be sequential and stop worrying about concurrency. You just have to ensure that mutable state stays local. True encapsulation, the broken promise of object-oriented programming.

My point is that the transducer contract "always feed the result of step n as the first argument of step n+1" is strong enough to safely use local unsynchronized state. For this reason, switching the partition-* transducers to volatile constructs really sounds like a step backwards to me. However, after re-reading the documentation on transducers, I found that this contract is not explicitly stated. It is just *natural* to think this way, because transducers are all about reducing processes. Is there a plan to reconsider this principle? I would be very interested to know what Rich has in mind that could lead him to advise overprotecting the local state of transducers.

adrian...@mail.yu.edu
Apr 10, 2017, 11:59:50 AM
to Clojure
What you said holds for reduction but not necessarily a parallel fold (see clojure.core.reducers/fold). 

Alex Miller
Apr 10, 2017, 12:31:06 PM
to Clojure
I don't agree with your conclusions. :) 

A transducing process could apply each step of the transduce using a thread from a pool and also not use a memory barrier - in that scenario visibility across threads would not be ensured. These kinds of failures are inherently difficult to reproduce unless the code is in production and you're on vacation. ;)

Alex Miller
Apr 10, 2017, 12:39:37 PM
to Clojure


On Sunday, April 9, 2017 at 9:44:00 PM UTC-5, Alexander Gunnarson wrote:

As an aside about the stateful `take` transducer, Tesser uses the equivalent of one but skirts the issue by not guaranteeing that the first n items of the collection will be returned, but rather, n items of the collection in no particular order and starting at no particular index. This is achievable without Tesser by simply replacing the `volatile` in the `core/take` transducer with an `atom` and using it with `fold`. But yes, `take`'s contract is broken with this and so still follows the rule of thumb you established that `fold` can't use stateful transducers (at least, not without weird things like reordering of the indices in `map-indexed` and so on).

Right, we intentionally chose to require transducer takes to occur in order to match the sequence take. Tesser's approach is perfectly fine too (as long as you understand the difference).
 
That's interesting that `fold` can use transducers directly! I haven't tried that yet — I've just been wrapping them in an `r/folder`.

Oh, you still need r/folder, sorry! Something like:

(r/fold + (r/folder v (map inc)))


Alexander Gunnarson
Apr 10, 2017, 12:42:35 PM
to Clojure
On Monday, April 10, 2017 at 12:39:37 PM UTC-4, Alex Miller wrote: 
Oh, you still need r/folder, sorry! Something like:

(r/fold + (r/folder v (map inc)))

Ah, okay, glad to know I wasn't going crazy :) Thanks!

Alexander Gunnarson
Apr 10, 2017, 12:48:41 PM
to Clojure
Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts. We were talking up above on which version of synchronization is appropriate for which context. With core.async, if you're using a transducer on a `chan` or `pipeline` or the like, it is guaranteed that only one thread will use that at a time (thus `atom`s weren't needed), but a different thread might come in and reuse that same stateful transducer, in which case the result of that mutation will need to propagate to that thread via a `volatile`. With reducers `fold`, stateful transducers don't necessarily hold up their contract (e.g. with `map-indexed` as we discussed above) even if you use an `atom` or the like. But in truly single-threaded contexts, even within a `go` block or a `thread` or the like (as long as the transducer is not re-used e.g. on a `chan` etc. where the necessity for a `volatile` applies), it's certainly fine to use unsynchronized mutable stateful transducers.



Seth Verrinder
Apr 10, 2017, 12:49:17 PM
to clo...@googlegroups.com
The problem is at a lower level. The memory model of the JVM doesn't guarantee that changes to an unsynchronized non-volatile reference are visible to other threads. Transducers don't have to worry about concurrency, but they do have to worry about visibility of changes across different threads.

Alex Miller
Apr 10, 2017, 1:06:14 PM
to Clojure

On Monday, April 10, 2017 at 11:48:41 AM UTC-5, Alexander Gunnarson wrote:
Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts.

Transducers included in core cannot make the assumption that they will only be used that way. (But you may be able to guarantee that with your own.)

Alexander Gunnarson
Apr 10, 2017, 1:34:39 PM
to Clojure
Yes, that makes sense that you can't make that assumption. You'd have to create something like what I was discussing above:

(defn map-indexed-transducer-base [f box-mutable inc-mutable]
  (fn [rf]
    (let [i (box-mutable -1)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (rf result (f (inc-mutable i) input)))))))

;; this is the version that Léo would want
(defn map-indexed-transducer-single-threaded [f]
  (map-indexed-transducer-base f unsynchronized-mutable-long! #(unsynchronized-mutable-swap! % inc)))

;; this is the version included in clojure.core
(defn map-indexed-transducer-sequentially-accessed-by-different-threads [f]
  (map-indexed-transducer-base f volatile! #(vswap! % inc)))

;; this works with `fold` and gives you all the indices at least, but in a nondeterministic order
(defn map-indexed-transducer-concurrently-accessed-by-different-threads [f]
  (map-indexed-transducer-base f atom #(swap! % inc))) ; or an AtomicLong variant

Léo Noel
Apr 10, 2017, 2:57:10 PM
to Clojure
What you said holds for reduction but not necessarily a parallel fold (see clojure.core.reducers/fold).

Exactly, and that's why stateful transducers are explicitly forbidden in fold and in core.async's pipeline functions.
This is not related to memory visibility; it is due to the fact that stateful transducers force the reducing process to be sequential.
Does it make any sense to parallelize map-indexed? partition-all? dedupe?


These kinds of failures are inherently difficult to reproduce unless the code is in production and you're on vacation. ;)

Couldn't agree more. However, we're all clever people and the Java Memory Model is not magic :) 


Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts. 

The problem is at a lower level. The memory model of the JVM doesn't guarantee that changes to an unsynchronized non-volatile reference are visible to other threads.
 
The Java Memory Model allows using unsynchronized variables to share data across threads as long as a memory barrier is set between the writer and the reader. For example, in the case of core.async, the channel lock sets a barrier, and there are also (redundant) barriers from each volatile inside the transducers.


A transducing process could apply each step of the transduce using a thread from a pool and also not use a memory barrier
 
Transducers included in core cannot make the assumption that they will only be used that way.

Yes, that makes sense that you can't make that assumption. 
 
This is the key point: what assumptions can a transducer make?
In my opinion, it is reasonable for a stateful transducer to assume that the transducing context will fulfill the contract of "always passing the result of step n to the first argument of step n+1".
This assumption is powerful because it guarantees that there will always be a memory barrier between two successive steps.
Proof (reductio ad absurdum): without a memory barrier, the result of step n wouldn't be visible to the (potentially different) thread performing step n+1.
So here is my question to the language designers: is it reasonable to assume that?
If yes, that means it's OK to use unsynchronized variables in stateful transducers as long as they stay local.
If no, that means we'll use synchronization in all stateful transducers, with an obvious performance penalty and a benefit that remains unclear.
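[Editor's sketch] For concreteness, this is roughly the pattern core's stateful transducers follow today: the per-instance state lives in a volatile!, so the write performed during step n is visible to whichever thread performs step n+1. A minimal dedupe-style sketch (modeled on clojure.core/dedupe, not the actual core source):

```clojure
;; Minimal dedupe-style stateful transducer, modeled on clojure.core/dedupe.
;; The per-instance state is a volatile!, which guarantees that step n's
;; write is visible to the thread running step n+1.
(defn dedupe-xf []
  (fn [rf]
    (let [prev (volatile! ::none)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (let [p @prev]
           (vreset! prev input)
           (if (= p input)
             result
             (rf result input))))))))

(into [] (dedupe-xf) [1 1 2 2 3 1])
;; => [1 2 3 1]
```

Replacing the volatile! with a plain unsynchronized local is exactly the change under discussion: correct only if every transducing context guarantees a happens-before edge between successive steps.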

Alexander Gunnarson

unread,
Apr 10, 2017, 3:25:48 PM4/10/17
to Clojure
I think you present a key question: what assumptions can a transducer make? We know the standard ones, but what of memory barriers? Based on the current implementation, it seems to guarantee (inconsistently; see e.g. `partition-by`) that sequential writes and reads will be consistent, no matter which thread does them. Concurrent writes are not supported. But should sequential multi-threaded reads/writes be supported? This is a question best left to Alex, but I think I already know the answer based on his conversation with Rich: it's part of the contract.

I think another key question is, is the channel lock memory barrier part of the contract of a core.async channel implementation? If not, volatiles will be necessary in that context if the memory barrier is ever taken away, and it would make sense that volatiles are used in transducers "just in case" specifically for that use case. But if the channel lock memory barrier is part of the contract and not just an implementation detail, then I'm not certain that it's very useful at all for transducers to provide a guarantee of safe sequential multi-threaded reads/writes.

Alex Miller

unread,
Apr 10, 2017, 3:40:27 PM4/10/17
to Clojure

On Monday, April 10, 2017 at 1:57:10 PM UTC-5, Léo Noel wrote:
What you said holds for reduction but not necessarily a parallel fold (see clojure.core.reducers/fold).

Exactly, and that's why stateful transducers are explicitly forbidden in fold and in core.async pipeline functions.
This is not related to memory visibility, this is due to the fact that stateful transducers force the reducing process to be sequential.
Does it make any sense to parallelize map-indexed? partition-all? dedupe?

Parallel transducers is something Rich has thought about some but that's a future effort.
 
Léo, I definitely agree that you can use unsynchronized mutable stateful transducers as long as you can guarantee they'll be used only in single-threaded contexts. 

The problem is at a lower level. The memory model of the JVM doesn't guarantee that changes to an unsynchronized non-volatile reference are visible to other threads.
 
The Java Memory Model allows using unsynchronized variables to share data across threads as long as a memory barrier is set between the writer and the reader. For example, in the case of core.async, the channel lock sets a barrier, and there are also (redundant) barriers for each volatile inside transducers.

The JVM is pretty good at minimizing this stuff - so while you are stating these barriers are redundant and are implying that's an issue, it would not surprise me if the JVM is able to reduce or eliminate the impacts of that. At the very least, it's too difficult to reason about without a real perf test and numbers.
 
A transducing process could apply each step of the transduction using a thread from a pool without setting a memory barrier
 
Transducers included in core cannot make the assumption that they will only be used that way.

Yes, that makes sense that you can't make that assumption. 
 
This is the key point: what assumptions can a transducer make?

A transducer can assume it will be invoked by no more than one thread at a time
 
In my opinion, it is reasonable for a stateful transducer to assume that the transducing context will fulfill the contract of "always passing the result of step n to the first argument of step n+1".
 
This assumption is powerful because it guarantees that there will always be a memory barrier between two successive steps.
Proof (reductio ad absurdum): without a memory barrier, the result of step n wouldn't be visible to the (potentially different) thread performing step n+1.

You're conflating the stateful values inside the transducer with the state returned by and passed into a transducer. That's a linkage that does not necessarily exist.
 
So here is my question to the language designers: is it reasonable to assume that?

No.
 
If yes, that means it's ok to use unsynchronized variables in stateful transducers as long as they stay local.
 
If no, that means we'll use synchronization in all stateful transducers, with an obvious performance penalty and a benefit that remains unclear.

Fast wrong results are still wrong. I do not think it's at all obvious how this affects performance without running some benchmarks. Volatiles do not require flushing values to all cores or anything like that. They just define constraints - the JVM is very good at optimizing these kinds of things. It would not surprise me if an uncontended thread-contained volatile could be very fast (for the single-threaded transducer case) or that a volatile under a lock would be no worse than the lock by itself.


Alex Miller

unread,
Apr 10, 2017, 3:46:45 PM4/10/17
to Clojure


On Monday, April 10, 2017 at 2:25:48 PM UTC-5, Alexander Gunnarson wrote:
I think you present a key question: what assumptions can a transducer make? We know the standard ones, but what of memory barriers?

Transducers should ensure stateful changes guarantee visibility. That is: you should not make assumptions about external memory barriers.
 
Based on the current implementation, it seems to guarantee (inconsistently; see e.g. `partition-by`) that sequential writes and reads will be consistent, no matter which thread does them. Concurrent writes are not supported. But should sequential multi-threaded reads/writes be supported?

Yes. core.async channel transducers already do this.
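[Editor's sketch, assuming core.async is on the classpath] In the example below, the puts happen on a core.async pool thread while the take happens on the caller's thread, yet partition-all's internal state stays consistent because the channel serializes the steps:

```clojure
(require '[clojure.core.async :as a])

;; partition-all keeps an unsynchronized ArrayList as its internal state.
;; The puts run on a core.async pool thread and the take on the calling
;; thread; the channel's internal lock keeps the transducer state safe.
(let [ch (a/chan 10 (partition-all 3))]
  (a/onto-chan ch (range 7))        ; puts 0..6, then closes ch
  (a/<!! (a/into [] ch)))
;; => [[0 1 2] [3 4 5] [6]]
```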
 
This is a question best left to Alex but I think I already know the answer based on his conversation with Rich: it's part of the contract.

I think another key question is, is the channel lock memory barrier part of the contract of a core.async channel implementation?

Yes, but other transducing processes may exist either in core in the future or in external libs.

Alexander Gunnarson

unread,
Apr 10, 2017, 3:51:30 PM4/10/17
to Clojure
Thanks for clearing all of that up Alex! Very helpful.

Léo Noel

unread,
Apr 11, 2017, 3:46:55 AM4/11/17
to Clojure
Thank you, Alex, for these clarifications.


The JVM is pretty good at minimizing this stuff - so while you are stating these barriers are redundant and are implying that's an issue, it would not surprise me if the JVM is able to reduce or eliminate the impacts of that. At the very least, it's too difficult to reason about without a real perf test and numbers.

Fast wrong results are still wrong. I do not think it's at all obvious how this affects performance without running some benchmarks. Volatiles do not require flushing values to all cores or anything like that. They just define constraints - the JVM is very good at optimizing these kinds of things. It would not surprise me if an uncontended thread-contained volatile could be very fast (for the single-threaded transducer case) or that a volatile under a lock would be no worse than the lock by itself.

I agree that the perf argument is weak.


A transducer can assume it will be invoked by no more than one thread at a time

Fine. Even simpler like this.


Transducers should ensure stateful changes guarantee visibility. That is: you should not make assumptions about external memory barriers.

How do you enforce no more than one thread at a time without setting a memory barrier?
For the JMM, no more than one thread at a time means exactly that the return of step n will *happen-before* the call to step n+1.
This implies that what was visible to the thread performing step n will be visible to the thread performing step n+1, including all memory writes performed during step n inside stateful transducers.
Still no need for extra synchronization.


You're conflating the stateful values inside the transducer with the state returned by and passed into a transducer. That's a linkage that does not necessarily exist.

What do you mean? How could a function return a value without having executed its body?

Seth Verrinder

unread,
Apr 11, 2017, 7:36:30 AM4/11/17
to clo...@googlegroups.com
The single thread at a time rule is implemented by the transducing context (transduce, into, core.async, etc.). Inside of a transducer's implementation you just have to make the assumption that it's being used properly. volatile! is what ensures that there's a memory barrier.

Léo Noel

unread,
Apr 11, 2017, 8:32:09 AM4/11/17
to Clojure
volatile! is what ensures that there's a memory barrier.

No. The memory barrier is set by the transducing context as a consequence of implementing the "single thread at a time" rule. Be it lock, thread isolation, agent isolation, or anything that ensures that the end of a step happens-before the beginning of the next. All these techniques ensure visibility of unsynchronized variables between two successive steps, even when multiple threads are involved.

Seth Verrinder

unread,
Apr 11, 2017, 10:54:58 AM4/11/17
to clo...@googlegroups.com
Seems risky to depend on that. eduction creates an iterable, for example - it has no way of preventing somebody from creating the iterator on one thread and consuming it on another.
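[Editor's sketch] The scenario Seth describes can be reproduced with only core functions: the iterator is created on the main thread and consumed on a future's pool thread, with no lock arranged by eduction itself:

```clojure
;; eduction returns an Iterable; nothing prevents creating the iterator on
;; one thread and consuming it on another. (Dereferencing the future does
;; establish a happens-before edge back to the main thread, but the iterator
;; steps themselves run on the pool thread with no barrier set by eduction.)
(let [ed (eduction (partition-all 2) (range 5))
      it (.iterator ^Iterable ed)]
  @(future
     (loop [acc []]
       (if (.hasNext it)
         (recur (conj acc (.next it)))
         acc))))
;; => [[0 1] [2 3] [4]]
```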

Alex Miller

unread,
Apr 11, 2017, 11:24:38 AM4/11/17
to Clojure

Transducers should ensure stateful changes guarantee visibility. That is: you should not make assumptions about external memory barriers.

How do you enforce no more than one thread at a time without setting a memory barrier?

I could have one thread that invokes a transduce step on odd seconds and another that invokes on even seconds. Or some external api call that tells me to take the next step, which I do on a thread pulled from a pool. I'm sure I could come up with others.
 
For the JMM, no more than one thread at a time means exactly that the return of step n will *happen-before* the call to step n+1.

happens-before across threads requires a volatile or lock, but I don't see how the use of one is guaranteed by this logical ordering.

This implies that what was visible to the thread performing step n will be visible to the thread performing the step n+1, including all memory writes performed during step n inside stateful transducers.

Only if there is a volatile or lock forcing that visibility.
 
Still no need for extra synchronization.

While this is a good reference, it's also 13 years old and the JMM has been updated since then. A much better reference explaining the semantics and constraints is:

https://shipilev.net/blog/2014/jmm-pragmatics/

In particular, even if there is a memory barrier, there are some reorderings allowed if the transducer state is not volatile that may be surprising. Making it volatile adds a critical edge in the total program order.

You're conflating the stateful values inside the transducer with the state returned by and passed into a transducer. That's a linkage that does not necessarily exist.

What do you mean? How could a function return a value without having executed its body?

I'm saying that the logical ordering of steps is irrelevant wrt how a multi-threaded program can be optimized/reordered under the JMM.

Léo Noel

unread,
Apr 12, 2017, 8:15:09 AM4/12/17
to Clojure
I could have one thread that invokes a transduce step on odd seconds and another that invokes on even seconds. Or some external api call that tells me to take the next step, which I do on a thread pulled from a pool.

Both strategies will fail to ensure no more than one thread at a time. You need something to prevent overlapping, e.g. when a long step is running and you get a request to start the next one.


While this is a good reference, it's also 13 years old and the JMM has been updated since then. A much better reference explaining the semantics and constraints is:
https://shipilev.net/blog/2014/jmm-pragmatics/ 
In particular, even if there is a memory barrier, there are some reorderings allowed if the transducer state is not volatile that may be surprising. Making it volatile adds a critical edge in the total program order.
I'm saying that the logical ordering of steps is irrelevant wrt how a multi-threaded program can be optimized/reordered under the JMM.

Thank you for the reference. Very enlightening (esp. part III).
I understand reordering is a thing. Does ordering really matter? What matters to us is that each step is able to see the changes made by the step before. That is, we need to ensure memory visibility across steps. This is all we need to be sure that the JVM won't run the program in an order that doesn't yield the same result as what we expect.
In a degenerate case, we'd put volatile on every variable, ensuring that the running program is totally ordered and totally unoptimizable. Is this what we want?


happens-before across threads requires a volatile or lock, but I don't see how the use of one is guaranteed by this logical ordering.

Volatiles and locks are means to an end. The end is memory visibility, and the happens-before partial ordering is what is of interest to us, application developers, to reason about that end. The happens-before rules have not changed since JSR-133 (source):
* Each action in a thread happens-before every action in that thread that comes later in the program's order.
* An unlock (synchronized block or method exit) of a monitor happens-before every subsequent lock (synchronized block or method entry) of that same monitor. And because the happens-before relation is transitive, all actions of a thread prior to unlocking happen-before all actions subsequent to any thread locking that monitor.
* A write to a volatile field happens-before every subsequent read of that same field. Writes and reads of volatile fields have similar memory consistency effects as entering and exiting monitors, but do not entail mutual exclusion locking.
* A call to start on a thread happens-before any action in the started thread.
* All actions in a thread happen-before any other thread successfully returns from a join on that thread.
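[Editor's sketch] These rules can be exercised directly from Clojure; a small illustration of the monitor and join rules above (the ArrayList is deliberately unsynchronized, like partition-all's internal buffer):

```clojure
;; The worker thread writes into an unsynchronized ArrayList under `locking`;
;; the main thread reads it back under the same monitor. Per the rules above,
;; the unlock happens-before the subsequent lock (and .join alone would
;; already establish a happens-before edge), so the write is visible to the
;; main thread with no volatile involved.
(let [lock  (Object.)
      state (java.util.ArrayList.)
      t     (Thread. #(locking lock (.add state 42)))]
  (.start t)
  (.join t)
  (locking lock (vec state)))
;; => [42]
```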

Here is an example of a multithreaded transducing context that doesn't use locks nor volatiles (inspired from the official documentation for agents) :

(def xf (comp (partition-all 64) cat))

(defn ! [f & args] (apply f args) f)

(defn run [m n]
  (let [p (promise)
        f (reduce (fn [f _] (partial send (agent f) (xf !)))
                  #(when (zero? %) (deliver p nil)) (range m))]
    (doseq [i (reverse (range n))] (f i))
    (f)
    @p))

(run 1000 1000)

The unsynchronized ArrayList in partition-all will be accessed by multiple threads, and I can still be confident about visibility, because agents ensure a happens-before ordering between each message. This behaviour is actually delegated to the backing Executor, which may or may not use locks. Locks are really an implementation detail, what is important is happens-before guarantees.


Seems risky to depend on that. eduction creates an iterable for example - it has no way of preventing somebody from creating the iterator on one thread and consuming it on another. 

Iterators are unsynchronized and mutable, like many classes in the Java standard library. You know they're unsafe and need to treat them as such. This leads to a more general debate on mutability. Objects generally fall into three categories:
1. Immutable objects aka values. They're the default in Clojure and that's great because they can be exposed safely and they're so easy to reason about.
2. Thread-safe mutable objects. Includes core reference types, core.async channels. They're useful to model identity, they can be exposed safely but sparingly as they tend to complect the system in the large.
3. Unsafe mutable objects. Includes transients, Iterator, InputStream/OutputStream. They exist for performance or legacy reasons. They need extra care when used in multithreaded contexts and they should not be exposed as-is.

Clearly, reducing functions produced by stateful transducers are of the third type. You need to care if you're designing a transducing context, but most of the time you don't because :
- when you design a stateful transducer, you can write your code as if it were single-threaded because it will be run in a context that cares about concurrency for you
- once your ugly mutable state is encapsulated in a transducer, it's a value. Instead of handling a mutable process, you handle a recipe for making a mutable process, it's immutable and it's good.

Once again, I don't understand what's wrong with that.

Java, being mutable by default, has refrained from cluttering its standard library with volatiles and locks. Instead, most classes are unsynchronized, optimized for single-threading. When synchronization is needed, we rely on a limited set of concurrency primitives. What's wrong with this approach?

Seth Verrinder

unread,
Apr 12, 2017, 11:34:32 AM4/12/17
to Clojure
Reordering definitely matters:

Program order:
StepA: write to x
StepB: read from x

Possible reordering:
StepB: read from x
StepA: write to x

Léo Noel

unread,
Apr 13, 2017, 5:43:24 AM4/13/17
to Clojure
An optimizer (hardware or software) will not execute the program in an order that it can't prove to be sequentially consistent with the original order.
What is important for us is not to specify exact ordering, it is to specify our intent unambiguously. To find the fastest ordering is optimizers' responsibility.