|
| library("rmr2")
|
# Reducer: logs every incoming value to stderr and reports progress back to
# Hadoop through rmr2's status()/increment.counter() so that long-running
# reduce tasks are not killed as unresponsive.
#
# Args:
#   key:  the reducer key (unused here; each key carries its grouped values).
#   vals: the values grouped under `key`.
#
# Returns NULL — this job emits nothing from the reduce phase; the work is
# purely the side-effect logging/counter updates.
reduce_func <- function(key, vals) {

  write("in reduce func", stderr())

  n_vals <- length(vals)  # hoisted: loop-invariant, was recomputed per record
  count <- 0

  for (v in vals) {
    # BUG FIX: `count` was never incremented, so every status line read
    # "record 0 out of N". Increment first so the first record reports 1.
    count <- count + 1
    status(paste0("record ", count, " out of ", n_vals))
    increment.counter('mapper_reducer', 'count', 1)
    write(v, stderr())
  }

  NULL
}
|
|
# Mapper: fans each input record out to every reducer. For each value v and
# each reducer index i in 1..N_reducers it emits key = i and value = "i|v",
# so every reducer partition receives a copy of every record.
#
# Depends on the global `N_reducers` defined in the driver section below.
#
# Args:
#   .:    the input key (unused; rmr2 text input keys are NULL).
#   vals: a chunk of input records.
#
# Returns a keyval pair (lists of keys and "i|v" strings), or NULL when the
# chunk is empty.
map_func <- function(., vals) {

  write("in map func", stderr())

  n_vals <- length(vals)
  if (n_vals == 0) {
    return(NULL)  # same contract as before: empty chunk -> NULL
  }

  # One counter tick per input record, as before.
  for (v in vals) {
    increment.counter('mapper_counter', 'count', 1)
  }

  # PERF FIX: the original grew key_list/val_list with c() inside a nested
  # loop — O((n_vals * N_reducers)^2) copying. Build both vectors in one
  # shot with rep()/paste0(), preserving the original emission order:
  # (v1,1), (v1,2), ..., (v1,N), (v2,1), ...
  keys <- rep(seq_len(N_reducers), times = n_vals)
  vals_out <- paste0(keys, "|", rep(vals, each = N_reducers))

  # as.list() keeps the key/value containers as lists, matching the
  # original keyval(key_list, val_list) call.
  keyval(as.list(keys), as.list(vals_out))
}
|
|
# ---- Job configuration ----

# Number of reduce tasks; also read by map_func when fanning out keys.
N_reducers <- 500

inputFile <- "/user/myname/test/f.txt"
outputFile <- "/user/myname/test/test_rmr"

# Ask Hadoop to launch exactly N_reducers reduce tasks.
opt <- rmr.options(
  backend = "hadoop",
  backend.parameters = list(
    hadoop = list(D = paste0("mapreduce.job.reduces=", N_reducers))
  )
)

# ---- Run the job ----
mapreduce(
  input = inputFile, input.format = "text",
  output = outputFile, output.format = make.output.format("csv", sep = "|"),
  map = map_func,
  reduce = reduce_func,
  vectorized.reduce = FALSE
)