Trouble reading batches of large files from s3

Eugene Brevdo

Aug 7, 2012, 2:51:15 PM
to spark...@googlegroups.com
I'm trying to read about 12GB of .csv files from s3 (each file is <4GB).

Here is my code (running on mesos+EC2):

MASTER=localhost:5050 ./spark-shell

scala> val tf = sc.textFile("s3n://<key>:<secret>@bucket/directory_containing_csvs/").cache
12/08/07 18:34:05 INFO mapred.FileInputFormat: Total input paths to process : 22
tf: spark.RDD[String] = spark.MappedRDD@50d4855d

scala> tf.count

I've seen two issues with spark in this regard:

1. In my initial configuration, I have one master and two slaves; each slave gets 2GB of RAM. So when I cache my data, it should spill to disk once RAM runs out. However, I get the following errors and the job fails:

12/08/07 18:16:08 INFO spark.SimpleJob: Lost TID 13 (task 6:1)
12/08/07 18:16:08 INFO spark.SimpleJob: Loss was due to java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3221)
        at java.lang.String.<init>(String.java:233)
        at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
        at java.nio.CharBuffer.toString(CharBuffer.java:1176)
        at org.apache.hadoop.io.Text.decode(Text.java:350)
        at org.apache.hadoop.io.Text.decode(Text.java:327)
        at org.apache.hadoop.io.Text.toString(Text.java:254)
        at spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:97)
        at spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:97)
        at scala.collection.Iterator$$anon$19.next(Iterator.scala:335)
        at scala.collection.Iterator$class.foreach(Iterator.scala:660)
        at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:333)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:99)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
        at scala.collection.Iterator$$anon$19.toBuffer(Iterator.scala:333)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
        at scala.collection.Iterator$$anon$19.toArray(Iterator.scala:333)
        at spark.CacheTracker.getOrCompute(CacheTracker.scala:203)
        at spark.RDD.iterator(RDD.scala:76)
        at spark.ResultTask.run(ResultTask.scala:17)
        at spark.Executor$TaskRunner.run(Executor.scala:82)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)

2. Once I set each worker to have 6GB of memory in the Mesos spark-env.sh and restart Mesos, the job still fails, but now with these errors:

12/08/07 18:44:47 INFO spark.SimpleJob: Lost TID 25 (task 0:16)
12/08/07 18:44:47 INFO spark.SimpleJob: Loss was due to java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:146)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:312)
        at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:424)
        at sun.security.ssl.InputRecord.read(InputRecord.java:379)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:850)
        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:807)
        at sun.security.ssl.AppInputStream.read(AppInputStream.java:94)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:169)
        at java.io.FilterInputStream.read(FilterInputStream.java:133)
        at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:107)
        at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
        at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
        at spark.HadoopRDD$$anon$1.hasNext(HadoopRDD.scala:81)
        at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:334)
        at scala.collection.Iterator$class.foreach(Iterator.scala:660)
        at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:333)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:99)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
        at scala.collection.Iterator$$anon$19.toBuffer(Iterator.scala:333)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
        at scala.collection.Iterator$$anon$19.toArray(Iterator.scala:333)
        at spark.CacheTracker.getOrCompute(CacheTracker.scala:203)
        at spark.RDD.iterator(RDD.scala:76)
        at spark.ResultTask.run(ResultTask.scala:17)
        at spark.Executor$TaskRunner.run(Executor.scala:82)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)


Note that in each of these cases, the tasks keep restarting (and re-failing) seemingly indefinitely.


There is no problem running this code with individual CSV files within that s3 path (each is of size 100MB-500MB).

best,
Eugene

Matei Zaharia

Aug 7, 2012, 3:53:30 PM
to spark...@googlegroups.com
In the past, the Hadoop S3 access libraries, which are what we use to access S3, have had some problems. Can you try copying the data to HDFS first using the hadoop distcp command? Otherwise, you can also try bumping up the Hadoop library version that Spark uses: in project/SparkBuild.scala, change the HADOOP_VERSION variable at the top to either "1.0.3" or "0.20.2-cdh3u5" (the latter is Cloudera's Hadoop, which backports some useful patches). After you do this, you will need to run sbt/sbt clean compile at the top-level Spark directory, followed by ~/mesos-ec2/copy-dir . to copy the recompiled code to all the slave nodes.
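For reference, the line to edit at the top of project/SparkBuild.scala looks roughly like this (a sketch; the exact surrounding build definitions may differ between Spark versions):

  // project/SparkBuild.scala (sketch; surrounding build definitions omitted)
  val HADOOP_VERSION = "1.0.3"   // or "0.20.2-cdh3u5" for Cloudera's Hadoop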

Matei

Matei Zaharia

Aug 7, 2012, 4:10:10 PM
to spark...@googlegroups.com
By the way, that first error you got might be due to the cache misestimating the space usage of each line of text (unless you really do have huge lines that take too much RAM). Can you attach some log files from a slave? There should be entries from BoundedMemoryCache that explain what it's doing. Also, let me know how many files there were and how many map tasks the master ran.

Matei

Eugene Brevdo

Aug 7, 2012, 6:25:10 PM
to spark...@googlegroups.com
+Alexy

I tried HADOOP_VERSION 1.0.3 and 0.20.2 and followed your instructions (even restarted Mesos and HDFS). No luck; same exception:
  java.net.SocketTimeoutException: Read timed out

I'm also trying to copy from s3 to HDFS (ephemeral or persistent), but my DFS master is having trouble syncing with any of its slaves (I restarted the dfs/mapred servers and everything). As a result, I can't distcp or hadoop fs -put to it. I wonder if this is the same issue Alexy had in Dec. 2011:

I'm reading S3 data on EC2 from Spark and often get HTTP errors/timeouts, after which the Spark shell usually hangs. Matei said he can possibly tune the S3 timeouts, and a workaround to try is to copy into local HDFS on the cluster. However, when trying /root/persistent-hdfs/bin/hadoop distcp s3n://... /data, I get errors where hadoop tries to connect to external names. How do we use persistent/ephemeral-hdfs in /root on Mesos to copy to a local HDFS?

A+

---

How did you resolve that problem?

Best,
Eugene

Eugene Brevdo

Aug 8, 2012, 1:59:28 PM
to spark...@googlegroups.com
My HDFS errors were caused by a wrong private key; effectively, mesos-ec2/setup was not able to log onto the slaves and set up the datanodes.

After copying the files from s3 to HDFS, I ran a similar command (now on a reduced dataset, ~6GB of data). I set each of my 2 slaves to SPARK_MEM=6g, and I also tried this with the mesos-0.9 branch of Spark:

scala> val tr = sc.textFile("hdfs://ip-10-34-90-99:9000/.../199*.csv").cache
scala> tr.count

I see either "Java heap space" OOM errors or "GC overhead limit exceeded" errors (which often happen when lots of small objects are created and destroyed). The job never completes.

I'm attaching the mesos work and logs directories from one of the slaves.
mesos_logs.zip
201208081652-1666851338-5050-3568-0000.zip

Reynold Xin

Aug 8, 2012, 3:04:54 PM
to spark...@googlegroups.com
From the log (stderr), the bounded memory cache is already full. 

12/08/08 17:13:38 INFO spark.BoundedMemoryCache: ensureFreeSpace((0,3), 304177815) called with curBytes=3958757569, maxBytes=4074850222
12/08/08 17:13:38 INFO spark.BoundedMemoryCache: Didn't add key ((0,3),35) because we would have evicted part of same dataset


Even though your data set is only 6G, caching it in the JVM heap as strings adds cost because of Java's per-String overhead (a minimum of roughly 50-60 bytes per string). If your lines are short, this overhead is substantial.

And you get the OOM because the rest of the heap also fills up (6G minus the ~4G cache leaves only 2G). How many cores do you have? It could also be that you have many cores, each using substantial memory, and the GC hasn't had a chance to clean the heap.
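As a rough, hedged illustration of that blow-up (the average line length and per-String overhead below are assumptions, not measurements from your data):

  // Back-of-the-envelope estimate; all the numbers here are assumptions.
  val onDiskBytes       = 6L * 1024 * 1024 * 1024   // ~6 GB of raw CSV on disk
  val avgLineBytes      = 100L                      // assumed average line length
  val perStringOverhead = 56L                       // ~50-60 bytes of object overhead per String
  val numLines          = onDiskBytes / avgLineBytes
  // Each character takes 2 bytes in a Java String, so the in-heap size is roughly:
  val inHeapBytes = numLines * (2 * avgLineBytes + perStringOverhead)
  println(inHeapBytes.toDouble / onDiskBytes)       // ~2.5x the on-disk size

Under those assumptions, a 6G dataset needs roughly 15G of heap across the cluster once it is cached as strings.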

--
Reynold Xin
Algorithms, Machines, People Lab | Database Group
Electrical Engineering and Computer Science, UC Berkeley

Eugene Brevdo

Aug 8, 2012, 3:14:27 PM
to spark...@googlegroups.com
They're m1.large; I believe I was using 2 cores per slave. Is there a workaround? I thought Spark flushed data to local disk when it ran out of RAM cache? The ultimate goal is to analyze much larger datasets that won't fit in RAM across all of my compute nodes.

Reynold Xin

Aug 8, 2012, 4:22:14 PM
to spark...@googlegroups.com
To spill the cache to disk, set spark.cache.class to spark.DiskSpillingCache.
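A minimal sketch of wiring that up in a standalone driver program (in spark-shell the context is created for you, so the property would have to be set earlier, e.g. via SPARK_JAVA_OPTS); the master URL, job name, and HDFS path below are placeholders:

  import spark.SparkContext

  // Must be set before the SparkContext is constructed.
  System.setProperty("spark.cache.class", "spark.DiskSpillingCache")

  val sc = new SparkContext("localhost:5050", "csv-count")             // placeholder master/job name
  val tr = sc.textFile("hdfs://namenode:9000/path/to/199*.csv").cache  // placeholder path
  println(tr.count)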

Can I get a handle on the CSV files you are running? With only two cores, it shouldn't OOM. I suspect the bounded memory cache is giving very wrong estimates of the size of the data in memory.


--
Reynold Xin
Algorithms, Machines, People Lab | Database Group
Electrical Engineering and Computer Science, UC Berkeley



Eugene Brevdo

Aug 8, 2012, 4:43:23 PM
to spark...@googlegroups.com
It's the RITA dataset, available here: http://stat-computing.org/dataexpo/2009/the-data.html

It's slow to download from their website, so contact me directly and I'll see if I can provide access directly from my files on s3.

Reynold Xin

Aug 8, 2012, 4:50:58 PM
to spark...@googlegroups.com, Shivaram Venkataraman
Eugene,

Can you give us access to the s3 data? Shivaram and I will take a look.


--
Reynold Xin
Algorithms, Machines, People Lab | Database Group
Electrical Engineering and Computer Science, UC Berkeley



Matei Zaharia

Aug 8, 2012, 5:18:50 PM
to Reynold Xin, spark...@googlegroups.com, Shivaram Venkataraman
Try lowering the bounded memory cache's heap fraction to compensate for the misestimation. Set the system property spark.boundedMemoryCache.memoryFraction to 0.4. (Easiest way is to call System.setProperty("spark.boundedMemoryCache.memoryFraction", "0.4") *before* you create your SparkContext.)

Matei

Eugene Brevdo

Aug 8, 2012, 6:08:20 PM
to spark...@googlegroups.com
Is this equivalent to setting SPARK_JAVA_OPTS="-Dspark.boundedMemoryCache.memoryFraction=0.4" before running spark-shell? Or do I also have to add this line to /root/mesos-ec2/templates/root/spark/conf/spark-env.sh and rerun setup?

Eugene Brevdo

Aug 8, 2012, 7:13:53 PM
to spark...@googlegroups.com
I started the Mesos cluster with the memoryFraction setting you suggested, DiskSpillingCache, and SPARK_MEM=6g. I can now read from s3 successfully (the earlier timeouts were definitely the HDFS problem). However, I still get the Java heap space errors.

The code:

scala> val tr = spark.textFile("s3n://.../RITA/original/199*").cache
scala> tr.count

The job took 300 seconds to load 6GB of data from s3 and give me a count, and several of the slave tasks failed. However, I finally got a result (~5M rows).

Calling tr.count again, the job took 200 seconds to run, with no failures on any of the slaves. Still not as fast as I expected.

Calling tr.count a third time, the job took 120 seconds to run, and I saw one Java heap space error.

A fourth time, tr.count has been running for over 5 minutes, though I haven't seen any Java heap space errors.

The stderr log for one slave from the first job:

12/08/08 22:47:03 INFO spark.Executor: Using REPL class URI: http://10.34.81.124:42146
12/08/08 22:47:03 INFO spark.DiskSpillingCache: BoundedMemoryCache.maxBytes = 2469606195
12/08/08 22:47:03 INFO spark.ShuffleManager: Shuffle dir: /tmp/spark-local-a02a1574-cb7f-4671-8f97-9787bf9ada17/shuffle
12/08/08 22:47:03 INFO server.Server: jetty-7.5.3.v20111011
12/08/08 22:47:03 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:48522 STARTING
12/08/08 22:47:03 INFO spark.ShuffleManager: Local URI: http://10.40.103.61:48522
12/08/08 22:47:03 INFO spark.Executor: Running task ID 1
12/08/08 22:47:03 INFO spark.Executor: Running task ID 3
12/08/08 22:47:04 INFO spark.CacheTracker: Looking for RDD partition 1:3
12/08/08 22:47:04 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@694
12/08/08 22:47:04 INFO spark.CacheTracker: Looking for RDD partition 1:1
12/08/08 22:47:04 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@692
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Asked to add key ((0,1),3)
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),3) is 500894700
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),3) took 2 ms
12/08/08 22:47:46 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 500894700) called with curBytes=0, maxBytes=2469606195
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Adding key ((0,1),3)
12/08/08 22:47:46 INFO spark.DiskSpillingCache: Number of entries is now 1
12/08/08 22:47:46 INFO spark.Executor: Finished task ID 3
12/08/08 22:47:46 INFO spark.Executor: Running task ID 4
12/08/08 22:47:46 INFO spark.CacheTracker: Looking for RDD partition 1:4
12/08/08 22:47:46 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@695
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Asked to add key ((0,1),1)
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),1) is 501363989
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),1) took 0 ms
12/08/08 22:47:58 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 501363989) called with curBytes=500894700, maxBytes=2469606195
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Adding key ((0,1),1)
12/08/08 22:47:58 INFO spark.DiskSpillingCache: Number of entries is now 2
12/08/08 22:47:58 INFO spark.Executor: Finished task ID 1
12/08/08 22:47:58 INFO spark.Executor: Running task ID 5
12/08/08 22:47:58 INFO spark.CacheTracker: Looking for RDD partition 1:5
12/08/08 22:47:58 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@696
12/08/08 22:48:48 ERROR spark.Executor: Exception in task ID 4
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2798)
at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:150)
at spark.JavaSerializerInstance.serialize(JavaSerializer.scala:28)
at spark.DiskSpillingCache.put(DiskSpillingCache.scala:49)
at spark.KeySpace.put(Cache.scala:60)
at spark.CacheTracker.getOrCompute(CacheTracker.scala:204)
at spark.RDD.iterator(RDD.scala:76)
at spark.ResultTask.run(ResultTask.scala:17)
at spark.Executor$TaskRunner.run(Executor.scala:82)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)

And from the last job (still unfinished):
This slave is running one Java process at 100% CPU and 83% memory (~6GB of the 7GB available on the slave), presumably loading or counting the RDD? It's been stuck for over 5 minutes.

12/08/08 23:00:44 INFO spark.Executor: Using REPL class URI: http://10.34.81.124:42146
12/08/08 23:00:44 INFO spark.DiskSpillingCache: BoundedMemoryCache.maxBytes = 2469606195
12/08/08 23:00:44 INFO spark.ShuffleManager: Shuffle dir: /tmp/spark-local-4c1fa3b1-8d59-48ee-99e5-31612fcd21bb/shuffle
12/08/08 23:00:44 INFO server.Server: jetty-7.5.3.v20111011
12/08/08 23:00:44 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:60559 STARTING
12/08/08 23:00:44 INFO spark.ShuffleManager: Local URI: http://10.40.103.61:60559
12/08/08 23:00:44 INFO spark.Executor: Running task ID 40
12/08/08 23:00:44 INFO spark.CacheTracker: Looking for RDD partition 1:5
12/08/08 23:00:44 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@696
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Asked to add key ((0,1),5)
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),5) is 541406484
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),5) took 2 ms
12/08/08 23:01:22 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 541406484) called with curBytes=0, maxBytes=2469606195
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Adding key ((0,1),5)
12/08/08 23:01:22 INFO spark.DiskSpillingCache: Number of entries is now 1
12/08/08 23:01:22 INFO spark.Executor: Finished task ID 40
12/08/08 23:01:34 INFO spark.Executor: Running task ID 43
12/08/08 23:01:34 INFO spark.Executor: Running task ID 45
12/08/08 23:01:34 INFO spark.CacheTracker: Looking for RDD partition 1:3
12/08/08 23:01:34 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@694
12/08/08 23:01:34 INFO spark.CacheTracker: Looking for RDD partition 1:1
12/08/08 23:01:34 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@692
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Asked to add key ((0,1),3)
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),3) is 500894700
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),3) took 0 ms
12/08/08 23:02:12 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 500894700) called with curBytes=541406484, maxBytes=2469606195
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Adding key ((0,1),3)
12/08/08 23:02:12 INFO spark.DiskSpillingCache: Number of entries is now 2
12/08/08 23:02:12 INFO spark.Executor: Finished task ID 45
12/08/08 23:02:12 INFO spark.Executor: Running task ID 48
12/08/08 23:02:12 INFO spark.CacheTracker: Looking for RDD partition 1:4
12/08/08 23:02:12 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@695
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Asked to add key ((0,1),1)
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Estimated size for key ((0,1),1) is 501363989
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Size estimation for key ((0,1),1) took 0 ms
12/08/08 23:02:16 INFO spark.DiskSpillingCache: ensureFreeSpace((0,1), 501363989) called with curBytes=1042301184, maxBytes=2469606195
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Adding key ((0,1),1)
12/08/08 23:02:16 INFO spark.DiskSpillingCache: Number of entries is now 3
12/08/08 23:02:16 INFO spark.Executor: Finished task ID 43
12/08/08 23:02:16 INFO spark.Executor: Running task ID 49
12/08/08 23:02:16 INFO spark.CacheTracker: Looking for RDD partition 1:5
12/08/08 23:02:24 INFO spark.CacheTracker: Found partition in cache!
12/08/08 23:02:24 INFO spark.Executor: Finished task ID 49
12/08/08 23:02:24 INFO spark.Executor: Running task ID 51
12/08/08 23:02:24 INFO spark.CacheTracker: Looking for RDD partition 1:8
12/08/08 23:02:24 INFO spark.CacheTracker: Computing partition spark.HadoopSplit@699

Reynold Xin

Aug 8, 2012, 8:02:55 PM
to spark...@googlegroups.com, Shivaram Venkataraman, Aurojit Panda
We took a look at this and weren't able to reproduce the problem.

We downloaded 5 of the 10 files (199*) and ran the job on a single node, allocating 4G of heap and 2 cores. We did see a lot of full GCs toward the end (each taking about 5 secs and releasing about 300M of heap space), but it finished in 108 secs.

Which version of Spark are you running?

--
Reynold Xin
Algorithms, Machines, People Lab | Database Group
Electrical Engineering and Computer Science, UC Berkeley



Eugene Brevdo

Aug 8, 2012, 8:10:36 PM
to spark...@googlegroups.com, Shivaram Venkataraman, Aurojit Panda
I did a git pull on the EC2 master (and copied the compiled version to the slaves using the mesos-ec2 script). My understanding is that this version runs on OpenJDK, not Sun's Java? This is on the most recent spark+mesos AMI.

Try running on EC2 with a setup more similar to mine.

BTW that job (the last tr.count) never finished!

# MASTER=localhost:5050 ./spark-shell 
Welcome to
      ____              __  
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.5.1-SNAPSHOT
      /_/                  

Using Scala version 2.9.1.final (OpenJDK 64-Bit Server VM, Java 1.6.0_24)
Initializing interpreter...
12/08/09 00:08:33 INFO server.Server: jetty-7.5.3.v20111011
12/08/09 00:08:33 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:51573 STARTING
Creating SparkContext...
12/08/09 00:08:41 INFO spark.DiskSpillingCache: BoundedMemoryCache.maxBytes = 2469606195
12/08/09 00:08:41 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/08/09 00:08:41 INFO spark.CacheTrackerActor: Started slave cache (size 2.3GB) on ip-10-34-81-124.ec2.internal
12/08/09 00:08:41 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/08/09 00:08:41 INFO spark.ShuffleManager: Shuffle dir: /tmp/spark-local-01ef4549-39d3-498d-8793-dc6e01d59966/shuffle
12/08/09 00:08:41 INFO server.Server: jetty-7.5.3.v20111011
12/08/09 00:08:41 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:41354 STARTING
12/08/09 00:08:41 INFO spark.ShuffleManager: Local URI: http://10.34.81.124:41354
12/08/09 00:08:41 INFO server.Server: jetty-7.5.3.v20111011
12/08/09 00:08:41 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:60578 STARTING
12/08/09 00:08:41 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.34.81.124:60578
I0809 00:08:42.153389  4157 logging.cpp:86] Logging to STDERR
12/08/09 00:08:42 INFO spark.MesosScheduler: Registered as framework ID 201208082243-2085691914-5050-3610-0001
Spark context available as sc.
Type in expressions to have them evaluated.
Type :help for more information.

Shivaram Venkataraman

Aug 9, 2012, 12:57:10 AM
to spark...@googlegroups.com, Shivaram Venkataraman, Aurojit Panda
Could you try setting the HDFS block size to 32MB and trying again? You'll need to change dfs.block.size in /root/ephemeral-hdfs/conf/hdfs-site.xml to 33554432 and restart the namenode and datanodes. You'll also need to re-create your input files in HDFS for them to pick up the new block size.

Thanks
Shivaram