I am implementing a graph algorithm in spark framework, which is to compute the number of triangles for each edge.
The input graph is stored as a text file with one edge in each line, and the output is the edge with a int vale meaning the number of triangles, such as
...
The source code is in the attachment. Unfortunately, I got the following running time error:
Exception in thread "Thread-44" java.io.NotSerializableException: comp.nus.sociallinks.spark.main.ComputeTriangle
- field (class "comp.nus.sociallinks.spark.main.ComputeTriangle$$anonfun$3", name: "$outer", type: "class comp.nus.sociallinks.spark.main.ComputeTriangle")
- object (class "comp.nus.sociallinks.spark.main.ComputeTriangle$$anonfun$3", <function1>)
- field (class "spark.FlatMappedRDD", name: "f", type: "interface scala.Function1")
- object (class "spark.FlatMappedRDD", spark.FlatMappedRDD@5867df9)
- field (class "spark.Dependency", name: "rdd", type: "class spark.RDD")
- object (class "spark.OneToOneDependency", spark.OneToOneDependency@604788d5)
- custom writeObject data (class "scala.collection.immutable.$colon$colon")
- object (class "scala.collection.immutable.$colon$colon", List(spark.OneToOneDependency@604788d5))
- field (class "spark.MappedRDD", name: "dependencies", type: "class scala.collection.immutable.List")
- object (class "spark.MappedRDD", spark.MappedRDD@5c3a835d)
- field (class "spark.Dependency", name: "rdd", type: "class spark.RDD")
- object (class "spark.RangeDependency", spark.RangeDependency@23e45a5c)
- custom writeObject data (class "scala.collection.immutable.$colon$colon")
- object (class "scala.collection.immutable.$colon$colon", List(spark.RangeDependency@23e45a5c, spark.RangeDependency@64e8606c))
- field (class "spark.UnionRDD", name: "dependencies", type: "class scala.collection.immutable.List")
- object (class "spark.UnionRDD", spark.UnionRDD@6b541147)
- field (class "spark.ShuffleMapTask", name: "rdd", type: "class spark.RDD")
- root object (class "spark.ShuffleMapTask", ShuffleMapTask(2, 0))
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1153)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
at scala.collection.immutable.$colon$colon.writeObject(List.scala:399)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1461)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
at scala.collection.immutable.$colon$colon.writeObject(List.scala:399)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:945)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1461)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:7)
at spark.JavaSerializerInstance.serialize(JavaSerializer.scala:26)
at spark.SimpleJob.slaveOffer(SimpleJob.scala:178)
at spark.MesosScheduler$$anonfun$resourceOffers$1$$anonfun$apply$2.apply$mcVI$sp(MesosScheduler.scala:209)
at spark.MesosScheduler$$anonfun$resourceOffers$1$$anonfun$apply$2.apply(MesosScheduler.scala:208)
at spark.MesosScheduler$$anonfun$resourceOffers$1$$anonfun$apply$2.apply(MesosScheduler.scala:208)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:704)
at scala.collection.immutable.Range.foreach(Range.scala:75)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:703)
at spark.MesosScheduler$$anonfun$resourceOffers$1.apply(MesosScheduler.scala:208)
at spark.MesosScheduler$$anonfun$resourceOffers$1.apply(MesosScheduler.scala:205)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:44)
at spark.MesosScheduler.resourceOffers(MesosScheduler.scala:205)
12/05/24 10:33:54 INFO MesosScheduler: driver.run() returned with code DRIVER_ABORTED
Could anyone provide some advices to solve it? Thanks very much! By the way, I follow the previous solution to set -Dsun.io.serialization.extendedDebugInfo=true, the information is inside the above error output. Although I cannot understand the meaning inside, hope it is helps to find out what's wrong in my code.
Zhao Feng