Using a very simple program (see below), we're running into an issue at an approximately 10 MB boundary. Does anyone know which Hadoop parameters might need to be adjusted?
When executing the R-based (RHadoop) map-reduce program shown below over a text (csv) file, the mapper fails with a broken pipe once the file size crosses a threshold. Specifically, with fewer than 80156 lines of the input file (10,193,505 bytes) as input, the program works as expected; with more than 80187 lines (10,197,527 bytes), or the entire file, it fails with the following error messages:

STDERR:

    Error in value[[3L]](cond) : invalid multibyte string at '<dc> 220 '

SYSLOG:

    2014-11-17 16:34:53,308 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10000/0/0 in:5000=10000/2 [rec/s] out:0=0/2 [rec/s]
    2014-11-17 16:34:53,384 INFO [Thread-13] org.apache.hadoop.streaming.PipeMapRed: Records R/W=11303/1
    2014-11-17 16:34:54,062 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe
    2014-11-17 16:34:54,063 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
    2014-11-17 16:34:54,064 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
    java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

R program:

    library (rmr2)   # rmr2 provides mapreduce(), keyval(), make.input.format() and from.dfs()

    #' Simple test of text processing with rhadoop.
    #'
    run = function (inputdirectory = "/user/vcn_osd/input") {
        # map: emit (column 1, column 11) as a numeric key/value pair
        mapper = function (key, line) {
            keyval (as.numeric (line[[1]]), as.numeric (line[[11]]))
        }

        # reduce: sum all values for a key
        reducer = function (key, values) {
            keyval (key, sum (values))
        }

        out = mapreduce (
            input        = inputdirectory,
            input.format = make.input.format ("csv", sep = ";", fileEncoding = "ISO8859-1"),
            map          = mapper,
            reduce       = reducer,
            verbose      = TRUE
        )

        ret = from.dfs (out)
        print (ret)
    }
As far as I can tell, the mapper is failing while constructing the key-value pairs, and I'm wondering whether some parameter isn't tweaked correctly.
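The "invalid multibyte string" message suggests an encoding problem rather than a size limit: '<dc>' is byte 0xDC (decimal 220, matching the "220" in the error), which is 'Ü' in ISO-8859-1 but not valid UTF-8 on its own, so the job may simply be dying on the first line that contains a Latin-1 character. A minimal sketch for checking this locally, assuming the file has been copied out of HDFS as input.csv (hypothetical path, not from the original post):

    # Sketch: locate lines whose bytes are not valid UTF-8; iconv() returns
    # NA for elements it cannot convert.
    lines <- readLines ("input.csv")
    bad   <- which (is.na (iconv (lines, "UTF-8", "UTF-8")))
    head (bad)                                          # first offending line(s)
    # If the same lines convert cleanly from Latin-1, the data is ISO-8859-1:
    sum (is.na (iconv (lines, "latin1", "UTF-8")))      # expected: 0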
Solved. The solution is twofold: 1) specify additional information in the mapreduce input.format, and 2) redirect output to an output directory, otherwise the job fails in the reduce phase.
The final working program is:
    library (rmr2)   # rmr2 provides mapreduce(), keyval(), increment.counter(), ...

    #' First test on real data using RHadoop
    #'
    #' Simple test of text processing with rhadoop.
    #'
    run = function (inputdirectory = "/user/vcn_osd/input",
                    outputdirectory = "/user/vcn_osd/output",
                    vectorized.reducer = TRUE) {

        # map: count input lines and emit (column 1, column 11) pairs
        mapper = function (key, line) {
            increment.counter ("Derrick", "lines", increment = nrow (line))
            keyval (as.numeric (line[[1]]), as.numeric (line[[11]]))
        }

        # reduce: sum values per key, either vectorized (many keys per call)
        # or one key at a time
        reducer = function (keys, values) {
            increment.counter ("Derrick", "values", increment = length (values))
            if (vectorized.reducer) {
                values = split (values, as.data.frame (keys), drop = TRUE)
                keyval (names (values), vsum (values))
            }
            else
                keyval (keys, sum (values))
        }

        out = mapreduce (
            input        = inputdirectory,
            # fix 1: give the input format the encodings and explicit column
            # names/types instead of letting read.table guess
            input.format = make.input.format (
                format           = "csv",
                sep              = ";",
                fileEncoding     = "ISO8859-1",
                encoding         = "ISO8859-1",
                stringsAsFactors = FALSE,
                col.names        = c ("INTERNAL_LSB_ID", "INTERNAL_LSB_ID_2",
                                      "STATION_NAME", "YEAR", "MONTH", "DAY",
                                      "HOUR", "MINUTE", "SECOND", "SEASON",
                                      "VALUE", "UNIT", "QUALITY"),
                colClasses       = c ("integer", "integer", "character",
                                      "integer", "integer", "integer",
                                      "integer", "integer", "integer",
                                      "character", "numeric", "character",
                                      "character")),
            # fix 2: write to an explicit output directory
            output            = outputdirectory,
            output.format     = make.output.format (format = "csv"),
            map               = mapper,
            combine           = FALSE,
            reduce            = reducer,
            vectorized.reduce = vectorized.reducer,
            verbose           = TRUE
        )
    }
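For reference, a hypothetical invocation (the arguments just restate the defaults above):

    library (rmr2)
    run (inputdirectory     = "/user/vcn_osd/input",
         outputdirectory    = "/user/vcn_osd/output",
         vectorized.reducer = TRUE)

Declaring both fileEncoding and encoding, together with explicit col.names and colClasses, presumably means read.table never has to guess column types from possibly mis-encoded strings.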