Hello everyone,
I am having issues training certain engines that have a lot of rows in HBase (full cluster setup, PredictionIO 0.9.7-aml).
For example, I have the HBase table pio_event:events_362, which has 35,949,373 rows, and I want to train on it with 3 Spark workers, each with 8 cores and 16GB of memory (24 cores and 48GB of memory in total).
My original understanding was that even with, say, 100GB of data in HBase, training should simply run more slowly rather than crash with out-of-memory and other errors.
spark-env.sh has the following settings:
export SPARK_JAVA_OPTS="-Dspark.default.parallelism=2 -XX:+UseCompressedOops -Dspark.boundedMemoryCache.memoryFraction=0.8 -Dspark.locality.wait=0"
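(As a side note, I've read that SPARK_JAVA_OPTS is deprecated in newer Spark versions and that these settings belong in conf/spark-defaults.conf instead, and that spark.default.parallelism=2 may be very low for 24 cores. A rough sketch of what I was considering; the value of 48 is only my guess at roughly 2x the total core count, and I dropped spark.boundedMemoryCache.memoryFraction since it looks like a setting from much older Spark versions:

# conf/spark-defaults.conf (sketch) - same settings expressed the newer way;
# 48 is roughly 2x my 24 total cores and is only a guess, not a recommendation
spark.default.parallelism        48
spark.locality.wait              0
spark.executor.extraJavaOptions  -XX:+UseCompressedOops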
Here is the engine.json of the view engine I'm trying to train (it's based on the example config, but it shows my settings):
{
  "comment": "This config file uses default settings for all but the required values see README.md for docs",
  "id": "default",
  "description": "Default settings",
  "engineFactory": "org.template.RecommendationEngine",
  "datasource": {
    "params": {
      "name": "",
      "appName": "temp",
      "eventNames": ["view"]
    }
  },
  "sparkConf": {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
    "spark.kryo.referenceTracking": "false",
    "spark.kryoserializer.buffer": "200m",
    "spark.kryoserializer.buffer.max": "500m",
    "spark.executor.memory": "8g",
    "spark.driver.memory": "6g",
    "spark.driver.maxResultSize": "500m",
    "spark.memory.offHeap.size": "1g",
    "spark.memory.offHeap.enabled": "true",
    "es.index.auto.create": "true"
  },
  "algorithms": [
    {
      "comment": "simplest setup where all values are default, popularity based backfill, must add eventsNames",
      "name": "ur",
      "params": {
        "appName": "temp",
        "indexName": "ur_temp_view",
        "typeName": "items",
        "comment": "must have data for the first event or the model will not build, other events are optional",
        "eventNames": ["view"],
        "recsModel": "collabFiltering"
      }
    }
  ]
}
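For reference, here is roughly what I was considering adding to the sparkConf block so that each task handles a smaller slice of the data, and so shuffle fetches retry for longer when an executor is busy in GC. The values are guesses on my part, not tested settings:

"sparkConf": {
  "comment": "candidate additions to the sparkConf block above; 48 is roughly 2x my total cores, the retry values are guesses",
  "spark.default.parallelism": "48",
  "spark.shuffle.io.maxRetries": "10",
  "spark.shuffle.io.retryWait": "30s"
}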
The problems usually start quite early,
here, at saveAsNewAPIHadoopDataset (HBPEvents.scala:108):

org.apache.spark.shuffle.FetchFailedException: GC overhead limit exceeded
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:122)
at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:127)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.LinkedList.linkLast(LinkedList.java:142)
at java.util.LinkedList.add(LinkedList.java:338)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:63)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
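In case it matters, I launch training with pio train. From what I understand, Spark settings can also be overridden per run by passing spark-submit arguments after a -- separator, along these lines, with the values again being guesses on my part:

# sketch of a pio train invocation with per-run Spark overrides (values are guesses)
pio train -- --driver-memory 6g --executor-memory 8g --conf spark.default.parallelism=48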
I have tried adding more workers, but that makes little sense since the data isn't that big yet, and it's certainly not using the full amount of memory: on the Spark 'Stages' page I see around 2.9GB of shuffle write and read. Even if we doubled that, it still wouldn't fill the available memory.
My Hadoop cluster is running on 6 servers, each with 140GB of disk and 16GB of memory (separate from the Spark and HBase machines).
Any advice would be appreciated. I've read a bit about tuning garbage collection, but I'm not sure where to even start.
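For what it's worth, this is the kind of thing I understood "tuning garbage collection" to mean for Spark executors, i.e. switching the collector and turning on GC logging via extraJavaOptions; whether G1 is actually the right choice here is exactly what I'm unsure about:

"sparkConf": {
  "comment": "my rough understanding of where GC tuning would go; standard JVM flags, not a recommendation",
  "spark.executor.extraJavaOptions": "-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
}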