I am using rmr2 (2.3.0) with CDH v4 and getting a Broken Pipe error after the mapper is done processing the data. I have gone through the related posts and tried a couple of things.
When there is a reducer configured, the job is successful. Also, the entire job with a reducer is successful when the mapper returns a single keyval. So I guess I must be doing something wrong in constructing the keyval, or it really is a memory issue. Any help is greatly appreciated.
I have attached the mapper code snippet, the GC log, the keyval returned by the mapper, and the error log. There are 5 mapper tasks and 1 reducer task configured. The io.sort.mb setting is 237.
I am very new to R. Any suggestions to improve the efficiency of the code below are welcome.
Code snippet:
for(rc in 1:2) {
    row.keyval <- handleRow(v[rc])
    if(rc == 1) {
        row.key <- row.keyval[1]
        row.value.list <- rbind(row.keyval[[2]])
    } else {
        row.value.list <- rbind(row.value.list, row.keyval[[2]])
    }
}
kv <- keyval(row.key, row.value.list)
rmr.str(kv)
rmr.str(gc())
return (kv)
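For comparison, here is a minimal sketch of the same accumulation written without growing the data frame inside the loop. It runs in plain R with no Hadoop or rmr2 involved. Both handle_row_stub and the input vector v are hypothetical stand-ins with made-up values, not the real handleRow or the real input.

```r
# Hypothetical stand-in for handleRow: returns list(key, one-row data frame).
# The real function performs kriging; the values here are made up for illustration.
handle_row_stub <- function(line) {
  list(
    "example-key",
    data.frame(id = line, value = nchar(line), wkt = "POLYGON EMPTY",
               stringsAsFactors = FALSE)
  )
}

v <- c("a", "bb")  # placeholder input lines

# Process every row once, then bind all value frames in a single call
# instead of calling rbind repeatedly inside the loop.
row.keyvals <- lapply(v, handle_row_stub)

# [[1]][[1]] extracts the key itself rather than a one-element list.
row.key <- row.keyvals[[1]][[1]]
row.value.list <- do.call(rbind, lapply(row.keyvals, function(kv) kv[[2]]))

str(row.value.list)
```

With only two rows the difference is negligible. The single do.call(rbind, ...) mainly matters when many rows are accumulated, and it is not expected to change the out-of-memory behaviour shown in the error log.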
The handleRow function constructs its return value like this:
kv <- list(2)
kv[1] <- row.key
kv[[2]] <- data.frame(id = cell.id, value = krige.prediction, wkt = input[3], stringsAsFactors = FALSE)
return (kv)
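One detail that may be worth checking in the construction above: in R, list(2) does not allocate a two-slot list. It creates a list of length 1 whose only element is the number 2. The later assignments still work because R extends the list on assignment. The sketch below illustrates the difference in plain R; the key and data frame values are hypothetical placeholders.

```r
kv <- list(2)             # a list of length 1 whose only element is the number 2
length(kv)                # 1

kv2 <- vector("list", 2)  # a list with two NULL slots
length(kv2)               # 2

# Building the pair directly avoids the placeholder altogether.
# The key and data frame contents below are hypothetical.
pair <- list("example-key",
             data.frame(id = "1", value = 47.8, stringsAsFactors = FALSE))
length(pair)              # 2
```

This is unlikely to be related to the Java heap error, but it makes the intent of the code clearer.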
GC log:
Map. GC log START
Dotted pair list of 6
$ : language (function() { invisible(if (is.null(formals(load)$verbose)) load("./rmr-local-env10a07be32ef") else load("./rmr-local-env10a07be32ef", ...
$ : language rmr2:::map.loop(map = map, keyval.reader = input.reader(), keyval.writer = if (is.null(reduce)) { output.writer() ...
$ : language as.keyval(map(keys(kv), values(kv)))
$ : language is.keyval(x)
$ : language map(keys(kv), values(kv))
$ : language rmr.str(gc())
gc()
num [1:2, 1:6] 8.19e+05 8.37e+05 4.38e+01 6.40 1.17e+06 ...
- attr(*, "dimnames")=List of 2
..$ : chr [1:2] "Ncells" "Vcells"
..$ : chr [1:6] "used" "(Mb)" "gc trigger" "(Mb)" ...
Map. GC log END
keyval returned by mapper:
Dotted pair list of 6
$ : language (function() { invisible(if (is.null(formals(load)$verbose)) load("./rmr-local-env10a07be32ef") else load("./rmr-local-env10a07be32ef", ...
$ : language rmr2:::map.loop(map = map, keyval.reader = input.reader(), keyval.writer = if (is.null(reduce)) { output.writer() ...
$ : language as.keyval(map(keys(kv), values(kv)))
$ : language is.keyval(x)
$ : language map(keys(kv), values(kv))
$ : language rmr.str(kv)
kv
List of 2
$ key:List of 1
..$ : chr "7585:665f31e0-6f80-4997-81db-1cd40cc954c5:f9a60452-8e12-43be-a1aa-54c3f921a9b2:6e7dfdaa-d73a-4902-9684-eb0a7e81b3e8:Research:So"| __truncated__
$ val:'data.frame': 2 obs. of 3 variables:
..$ id : chr [1:2] "1036049203895208635" "1036049203895208636"
..$ value: num [1:2] 47.8 43.9
..$ wkt : chr [1:2] "POLYGON ((-64.42166666666667 -32.55305555555556, -64.42157407407407 -32.55305555555556, -64.42157407407407 -32.552962962962965,"| __truncated__ "POLYGON ((-64.42157407407407 -32.55305555555556, -64.42148148148148 -32.55305555555556, -64.42148148148148 -32.552962962962965,"| __truncated__
Error log:
2015-03-10 08:03:44,158 INFO org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2015-03-10 08:03:44,165 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 237
2015-03-10 08:03:44,443 INFO org.apache.hadoop.mapred.MapTask: data buffer = 188869520/236086896
2015-03-10 08:03:44,443 INFO org.apache.hadoop.mapred.MapTask: record buffer = 621280/776601
2015-03-10 08:03:44,492 INFO org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/Rscript, ./rmr-streaming-map10a06b1c6e9b]
2015-03-10 08:03:44,544 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2015-03-10 08:03:44,546 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2015-03-10 08:03:44,563 INFO org.apache.hadoop.streaming.PipeMapRed: R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2015-03-10 08:03:49,253 WARN org.apache.hadoop.streaming.PipeMapRed: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.typedbytes.TypedBytesInput.readRawBytes(TypedBytesInput.java:212)
at org.apache.hadoop.typedbytes.TypedBytesInput.readRaw(TypedBytesInput.java:152)
at org.apache.hadoop.streaming.io.TypedBytesOutputReader.readKeyValue(TypedBytesOutputReader.java:51)
at org.apache.hadoop.streaming.PipeMapRed$MROutputThread.run(PipeMapRed.java:418)
2015-03-10 08:03:49,478 INFO org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2015-03-10 08:03:49,479 INFO org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
2015-03-10 08:03:49,485 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2015-03-10 08:03:49,490 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:376)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:572)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:136)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Again, thanks for your time.