I got an OutOfMemoryError when using sc.parallelize(data) to add data to a spark.RDD many times. Any help? Thanks!


Xianying He

Jan 8, 2013, 3:45:09 AM
to spark...@googlegroups.com
Dear sir,
           I got an OutOfMemoryError when using sc.parallelize(data) to add data to a spark.RDD many times.
           The pseudocode looks like this:

           val lines: spark.RDD[String] = sc.textFile(fileUrl, 8)
           ......................

           for (i <- 0 to 10000)
             lines.++(sc.parallelize(data))

           After this has run many times, I get an error like the following:

14:18:49,321 INFO  BoundedMemoryCache :: BoundedMemoryCache.maxBytes = 358097879
14:18:49,322 INFO  CacheTrackerActor :: Registered actor on port 7077
14:18:49,323 INFO  MapOutputTrackerActor :: Registered actor on port 7077
14:18:49,323 INFO  ShuffleManager :: Shuffle dir: /tmp/spark-local-e4cac498-862c-47e2-a522-0f9708ca9e41/shuffle
14:18:49,326 INFO  Server :: jetty-7.5.3.v20111011
14:18:49,337 INFO  AbstractConnector :: Started SelectChann...@0.0.0.0:33632 STARTING
14:18:49,338 INFO  ShuffleManager :: Local URI: http://192.168.1.56:33632
14:18:49,354 INFO  FileInputFormat :: Total input paths to process : 1
14:18:49,357 INFO  BoundedMemoryCache :: BoundedMemoryCache.maxBytes = 358097879
14:18:49,358 INFO  CacheTrackerActor :: Registered actor on port 7077
14:18:49,359 INFO  MapOutputTrackerActor :: Registered actor on port 7077
14:18:49,359 INFO  ShuffleManager :: Shuffle dir: /tmp/spark-local-8c86f7e3-ee42-4718-8647-93c645419b5d/shuffle
14:18:49,361 INFO  Server :: jetty-7.5.3.v20111011
14:18:49,368 WARN  AbstractLifeCycle :: FAILED qtp396857438{8<=3<=3/254,0}#FAILED: java.lang.OutOfMemoryError: unable to create new native thread
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:640)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.startThread(QueuedThreadPool.java:435)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.doStart(QueuedThreadPool.java:103)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:59)
    at org.eclipse.jetty.util.component.AggregateLifeCycle.doStart(AggregateLifeCycle.java:45)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:53)
    at org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:90)
    at org.eclipse.jetty.server.Server.doStart(Server.java:261)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:59)
    at spark.HttpServer.start(HttpServer.scala:39)
    at spark.ShuffleManager.initialize(ShuffleManager.scala:72)
    at spark.ShuffleManager.<init>(ShuffleManager.scala:19)
    at spark.SparkEnv$.createFromSystemProperties(SparkEnv.scala:39)
    at spark.SparkContext.<init>(SparkContext.scala:57)
    at spark.cbir.index.CbirServerV11.sc(CbirServerV11.scala:421)
    at spark.cbir.index.CbirServerV11.addRDD(CbirServerV11.scala:427)
    at cn.ac.trimps.vsd.cbir.thrift.common.RecvImgFeatures.run(RecvImgFeatures.java:57)

   Any help, thanks!

Matei Zaharia

Jan 11, 2013, 2:03:37 AM
to spark...@googlegroups.com
Ah, yes, what you wrote is not going to work. You are creating a dependency chain 10,000 elements long among your RDDs -- that's going to lead to some really deep recursion and probably make Java run out of stack space. There are much better ways to implement this, such as doing a flatMap that just returns each input line multiple times. Basically when you do a Spark operation, like ++, you are *not* modifying a dataset; you're creating a new dataset with a dependency on the two that you've unioned, so you build up this big binary tree of datasets.
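For concreteness, here is a minimal sketch of that flatMap approach (assuming the goal is simply to repeat every input line a fixed number of times; the count of 10000 just mirrors the loop bound above):

val lines = sc.textFile(fileUrl, 8)

// Emit each line 10000 times. This is a single narrow transformation,
// so no chain of union dependencies is built up.
val repeated = lines.flatMap(line => Seq.fill(10000)(line))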

If you really want to make one big RDD by having many copies of a small one, here's a better way:

import spark.{RDD, UnionRDD}

val lines = sc.textFile(fileUrl, 8)
// Build an array of 10000 references to the same RDD, then union them all at once.
val array = new Array[RDD[String]](10000)
for (i <- 0 until 10000) {
  array(i) = lines
}
val data = new UnionRDD(sc, array)

This creates one "union" dataset that has 10,000 references to the same one, but at least it's not a giant chain of dependencies. However, the flatMap method I mentioned is even better.
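(Depending on your Spark version, sc.union(array) should build the same UnionRDD in a single call; I'm assuming here that your SparkContext exposes that union helper.)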

Matei

Prashant Sharma

Jan 11, 2013, 2:09:25 AM
to spark...@googlegroups.com

What exactly are you doing here? See the highlighted text below.
--
s

Matei Zaharia

Jan 11, 2013, 2:16:19 AM
to spark...@googlegroups.com
I'm just setting element i of the array to lines (like array[i] = lines in Java). So "array" will be an array of 10,000 pointers to "lines", which is what he was doing before.

I'm doing that to use the UnionRDD constructor that takes an array of RDDs to union.

Matei

He XianYing

Jan 18, 2013, 1:35:14 AM
to spark...@googlegroups.com
Thanks for all the answers.
It turned out I had made a mistake mixing different Spark versions; it works well now.
