org.apache.spark.SparkException: Task not serializable


周梦想

Aug 3, 2017, 11:49:18 PM
to Akka User List

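My main file:

import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.{Directives, Route}
import akka.stream.ActorMaterializer
import com.typesafe.scalalogging.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContextExecutor
import scala.io.StdIn

// Config, BaseRoutes, RcmdRoutes, SimpleRoutes, UserActConsumerActor and
// UserActMsgConsumer are defined elsewhere in the project.

/**
 * Author: zhouhh
 * Date  : 2017-06-26
 *
 * Main file: starts the web server and initializes Akka and Spark Streaming.
 */
object WebServer extends Directives {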

  val log = Logger(LoggerFactory.getLogger("WebServer"))
  log.info("==========enter WebServer init========")

  //init ActorSystem
  implicit val system = ActorSystem("recommed_System")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  //init spark streaming
  val sparkConf: SparkConf = new SparkConf().setMaster(Config.sparkMaster).setAppName("WebUserActionConsumer")
  val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(Config.batchDuration))

  //init routes
  val routes: Route = BaseRoutes.baseRoutes ~
    new RcmdRoutes().rcmdRoutes ~
    SimpleRoutes.simpleRoutes

  def main(args: Array[String]) {

    log.debug("==========enter WebServer beginning========")


    val userActConsumerActor = system.actorOf(Props(new UserActConsumerActor(ssc)), name = "UserActConsumerActor")

    userActConsumerActor ! UserActMsgConsumer


    val bindingFuture = Http().bindAndHandle(routes, Config.host, Config.port)

    log.info(s"Server online at ${Config.host}:${Config.port}/\nPress RETURN to stop...")

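    ssc.start()
    // awaitTermination() blocks until the StreamingContext is stopped,
    // so the lines below only run after streaming has terminated.
    ssc.awaitTermination()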

    log.info(s"Server online at ${Config.host}:${Config.port}/\nPress RETURN to stop...")

    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done

  }
}


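My Spark Streaming Kafka consumer file:

import akka.actor.{Actor, ActorRef, Props}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.slf4j.LoggerFactory
import spray.json._ // provides parseJson / convertTo

import scala.util.{Failure, Success, Try}

// Config, UserActMsg (with its JsonFormat), UserActMsgConsumer and
// UserActMsgParseActor are defined elsewhere in the project.
class UserActConsumerActor(ssc: StreamingContext) extends Actor {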
  val logger = Logger(LoggerFactory.getLogger("UserActConsumerActor"))

  val userActMsgParseActor: ActorRef = context.actorOf(Props[UserActMsgParseActor], name = "UserActMsgParseActor")

  def receive = {

    case UserActMsgConsumer => parse()
    case e => logger.info(s"error receive of user act msg:  $e")
  }

  val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
    ssc,
    PreferConsistent,
    Subscribe[String, String](Config.useracttopics.split(","), Config.kafkaParams)
  )

  def parse(): Unit = {

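    // The function passed to rdd.foreach runs on the Spark executors, so Spark must
    // serialize it. It uses the members logger and userActMsgParseActor, so it holds
    // a reference ($outer) to this actor, and serializing the actor fails.
    stream.foreachRDD(rdd => rdd.foreach(x => {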
      logger.info("======user act value:====\n")
      logger.info(x.value()) //value is String

      Try {
        x.value().parseJson.convertTo[UserActMsg]
      } match {
        case Success(msg) => userActMsgParseActor ! msg
        //        case Success(msg) => context.parent ! msg
        case Failure(x) => logger.error(s"$x") //println(s"$x") //
        //        case Success(msg) => println(msg)

      }

    }))
  }

}


When I run WebServer, it repeatedly reports the error in the title:

17/08/04 11:40:15 ERROR JobScheduler: Error running job streaming job 1501818015000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
at com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1.apply(UserActConsumerActor.scala:46)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ronglian.actor.UserActConsumerActor
Serialization stack:
- object not serializable (class: com.ronglian.actor.UserActConsumerActor, value: com.ronglian.actor.UserActConsumerActor@a711490)
- field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1, name: $outer, type: class com.ronglian.actor.UserActConsumerActor)
- object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1, <function1>)
- field (class: com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1, name: $outer, type: class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1)
- object (class com.ronglian.actor.UserActConsumerActor$$anonfun$parse$1$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 30 more


I don't know how to serialize the UserActConsumerActor class.
Can anyone give me a suggestion?


Justin du coeur

Aug 4, 2017, 8:35:26 AM
to akka...@googlegroups.com
I don't know Spark, so I don't know quite what this is trying to do, but Actors typically are not serializable -- you send the ActorRef for the Actor, not the Actor itself.  I'm not sure it even makes any sense semantically to try to serialize and send an Actor...

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

周梦想

Aug 5, 2017, 2:07:31 AM
to Akka User List
Thank you. It is caused by the Spark Streaming closure: if the closure references a member variable or method, the enclosing object and all of its members need to be serialized. But I don't know how to solve the problem.
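The capture rule described above can be reproduced without Spark or Akka. The sketch below is an illustration, not the thread's fix: NotSerializableOwner, ClosureCaptureDemo and trySerialize are hypothetical names, and it assumes Scala 2.12 or later, where function literals are compiled as serializable lambdas. A lambda that reads a member captures this and fails Java serialization, while copying the member into a local val first lets the lambda capture only the value.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for UserActConsumerActor: an ordinary class that is
// NOT Serializable, just like an Akka Actor.
class NotSerializableOwner {
  val prefix: String = "user act value: "

  // The lambda reads the member prefix, so it captures `this`
  // (the same kind of reference as the $outer field in the stack trace).
  def capturingClosure: String => String = x => prefix + x

  // Copying the member into a local val first means the lambda
  // captures only the String, not the enclosing object.
  def localCopyClosure: String => String = {
    val p = prefix
    x => p + x
  }
}

object ClosureCaptureDemo {
  // Java-serializes an object, roughly what Spark's JavaSerializer does with a
  // task closure. Returns the serialized size, or the exception on failure.
  def trySerialize(obj: AnyRef): Either[NotSerializableException, Int] =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e)
    }

  def main(args: Array[String]): Unit = {
    val owner = new NotSerializableOwner
    // Expected: a Left holding a NotSerializableException for NotSerializableOwner
    println(trySerialize(owner.capturingClosure))
    // Expected: true, because only the captured String is written
    println(trySerialize(owner.localCopyClosure).isRight)
  }
}
```

In the actor, the analogous change would be to stop referencing logger and userActMsgParseActor inside rdd.foreach. Note that an ActorRef copied into a local val is serializable, but deserializing it on an executor requires an ActorSystem in scope, so one option is to extract only the values on the executors (for example rdd.map(_.value()).collect()) and send the messages to the actor from the driver, inside the foreachRDD body. That is only reasonable if each batch fits in driver memory.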

On Friday, August 4, 2017 at 8:35:26 PM UTC+8, Justin du coeur wrote:

Patrik Nordwall

Aug 5, 2017, 2:43:16 AM
to Akka User List
Perhaps try defining a props factory method in the companion object; see the recommended practices in the docs: http://doc.akka.io/docs/akka/current/scala/actors.html#creating-actors

/Patrik