Training Big Data - Spark running out of memory


konrad....@hawksearch.com

Jan 3, 2017, 12:42:08 PM
to actionml-user
Hello everyone,

I am having issues training certain engines that have a lot of rows in HBase (full cluster setup, 0.9.7aml).
For example, I have this specific HBase table, pio_event:events_362, which has 35,949,373 rows, and I want to train it on 3 Spark workers with 8 cores and 16GB of memory each (a total of 24 cores and 48GB of memory).

My original understanding was that even with 100GB of data in HBase, training should simply be slower rather than crashing with various types of errors or running out of memory.

spark-env.sh has the following settings:
export SPARK_JAVA_OPTS="-Dspark.default.parallelism=2 -XX:+UseCompressedOops -Dspark.boundedMemoryCache.memoryFraction=0.8 -Dspark.locality.wait=0"

engine.json of the engine I'm trying to train (it's an example, but it shows my settings):

{
  "comment":" This config file uses default settings for all but the required values see README.md for docs",
  "id": "default",
  "description": "Default settings",
  "engineFactory": "org.template.RecommendationEngine",
  "datasource": {
    "params" : {
      "name": "",
      "appName": "temp",
      "eventNames": ["view"]
    }
  },
  "sparkConf": {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
   "spark.kryo.referenceTracking": "false",
    "spark.kryoserializer.buffer": "200m",
    "spark.kryoserializer.buffer.max": "500m",
    "spark.executor.memory": "8g",
    "spark.driver.memory": "6g",
    "spark.driver.maxResultSize": "500m",
    "spark.memory.offHeap.size": "1g",
    "spark.memory.offHeap.enabled": "true",
    "es.index.auto.create": "true",

  },
  "algorithms": [
    {
      "comment": "simplest setup where all values are default, popularity based backfill, must add eventsNames",
      "name": "ur",
      "params": {
        "appName": "temp",
        "indexName": "ur_temp_view",
        "typeName": "items",
        "comment": "must have data for the first event or the model will not build, other events are optional",
        "eventNames": ["view"],
                "recsModel": "collabFiltering"
      }
    }
  ]
}



Problems usually start quite early, here: saveAsNewAPIHadoopDataset at HBPEvents.scala:108


org.apache.spark.shuffle.FetchFailedException: GC overhead limit exceeded
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:122)
	at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.LinkedList.linkLast(LinkedList.java:142)
	at java.util.LinkedList.add(LinkedList.java:338)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:63)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)


I have tried adding more workers, but that makes little sense since the data isn't that big yet, and it certainly isn't using the full amount of memory. On the 'Stages' page of the Spark UI I see around 2.9GB of shuffle write and read; even if we doubled that, it still wouldn't take the full amount of memory.

My Hadoop cluster is running on 6 servers, each with 140GB of disk and 16GB of memory (separate from Spark and HBase).

Any advice would be appreciated. I've read a bit about tuning garbage collection, but I'm not sure where to even start.

mitchel...@gmail.com

Jan 4, 2017, 5:41:24 PM
to actionml-user, konrad....@hawksearch.com
I am receiving the same error, which always seems to be preceded by this warning. It always seems to happen on stage 4 of training. When setting up an engine, the default settings work with minimal data; however, as the data grows, the issue happens more often.

 Lost task 0.0 in stage 4.0 (TID 12): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedSerializationConstructorAccessor135.newInstance(Unknown Source)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:706)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
        at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
        at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:122)
        at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)


Here is the spark configuration from my engine.json file.

  "sparkConf": {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
    "spark.kryo.referenceTracking": "false",
    "spark.kryoserializer.buffer": "64k",
    "spark.kryoserializer.buffer.max": "25m",
    "es.index.auto.create": "true",
  },

I have tried running the task with several different values for executor-memory and executor-cores, never exceeding my total amount of resources.


 pio train --conf spark.port.maxRetries=10 --executor-memory 1g --executor-cores 2

Pat Ferrel

Jan 5, 2017, 11:37:14 AM
to mitchel...@gmail.com, actionml-user, konrad....@hawksearch.com
Spark needs 2g per executor to run code with no data, so increase both driver and executor memory to at least 3g. If you don't have that much, you need to run on a larger machine; we recommend 16g of total memory for toy datasets and 32g for very small ones.
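For example, the minimum bump would look something like this (the sizes are just the floor mentioned above; raise them to whatever your machines can actually spare):

    pio train -- --driver-memory 3g --executor-memory 3g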

 

konrad....@hawksearch.com

Jan 5, 2017, 12:00:56 PM
to actionml-user, mitchel...@gmail.com, konrad....@hawksearch.com
Thanks for the response, Pat. I still have a question, if I understood correctly:

To successfully train my 32-million-row HBase data set, I had to assign 30GB of memory per executor node. This is a lot of memory for a data set that is roughly 6GB of raw data. As my data set grows, will I need to add more memory to every single one of my workers, or will adding another worker server to my cluster suffice?

Pat Ferrel

Jan 5, 2017, 12:34:17 PM
to konrad....@hawksearch.com, actionml-user, mitchel...@gmail.com
The memory per executor is related to the number of user-ids + item-ids, and it will increase as they increase. Otherwise, adding workers to Spark will handle more data, and the algorithm has a downsampling trigger that limits the data used beyond the point where research has shown we get highly diminishing returns.

The downsampling limit is taken from this paper by one of the inventors of the algorithm: https://ssc.io/pdf/rec11-schelter.pdf


Pat Ferrel

Jan 5, 2017, 12:39:09 PM
to konrad....@hawksearch.com, actionml-user, mitchel...@gmail.com
Put succinctly, you can limit the impact of more data in many ways, including using a time window and discarding old data. The downsampling will kick in eventually, and you can set it to be more strict.

So resource requirements grow at O(n), where n is the number of user-ids + item-ids, not the actual events, only the possible ids. The events will be downsampled to a constant size and so eventually will not cause more resources to be needed.
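As a rough sketch only (double-check the UR docs for the exact parameter names and sensible values; the numbers below are placeholders, not recommendations), the downsampling knobs live in the algorithm params of engine.json, something like:

    "algorithms": [
      {
        "name": "ur",
        "params": {
          "appName": "temp",
          "eventNames": ["view"],
          "maxEventsPerEventType": 500,
          "maxCorrelatorsPerEventType": 50
        }
      }
    ]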


mitchel...@gmail.com

Jan 7, 2017, 10:09:36 AM
to actionml-user, konrad....@hawksearch.com, mitchel...@gmail.com
Hi Pat,
 
Thanks for the response. I will look into limiting the data we are training on; I believe I read somewhere that this can be done in the engine.json file. We are storing a createDate in HBase, so I am hoping that will be enough to let me limit my data set.

As for training, our HBase has around 40 million entries divided between 3 events (cart, view, buy). When training each event we are erroring out. We have three workers with a total of 144 GB of memory between them. We have allotted 30 GB of executor memory per worker, giving us a total of 90 GB of executor memory, and all training is erroring out with out-of-memory errors.

As for unique items, we have around 50,000 to 75,000. I would have to parse HBase for unique users, but my guess is that it would not be above 20MM. I just wanted to confirm that this could be throwing an out-of-memory exception, as the memory per worker seems like a lot.

Thanks,

Mitch

Pat Ferrel

Jan 7, 2017, 3:12:30 PM
to mitchel...@gmail.com, actionml-user, konrad....@hawksearch.com
There are many parts of the system that may throw an OOM. Is it the Spark driver, a Spark executor, or something else, like the JVM that launches the driver? Also remember that if you allocate more than is available on a given machine, it will OOM right away. What is your physical architecture? Which processes run on which machines, and what resources do they have?

I am confused by the statement that you have 144G of memory available for the executors but only use 90G; why? The driver needs roughly as much memory as the executors, so think of it as another node in Spark. The driver (excluding more advanced use of Yarn) will run on the machine where you launch `pio train`, so it is often overlooked when configuring Spark since it's not a worker in the cluster.

There is nothing remarkable about the size of your data so you should be able to balance all the factors above and get training to run without OOM.

Take the total memory available to Spark and divide it by the number of workers + 1 (for the driver); call this number N. Then use the following to control allocations:

    pio train -- --driver-memory N --executor-memory N --master=spark://<ip-of-your-master>

Also send your error stack trace.


mitchel...@gmail.com

Jan 9, 2017, 12:30:44 PM
to actionml-user, mitchel...@gmail.com, konrad....@hawksearch.com
Hi Pat, we changed our configuration a little to give the server where we run pio train 48 GB of RAM. These servers are dedicated to pio training.

Here is our configuration

Worker #1 - 48 GB RAM (16 Cores)
Worker #2 - 48 GB RAM (16 Cores)

Totaling 96 GB of RAM

Server where we run pio train - 48 GB RAM (16 Cores)

With the equation you posted above, we did the following: 96 (total worker memory) / (2 workers + 1 server where we run pio train) = 32 GB.

We ran pio train with the following:

executor-memory=32g
driver-memory=32g

The error we got is at the following location: http://pastebin.com/S8BeHYSp

Pat Ferrel

Jan 9, 2017, 1:35:11 PM
to mitchel...@gmail.com, actionml-user, konrad....@hawksearch.com
OK, so a connection failed; we seem to be past any OOM errors. A connection error can be a lot of things: security group settings, running out of disk on shuffles, etc. I'd look at executor and machine logs, and monitor memory, CPU, and disk usage while training. A slow executor could also cause this, or an executor hung up waiting for an HBase connection.

We have a bunch of settings we use on bigger data, like increasing the HBase connection timeout, allowing many more open files than the default (ulimit command), and setting maximum parallelism to 4x the total number of executor cores if HBase can handle that many connections, etc.
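To make that concrete, the kind of thing I mean looks roughly like this (names and values are illustrative and vary by Spark/HBase version and cluster size):

    spark.default.parallelism = 128    # ~4x the 32 executor cores in the setup above (goes in sparkConf)
    hbase.rpc.timeout = 120000         # hbase-site.xml, milliseconds; raised from the 60s default
    ulimit -n 65535                    # for the user running the Spark and HBase processes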

The stage where you see this is maybe the first clue to look at. Are you logging events (Spark terminology here) to HDFS so you can look at them even after a job has failed? This will show an event timeline in the Spark GUI, which will tell you which job and stage failed and give you executor logs/stack traces.
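If event logging isn't already on, it is two settings in sparkConf (or spark-defaults.conf); the HDFS path here is a placeholder, and the directory must exist before you train:

    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "hdfs://<namenode>:8020/spark-event-logs"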



mitchel...@gmail.com

Jan 10, 2017, 11:00:02 AM
to actionml-user, mitchel...@gmail.com, konrad....@hawksearch.com
Hi Pat,

I reduced the executor cores from 16 to 8 and was able to get past stage 4. It looks like I am now getting the following error after completing stage 10.

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded (Full Stack Trace Below)


I was wondering if you had any other ideas, as our data does not seem to be that vast (around 40MM records in HBase). It is the same setup as posted above, except that instead of using 16 cores I reduced it to 8. Any insight would be appreciated.

Thanks,

Mitch

Pat Ferrel

Jan 10, 2017, 11:37:10 AM
to mitchel...@gmail.com, actionml-user, konrad....@hawksearch.com
It looks like the BiMap used to map user and item ids <-> integer keys is growing to fill memory. If this is true, get more memory; it's the only answer. In the default case memory is allocated per executor, and there is usually one per worker, so make the worker bigger by using a different machine. This is the one part of the algorithm that must be vertically scaled in memory.

Note that this is for ids only, not per event; the per-event data is handled by horizontal scaling (more workers).
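As a very rough back-of-the-envelope (assuming ids of around 30 characters): each id costs on the order of 100-250 bytes once you count the Java String, the boxed integer, and hash-table entry overhead, and the dictionary is kept in both directions. With something like 20MM user-ids plus 75k item-ids, that is roughly 20,000,000 x ~200 bytes x 2, or on the order of 8 GB per JVM holding the full dictionary, before any event data, shuffle buffers, or GC headroom.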


mitchel...@gmail.com

Jan 11, 2017, 3:21:34 PM
to actionml-user, mitchel...@gmail.com, konrad....@hawksearch.com
Hi Pat,

I was able to get past stage 4 and get all the way to stage 35, 'filter at package.scala:73'. This stage seems to be taking a very long time. After looking at the Spark logs, it looks like it is running on one worker, and on one core of that worker.

Per the screenshot, it has been running for 35 minutes; however, I had it running for a couple of hours before killing it on the previous run.



Screenshot from the worker that stage 35 is running on, showing that it is running on one core.


I did some searching in previous threads, and it looks like the suggestion was to set up ES as a cluster, which we already have (6 servers), and to set engine.json so that es.nodes is a comma-separated list of all 6 servers, which we have done. My question is: is there anything else I can do to speed this up? Is there some way to get this stage to use multiple cores, or some setting that will speed it up?
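For reference, the relevant part of our engine.json sparkConf looks roughly like this (hostnames are placeholders here):

    "es.nodes": "es-node1,es-node2,es-node3,es-node4,es-node5,es-node6",
    "es.index.auto.create": "true"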

Pat Ferrel

Jan 11, 2017, 4:10:39 PM
to mitchel...@gmail.com, actionml-user, konrad....@hawksearch.com
Yeah, we removed the filter and some other things in UR 0.5.1 to speed things up; it will be out in a week or so. Try updating to the master of the UR, then switch to the "item-sets" branch for the experimental changes. It runs in something like 70% of the time used by v0.3.0. It's still experimental, so let me know if you have any issues.
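Assuming the UR was installed from its git repo (the directory name below is a placeholder for wherever your template checkout lives), the switch looks something like:

    cd universal-recommender       # your UR template checkout
    git checkout master && git pull
    git checkout item-sets         # experimental branch
    pio build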

mitchel...@gmail.com

Jan 11, 2017, 4:37:45 PM
to actionml-user, mitchel...@gmail.com, konrad....@hawksearch.com
Hi Pat,

Thanks for the response. Will UR 0.5 work with the PIO AML version, or will we have to update to the Salesforce version?

Pat Ferrel

Jan 11, 2017, 5:14:51 PM
to mitchel...@gmail.com, actionml-user, konrad....@hawksearch.com
It works with Apache PIO; we merged our changes with SF's, and they are Apache-managed now. We are now Apache committers and on the same page with SF. No other fork is being actively worked on by us or SF, AFAIK.

See actionml.com for upgrade instructions, or visit the PIO site here: http://predictionio.incubator.apache.org

