file size or record count threshold in mapper


Derrick Oswald

Nov 17, 2014, 3:04:57 PM
to rha...@googlegroups.com
Using a very simple program (see below), we're running into an issue at approximately a 10 MB boundary.
Does anyone know which Hadoop parameters might need to be adjusted?

When executing the R-based (RHadoop) map-reduce program shown below over a text file (CSV), the mapper fails with a broken pipe once the file size reaches a threshold. Specifically, when less than 80156 lines of the input file (10,193,505 bytes) are supplied as input, the program works as expected; with more than 80187 lines (10,197,527 bytes), or the entire file, it fails with the following error messages:

STDERR:

Error in value[[3L]](cond) : invalid multibyte string at '<dc> 220 '

SYSLOG:

2014-11-17 16:34:53,308 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10000/0/0 in:5000=10000/2 [rec/s] out:0=0/2 [rec/s]
2014-11-17 16:34:53,384 INFO [Thread-13] org.apache.hadoop.streaming.PipeMapRed: Records R/W=11303/1
2014-11-17 16:34:54,062 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe
2014-11-17 16:34:54,063 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2014-11-17 16:34:54,064 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
	at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1594)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

R program:

#' Simple test of text processing with rhadoop.
#'
run = function (inputdirectory = "/user/vcn_osd/input")
{
  mapper = function (key, line)
  {
    keyval (as.numeric (line[[1]]), as.numeric (line[[11]]))
  }

  reducer = function (key, values)
  {
    keyval (key, sum (values))
  }

  out = mapreduce (
    input = inputdirectory,
    input.format = make.input.format ("csv", sep = ";", fileEncoding = "ISO8859-1"),
    map = mapper,
    reduce = reducer,
    verbose = TRUE
  )

  ret = from.dfs (out)
  print (ret)
}

As far as I can tell, the mapper is failing while constructing the key-value pairs, and I'm wondering whether some parameter needs to be tweaked.
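
The error mentions an invalid multibyte string at '<dc>' (0xDC, which is 'Ü' in ISO8859-1 but not a valid byte sequence on its own in UTF-8), so the offending records may simply be the first ones containing non-ASCII characters. A rough local check, purely as a sketch (the path "input.csv" stands in for a local copy of the input file):

lines = readLines ("input.csv")                                    # read the raw lines of a local copy of the CSV
bad = which (is.na (iconv (lines, from = "UTF-8", to = "UTF-8")))  # lines whose bytes are not valid UTF-8
length (bad)                                                       # how many such lines there are
head (lines[bad])                                                  # inspect a few of them
# if these convert cleanly from Latin-1, the data really is ISO8859-1:
head (iconv (lines[bad], from = "ISO8859-1", to = "UTF-8"))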

Derrick Oswald

Nov 19, 2014, 8:34:25 AM
to rha...@googlegroups.com

Solved.
The solution is twofold.
1) Specify additional information in the mapreduce input.format.
2) Redirect the output to an explicit directory; otherwise the job fails in the reduce phase.

The final working program is:

#' First test on real data using RHadoop
#'
#' Simple test of text processing with rhadoop.
#'
run = function (inputdirectory = "/user/vcn_osd/input", outputdirectory = "/user/vcn_osd/output", vectorized.reducer = TRUE)
{
  mapper = function (key, line)
  {
    increment.counter ("Derrick", "lines", increment = nrow (line))
    keyval (as.numeric (line[[1]]), as.numeric (line[[11]]))
  }

  reducer = function (keys, values)
  {
    increment.counter ("Derrick", "values", increment = length (values))
    if (vectorized.reducer)
    {
      values = split (values, as.data.frame (keys), drop = TRUE)
      keyval (names (values), vsum (values))
    }
    else
      keyval (keys, sum (values))
  }

  out = mapreduce (
    input = inputdirectory,
    input.format = make.input.format (
      format = "csv", sep = ";", fileEncoding = "ISO8859-1", encoding = "ISO8859-1", stringsAsFactors = FALSE,
      col.names = c ("INTERNAL_LSB_ID", "INTERNAL_LSB_ID_2", "STATION_NAME", "YEAR", "MONTH", "DAY", "HOUR", "MINUTE", "SECOND", "SEASON", "VALUE", "UNIT", "QUALITY"),
      colClasses = c ("integer", "integer", "character", "integer", "integer", "integer", "integer", "integer", "integer", "character", "numeric", "character", "character")),
    output = outputdirectory,
    output.format = make.output.format (format = "csv"),
    map = mapper,
    combine = FALSE,
    reduce = reducer,
    vectorized.reduce = vectorized.reducer,
    verbose = TRUE
  )
}
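
For completeness, a minimal sketch of how the job might be launched from an interactive R session after sourcing the definition of run() above (loading rmr2 and the HDFS paths shown are assumptions based on the defaults used in the program):

library (rmr2)   # RHadoop package providing mapreduce(), keyval(), make.input.format(), increment.counter(), ...

# optionally test on the local backend first, before submitting to the cluster
# rmr.options (backend = "local")

run (inputdirectory = "/user/vcn_osd/input",
     outputdirectory = "/user/vcn_osd/output",
     vectorized.reducer = TRUE)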


Saar Golde

Nov 19, 2014, 10:41:39 AM
to rha...@googlegroups.com
The first part of the solution makes sense to me - it seems like this is a file encoding problem, and the input.format parameters would be the place to solve it.
Are you sure the second part of your solution is addressing the same problem?


Antonio Piccolboni

Nov 19, 2014, 12:32:10 PM
to RHadoop Google Group
My guess would be that the Hadoop temporary directory is set to something not writable under the conditions in which the jobs run. By setting an output directory explicitly, that temporary directory is not used anymore. You can check the current setting with

rmr.options("hdfs.tempdir")

and set it with

rmr.options(hdfs.tempdir = <some path> )
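
As a quick sketch of how to test this guess (the temp path below is only an example; it needs to be an HDFS directory the submitting user can write to):

rmr.options("hdfs.tempdir")                       # inspect the current temporary directory
rmr.options(hdfs.tempdir = "/user/vcn_osd/tmp")   # example: point it at a user-writable directory
run()                                             # re-run the original job without an explicit output directory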

Thanks

