Reducers failing

Jay Wineinger

Mar 12, 2013, 8:34:55 PM
to dumbo...@googlegroups.com
Hi everyone,

I've been fighting with a job for the last day or two and I cannot get it to finish.  I'm using a remote HBase for input and output and I'm reading all of the data in one table and transforming the data for output to a different table in a different format. I should only get one row per key so I'm just using an identity mapper and then doing the data format gymnastics in the reducer.  The source table has about 38M rows.

I cannot, for the life of me, get this thing to complete.  I can't even get it past 5%.  When running a small number of reduce tasks (<= 500), not a single reducer would complete.  They all failed with Java heap space out-of-memory errors.  When I bumped the number of reduce tasks to 10,000, I finally started getting some reducers to complete, but enough of them eventually failed to kill the job.  I'm running this on Amazon's EMR and I've played with different instance sizes to try to get around the memory errors.  As I'm writing this, the job is running with 50,000 reduce tasks on m2.2xlarge instances, where each task should have a 3200MB heap limit (according to http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/HadoopMemoryDefault.html).  1,600 reducers have completed successfully but there are already 200 failures, so I have little hope this one will make it to completion.

Every single one of the errors shows a broken pipe in write_string in typedbytes.py, but I'm assuming that is because the Java process that called it has been killed due to the memory error. I've pasted the last 8KB of output from one of the failures below.

I'd appreciate any insight or guidance you can give.  This may have nothing to do with Dumbo but since all of the errors are in typedbytes, I thought I'd ask.

thanks!
Jay

stdout logs



stderr logs
INFO: inputting typed bytes
INFO: outputting typed bytes
Traceback (most recent call last):
  File "/usr/lib/python2.7/runpy.py", line 162, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/mnt/var/lib/hadoop/mapred/taskTracker/hadoop/jobcache/job_201303121752_0003/attempt_201303121752_0003_r_001579_0/work/translate_profiles.py", line 442, in <module>
    dumbo.run(dumbo.identitymapper, Reducer, opts=opts)
  File "dumbo/core.py", line 359, in run
    typedbytes.PairedOutput(sys.stdout).writes(outputs)
  File "typedbytes.py", line 397, in writes
    self._writes(flatten(iterable))
  File "typedbytes.py", line 257, in _writes
    w(obj)
  File "typedbytes.py", line 250, in _write
    writefunc(self, obj)
  File "typedbytes.py", line 321, in write_map
    self._writes(flatten(map.iteritems()))
  File "typedbytes.py", line 257, in _writes
    w(obj)
  File "typedbytes.py", line 250, in _write
    writefunc(self, obj)
  File "typedbytes.py", line 321, in write_map
    self._writes(flatten(map.iteritems()))
  File "typedbytes.py", line 257, in _writes
    w(obj)
  File "typedbytes.py", line 250, in _write
    writefunc(self, obj)
  File "typedbytes.py", line 299, in write_string
    self.file.write(string)
IOError: [Errno 32] Broken pipe
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:576)
	at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:130)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:528)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:429)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)



syslog logs
2013-03-13 00:20:25,843 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,042 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,063 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,070 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,079 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,106 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,113 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,122 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,128 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,135 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,142 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,152 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,159 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 4 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,166 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 2 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,172 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,178 INFO org.apache.hadoop.mapred.ReduceTask (main): attempt_201303121752_0003_r_001579_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
2013-03-13 00:20:26,858 INFO org.apache.hadoop.mapred.ReduceTask (Thread for polling Map Completion Events): GetMapEventsThread exiting
2013-03-13 00:20:26,858 INFO org.apache.hadoop.mapred.ReduceTask (main): getMapsEventsThread joined.
2013-03-13 00:20:26,859 INFO org.apache.hadoop.mapred.ReduceTask (main): Closed ram manager
2013-03-13 00:20:26,859 INFO org.apache.hadoop.mapred.ReduceTask (main): Interleaved on-disk merge complete: 0 files left.
2013-03-13 00:20:26,859 INFO org.apache.hadoop.mapred.ReduceTask (main): In-memory merge complete: 56 files left.
2013-03-13 00:20:26,869 INFO org.apache.hadoop.mapred.Merger (main): Merging 56 sorted segments
2013-03-13 00:20:26,870 INFO org.apache.hadoop.mapred.Merger (main): Down to the last merge-pass, with 56 segments left of total size: 657025 bytes
2013-03-13 00:20:26,872 INFO org.apache.hadoop.io.compress.CodecPool (main): Got brand-new compressor
2013-03-13 00:20:26,905 INFO org.apache.hadoop.mapred.ReduceTask (main): Merged 56 segments, 657025 bytes to disk to satisfy reduce memory limit
2013-03-13 00:20:26,905 INFO org.apache.hadoop.mapred.ReduceTask (main): Merging 1 files, 290447 bytes from disk
2013-03-13 00:20:26,906 INFO org.apache.hadoop.mapred.ReduceTask (main): Merging 0 segments, 0 bytes from memory into reduce
2013-03-13 00:20:26,906 INFO org.apache.hadoop.mapred.Merger (main): Merging 1 sorted segments
2013-03-13 00:20:26,909 INFO org.apache.hadoop.mapred.Merger (main): Down to the last merge-pass, with 1 segments left of total size: 290443 bytes
2013-03-13 00:20:26,932 INFO org.apache.hadoop.streaming.PipeMapRed (main): PipeMapRed exec [/usr/bin/python, -m, translate_profiles, red, 0, 262144000]
2013-03-13 00:20:26,997 INFO com.strcst.hadoop.typedbytes.HBaseOutputFormat (main): Setting 'hbase.zookeeper.quorum' to '[redacted]'
2013-03-13 00:20:26,997 INFO com.strcst.hadoop.typedbytes.HBaseOutputFormat (main): Setting 'hbase.zookeeper.property.clientPort' to '2181'
2013-03-13 00:20:27,330 INFO org.apache.hadoop.streaming.PipeMapRed (main): R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2013-03-13 00:20:27,330 INFO org.apache.hadoop.streaming.PipeMapRed (main): R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2013-03-13 00:20:27,332 INFO org.apache.hadoop.streaming.PipeMapRed (main): R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2013-03-13 00:20:27,344 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-40): Records R/W=158/1
2013-03-13 00:20:27,961 WARN org.apache.hadoop.streaming.PipeMapRed (Thread-40): java.lang.OutOfMemoryError: Java heap space
	at org.apache.hadoop.typedbytes.TypedBytesInput.readRawBytes(TypedBytesInput.java:212)
	at org.apache.hadoop.typedbytes.TypedBytesInput.readRaw(TypedBytesInput.java:152)
	at org.apache.hadoop.streaming.io.TypedBytesOutputReader.readKeyValue(TypedBytesOutputReader.java:51)
	at org.apache.hadoop.streaming.PipeMapRed$MROutputThread.run(PipeMapRed.java:418)

2013-03-13 00:20:27,989 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-38): MRErrorThread done
2013-03-13 00:20:27,989 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:282)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
	at java.io.DataOutputStream.flush(DataOutputStream.java:106)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:569)
	at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:130)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:528)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:429)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)

2013-03-13 00:20:27,989 INFO org.apache.hadoop.streaming.PipeMapRed (main): PipeMapRed failed!
2013-03-13 00:20:27,992 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-03-13 00:20:28,012 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2013-03-13 00:20:28,012 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2013-03-13 00:20:28,014 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:576)
	at org.apache.hadoop.streaming.PipeReducer.reduce(PipeReducer.java:130)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:528)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:429)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)
2013-03-13 00:20:28,017 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task

Gilles Vandelle

Mar 13, 2013, 3:44:01 AM
to dumbo...@googlegroups.com
Jay,

The broken pipe error is usually a consequence of a runtime error in the Python code. It is possible that you have a memory error in your reducer. The maximum memory allocated to each reducer can be changed using the "memlimit" parameter. A very common cause is materializing the "values" generator into a list.
You will have to share your code if you want more help.

-Gilles
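
To illustrate the point about turning the "values" generator into a list, here is a minimal sketch. It assumes the usual Dumbo reducer shape (a function that takes a key and a values iterator and yields pairs); both function names are made up for illustration and are not taken from Jay's job.

```python
def reducer_materialized(key, values):
    # Pattern to avoid: list() pulls every value for this key into memory at once.
    rows = list(values)
    yield key, sum(len(row) for row in rows)


def reducer_streaming(key, values):
    # Consumes the iterator one value at a time, so only one value is held at once.
    total = 0
    for value in values:
        total += len(value)
    yield key, total
```

Both versions produce the same output. The difference only matters when a key has many or very large values, which is exactly the case where the first version can hit the memory limit.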


Klaas Bosteels

Mar 13, 2013, 5:32:29 AM
to dumbo...@googlegroups.com
You seem to be running out of memory on the line

byte[] bytes = new byte[5 + length];

in readRawBytes() of TypedBytesInput.java, so I guess you're either outputting huge keys and/or values in your reducer, or something else is using up lots of memory and this just happens to be the place where you actually run into the limit. If it's the latter, then maybe it has something to do with how you're writing to HBase (e.g. buffering that happens there). What output format are you using exactly? Does it work when you simply write to an HDFS file instead?

-K
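
One way to test the "huge keys and/or values" hypothesis is to log suspiciously large outputs from the reducer before they reach the Java side. The sketch below is only a rough aid: `rough_size` and `warn_on_large` are hypothetical helpers, the size is an approximation that ignores typed bytes framing overhead, and the code is written for Python 3 even though the traceback above comes from Python 2.7.

```python
import sys


def rough_size(obj):
    """Approximate payload size in bytes of nested dicts, sequences and strings."""
    if isinstance(obj, dict):
        return sum(rough_size(k) + rough_size(v) for k, v in obj.items())
    if isinstance(obj, (list, tuple)):
        return sum(rough_size(item) for item in obj)
    if isinstance(obj, bytes):
        return len(obj)
    if isinstance(obj, str):
        return len(obj.encode("utf-8"))
    # Fallback for numbers and other types: length of their text form.
    return len(repr(obj))


def warn_on_large(pairs, limit=1024 * 1024, log=sys.stderr):
    """Pass (key, value) pairs through unchanged, logging any value above limit."""
    for key, value in pairs:
        size = rough_size(value)
        if size > limit:
            log.write("large output for key %r: about %d bytes\n" % (key, size))
        yield key, value
```

Wrapping the reducer's output in `warn_on_large(...)` leaves the records untouched, so the messages in the task's stderr log would show whether any single output is anywhere near the heap limit.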



Jay Wineinger

Mar 13, 2013, 9:07:33 AM
to dumbo...@googlegroups.com
Thanks for the replies.

Gilles: I would expect to get only a single value for each key since I'm just scanning an HBase table.  That being said, I am iterating over the values generator and not forcing it to a list.  I had not seen that memlimit parameter before. Looking in the code, it appears there is a 256MB default.  How is this enforced?  Does it seem strange that I would be getting Java errors with a 3200MB heap when this default dumbo limit should have been hit much sooner?

Klaas: The keys shouldn't be huge (6 - 15 bytes).  The values can be bigger: the outliers could be a few hundred KB, but the average ones are usually much smaller.  As for the output format, we are using one that my coworker wrote (I think because we couldn't get the lasthbase code to work), and I'm not much of a Java guy, so I can't speak to any buffering, etc. that is being done in there.  I will ask him about it though.  I haven't tested writing directly to HDFS instead of HBase, but I'll give that a shot.

Jay
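
On the memlimit question: the last argument in the `PipeMapRed exec` line of the syslog, 262144000, looks like a byte limit handed to the Python process, though that reading is an assumption. The sketch below converts it to MiB and shows one common way a limit like this can be enforced on Unix, via `resource.setrlimit`. Whether Dumbo does exactly this should be checked against dumbo/core.py; both function names are made up for illustration.

```python
import resource


def limit_in_mib(limit_bytes):
    """Convert a byte limit to mebibytes."""
    return limit_bytes / (1024.0 * 1024.0)


def apply_address_space_limit(limit_bytes):
    """Cap the virtual address space of the calling process only (Unix)."""
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))


if __name__ == "__main__":
    print(limit_in_mib(262144000))
```

A limit applied this way covers only the Python subprocess. The JVM that reads the reducer's output has its own heap setting, so a Java heap error can occur without the Python-side limit ever being reached.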