Spark S3 problem java.net.SocketTimeoutException: Read timed out

Grega Kešpret

Apr 18, 2013, 12:35:14 PM
to spark...@googlegroups.com
Hello,

We are having problems with jobs that download a lot of data from S3. We are constantly getting

java.net.SocketTimeoutException: Read timed out

from S3, after which the task fails and the whole job hangs.
Is it possible to increase the socket timeout somehow?
Do you have any other suggestions?

You can find logs (driver and worker that failed) here:

Thanks,
Grega

Alex Boisvert

Apr 18, 2013, 12:40:20 PM
to spark...@googlegroups.com
If I may suggest, a possibly better (general) approach would be to add retries around all S3 operations. S3 is known to have lots of transient connection-related failures. A couple of retries deal with 99.99999%+ of those (no matter the type of error, and there are more than a couple of types).
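
To make the idea concrete, here is a minimal sketch of a generic retry wrapper with exponential backoff. It uses only the JDK; the class name S3Retry and the method withRetries are hypothetical and not part of Spark, Hadoop, or JetS3t. It assumes the wrapped operation is safe to repeat from the start.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper (not a Spark or Hadoop class): retries an operation on IOException. */
final class S3Retry {
    private S3Retry() {}

    /**
     * Runs op up to maxAttempts times, doubling the sleep between attempts.
     * Only IOException (which includes SocketTimeoutException) is retried;
     * any other exception propagates immediately.
     */
    static <T> T withRetries(int maxAttempts, long initialDelayMs, Callable<T> op) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                Thread.sleep(delayMs);
                delayMs *= 2; // exponential backoff
            }
        }
    }
}
```

A call site would wrap each S3 operation in a Callable and pass it to S3Retry.withRetries together with an attempt count and an initial delay. Note that this only helps when the whole operation can be restarted; a failure in the middle of a long streaming read needs the stream itself to recover.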



Stephen Haberman

Apr 18, 2013, 1:50:52 PM
to spark...@googlegroups.com

> If I may suggest, a possibly better (general) approach would be to add
> retries around all S3 operations.

Spark uses the Hadoop file system APIs and, AFAIK, the NativeS3FileSystem
has retry logic built in to it--it reads the "fs.s3.maxRetries" conf
parameter (default of 4) and then uses a "RetryProxy" class to retry on
IOExceptions and S3Exceptions.

I was looking at the Hadoop 1.0.3 source code; no idea if those semantics
change across versions of Hadoop.

So you could try increasing fs.s3.maxRetries. That said, it seems odd
that you'd get a timeout after 4 retries, assuming you're running in
EC2; if you're running outside AWS, it would be more likely.

- Stephen

Alex Boisvert

Apr 18, 2013, 2:02:50 PM
to spark...@googlegroups.com
On Thu, Apr 18, 2013 at 10:50 AM, Stephen Haberman <stephen....@gmail.com> wrote:

>> If I may suggest, a possibly better (general) approach would be to add
>> retries around all S3 operations.
>
> Spark uses the Hadoop file system APIs and, AFAIK, the NativeS3FileSystem
> has retry logic built in to it--it reads the "fs.s3.maxRetries" conf
> parameter (default of 4) and then uses a "RetryProxy" class to retry on
> IOExceptions and S3Exceptions.

But does the RetryProxy also proxy (and retry) all the methods on the InputStream that's returned from the NativeS3FileSystem? I looked quickly, but I didn't see any trace of the RetryProxy invocation handler in the stack trace bubbling up from the SocketTimeoutException (below).

alex


java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at com.sun.net.ssl.internal.ssl.InputRecord.readFully(InputRecord.java:293)
at com.sun.net.ssl.internal.ssl.InputRecord.read(InputRecord.java:331)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:798)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:755)
at com.sun.net.ssl.internal.ssl.AppInputStream.read(AppInputStream.java:75)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:256)
at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:169)
at java.io.FilterInputStream.read(FilterInputStream.java:116)
at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:107)
at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:256)
at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
at java.io.DataInputStream.read(DataInputStream.java:132)
at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:150)
at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:134)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:76)
at java.io.InputStream.read(InputStream.java:85)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
at spark.rdd.HadoopRDD$$anon$1.hasNext(HadoopRDD.scala:84)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$22.hasNext(Iterator.scala:457)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at scala.collection.Iterator$$anon$22.hasNext(Iterator.scala:457)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.Iterator$$anon$22.foreach(Iterator.scala:451)
at spark.Aggregator.combineValuesByKey(Aggregator.scala:20)
at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
at spark.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:69)
at spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:19)
at spark.RDD.computeOrReadCheckpoint(RDD.scala:206)
at spark.RDD.iterator(RDD.scala:195)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:125)
at spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:74)
at spark.executor.Executor$TaskRunner.run(Executor.scala:101)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Stephen Haberman

Apr 18, 2013, 2:32:40 PM
to spark...@googlegroups.com

> But does the RetryProxy also proxy (and retry) all the methods on the
> InputStream that's returned from the NativeS3FileSystem?

Good point. The AWS EMR version of Hadoop has retry code in this method:

> org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read

Public Hadoop 1.0.3, by contrast, does not. Hadoop trunk seems to have a
1-time retry:

https://gist.github.com/stephenh/5415089
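
For illustration only, a read-side retry of that general shape could look like the sketch below. This is not the EMR or Hadoop trunk code; StreamOpener and ReopeningInputStream are hypothetical names, and it assumes the source can be reopened at an arbitrary byte offset (for S3, something like a ranged GET). On an IOException it reopens at the last position handed to the caller and retries the read once.

```java
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical: opens a stream positioned at the given byte offset. */
interface StreamOpener {
    InputStream openAt(long offset) throws IOException;
}

/** Sketch: on IOException, reopen at the current offset and retry the read once. */
final class ReopeningInputStream extends InputStream {
    private final StreamOpener opener;
    private InputStream in;
    private long pos = 0; // bytes already returned to the caller

    ReopeningInputStream(StreamOpener opener) throws IOException {
        this.opener = opener;
        this.in = opener.openAt(0);
    }

    @Override
    public int read() throws IOException {
        int b;
        try {
            b = in.read();
        } catch (IOException e) {
            reopen();
            b = in.read(); // a second failure propagates to the caller
        }
        if (b >= 0) {
            pos++;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        int n;
        try {
            n = in.read(buf, off, len);
        } catch (IOException e) {
            reopen();
            n = in.read(buf, off, len); // a second failure propagates to the caller
        }
        if (n > 0) {
            pos += n;
        }
        return n;
    }

    /** Drops the broken stream and opens a fresh one at the current offset. */
    private void reopen() throws IOException {
        try {
            in.close();
        } catch (IOException ignored) {
            // the old connection is already broken; nothing useful to do
        }
        in = opener.openAt(pos);
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}
```

Because pos only counts bytes actually returned to the caller, reopening at pos neither skips nor repeats data, which is what lets a decompressor or line reader layered on top continue undisturbed.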

- Stephen

Grega Kešpret

Apr 19, 2013, 5:56:22 AM
to spark...@googlegroups.com
Is it possible to use the AWS EMR version of Hadoop instead of public Hadoop 1.0.4?
Has somebody already done this?

So, currently I have (in SparkBuild.scala):

  val HADOOP_VERSION = "1.0.4"
  val HADOOP_MAJOR_VERSION = "1"

What values should I put for AWS EMR?

Thanks,
Grega

Stephen Haberman

Apr 19, 2013, 11:37:10 AM
to spark...@googlegroups.com

> Is it possible to use AWS EMR version of Hadoop instead of public Hadoop
> 1.0.4? Has somebody already done this?

We do it by running Spark directly on EMR clusters (EMR instances cost
slightly more than regular EC2 instances, but it works well for our
existing workflows).

I don't believe Amazon makes their version of Hadoop otherwise available.

If anyone is interested, we have a few bootstrap actions for setting up Spark on
EMR that we could/should open source at some point, but they have things like
our internal bucket names/jar locations hardcoded into them, so they would take
some massaging to make generally useful.

- Stephen

Grega Kešpret

Apr 19, 2013, 9:26:18 PM
to spark...@googlegroups.com
If you are running Spark directly on EMR clusters, do you gain access to the EMR Hadoop jars, which supposedly fix the S3 timeouts? I would be interested in whatever you can share.

Any other ideas out there regarding S3 timeouts?

Grega