rmr2 - mapreduce - input .csv located on hdfs - error

Konstantinos Mammas

Apr 25, 2015, 7:54:03 AM4/25/15
to rha...@googlegroups.com
Hi Group,

I am trying to run a MapReduce job using rmr2 on a Hadoop cluster. I have a 20 GB .csv dataset uploaded to HDFS. However, I am running this exercise on a smaller dataset to make debugging easier.

Everything works fine with my Hadoop configuration, as I am able to run the example scripts available on GitHub.

My R script has the following form (it is applied to the small dataset cars.csv). I am trying to compute the mean value by key.

rm(list=ls(all=TRUE))

Sys.setenv(HADOOP_HOME="/opt/mapr/hadoop/hadoop-0.20.2/")  
Sys.setenv(HADOOP_CMD="/opt/mapr/hadoop/hadoop-0.20.2/bin/hadoop")
Sys.setenv(HADOOP_CONF="/opt/mapr/hadoop/conf")
Sys.setenv(HADOOP_STREAMING="/opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar")
Sys.setenv(HADOOP_CONF_DIR="/opt/mapr/hadoop/hadoop-0.20.2/conf/")
Sys.setenv(HIVE_HOME="/opt/mapr/hive/hive-0.13/bin")
Sys.setenv(RHIVE_FS_HOME="/opt/mapr/hive/hive-0.13")
Sys.setenv(RHIVE_FS_HOME="/user/uocom/rhive/")
Sys.getenv("HADOOP_CMD")


library(rmr2);library(rhdfs);library(plyrmr);

options(digits=13)
hdfs.init()

rmr.options(backend = 'local')

# Hdfs data path
hdfs.data.root = '/user/XXXX/uploadCars'

# Define data path
hdfs.data = file.path(hdfs.data.root, 'cars.csv')
hdfs.out.root = hdfs.data.root
hdfs.out = file.path(hdfs.out.root, 'out')

# Make input format
csv.input.format = make.input.format(format='csv', mode='text', streaming.format = NULL, sep=',',
                                         col.names = c("marque","mpg","cyl","disp","hp","drat","wt","qsec","vs","am","gear","carb"),
                                         stringsAsFactors=F)

# Create output directory
hdfs.mkdir(hdfs.out)

# Define Map Reduce Functions

# Mapper uses the cylinder count as the key and emits a subset of columns
mapper<-function(k,v){
  keyval(v$cyl, v[,c(5:7,12)])
}

# Function used to compute mean value by carb
temp1 <- function(v){
  library(data.table)
  v0 <- data.table(v)
  v1<-v0[,lapply(.SD,as.numeric)]
  r1<-as.data.frame(v1[,lapply(.SD,mean),keyby=carb])
  return(r1)  
}

# Reducer cbinds the results
reducer<- function(k,v){
  v1 <- do.call(cbind,v)
  r1 <- temp1(v1)
  keyval(k,r1)
}


test<-function (input) {
  mapreduce(input = input,
#             output = output,
            input.format = csv.input.format,
            map = mapper,
            reduce = reducer,
            backend.parameters = list( 
              hadoop = list(D = "mapred.reduce.tasks=2")),
verbose=T)
}


system.time(outPut<-(from.dfs(test(hdfs.data))))

Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode == : cannot open the connection


I guess I am making a fundamental mistake here. I would appreciate it if you could provide assistance.

Thanks,

Kostas 

Antonio Piccolboni

Apr 27, 2015, 12:50:24 PM4/27/15
to rha...@googlegroups.com
I think you are using HDFS paths together with the local backend. Sorry if that's not made clear enough in the docs, but when using the local backend, only local file system paths can be used. Only rmr2 and plyrmr are backend-aware, so that they switch from one file system to the other as one changes backend. rhdfs has no notion of a swappable backend; it just works on HDFS. This could be what is making your program fail.
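A minimal sketch of the two consistent combinations, reusing the mapper, reducer, and input format from the script above (the local path is hypothetical, not from the original post):

```r
library(rmr2)

# Option 1: local backend with a local file system path.
# Input and output must point at the local disk, not HDFS.
rmr.options(backend = 'local')
local.result <- mapreduce(input = '/tmp/cars.csv',   # hypothetical local path
                          input.format = csv.input.format,
                          map = mapper,
                          reduce = reducer)

# Option 2: hadoop backend with an HDFS path.
# rhdfs calls such as hdfs.mkdir() also only make sense in this mode.
rmr.options(backend = 'hadoop')
hdfs.result <- mapreduce(input = '/user/XXXX/uploadCars/cars.csv',  # HDFS path
                         input.format = csv.input.format,
                         map = mapper,
                         reduce = reducer)
```

Either pairing should work; mixing them, i.e. the local backend with an HDFS path, is what produces the "cannot open the connection" error.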


Antonio

Konstantinos Mammas

Apr 27, 2015, 2:12:18 PM4/27/15
to rha...@googlegroups.com
Hi Antonio,

Thank you for your response. Switching to the hadoop backend really solved this issue.

Best Regards,

Kostas