Hello,
I am facing the same issue in both a Spark application and a Spark Streaming application.
I need to use an object created outside map(), as in the following:
SampleClass obj = new SampleClass(list of args);
obj = .... some initialization via a function call
someStreamObj.map(
new Function<double[], String>() {
..
..
// need to access "obj" here, but cannot
..
..
}
I understand that since we create a "new Function()" inside map(), we won't be able to access an external object. So I tried to create this object inside map(), so that it is accessible in the call() method of Function(), like this:
someStreamObj.map(
SampleClass obj = new SampleClass(list of args);
obj = .... some initialization via a function call
new Function<double[], String>() {
..
..
// need to access "obj" here, but cannot
..
..
}
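For reference, the snippet above is not valid Java as written, because statements cannot appear inside an argument list. A minimal sketch of the closest valid form keeps the object as a field of the anonymous class. Everything here is a stand-in: SampleClass is a hypothetical placeholder with a made-up describe() method, and java.util.function.Function (with apply()) is used instead of Spark's Function (with call()) so the example runs without Spark.

```java
import java.util.function.Function;

public class FieldInFunctionSketch {
    // Hypothetical stand-in for SampleClass from the post.
    static class SampleClass {
        private final String label;

        SampleClass(String label) {
            this.label = label;
        }

        String describe(double[] point) {
            return label + ":" + point.length;
        }
    }

    // Builds a function whose helper object lives inside the anonymous class.
    static Function<double[], String> build() {
        return new Function<double[], String>() {
            // Created when the anonymous Function itself is instantiated.
            private final SampleClass obj = new SampleClass("hard-coded");

            @Override
            public String apply(double[] point) {
                return obj.describe(point);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(build().apply(new double[] {1.0, 2.0}));
    }
}
```

This form only compiles the object into the function; it does not by itself make command-line arguments available to it.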
Now, with this I faced two issues:
1. The "list of args" in "SampleClass obj = new SampleClass(list of args);" includes some command-line parameters which are accessible only in main() of the class. So I cannot create this object without them.
2. Even if I create the object with hard-coded values (just as a test, since in practice that won't be possible), another issue comes up. In "obj = .... some initialization via a function call" I need to create a JavaSparkContext object, which fails because we cannot have multiple JavaSparkContext objects in a single application. So, because of the multiple attempts to create those objects, deployment fails with the error below:
Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: /192.168.145.194:44364
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:298)
at akka.remote.netty.NettyRemoteServer.start(Server.scala:53)
at akka.remote.netty.NettyRemoteTransport.start(NettyRemoteSupport.scala:89)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:94)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:588)
at akka.actor.ActorSystemImpl.start(ActorSystem.scala:595)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:51)
at spark.SparkEnv$.createFromSystemProperties(SparkEnv.scala:68)
at spark.SparkContext.<init>(SparkContext.scala:84)
at spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:38)
at spark.kmeans.clustering.KMeansStreamClustering$1.<init>(KMeansStreamClustering.java:127)
at spark.kmeans.clustering.KMeansStreamClustering.main(KMeansStreamClustering.java:126)
... 6 more
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:138)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
at org.jboss.netty.channel.Channels.bind(Channels.java:569)
at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:187)
at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:343)
at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:277)
... 18 more
I also tried to create a JavaPairRDD<double[], SampleObj>, but for this too I had to go through a map(), like:
JavaPairRDD<double[], SampleObj> closest = inputStream.map(
    // PairFunction<T, K, V>: input type first, then the key and value types of the returned Tuple2
    new PairFunction<double[], double[], SampleObj>() {
        @Override
        public Tuple2<double[], SampleObj> call(double[] point) throws Exception {
            // "obj" was declared and initialized outside the map function,
            // but again it was not accessible here
            return new Tuple2<double[], SampleObj>(point, obj);
        }
    }
);
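One possible reason "obj" is not accessible: before Java 8, an anonymous class can only read local variables that are declared final, and "obj" above is reassigned after construction. The sketch below shows the general Java pattern of building the object once from an argument, keeping it in a final local, and capturing it. SampleObj and PointFunction are hypothetical stand-ins (not Spark classes), and the serialization round trip is only an approximation of what shipping a function to a worker involves, on the assumption that both the function and the captured object must be Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureSketch {
    // Hypothetical stand-in for SampleObj; Serializable so it can travel with the function.
    static class SampleObj implements Serializable {
        private static final long serialVersionUID = 1L;
        final String setting;

        SampleObj(String setting) {
            this.setting = setting;
        }
    }

    // Hypothetical stand-in for a serializable function interface.
    interface PointFunction extends Serializable {
        String call(double[] point);
    }

    // Builds the object once from a command-line style argument and captures it.
    static PointFunction build(String arg) {
        // Declared final and never reassigned, so the anonymous class may read it.
        final SampleObj obj = new SampleObj(arg);
        return new PointFunction() {
            @Override
            public String call(double[] point) {
                return obj.setting + ":" + point.length;
            }
        };
    }

    // Serializes and deserializes the function, including the captured object.
    static PointFunction roundTrip(PointFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PointFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("function is not serializable", e);
        }
    }

    public static void main(String[] args) {
        String arg = args.length > 0 ? args[0] : "k=3";
        PointFunction f = roundTrip(build(arg));
        System.out.println(f.call(new double[] {1.0, 2.0}));
    }
}
```

If the initialization step needs a second variable, assigning the finished object to a new final local (for example "final SampleObj ready = obj;") and capturing that one keeps the same pattern. Whether this resolves the Spark case depends on SampleObj not holding a JavaSparkContext, which this sketch does not cover.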
I am quite stuck with this. Can you please suggest a solution?
Regards,
Jayati Tiwari