I'm running Spark on CDH 5.5.2, which depends on Akka 2.2.3.
I'm attempting to create an actorStream in Spark, but during creation of the actor I get the following exception:
Caused by: java.lang.IllegalArgumentException: constructor public akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with arguments [class java.lang.Class, class org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
    at akka.util.Reflect$.instantiate(Reflect.scala:69)
The failure traces back to https://github.com/akka/akka/blob/v2.2.3/akka-actor/src/main/scala/akka/actor/Props.scala#L337.
Below is the relevant part of my code:
class SampleActorReceiver(url: String) extends Actor with ActorHelper {
  private lazy val remotePublisher = context.actorSelection(url)

  override def preStart() {
    remotePublisher ! SubscribeReceiver(context.self)
  }

  def receive = {
    case msg => store(msg.asInstanceOf[String])
  }
}
object ActorStreaming extends App {
  val system = ActorSystem("ActorStreaming")
  val feederRef = system.actorOf(Props[FeederActor], "FeederActor")

  val sparkConf = new SparkConf().setAppName("Actor Streaming")
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  val url = feederRef.path.toStringWithAddress(
    Address("akka.tcp", system.name, InetAddress.getLocalHost.getHostAddress, 2552))

  val props = Props.create(classOf[SampleActorReceiver], url)
  val lines = ssc.actorStream[String](props, "SampleReceiver")
  lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

  ssc.start()
  ssc.awaitTermination()
  system.awaitTermination()
}
The Creator function appears to be improperly serialized when Spark sends it from the driver to the receiver. Are there any workarounds, or is there something wrong with my code? For what it's worth, the job works if I run it locally using spark-submit, but fails if I submit it to the CDH cluster.
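One thing I plan to try, on the assumption that the Java-style Props.create(Class, args) is what produces the (Class, anonfun) argument pair that Akka 2.2.3's reflective instantiation rejects, is to build the Props from a by-name creator (a Function0) instead, which matches the CreatorFunctionConsumer(scala.Function0) constructor named in the exception:

// Hypothetical alternative (untested on the cluster): construct the Props
// from a Function0 creator rather than Props.create(Class, args).
val props = Props(new SampleActorReceiver(url))
val lines = ssc.actorStream[String](props, "SampleReceiver")

I don't know yet whether this avoids the mismatch on the cluster, since the root cause may be an Akka version conflict between what my job was compiled against and the 2.2.3 that CDH ships.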
Thanks in advance.