Hello RHadoop experts,
My program chains several MR (MapReduce) functions in series, with the output of each used as the input to the next. The error occurs in the 3rd MR function. The 1st and 2nd MR functions work fine and their output files on HDFS look correct, so I suspect I am doing something wrong in the 3rd one. Note that the entire program runs fine in local mode but fails on Hadoop with an error reporting failed reduce tasks. Pointers to possible causes would be greatly appreciated.
The MR code looks like this:
correlatedPairs = mapreduce(
    input = dCustomerPairs,
    output = file.path(hdfs.output.dir, 'correlated.csv'),
    output.format = make.output.format("csv", sep = ","),
    map = function(k, v) {
        keyval(v, "rho")  # rho is just a dummy value
    },
    reduce = function(k, v) {
        ts1 = getSalesDataByCustomerIdFromDfs(as.numeric(k[1]))
        ts2 = getSalesDataByCustomerIdFromDfs(as.numeric(k[2]))
        if (length(ts1) == length(ts2)) {
            rho = cor(ts1, ts2)
        } else {
            rho = 0
        }
        if (rho >= MAX_RHO_LIMIT) keyval(k, rho)
    }
)
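One possible cause I have not ruled out (an assumption, not something confirmed from the task logs): cor() returns NA when either series is constant, and a condition such as if (NA >= MAX_RHO_LIMIT) raises an error in R, which would fail the reduce task. A minimal sketch of a guard in base R follows; safe_rho is a hypothetical helper name, not part of my source or of rmr2.

```r
# Hypothetical helper: correlation that never returns NA.
# Falls back to 0 when the series differ in length, are too short,
# or when cor() yields NA (e.g. one series has zero variance).
safe_rho <- function(ts1, ts2) {
  if (length(ts1) != length(ts2) || length(ts1) < 2) {
    return(0)
  }
  # cor() warns about zero standard deviation; the NA is handled below.
  rho <- suppressWarnings(cor(ts1, ts2))
  if (is.na(rho)) 0 else rho
}
```

With such a helper the reduce function could compute rho = safe_rho(ts1, ts2) before comparing it against MAX_RHO_LIMIT, so the comparison always sees a number.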
The MR function is at line 200 of the attached source on GitHub. The README.md should give some idea of what I am trying to achieve.