S3 Error or Something Else? (Python/MRJob/Amazon EMR)

Pingometer LLC

May 26, 2015, 12:00:41 PM
to common...@googlegroups.com
I'm using Python, MRJob, and Amazon EMR to run a job based on the tag_counter example (https://github.com/commoncrawl/cc-mrjob/blob/master/tag_counter.py).

This streams WET files from Amazon S3 and works well for small jobs (inputs of ~50 WET file paths). If I increase the input beyond 50 WET file paths, the job always completes 63 of 64 tasks but fails on the last one.

In my code, I surround all of the logic with a try/except block to silence any errors (as I understand it, anything going wrong kills the whole job):

try:
    # my code
except:
    pass
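
For context, the S3 streaming itself happens in mrcc.py's mapper; my code only runs per record via process_record. Roughly paraphrased (this is not the exact source):

import boto
import warc
from gzipstream import GzipStreamFile
from mrjob.job import MRJob

class CCJob(MRJob):
    def mapper(self, _, line):
        # Each input line names one WET file in Common Crawl's public bucket.
        conn = boto.connect_s3(anon=True)
        bucket = conn.get_bucket('aws-publicdatasets')
        # Stream the gzipped WET file from S3 and decompress on the fly.
        f = warc.WARCFile(fileobj=GzipStreamFile(bucket.get_key(line)))
        for i, record in enumerate(f):  # the loop the tracebacks point into
            for key, value in self.process_record(record):
                yield key, value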

Here is the error output available from Amazon EMR:

+ __mrjob_PWD=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076
+ exec
+ python -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'
+ export PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076/mrcc.py.tar.gz:
+ PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076/mrcc.py.tar.gz:
+ exec
+ cd /mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076
+ python2.7 tag_grabber.py --step-num=0 --mapper --source s3
Traceback (most recent call last):
  File "tag_grabber.py", line 45, in <module>
    TagGrabber.run()
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 461, in run
    mr_job.execute()
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 470, in execute
    self.run_mapper(self.options.step_num)
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 535, in run_mapper
    for out_key, out_value in mapper(key, value) or ():
  File "/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001/container_1432626749504_0001_01_000076/mrcc.py.tar.gz/mrcc.py", line 40, in mapper
    for i, record in enumerate(f):
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 393, in __iter__
    record = self.read_record()
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 364, in read_record
    self.finish_reading_current_record()
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 359, in finish_reading_current_record
    self.expect(self.current_payload.fileobj, "\r\n")
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 352, in expect
    raise IOError(message)
IOError: Expected '\r\n', found '


2015-05-26 08:25:37,365 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2015-05-26 08:25:37,417 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2015-05-26 08:25:37,954 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2015-05-26 08:25:37,976 INFO [main] org.apache.hadoop.metrics2.sink.cloudwatch.CloudWatchSink: Initializing the CloudWatchSink for metrics.
2015-05-26 08:25:38,115 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSinkAdapter: Sink file started
2015-05-26 08:25:38,222 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 300 second(s).
2015-05-26 08:25:38,222 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2015-05-26 08:25:38,240 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2015-05-26 08:25:38,240 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1432626749504_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@568c11d5)
2015-05-26 08:25:38,355 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2015-05-26 08:25:38,796 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001,/mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001,/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001,/mnt3/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432626749504_0001
2015-05-26 08:25:38,931 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2015-05-26 08:25:38,947 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2015-05-26 08:25:39,348 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2015-05-26 08:25:40,088 INFO [main] amazon.emr.metrics.MetricsSaver: MetricsSaver YarnChild root:hdfs:///mnt/var/em/ period:120 instanceId:i-fc835515 jobflow:j-3CNU7SCUOLBJB
2015-05-26 08:25:40,282 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2015-05-26 08:25:40,285 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2015-05-26 08:25:40,287 INFO [main] com.amazon.ws.emr.hadoop.fs.guice.EmrFSBaseModule: Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as FileSystem implementation.
2015-05-26 08:25:41,348 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2015-05-26 08:25:42,324 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2015-05-26 08:25:42,325 WARN [main] org.apache.hadoop.conf.Configuration: job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2015-05-26 08:25:42,339 INFO [main] org.apache.hadoop.mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
2015-05-26 08:25:42,587 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: s3://mrjob-91ef8460071eeea6/tmp/tag_grabber.root.20150526.073515.773670/files/process.wet:440+220
2015-05-26 08:25:42,595 INFO [main] com.amazon.ws.emr.hadoop.fs.guice.EmrFSBaseModule: Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as FileSystem implementation.
2015-05-26 08:25:42,665 INFO [main] com.amazon.ws.emr.hadoop.fs.EmrFileSystem: Using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2015-05-26 08:25:42,769 INFO [main] com.hadoop.compression.lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
2015-05-26 08:25:42,772 INFO [main] com.hadoop.compression.lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 77cfa96225d62546008ca339b7c2076a3da91578]
2015-05-26 08:25:42,804 INFO [main] com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem: Opening 's3://mrjob-91ef8460071eeea6/tmp/tag_grabber.root.20150526.073515.773670/files/process.wet' for reading
2015-05-26 08:25:43,139 INFO [main] com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem: Stream for key 'tmp/tag_grabber.root.20150526.073515.773670/files/process.wet' seeking to position '440'
2015-05-26 08:25:43,471 INFO [main] org.apache.hadoop.mapred.MapTask: numReduceTasks: 0
2015-05-26 08:25:43,637 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/bin/sh, -ex, setup-wrapper.sh, python2.7, tag_grabber.py, --step-num=0, --mapper, --source, s3]
2015-05-26 08:25:43,680 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2015-05-26 08:25:44,484 INFO [Thread-13] org.apache.hadoop.streaming.PipeMapRed: Records R/W=1/1
2015-05-26 08:35:35,384 INFO [Thread-13] org.apache.hadoop.streaming.PipeMapRed: Records R/W=1/1143
2015-05-26 08:35:35,422 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2015-05-26 08:35:35,424 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed failed!
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
	at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
2015-05-26 08:35:36,315 INFO [main] com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore: s3.putObject tagscout tag-1408500800767.23-251-351/part-00002 160997
2015-05-26 08:35:36,319 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:330)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:543)
	at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)


2015-05-26 08:35:36,322 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2015-05-26 08:35:36,322 INFO [main] org.apache.hadoop.mapred.DirectFileOutputCommitter: Nothing to clean up on abort since there are no temporary files written

If I remove the "pass" under the except and instead print any errors, I get the same errors as above, plus the following:

+ __mrjob_PWD=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029
+ exec
+ python -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'
+ export PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029/mrcc.py.tar.gz:
+ PYTHONPATH=/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029/mrcc.py.tar.gz:
+ exec
+ cd /mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029
+ python2.7 tag_grabber.py --step-num=0 --mapper --source s3
Traceback (most recent call last):
  File "tag_grabber.py", line 56, in <module>
    TagGrabber.run()
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 461, in run
    mr_job.execute()
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 470, in execute
    self.run_mapper(self.options.step_num)
  File "/usr/lib/python2.7/site-packages/mrjob/job.py", line 535, in run_mapper
    for out_key, out_value in mapper(key, value) or ():
  File "/mnt2/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1432538379516_0001/container_1432538379516_0001_01_000029/mrcc.py.tar.gz/mrcc.py", line 39, in mapper
    for i, record in enumerate(f):
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 393, in __iter__
    record = self.read_record()
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 373, in read_record
    header = self.read_header(fileobj)
  File "/usr/lib/python2.7/site-packages/warc/warc.py", line 338, in read_header
    line = fileobj.readline()
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 75, in readline
    result = super(GzipStreamFile, self).readline(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 48, in readinto
    data = self.read(len(b))
  File "/usr/lib/python2.7/site-packages/gzipstream/gzipstreamfile.py", line 38, in read
    raw = self.stream.read(io.DEFAULT_BUFFER_SIZE)
  File "/usr/lib/python2.7/site-packages/boto/s3/key.py", line 400, in read
    data = self.resp.read(size)
  File "/usr/lib/python2.7/site-packages/boto/connection.py", line 413, in read
    return http_client.HTTPResponse.read(self, amt)
  File "/usr/lib64/python2.7/httplib.py", line 567, in read
    s = self.fp.read(amt)
  File "/usr/lib64/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
  File "/usr/lib64/python2.7/ssl.py", line 246, in recv
    return self.read(buflen)
  File "/usr/lib64/python2.7/ssl.py", line 165, in read
    return self._sslobj.read(len)
socket.error: [Errno 104] Connection reset by peer


It looks to me like the root problem is a failure to stream the file from S3 (the connection reset), which in turn causes the secondary IOError. Is that a correct interpretation, and is there anything I can do about it? Is S3 unreliable with more than 50 input files at a time?
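
One thing I'm considering is restarting the stream whenever the connection drops. An untested sketch (the helper name is mine; the bucket is the one mrcc.py reads from):

import socket
import time

import boto
import warc
from gzipstream import GzipStreamFile

def iter_records_with_retry(path, retries=3):
    # Re-open the S3 stream and start over if the connection is reset.
    # Caveat: records yielded before a failure get yielded again on the
    # retry, so the output would need de-duplication downstream.
    for attempt in range(retries):
        try:
            conn = boto.connect_s3(anon=True)
            key = conn.get_bucket('aws-publicdatasets').get_key(path)
            for record in warc.WARCFile(fileobj=GzipStreamFile(key)):
                yield record
            return  # streamed the whole file cleanly
        except (socket.error, IOError):
            time.sleep(2 ** attempt)  # back off before reopening
    raise IOError('could not stream %s after %d attempts' % (path, retries))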

Martin Zahra

Jun 20, 2015, 4:39:20 PM
to common...@googlegroups.com
Hi,

Were you able to resolve this problem?  I'm encountering similar errors myself.  Thank you.

Pingometer

Jun 20, 2015, 7:38:05 PM
to common...@googlegroups.com
Hi Martin,

I was able to solve the problem. I did a few things:

(1) use the latest version of mrjob
(2) use the latest version of boto
(3) experiment with a few different AMI versions; I tried 3.2.1 and 3.6.0 (and had better luck with the latter)
(4) substantially increase the task timeout (https://pythonhosted.org/mrjob/guides/hadoop-cookbook.html); see the mrjob.conf sketch after this list
(5) surround your logic with an exception handler that prints errors to stderr, like this:

import sys

try:
    ...  # your mapper/reducer logic
except Exception as e:
    sys.stderr.write(str(e) + '\n')

then check the stderr log to see whether you can determine why it's failing (if the failure is related to your code)

(6) use us-east-1 for your region (both for ec2 instances and s3)

(7) minimize the number of dependencies (third-party python libraries) and keep your mapper/reducer/combiner logic to a minimum
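
For (4), here is roughly what I used; the value is just an example, and note that mapreduce.task.timeout is in milliseconds (older Hadoop versions call it mapred.task.timeout):

# mrjob.conf (excerpt): raise Hadoop's task timeout so long S3 reads
# aren't killed mid-stream; 3600000 ms = 1 hour, an example value.
runners:
  emr:
    jobconf:
      mapreduce.task.timeout: 3600000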

Let me know if any of this helps you get it going (it did in my case). Otherwise, I'd be happy to lend you a hand.
  


Martin Zahra

Jun 20, 2015, 9:24:37 PM
to common...@googlegroups.com
This is really helpful.  Thank you.  I'm looking forward to trying this stuff.

Pingometer

Jun 20, 2015, 10:50:21 PM
to common...@googlegroups.com
You bet! Let me know how it goes :)