Core.async unordered pipeline-blocking ?


Niels van Klaveren

Nov 27, 2014, 4:32:43 PM
to clo...@googlegroups.com
While building load tests recently, I needed to throttle certain mixed IO/CPU-bound processes, and I have been using claypoole's upmap (com.climate.claypoole/upmap) for those situations:

(require '[com.climate.claypoole :as cp])

(defn wait-and-return
  [w]
  (Thread/sleep (* 1000 w))
  w)

(def to-sort
  [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])

(def timesorted
  (time (doall (cp/upmap 20 wait-and-return to-sort))))

timesorted
"Elapsed time: 38004.512729 msecs"
=> (var clay.core/timesorted)
=> (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38)

This is just an example to show blocking processes returning in a different order than they were submitted, while being restricted to a certain number of threads. I know this won't work with a thread pool smaller than the number of items in to-sort.

Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use the core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but have it return the results from the transducer unordered (i.e. fastest delivered first). Something like this, but with an unordered outcome:

(require '[clojure.core.async :as a])

(def to-sort
  (a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]))

(defn wait-and-return
  [w]
  (Thread/sleep (* 1000 w))
  w)

(def sorted
  (a/chan))

(def xwait
  (map wait-and-return))

(def sorter
  (a/pipeline-blocking 20 sorted xwait to-sort))

(time (a/<!! (a/into [] sorted)))

Is there a function like that, or is there a recommended way to do something like this in core.async?

Regards,

Niels

Timothy Baldridge

Nov 27, 2014, 5:15:58 PM
to clo...@googlegroups.com
This wasn't included in core.async mostly because it's fairly easy to write. The code looks something like this (untested):

(def from-chan)
(def to-chan)

(dotimes [_ num-threads]
  (let [rf (fn [_ x] (>!! to-chan))
         f (xform rf)]
    (thread
      (loop []
        (when-some [x (<!! from-chan)]
          (f x))))))

I'll leave it up to you to decide when to terminate or how to shut things down. 

Timothy Baldridge


Timothy Baldridge

Nov 27, 2014, 5:16:48 PM
to clo...@googlegroups.com
eh, that one line should be:

(let [rf (fn [_ x] (>!! to-chan x)) ...)
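
For reference, the pieces above can be combined into a minimal sketch along the following lines. The name unordered-pipeline-blocking is hypothetical (it is not part of core.async), and the shutdown policy shown here, closing the output channel once every worker has seen the input channel close, is only one possible choice. Compared with the snippet above, the worker loop uses recur so that each thread keeps consuming, and it calls the transformed reducing function with two arguments, since its one-argument arity is the completion step. The sketch assumes a stateless transducer such as (map f), gives each worker its own transducer instance, and does no exception handling.

```clojure
(require '[clojure.core.async :as a])

(defn unordered-pipeline-blocking
  "Hypothetical helper, not part of core.async. Takes the same leading
  arguments as a/pipeline-blocking, but puts results on `to` as soon as
  they are ready instead of preserving input order."
  [n to xf from]
  (let [;; Reducing function: ignore the accumulator, forward each value.
        rf      (fn
                  ([acc] acc)
                  ([acc x] (a/>!! to x) acc))
        ;; Start n worker threads; each a/thread returns a channel that
        ;; closes when the worker's body finishes.
        workers (doall
                  (repeatedly n
                    (fn []
                      (a/thread
                        (let [f (xf rf)]
                          (loop []
                            (when-some [x (a/<!! from)]
                              (f nil x)
                              (recur)))
                          ;; `from` is closed: run the completion step.
                          (f nil))))))]
    ;; Assumed shutdown policy: close `to` after all workers are done.
    (a/thread
      (doseq [w workers] (a/<!! w))
      (a/close! to))
    to))

;; Usage with shorter sleeps than in the original question.
(defn wait-and-return [w]
  (Thread/sleep (* 100 w))
  w)

(def result
  (let [from (a/to-chan [3 1 2 0])
        to   (a/chan)]
    (unordered-pipeline-blocking 4 to (map wait-and-return) from)
    (a/<!! (a/into [] to))))
;; With four workers and these delays, the values should arrive
;; fastest-first, i.e. [0 1 2 3].
```

An exception thrown inside the transducer ends only the worker that hit it; the remaining workers carry on and the output channel still closes once they finish, so error reporting would need to be added separately.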

Francis Avila

Nov 28, 2014, 12:10:05 PM
to clo...@googlegroups.com
I had a need for this too some time ago. It's not very hard to write yourself. Whatever trickiness there is in these functions is in handling exceptions and orchestrating shutdown.

I put my version up in a gist with some other async utility functions I wrote: https://gist.github.com/favila/8e7ad6ea5b01bd7466ff
You are looking for fast-pipeline-blocking.

Niels van Klaveren

Nov 28, 2014, 5:01:02 PM
to clo...@googlegroups.com
Thanks so much for sharing, Francis! It might be simple to some, but I haven't yet had an opportunity to get well enough acquainted with core.async. Clojure has so many useful libraries, but some force you to get your head around (for me) completely new paradigms, which can take time. Reading your code gave me a lot of new insights, as well as a very useful tool I can use.

All the time I can devote to learning Clojure has been in projects at work that I have to do by myself, often put together on short notice. There's a snowball's chance in hell of collaboration with my colleagues from the firmly Java-entrenched development department. I need to have a solid idea of how I can get from start to finish in a very short time, for which Clojure has been quite awesome in some ways, but quite hard in others. The awesome community is often just the push in the back I need to get over these hurdles though.

Now that I have all the parts in hand, I can start improving some more mature projects by introducing core.async into them. Wiring up existing pieces will give me more experience and the confidence to dive deeper into async libraries. Thanks again for the last part of the puzzle, I'm pretty sure I will be able to finish putting it together now!

Stuart Sierra

Nov 29, 2014, 3:59:57 PM
to clo...@googlegroups.com
Here's a version I did that only uses as many threads as necessary to keep up with the input, subject to an upper bound:
http://stuartsierra.com/2013/12/08/parallel-processing-with-core-async

–S