Bagel serialization issue

Mayuresh Kunjir

Oct 16, 2013, 6:50:06 PM
to spark...@googlegroups.com
Hello Spark users,

I am trying to use Bagel for iterative computation, but I keep getting an exception saying that the class from which I am calling the Bagel.run method needs to be serializable. The class can't be serialized because it holds an instance of SparkContext. The following is a code fragment from the unserializable class.

// PRVertex, PRMessage, and PRCombiner are copied from the PageRank example in the Spark docs.

    val emptyMsgs = sc.parallelize(List[(Int, PRMessage)]())
    val emptyVerts = sc.parallelize(List[(Int, PRVertex)]())

    def compute(self: PRVertex, msgs: Option[Iterable[PRMessage]], superstep: Int)
    : (PRVertex, Array[PRMessage]) = {
       null
    }

    val result = Bagel.run(sc, emptyVerts, emptyMsgs, new PRCombiner(), 1)(compute)

As you can see, this program does nothing but call the run method in Bagel with empty arguments. Even this results in a 'not serializable' exception. Could anyone help me here?

Thanks and regards,
~Mayuresh


 

Mayuresh Kunjir

Oct 16, 2013, 6:53:32 PM
to spark...@googlegroups.com
* A typo in the code has been fixed: compute should return (PRVertex, Array[PRMessage]).


Mayuresh Kunjir

Nov 24, 2013, 12:18:02 AM
to spark...@googlegroups.com
Can anyone help me with this? I have not had any luck with the Bagel framework so far.

Posting the error message FYI.

13/11/22 22:36:19 INFO scheduler.DAGScheduler: Stage 15 (combineByKey at Bagel.scala:78) finished in 0.190 s
13/11/22 22:36:19 INFO scheduler.DAGScheduler: looking for newly runnable stages
13/11/22 22:36:19 INFO scheduler.DAGScheduler: running: Set()
13/11/22 22:36:19 INFO scheduler.DAGScheduler: waiting: Set(Stage 14)
13/11/22 22:36:19 INFO scheduler.DAGScheduler: failed: Set()
13/11/22 22:36:19 INFO scheduler.DAGScheduler: Missing parents for Stage 14: List()
13/11/22 22:36:19 INFO scheduler.DAGScheduler: Submitting Stage 14 (FlatMappedValuesRDD[117] at flatMapValues at Bagel.scala:220), which is now runnable
13/11/22 22:36:19 INFO scheduler.DAGScheduler: Failed to run foreach at Bagel.scala:237
Exception in thread "main" org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: <masked class name>
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:670)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:668)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:668)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:376)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Reynold Xin

Nov 24, 2013, 4:16:45 AM
to spark...@googlegroups.com
Serializing OuterClass.compute will serialize the OuterClass instance itself. You can either move compute to a Scala object, or make your outer class serializable and mark the SparkContext field as transient.
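
Roughly, either option looks like the sketch below. This is only a sketch: the names PageRankFunctions and PageRankDriver are made up, the types follow the snippet from the first post, and the imports assume the Spark 0.8-era package layout where Bagel lives in org.apache.spark.bagel.

    import org.apache.spark.SparkContext
    import org.apache.spark.bagel.Bagel

    // Option 1: define compute in a standalone Scala object, so the function
    // carries no reference to the driver class at all.
    object PageRankFunctions {
      def compute(self: PRVertex, msgs: Option[Iterable[PRMessage]], superstep: Int)
        : (PRVertex, Array[PRMessage]) = {
        // ... the real PageRank update goes here (placeholder) ...
        (self, Array.empty[PRMessage])
      }
    }

    // Option 2: make the driver class serializable and mark the SparkContext
    // field @transient so it is never shipped inside closures.
    class PageRankDriver(@transient val sc: SparkContext) extends Serializable {
      def run(): Unit = {
        val emptyVerts = sc.parallelize(List[(Int, PRVertex)]())
        val emptyMsgs = sc.parallelize(List[(Int, PRMessage)]())
        val result = Bagel.run(sc, emptyVerts, emptyMsgs, new PRCombiner(), 1)(PageRankFunctions.compute)
      }
    }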



Mayuresh Kunjir

Nov 24, 2013, 6:56:58 AM
to spark...@googlegroups.com
I have tried moving compute to a new serializable class, but that too necessitates serialization of the outer class, and I don't understand why. The only solution seems to be making the outer class serializable with SparkContext marked transient, although I don't like that idea much because a whole bunch of other classes accessed from the outer class would then have to be serialized as well.

Reynold Xin

Nov 24, 2013, 6:59:42 AM
to spark...@googlegroups.com
You can do this:

val compute: (PRVertex, Option[Iterable[PRMessage]], Int) => (PRVertex, Array[PRMessage]) =
  (self: PRVertex, msgs: Option[Iterable[PRMessage]], superstep: Int) => ...

This way, you create a closure without the outer class reference. 
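
For example, wired into the original call (a sketch only: the function body here is just a placeholder, and sc, emptyVerts, emptyMsgs, and PRCombiner are the ones from the first post). A function literal captures only what its body references, so as long as the body does not touch members of the enclosing class, no outer reference is pulled into the closure:

    val compute: (PRVertex, Option[Iterable[PRMessage]], Int) => (PRVertex, Array[PRMessage]) =
      (self, msgs, superstep) => {
        // placeholder body: pass the vertex through unchanged and send no messages
        (self, Array.empty[PRMessage])
      }

    val result = Bagel.run(sc, emptyVerts, emptyMsgs, new PRCombiner(), 1)(compute)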

Mayuresh Kunjir

Nov 24, 2013, 7:13:50 AM
to spark...@googlegroups.com
Ah, that did it. The program is not fully working yet, but it has moved past the serialization issue. :) Thanks so much!

~Mayuresh



