Recently in creating load testing I've been in need of throttling certain mixed IO/CPU bound processes and have been using
claypoole.core/upmap for those situations
(require '[com.climate.claypoole :as cp])
(defn wait-and-return
[w]
(Thread/sleep (* 1000 w))
w)
(def to-sort
[38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10])
(def timesorted
(time (doall (cp/upmap 20 wait-and-return to-sort))))
timesorted
"Elapsed time: 38004.512729 msecs"
=> (var clay.core/timesorted)
=> (0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38)
This is just an example to show blocking processes returning in a different order, while being restricted to a certain number of threads. I know this won't work with a threadpool smaller than to-sort.
Other parts of these tests would benefit from core.async techniques, but I haven't found a satisfactory combination of the two. What I'm really looking for is a way to use core.async pipeline-blocking syntax, which takes a fixed number of parallel processes, a from channel, a transducer and a to channel, but returns the results from the transducers unordered (ie. fasted delivered first. Something like this, but with an ordered outcome.
(require '[clojure.core.async :as a])
(def to-sort
(a/to-chan [38 20 22 24 36 2 30 18 32 0 4 34 14 28 6 16 12 26 8 10]))
(defn wait-and-return
[w]
(Thread/sleep (* 1000 w))
w)
(def sorted
(a/chan))
(def xwait
(map wait-and-return))
(def sorter
(a/pipeline-blocking 20 sorted xwait to-sort))
(time (a/<!! (a/into [] sorted)))
Is there a function like that, or would there be a recommended way to do something like this in core.async ?
Regards,
Niels