Parallel doseq?

Cedric Greevey

May 24, 2012, 1:56:26 AM
to clo...@googlegroups.com
For some reason, this doesn't actually seem to be executing in parallel:

(defmacro pdoseq
  "Bindings as for for, but parallel execution as per pmap, pcalls,
  pvalues; returns nil."
  [seq-exprs & body]
  `(do
     (doall
      (pmap identity
            (for ~seq-exprs (do ~@body)))
      nil)))

user=> (pdoseq [i (range 10)] (println n))
0
1
2
3
4
5
6
7
8
9

There's never any interleaving of output, and if I give it a big
CPU-bound job to do for each integer, it only saturates one core.

I thought it might be a chunked-seq issue, but:

user=> (chunked-seq? (range 10))
false

Sean Corfield

May 24, 2012, 2:15:13 AM
to clo...@googlegroups.com
First off, the code you posted can't actually be right: you have
(println n) but the for binding was for i.

Second, given your macro, try (range 100) instead of (range 10) and
see what you get...

Cedric Greevey

May 24, 2012, 2:37:16 AM
to clo...@googlegroups.com
Replacing (range 10) with (take 10 (iterate inc 0)) didn't change
anything. It's still not parallelizing.

I need it to parallelize even for low-length sequences because the
individual elements may be expensive and there may be few of them.

Sean Corfield

May 24, 2012, 3:00:24 AM
to clo...@googlegroups.com
On Wed, May 23, 2012 at 11:37 PM, Cedric Greevey <cgre...@gmail.com> wrote:
> Replacing (range 10) with (take 10 (iterate inc 0)) didn't change
> anything. It's still not parallelizing.

My point was that when you replace (range 10) with (range 100) in your
code, it prints numbers up to 31 and no more. You didn't try that, I
presume?
--
Sean A Corfield -- (904) 302-SEAN
An Architect's View -- http://corfield.org/
World Singles, LLC. -- http://worldsingles.com/

"Perfection is the enemy of the good."
-- Gustave Flaubert, French realist novelist (1821-1880)

Cedric Greevey

May 24, 2012, 3:17:56 AM
to clo...@googlegroups.com
On Thu, May 24, 2012 at 3:00 AM, Sean Corfield <seanco...@gmail.com> wrote:
> On Wed, May 23, 2012 at 11:37 PM, Cedric Greevey <cgre...@gmail.com> wrote:
>> Replacing (range 10) with (take 10 (iterate inc 0)) didn't change
>> anything. It's still not parallelizing.
>
> My point was that when you replace (range 10) with (range 100) in your
> code, it prints numbers up to 31 and no more. You didn't try that, I
> presume?

Sounds like pmap is *really* broken. (doall (pmap identity x)) should
realize every element of x, surely, just like (doall x)?

In any event I have a working workaround now, one that uses an
explicit thread pool. It's ugly but it works.

John Szakmeister

May 24, 2012, 6:23:32 AM
to clo...@googlegroups.com
On Thu, May 24, 2012 at 2:15 AM, Sean Corfield <seanco...@gmail.com> wrote:
> First off, the code you posted can't actually be right: you have
> (println n) but the for binding was for i.
>
> Second, given your macro, try (range 100) instead of (range 10) and
> see what you get...

Can you explain that a little more, Sean? I know of some issues with
chunked sequences, but I expected this to work as well, given the
doall in there.

Thanks!

-John

Chris Perkins

May 24, 2012, 7:46:39 AM
to clo...@googlegroups.com
On Thursday, May 24, 2012 3:17:56 AM UTC-4, Cedric Greevey wrote:
> On Thu, May 24, 2012 at 3:00 AM, Sean Corfield wrote:
>> On Wed, May 23, 2012 at 11:37 PM, Cedric Greevey wrote:
>>> Replacing (range 10) with (take 10 (iterate inc 0)) didn't change
>>> anything. It's still not parallelizing.
>>
>> My point was that when you replace (range 10) with (range 100) in your
>> code, it prints numbers up to 31 and no more. You didn't try that, I
>> presume?
>
> Sounds like pmap is *really* broken. (doall (pmap identity x)) should
> realize every element of x, surely, just like (doall x)?

Unfortunately (pmap identity ...) won't do what you want. The only
computation that will be parallelized is the call to identity.

To pmap, the for-expression is a black box - just a seq - it cannot reach into it and parallelize its insides. It can only consume it sequentially.

The meat of pmap is this: (map #(future (f %)) coll)
That must produce each element of coll (in this case, the for-loop), in order, *before* passing it to future.
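
A minimal REPL sketch of that difference (timings approximate, on a
multi-core box): the for-body runs when the seq element is realized,
which happens sequentially on the consuming side, not inside pmap's
futures:

user=> (time (dorun (pmap identity
                          (for [i (range 4)] (do (Thread/sleep 500) i)))))
;; ~2000 msecs elapsed: the sleeps run during sequential realization
user=> (time (dorun (pmap (fn [i] (Thread/sleep 500) i) (range 4))))
;; ~500 msecs elapsed: the sleeps run inside pmap's futures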


Also, pmap is not broken; the problem with the code you pasted is simply
an errant closing paren. You had (do (doall (pmap ...) nil)), where you
meant (do (doall (pmap ...)) nil). Apparently doall has a two-arg
version, which is news to me :)
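
In other words, doall's two-arg form is (doall n coll), so the stray
paren made the pmap seq the "n" argument and nil the collection. A
sketch with the paren moved (hypothetical name, for illustration only;
per the above, the seq now gets forced, but still not in parallel):

(defmacro pdoseq-forced  ; hypothetical: forces the seq, still sequential
  [seq-exprs & body]
  `(do
     (doall
      (pmap identity
            (for ~seq-exprs (do ~@body))))
     nil))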


- Chris Perkins

John Szakmeister

May 24, 2012, 8:02:34 AM
to clo...@googlegroups.com
On Thu, May 24, 2012 at 7:46 AM, Chris Perkins <chrispe...@gmail.com> wrote:
[snip]
> Also, pmap is not broken; the problem with the code you pasted is simply an
> errant closing paren. You had (do (doall (pmap ...) nil)), where you meant
> (do (doall (pmap ...)) nil). Apparently doall has a two-arg version, which
> is news to me :)

I think you answered my question as well. Thanks for the nice explanation!

-John

Sean Corfield

May 24, 2012, 1:25:12 PM
to clo...@googlegroups.com
Right, my point was that Cedric's code had a bug, not pmap - I was
suggesting a test that would have highlighted that.

Cedric Greevey

May 24, 2012, 3:22:35 PM
to clo...@googlegroups.com
Sorry. *Something* is apparently messing with my outbound messages.
I'm not sure what, why, or how. Characters are moved or substituted at
random times, sometimes with unfortunate results.

Meanwhile, I had tried using pcalls as well but was getting spurious
behavior. I wound up with:

(defmacro pdoseq
  "Bindings as for for, but parallel execution as per pmap, pcalls,
  pvalues; returns nil."
  [seq-exprs & body]
  `(let [procs# (+ 2 (.availableProcessors (Runtime/getRuntime)))
         calls# (for ~seq-exprs (fn [] ~@body))
         threads# (for [i# (range procs#)]
                    (Thread.
                     #(doall
                       (map (fn [x#] (x#))
                            (take-nth procs#
                                      (drop i# calls#))))))]
     (doseq [t# threads#]
       (.start t#))
     (doseq [t# threads#]
       (.join t#))))

The body is wrapped in a function so that realizing an element of the
seq doesn't immediately execute it. Threads are created that each skip
to every nth element and invoke the function there, each starting at a
different offset from the others. The threads are then all started, and
then all joined, so the pdoseq call doesn't return until the job's
done, just as plain doseq returns synchronously.
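
For instance, the striping with three threads over eight items assigns
the work like this (a quick REPL check of the take-nth/drop pattern):

user=> (for [i (range 3)] (take-nth 3 (drop i (range 8))))
((0 3 6) (1 4 7) (2 5))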

One potential issue is that if one thread gets well ahead of the
others, a big chunk of the for seq is held onto while the others catch
up. So it's suitable for up to thousands of items that are individually
expensive; if you had millions of individually cheap items, you'd need
another strategy. A nested iteration like (doseq [x s1 y s2] foo) could
be split into (pdoseq [x s1] (doseq [y s2] foo)), as sketched below, so
that whole inner loops become the granularity of the parallel jobs
(diluting the per-call overhead of each anonymous function over a
larger portion of the total work), and the potentially-held-onto seq is
only the size of the outer loop. For example, looping over a 1024x768
array (easily possible with some image manipulation jobs), the maximum
held-onto seq would be 1024 elements instead of 786432, and the
function-call overhead would be paid once per row rather than once per
pixel.
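
A minimal sketch of that split (process-pixel here is a hypothetical
stand-in for the per-pixel work):

(pdoseq [y (range 768)]
  (doseq [x (range 1024)]
    (process-pixel x y)))  ; one parallel job per row, not per pixel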

An interesting question is why something like this isn't already in the
standard library. The supplied parallel functions (pmap, pcalls,
pvalues) don't take for-style bindings and don't seem adaptable to
using them.

Another thing that I thought could be handy is a lazy vector -- each
element is realized only when retrieved. Backed by a vector of delays,
of course. And super-lazy seqs and vectors built on a "weak delay":
something like delay, except that it uses a WeakReference to cache its
value. If the cached value gets collected, the code that computed it
will run again the next time it's needed (so it had better not have
side effects).

Lazy vectors could even be backed by a function of a single integer
argument -- it seems unlikely you could define one any other way than
as some sort of mapping from index to value-to-produce. There'd need to
be some kind of tree to hold the cache as well, perhaps one suited to
representing sparse vectors, for good performance and low memory use
when few elements are realized. The usual 32^n trees, with unrealized
branches left as nils, might do the job; as it filled in when elements
were realized, it would turn into a normal vector as far as memory use
was concerned. In the super-lazy case, a (mutable!) map with weak
values (there's one in the Apache Commons) might be preferable to a
tree holding individual weak delays.
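
A minimal sketch of that "weak delay" idea (the name and shape here are
hypothetical, and it returns a plain thunk rather than an IDeref the
way delay does):

(defn weak-delay
  "Caches (f) behind a WeakReference; if the GC reclaims the value,
  the next call recomputes it, so f must be free of side effects."
  [f]
  (let [cache (atom nil)]
    (fn []
      (or (when-let [^java.lang.ref.WeakReference r @cache]
            (.get r))
          (let [v (f)]
            (reset! cache (java.lang.ref.WeakReference. v))
            v)))))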

How are lazy vectors and super-lazy things relevant to this thread?
Well, the above already implements something like a super-lazy seq, in
that a compact closure stands in for each realized item and must be
called to produce the true seq element each time it's needed. The only
thing missing is the weak caching. And a super-lazy vector would be
especially easy to use in a parallelized manner because it's random
access: the above code that produces offsets i# and step sizes procs#
would, instead of being fed to (take-nth procs# (drop i# aseq)) to get
functions to invoke, be fed to (map lvec (range i# (count lvec)
procs#)) to get the actual elements.

Carlo Zancanaro

May 24, 2012, 3:06:39 AM
to clo...@googlegroups.com
You're doing all your computation in the generation of the lazy
sequence (which is in order). Then you're mapping "identity" in
parallel, but that doesn't do anything.

If you're willing to lose the "for" style bindings, try something more
like this:

(defmacro pdoseq
  "Run over a sequence in parallel (like pmap)"
  [seq-exprs & body]
  (let [pairs (partition 2 seq-exprs)]
    `(do (doall (pmap (fn ~(vec (map first pairs)) ~@body)
                      ~@(map second pairs)))
         nil)))
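
A hypothetical usage sketch: note that this pairs the seqs up in
lockstep like map, rather than taking the Cartesian product the way
for would:

user=> (pdoseq [x [1 2 3] y [10 20 30]] (println (* x y)))
10
40
90
nil
;; (the three lines may print in any order, since the bodies run in
;; pmap's futures)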

Cedric Greevey

May 27, 2012, 2:52:09 AM
to clo...@googlegroups.com
I noticed with the working pdoseq I posted earlier that sometimes the
threads on one core get ahead of those on the others, for some reason,
and then that core is idle for the rest of a job -- Windows, at least,
doesn't seem to reassign one or more threads to the freed core. So I
wrote this version:

(defmacro pdoseq
  "Bindings as for for, but parallel execution as per pmap, pcalls,
  pvalues; returns nil."
  [seq-exprs & body]
  `(let [procs# (+ 2 (.availableProcessors (Runtime/getRuntime)))
         calls# (for ~seq-exprs (fn [] ~@body))
         chunks# (atom [nil (partition 100 calls#)])
         threads# (for [i# (range procs#)]
                    (Thread.
                     #(loop []
                        (let [chunk# (first
                                      (swap! chunks#
                                             (fn [[_# [a# & b#]]]
                                               [a# b#])))]
                          (when chunk#
                            (doall
                             (map (fn [x#] (x#)) chunk#))
                            (recur))))))]
     (doseq [t# threads#]
       (.start t#))
     (doseq [t# threads#]
       (.join t#))))

This one replaces the interleaving with a job queue, which each thread
atomically takes from. The input items are chunked into groups of 100
in the code above, but that number can easily be changed (or even made
a parameter of the macro). I think this should divide the work up more
evenly among the available cores until nearly the end of the job (if
there are thousands or more of calls in calls#), when no chunks remain
unassigned and the worker threads start going idle.
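
The dequeue step in isolation, as a quick REPL check: the atom holds a
[chunk-just-taken remaining-chunks] pair, and each swap! atomically
pops one chunk off the remaining list:

user=> (def q (atom [nil (partition 2 (range 6))]))
#'user/q
user=> (first (swap! q (fn [[_ [a & b]]] [a b])))
(0 1)
user=> (first (swap! q (fn [[_ [a & b]]] [a b])))
(2 3)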

In case anyone might find this useful, I'm relinquishing any copyright
in either version and placing both into the public domain for others
to freely reuse.

Cedric Greevey

May 27, 2012, 3:11:40 AM
to clo...@googlegroups.com
On Sun, May 27, 2012 at 2:52 AM, Cedric Greevey <cgre...@gmail.com> wrote:
> I noticed with the working pdoseq I posted earlier that sometimes the
> threads on one core get ahead of those on the others, for some reason,
> and then that core is idle for the rest of a job -- Windows, at least,
> doesn't seem to reassign one or more threads to the freed core. So I
> wrote this version:
>
> (defmacro pdoseq
>   "Bindings as for for, but parallel execution as per pmap, pcalls,
>   pvalues; returns nil."
>   [seq-exprs & body]
>   `(let [procs# (+ 2 (.availableProcessors (Runtime/getRuntime)))
>          calls# (for ~seq-exprs (fn [] ~@body))
>          chunks# (atom [nil (partition 100 calls#)])

Eh. For general use, that needs to be (partition 100 100 nil calls#) in
case the total size of the job isn't divisible by 100; otherwise, the
items after the last exact multiple of 100 will silently not be done.

Steve Miner

May 27, 2012, 11:46:53 AM
to clo...@googlegroups.com

On May 27, 2012, at 3:11 AM, Cedric Greevey wrote:

> Eh. For general use, that needs to be (partition 100 100 nil calls#)
> in case the total size of the job isn't divisible by 100; otherwise,
> the items after the last exact multiple of 100 will be silently not
> done.

You could use partition-all.

clojure.core/partition-all
([n coll] [n step coll])
Returns a lazy sequence of lists like partition, but may include
partitions with fewer than n items at the end.
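
A quick REPL check of the difference:

user=> (count (partition 100 (range 250)))
2
user=> (count (partition 100 100 nil (range 250)))
3
user=> (count (partition-all 100 (range 250)))
3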
