Spark-Streaming custom MailReceiver


Osahon Osabuohien

Apr 6, 2014, 2:55:02 PM
to akka...@googlegroups.com
Hi All,

I am trying to implement a custom email receiver using akka-camel in a Spark application, but I keep getting runtime exceptions. I have tried to figure it out but I am drawing blanks right now; is there anybody out there who can help? I have a main class (Trafiki.scala) that configures the Spark application, gets the streaming context, and calls the collect method of a MailCollector.scala class. collect(ssc) starts the custom stream Dummy.scala via ssc.actorStream[T](...) and formats the messages. Dummy.scala is the class that is supposed to implement the custom stream, using akka-camel and actors to consume and format the mail messages it receives. I have posted the source code as well as the error messages below. For some reason I also get an exception indicating the actor name [camel-supervisor] isn't unique.

Thanks for any help

-------------------------Dummy.scala-------------------------------------------

import akka.camel.{Consumer, CamelMessage}
import akka.actor.{ActorRef, Props, Actor}
import java.util.Properties
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.Logging

class Dummy extends Actor with Receiver with Consumer {

  def endpointUri = getUri

  var gmailActor: ActorRef = _

  override def preStart = {

    gmailActor = context.system.actorOf(Props[Dummy])

    //val camel=CamelExtension(context.parent.)
    // camel.context.addRoutes(new Routebuilder()...)
  }
  import akka.camel._
  override def onRouteDefinition = {

    rd => rd.from(getUri).process(new EmailMessageProcessor).to(gmailActor)//EmailMessageProcessor for cleaning the returned MailMessage 

  }

  def getUri = {
    val props: Properties = new Properties()
    props.load(this.getClass.getClassLoader.getResourceAsStream("mail.properties"))
    props.getProperty("endpoint")
  }

  def receive = {
    case msg: CamelMessage => {
      val mail: EmailMessage = msg.body.asInstanceOf[EmailMessage]
      pushBlock(mail.body)
     
    }
  }
}
------------------------------MailCollector.scala---------------------------------------------

import org.apache.log4j.{Level, Logger}
import com.algomacro.trafiki.streaming.{Dummy}
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.SparkConf
import akka.actor.Props

/*
polls email and searches for specific keywords via MailInputStream
 */

object MailCollector extends Collector {

  override def collect(ssc: StreamingContext) {
   
    val mailMessages = ssc.actorStream[String](Props(new Dummy), "Dummy")
    val formattedMessages = mailMessages.map(_.split(" "))

    val batchedStatuses = formattedMessages.window(Seconds(60), Seconds(5))
    batchedStatuses.count().print()
//do other stuff
  }

}
---------------------------Trafiki.scala-----------------------------------------------------------

import com.algomacro.trafiki.collectors.{MailCollector, TwitterCollector}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

object Trafiki {

  def main(args: Array[String]) {

    val conf = new SparkConf().
      setMaster("local[12]").
      setAppName("Trafiki").
      setSparkHome("SPARK_HOME").
      setJars(SparkContext.jarOfClass(this.getClass)).
      set("spark.executor.memory", "512m").
      set("spark.cleaner.ttl", "3600").
      set("spark.cleaner.delay", "3600")

    val ssc = new StreamingContext(new SparkContext(conf), Seconds(1))

    MailCollector.collect(ssc)

    ssc.start()
    ssc.awaitTermination()

  }

}

-----------------------------Error stack trace----------------------------------------------------------------------
"C:\Program Files\Java\jdk1.7.0_25\bin\java" -Didea.launcher.port=7532 "-Didea.launcher.bin.path=C:\Program Files (x86)\JetBrains\IntelliJ IDEA 13.1.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.7.0_25\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\jce.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\jfxrt.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\resources.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\rt.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.7.0_25\jre\lib\ext\zipfs.jar;C:\Users\osahon\IdeaProjects\trafiki\target\classes;C:\Users\osahon\.m2\repository\org\apache\spark\spark-assembly_2.10\0.9.0-incubating\spark-assembly_2.10-0.9.0-incubating.jar;C:\Users\osahon\.m2\repository\org\apache\spark\spark-core_2.10\0.9.0-incubating\spark-core_2.10-0.9.0-incubating.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-client\1.0.4\hadoop-client-1.0.4.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-core\1.0.4\hadoop-core-1.0.4.jar;C:\Users\osahon\.m2\repository\xmlenc\xmlenc\0.52\xmlenc-0.52.jar;C:\Users\osahon\.m2\repository\commons-codec\commons-codec\1.4\commons-codec-1.4.jar;C:\Users\osahon\.m2\repository\org\apache\commons\commons-math\2.1\commons-math-2.1.jar;C:\Users\osahon\.m2\repository\commons-configuration\commons-configuration\1.6\commons-configuration-1.6.jar;C:\Users\osahon\.m2\repository\commons-collections\commons-collections\3.2.1\commons-collections-3.2.1.jar;C:\Users\osahon\.m2\repository\commons-lang\commons-lang\2.5\commons-lang-2.5.jar;C:\Users\osahon\.m2\repository\commons-logging\commons-logging\1.1.1\commons-logging-1.1.1.jar;C:\Users\osahon\.m2\repository\commons-digester\commons-digester\1.8\commons-digester-1.8.jar;C:\Users\osahon\.m2\repository\commons-beanutils\commons-beanutils\1.7.0\commons-beanutils-1.7.0.jar;C:\Users\osahon\.m2\repository\commons-beanutils\commons-beanutils-core\1.8.0\commons-beanutils-core-1.8.0.jar;C:\Users\osahon\.m2\repository\commons-net\commons-net\3.1\commons-net-3.1.jar;C:\Users\osahon\.m2\repository\oro\oro\2.0.8\oro-2.0.8.jar;C:\Users\osahon\.m2\repository\commons-el\commons-el\1.0\commons-el-1.0.jar;C:\Users\osahon\.m2\repository\hsqldb\hsqldb\1.8.0.10\hsqldb-1.8.0.10.jar;C:\Users\osahon\.m2\repository\org\codehaus\jackson\jackson-mapper-asl\1.8.8\jackson-mapper-asl-1.8.8.jar;C:\Users\osahon\.m2\repository\org\codehaus\jackson\jackson-core-asl\1.8.8\jackson-core-asl-1.8.8.jar;C:\Users\osahon\.m2\repository\net\java\dev\jets3t\jets3t\0.6.1\jets3t-0.6.1.jar;C:\Users\osahon\.m2\repository\commons-httpclient\commons-httpclient\3.1\commons-httpclient-3.1.jar;C:\Users\osahon\.m2\repository\org\apache\avro\avro\1.7.4\avro-1.7.4.jar;C:\Users\osahon\.m2\repository\com\thoughtworks\paranamer\paranamer\2.3\paranamer-2.3.jar;C:\Users\osahon\.m2\repository\org\xerial\sn
appy\snappy-java\1.0.5\snappy-java-1.0.5.jar;C:\Users\osahon\.m2\repository\org\apache\commons\commons-compress\1.4.1\commons-compress-1.4.1.jar;C:\Users\osahon\.m2\repository\org\tukaani\xz\1.0\xz-1.0.jar;C:\Users\osahon\.m2\repository\org\slf4j\slf4j-api\1.7.5\slf4j-api-1.7.5.jar;C:\Users\osahon\.m2\repository\org\apache\avro\avro-ipc\1.7.4\avro-ipc-1.7.4.jar;C:\Users\osahon\.m2\repository\org\mortbay\jetty\jetty\6.1.26\jetty-6.1.26.jar;C:\Users\osahon\.m2\repository\org\mortbay\jetty\jetty-util\6.1.26\jetty-util-6.1.26.jar;C:\Users\osahon\.m2\repository\org\mortbay\jetty\servlet-api\2.5-20081211\servlet-api-2.5-20081211.jar;C:\Users\osahon\.m2\repository\org\apache\velocity\velocity\1.7\velocity-1.7.jar;C:\Users\osahon\.m2\repository\org\apache\zookeeper\zookeeper\3.4.5\zookeeper-3.4.5.jar;C:\Users\osahon\.m2\repository\org\slf4j\slf4j-log4j12\1.7.5\slf4j-log4j12-1.7.5.jar;C:\Users\osahon\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;C:\Users\osahon\.m2\repository\javax\mail\mail\1.4.7\mail-1.4.7.jar;C:\Users\osahon\.m2\repository\junit\junit\4.8.2\junit-4.8.2.jar;C:\Users\osahon\.m2\repository\org\eclipse\jetty\jetty-server\7.6.8.v20121106\jetty-server-7.6.8.v20121106.jar;C:\Users\osahon\.m2\repository\org\eclipse\jetty\orbit\javax.servlet\2.5.0.v201103041518\javax.servlet-2.5.0.v201103041518.jar;C:\Users\osahon\.m2\repository\org\eclipse\jetty\jetty-continuation\7.6.8.v20121106\jetty-continuation-7.6.8.v20121106.jar;C:\Users\osahon\.m2\repository\org\eclipse\jetty\jetty-http\7.6.8.v20121106\jetty-http-7.6.8.v20121106.jar;C:\Users\osahon\.m2\repository\org\eclipse\jetty\jetty-io\7.6.8.v20121106\jetty-io-7.6.8.v20121106.jar;C:\Users\osahon\.m2\repository\org\eclipse\jetty\jetty-util\7.6.8.v20121106\jetty-util-7.6.8.v20121106.jar;C:\Users\osahon\.m2\repository\com\google\guava\guava\11.0.2\guava-11.0.2.jar;C:\Users\osahon\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\osahon\.m2\repository\com\ning\compress-lzf\1.0.0\compress-lzf-1.0.0.jar;C:\Users\osahon\.m2\repository\org\ow2\asm\asm\4.0\asm-4.0.jar;C:\Users\osahon\.m2\repository\com\twitter\chill_2.10\0.3.1\chill_2.10-0.3.1.jar;C:\Users\osahon\.m2\repository\com\twitter\chill-java\0.3.1\chill-java-0.3.1.jar;C:\Users\osahon\.m2\repository\com\esotericsoftware\kryo\kryo\2.21\kryo-2.21.jar;C:\Users\osahon\.m2\repository\com\esotericsoftware\reflectasm\reflectasm\1.07\reflectasm-1.07-shaded.jar;C:\Users\osahon\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\osahon\.m2\repository\org\objenesis\objenesis\1.2\objenesis-1.2.jar;C:\Users\osahon\.m2\repository\org\scala-lang\scala-library\2.10.3\scala-library-2.10.3.jar;C:\Users\osahon\.m2\repository\org\ow2\asm\asm-commons\4.0\asm-commons-4.0.jar;C:\Users\osahon\.m2\repository\org\ow2\asm\asm-tree\4.0\asm-tree-4.0.jar;C:\Users\osahon\.m2\repository\org\spark-project\akka\akka-remote_2.10\2.2.3-shaded-protobuf\akka-remote_2.10-2.2.3-shaded-protobuf.jar;C:\Users\osahon\.m2\repository\org\spark-project\akka\akka-actor_2.10\2.2.3-shaded-protobuf\akka-actor_2.10-2.2.3-shaded-protobuf.jar;C:\Users\osahon\.m2\repository\com\typesafe\config\1.2.0\config-1.2.0.jar;C:\Users\osahon\.m2\repository\io\netty\netty\3.6.2.Final\netty-3.6.2.Final.jar;C:\Users\osahon\.m2\repository\org\spark-project\protobuf\protobuf-java\2.4.1-shaded\protobuf-java-2.4.1-shaded.jar;C:\Users\osahon\.m2\repository\org\uncommons\maths\uncommons-maths\1.2.2a\uncommons-maths-1.2.2a.jar;C:\Users\osahon\.m2\repository\org\spark-project\akka\akka-slf4j_2.10\2.2.3-shaded-
protobuf\akka-slf4j_2.10-2.2.3-shaded-protobuf.jar;C:\Users\osahon\.m2\repository\net\liftweb\lift-json_2.10\2.5.1\lift-json_2.10-2.5.1.jar;C:\Users\osahon\.m2\repository\it\unimi\dsi\fastutil\6.4.4\fastutil-6.4.4.jar;C:\Users\osahon\.m2\repository\colt\colt\1.2.0\colt-1.2.0.jar;C:\Users\osahon\.m2\repository\concurrent\concurrent\1.3.4\concurrent-1.3.4.jar;C:\Users\osahon\.m2\repository\org\apache\mesos\mesos\0.13.0\mesos-0.13.0.jar;C:\Users\osahon\.m2\repository\com\google\protobuf\protobuf-java\2.5.0\protobuf-java-2.5.0.jar;C:\Users\osahon\.m2\repository\io\netty\netty-all\4.0.13.Final\netty-all-4.0.13.Final.jar;C:\Users\osahon\.m2\repository\com\clearspring\analytics\stream\2.4.0\stream-2.4.0.jar;C:\Users\osahon\.m2\repository\com\codahale\metrics\metrics-core\3.0.0\metrics-core-3.0.0.jar;C:\Users\osahon\.m2\repository\com\codahale\metrics\metrics-jvm\3.0.0\metrics-jvm-3.0.0.jar;C:\Users\osahon\.m2\repository\com\codahale\metrics\metrics-json\3.0.0\metrics-json-3.0.0.jar;C:\Users\osahon\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.2.2\jackson-databind-2.2.2.jar;C:\Users\osahon\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.2.2\jackson-annotations-2.2.2.jar;C:\Users\osahon\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.2.2\jackson-core-2.2.2.jar;C:\Users\osahon\.m2\repository\com\codahale\metrics\metrics-ganglia\3.0.0\metrics-ganglia-3.0.0.jar;C:\Users\osahon\.m2\repository\info\ganglia\gmetric4j\gmetric4j\1.0.3\gmetric4j-1.0.3.jar;C:\Users\osahon\.m2\repository\org\acplt\oncrpc\1.0.7\oncrpc-1.0.7.jar;C:\Users\osahon\.m2\repository\com\codahale\metrics\metrics-graphite\3.0.0\metrics-graphite-3.0.0.jar;C:\Users\osahon\.m2\repository\org\apache\spark\spark-bagel_2.10\0.9.0-incubating\spark-bagel_2.10-0.9.0-incubating.jar;C:\Users\osahon\.m2\repository\org\apache\spark\spark-mllib_2.10\0.9.0-incubating\spark-mllib_2.10-0.9.0-incubating.jar;C:\Users\osahon\.m2\repository\org\jblas\jblas\1.2.3\jblas-1.2.3.jar;C:\Users\osahon\.m2\repository\org\apache\spark\spark-repl_2.10\0.9.0-incubating\spark-repl_2.10-0.9.0-incubating.jar;C:\Users\osahon\.m2\repository\org\scala-lang\scala-compiler\2.10.3\scala-compiler-2.10.3.jar;C:\Users\osahon\.m2\repository\org\scala-lang\scala-reflect\2.10.3\scala-reflect-2.10.3.jar;C:\Users\osahon\.m2\repository\org\scala-lang\jline\2.10.3\jline-2.10.3.jar;C:\Users\osahon\.m2\repository\org\fusesource\jansi\jansi\1.4\jansi-1.4.jar;C:\Users\osahon\.m2\repository\org\slf4j\jul-to-slf4j\1.7.2\jul-to-slf4j-1.7.2.jar;C:\Users\osahon\.m2\repository\org\apache\spark\spark-streaming_2.10\0.9.0-incubating\spark-streaming_2.10-0.9.0-incubating.jar;C:\Users\osahon\.m2\repository\commons-io\commons-io\2.1\commons-io-2.1.jar;C:\Users\osahon\.m2\repository\net\sf\py4j\py4j\0.8.1\py4j-0.8.1.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-common\2.2.0\hadoop-common-2.2.0.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-annotations\2.2.0\hadoop-annotations-2.2.0.jar;C:\Program Files (x86)\JetBrains\IntelliJ IDEA 
13.1.1\jre\lib\tools.jar;C:\Users\osahon\.m2\repository\commons-cli\commons-cli\1.2\commons-cli-1.2.jar;C:\Users\osahon\.m2\repository\javax\servlet\servlet-api\2.5\servlet-api-2.5.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-core\1.9\jersey-core-1.9.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-json\1.9\jersey-json-1.9.jar;C:\Users\osahon\.m2\repository\org\codehaus\jettison\jettison\1.1\jettison-1.1.jar;C:\Users\osahon\.m2\repository\stax\stax-api\1.0.1\stax-api-1.0.1.jar;C:\Users\osahon\.m2\repository\com\sun\xml\bind\jaxb-impl\2.2.6\jaxb-impl-2.2.6.jar;C:\Users\osahon\.m2\repository\org\codehaus\jackson\jackson-jaxrs\1.8.3\jackson-jaxrs-1.8.3.jar;C:\Users\osahon\.m2\repository\org\codehaus\jackson\jackson-xc\1.8.3\jackson-xc-1.8.3.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-server\1.9\jersey-server-1.9.jar;C:\Users\osahon\.m2\repository\asm\asm\3.1\asm-3.1.jar;C:\Users\osahon\.m2\repository\tomcat\jasper-compiler\5.5.23\jasper-compiler-5.5.23.jar;C:\Users\osahon\.m2\repository\tomcat\jasper-runtime\5.5.23\jasper-runtime-5.5.23.jar;C:\Users\osahon\.m2\repository\javax\servlet\jsp\jsp-api\2.1\jsp-api-2.1.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-auth\2.2.0\hadoop-auth-2.2.0.jar;C:\Users\osahon\.m2\repository\com\jcraft\jsch\0.1.42\jsch-0.1.42.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-mapreduce-client-core\2.2.0\hadoop-mapreduce-client-core-2.2.0.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-yarn-common\2.2.0\hadoop-yarn-common-2.2.0.jar;C:\Users\osahon\.m2\repository\org\apache\hadoop\hadoop-yarn-api\2.2.0\hadoop-yarn-api-2.2.0.jar;C:\Users\osahon\.m2\repository\com\google\inject\extensions\guice-servlet\3.0\guice-servlet-3.0.jar;C:\Users\osahon\.m2\repository\com\google\inject\guice\3.0\guice-3.0.jar;C:\Users\osahon\.m2\repository\javax\inject\javax.inject\1\javax.inject-1.jar;C:\Users\osahon\.m2\repository\aopalliance\aopalliance\1.0\aopalliance-1.0.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-test-framework\jersey-test-framework-grizzly2\1.9\jersey-test-framework-grizzly2-1.9.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-test-framework\jersey-test-framework-core\1.9\jersey-test-framework-core-1.9.jar;C:\Users\osahon\.m2\repository\javax\servlet\javax.servlet-api\3.0.1\javax.servlet-api-3.0.1.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-client\1.9\jersey-client-1.9.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\jersey-grizzly2\1.9\jersey-grizzly2-1.9.jar;C:\Users\osahon\.m2\repository\org\glassfish\grizzly\grizzly-http\2.1.2\grizzly-http-2.1.2.jar;C:\Users\osahon\.m2\repository\org\glassfish\grizzly\grizzly-framework\2.1.2\grizzly-framework-2.1.2.jar;C:\Users\osahon\.m2\repository\org\glassfish\gmbal\gmbal-api-only\3.0.0-b023\gmbal-api-only-3.0.0-b023.jar;C:\Users\osahon\.m2\repository\org\glassfish\external\management-api\3.0.0-b012\management-api-3.0.0-b012.jar;C:\Users\osahon\.m2\repository\org\glassfish\grizzly\grizzly-http-server\2.1.2\grizzly-http-server-2.1.2.jar;C:\Users\osahon\.m2\repository\org\glassfish\grizzly\grizzly-rcm\2.1.2\grizzly-rcm-2.1.2.jar;C:\Users\osahon\.m2\repository\org\glassfish\grizzly\grizzly-http-servlet\2.1.2\grizzly-http-servlet-2.1.2.jar;C:\Users\osahon\.m2\repository\org\glassfish\javax.servlet\3.1\javax.servlet-3.1.jar;C:\Users\osahon\.m2\repository\com\sun\jersey\contribs\jersey-guice\1.9\jersey-guice-1.9.jar;C:\Users\osahon\.m2\repository\org\twitter4j\twitter4j-core\3.0.6\twitter4j-core-3.0.6.jar;C:\Users\osahon\.m2\repository\
org\twitter4j\twitter4j-stream\3.0.6\twitter4j-stream-3.0.6.jar;C:\Users\osahon\.m2\repository\org\apache\camel\camel-core\2.13.0\camel-core-2.13.0.jar;C:\Users\osahon\.m2\repository\org\apache\camel\camel-mail\2.13.0\camel-mail-2.13.0.jar;C:\Users\osahon\.m2\repository\com\typesafe\akka\akka-camel_2.10\2.3.1\akka-camel_2.10-2.3.1.jar;C:\Users\osahon\.m2\repository\com\typesafe\akka\akka-actor_2.10\2.3.1\akka-actor_2.10-2.3.1.jar;C:\Users\osahon\.m2\repository\com\typesafe\akka\akka-slf4j_2.10\2.3.1\akka-slf4j_2.10-2.3.1.jar;C:\Program Files (x86)\JetBrains\IntelliJ IDEA 13.1.1\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain com.algomacro.trafiki.Trafiki
14/04/06 19:22:44 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/06 19:22:44 INFO Remoting: Starting remoting
14/04/06 19:22:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@osas:61771]
14/04/06 19:22:44 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@osas:61771]
14/04/06 19:22:44 INFO spark.SparkEnv: Registering BlockManagerMaster
14/04/06 19:22:44 INFO storage.DiskBlockManager: Created local directory at C:\Users\osahon\AppData\Local\Temp\spark-local-20140406192244-b689
14/04/06 19:22:44 INFO storage.MemoryStore: MemoryStore started with capacity 806.4 MB.
14/04/06 19:22:44 INFO network.ConnectionManager: Bound socket to port 61774 with id = ConnectionManagerId(osas,61774)
14/04/06 19:22:44 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/04/06 19:22:44 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager osas:61774 with 806.4 MB RAM
14/04/06 19:22:44 INFO storage.BlockManagerMaster: Registered BlockManager
14/04/06 19:22:44 INFO spark.HttpServer: Starting HTTP Server
14/04/06 19:22:44 INFO server.Server: jetty-7.6.8.v20121106
14/04/06 19:22:44 INFO server.AbstractConnector: Started SocketC...@0.0.0.0:61775
14/04/06 19:22:44 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.56.1:61775
14/04/06 19:22:44 INFO spark.SparkEnv: Registering MapOutputTracker
14/04/06 19:22:44 INFO spark.HttpFileServer: HTTP File server directory is C:\Users\osahon\AppData\Local\Temp\spark-152a33ab-250d-4a3f-acf9-4abe03db970f
14/04/06 19:22:44 INFO spark.HttpServer: Starting HTTP Server
14/04/06 19:22:44 INFO server.Server: jetty-7.6.8.v20121106
14/04/06 19:22:44 INFO server.AbstractConnector: Started SocketC...@0.0.0.0:61776
14/04/06 19:22:45 INFO server.Server: jetty-7.6.8.v20121106
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/04/06 19:22:45 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/04/06 19:22:45 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:4040
14/04/06 19:22:45 INFO ui.SparkUI: Started Spark Web UI at http://osas:4040
14/04/06 19:22:50 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
java.lang.NoSuchMethodError: akka.util.Helpers$.ConfigOps(Lcom/typesafe/config/Config;)Lcom/typesafe/config/Config;
at akka.camel.CamelSettings.<init>(Camel.scala:67)
at akka.camel.internal.DefaultCamel.<init>(DefaultCamel.scala:43)
at akka.camel.CamelExtension$.createExtension(Camel.scala:138)
at akka.camel.CamelExtension$.createExtension(Camel.scala:132)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:654)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.camel.CamelExtension$.apply(Camel.scala:132)
at akka.camel.CamelSupport$class.$init$(CamelSupport.scala:17)
at com.algomacro.trafiki.streaming.Dummy.<init>(Dummy.scala:58)
at com.algomacro.trafiki.collectors.MailCollector$$anonfun$1.apply(MailCollector.scala:21)
at com.algomacro.trafiki.collectors.MailCollector$$anonfun$1.apply(MailCollector.scala:21)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
at akka.actor.Props.newActor(Props.scala:339)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/04/06 19:22:50 ERROR actor.OneForOneStrategy: exception during creation
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
at akka.actor.ActorCell.create(ActorCell.scala:578)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at akka.util.Reflect$.instantiate(Reflect.scala:65)
at akka.actor.Props.newActor(Props.scala:337)
at akka.actor.ActorCell.newActor(ActorCell.scala:534)
at akka.actor.ActorCell.create(ActorCell.scala:560)
... 9 more
Caused by: akka.actor.InvalidActorNameException: actor name [camel-supervisor] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
at akka.camel.internal.DefaultCamel.<init>(DefaultCamel.scala:30)
at akka.camel.CamelExtension$.createExtension(Camel.scala:138)
at akka.camel.CamelExtension$.createExtension(Camel.scala:132)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:654)
at akka.actor.ExtensionId$class.apply(Extension.scala:79)
at akka.camel.CamelExtension$.apply(Camel.scala:132)
at akka.camel.CamelSupport$class.$init$(CamelSupport.scala:17)
at akka.camel.internal.CamelSupervisor.<init>(CamelSupervisor.scala:22)
... 17 more
14/04/06 19:22:50 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/04/06 19:22:50 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/04/06 19:22:50 INFO Remoting: Remoting shut down
14/04/06 19:22:50 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/04/06 19:22:50 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#-961751796]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:161)
at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:52)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:97)
at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:135)
at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:464)
at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:280)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:279)
at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:630)
at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply$mcV$sp(ActorSystem.scala:582)
at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
at akka.actor.ActorSystemImpl$$anonfun$_start$1.apply(ActorSystem.scala:582)
at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:596)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:750)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:753)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:746)
at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:746)
at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:593)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "main" java.lang.IllegalStateException: cannot create children while terminating or terminated
at akka.actor.dungeon.Children$class.makeChild(Children.scala:184)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:65)
at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:410)
at com.algomacro.trafiki.Trafiki$.main(Trafiki.scala:25)
at com.algomacro.trafiki.Trafiki.main(Trafiki.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Process finished with exit code 1

Just in case, I am also including my pom.xml, as I am not sure whether there is a conflict in the libraries.

---------------------------------------pom.xml-------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.algomacro.trafiki</groupId>
    <artifactId>trafiki</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Trafiki App</name>
    <repositories>
        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
        <repository>
            <id>central</id>
            <url>http://repo1.maven.org/maven2/</url>
        </repository>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <mainClass>com.algomacro.trafiki.Trafiki</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>compile</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <phase>compile</phase>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                        <phase>test-compile</phase>
                    </execution>
                    <execution>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.1</version>
                <configuration>
                    <mainClass>com.algomacro.trafiki.Trafiki</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-assembly_2.10</artifactId>
            <version>0.9.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-core</artifactId>
            <version>3.0.6</version>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>3.0.6</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-mail</artifactId>
            <version>2.13.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-camel_2.10</artifactId>
            <version>2.3.1</version>
        </dependency>
    </dependencies>

</project>
-------------------------------------------------------------------------------------------------

Thank you so much again. This is a rather urgent project, so any help is highly appreciated.


Osas



√iktor Ҡlang

Apr 6, 2014, 3:07:36 PM
to Akka User List

You have a java.lang.NoSuchMethodError; in 99% of cases that means you have a binary compatibility problem.
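
The classpath dump above already shows the clash: spark-assembly pulls in org.spark-project.akka's akka-actor_2.10 2.2.3-shaded-protobuf, while akka-camel 2.3.1 pulls in com.typesafe.akka's akka-actor_2.10 2.3.1. akka-camel 2.3.1 expects the akka.util.Helpers.ConfigOps method, which the 2.2.3 akka-actor that gets loaded does not provide, hence the NoSuchMethodError. One way to confirm which dependency drags in which Akka artifact is Maven's dependency tree, run from the project root, for example:

mvn dependency:tree -Dincludes=com.typesafe.akka
mvn dependency:tree -Dincludes=org.spark-project.akka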


Osahon Osabuohien

Apr 7, 2014, 11:43:37 AM
to akka...@googlegroups.com
Hey, thanks... I needed to ensure my binaries are compatible with the Akka 2.2.3 release (for Scala 2.10) that Spark uses.
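
For anyone hitting the same thing: the classpath was carrying two Akka versions at once, Spark's akka-actor_2.10 2.2.3-shaded-protobuf (from spark-assembly) and akka-actor_2.10 2.3.1 (pulled in by akka-camel 2.3.1). A rough sketch of the pom change that lines the versions up, assuming the 2.2.3 akka-camel release is the one matching the Akka that spark-assembly 0.9.0-incubating ships, would be:

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-camel_2.10</artifactId>
            <!-- align with the Akka 2.2.3 that spark-assembly 0.9.0-incubating ships -->
            <version>2.2.3</version>
            <exclusions>
                <!-- let Spark's own akka-actor be the only copy on the classpath -->
                <exclusion>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-actor_2.10</artifactId>
                </exclusion>
            </exclusions>
        </dependency>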