Mapper based parallelization

bkchoi80

Apr 5, 2011, 5:42:08 PM
to Segue for R
I used segue for my job at a startup company, and it solved many of my
headaches indeed! During the task I happened to change some of it. JD
pointed me to this group, so I guess this is a good place to explain
the changes and have some discussion.

The major change was switching from a reducer-only job (or "cat"-mapper
job) to a mapper-only job. Since segue targets independent execution of
multiple CPU-intensive tasks, it would be best to assign an R process
to each virtual core. The default mapper-only job does this very poorly,
since the input-split logic is based on block size, which is much larger
than typical serialized strings of R arguments. I noticed that each
instance gets just one mapper in most cases if I run a mapper-only job.
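
To put rough numbers on it (the sizes below are my assumptions, not
measurements): with the default 64 MB block size, even a few hundred
serialized arguments of a few KB each fit into a single block, so only
one input split, and therefore one mapper, is generated:

block_size_bytes <- 64 * 1024^2   # default HDFS block size, 64 MB
arg_size_bytes   <- 4 * 1024      # a typical serialized R argument, ~4 KB (assumed)
n_args           <- 16

n_splits <- ceiling(n_args * arg_size_bytes / block_size_bytes)
n_splits                          # 1 -> a single mapper handles every argument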

The reducer-only job solves the problem, but it didn't seem to be
perfect. First, the default hash partitioner works almost like random
assignment, so some reducers get more jobs than others. For example,
if we launch 16 reducers with 16 virtual cores and run "emrlapply"
with 16 different arguments, some reducers get no input while others
get more than one job, thus wasting some of the cores. Also, I noticed
that no matter how many slots and reducers I set, the actual number of
reducers on each instance is one less than the number of cores, for
some unknown reason.
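
A quick simulation (just an illustration, not segue code) shows why
near-random partitioning wastes cores:

# Throw 16 jobs at 16 reducers roughly at random, as a hash partitioner
# effectively does; some reducers stay empty while others get two or more.
set.seed(1)
assignment <- sample(1:16, size = 16, replace = TRUE)
table(factor(assignment, levels = 1:16))
# Reducers with count 0 sit idle; reducers with count > 1 run their jobs serially.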

So as a workaround, I realized that the mapper assignment process can
be controlled by the number of input files. By default, Hadoop splits
each file individually (not after merging), meaning that if we put only
one argument per input file, each mapper will get a single argument.
The Hadoop framework then schedules the execution of mapper jobs on the
available mapper slots. After setting the number of mapper slots equal
to the number of virtual cores per instance, I indeed noticed almost
100% CPU utilization.
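
Here is a minimal sketch of the one-argument-per-file idea; the staging
directory and file names are made up for illustration, and uploading the
files to S3 is left to segue:

# Write each element of the argument list to its own small input file,
# so Hadoop creates one input split -- and therefore one map task -- per argument.
args_list   <- as.list(1:16)          # the emrlapply() arguments
staging_dir <- tempdir()              # hypothetical local staging area

input_files <- vapply(seq_along(args_list), function(i) {
  f <- file.path(staging_dir, sprintf("arg_%04d.rds", i))
  saveRDS(args_list[[i]], f)          # one serialized argument per file
  f
}, character(1))

length(input_files)                   # 16 files -> 16 mappers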

One drawback of the above scheme is the overhead of copying input
files to S3 when there is a large number of input arguments, say 1000.
One solution might be setting the number of input arguments per file
equal to the number of total mapper slots. This also provides an
additional way to control load balancing. For example, if we know some
arguments take much longer than others, we can try to spread them
evenly across the mapper jobs, to prevent some mappers from getting
more than one of these "heavy" arguments.
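
A sketch of that batching idea, assuming we have a rough per-argument
cost estimate (the cost vector and slot count below are invented for
illustration):

# Group the arguments into exactly as many input files as there are mapper slots,
# dealing them out heaviest-first so "heavy" arguments spread across files.
args_list <- as.list(1:1000)                 # many small arguments
est_cost  <- runif(length(args_list))        # hypothetical per-argument cost estimate
n_slots   <- 16                              # total mapper slots in the cluster

ord    <- order(est_cost, decreasing = TRUE)
groups <- split(args_list[ord], rep_len(seq_len(n_slots), length(args_list)))

staging_dir <- tempdir()
files <- vapply(seq_along(groups), function(i) {
  f <- file.path(staging_dir, sprintf("batch_%02d.rds", i))
  saveRDS(groups[[i]], f)                    # one file (and one mapper) per slot
  f
}, character(1))
length(files)                                # 16 files instead of 1000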

bkchoi80

Apr 5, 2011, 5:48:52 PM
to Segue for R
> One solution might be setting the number of input arguments per file
> equal to the number of total mapper slots.

I made a mistake. It should be "the number of input files equal to the
number of total mapper slots."

jdlong

Apr 7, 2011, 12:54:45 PM
to Segue for R
Thanks for taking the time to write your thoughts. I've been thinking
about this myself and I like your solution. I'm looking at the patch
you sent and I really appreciate you fixing a number of "rough edges"
in the code in addition to the file changes you mentioned in this
post.

I'm playing with some tests and I will get back to you in a day or so
with some thoughts. I fully anticipate that the simple approach you
proposed, "setting the number of input arguments per file equal to the
number of total mapper slots," will be a good solution. I like keeping
it pretty simple.

Thanks again for sending actual code. It's really really helpful!

-JD