Exception during creation of ActorReceiver in Spark CDH 5.5.2

38 views
Skip to first unread message

Ricky

unread,
Aug 30, 2016, 7:04:30 PM8/30/16
to Akka User List
I'm running Spark on CDH 5.5.2 which has a dependency on Akka 2.2.3.

I'm attempting to create an actorStream in Spark, but during the creation of the actor I get the following exception.
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2] at akka.util.Reflect$.instantiate(Reflect.scala:69)

Which traces back to https://github.com/akka/akka/blob/v2.2.3/akka-actor/src/main/scala/akka/actor/Props.scala#L337.

Below is a code snippet of my code.

class SampleActorReceiver(url: String) extends Actor with ActorHelper {
  
  lazy private val remotePublisher = context.actorSelection(url)

  override def preStart() {
    remotePublisher ! SubscribeReceiver(context.self)
  }

  def receive = {
     case msg => store(msg.asInstanceOf[String])
   }
}

object ActorStreaming extends App {

    val system = ActorSystem("ActorStreaming")
    val feederRef = system.actorOf(Props[FeederActor], "FeederActor")

    val sparkConf = new SparkConf().setAppName("Actor Streaming")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val url = feederRef.path.toStringWithAddress(Address("akka.tcp", system.name, InetAddress.getLocalHost.getHostAddress, 2552))

    val props = Props.create(classOf[SampleActorReceiver], url)
   
    val lines = ssc.actorStream[String](props, "SampleReceiver")

    lines.flatMap(_.split("\\s+")).map(x => (x,1)).reduceByKey( _ + _).print()

    ssc.start()
    ssc.awaitTermination()
   
    system.awaitTermination()

}

There appears to be an issue with the Creator function being improperly serialized when spark sends the function from the driver to the receiver. Are there any workarounds, or is there something wrong with my code. Btw it works if I run it locally using spark submit, but fails if I submit it on the CDH cluster.

Thanks in advance.

Akka Team

unread,
Sep 8, 2016, 8:56:59 AM9/8/16
to Akka User List
Hi Rick,

The error message hints about providing an incorrect list of parameters: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]

Note the second parameter being a specific lambda expression, so something about serializing lambdas.

The chances of getting a good answer may be better in a more Spark focused forum (see here: https://spark.apache.org/community.html)

--
Johan
Akka Team

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Lightbend - Reactive apps on the JVM
Twitter: @akkateam
Reply all
Reply to author
Forward
0 new messages