I've been working on building a Random Forest model on my modest AWS cluster (6 nodes, each with 16 cores and 32 GB RAM) using Spark. My training set is fairly large (~30,000 rows and ~2,000 columns), so it won't run on a single node. I had cobbled together some code to build the model as a MapReduce job, and I've now hacked together some code to do the same in Spark (see below). It takes about 15 seconds to build a model on my cluster with this code (the equivalent MapReduce implementation took about 15 minutes).
I'm sure there's a better way of doing this, so please feel free to suggest improvements (or alternative approaches)!
# Load libraries
library(randomForest)
library(SparkR)
# Set the Spark memory allocation. This is needed even though the executor
# and driver memories are set when initialising the Spark context; without
# it I run into Java heap space errors.
Sys.setenv(SPARK_MEM="40g")
# Initialise Spark context
sc <- sparkR.init(master="yarn-client",
                  sparkEnvir=list(spark.executor.memory="24g",
                                  spark.driver.memory="24g",
                                  spark.executor.instances="5",
                                  spark.executor.cores="16",
                                  spark.driver.maxResultSize="10g"))
# Ship the randomForest package to all worker nodes
includePackage(sc,randomForest)
### Build randomForest models of 10 trees each for the entire data set sliced into 50 chunks (500 trees in the forest overall)
# Load the column names of the training set
load("train.names.rda")
# Define the RDD with 50 slices
rdd <- textFile(sc, "hdfs://<server_address>/user/spark/spark_training.csv", 50)
# Define mapPartition function to build randomForest models
rf <- mapPartitions(rdd, function(input) {
  # Parse this partition's lines into a data frame
  df <- data.frame(do.call('rbind', strsplit(as.character(input), ',', fixed=TRUE)))
  # Set the data frame column names
  names(df) <- train.names
  # Convert every column except the classification column to numeric
  for (i in 1:(ncol(df)-1)) {
    df[,i] <- as.numeric(as.character(df[,i]))
  }
  # Strip the quotation marks from the classification column and make it a factor
  df$MyClass <- as.factor(gsub("\"", "", df$MyClass))
  # Build a 10-tree randomForest model on this slice and return it
  randomForest(df[,-ncol(df)], df[,ncol(df)], ntree=10)
})
# Trigger the job and collect the models on the driver
output.rf <- collect(rf)
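# Note: collect() flattens each randomForest object into its individual
# list components (18 per model), so output.rf is one long flat list of
# 50 * 18 elements rather than a list of 50 models; uberRF below undoes this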
# Define a function to split the collected output back into individual models
# and combine them into one uber model
uberRF <- function(x) {
  j <- 1
  rf.list <- list()
  # Re-chunk the flat list into groups of 18 components and restore the
  # randomForest class on each chunk
  for (i in 1:(length(x)/18)) {
    temp <- x[j:(i*18)]
    class(temp) <- "randomForest"
    rf.list[[length(rf.list)+1]] <- temp
    j <- j + 18
  }
  # Combine the individual forests into a single model
  final.rf <- do.call(my_combine, rf.list)
  final.rf
}
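# Note: my_combine is not defined above. Assuming no patching of the stock
# combiner is needed, a minimal stand-in is the combine() function that
# ships with the randomForest package:
my_combine <- randomForest::combine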
# Combine the RF models
forest <- uberRF(output.rf)
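# Sanity check: score a held-out set with the combined 500-tree forest.
# (test.df is a hypothetical data frame with the same feature columns and
# types as the training slices; it stands in for your own test data.)
preds <- predict(forest, newdata=test.df)
# Confusion matrix of predicted vs. actual class
table(predicted=preds, actual=test.df$MyClass)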