Spread work onto multiple threads (in pure Clojure)


ronen

Sep 21, 2011, 7:06:46 PM
to Clojure
I was looking for a pure Clojure solution to run multiple independent
tasks on multiple threads. I've considered using agents, promises, or
futures, yet the simplest, cleanest solution I've found is:

(defn email-approved [approved]
  (doall (pmap deref (for [req approved] (future (email-request req))))))

The only thing that I don't like about it is the use of pmap, mainly
since I'm using the parallel mapping just for side effects
(triggering consumption of the futures by derefing them).

I've seen solutions that involve the Java executor service and the Work
project (https://github.com/getwoven/work), yet I'd rather achieve this
in pure Clojure.

Thanks
Ronen

Andy Fingerhut

Sep 21, 2011, 7:15:54 PM
to clo...@googlegroups.com
pmap already uses future/deref in its implementation.  When it does so, it limits the *maximum* parallelism to at most (number-of-cpus + 2) threads running at a time, and depending upon the time taken by each of the tasks you are mapping over, it can be less than that.

I don't know if it would work for your purposes, but Amit Rathore created a small Clojure library called Medusa that is basically a wrapper around Java executors:

https://github.com/amitrathore/medusa

Andy


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en

Nathan Sorenson

Sep 21, 2011, 7:28:04 PM
to Clojure
Futures begin executing their contents immediately; you don't have to
deref them to trigger the side effects. (Perhaps you were thinking of
delay?)

I'm assuming you are using futures because email-request is an
IO-blocking operation? The thing to note is that the body of a future
automatically runs in its own thread, so you could just do a regular
map: (doall (map #(future (email-request %)) approved)); all the
emails would then be sent in parallel threads.
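A sketch of that approach which also blocks until every send has completed; email-request is not shown in the thread, so it is stubbed out here purely for illustration:

```clojure
;; Sketch only: email-request stands in for the I/O-bound function
;; from the original post; this stub just simulates blocking I/O.
(defn email-request [req]
  (Thread/sleep 10)
  {:sent req})

(defn email-approved [approved]
  ;; doall forces every future to start immediately; derefing then
  ;; blocks until all sends have completed.
  (let [futs (doall (map #(future (email-request %)) approved))]
    (mapv deref futs)))
```

Note that each future still occupies a thread for the duration of the I/O, so for very large collections a bounded executor may be preferable.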

Sean Corfield

Sep 22, 2011, 1:00:31 AM
to clo...@googlegroups.com
On Wed, Sep 21, 2011 at 6:06 PM, ronen <nar...@gmail.com> wrote:
> (defn email-approved [approved]
>  (doall (pmap deref (for [req approved] (future (email-request
> req))))))

Wouldn't the following be close enough to what you want?

(defn email-approved [approved]
  (doall (pmap email-request approved)))
--
Sean A Corfield -- (904) 302-SEAN
An Architect's View -- http://corfield.org/
World Singles, LLC. -- http://worldsingles.com/
Railo Technologies, Inc. -- http://www.getrailo.com/

"Perfection is the enemy of the good."
-- Gustave Flaubert, French realist novelist (1821-1880)

ronen

Sep 22, 2011, 4:47:54 PM
to Clojure
Sorry for my delayed reply (busy day),

Andy: I'll check out Medusa. The number of threads in pmap does impose a
limit, but in my case I can live with that.

Nathan: Yes, the reason I wanted futures is the blocking IO. I don't
know why I thought futures were lazy; this indeed makes pmap redundant.

Sean: I could use pmap, but I'd still be limited in the number of
threads; in my case I can live with that.

Thank you all for clearing this up; looks like I was on the right track.

Ronen


Kevin Livingston

Sep 22, 2011, 9:55:42 PM
to Clojure
If the list is quite large, is there anything preventing the option
below from creating way too many threads?

There is presumably an inflection point where this will cost you;
pmap is trying to keep that in check, right?

Kevin

Andy Fingerhut

Sep 22, 2011, 11:34:38 PM
to clo...@googlegroups.com
pmap will limit the maximum number of simultaneous threads.  So will the medusa library's medusa-pmap.

The difference is that if one job early in the list takes, e.g., 100 times longer than the next 20, and you have 4 cpus available, pmap will start the first (4+2)=6 in parallel threads, let jobs 2 through 6 complete, and then wait for the first one to finish before starting number 7.  Thus most of the time will be spent running one thread.  This has the advantage of limiting the memory required to store intermediate results, but the disadvantage of low usage of multiple cpu cores.

Medusa has a medusa-pmap that will also limit the parallelism, but it lets you pick the level of parallelism (it isn't fixed at (# cpus + 2)), and it will continue starting new threads, even if that means using more memory when one job takes much longer than the following jobs.

If you like pmap's behavior except for the number of threads, you can always copy its source code into your own program and change that number pretty easily.
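For instance, here is a minimal sketch of such a variant, adapted from the clojure.core/pmap source, with the lookahead made a parameter (pmap-n is a made-up name, not a core function):

```clojure
(defn pmap-n
  "Like clojure.core/pmap, but with an explicit lookahead n instead of
  the fixed (+ 2 ncpus). Sketch adapted from the clojure.core/pmap source."
  [n f coll]
  (let [rets (map #(future (f %)) coll)
        step (fn step [[x & xs :as vs] fs]
               (lazy-seq
                (if-let [s (seq fs)]
                  ;; stay n futures ahead of consumption
                  (cons (deref x) (step xs (rest s)))
                  (map deref vs))))]
    (step rets (drop n rets))))
```

Calling (pmap-n 48 f coll) would keep up to 48 futures in flight, subject to the same sliding-window behavior described above.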

Andy

Lee Spector

Sep 24, 2011, 12:16:42 PM
to clo...@googlegroups.com

Thanks for this info -- I didn't realize quite how pmap worked.

I often launch parallel threads with pmap and have sometimes been puzzled by dips in processor utilization that I can't trace to memory resource contention, etc.

I have similar issues sometimes when I launch parallel threads via sends to agents. Will this behave similarly to pmap? If so, is there a straightforward way to get the same kind of benefit as medusa-pmap in an agent context?

-Lee

Andy Fingerhut

Sep 24, 2011, 1:08:16 PM
to clo...@googlegroups.com
I don't know whether there are similar limitations of parallelism when launching threads via sends to agents.  I haven't looked at that yet.  If you have an example program you can share, preferably trimmed down to the core of the issue, I might be able to look at it.

I only know about pmap performance in such detail from using it on computer language benchmarks game Clojure programs, and trying to figure out why they weren't finishing sooner.  It helped to print messages when threads started and ended, examine that output, and then read pmap source code to figure out why it explained that output.

Andy


Lee Spector

Oct 9, 2011, 7:24:43 PM
to clo...@googlegroups.com

I've been playing with Medusa, and it sometimes does what I expect, but sometimes it does something strange. I'm wondering if someone can help me do one specific Medusa-like thing more simply (and without the strangeness, which I haven't fully traced but hope to avoid having to).

What I want is a version of pmap that will always use any available cores to compute remaining values (except, of course, for the last few values, when there are fewer remaining values than cores).

In other words, I want the behavior that Andy Fingerhut describes medusa as having here:

> On Sep 22, 2011, at 11:34 PM, Andy Fingerhut wrote:
>
> > pmap will limit the maximum number of simultaneous threads. So will the medusa library's medusa-pmap.
> >
> > The difference is that if one job early in the list takes, e.g., 100 times longer than the next 20, and you have 4 cpus available, pmap will start the first (4+2)=6 in parallel threads, let jobs 2 through 6 complete, and then wait for the first one to finish before starting number 7. Thus most of the time will be spent running one thread. This has the advantage of limiting the memory required to store intermediate results, but the disadvantage of low usage of multiple cpu cores.
> >
> > Medusa has a medusa-pmap that will also limit the parallelism, but it lets you pick the level of parallelism (it isn't fixed at (# cpus + 2)), and it will continue starting new threads, even if that means using more memory when one job takes much longer than the following jobs.
> >

FWIW I'll always be calling this on a finite sequence (although it may have 10000 elements or so, so I presume that I shouldn't start threads for all of them at once), and I will want a fully realized result sequence so I'll surround calls with doall if necessary. I will want all of the pmapped computations to finish before I continue doing anything else.

I know that I could launch the threads in other ways -- e.g. I sometimes use agents for something similar -- but pmap is much more elegant in many cases, and anyway I'm not even sure I'm getting full utilization with my other methods.

The medusa-pmap function is very close to what I want, but Medusa seems to do more than I need: it requires init/startup calls, it involves timeouts which I will never want, and it behaves strangely when I run my code on a 48-core node. (It runs fine on my 4-core laptop, and it seems to work beautifully on the 48-core node too for a while, giving me nearly 4800% utilization, but then it does something that looks like it might be caused by everything hitting the timeouts... which I bumped up by several orders of magnitude, but I'm still getting the weird behavior.)

So: Is there a simpler way to get "pmap but not lazy and use all cores fully until the whole result sequence is completed"? This is usually what I really want when I'm writing code that utilizes concurrency.

I've looked at the pmap source but it isn't obvious to me how to change that to do what I want... So any pointers would be appreciated.

Thanks,

-Lee

Lee Spector

Oct 10, 2011, 4:07:28 PM
to clo...@googlegroups.com

I think that the following partially answers my own question and that it provides a way to get decent multicore performance for collections of non-uniform but compute-intensive tasks through a simple, pmap-like interface.

But I'm not sure if it's the best approach and I'd like some feedback. If it *is* a good approach then maybe we should refine it and make it more widely available. If it's not the best approach then I'd love some advice on how to do it better.

The use case here -- which I think must be shared by at least some others -- is that I have a finite, non-lazy collection of inputs on which I'd like to run an expensive (but not uniformly expensive) function, gathering the results in a non-lazy sequence. This is a *very* common need in my own projects. I don't care about the order in which the function calls are made, and I'm not concerned about the memory overhead of retaining all of the results since I want to keep them all anyway. I just want all of the computations done as quickly as possible, using all of my available cores. The pmap function seems at first to provide an elegant way to do what's needed, e.g. with (doall (pmap f inputs)), but as discussed earlier in this thread it will often wait for the completion of earlier calls before starting later calls, and this will be particularly problematic when the runtimes of the calls are uneven.

The medusa package provides something that would seem to fit the bill better, but it comes with other baggage that I don't need or want. I just want a version of pmap that will use all available CPU resources to aggressively complete all of the computations in a non-lazy context.

Here's my stab at doing this using agents:

(defn pmapall
  "Like pmap but: 1) coll should be finite, 2) the returned sequence
   will not be lazy, 3) calls to f may occur in any order, to maximize
   multicore processor utilization, and 4) takes only one coll so far."
  [f coll]
  (let [agents (map agent coll)]
    (dorun (map #(send % f) agents))
    (apply await agents)
    (doall (map deref agents))))

I should make a version that takes multiple colls, but for now I've written it to take just one for clarity.
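A multiple-coll variant could follow the shape of clojure.core/map; here is a sketch under the same agent-based approach (pmapall-multi is a made-up name):

```clojure
(defn pmapall-multi
  "Sketch of a multi-coll pmapall: pairs up the colls into tuples,
  then applies f to each tuple via one agent per tuple."
  [f & colls]
  (let [tuples (apply map vector colls)   ;; e.g. [1 10] [2 20] ...
        agents (map agent tuples)]
    ;; each agent's action replaces its tuple with (apply f tuple)
    (dorun (map #(send % (fn [tup] (apply f tup))) agents))
    (apply await agents)
    (doall (map deref agents))))
```

For example, (pmapall-multi + [1 2 3] [10 20 30]) would yield (11 22 33), with the additions dispatched across agent threads.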

This does appear to speed things up pretty significantly in certain circumstances, but maybe not as much as possible. Is it the best approach?

To show that it beats pmap I define a time-wasting function (I want to see real cpu utilization so I'm not using delays) like this:

(defn burn []
  (dotimes [i 10000]
    (reduce * (map float (take 1000 (iterate inc i))))))

And then I define a function that takes a lot or a little bit of time depending on its argument:

(defn fast-or-slow [n]
  (if (zero? n)
    :done
    (do (burn)
        :done)))

And then I create a sequence of inputs in which the slow ones are scattered sparsely:

(def inputs (take 1000 (cycle (conj (repeat 20 0) 1))))

And then on a 48 core node I get timings like this:

user=> (time (last (pmapall fast-or-slow inputs)))
"Elapsed time: 37244.151 msecs"
:done

user=> (time (last (doall (pmap fast-or-slow inputs))))
"Elapsed time: 110862.187 msecs"
:done

And by the way, plain old serial map does this:

user=> (time (last (doall (map fast-or-slow inputs))))
"Elapsed time: 260941.836 msecs"

So we've improved things; pmap is a little better than twice as fast as map, and pmapall is roughly 3 times faster than pmap. So I think I'm ready to switch all of my pmaps to pmapalls. But that's still nothing close to a 48x speedup, even though all of the tasks should be completely independent and I wouldn't expect a huge loss for coordination. And another confusing thing is that even with pmapall, when I look at the CPU utilization I see numbers close to 4800% in some cases (like the one above) but numbers topping out at something more like 1900% in some others (e.g. with different input vectors).

So I feel like I'm moving in the right direction but that I'm still probably missing something.

Is there a better way to do this? Surely it will come in handy for others as well if there's a simple way to more effectively dispatch tasks to multiple cores.

Thoughts? Code?

Thanks,

-Lee

Lee Spector

Oct 10, 2011, 5:04:35 PM
to clo...@googlegroups.com

A weakness of my pmapall:

#<Exception java.lang.Exception: Can't await in agent action>

Which means, I think, that I can't call pmapall within a function that I pass to pmapall. Unfortunate.

Is there a better way?

-Lee

PS: to see these exceptions one must replace the call to agent in my definition with something like #(agent % :error-handler (fn [agnt except] (println except))).


--
Lee Spector, Professor of Computer Science
Cognitive Science, Hampshire College
893 West Street, Amherst, MA 01002-3359
lspe...@hampshire.edu, http://hampshire.edu/lspector/
Phone: 413-559-5352, Fax: 413-559-5438

Phil Hagelberg

Oct 10, 2011, 7:16:47 PM
to clo...@googlegroups.com
On Mon, Oct 10, 2011 at 1:07 PM, Lee Spector <lspe...@hampshire.edu> wrote:
> Here's my stab at doing this using agents:
>
> (defn pmapall
>  "Like pmap but: 1) coll should be finite, 2) the returned sequence
>   will not be lazy, 3) calls to f may occur in any order, to maximize
>   multicore processor utilization, and 4) takes only one coll so far."
>  [f coll]
>  (let [agents (map agent coll)]
>    (dorun (map #(send % f) agents))
>    (apply await agents)
>    (doall (map deref agents))))

What you're really looking for is pdoseq, right? Seems like futures
might be a better building-block for this, although again Clojure's
lack of flexibility over the thread pool could easily bite you here.

-Phil

Lee Spector

Oct 10, 2011, 7:38:47 PM
to clo...@googlegroups.com

On Oct 10, 2011, at 7:16 PM, Phil Hagelberg wrote:
> What you're really looking for is pdoseq, right? Seems like futures
> might be a better building-block for this, although again Clojure's
> lack of flexibility over the thread pool could easily bite you here.

No -- I want all of the returned values, as map/pmap provides but doseq does not. I want what pmap does except that I want greedier use of available cores until all of the values are computed, at the expense of computing them in order.

-Lee

j-g-faustus

Oct 10, 2011, 9:43:59 PM
to clo...@googlegroups.com
I made an alternative implementation using a thread pool and a queue, based on the example at

In short, your pmapall and the pool-based implementation (below) both give approximately
perfect scaling on my 4/8-core system (Intel i7 920 with HT).
Both give close to full load on all cores and a factor 4.4 speedup compared to single-threaded.
This seems about right: the CPU has four physical cores and gets a few percent extra performance

pmap-pool may be a tiny bit faster than pmapall, but they are so close that I can't
really tell.

It is possible that there is some sort of synchronization overhead on your 48-core machine.
95% of the tasks are practically noops, after all - just the cost of a single function call. 
There are only 48 tasks in your test that actually require computation, so each 
core will do a bunch of noops and perhaps one "real" task. 

In real time, a single i7 920 runs the test just as fast as your 48 cores. I don't expect that's
representative for what your 48 cores can do.

I suggest
* Increase the test size and/or the density of "heavy" tasks.
* Let the "light" tasks do a bit more computation, at least enough to pay for the 
overhead of calling them.
* Start with a smaller number of threads, and see where it stops scaling linearly.


Threadpool/queue-based implementation:

(import '(java.util.concurrent Executors))
(defn pmap-pool [f coll]
  (let [queue (ref coll)  ;; shared queue of work units
        nthreads  (.availableProcessors (Runtime/getRuntime))
        pool  (Executors/newFixedThreadPool nthreads)
        tasks (map (fn [_] 
                     (fn [] ; one task per thread
                       (let [local-res (atom [])] ;; collect results per thread to minimize synchronization
                         (while (seq @queue)
                           ;; queue may be emptied between 'while'
                           ;; and 'dosync'.
                           (when-let [wu (dosync
                                                ;; grab work unit, update queue
                                                (when-let [w (first @queue)]
                                                  (alter queue next)
                                                  w))]
                             (swap! local-res conj (f wu))))
                         local-res)))
                   (range nthreads))
        results (doall (map #(deref (.get %)) ;; blocks until completion
                            (.invokeAll pool tasks))) ;; start all tasks
        results (reduce concat results)]
    (.shutdown pool)
    ;; sanity check
    (when-not (and (empty? @queue)     
                   (= (count results) (count coll))
                   (every? #(= % :done) results))
      (println "ERROR: queue " (count @queue) " #results" (count results)))
    results))

Results on an i7 920, 4 cores/8 threads (hyperthreading), Ubuntu 10.10:

user=> (time (last (map fast-or-slow inputs)))
"Elapsed time: 161891.732036 msecs", 100% CPU (out of 800% possible)

user=> (time (last (pmap fast-or-slow inputs)))
"Elapsed time: 163139.249677 msecs", 100% CPU
pmap has zero effect on my system; it won't use more than one core.

user=> (time (last (pmapall fast-or-slow inputs)))
"Elapsed time: 37710.349712 msecs", ~793% CPU

user=> (time (last (pmap-pool fast-or-slow inputs)))
"Elapsed time: 36393.132824 msecs", ~795% CPU

Lee Spector

Oct 10, 2011, 9:55:09 PM
to clo...@googlegroups.com

Interesting. I'll try some of your suggested tests to see if my pmapall is behaving better than I thought.

Does your pmap-pool permit nesting? (That is, does it permit passing pmap-pool a function which itself calls pmap-pool?). If so then that would be a reason to prefer it over my pmapall.

Thanks,

-Lee


Andy Fingerhut

Oct 11, 2011, 1:22:17 AM
to clo...@googlegroups.com
I'll post more on this later, but I wanted to point out one case where I found that pmap was not achieving the desired level of speedup (# of CPUs/cores) that you would initially expect, and it is not due to any reasons that I've posted about before.

Imagine a 4-core CPU.  There are 4 physical CPU cores on the same chip, so if you can find some computation task that is completely compute-bound, _and that computation task does not require reading from or writing to main memory or any other storage media like hard drives after it is "warmed up"_, then pmap or pmapall should ideally be able to achieve an elapsed time of 1/4 of doing the tasks sequentially.

However, suppose instead that the computation task reads from a large data structure and creates a new large data structure as a result, and that the amount of memory reading is large compared to the amount of computation required.  Then it is possible that the speedup will not be limited by the number of CPU cores available, but by the bandwidth that main memory can be read or written.

For example, the 4 CPU cores in the hypothetical example might have a shared bus for writing to main memory that has a maximum capacity of 20 Gbytes/sec.  If doing the tasks sequentially on one CPU core can completely use that CPU core, and require writing 10 Gbytes/sec to main memory, then it doesn't matter whether you have 2 cores or 50 cores on that chip, the 20 Gbytes/sec of write bandwidth to main memory will limit your parallel speedup to a factor of 2 (i.e. parallel run elapsed times will be at least 1/2 of the sequential elapsed times).

I believe I have found similar cases where I did not have a large output data structure, but simply generated data structures that very soon became garbage.  Because the garbage collector did not detect them as garbage until *after* they had been written from the CPU's cache to main memory, the parallel speedup was limited by the CPU's write bandwidth to main memory, which was lower than the number of cores.

In such cases, it would be ideal if the garbage collector could somehow be made smart enough to detect the allocated structures as garbage, and reallocate their memory, before they were ever written to main memory.  This would allow that memory to stay in cache, and the computation could proceed with an order of magnitude less data written to main memory.

Andy

Michael Gardner

Oct 11, 2011, 5:13:36 AM
to clo...@googlegroups.com
Just to throw in my two cents here: it would really be a shame for Clojure's pmap to continue to languish in the state it's currently in (i.e. nearly useless). Even though implementing an alternative using thread pools isn't too hard, it hurts the Clojure concurrency sales pitch-- I can't tell people "and often you can parallelize simple programs just by changing a single character (map -> pmap)" without adding the caveat "but pmap kind of sucks for most purposes, so you'll probably have to roll your own or use a library".

Tassilo Horn

Oct 11, 2011, 7:56:37 AM
to clo...@googlegroups.com
Andy Fingerhut <andy.fi...@gmail.com> writes:

Hi Andy,

> pmap will limit the maximum number of simultaneous threads. So will
> the medusa library's medusa-pmap.
>
> The difference is that if one job early in the list takes, e.g., 100
> times longer than the next 20, and you have 4 cpus available, pmap
> will start the first (4+2)=6 in parallel threads, let jobs 2 through 6
> complete, and then wait for the first one to finish before starting
> number 7. Thus most of the time will be spent running one thread.

Wow, that would render pmap pretty useless, IMO.  I've just checked the
code, but I cannot grasp it completely.  Could you please explain a bit,
or tell me where I'm wrong below?

(I think I found the answer myself, so feel free to jump forward to the
"AAAHHHH!!!".)

Here's the definition:

--8<---------------cut here---------------start------------->8---
(defn pmap
  ([f coll]
   (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
         rets (map #(future (f %)) coll)               ;; (1)
         step (fn step [[x & xs :as vs] fs]            ;; (2)
                (lazy-seq
                 (if-let [s (seq fs)]
                   (cons (deref x) (step xs (rest s))) ;; (3)
                   (map deref vs))))]
     (step rets (drop n rets))))
  ;; Overloaded variant stripped
  )
--8<---------------cut here---------------end--------------->8---

Because rets is a lazy seq, nothing in it is actually realized in (1),
so that means that no future is created and thus no new thread is
started there.

The `step' function (2) always dereferences the first element of the
first seq (3), thus here is the place where the future is actually
created by submitting to some thread pool. But dereferencing also
blocks until the result is calculated (i.e., calls future.get()). So it
looks to me as if there's nothing parallel at all.

Of course, I must be wrong! But where???

Hm, well, the destructuring in the arglist of `step' probably realizes
`x' (and maybe creates a future for the first element in `xs').  So then
we'd have at most two active threads, but still we wait before starting
the next one...

AAAHHHH!!!  I think I got it!  It's the `drop', isn't it?  To drop n
elements, you have to call `rest' n times, and because that calls `seq',
the first n elements are actually realized, that is, the tasks are
submitted to the thread pool.

So indeed, one starts with the number of available processors + 2, and
one single longer running task will wipe out any parallelism. :-(

IMO, that's not what 99% of the users would expect nor want when calling
(pmap foo coll). I'd vote for making pmap eager in the sense that it
should always try to keep n threads working as long as more tasks are
available. Clearly, the current implementation is useful when your
tasks are equally expensive, i.e., the costs don't depend on the
argument.

Bye,
Tassilo

Lee Spector

Oct 11, 2011, 11:19:17 AM
to clo...@googlegroups.com

On Oct 11, 2011, at 7:56 AM, Tassilo Horn wrote:
> So indeed, one starts with the number of available processors + 2, and
> one single longer running task will wipe out any parallelism. :-(
>
> IMO, that's not what 99% of the users would expect nor want when calling
> (pmap foo coll). I'd vote for making pmap eager in the sense that it
> should always try to keep n threads working as long as more tasks are
> available. Clearly, the current implementation is useful when your
> tasks are equally expensive, i.e., the costs don't depend on the
> argument.

I can imagine cases in which one *would* want the current behavior of pmap, especially with infinite lazy sequences, so I wouldn't go quite as far as you are going here.

But I agree that many users will often want an eager version that maximizes CPU utilization, and that they may expect pmap to do that. In fact that was what I wanted (and will usually want), and it's what I assumed pmap would do (even though, now that I look afresh at the doc string, it pretty clearly says that it *doesn't* do this).

So my hope wouldn't be that we change pmap but rather that we provide something else that is just as simple to use but provides eager, "use all available cores to get the job done as fast as possible" behavior with a simple pmap-like interface.

My agent-based pmapall probably isn't the best approach, since calls to it can't be nested. Perhaps j-g-faustus's pmap-pool approach is the way to go, but I do not understand it well enough to know.

In any event I would hope that we could provide something like this, because I do think it will fill a common need.

-Lee

j-g-faustus

Oct 11, 2011, 1:55:28 PM
to clo...@googlegroups.com
On Tuesday, October 11, 2011 3:55:09 AM UTC+2, Lee wrote:

Does your pmap-pool permit nesting? (That is, does it permit passing pmap-pool a function which itself calls pmap-pool?). If so then that would be a reason to prefer it over my pmapall.

I expect it would be possible to nest it (possible as in "no exceptions or deadlocks"), but I can't see any scenario where you would want to - you would get an exponentially increasing number of threads. If 48 cores each start 48 threads, each of those threads start another 48 etc., it doesn't take long before you have enough threads to bog down even the most powerful server.

But what would be the purpose of a nested "run this on all cores" construct? You are already running on all cores, there are no spare resources, so in terms of CPU time I can't see how it would differ from merely having the pmapped function use a plain same-thread map? 

There are no particular advantages to pmap-pool over pmapall that I can see, except that pmap-pool lets you control the number of threads more easily. (E.g. for debugging "where does it stop scaling linearly" problems.)
I thought creating thousands of agents (as pmapall does) would be more expensive, so I tried an alternative, but in practice they seem to be equally fast; at least on this test and the kind of hardware I have access to. 
So pmapall wins by having fewer lines of code.


jf

Andy Fingerhut

Oct 11, 2011, 2:07:16 PM
to clo...@googlegroups.com
One benefit would be convenience of enabling parallelism on nested data structures.  One function at the top level could use parallelism, and the pieces, perhaps handled by separate functions, and perhaps nested several levels deep in function calls, could also use parallelism.

If it were implemented not by creating Java Threads for each task, but submitting a task to an ExecutorService, then the actual number of active Java Threads could be kept reasonably low (e.g. maybe 2 times the number of physical CPU cores), whereas the number of parallel tasks the work is divided into could be limited only by memory for storing the tasks scheduled for future execution.
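A sketch of that idea, with the caveat that blocking on results from inside a fixed pool can deadlock under deep nesting (a real implementation might use a work-stealing pool); eager-pmap and work-pool are made-up names, not from any library discussed here:

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

;; One shared pool, sized to roughly 2x the core count; nested calls
;; submit tasks to this same pool rather than spawning new threads.
(defonce ^ExecutorService work-pool
  (Executors/newFixedThreadPool
   (* 2 (.availableProcessors (Runtime/getRuntime)))))

(defn eager-pmap
  "Eagerly submits one task per element to the shared pool and blocks
  until all results are ready. Not lazy; coll must be finite. Sketch only."
  [f coll]
  (let [futs (mapv #(.submit work-pool ^Callable (fn [] (f %))) coll)]
    (mapv #(.get ^Future %) futs)))
```

Because every task goes through one bounded pool, the number of live Java threads stays fixed no matter how many tasks are queued.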

Andy


--

Tassilo Horn

Oct 11, 2011, 2:21:46 PM
to clo...@googlegroups.com
Lee Spector <lspe...@hampshire.edu> writes:

Hi Lee,

>> So indeed, one starts with the number of available processors + 2,
>> and one single longer running task will wipe out any parallelism. :-(
>>
>> IMO, that's not what 99% of the users would expect nor want when
>> calling (pmap foo coll). I'd vote for making pmap eager in the sense
>> that it should always try to keep n threads working as long as more
>> tasks are available. Clearly, the current implementation is useful
>> when your tasks are equally expensive, i.e., the costs don't depend
>> on the argument.
>
> I can imagine cases in which one *would* want the current behavior of
> pmap, especially with infinite lazy sequences, so I wouldn't go quite
> as far as you are going here.

You are right, I didn't think about infinite seqs.  So pmap should be
lazy in order to terminate at all and to be a drop-in replacement for
map.  But isn't there a way to always keep n submissions to the thread
pool ahead of the actual realization?  Of course, that would mean that
in the case where you don't realize all elements, you have computed n
elements too many.

> But I agree that many users will often want an eager version that
> maximizes CPU utilization, and that they may expect pmap to do that.
> In fact that was what I wanted (and will usually want), and it's what
> I assumed pmap would do

Agreed. Now you've convinced me that pmap shouldn't be eager, but there
should be an eager version.
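One way to sketch such an eager variant (not part of clojure.core; a Semaphore gates how many tasks compute at once, while every future is launched up front):

```clojure
(import 'java.util.concurrent.Semaphore)

(defn eager-pmap
  "Eagerly maps f over coll, running at most n tasks at a time, and
  blocks until every result is realized. A slow element never starves
  the rest: each finished task frees a permit for the next one."
  [n f coll]
  (let [sem (Semaphore. n)]
    (->> coll
         (mapv (fn [x]
                 (future
                   (.acquire sem)
                   (try (f x)
                        (finally (.release sem))))))
         (mapv deref))))
```

The trade-off: all the futures exist immediately (on Clojure's unbounded send-off pool), so this buys eagerness at the cost of thread count, unlike an ExecutorService-backed queue.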

> (even though, now that I look afresh at the doc string, it pretty
> clearly says that it *doesn't* do this).

Sort of:

Semi-lazy in that the parallel computation stays ahead of the
consumption, but doesn't realize the entire result unless required.

If I understand the code correctly (see my last mail), then the claim
that the "parallel computation stays ahead of the consumption" is not
true. It starts out parallel but converges to sequential evaluation.

> So my hope wouldn't be that we change pmap but rather that we provide
> something else that is just as simple to use but provides eager, "use
> all available cores to get the job done as fast as possible" behavior
> with a simple pmap-like interface.

Yes.

BTW: Am I the only one who sometimes would also like to have an eager
sequential map variant? At the weekend I've written a proof-of-concept
evaluator that gets some syntax graph of some query language and some
graph on which the query should be evaluated. The evaluation model is
simple syntax-driven recursive evaluation, i.e., to calculate the result
of some node in the syntax graph, one simply evaluates the children and
combines the results. There are also nodes that declare variables with
value ranges, where the child subgraph is then evaluated once for each
possible variable binding. That felt very natural to implement using a
^:dynamic hash-map *vars* holding the current binding in that scope and
`binding' to change the current one. Something along the lines of:

(for [b bindings]
  (binding [*vars* (merge *vars* b)]
    ;; evaluate the children
    ))

However, one has to force realization of any lazy seq in order to be
sure that the calculation is performed in the dynamic extent of the
surrounding variable binding. So in the sketch above, there's a `doall'
around the `for'.
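The pitfall can be seen directly with a toy dynamic var (a standalone illustration, not Tassilo's actual evaluator):

```clojure
(def ^:dynamic *v* :root)

;; Without doall, the lazy seq escapes the binding and is realized
;; only after the dynamic extent has ended, so it sees the root value:
(def lazy-r  (binding [*v* :scoped] (map (fn [_] *v*) (range 2))))
;; (first lazy-r) => :root

;; With doall, realization happens inside the binding:
(def eager-r (binding [*v* :scoped] (doall (map (fn [_] *v*) (range 2)))))
;; eager-r => (:scoped :scoped)
```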

Bye,
Tassilo

Lee Spector

unread,
Oct 11, 2011, 3:51:44 PM10/11/11
to clo...@googlegroups.com

> On Tue, Oct 11, 2011 at 10:55 AM, j-g-faustus <johannes...@gmail.com> wrote:
>> I expect it would be possible to nest it (possible as in "no exceptions or deadlocks"), but I can't see any scenario where you would want to - you would get an exponentially increasing number of threads. If 48 cores each start 48 threads, each of those threads start another 48 etc., it doesn't take long before you have enough threads to bog down even the most powerful server.
>>
>> But what would be the purpose of a nested "run this on all cores" construct? You are already running on all cores, there are no spare resources, so in terms of CPU time I can't see how it would differ from merely having the pmapped function use a plain same-thread map?

On Oct 11, 2011, at 2:07 PM, Andy Fingerhut wrote:
> One benefit would be convenience of enabling parallelism on nested data structures. One function at the top level could use parallelism, and the pieces, perhaps handled by separate functions, and perhaps nested several levels deep in function calls, could also use parallelism.


Or consider the following scenario (which is exactly what I was doing :-): You want to produce the next generation of programs in an evolutionary computation system from n parents, each of which may produce m offspring, with n much larger than your number of cores (c). The runtimes for offspring production may vary widely. So you do something like (pmapall reproduce population) to maximize processor utilization among the parents, but late in the process, when the number of parents who are still busy making offspring is less than c, your cores start to go idle. Suppose you have just a few remaining parents who are much slower than the others, and each has to chug through m independent birth processes with cores sitting idle. If you could nest calls to pmapall then those slowpokes would be farming some of their m births out to the available cores.

In this situation, at least, I don't care about the order in which the (* n m) independent tasks are completed. I just want them all done, with as little wasted CPU capacity as possible. And I don't want to somehow factor out all of the births into one long queue that I pass to a single call to pmapall. The natural decomposition of the problem is to say something like (pmapall reproduce population) at the population level, something like (pmapall (fn [_] (make-baby parent)) (range litter-size)) at the parent level, and ask the platform to make the calls as aggressively as possible, reusing cores for pending calls whenever possible.

Calls to pmap can be nested and I *think* that they will actually do some of this, if the sequencing of fast vs slow things is fortuitous. For example, I *think* that in the some of the situations that have been described here, where some cores are idle waiting for a slow, early computation in a sequence to complete, a pmap from within the slow, early computation will use some of those available cores. I'm not certain of this, however! And in any event it's not ideal for the reasons that have been raised.
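For what it's worth, nested pmap calls at least compose without error (whether idle cores are actually picked up by the inner calls is the open question above). A hypothetical sketch:

```clojure
;; Outer pmap over rows, inner pmap over columns; the inner doall
;; forces each row's results inside its own outer task.
(def grid
  (pmap (fn [row]
          (doall (pmap (fn [col] (* row col)) (range 3))))
        (range 3)))
;; => ((0 0 0) (0 1 2) (0 2 4))
```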

-Lee

Avram

unread,
Oct 12, 2011, 4:41:15 PM10/12/11
to Clojure
I haven't read the entire thread carefully, but have you considered
the "work" library <https://github.com/getwoven/work> as a potential
fit for what you are trying to do?

Example from the github link:

user> (require '[work.core :as work])
user> (work/map-work client/get
                     ["http://clojure.org" "http://github.com/richhickey/clojure"]
                     2)
user> (doc work/map-work)
-------------------------
work.core/map-work
([f xs threads])
  like clojure's map or pmap, but takes a number of threads,
  executes eagerly, and blocks.

Hope this is useful.
-A


On Oct 11, 12:51 pm, Lee Spector <lspec...@hampshire.edu> wrote:

Lee Spector

unread,
Oct 12, 2011, 4:53:04 PM10/12/11
to clo...@googlegroups.com

On Oct 12, 2011, at 4:41 PM, Avram wrote:

> I haven't read the entire thread carefully, but have you considered
> the "work" library <https://github.com/getwoven/work> as a potential
> fit for what you are trying to do?

I hadn't heard of it, but it does look promising and I will check it out. Thanks!

-Lee

j-g-faustus

unread,
Oct 12, 2011, 6:21:56 PM10/12/11
to clo...@googlegroups.com
On Tuesday, October 11, 2011 8:07:16 PM UTC+2, Andy Fingerhut wrote:
> If it were implemented not by creating Java Threads for each task, but submitting a task to an ExecutorService...

As far as I understand, that's easy if you're willing to use it asynchronously: Push tasks on the queue, let the worker threads deal with them whenever they get around to it, and never block waiting for results within a task. 
I.e. a task could submit nested tasks to the queue, but it couldn't use them to compute partial results for its own use.

If you want synchronous behavior, there are only two ways that I know of to do that: Either disallow nesting, like (await agent), or use an unbounded thread pool, like pmap. 

If you allow unbounded nesting of "submit task and wait for the result" on a fixed size thread pool, I'm pretty sure you'll end up with deadlock. (Which is why agents disallow it, I assume.)
Imagine a 1-thread ExecutorService where the task running on the single worker thread submits a new task to the service and blocks until it receives a result: the only thread that could process the new task has just gone to sleep, and it will never wake up, because no thread is left to compute the result it is waiting for.
The issue is the same with more worker threads, except that it will work, more or less, as long as at least one thread is still awake. But the scenario where every worker thread submits-and-waits simultaneously is bound to happen at some point.
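The single-thread deadlock is easy to reproduce; a timeout on the outer get lets the demo terminate instead of hanging:

```clojure
(import '(java.util.concurrent Executors TimeUnit TimeoutException))

(def pool (Executors/newFixedThreadPool 1))

;; The only worker thread submits an inner task to its own pool and
;; then blocks on it -- no thread is left to ever run the inner task.
(def outer
  (.submit pool
           ^Callable (fn []
                       (let [inner (.submit pool ^Callable (fn [] :done))]
                         (.get inner)))))

(def result
  (try (.get outer 200 TimeUnit/MILLISECONDS)
       (catch TimeoutException _ :deadlocked)))
;; result => :deadlocked
(.shutdownNow pool)
```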


With the disclaimer that I might be missing something,

jf

Ivan Koblik

unread,
Oct 13, 2011, 6:41:05 AM10/13/11
to clo...@googlegroups.com
Fail enough. I guess that to allow nested spawning and avoid deadlocks, tasks should finish without waiting for results, instead spawning a new task similar to themselves that checks for completion of the child tasks, repeating the cycle if necessary.

Imagine that task t1 is monitored through a future f1, now it spawns child task t2 which is monitored through a future f2. t1 doesn't wait for t2 to complete, but reschedules a new task t3 that would check f2 for completion, and if f2 is done then sets f1 to done, otherwise reschedules itself as t4.

Although I don't know how memory hungry that may get, or what the impact on the length of the task queue would be. Maybe we could give the queue a limited length, and if a new task t2 gets rejected, it is performed within task t1?
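Ivan's reschedule-instead-of-block idea can be sketched with a ScheduledExecutorService (all names here are made up for illustration):

```clojure
(import '(java.util.concurrent Executors TimeUnit Future))

(def sched (Executors/newScheduledThreadPool 2))

(defn when-done
  "Poll `child` without blocking a worker thread: if it isn't finished
  yet, reschedule this same check instead of waiting on .get."
  [^Future child on-done]
  (.schedule sched
             ^Runnable (fn []
                         (if (.isDone child)
                           (on-done (.get child))
                           (when-done child on-done)))
             10 TimeUnit/MILLISECONDS))

;; Usage: the parent submits a child, registers a callback, and returns
;; immediately; no pool thread ever sits blocked on the child.
(def result (promise))
(def child (.submit sched ^Callable (fn [] (+ 1 2))))
(when-done child #(deliver result %))
;; @result => 3
```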

Cheers,
Ivan.

Ivan Koblik

unread,
Oct 13, 2011, 9:14:05 AM10/13/11
to clo...@googlegroups.com
Sorry I meant "Fair enough ..."

Cheers,
Ivan.