Problem using a combiner when specifying output.format

Saar

Jun 27, 2012, 2:50:31 PM
to rha...@googlegroups.com
I've been running into problems trying to get some mapreduce code to run when using a combiner and specifying an output format. 
The whole code is at the end of this post (some sample code I wrote for experimenting with output formats), but the gist of the problem is that the following code runs:

mapreduce(
  input = myData,
  output = dataOutCsv,
  map = myMapper,
  reduce = to.reduce(identity),
  combine = NULL,
  output.format = make.output.format(format = csv.output.format.2, mode = c("text")))



But enabling the combiner causes the code to fail:

mapreduce(
  input = myData,
  output = dataOutCsv,
  map = myMapper,
  reduce = to.reduce(identity),
  combine = TRUE,
  output.format = make.output.format(format = csv.output.format.2, mode = c("text")))

Frustratingly enough, the problem doesn't occur when using the local backend. I tried to dig a bit into the code, and couldn't find any obvious reason for this to fail...

The current workaround is to have a mapreduce job that does all the work and saves output in a native format, then use a second map-only mapreduce job to get the output out, but unless I'm missing something, this is probably a bug. 

Thanks, 

-Saar

Full code: 
######################################################
##
## mapreduce_IO_1.1
##
## testing outputting data saved in R to a csv file
##  on hdfs
##
##
######################################################


######################################################
##
## preamble
##
######################################################


##
## packages
##
require(rmr)


## set local/hadoop backend
local <- FALSE


if (local) {

  rmr.options.set(backend = "local")

  dataDir <- "~/IO_tests"

} else {

  rmr.options.set(backend = "hadoop")

  dataDir <- "IO_test"

}


dataOutCsv <- file.path(dataDir, paste("IOtest_", round(as.numeric(Sys.time())), ".csv", sep = ""))


######################################################
##
## get some data into hdfs
##
######################################################


csv.output.format.2 <- function(k, v) {

  paste(
    paste(as.character(unlist(k)), collapse = ","),
    paste(as.character(unlist(v)), collapse = ","),
    sep = ",")

}


myIris <- iris
myIris$Species <- as.character(myIris$Species)
myData <- to.dfs(myIris)


myMapper <- function(k, v) {

  myKey <- c(v[["Species"]], runif(1))
  myValue <- c(v[["Sepal.Length"]], v[["Sepal.Width"]], v[["Petal.Length"]], v[["Petal.Width"]])

  return(keyval(myKey, myValue))

}


mapreduce(
  input = myData,
  output = dataOutCsv,
  map = myMapper,
  reduce = to.reduce(identity),
  combine = TRUE,
  output.format = make.output.format(format = csv.output.format.2, mode = c("text")))
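
For reference, the custom format function can be tried on a single record in plain R, without rmr or Hadoop. The sketch below builds a key and value the same way myMapper does from the first row of iris, except that a fixed "0.5" stands in for the runif(1) component so the result is reproducible. That substitution is an assumption made only for illustration.

```r
# Same format function as in the full code above
csv.output.format.2 <- function(k, v) {
  paste(
    paste(as.character(unlist(k)), collapse = ","),
    paste(as.character(unlist(v)), collapse = ","),
    sep = ",")
}

row <- iris[1, ]

# Key: species plus a fixed stand-in for the random component
myKey <- c(as.character(row[["Species"]]), "0.5")
# Value: the four measurements of the row
myValue <- c(row[["Sepal.Length"]], row[["Sepal.Width"]],
             row[["Petal.Length"]], row[["Petal.Width"]])

line <- csv.output.format.2(myKey, myValue)
print(line)
```

The result is one flat comma-separated line in which key and value fields are no longer distinguishable.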



Antonio Piccolboni

Jun 28, 2012, 11:51:07 AM
to rha...@googlegroups.com
Hi,
I am not 100% sure what is going on here, but I noticed that you call make.output.format with only two arguments. There is no guarantee that the default for the third will work with every combination; in particular, combining a mode of text with the default sequence file format is not tested and doesn't make a lot of sense to me. You may want to set that to NULL. I guess we could have a bit more logic in there that selects that default based on mode; it's a few lines of code, but it is just not clear what cases to cover. The idea is that there is an easy version of make.output.format, with a string format argument, which in your case would be make.output.format("csv", sep = ","), and the advanced version with three arguments: a function, a mode (text or binary) and a java class. The steps are as follows:
your keyval pair -> R format function -> some output machinery that depends on the mode -> a java class to read the data back into hadoop.

Unfortunately the last step is missing in the local implementation, as we don't have any java components there to execute the class. It is not a deal breaker, you can do almost anything without it, but it's the biggest divergence between the two backends. I understand the frustration, but we can't let the perfect get in the way of the good, and the two backends are almost equivalent, with the notable exception of these IO classes, which are simply ignored.

Now, on why the combiner breaks things. When you are using the default java class, SequenceFileFormat or some such, you are expected to generate typedbytes on the R side, not text. What I think is happening here is that in local mode the java side is ignored, so there is no problem. When you are using hadoop and the combiner is off, java doesn't understand a thing of what you are writing out, but it doesn't matter because it doesn't have to read it back in. When you turn on the combiner, the data has to be read back in to go into the reduce phase, and if java can't make the distinction between key and value, which it can't because you didn't use the typedbytes format, it will protest.
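
To make the key/value ambiguity concrete, here is a small base-R illustration. It is not the actual streaming code, and flatten is a made-up helper. It only shows that once a record is written as one flat text line, two different key/value splits can produce the same bytes, so a reader has no way to tell where the key ends.

```r
# Hypothetical helper: write key and value as one comma-separated line
flatten <- function(k, v) {
  paste(c(as.character(k), as.character(v)), collapse = ",")
}

# Two-component key, two-component value
line_a <- flatten(c("setosa", "0.5"), c(5.1, 3.5))

# One-component key, three-component value
line_b <- flatten("setosa", c(0.5, 5.1, 3.5))

print(line_a)
print(identical(line_a, line_b))
```

Both calls give the same line, which is the kind of information loss a typed binary format avoids.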

So in practice I suggest you use
make.output.format("csv", sep = ",")
just to see if it works with the combiner on. If it does, you may still want to use your own format because of speed (this has been improved in 1.3, but for now the csv reader and writer are pretty sluggish). In that case, just add the argument streaming.format = NULL, which will tell streaming to revert to its default behavior, which is text format. If neither of these works, I suspect we may be looking at a streaming bug. I don't have your data so please give it a spin and let me know what you find. I am traveling so I may not be as responsive as I would like, sorry about that.
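
Spelled out, the second suggestion would look roughly like the fragment below. It assumes rmr is installed and reuses the csv.output.format.2 function from the original post; it has not been confirmed in this thread to fix the combiner failure.

```r
require(rmr)

# Custom text formatter from the original post
csv.output.format.2 <- function(k, v) {
  paste(
    paste(as.character(unlist(k)), collapse = ","),
    paste(as.character(unlist(v)), collapse = ","),
    sep = ",")
}

# Advanced form: function, mode, and the java class argument set to NULL
# so that streaming falls back to its default text format
myOutputFormat <- make.output.format(
  format = csv.output.format.2,
  mode = "text",
  streaming.format = NULL)
```

The resulting object would then be passed as the output.format argument of mapreduce.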


Antonio

Saar

Jun 28, 2012, 4:01:33 PM
to rha...@googlegroups.com
The data I'm using is the iris dataset in the datasets package. I think it's included with most R distributions by default.

I understand very little about the technical aspects of the problem, but the design considerations (not letting imperfection/incompleteness hold back the project) make perfect sense.

Trying out the alternative csv output format yielded two different types of bad news:

- It still fails when using make.output.format("csv", sep = ",") and combine=TRUE (so, probably some bug somewhere?)

- make.output.format("csv", sep = ",") has some undesired behaviors when the keys are 'complicated'. Undesired behaviors:
* (looks like a bug to me) when a key has several components (for example c("hello", "world")) and a value is a vector of values (for example c(1,2,3,4)), I get all combinations of key and value in the same data row. I think that's why I started playing with my own output formats to begin with.
* factor variables are converted to integers (instead of the more informative "as.character" representation)
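
The factor behavior in the last point can be reproduced in plain R. The snippet below only shows the difference between the integer codes and the labels of a factor; whether the csv writer in rmr performs exactly this conversion internally is an assumption.

```r
# Three species values from iris, stored as a factor
species <- iris$Species[c(1, 51, 101)]

# Integer codes underlying the factor (what ends up in the output)
codes <- as.integer(species)

# Labels, which is the more informative representation
labels <- as.character(species)

print(codes)
print(labels)
```

Converting factor columns with as.character before writing, as the full code in the first post does for Species, sidesteps the issue.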

Here is an example of the problem with 'complicated' keys and the text output format: the output alternates between the recycled vector of keys and the values.
> rmr:::text.output.format(c("hello", "world"), c(1,2,3,4))
[1] "hello\t1world\t2hello\t3world\t4"
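
The same string can be reproduced with base R's paste, which recycles the shorter vector. This is only a guess at the mechanism; the actual body of rmr:::text.output.format is not shown in this thread. The second call shows the collapse-first approach used in csv.output.format.2, which keeps one record per line.

```r
k <- c("hello", "world")
v <- c(1, 2, 3, 4)

# Element-wise pasting recycles the two keys across the four values
recycled <- paste(k, v, sep = "\t", collapse = "")
print(recycled)

# Collapsing key and value separately first yields a single key/value pair
one_record <- paste(paste(k, collapse = ","),
                    paste(v, collapse = ","),
                    sep = "\t")
print(one_record)
```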

Suggestions would be appreciated (as a non-urgent matter; I have a workaround, which is breaking it apart into two mapreduce jobs at the moment).


Thanks!

-Saar

Antonio Piccolboni

Jun 29, 2012, 3:58:55 PM
to rha...@googlegroups.com
Entered issue #113 on github to track progress on this issue.