I started the Mesos cluster with the memoryFraction setting you suggested, with DiskSpillingCache, and with SPARK_MEM=6g. I can now read from S3 successfully (the earlier timeouts were definitely the HDFS problem). However, I still get Java heap space errors.
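In case it matters, I'm passing the settings through conf/spark-env.sh on each node, roughly like this (spark.cache.class and spark.boundedMemoryCache.memoryFraction are the property names as I understand them, so correct me if they're off; 0.4 is just the fraction that, times the ~5.75 GB a 6g heap actually reports as max memory, works out to the maxBytes=2469606195 in the logs below):

export SPARK_MEM=6g
export SPARK_JAVA_OPTS="-Dspark.cache.class=spark.DiskSpillingCache -Dspark.boundedMemoryCache.memoryFraction=0.4"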
The code:
scala> val tr = spark.textFile("s3n://.../RITA/original/199*").cache
scala> tr.count
The job took 300 seconds to load 6 GB of data from S3 and return a count, and several of the slave tasks failed. However, I eventually got a result (~5M rows).
Calling tr.count again, the job took 200 seconds, with no failures on any of the slaves. Still not as fast as I expected.
Calling tr.count a third time, the job took 120 seconds, and I saw one Java heap space error.
Calling it a fourth time, tr.count has now been running for over 5 minutes, though I haven't seen any Java heap space errors yet.
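Doing the arithmetic on the logs below, in case my mental model is off: each partition is estimated at roughly 500 MB (e.g. 500894700 bytes for key ((0,1),3)), so the 6 GB input is around 12 partitions, and with maxBytes=2469606195 each slave's cache can hold only four of them before ensureFreeSpace has to start spilling. That would explain why the second and third runs sped up (200 s, then 120 s) as the caches warmed, while a slave still occasionally tips over the edge.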
The stderr log for one slave from the first job:
12/08/08 22:47:03 INFO spark.DiskSpillingCache: BoundedMemoryCache.maxBytes = 2469606195
12/08/08 22:47:03 INFO spark.ShuffleManager: Shuffle dir: /tmp/spark-local-a02a1574-cb7f-4671-8f97-9787bf9ada17/shuffle
12/08/08 22:47:03 INFO server.Server: jetty-7.5.3.v20111011
12/08/08 22:47:03 INFO spark.Executor: Running task ID 1
12/08/08 22:47:03 INFO spark.Executor: Running task ID 3
12/08/08 22:47:04 INFO spark.CacheTracker: Looking for RDD partition 1:3
12/08/08 22:47:04 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@694
12/08/08 22:47:04 INFO spark.CacheTracker: Looking for RDD partition 1:1
12/08/08 22:47:04 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@692
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Asked to add key ((0,1),3)
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),3) is 500894700
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),3) took 2 ms
12/08/08 22:47:46 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 500894700) called with curBytes=0, maxBytes=2469606195
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Adding key ((0,1),3)
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Number of entries is now 1
12/08/08 22:47:46 INFO spark.Executor: Finished task ID 3
12/08/08 22:47:46 INFO spark.Executor: Running task ID 4
12/08/08 22:47:46 INFO spark.CacheTracker: Looking for RDD partition 1:4
12/08/08 22:47:46 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@695
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Asked to add key ((0,1),1)
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),1) is 501363989
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),1) took 0 ms
12/08/08 22:47:58 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 501363989) called with curBytes=500894700, maxBytes=2469606195
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Adding key ((0,1),1)
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Number of entries is now 2
12/08/08 22:47:58 INFO spark.Executor: Finished task ID 1
12/08/08 22:47:58 INFO spark.Executor: Running task ID 5
12/08/08 22:47:58 INFO spark.CacheTracker: Looking for RDD partition 1:5
12/08/08 22:47:58 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@696
12/08/08 22:48:48 ERROR spark.Executor: Exception in task ID 4
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2798)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:150)
    at spark.JavaSerializerInstance.serialize(JavaSerializer.scala:28)
    at spark.DiskSpillingCache.put(DiskSpillingCache.scala:49)
    at spark.KeySpace.put(Cache.scala:60)
    at spark.CacheTracker.getOrCompute(CacheTracker.scala:204)
    at spark.RDD.iterator(RDD.scala:76)
    at spark.ResultTask.run(ResultTask.scala:17)
    at spark.Executor$TaskRunner.run(Executor.scala:82)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
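If I'm reading that trace right, the allocation that fails is the Arrays.copyOf inside ByteArrayOutputStream.toByteArray: DiskSpillingCache.put serializes the whole ~500 MB partition into a byte-array stream, and toByteArray then makes a second full copy, so each put transiently needs well over 1 GB of heap (more while the stream's internal buffer is doubling). With two tasks running concurrently on this executor (IDs 4 and 5 here), on top of the ~1 GB already cached from tasks 1 and 3, I can see the 6g heap running out. Does that sound plausible, or am I misreading the cache path?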
And from the last job (still unfinished):
This slave is running one Java process at 100% CPU and 83% memory (~6 GB of the 7 GB available on the slave), presumably loading or counting the RDD? It's been stuck for over 5 minutes.
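(For what it's worth, 83% of 7 GB is about 5.8 GB, essentially the entire 6g heap, so my guess is the 100% CPU is the garbage collector churning as the heap fills rather than the count itself. Happy to grab a jstack of the process next time it wedges if that would help confirm.)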
12/08/08 23:00:44 INFO spark.DiskSpillingCache: BoundedMemoryCache.maxBytes = 2469606195
12/08/08 23:00:44 INFO spark.ShuffleManager: Shuffle dir: /tmp/spark-local-4c1fa3b1-8d59-48ee-99e5-31612fcd21bb/shuffle
12/08/08 23:00:44 INFO server.Server: jetty-7.5.3.v20111011
12/08/08 23:00:44 INFO spark.Executor: Running task ID 40
12/08/08 23:00:44 INFO spark.CacheTracker: Looking for RDD partition 1:5
12/08/08 23:00:44 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@696
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Asked to add key ((0,1),5)
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),5) is 541406484
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),5) took 2 ms
12/08/08 23:01:22 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 541406484) called with curBytes=0, maxBytes=2469606195
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Adding key ((0,1),5)
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Number of entries is now 1
12/08/08 23:01:22 INFO spark.Executor: Finished task ID 40
12/08/08 23:01:34 INFO spark.Executor: Running task ID 43
12/08/08 23:01:34 INFO spark.Executor: Running task ID 45
12/08/08 23:01:34 INFO spark.CacheTracker: Looking for RDD partition 1:3
12/08/08 23:01:34 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@694
12/08/08 23:01:34 INFO spark.CacheTracker: Looking for RDD partition 1:1
12/08/08 23:01:34 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@692
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Asked to add key ((0,1),3)
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),3) is 500894700
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),3) took 0 ms
12/08/08 23:02:12 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 500894700) called with curBytes=541406484, maxBytes=2469606195
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Adding key ((0,1),3)
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Number of entries is now 2
12/08/08 23:02:12 INFO spark.Executor: Finished task ID 45
12/08/08 23:02:12 INFO spark.Executor: Running task ID 48
12/08/08 23:02:12 INFO spark.CacheTracker: Looking for RDD partition 1:4
12/08/08 23:02:12 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@695
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Asked to add key ((0,1),1)
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),1) is 501363989
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),1) took 0 ms
12/08/08 23:02:16 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 501363989) called with curBytes=1042301184, maxBytes=2469606195
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Adding key ((0,1),1)
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Number of entries is now 3
12/08/08 23:02:16 INFO spark.Executor: Finished task ID 43
12/08/08 23:02:16 INFO spark.Executor: Running task ID 49
12/08/08 23:02:16 INFO spark.CacheTracker: Looking for RDD partition 1:5
12/08/08 23:02:24 INFO spark.CacheTracker: Found partition in cache!
12/08/08 23:02:24 INFO spark.Executor: Finished task ID 49
12/08/08 23:02:24 INFO spark.Executor: Running task ID 51
12/08/08 23:02:24 INFO spark.CacheTracker: Looking for RDD partition 1:8
12/08/08 23:02:24 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@699