Question about java.io.NotSerializableException

feng zhao

May 23, 2012, 11:04:59 PM
to spark...@googlegroups.com
Hi,

I am implementing a graph algorithm in the Spark framework that computes the number of triangles each edge participates in.
The input graph is stored as a text file with one edge per line, and the output is each edge with an int value giving its number of triangles, such as
(1,2), 2
(3,1), 1
...

The source code is in the attachment. Unfortunately, I got the following runtime error:

Exception in thread "Thread-44" java.io.NotSerializableException: comp.nus.sociallinks.spark.main.ComputeTriangle
        - field (class "comp.nus.sociallinks.spark.main.ComputeTriangle$$anonfun$3", name: "$outer", type: "class comp.nus.sociallinks.spark.main.ComputeTriangle")
        - object (class "comp.nus.sociallinks.spark.main.ComputeTriangle$$anonfun$3", <function1>)
        - field (class "spark.FlatMappedRDD", name: "f", type: "interface scala.Function1")
        - object (class "spark.FlatMappedRDD", spark.FlatMappedRDD@5867df9)
        - field (class "spark.Dependency", name: "rdd", type: "class spark.RDD")
        - object (class "spark.OneToOneDependency", spark.OneToOneDependency@604788d5)
        - custom writeObject data (class "scala.collection.immutable.$colon$colon")
        - object (class "scala.collection.immutable.$colon$colon", List(spark.OneToOneDependency@604788d5))
        - field (class "spark.MappedRDD", name: "dependencies", type: "class scala.collection.immutable.List")
        - object (class "spark.MappedRDD", spark.MappedRDD@5c3a835d)
        - field (class "spark.Dependency", name: "rdd", type: "class spark.RDD")
        - object (class "spark.RangeDependency", spark.RangeDependency@23e45a5c)
        - custom writeObject data (class "scala.collection.immutable.$colon$colon")
        - object (class "scala.collection.immutable.$colon$colon", List(spark.RangeDependency@23e45a5c, spark.RangeDependency@64e8606c))
        - field (class "spark.UnionRDD", name: "dependencies", type: "class scala.collection.immutable.List")
        - object (class "spark.UnionRDD", spark.UnionRDD@6b541147)
        - field (class "spark.ShuffleMapTask", name: "rdd", type: "class spark.RDD")
        - root object (class "spark.ShuffleMapTask", ShuffleMapTask(2, 0))
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1153)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
        at scala.collection.immutable.$colon$colon.writeObject(List.scala:399)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1461)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
        at scala.collection.immutable.$colon$colon.writeObject(List.scala:399)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1461)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
        at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:7)
        at spark.JavaSerializerInstance.serialize(JavaSerializer.scala:26)
        at spark.SimpleJob.slaveOffer(SimpleJob.scala:178)
        at spark.MesosScheduler$$anonfun$resourceOffers$1$$anonfun$apply$2.apply$mcVI$sp(MesosScheduler.scala:209)
        at spark.MesosScheduler$$anonfun$resourceOffers$1$$anonfun$apply$2.apply(MesosScheduler.scala:208)
        at spark.MesosScheduler$$anonfun$resourceOffers$1$$anonfun$apply$2.apply(MesosScheduler.scala:208)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:704)
        at scala.collection.immutable.Range.foreach(Range.scala:75)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:703)
        at spark.MesosScheduler$$anonfun$resourceOffers$1.apply(MesosScheduler.scala:208)
        at spark.MesosScheduler$$anonfun$resourceOffers$1.apply(MesosScheduler.scala:205)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:44)
        at spark.MesosScheduler.resourceOffers(MesosScheduler.scala:205)
12/05/24 10:33:54 INFO MesosScheduler: driver.run() returned with code DRIVER_ABORTED


Could anyone provide some advice on how to solve this? Thanks very much! By the way, following the earlier suggestion I set -Dsun.io.serialization.extendedDebugInfo=true, and the extra information it produces is included in the error output above. Although I cannot make sense of it myself, I hope it helps to find out what's wrong in my code.

Zhao Feng 



ComputeTriangle.scala

feng zhao

May 23, 2012, 11:13:45 PM
to spark...@googlegroups.com
One strange thing: I first computed the triangles following the code style of the Spark examples, that is, with all the code inside the main function, and it ran successfully without a NotSerializableException.

However, when I extracted the computation into a class so that other classes could call it, I got the error shown in my previous post, and I cannot understand why. The previous main is as follows.

  def main(args: Array[String]) {

    if (args.length < 3) {
      System.err.println("Usage: ComputeTriangle <master> <k> <infile> [<outfile>]")
      System.exit(1)
    }

    val spark = new SparkContext(args(0), "ComputeTriangle")
    val k = args(1).toInt

    // Read one edge per line and normalize each pair so that the
    // smaller vertex id always comes first.
    val edges = spark.textFile(args(2)).cache
    val edgePairs = edges.map(line => {
      val fields = line.split("\t")
      val vertex1 = fields(0).trim
      val vertex2 = fields(1).trim
      if (vertex1.toInt < vertex2.toInt) (vertex1, vertex2)
      else (vertex2, vertex1)
    })

    val result = computeTriangles(edgePairs)

    // Write to the optional output path if a fourth argument was given.
    if (args.length >= 4) {
      result.saveAsTextFile(args(3))
    } else {
      result.saveAsTextFile("data/out")
    }
  }
Reynold Xin

May 24, 2012, 1:47:46 AM
to spark...@googlegroups.com
The following flatMap calls getTriadsFromAdjacencyList as part of the closure and, as a result, requires a reference to "this", which is an instance of the ComputeTriangle class and is not serializable. Spark serializes all variables that are referenced in a closure and sends them to the slave nodes for execution.

   def computeTriangles(vertexPairs: RDD[(String, String)]): RDD[((String, String), Int)] = {
    val triads = vertexPairs.groupByKey()
      .flatMap(adjacencyList => getTriadsFromAdjacencyList(adjacencyList))

To get rid of the serialization error, just make the class serializable, i.e.:

class ComputeTriangle(masterURL:String, val inFilePath: String) extends Serializable {
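
If you'd rather not make the whole class serializable, another common fix is to stop the closure from capturing "this" in the first place, by moving the helper onto a singleton object: calls to an object's methods are resolved statically, so the closure no longer drags the enclosing instance along. Here is a minimal sketch of the idea against the old 2012-era spark package; the triad-generation body shown (emitting each pair of a vertex's neighbours as an open triad) is a made-up stand-in, not the code from the attachment:

import spark.RDD
import spark.SparkContext._  // implicit conversions that add groupByKey to pair RDDs

// Methods on a singleton object are looked up statically, so a closure
// that calls them does not capture any enclosing instance.
object TriangleFunctions {
  // Hypothetical stand-in for the real getTriadsFromAdjacencyList:
  // emits every pair of neighbours of a vertex as an open triad.
  def getTriadsFromAdjacencyList(
      adj: (String, Seq[String])): Seq[((String, String), Int)] = {
    val neighbours = adj._2.sorted
    for {
      i <- 0 until neighbours.size
      j <- i + 1 until neighbours.size
    } yield ((neighbours(i), neighbours(j)), 1)
  }
}

class ComputeTriangle(masterURL: String, val inFilePath: String) {
  def computeTriangles(
      vertexPairs: RDD[(String, String)]): RDD[((String, String), Int)] = {
    vertexPairs.groupByKey()
      .flatMap(adj => TriangleFunctions.getTriadsFromAdjacencyList(adj))  // no "this" captured
  }
}

Both approaches make the ShuffleMapTask serialize cleanly; the object version additionally avoids shipping any instance state to the slaves at all.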
 
--
Reynold Xin
Algorithms, Machines, People Lab | Database Group
Electrical Engineering and Computer Science, UC Berkeley

feng zhao

May 24, 2012, 3:46:01 AM
to spark...@googlegroups.com
Thanks a lot! The problem is solved.