Hi Group,
I am trying to perform a map-reduce job using rmr2 on a Hadoop cluster. I have a 20 GB .csv dataset uploaded to HDFS. However, I am trying to perform this exercise on a smaller dataset in order to make debugging easier.
Everything works fine with my Hadoop configuration, as I am able to run the example scripts available on GitHub.
My R script has the following form (it is applied to a small dataset, cars.csv):
I am trying to compute the mean value based on my key.
# Environment setup for Hadoop / Hive paths used by rmr2 and rhdfs.
# (Dropped `rm(list = ls(all = TRUE))`: wiping the global environment inside a
# script is an anti-pattern — run the script in a fresh session instead.)
Sys.setenv(HADOOP_HOME = "/opt/mapr/hadoop/hadoop-0.20.2/")
Sys.setenv(HADOOP_CMD = "/opt/mapr/hadoop/hadoop-0.20.2/bin/hadoop")
Sys.setenv(HADOOP_CONF = "/opt/mapr/hadoop/conf")
Sys.setenv(HADOOP_STREAMING = "/opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar")
Sys.setenv(HADOOP_CONF_DIR = "/opt/mapr/hadoop/hadoop-0.20.2/conf/")
Sys.setenv(HIVE_HOME = "/opt/mapr/hive/hive-0.13/bin")
# NOTE(review): RHIVE_FS_HOME is assigned twice below — the first value is
# immediately overwritten, so it is dead code. One of these was probably meant
# to be a different variable (e.g. RHIVE_HOME); confirm against the RHive docs.
Sys.setenv(RHIVE_FS_HOME = "/opt/mapr/hive/hive-0.13")
Sys.setenv(RHIVE_FS_HOME = "/user/uocom/rhive/")
# Sanity check: echo the hadoop binary path that rmr2/rhdfs will use.
Sys.getenv("HADOOP_CMD")
# Load the RHadoop stack: rmr2 (map-reduce), rhdfs (HDFS access), plyrmr.
library(rmr2);library(rhdfs);library(plyrmr);
# Print numerics with 13 significant digits (affects display, not computation).
options(digits=13)
# Initialize the rhdfs connection to HDFS (requires HADOOP_CMD to be set).
hdfs.init()
# NOTE(review): backend = 'local' makes rmr2 read inputs from the LOCAL
# filesystem, not HDFS. Since hdfs.data below is an HDFS path
# ('/user/XXXX/uploadCars/cars.csv'), the local backend cannot open it —
# this is very likely the cause of the "cannot open the connection" error.
# Either copy cars.csv to that path on the local disk, or switch to the
# 'hadoop' backend. Verify against the rmr2 documentation.
rmr.options(backend = 'local')
# HDFS root containing the uploaded dataset (XXXX is a placeholder user name).
# NOTE(review): with rmr.options(backend = 'local') this path is resolved on
# the local filesystem, so it must exist there — TODO confirm.
hdfs.data.root <- "/user/XXXX/uploadCars"
# Full path to the input csv.
hdfs.data <- file.path(hdfs.data.root, "cars.csv")
# Output directory lives next to the input.
hdfs.out.root <- hdfs.data.root
hdfs.out <- file.path(hdfs.out.root, "out")
# Input format: comma-separated text with explicit column names
# (mtcars-style columns plus a leading "marque" label column).
# `stringsAsFactors = FALSE` keeps character columns as character
# (spelled out: `F` is a reassignable binding, not a keyword).
csv.input.format <- make.input.format(
  format = "csv", mode = "text", streaming.format = NULL, sep = ",",
  col.names = c("marque", "mpg", "cyl", "disp", "hp", "drat",
                "wt", "qsec", "vs", "am", "gear", "carb"),
  stringsAsFactors = FALSE
)
# Create the output directory on HDFS (via rhdfs).
hdfs.mkdir(hdfs.out)
# Map Reduce functions -------------------------------------------------------
# Mapper: emits the cylinder count as the key, and as the value the columns
# at positions 5:7 and 12 (hp, drat, wt, carb given col.names above).
mapper <- function(k, v) {
  selected <- v[, c(5:7, 12)]
  keyval(v$cyl, selected)
}
# Compute per-carb column means: coerce every column to numeric, then take
# the mean of each column grouped (and sorted) by carb. Returns a data.frame.
temp1 <- function(v) {
  library(data.table)
  numeric_dt <- data.table(v)[, lapply(.SD, as.numeric)]
  means_by_carb <- numeric_dt[, lapply(.SD, mean), keyby = carb]
  as.data.frame(means_by_carb)
}
# Reducer cbinds the results
# Reducer: combine the value columns for one key and emit the per-carb means.
reducer <- function(k, v) {
  # NOTE(review): do.call(cbind, v) coerces the value data frame into a
  # matrix (column-binding its columns). If any column is character, the
  # whole matrix becomes character before temp1 re-coerces it — confirm this
  # is intended rather than rbind-ing reduce chunks; check the rmr2 docs for
  # how values arrive in the reducer.
  combined <- do.call(cbind, v)
  keyval(k, temp1(combined))
}
# Run the map-reduce job on `input`.
# `output` defaults to NULL, in which case rmr2 writes to a temporary
# location that from.dfs() can read back (matches the original behavior,
# where the output argument was commented out). Pass hdfs.out to persist.
test <- function(input, output = NULL) {
  mapreduce(
    input = input,
    output = output,
    input.format = csv.input.format,
    map = mapper,
    reduce = reducer,
    # NOTE(review): backend.parameters$hadoop only applies on the 'hadoop'
    # backend; it is ignored under rmr.options(backend = 'local').
    backend.parameters = list(
      hadoop = list(D = "mapred.reduce.tasks=2")
    ),
    verbose = TRUE  # spelled out: `T` is a reassignable binding
  )
}
# Execute the job, pull the result off the DFS into outPut, and time it.
system.time(outPut <- from.dfs(test(hdfs.data)))
Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode == :
cannot open the connection
I guess that I am making a fundamental mistake here. I would appreciate it if you could provide assistance.
Thanks,
Kostas