Error in pio train for SimilarityAnalysis.cooccurrencesIDSs: Task not serializable

Vidhya Ramaswamy

Apr 9, 2015, 8:09:03 AM
to predicti...@googlegroups.com
Hi

We are trying to use SimilarityAnalysis.cooccurrencesIDSs from Mahout's Spark ItemSimilarity inside PredictionIO.

We followed these steps:
1. We built a DrmRdd using DenseVectors.
2. We used this RDD to build a CheckpointedDrmSpark.
3. We built an IndexedDataset from the CheckpointedDrmSpark and passed it to SimilarityAnalysis.cooccurrencesIDSs.
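
In outline, the code looks roughly like this (viewsByUser, nUsers, nItems and the userDict/itemDict id dictionaries are placeholders for our own data structures, and exact Mahout signatures may differ slightly between versions):

import org.apache.mahout.math.{DenseVector, Vector}
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.IndexedDataset
import org.apache.mahout.sparkbindings._ // DrmRdd alias, drmWrap
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark

// 1. A DrmRdd[Int] is just an RDD of (rowKey -> Vector) tuples: each row
//    is a user, and the DenseVector marks the items that user viewed.
val drmRdd: DrmRdd[Int] = viewsByUser.map { case (userIdx, itemIdxs) =>
  val row: Vector = new DenseVector(nItems)
  itemIdxs.foreach(i => row.setQuick(i, 1.0))
  userIdx -> row
}

// 2. Wrap the RDD as a CheckpointedDrm (nrow, ncol).
val drm = drmWrap(drmRdd, nUsers, nItems)

// 3. Build the IndexedDataset and run the cooccurrence analysis.
val ids = new IndexedDatasetSpark(drm, userDict, itemDict)
val cooccurrences = SimilarityAnalysis.cooccurrencesIDSs(Array[IndexedDataset](ids))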

We get the following exception when we train the model:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.mahout.math.DenseVector
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:879)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
SimilarityAnalysis itself extends Serializable. Do we need to use a serializable vector to build the DrmRdd? If so, could you please suggest one?
If not, could you please suggest another way to call SimilarityAnalysis.cooccurrencesIDSs?

Kenneth Chan

Apr 9, 2015, 9:22:08 PM
to predicti...@googlegroups.com
Hi,

1) It looks like some other people have hit a similar issue in the Mahout group, but there was no response.

2) Would you try using RandomAccessSparseVector instead? I see this is how Mahout constructs the IndexedDatasetSpark internally (see the sketch after this list).

3) Worst case, you may need to implement your own serializable vector class that extends one of Mahout's vector classes (e.g. RandomAccessSparseVector) and implements the serialization interface methods.

4) Side note: it looks like you can convert RDD[ViewEvents] to DrmRdd[Int] without collecting to a local map first.

Essentially, a DrmRdd is an RDD of (K -> Vector) tuples, which means you can convert RDD[ViewEvents] to RDD[(id -> vector)] with a groupBy and a map, as in the sketch below.
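
Something like this (untested sketch of 2) and 4) combined; it assumes your ViewEvent carries user and item ids, and that userIndex/itemIndex for the external-id -> Int mapping and nItems already exist in your code):

import org.apache.mahout.math.{RandomAccessSparseVector, Vector}
import org.apache.mahout.sparkbindings.DrmRdd

// Build the DrmRdd[Int] straight from the events with a groupBy + map,
// using sparse rows instead of DenseVector.
val drmRdd: DrmRdd[Int] = viewEvents
  .map(e => (userIndex(e.user), itemIndex(e.item)))
  .groupByKey()
  .map { case (userIdx, itemIdxs) =>
    val row: Vector = new RandomAccessSparseVector(nItems)
    itemIdxs.foreach(i => row.setQuick(i, 1.0))
    userIdx -> row
  }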

Kenneth

sagar jp

Apr 10, 2015, 10:34:46 AM
to predicti...@googlegroups.com
We tried using RandomAccessSparseVector but got the same error. We are now trying to implement our own serializable vector class; we added a serialVersionUID to it and still got the following error.

[ERROR] [Executor] Exception in task 1.0 in stage 1227.0 (TID 365)
[ERROR] [Executor] Exception in task 2.0 in stage 1227.0 (TID 366)
[ERROR] [Executor] Exception in task 0.0 in stage 1227.0 (TID 364)
[ERROR] [Executor] Exception in task 3.0 in stage 1227.0 (TID 367)
[ERROR] [TaskSetManager] Task 0.0 in stage 1227.0 (TID 364) had a not serializable result: org.apache.mahout.math.DenseVector; not retrying
[ERROR] [TaskSetManager] Task 1.0 in stage 1227.0 (TID 365) had a not serializable result: org.apache.mahout.math.DenseVector; not retrying
[ERROR] [TaskSetManager] Task 3.0 in stage 1227.0 (TID 367) had a not serializable result: org.apache.mahout.math.DenseVector; not retrying
[ERROR] [TaskSetManager] Task 2.0 in stage 1227.0 (TID 366) had a not serializable result: org.apache.mahout.math.DenseVector; not retrying
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 1227.0 (TID 364) had a not serializable result: org.apache.mahout.math.DenseVector
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What serialization attributes/methods do we need to implement? 
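
For reference, the wrapper we are experimenting with looks roughly like this (the class name is ours):

import org.apache.mahout.math.RandomAccessSparseVector

// Roughly what we tried: subclass a Mahout vector and mix in
// java.io.Serializable with a serialVersionUID. It compiles, but the
// errors above still name org.apache.mahout.math.DenseVector, so the
// offending vectors apparently come from inside Mahout's own operators,
// not from our input RDD.
@SerialVersionUID(1L)
class SerializableSparseVector(cardinality: Int)
    extends RandomAccessSparseVector(cardinality)
    with Serializable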

Thanks for the quick response!

Pat Ferrel

Apr 10, 2015, 5:02:42 PM
to sagar jp, predicti...@googlegroups.com
There is a nasty bug in Spark 1.2+: the JavaSerializer doesn't work.

There is a workaround in the Spark JIRAs where you have to put some jars on all worker nodes and pass their path in on the CLI or when creating a context. This is why the release being pushed at Mahout will only support Spark 1.1.1 or earlier.
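
I don't have the exact jar list handy, but the shape of the workaround is roughly this (the /opt/mahout path and jar names below are made up; the jars must already exist at that path on every worker):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the JIRA workaround: jars pre-placed on every worker, and
// the path handed to the executors via extraClassPath.
val conf = new SparkConf()
  .setAppName("item-similarity")
  .set("spark.executor.extraClassPath",
    "/opt/mahout/mahout-math-0.10.0.jar:/opt/mahout/mahout-spark_2.10-0.10.0.jar")
val sc = new SparkContext(conf)

The same setting can be passed on the CLI with spark-submit's --conf spark.executor.extraClassPath=... (plus --driver-class-path for the driver side).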

Vidhya Ramaswamy

Apr 11, 2015, 11:50:33 AM
to predicti...@googlegroups.com, jpsag...@gmail.com
Hi

Could you please tell us a bit more about the workaround?
Which jars do we add to the worker nodes?

Thanks 

Pat Ferrel

Apr 12, 2015, 1:57:24 PM
to Vidhya Ramaswamy, predicti...@googlegroups.com, jpsag...@gmail.com
For now the recommendation is to stick with the latest known working version, namely Spark 1.1.1. Is there something in Spark 1.2 that you actually require?

If you do want 1.2, you'll have to build Mahout from source and change the pom to reference Spark 1.2.1 (a more stable release than 1.2.0).
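
If it helps, the change is roughly this in Mahout's top-level pom (assuming it declares a spark.version property; if your checkout names it differently, search for where spark-core is declared):

<properties>
  <!-- point Mahout's Spark dependency at 1.2.1; the property name is
       an assumption, check your checkout -->
  <spark.version>1.2.1</spark.version>
</properties>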
