I/O bug at the reduce merge stage


David Wang

Mar 10, 2015, 3:17:50 AM
to rha...@googlegroups.com
Hi,

I am wondering how rmr2 sends data to each reducer. Below is a simple piece of code. My idea is the following:
1. Input 1:5
2. At the map stage, emit each value (1 to 5) once under each key (1 to 500). At the end of the map stage, we will have the key-value pairs:
(1, 1|1) (1, 1|2) ... (1, 1|5)
(2, 2|1) (2, 2|2) ... (2, 2|5)
...
(500, 500|1) (500, 500|2) ... (500, 500|5)
3. At the reduce stage, just print the values for each group. For example, for key 398, the reducer should print:
398|1
398|2
398|3
398|4
398|5

When I run the following code, I get the following errors:
1. "Unsupported type: 149"
2. "!is.null(template) is not TRUE"
3. Some reducers receive 5 records, but others do not seem to receive any records.

I am using rmr2 version 3.3.1.
May I ask in what situations rmr2 generates the "Unsupported type: 149" and "!is.null(template) is not TRUE" errors?

Thanks,

David

library("rmr2")
reduce_func <- function(key, vals) {
write("in reduce func", stderr())
count = 1
for (v in vals) {
status_str = paste("record ", count, " out of ", length(vals), sep="")
status(status_str)
count = count + 1
increment.counter('mapper_reducer', 'count', 1)
write(v, stderr())
}
if(length(key) > 0) {
return(keyval(key, vals))
} else {
return(NULL)
}
}
map_func <- function(., vals) {
write("in map func", stderr())
key_list = list();
val_list = list();
for (v in vals) {
increment.counter('mapper_counter', 'count', 1)
for (i in 1:N_reducers) {
key_list = c(key_list, i)
val_list = c(val_list, paste0(as.character(i), "|", v))
}
}
if(length(key_list) > 0) {
return(keyval(key_list, val_list))
} else {
return(NULL)
}
}

N_reducers = 500
opt = rmr.options(backend="hadoop",
backend.parameters=list(hadoop=list(D=paste0("mapreduce.job.reduces=", N_reducers)))
)
small.ints = to.dfs(1:5)

mapreduce(input=small.ints,


map=map_func,
reduce=reduce_func,
vectorized.reduce=FALSE)

Antonio Piccolboni

Mar 10, 2015, 2:36:16 PM
to rha...@googlegroups.com
Since typecode 149 is the latest addition to the set of legal values, I normally ask people to verify they don't have outdated or multiple copies of rmr2 on the cluster. But 149 is associated with the logical type, and I don't see any logical key or value in your code (please correct me if I am wrong), so I think that is less likely in your case. The second error points to a problem with deserialization and may be related to the third problem, which is an excessively high number of reducers. If you set the number of reducers to roughly the number of available reduce slots, it would probably work. That said, I will look into making the deserialization machinery more resilient to extreme parameter settings.
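
For what it's worth, one rough way to check which rmr2 version each task node actually loads is a trivial job that reports packageVersion("rmr2") from inside the tasks. This is only an illustrative sketch, assuming a working hadoop backend; if the cluster has mixed or stale installs, more than one version string shows up as a key.

library(rmr2)

# Sketch: every map task reports the rmr2 version it loaded; the reduce
# counts how many chunks saw each version.
versions <- from.dfs(
  mapreduce(
    input = to.dfs(1:10),
    map = function(k, v) keyval(as.character(packageVersion("rmr2")), 1),
    reduce = function(k, v) keyval(k, sum(unlist(v)))))
versions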

Antonio Piccolboni

Mar 10, 2015, 3:21:32 PM
to rha...@googlegroups.com
It seems like you posted the same issue twice ("typebytes_reader error: unsupported type 149"). Could you please enlighten me as to the difference between the two issues, or just stop doing that, for the sake of efficiency and fairness to other users? Thanks.
...

David Wang

Mar 10, 2015, 5:17:35 PM
to rha...@googlegroups.com
Hi Antonio,

Thanks for your reply.

First, I want to clarify the difference between the two posts. The previous post was opened because of an inconsistency in the rmr2 version; as I said there, I would report back once I had a consistent rmr2 installation. The follow-up in that thread is similar to this post, but it uses HDFS input and output, and from reading other people's posts, file reading and writing might be a cause. In this post I avoid file operations and use to.dfs instead. Given errors 2 and 3, I think creating a new post makes the discussion clearer and more comprehensive.

Back to your answers.

1. In this piece of code there is no logical variable, which is why I am wondering where the "logical" data type is coming from.

2. Following your suggestion, I set the number of reducers to 50 and the input to to.dfs(1:100), and everything works. If I increase the number of reducers to 100, the job can still succeed, but I see some failed attempts. Once the number of reducers reaches 200, the whole job fails because there are too many failed attempts.

I think the error might happen at the merge stage on the reducer side, because of deserialization or other I/O problems.

From what I observed, there seems to be a sweet spot for the number of reducers. What is your advice? Should this number depend on the volume of data, or is it just a fixed number?

Thanks again.

David 
...

Antonio Piccolboni

Mar 10, 2015, 7:51:11 PM
to RHadoop Google Group
On Tue, Mar 10, 2015 at 2:17 PM, David Wang <ww2265c...@gmail.com> wrote:
Hi Antonio,

Thanks for your reply.

First, I want to clarify the difference between the two posts. The previous post was opened because of an inconsistency in the rmr2 version; as I said there, I would report back once I had a consistent rmr2 installation. The follow-up in that thread is similar to this post, but it uses HDFS input and output, and from reading other people's posts, file reading and writing might be a cause. In this post I avoid file operations and use to.dfs instead. Given errors 2 and 3, I think creating a new post makes the discussion clearer and more comprehensive.

Thanks for the clarification. I had missed the difference.  

Back to your answers.

1. In this piece of code there is no logical variable, which is why I am wondering where the "logical" data type is coming from.
 

2. Following your suggestion, I set the number of reducers to 50 and the input to to.dfs(1:100), and everything works. If I increase the number of reducers to 100, the job can still succeed, but I see some failed attempts. Once the number of reducers reaches 200, the whole job fails because there are too many failed attempts.

I think the error might happen at the merge stage on the reducer side, because of deserialization or other I/O problems.

From what I observed, there seems to be a sweet spot for the number of reducers. What is your advice? Should this number depend on the volume of data, or is it just a fixed number?


Per Cloudera's recommendations:

  • The number of reducers is best set to be the number of reduce slots in the cluster (minus a few to allow for failures). This allows the reducers to complete in a single wave.

That said, I will try to make the serialization more resilient to extreme settings such as the number of reducers being equal to the number of reduce input records, but I wouldn't count on it. It would help if you convinced me this is an important use case, but firing up a Java process and an R interpreter to process one record is not a typical Hadoop pattern, and I am not sure we are going to make a dedicated effort to support it.
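
For concreteness, here is a rough sketch of capping the reducer count per job rather than globally; n_reduce_slots is just an assumed placeholder for your cluster's capacity, and map_func/reduce_func are the ones from your post.

library(rmr2)

# Assumed capacity: replace with the cluster's actual number of reduce slots,
# minus a few to allow for failures, per the guideline quoted above.
n_reduce_slots <- 50

out <- mapreduce(
  input = to.dfs(1:100),
  map = map_func,
  reduce = reduce_func,
  backend.parameters = list(
    hadoop = list(D = paste0("mapreduce.job.reduces=", n_reduce_slots))))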


Antonio 
 


Antonio Piccolboni

Mar 10, 2015, 8:58:04 PM
to rha...@googlegroups.com, ant...@piccolboni.info
I'd say the same answer applies to your other post as well, even if I am not sure why the problem manifests itself the way it does. The expected error is the template-not-found one, not the type-code issue, but I suspect that if you bring the number of reducers down to a more normal value it will go away.


Antonio
...

David Wang

Mar 10, 2015, 9:00:46 PM
to rha...@googlegroups.com, ant...@piccolboni.info
Hi Antonio,

The code using a very large number of reducers comes from a real application. Here is the scenario:

Step 1. Learn a model.
Step 2. Apply the learned model to, for example, 10M records. However, we cannot apply the model to all 10M records at once, the way linear regression can; we have to apply it one record at a time. Since the number of mappers is controlled by the split size, the number of mappers is generally small, so we pass the data through the mappers untouched and let the reducers do the real work. The more reducers, the faster the job finishes.
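
A rough sketch of this pattern, with hypothetical placeholders (score_one_record, the model file, and the input path are made-up names, not our real code):

library(rmr2)

# Hypothetical scoring job: the map only fans records out to reducers,
# and each reducer applies the model one record at a time.
model <- readRDS("model.rds")        # placeholder for the learned model
n_reducers <- 50                     # placeholder reducer count

scored <- mapreduce(
  input = "/path/to/records",        # placeholder HDFS path
  map = function(k, v) {
    # assign each record a random bucket key so work spreads across reducers
    keyval(sample.int(n_reducers, length(v), replace = TRUE), v)
  },
  reduce = function(k, v) {
    keyval(k, lapply(v, function(rec) score_one_record(model, rec)))
  },
  backend.parameters = list(
    hadoop = list(D = paste0("mapreduce.job.reduces=", n_reducers))))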

Two other questions:

1. Which partitioner does rmr2 use? The default Hadoop one, or a customized one?

2. I used rmr2 version 3.1.0 before and don't remember having such errors. Is v3.1.0 more stable than v3.3.1?

Thanks,

David 

...

David Wang

Mar 10, 2015, 9:09:03 PM
to rha...@googlegroups.com, ant...@piccolboni.info
Yes, I agree. If I trim the number of reducers to a small number, like 50, the errors go away. However, I am trying to find out what the upper bound of this "normal value" is, since my application needs more reducers.

Antonio Piccolboni

Mar 10, 2015, 9:21:04 PM
to RHadoop Google Group
On Tue, Mar 10, 2015 at 6:00 PM, David Wang <ww2265c...@gmail.com> wrote:
Hi Antonio,

The code using a very large number of reducers comes from a real application. Here is the scenario:

Step 1. Learn a model.
Step 2. Apply the learned model to, for example, 10M records. However, we cannot apply the model to all 10M records at once, the way linear regression can; we have to apply it one record at a time. Since the number of mappers is controlled by the split size, the number of mappers is generally small, so we pass the data through the mappers untouched and let the reducers do the real work. The more reducers, the faster the job finishes.

Sounds like you have 10M CPU cores available. Congratulations, you have all my geek envy. In the real world, just set the number of reducers to the total number of cores for CPU-bound jobs.
 

Two other questions:

1. Which partitioner does rmr2 use? The default Hadoop one, or a customized one?

Default (hash)
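
Just to illustrate the idea (the real HashPartitioner hashes the serialized key in Java, so the exact assignment will differ), each key goes to reducer hash(key) mod number of reducers, roughly:

# Toy illustration of hash partitioning; not the real Hadoop hash function.
toy_partition <- function(keys, n_reducers) {
  sapply(keys, function(k) sum(utf8ToInt(as.character(k))) %% n_reducers)
}
toy_partition(1:10, 5)   # shows how keys spread over 5 reducers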
 

2. I used rmr2 version 3.1.0 before and don't remember having such errors. Is v3.1.0 more stable than v3.3.1?

It's hard to tell; 3.3 has more features and more fixes.

Antonio

 
