Re: pio ur template 0.3.0, eventWindow problem, the train failed


Pat Ferrel

unread,
Apr 28, 2016, 11:48:20 AM4/28/16
to adam....@gmail.com, predictionio-user, actionml-user
How many days of events are in your EventServer now? The expected use is to remove only events that have aged out since the last training, so it will generally run pretty fast. If you are removing or de-duping a lot of events, it will put more load on HBase.

If a failure happens during cleaning, the EventServer should be in a consistent state, but not all of the work will have been done, so another pass may be required. We researched creating a temp table in HBase and then swapping it in, but this is not supported, so we settled on a modify-in-place method for changing the EventStore.

That said, the failure is not good, but it can be caused by a lack of resources like memory or disk. In this case, is it possible that Spark's temp storage is running out? This is usually /tmp on each Spark executor. Depending on how recent your version of Spark is, there may be lots of old, unneeded temp files there, which you can safely delete.

Guessing from the error output, "Missing an output location", and the fact that it happens during a shuffle, this can be caused by not enough temp storage or not enough memory. An on-disk shuffle can sometimes be avoided by giving the task more memory.
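
If it helps, something along these lines is roughly what I mean; the memory sizes and the scratch directory are only placeholders for your cluster, and some cluster managers override spark.local.dir with their own setting:

    # options after "--" are passed through pio train to spark-submit
    pio train -- \
      --driver-memory 4g \
      --executor-memory 8g \
      --conf spark.local.dir=/mnt/big-disk/spark-tmp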

BTW we are moving discussions of the Universal Recommender to its own group for faster support here: https://groups.google.com/forum/#!forum/actionml-user 


On Apr 28, 2016, at 7:02 AM, adam....@gmail.com wrote:

Okay, it's clear in the documentation:
eventWindow: This is optional and controls how much of the data in the EventServer to keep and how to compress events. The default is to not have a time window and do no compression. This will compact and drop old events from the EventServer permanently in the persisted data, so make sure to have some other archive of events if you are playing with the timeWindow: duration

Should I increase the timeout of the executors?
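For reference, I was thinking of something along these lines; the values are only guesses:

    # raise the timeout that shows up as "120000 ms" in the logs
    pio train -- \
      --conf spark.network.timeout=600s \
      --conf spark.executor.heartbeatInterval=60s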

On Thursday, April 28, 2016 at 4:00:28 PM UTC+2, adam....@gmail.com wrote:
Hello everybody!

There is an event storage with 15,731,525 events. I tried to run training with the following eventWindow setting:
"eventWindow": {
        "duration": "28 days",
        "removeDuplicates": false,
        "compressProperties": false
      }

After an hour of training, one of the executors was lost and stage 8 failed. With the eventWindow.duration property enabled, will the older events be deleted in HBase, or does it create a "temporary table" for the events in the time window?
If it only creates a temp table, then creating that table takes too much time for it to be usable. I've attached an HTML file with the Spark UI view, maybe it helps.

[Stage 8:>                                                          (0 + 3) / 3][WARN] [HeartbeatReceiver] Removing executor 1 with no recent heartbeats: 163511 ms exceeds timeout 120000 ms
[ERROR] [TaskSchedulerImpl] Lost executor 1 on sparkslave01.profession.hu: Executor heartbeat timed out after 163511 ms
[WARN] [TaskSetManager] Lost task 2.0 in stage 8.0 (TID 23, sparkslave01.profession.hu): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 163511 ms
[WARN] [TaskSetManager] Lost task 0.0 in stage 8.0 (TID 21, sparkslave01.profession.hu): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 163511 ms
[WARN] [TransportChannelHandler] Exception in connection from sparkslave01.profession.hu/172.31.9.45:35889
[ERROR] [TaskSchedulerImpl] Lost executor 1 on sparkslave01.profession.hu: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[WARN] [TaskSetManager] Lost task 0.1 in stage 8.0 (TID 24, sparkslave01.profession.hu): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 8:>                                                          (0 + 3) / 3][WARN] [HeartbeatReceiver] Removing executor 0 with no recent heartbeats: 176955 ms exceeds timeout 120000 ms
[ERROR] [TaskSchedulerImpl] Lost executor 0 on sparkmaster.profession.hu: Executor heartbeat timed out after 176955 ms
[WARN] [TaskSetManager] Lost task 0.2 in stage 8.0 (TID 26, sparkmaster.profession.hu): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176955 ms
[WARN] [TaskSetManager] Lost task 1.0 in stage 8.0 (TID 22, sparkmaster.profession.hu): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176955 ms
[WARN] [TaskSetManager] Lost task 2.1 in stage 8.0 (TID 25, sparkmaster.profession.hu): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 176955 ms
[WARN] [TransportChannelHandler] Exception in connection from sparkmaster.profession.hu/172.31.3.141:26291
[ERROR] [TaskSchedulerImpl] Lost executor 0 on sparkmaster.profession.hu: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[WARN] [TaskSetManager] Lost task 1.1 in stage 8.0 (TID 28, sparkmaster.profession.hu): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[WARN] [TaskSetManager] Lost task 2.2 in stage 8.0 (TID 27, sparkslave01.profession.hu): FetchFailed(null, shuffleId=3, mapId=-1, reduceId=2, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:121)
        at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

)
[WARN] [TaskSetManager] Lost task 0.3 in stage 8.0 (TID 29, sparkslave01.profession.hu): FetchFailed(null, shuffleId=3, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
        at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.rdd.SubtractedRDD.integrate$1(SubtractedRDD.scala:121)
        at org.apache.spark.rdd.SubtractedRDD.compute(SubtractedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)

Regards,
Adam Krajcs

Pat Ferrel

unread,
Apr 28, 2016, 12:16:05 PM4/28/16
to adam....@gmail.com, actionml-user
I was looking at the HTML you sent, and there is code in the SelfCleaningDataSource that would be sensitive to how many events are to be removed. You must have a lot, so maybe you should wait until we make this more efficient. I'm looking at that now.

A workaround is to give the task lots of memory, more than will be needed once we fix this.



adam....@gmail.com

unread,
Apr 29, 2016, 3:40:26 AM4/29/16
to actionml-user, adam....@gmail.com
First of all, thanks for the info! There are around 60 days of events in the main event storage. I made a workaround to solve the "garbage collection" problem: another app was created to store the events in parallel. It contains only 20 days, so I think the eventWindow will work on this dataset.
Anyway, I'll try the training with more memory.
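For reference, each event is simply posted to both apps on the EventServer now, roughly like this; the host, access keys, and event fields are only examples:

    # send the same event to the main app and to the 20-day "window" app
    curl -i -X POST "http://eventserver:7070/events.json?accessKey=MAIN_APP_KEY" \
      -H "Content-Type: application/json" \
      -d '{"event":"view","entityType":"user","entityId":"u1","targetEntityType":"item","targetEntityId":"i42"}'
    curl -i -X POST "http://eventserver:7070/events.json?accessKey=WINDOW_APP_KEY" \
      -H "Content-Type: application/json" \
      -d '{"event":"view","entityType":"user","entityId":"u1","targetEntityType":"item","targetEntityId":"i42"}'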


Regards,
Adam