spark/sbt building problem when using custom class as type parameter

Hao REN

Jun 6, 2013, 10:07:30 AM
to spark-de...@googlegroups.com
Hi,

I am running Spark on EC2, and I have run into a problem with a Scala type parameter.

In brief, I have two tasks:

1) aggTask: serialize data of type RDD[Client], where Client is a custom, serializable case class, and save the serialized objects to S3 (RDD.saveAsObjectFile)

2) calculTask: restore the objects from S3 with SparkContext.objectFile[T]. This needs a type parameter T, as shown below: sc.objectFile[Client](objFilePath)

(sc is the SparkContext)

def aggTask = {
  val myFile = sc.textFile(tableFilePath)
  val data = myFile.map(line => line.split('\001')).map(new Row(_))

  val client_list = parseToClient(data)
  client_list.saveAsObjectFile(objFilePath)
}

def calculTask = {
  val client_list = sc.objectFile[Client](objFilePath)

  /* some code here */
}

But when I use sbt to build and run my project ($ sbt package run), it always ends with an error message:
13/06/05 12:59:54 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: Client

This message points to the type parameter Client in val client_list = sc.objectFile[Client](objFilePath)

I don't know why sbt cannot recognize the type parameter.

I have tested the same code in Spark local mode, where it works well with sbt.

But when running on my cluster, I get the error message above.

I thought the project package might not be distributed to all the slaves, but I believe I created the SparkContext correctly, as follows:

val sc = new SparkContext("spark://" + Source.fromFile("/root/spark-ec2/masters").mkString.trim  + ":7077", "job_dist",
     System.getenv("SPARK_HOME"), Seq("/root/clarabox/target/scala-2.9.2/clarabox_2.9.2-1.0.jar"))
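
One way to rule out a packaging problem is to check that the jar passed to SparkContext exists at that path and really contains the class file. A minimal sketch using only the JDK; the names JarCheck and jarContainsClass are made up for illustration, and a class nested in an object (say Client inside object Job) is stored under the name Job$Client:

```scala
import java.io.File
import java.util.jar.JarFile

object JarCheck {
  // Hypothetical helper (not part of Spark or sbt): returns true when the jar
  // at jarPath exists and has a .class entry for the given binary class name.
  def jarContainsClass(jarPath: String, className: String): Boolean = {
    val file = new File(jarPath)
    if (!file.isFile) {
      false // a path that does not resolve to a regular file ends up here
    } else {
      val jar = new JarFile(file)
      try {
        // Class files are stored with '/' as the package separator.
        jar.getEntry(className.replace('.', '/') + ".class") != null
      } finally {
        jar.close()
      }
    }
  }
}
```

For example, JarCheck.jarContainsClass("/root/clarabox/target/scala-2.9.2/clarabox_2.9.2-1.0.jar", "Client") can be run on the master before creating the SparkContext. This only checks the jar on the driver side, not what the workers actually load.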

Maybe it's a serialization problem. Is there any workaround?

Thank you all.

Hao

Austin Gibbons

Aug 20, 2013, 8:35:06 PM
to spark-de...@googlegroups.com
Hey Hao, have you made any progress on this issue? I have just encountered the same problem, running both through sbt and as an assembled jar (even after confirming the presence of the class on the workers).

Ian O'Connell

Aug 21, 2013, 1:25:17 AM
to spark-de...@googlegroups.com
Is it possible for either of you to produce a sample repro that shows the issue, with steps to reproduce and the version of Spark?

Hao REN

Aug 23, 2013, 8:20:50 AM
to spark-de...@googlegroups.com, i...@ianoconnell.com
Hi,

No progress has been made on the issue.

But let's reproduce it:

On EC2, suppose there is a file on HDFS:

// test.csv

1;23;168
2;24;170
3;26;180

// Job.scala

import spark.SparkContext
import SparkContext._
import scala.collection.generic.Sorted

import scala.io.Source

object Job {

  case class Client(val id: Int, val age: Int, val height: Int)

  val hdfs_root = "hdfs://" + Source.fromFile("/root/spark-ec2/masters").mkString.trim + ":9010"
  val sc = new SparkContext("spark://"
    + Source.fromFile("/root/spark-ec2/masters").mkString.trim
    + ":7077", "job_dist",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/clarabox_2.9.3-1.0.jar"))

  def save = {
    val myFile = sc.textFile(hdfs_root + "/data/test.csv")
    val v1 = myFile.map(_.split(";")).map(x => new Client(x(0).toInt, x(1).toInt, x(2).toInt))
    val v2 = v1.saveAsObjectFile(hdfs_root + "/obj/")
  }

  def load = {
    val v = sc.objectFile[Client](hdfs_root + "/obj/")
    v.collect.foreach(x => println(x.age))
  }

  def main(args: Array[String]) {
    save
    load
  }
}

This program ends with an exception:

13/08/23 12:12:42 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: Job$Client
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1661)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1342)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at spark.Utils$.deserialize(Utils.scala:34)
at spark.SparkContext$$anonfun$objectFile$1.apply(SparkContext.scala:390)
at spark.SparkContext$$anonfun$objectFile$1.apply(SparkContext.scala:390)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at scala.collection.Iterator$$anon$21.foreach(Iterator.scala:437)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
at scala.collection.Iterator$$anon$21.toBuffer(Iterator.scala:437)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
at scala.collection.Iterator$$anon$21.toArray(Iterator.scala:437)
at spark.RDD$$anonfun$1.apply(RDD.scala:461)
at spark.RDD$$anonfun$1.apply(RDD.scala:461)
at spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:613)
at spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:613)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 4 on executor 0: ip-10-234-17-229.ec2.internal (preferred)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 1579 bytes in 1 ms
13/08/23 12:12:42 INFO cluster.TaskSetManager: Lost TID 4 (task 1.0:0)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: Job$Client [duplicate 1]
13/08/23 12:12:42 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 5 on executor 0: ip-10-234-17-229.ec2.internal (preferred)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 1579 bytes in 0 ms
13/08/23 12:12:42 INFO cluster.TaskSetManager: Lost TID 5 (task 1.0:0)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: Job$Client [duplicate 2]
13/08/23 12:12:42 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 6 on executor 0: ip-10-234-17-229.ec2.internal (preferred)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 1579 bytes in 0 ms
13/08/23 12:12:42 INFO cluster.TaskSetManager: Lost TID 6 (task 1.0:0)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: Job$Client [duplicate 3]
13/08/23 12:12:42 INFO cluster.TaskSetManager: Starting task 1.0:0 as TID 7 on executor 0: ip-10-234-17-229.ec2.internal (preferred)
13/08/23 12:12:42 INFO cluster.TaskSetManager: Serialized task 1.0:0 as 1579 bytes in 1 ms
13/08/23 12:12:43 INFO cluster.TaskSetManager: Lost TID 7 (task 1.0:0)
13/08/23 12:12:43 INFO cluster.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: Job$Client [duplicate 4]
13/08/23 12:12:43 ERROR cluster.TaskSetManager: Task 1.0:0 failed more than 4 times; aborting job
13/08/23 12:12:43 INFO scheduler.DAGScheduler: Failed to run collect at Job.scala:26
[error] (run-main) spark.SparkException: Job failed: Task 1.0:0 failed more than 4 times
spark.SparkException: Job failed: Task 1.0:0 failed more than 4 times
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:303)
at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
13/08/23 12:12:43 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[error] {file:/root/toolbox/template/}default-376a53/compile:run: Nonzero exit code: 1
[error] Total time: 12 s, completed Aug 23, 2013 12:12:43 PM

where ip-10-234-17-229.ec2.internal is a slave.

The slave cannot recognize the custom class Client.

Am I doing the right thing, or is it a bug?
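
Judging from the trace, the failure happens inside ObjectInputStream.resolveClass, which looks a class up by the name recorded in the serialized data. The following sketch (the object name StreamNames and its helpers are made up for illustration) shows that bytes written with plain Java serialization carry the class name, so whoever reads them must be able to load a class with that name:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StreamNames {
  // Stand-in for the Client class from Job.scala.
  case class Client(id: Int, age: Int, height: Int)

  // Serialize with plain Java serialization, the mechanism the stack trace
  // shows on the reading side (ObjectInputStream).
  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // True if the given class name appears in the serialized bytes.
  def mentions(bytes: Array[Byte], name: String): Boolean =
    new String(bytes, "ISO-8859-1").contains(name)
}
```

If that is right, the type parameter in objectFile[Client] is not what fails; the lookup of the class by name on the slave is.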

Thank you.

Hao

Hao REN

Aug 23, 2013, 8:25:28 AM
to spark-de...@googlegroups.com, i...@ianoconnell.com
Update:

The program is run with:

sbt package run

Please ignore the jar name passed to SparkContext; replace clarabox_2.9.3-1.0.jar with whatever name your build.sbt file produces.

val sc = new SparkContext("spark://"
    + Source.fromFile("/root/spark-ec2/masters").mkString.trim
    + ":7077", "job_dist",
    System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/clarabox_2.9.3-1.0.jar"))


Ian O'Connell

Aug 27, 2013, 5:14:05 PM
to spark-de...@googlegroups.com
Sorry about the delay; I had a bit of time to look at this today. It looks like the class loader used by the internal Spark code ends up being different from the one used by the user code, so it can't find the class. If I have a chance later, I'll see whether the class loader handling can be improved to make it behave better. (I actually found an old thread from the user group where I ran into this and posted a workaround that serializes everything with Kryo.)

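import java.io.{ByteArrayInputStream, ObjectInputStream}
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import spark.{RDD, SparkContext}

// Defined in user code rather than in Spark, which appears to let
// ObjectInputStream resolve classes from the application jar.
def deserialize[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  ois.readObject.asInstanceOf[T]
}

// Reads the object file as a sequence file and uses the local deserialize.
def objectFile[T: ClassManifest](context: SparkContext, path: String): RDD[T] = {
  context.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable])
    .flatMap(x => deserialize[Array[T]](x._2.getBytes))
}

// Calls the local objectFile helper, not SparkContext.objectFile.
def load(context: SparkContext, hdfs_root: String) = {
  val v = objectFile[Client](context, hdfs_root + "/obj/")
  v.collect.foreach(x => println(x.age))
}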
  

In the meantime, defining these helpers in your own code should work around it, as the loading operation then seems to work.
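
A more general variant of the same idea would be to tell ObjectInputStream which class loader to use, instead of relying on where the calling code was loaded from. The sketch below uses only the JDK; the names LoaderAwareObjectInputStream and LoaderAwareDemo are made up for illustration, and it assumes that the loader passed in (for example the thread's context class loader on the worker) can actually see the application jar, which I have not verified for this Spark version:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical helper, not part of Spark: resolves classes through the
// supplied loader first and falls back to the default lookup otherwise.
class LoaderAwareObjectInputStream(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override def resolveClass(desc: ObjectStreamClass): Class[_] =
    try {
      Class.forName(desc.getName, false, loader)
    } catch {
      // Primitive type names such as "int" are not found by Class.forName,
      // so let the default implementation handle them.
      case _: ClassNotFoundException => super.resolveClass(desc)
    }
}

object LoaderAwareDemo {
  // Plain Java serialization into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Deserialize, resolving classes through the given loader.
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
    val ois = new LoaderAwareObjectInputStream(new ByteArrayInputStream(bytes), loader)
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}
```

On a worker this would be called as LoaderAwareDemo.deserialize[Array[Client]](bytes, Thread.currentThread.getContextClassLoader) in place of the deserialize helper above.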