Cannot access objects declared and initialized outside the map() method of JavaRDD and JavaDStream

jayati tiwari

May 15, 2013, 11:11:14 AM
to spark...@googlegroups.com

Hello,

I am facing a common issue in a Spark application and a Spark Streaming application.

I need to use an object created outside the map(), like the following:

SampleClass obj = new SampleClass(list of args);
obj = .... some initialization via a function call

someStreamObj.map(
                new Function<double[], String>() {
                        ..
                        ..
                       // need to access "obj" here, but cannot
                       ..
                       ..
                }

I understand that since in map() we are creating a "new Function()" we won't be able to access an external object. So I tried to create this object inside the map() function so that it is accessible in the call() of the Function(), like:

someStreamObj.map(
                SampleClass obj = new SampleClass(list of args);
                obj = .... some initialization via a function call
                new Function<double[], String>() {
                        ..
                        ..
                       // need to access "obj" here, but cannot
                       ..
                       ..
                }

Now, with this I faced two issues:

1. The "list of args" in "SampleClass obj = new SampleClass(list of args);" includes some command line parameters which are accessible only in main() of the class. So, I cannot create this object without them.
2. Even, if I create the object with hard coded values(just for test, since practically it won't be possible), the other issue comes up. In the "obj = .... some initialization via a fuction call" I need to create a JavaSparkContext object which won't get created since we cannot have multiple JavaSparkContext objects in a single application. So, due to multiple attempts that would be made to create those objects upon deploy it fails with the error below :

Caused by: org.jboss.netty.channel.ChannelException: Failed to bind to: /192.168.145.194:44364
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:298)
    at akka.remote.netty.NettyRemoteServer.start(Server.scala:53)
    at akka.remote.netty.NettyRemoteTransport.start(NettyRemoteSupport.scala:89)
    at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:94)
    at akka.actor.ActorSystemImpl._start(ActorSystem.scala:588)
    at akka.actor.ActorSystemImpl.start(ActorSystem.scala:595)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
    at spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:51)
    at spark.SparkEnv$.createFromSystemProperties(SparkEnv.scala:68)
    at spark.SparkContext.<init>(SparkContext.scala:84)
    at spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:38)
    at spark.kmeans.clustering.KMeansStreamClustering$1.<init>(KMeansStreamClustering.java:127)
    at spark.kmeans.clustering.KMeansStreamClustering.main(KMeansStreamClustering.java:126)
    ... 6 more
Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind(Native Method)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:138)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
    at org.jboss.netty.channel.Channels.bind(Channels.java:569)
    at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:187)
    at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:343)
    at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
    at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
    at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
    at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:277)
    ... 18 more

I also tried to create a JavaPairRDD<double[], SampleObj>, but for this too I had to go through a map(), like:

JavaPairRDD<double[], SampleObj> closest = inputStream.map(
                new PairFunction<double[], double[], SampleObj>() {
                  @Override
                  public Tuple2<double[], SampleObj> call(double[] point) throws Exception {
                    return new Tuple2<double[], SampleObj>(
                      point, obj);  // but here too, the obj declared and initialized outside the map function was not accessible
                  }
                }
              );

I'm quite stuck with this. Can you please suggest a solution?

Regards,
Jayati Tiwari

Josh Rosen

May 15, 2013, 1:26:35 PM
to spark...@googlegroups.com
On Wed, May 15, 2013 at 8:11 AM, jayati tiwari <tiwari...@gmail.com> wrote:

Hello,

I am facing a common issue in a Spark application and a Spark Streaming application.

I need to use an object created outside the map() like the following

SampleClass obj = new SampleClass(list of args);
obj = .... some initialization via a function call

someStreamObj.map(
                new Function<double[], String>() {
                        ..
                        ..
                       // need to access "obj" here, but cannot
                       ..
                       ..
                }

I understand that since in map() we are creating a "new Function()" we won't be able to access an external object.

Why not?  Did you try this and run into an error?

You wouldn't be able to access "obj" if your Function was defined as a static class, but it should work since you've defined your map function as an anonymous inner class.

Some standard Java caveats apply: you will have to declare the variables referenced inside your function as "final" (see http://fishbowl.pastiche.org/2003/05/16/closures_and_java_a_tutorial/ for some discussion of this).

Also, note that although you may be able to mutate the state of "obj" inside of your map function, the updated "obj" will only be visible to that mapper (your changes won't be communicated to other machines or the master).
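
In case it helps, here is a minimal sketch of that, assuming the old spark.api.java API used in this thread; SampleClass, describe(), and the "points" RDD are hypothetical placeholders from this thread rather than a real API, and SampleClass would need to implement java.io.Serializable so the captured object can be shipped inside the task closure:

final SampleClass obj = new SampleClass(/* list of args */);   // final, so the anonymous class can capture it

JavaRDD<String> labels = points.map(        // "points" is a hypothetical JavaRDD<double[]>
    new Function<double[], String>() {
      @Override
      public String call(double[] point) throws Exception {
        // The anonymous inner class captures the final local "obj"; it is
        // serialized into the task closure and sent to each worker.
        return obj.describe(point);
      }
    });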
 
So I tried to create this object inside the map() function so that it is accessible in the call() of the Function(), like:

someStreamObj.map(
                SampleClass obj = new SampleClass(list of args);
                obj = .... some initialization via a function call
                new Function<double[], String>() {
                        ..
                        ..
                       // need to access "obj" here, but cannot
                       ..
                       ..
                }

General note: if you need access to some sort of "helper" object in your map() function and you need to construct that object inside of the map() function, it's going to be much more efficient to use mapPartitions() so that the object is constructed once per partition instead of once per record.
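
As a rough sketch of that pattern (SampleClass and describe() are placeholders from this thread, "rdd" is a hypothetical JavaRDD<double[]>, and the exact FlatMapFunction signature may differ slightly between Spark versions):

JavaRDD<String> result = rdd.mapPartitions(
    new FlatMapFunction<Iterator<double[]>, String>() {   // uses java.util.Iterator/List/ArrayList
      @Override
      public Iterable<String> call(Iterator<double[]> points) throws Exception {
        SampleClass obj = new SampleClass(/* list of args */);   // constructed once per partition, not once per record
        List<String> out = new ArrayList<String>();
        while (points.hasNext()) {
          out.add(obj.describe(points.next()));
        }
        return out;
      }
    });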
 

Now, with this I faced two issues:

1. The "list of args" in "SampleClass obj = new SampleClass(list of args);" includes some command line parameters which are accessible only in main() of the class. So, I cannot create this object without them.
2. Even, if I create the object with hard coded values(just for test, since practically it won't be possible), the other issue comes up. In the "obj = .... some initialization via a fuction call" I need to create a JavaSparkContext object which won't get created since we cannot have multiple JavaSparkContext objects in a single application. So, due to multiple attempts that would be made to create those objects upon deploy it fails with the error below :

You shouldn't create SparkContext/JavaSparkContext instances on the worker machines / inside of map() functions.

Another tip: let's say that you have some heavyweight object that has state that you want to read in your mappers plus a bunch of other data that isn't serializable or that you don't need to read in your mapper.  You can create a local variable that references the subset of the state that you actually read, then access that in your map() function.

e.g.:

MyHugeObject data = ....
final SmallerObject dataOfInterest = data.someField;   // reference only the small piece you actually read
rdd.map(new Function<...> {
   ... dataOfInterest(...)   // only dataOfInterest is captured in the serialized closure
});


(This same trick applies in Python, too: PySpark's rdd.py file does this to prevent certain un-pickle-able objects from being added to system-generated closures).



jayati tiwari

May 17, 2013, 10:04:44 AM
to spark...@googlegroups.com
Thanks for your reply Josh.

Based upon your reply, I checked my code and found that the variable I am trying to access in the map() is a final variable, and yet it is still causing a problem. Upon deeper study I found that it's not a problem with accessing an externally declared variable, but rather with the type of the variable that is being accessed.

To demonstrate the problem I have attached a sample Java file which tries to access a "final" object of type "org.apache.mahout.math.Vector" that has been created outside the map().

In this scenario, the application is unable to create output files at "/home/test/resultsDir/" on the slave nodes of the cluster, but they are created at that location on the master node.

As soon as I remove the statement

outputFile.println("The vector object is: " + vec);

from the map() function, it starts creating output files on all three nodes of the cluster.

I am also attaching the source file of the "org.apache.mahout.math.Vector" class, which is a class in mahout-distribution-0.5,

and "Cloneable" is a "java.lang" interface with the following source:

public interface Cloneable {
}


Can you please look into this and provide some pointers on what could be the reason for such behavior?

Thanks,
Jayati Tiwari
Test.java
Vector.java

Josh Rosen

May 17, 2013, 12:06:51 PM
to spark...@googlegroups.com
I tried running your job locally and found the problem in the logs:

13/05/17 09:00:06 ERROR local.LocalScheduler: Exception in task 0
java.io.NotSerializableException: org.apache.mahout.math.RandomAccessSparseVector
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
[…]
at spark.scheduler.Task$.serializeWithDependencies(Task.scala:58)
[…]

Your job is crashing when Spark is trying to serialize tasks.  RandomAccessSparseVector isn't Serializable, so Spark is unable to serialize tasks whose closures contain RandomAccessSparseVector instances.
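
One workaround, sketched under the assumption that copying the vector's values into a dense array is acceptable for your use case: extract what you need from the Mahout vector into something serializable (e.g. a plain double[]) before the closure, and reference only that inside the map(). Here "vec" is the final Vector from your example and "inputStream" mirrors your earlier snippet; get() and size() are standard methods on org.apache.mahout.math.Vector.

// Copy the Mahout vector into a serializable double[] before building the closure.
final double[] vecValues = new double[vec.size()];
for (int i = 0; i < vecValues.length; i++) {
  vecValues[i] = vec.get(i);
}

inputStream.map(new Function<double[], String>() {
  public String call(double[] point) throws Exception {
    // The closure now captures only vecValues (a plain double[]), which serializes cleanly.
    return "The vector values are: " + java.util.Arrays.toString(vecValues);
  }
});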

<Test.java><Vector.java>
