KryoSerializer exception in Spark Streaming JAVA

472 views
Skip to first unread message

mudassa...@northbaysolutions.net

unread,
Oct 13, 2014, 7:18:37 AM10/13/14
to kryo-...@googlegroups.com
Hi,

I'm implementing KryoSerializer for my custom class. Here is class

public class ImpressionFactsValue implements KryoSerializable {

        private int hits;
       
        public ImpressionFactsValue() {
               
        }
         
        public int getHits() {
                return hits;
        }

        public void setHits(int hits) {
                this.hits = hits;
        }
        public void read(Kryo kryo, Input input) {
                 this.hits = input.readInt();    
             //kryo.readClass(input);
        }

        public void write(Kryo arg0, Output output) {
                 output.writeInt(this.hits);    
             //arg0.writeClass(output, ImpressionFactsValue.class);
             
        }
}

Here is my KryoRegistrator


class MyRegistrator implements KryoRegistrator {

        public void registerClasses(Kryo kryo) {
                kryo.register(ImpressionFactsValue.class);
        }
 
}

Here is spark conf

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");                                          
conf.set("spark.kryo.registrator", "com.MyRegistrator");

Getting exception

14/10/01 17:15:38 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
java.lang.IllegalAccessException: Class org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2 can not access a member of class com.MyRegistrator with modifiers ""
        at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:95)
        at java.lang.Class.newInstance0(Class.java:366)
        at java.lang.Class.newInstance(Class.java:325)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:78)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:76)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:76)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:133)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:95)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.<init>(ExternalAppendOnlyMap.scala:109)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:57)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:594)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:594)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

Please help to resolve this

Martin Grigorov

unread,
Oct 13, 2014, 7:22:56 AM10/13/14
to kryo-...@googlegroups.com
Hi,

On Mon, Oct 13, 2014 at 2:18 PM, <mudassa...@northbaysolutions.net> wrote:
Hi,

I'm implementing KryoSerializer for my custom class. Here is class

public class ImpressionFactsValue implements KryoSerializable {

        private int hits;
       
        public ImpressionFactsValue() {
               
        }
         
        public int getHits() {
                return hits;
        }

        public void setHits(int hits) {
                this.hits = hits;
        }
        public void read(Kryo kryo, Input input) {
                 this.hits = input.readInt();    
             //kryo.readClass(input);
        }

        public void write(Kryo arg0, Output output) {
                 output.writeInt(this.hits);    
             //arg0.writeClass(output, ImpressionFactsValue.class);
             
        }
}

Here is my KryoRegistrator


class MyRegistrator implements KryoRegistrator {

Since this is Java class you need to give it 'public' visibility explicitly: public class ...
The error mesage says that Spark cannot instantiate it because of the reduced visibility. 

--
You received this message because you are subscribed to the "kryo-users" group.
http://groups.google.com/group/kryo-users
---
You received this message because you are subscribed to the Google Groups "kryo-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to kryo-users+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

mudassa...@northbaysolutions.net

unread,
Oct 13, 2014, 9:29:14 AM10/13/14
to kryo-...@googlegroups.com
Hi Martin,

Thank you for reply. It's working now.

Lei Hu

unread,
Dec 11, 2014, 10:16:31 PM12/11/14
to kryo-...@googlegroups.com
I had the same issue today and make it public would solve the problem. Did the Kryo improve the performance for you? since I did use Kryo, but I don't see any obvious improvement.

thanks

Lei
Reply all
Reply to author
Forward
0 new messages