Hi,
I am new to the RHadoop packages (rmr2/rhdfs) and I am having trouble controlling the input that each of my mappers receives. Let me detail the setup.
Main Objective:
I have a large number of individual files that I want my mappers to read and sample from, with a strict one-to-one pairing: each file goes to exactly one mapper, and each mapper gets exactly one file. The sampled data is then passed to the reducer for further analysis. To simulate this setup I have created a small example.
Example Setup:
I am using the wordcount example. I have two data files, data.txt and data1.txt, stored at the HDFS location /user/root/wordcount/data/ . I have another file that lists these file names with their full paths; I called it file_list.txt and stored it at another HDFS location, /user/root/wordcount/files/file_list.txt . I provide file_list.txt as the input to the job, so that a mapper receives a line from this file (i.e., the location of one data file). The mapper uses hdfs.get to pull the data file to the local directory and then processes it. The mapper's output is given to the reducer, which summarizes the word counts.
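For concreteness, file_list.txt contains one HDFS path per line:

/user/root/wordcount/data/data.txt
/user/root/wordcount/data/data1.txt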
Problem:
Of course, the above solution does not work as intended. The issue I am having is that a single mapper receives both lines of input (both data file locations) and processes both files itself.
Is there a way I can force each mapper to get only one line of input? When I used the "HadoopStreaming" package in R, hsLineReader had an option called "chunkSize" that I could use to feed exactly one line to the mapper at a time.
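For reference, this is roughly how I used that option (from memory, so the details may be slightly off):

library(HadoopStreaming)

# FUN is called with a character vector of at most chunkSize lines read
# from the connection; with chunkSize = 1 it sees one line per call.
hsLineReader(file = "", chunkSize = 1, FUN = function(line) {
  cat(line, sep = "\n")
})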
Here is my code (edited to make it runnable):
Sys.setenv(HADOOP_HOME="/usr/lib/hadoop")
Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
library(rmr2)
library(rhdfs)
hdfs.init()
map <- function(k, lines) {
  # Each map task runs in a fresh R process on the task node, so the
  # driver's session settings are not inherited.
  Sys.setenv(HADOOP_HOME="/usr/lib/hadoop") # do I need to set this up again?
  Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
  library(rhdfs)
  hdfs.init()
  # Copy the listed data files from HDFS to the local working directory.
  # Based on some debugging, length(lines) is 2 here; I would ideally
  # like this to be 1.
  hdfs.get(lines, '.')
  word_list <- character(0) # added to make it work
  file_names <- basename(lines)
  for (fn in file_names) {
    file_handle <- file(fn, 'r')
    file_lines <- readLines(file_handle)
    close(file_handle)
    words <- unlist(strsplit(file_lines, "\\s+")) # split each line on whitespace
    word_list <- c(word_list, words)
  }
  return(keyval(word_list, 1))
}
reduce <- function(word, counts) {
  keyval(word, sum(counts))
}
wordcount <- function(input, output = NULL) {
  mapreduce(input = input, output = output, input.format = "text",
            map = map, reduce = reduce)
}
## read input file from folder wordcount/files
## save result in folder wordcount/out
## Submit job
hdfs.root <- 'wordcount'
hdfs.data <- file.path(hdfs.root, 'files')
hdfs.out <- file.path(hdfs.root, 'out')
out <- wordcount(hdfs.data, hdfs.out)
# output processing follows
..
...
....
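One idea I have not been able to verify: whether rmr2 can pass Hadoop streaming's NLineInputFormat through its backend.parameters argument, along the lines of the untested sketch below. I do not know whether rmr2 forwards these options in the right order, and the linespermap property name differs between Hadoop versions (mapred.line.input.format.linespermap on older releases).

# Untested sketch: ask streaming to give each mapper exactly one line
# via NLineInputFormat. wordcount_nline is just a hypothetical variant
# of the wordcount() driver above.
wordcount_nline <- function(input, output = NULL) {
  mapreduce(input = input, output = output, input.format = "text",
            map = map, reduce = reduce,
            backend.parameters = list(hadoop = list(
              D = "mapreduce.input.lineinputformat.linespermap=1",
              inputformat = "org.apache.hadoop.mapred.lib.NLineInputFormat")))
}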
Any help would be greatly appreciated.
Thanks,
Sudhamsh.