library("rmr2")

# Number of reducers for the job; also referenced by map_func below.
N_reducers <- 500

reduce_func <- function(key, vals) {
  write("in reduce func", stderr())
  count <- 1
  for (v in vals) {
    status(paste0("record ", count, " out of ", length(vals)))
    count <- count + 1
    increment.counter("mapper_reducer", "count", 1)
    write(v, stderr())
  }
  if (length(key) > 0) {
    return(keyval(key, vals))
  } else {
    return(NULL)
  }
}

map_func <- function(., vals) {
  write("in map func", stderr())
  key_list <- list()
  val_list <- list()
  for (v in vals) {
    increment.counter("mapper_counter", "count", 1)
    # Emit one copy of each record under every reducer key 1..N_reducers.
    for (i in 1:N_reducers) {
      key_list <- c(key_list, i)
      val_list <- c(val_list, paste0(as.character(i), "|", v))
    }
  }
  if (length(key_list) > 0) {
    return(keyval(key_list, val_list))
  } else {
    return(NULL)
  }
}

opt <- rmr.options(backend = "hadoop",
                   backend.parameters = list(
                     hadoop = list(D = paste0("mapreduce.job.reduces=", N_reducers))))

small.ints <- to.dfs(1:5)
mapreduce(input = small.ints,
          map = map_func,
          reduce = reduce_func,
          vectorized.reduce = FALSE)
...
...
Hi Antonio,

Thanks for your reply. First, I want to clarify the difference between the two posts. The previous post was opened because of an inconsistency in the rmr2 version; as I said there, I would report the results once I had a consistent rmr2 installation. The follow-up threads are similar to this post, but they use HDFS input and output. From reading other people's posts, file reading and writing might be a cause, so in this post I avoid file operations and use to.dfs instead. Given errors 2 and 3, I think a new post makes for a clearer, more comprehensive discussion.

Back to your answers:

1. There is no logical variable in this piece of code, which is why I am wondering where the "logical" data type is coming from.

2. Following your suggestion, I reduced the number of reducers to 50 and the input to to.dfs(1:100), and everything worked. If I increase the number of reducers to 100, the job still succeeds, but I see some failed attempts. Once the number of reducers reaches 200, the whole job fails because there are too many failed attempts.

I think the error may occur at the merge stage on the reducer side, due to deserialization or other I/O problems.

From what I observed, there seems to be a sweet spot for the number of reducers. What is your advice? Should this number depend on the volume of data, or is it just a fixed number?
- The number of reducers is best set to be the number of reduce slots in the cluster (minus a few to allow for failures). This allows the reducers to complete in a single wave.
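The single-wave advice above can also be applied per job: if I understand the rmr2 API correctly, mapreduce() itself accepts a backend.parameters argument, so the reducer count can be tuned for one job without touching the global rmr.options. A minimal sketch, where the slot count of 20 is a placeholder for your cluster's actual reduce capacity minus a small failure margin:

```r
library("rmr2")

# Hypothetical capacity: (reduce slots in the cluster) minus a few,
# so all reducers can complete in a single wave.
n_reduce_slots <- 20

out <- mapreduce(
  input  = to.dfs(1:100),
  # Spread records over the available reducers by key.
  map    = function(., vals) keyval(vals %% n_reduce_slots, vals),
  reduce = function(key, vals) keyval(key, sum(vals)),
  # Per-job override of the reducer count.
  backend.parameters = list(
    hadoop = list(D = paste0("mapreduce.job.reduces=", n_reduce_slots))
  )
)
```

Note that this sketch assumes a running Hadoop backend; with backend="local" the reducer-count setting has no effect.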
--
post: rha...@googlegroups.com ||
unsubscribe: rhadoop+u...@googlegroups.com ||
web: https://groups.google.com/d/forum/rhadoop?hl=en-US
---
You received this message because you are subscribed to the Google Groups "RHadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rhadoop+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
...
...
Hi Antonio,

The code with a very large number of reducers comes from a real application. Here is the scenario:

Step 1. Learn a model.
Step 2. Apply the learned model to, for example, 10M records.

However, we cannot apply the model to all 10M records at once, the way linear regression can. We have to apply the model to one record at a time. Since the number of mappers is controlled by the split size, it is generally small, so we pass the data through the mappers without doing anything and let the reducers do the real work. The more reducers, the faster the job finishes.
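The scatter-and-score pattern described above can be sketched as follows. The model, score_one(), and the random key assignment are hypothetical placeholders, not the real application's code; the idea is that the learned model travels to the reducers via the closure and is applied one record at a time:

```r
library("rmr2")

N_reducers <- 500

# Hypothetical per-record scoring function standing in for the learned model.
score_one <- function(record, model) model$intercept + model$slope * record

# Hypothetical model learned in Step 1.
model <- list(intercept = 0.5, slope = 2)

scoring_job <- mapreduce(
  input = to.dfs(runif(10000)),
  # Map: assign each record a key in 1..N_reducers so records
  # spread evenly across all reducers.
  map = function(., vals) {
    keyval(sample.int(N_reducers, length(vals), replace = TRUE), vals)
  },
  # Reduce: score this reducer's share of the records, one at a time.
  reduce = function(key, vals) {
    keyval(key, vapply(vals, score_one, numeric(1), model = model))
  }
)
```

Unlike the original map_func, which replicates every record to all N_reducers keys, this sketch sends each record to exactly one reducer, which is what you want when the goal is to partition the scoring work rather than broadcast the data.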
Two more questions:

1. Which partitioner does rmr2 use? The Hadoop default, or a customized one?

2. I used rmr2 version 3.1.0 before and don't remember seeing such errors. Is v3.1.0 more stable than v3.3.1?