rmr2 example works but hadoop is slower than local on AWS EMR


Dave Hurst

Mar 20, 2015, 10:49:43 PM
to rha...@googlegroups.com
I'm trying to use RHadoop on AWS EMR for some data analysis with rmr2's mapreduce.  I've been able to get things working, but when I run in "hadoop" mode (versus "local") it actually slows down slightly.  It's as if it's ignoring the slave nodes, or at least running on only one node, which makes it slower than local mode, presumably due to the added overhead.

To simulate my analysis, I wrote a standalone benchmark.R program (below).  Basically it creates 25 groups of xy data and then runs calculations on each group.  I expect rmr2 to divide the map jobs across the 5 slaves, but I don't think that's happening.  I don't have a reduce task.  Running the full set of calculations serially ("local" mode) takes about 7 minutes on an AWS m3.xlarge instance:

[1] "local mode finished in    396.1 seconds"
[1] "hadoop mode finished in    449.2 seconds"

Can anyone tell me what I need to do to get the system to parallelize the jobs?

I'm starting the RHadoop cluster with this command via the CLI (bucket name and keypair removed):
aws emr create-cluster --ami-version 3.2.1 --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=5,InstanceType=m3.xlarge --bootstrap-actions Path=s3://<BUCKET_NAME>/setup/emR_bootstrap.sh,Name=CustomAction,Args=[--rstudio,--rexamples,--plyrmr,--rhdfs] --steps Name=HDFS_tmp_permission,Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar,Args=s3://<BUCKET_NAME>/setup/hdfs_permission.sh --region us-west-1 --ec2-attributes KeyName=<KEYPAIR>,AvailabilityZone=us-west-1a --no-auto-terminate --name emR-example

Here's the code. To run it:
  1. log into RStudio on the master node
  2. upload the script and source it
  3. enter "benchmark(modes)" at the RStudio command prompt
#######################
modes <- c("local", "hadoop")

# to run:
# benchmark( modes )
###################

Sys.setenv(HADOOP_CMD="/home/hadoop/bin/hadoop")
Sys.setenv(HADOOP_STREAMING="/home/hadoop/contrib/streaming/hadoop-streaming.jar")
Sys.setenv(JAVA_HOME="/usr/java/latest/jre")

library(rmr2)

N.id <- 25
N.xy <- 100000

set.seed(2015)
data <- data.frame( x=rnorm(N.id * N.xy), y=rnorm(N.id * N.xy), id=rep(1:N.id, each=N.xy))

map.fun <- function(k, v) {
    for (id in v) {
        xy <- data[data$id == id, ]   # fixed: "id==id" was always TRUE and selected every row
        xy$len <- with(xy, sqrt(x^2 + y^2))
        xy$bearing <- with(xy, atan2(y, x))   # atan2 keeps the quadrant; atan(y/x) does not
        xy.names <- colnames(xy)
        calcs <- paste0("calc", 1:8)
        for (calc in calcs) xy <- cbind(xy, atan2(xy[, length(xy) - 1], xy[, length(xy)]))
        colnames(xy) <- c(xy.names, calcs)
    }
    keyval("head(xy)", head(xy))  # head() to ensure from.dfs won't choke
}

benchmark <- function( modes ) {
    if (length(modes) <= 0 ) return("modes is empty")
    
    benchmark.times <- numeric()
    
    for (mode in modes) {
        rmr.options(backend=mode)
        
        p1 <- proc.time()
        out <- mapreduce(
            input = to.dfs( 1:N.id ),
            map = map.fun,
            combine = TRUE
        )
        p2 <- proc.time()
        benchmark.times <- c( benchmark.times, (p2-p1)[3] )
        print(values(from.dfs(out)))
        print( sprintf("%s mode finished in %8.1f seconds", mode, (p2-p1)[3]))
    }
    
    names(benchmark.times) <- modes
    barplot(benchmark.times, ylab="Time (sec)")
}

Antonio Piccolboni

Mar 20, 2015, 10:57:56 PM
to RHadoop Google Group
I wouldn't mind trying this setup. Are those bootstrap scripts available?

--
post: rha...@googlegroups.com ||
unsubscribe: rhadoop+u...@googlegroups.com ||
web: https://groups.google.com/d/forum/rhadoop?hl=en-US
---
You received this message because you are subscribed to the Google Groups "RHadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rhadoop+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Dave Hurst

Mar 21, 2015, 3:48:56 AM
to rha...@googlegroups.com
@Antonio, 
I should have mentioned my starting point for this was Markus Schmidberger's excellent article, Statistical Analysis with Open-Source R and RStudio on Amazon EMR. That article has more detail, but the essentials are: install the AWS Command Line Interface, copy the required scripts (available in the emr-bootstrap-actions GitHub repository) to your Amazon Simple Storage Service (Amazon S3) bucket, and replace <BUCKET_NAME> with your bucket name.

Antonio Piccolboni

Mar 23, 2015, 12:57:32 PM
to RHadoop Google Group
Thanks for the clarification. The level of parallelism is mostly outside rmr2's control, and entirely so in the map phase. With small input sets and CPU-bound computations, that has been a problem. Most people report that the setting mapreduce.job.maps is just a "hint", without explaining what a hint to a machine is supposed to mean; the number of splits in the input seems to matter more.

One aspect you can control with rmr2 is the number of distinct keys: rmr2 goes to a certain amount of effort to keep records with the same key together in a single hadoop record, for efficiency. In your case you already have 25 distinct ids, so I suppose there's nothing to gain there. The other option is to try to affect the split size through hadoop configuration. I am not sure how that's done, or whether it is possible on EMR. If it helps, you can pass arbitrary configuration to the underlying hadoop commands with the backend.parameters argument to mapreduce.

In general, hadoop is for big data, not for CPU-bound computations. That doesn't mean people haven't used it successfully outside its core domain of applicability, including record-setting approximations of pi, but to the best of my understanding such use requires specialized configurations that are far from the defaults.
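For what it's worth, a sketch of what passing configuration through backend.parameters looks like (the property name and value here are illustrative, and Hadoop may treat the setting as a hint or ignore it entirely depending on version and EMR setup):

```r
# Sketch only: pass extra -D properties to the underlying hadoop streaming call.
# "mapreduce.job.maps=10" is an illustrative hint, not a guaranteed setting.
out <- mapreduce(
    input = to.dfs(1:N.id),
    map = map.fun,
    backend.parameters = list(
        hadoop = list(D = "mapreduce.job.maps=10")
    )
)
```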
I hope this helps.

Antonio


Dave Hurst

Mar 24, 2015, 12:12:00 AM
to rha...@googlegroups.com, ant...@piccolboni.info
Not sure where to go at this point.   

In general, hadoop is for big data, not for CPU-bound computations

Isn't the difference between my example and big data just the value set for N.id?  I'd like to gain some confidence that the parallelization is happening before trying to tackle something bigger (for my actual data, N.id ~ 2500).  I've attached a console log file in case more info can be gleaned from it, but the essentials are that the map task forms 44 splits, which I assumed would be distributed across the 5 slaves; I don't think that's happening, although I don't know how to check.  The EMR job produced a lot of logs, but I'm not sure how to sift through them.

I've also tried this on a 3-node (2 slaves) cluster, with similar results: hadoop mode is slower than local mode, and the delta between hadoop and local looks to be loosely proportional to the number of slaves, so all I'm doing is picking up overhead without any parallelization benefit.
benchmark2_5node.txt

Saar Golde

Mar 25, 2015, 7:55:04 AM
to rha...@googlegroups.com, Antonio Piccolboni
There is a good chance only two mappers are running at a time, as is the default in many mapreduce settings unless you change them directly. Try following the tracking link in the mapreduce outputs (something like "The url to track the job: http://172.31.20.187:9046/proxy/application_1426893407999_0001/") and see how many of them are running at the same time. 
The logs seem to indicate that there are very few mappers working:
Job Counters
    Launched map tasks=44
    Data-local map tasks=29
    Rack-local map tasks=15
    Total time spent by all maps in occupied slots (ms)=5412048
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=902008
    Total vcore-seconds taken by all map tasks=902008
    Total megabyte-seconds taken by all map tasks=1298891520
Map-Reduce Framework
    Map input records=3
    Map output records=3

Since you used to.dfs to distribute the data across the cluster, it likely all ended up in one place (it's a very small dataset). So the report that 29 map tasks were data-local should start to look suspicious. The second clue is that there were 3 map input records: again, a tiny dataset gets split into tiny fragments, which is not really what hadoop was meant to figure out how to handle on its own.

That said, log analysis is nice but the only way to know for sure is to track them while they run. 

Another option is to use custom counters in your mapper to indicate how many times the map function was called. I would guess in this case it was 3 (leaving lots of excess capacity on the cluster), but use your own counters (via the increment.counter function) to make sure.
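As a sketch, a counter-instrumented wrapper around your existing mapper might look like this (the group and counter names are arbitrary labels; totals appear in the job's counter output alongside the built-in Hadoop counters):

```r
# Sketch: same mapper as before, plus a custom counter counting invocations.
map.fun.counted <- function(k, v) {
    # one tick per call to the map function
    increment.counter("benchmark", "map.calls", 1)
    map.fun(k, v)
}
```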

-Saar
 


Dave Hurst

Mar 27, 2015, 12:10:32 AM
to rha...@googlegroups.com, ant...@piccolboni.info
Thanks for the helpful response.  


On Wednesday, March 25, 2015 at 4:55:04 AM UTC-7, Saar Golde wrote:
There is a good chance only two mappers are running at a time, as is the default in many mapreduce settings unless you change them directly.

Do you know how I can change those settings?  I'm still pretty suspicious that there is only one mapper running, though, given the consistency of the benchmark results compared with local mode.  If that's true, changing the settings might not be enough.
 
Try following the tracking link in the mapreduce outputs (something like "The url to track the job: http://172.31.20.187:9046/proxy/application_1426893407999_0001/") and see how many of them are running at the same time. 
 
I did try that link, but it doesn't work (browser error). I'm not sure if this is an AWS limitation or if I don't have everything set up correctly (security settings, etc.).

Since you used to.dfs to distribute the data across the cluster, it's likely it all ended up in one place (it's a very small dataset). So the report that 29 map tasks were data-local should start to look suspicious. The second clue is that there were 3 map input records - again, tiny dataset gets split to tiny fragments, not really the what hadoop was mean to figure out how to do on its own. 
 
Here is where things get fuzzy for me.  In this case we have the input (the sequence of ids to process), which is tiny, and the data, which is larger -- still not huge by big data standards, but if I extend this problem to its intended application it starts to approach that.  For now I am relying on rmr to replicate the environment with the data in memory; in the future the data may need to live on HDFS.  Would that make a difference?  In either case, the input would still be a short list.  If I am understanding this correctly, that seems to be a big limitation on how rmr works, i.e. the input needs to be large in order to invoke multiple mappers, even if the mappers need to access and produce big data that is only referenced by the input.  In other words, rmr handles big_data in and big_data out, but not small_data in and big_data out -- which I would think is a common use case.  Apologies in advance if I'm off base with that conclusion; if so, can you straighten me out?

If my understanding is correct, a workaround might be to reformulate the input so that it looks bigger to rmr.  For example, I could split the data data.frame into 25 pieces, one for each id, create a big data object for each one using to.dfs, and then pass that list of objects to mapreduce as input.  Is that worth a try, or am I off in left field?
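In case it clarifies what I mean, an untested sketch of that workaround (I'm assuming here that mapreduce accepts a list of inputs, and whether this actually yields more splits would still depend on the backend):

```r
# Untested sketch: write each id's slice of the data frame to dfs separately,
# then pass the list of big data objects to mapreduce as input.
parts <- lapply(split(data, data$id), to.dfs)
out <- mapreduce(
    input = parts,  # assumption: a list of inputs is accepted here
    map = function(k, v) keyval(unique(v$id), nrow(v))  # placeholder map
)
```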


That said, log analysis is nice but the only way to know for sure is to track them while they run. 

Another option is to use custom counters in your mapper, to indicate how many times the map function was called. I would guess in this case it was 3 (leaving lots of excess capacity on the cluster), but use your own counters (using the increment.counter function) to make sure. 

I'll look into this as well.  Thanks for the tip 

Dave 

Saar Golde

Mar 27, 2015, 7:58:51 AM
to rha...@googlegroups.com
One way to create an input that hadoop will consider worthy of launching multiple mappers is to create multiple files within a single folder, each with a single line of data that drives a single calculation. Using that folder as the input, with the appropriate input.format, might be useful.
I doubt it will work on its own, though; it's also worth trying to 'hint' to hadoop that you want more mappers by passing
backend.parameters = list(hadoop = list(D = "mapred.map.tasks=10"))
in the call to the mapreduce function.
